From 1e3d867be39e1ec4f5f310a49f13be4aed861efa Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 1 Jul 2021 18:21:18 -0400 Subject: [PATCH] test(pubsublite): integration tests for reservations --- pubsublite/integration_test.go | 109 +++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/pubsublite/integration_test.go b/pubsublite/integration_test.go index 53aff1ba399..f3837a275db 100644 --- a/pubsublite/integration_test.go +++ b/pubsublite/integration_test.go @@ -41,15 +41,21 @@ var ( pathCmpOptions = []cmp.Option{ cmpopts.IgnoreFields(wire.TopicPath{}, "Project"), cmpopts.IgnoreFields(wire.SubscriptionPath{}, "Project"), + cmpopts.IgnoreFields(wire.ReservationPath{}, "Project"), } configCmpOptions = []cmp.Option{ cmp.Comparer(func(t1, t2 *TopicConfig) bool { - return cmp.Equal(t1, t2, cmpopts.IgnoreFields(TopicConfig{}, "Name")) && TopicPathsEqual(t1.Name, t2.Name) + return cmp.Equal(t1, t2, cmpopts.IgnoreFields(TopicConfig{}, "Name", "ThroughputReservation")) && + TopicPathsEqual(t1.Name, t2.Name) && ReservationPathsEqual(t1.ThroughputReservation, t2.ThroughputReservation, true) }), cmp.Comparer(func(s1, s2 *SubscriptionConfig) bool { return cmp.Equal(s1, s2, cmpopts.IgnoreFields(SubscriptionConfig{}, "Name", "Topic")) && TopicPathsEqual(s1.Topic, s2.Topic) && SubscriptionPathsEqual(s1.Name, s2.Name) }), + cmp.Comparer(func(r1, r2 *ReservationConfig) bool { + return cmp.Equal(r1, r2, cmpopts.IgnoreFields(ReservationConfig{}, "Name")) && + ReservationPathsEqual(r1.Name, r2.Name, false) + }), } ) @@ -77,6 +83,21 @@ func SubscriptionPathsEqual(subscription1, subscription2 string) bool { return cmp.Equal(sp1, sp2, pathCmpOptions...) } +func ReservationPathsEqual(reservation1, reservation2 string, allowEmpty bool) bool { + if allowEmpty && len(reservation1)+len(reservation2) == 0 { + return true + } + rp1, err := wire.ParseReservationPath(reservation1) + if err != nil { + return false + } + rp2, err := wire.ParseReservationPath(reservation2) + if err != nil { + return false + } + return cmp.Equal(rp1, rp2, pathCmpOptions...) +} + func initIntegrationTest(t *testing.T) { if testing.Short() { t.Skip("Integration tests skipped in short mode") @@ -109,6 +130,12 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio return admin } +func cleanUpReservation(ctx context.Context, t *testing.T, admin *AdminClient, name string) { + if err := admin.DeleteReservation(ctx, name); err != nil { + t.Errorf("Failed to delete reservation %s: %v", name, err) + } +} + func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name string) { if err := admin.DeleteTopic(ctx, name); err != nil { t.Errorf("Failed to delete topic %s: %v", name, err) @@ -133,11 +160,65 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { locationPath := wire.LocationPath{Project: proj, Location: zone}.String() topicPath := wire.TopicPath{Project: proj, Zone: zone, TopicID: resourceID}.String() subscriptionPath := wire.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}.String() + reservationPath := wire.ReservationPath{Project: proj, Region: region, ReservationID: resourceID}.String() t.Logf("Topic path: %s", topicPath) admin := adminClient(ctx, t, region) defer admin.Close() + // Reservation admin operations. + newResConfig := &ReservationConfig{ + Name: reservationPath, + ThroughputCapacity: 3, + } + + gotResConfig, err := admin.CreateReservation(ctx, *newResConfig) + if err != nil { + t.Fatalf("Failed to create reservation: %v", err) + } + defer cleanUpReservation(ctx, t, admin, reservationPath) + if diff := testutil.Diff(gotResConfig, newResConfig, configCmpOptions...); diff != "" { + t.Errorf("CreateReservation() got: -, want: +\n%s", diff) + } + + if gotResConfig, err := admin.Reservation(ctx, reservationPath); err != nil { + t.Errorf("Failed to get reservation: %v", err) + } else if diff := testutil.Diff(gotResConfig, newResConfig, configCmpOptions...); diff != "" { + t.Errorf("Reservation() got: -, want: +\n%s", diff) + } + + resIt := admin.Reservations(ctx, wire.LocationPath{proj, region}.String()) + var foundRes *ReservationConfig + for { + res, err := resIt.Next() + if err == iterator.Done { + break + } + if ReservationPathsEqual(res.Name, reservationPath, false) { + foundRes = res + break + } + } + if foundRes == nil { + t.Error("Reservations() did not return reservation config") + } else if diff := testutil.Diff(foundRes, newResConfig, configCmpOptions...); diff != "" { + t.Errorf("Reservations() found config: -, want: +\n%s", diff) + } + + resUpdate := ReservationConfigToUpdate{ + Name: reservationPath, + ThroughputCapacity: 4, + } + wantUpdatedResConfig := &ReservationConfig{ + Name: reservationPath, + ThroughputCapacity: 4, + } + if gotResConfig, err := admin.UpdateReservation(ctx, resUpdate); err != nil { + t.Errorf("Failed to update reservation: %v", err) + } else if diff := testutil.Diff(gotResConfig, wantUpdatedResConfig, configCmpOptions...); diff != "" { + t.Errorf("UpdateReservation() got: -, want: +\n%s", diff) + } + // Topic admin operations. newTopicConfig := &TopicConfig{ Name: topicPath, @@ -146,6 +227,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { SubscribeCapacityMiBPerSec: 4, PerPartitionBytes: 30 * gibi, RetentionDuration: 24 * time.Hour, + ThroughputReservation: reservationPath, } gotTopicConfig, err := admin.CreateTopic(ctx, *newTopicConfig) @@ -187,11 +269,28 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { t.Errorf("Topics() found config: -, want: +\n%s", diff) } + topicPathIt := admin.ReservationTopics(ctx, reservationPath) + foundTopicPath := false + for { + path, err := topicPathIt.Next() + if err == iterator.Done { + break + } + if TopicPathsEqual(topicPath, path) { + foundTopicPath = true + break + } + } + if !foundTopicPath { + t.Error("ReservationTopics() did not return topic path") + } + topicUpdate1 := TopicConfigToUpdate{ Name: topicPath, PartitionCount: 2, PublishCapacityMiBPerSec: 6, SubscribeCapacityMiBPerSec: 8, + ThroughputReservation: "", } wantUpdatedTopicConfig1 := &TopicConfig{ Name: topicPath, @@ -208,9 +307,10 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { } topicUpdate2 := TopicConfigToUpdate{ - Name: topicPath, - PerPartitionBytes: 35 * gibi, - RetentionDuration: InfiniteRetention, + Name: topicPath, + PerPartitionBytes: 35 * gibi, + RetentionDuration: InfiniteRetention, + ThroughputReservation: reservationPath, } wantUpdatedTopicConfig2 := &TopicConfig{ Name: topicPath, @@ -219,6 +319,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { SubscribeCapacityMiBPerSec: 8, PerPartitionBytes: 35 * gibi, RetentionDuration: InfiniteRetention, + ThroughputReservation: reservationPath, } if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate2); err != nil { t.Errorf("Failed to update topic: %v", err)