Skip to content

Commit

Permalink
test(pubsublite): integration tests for reservations (#4371)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Sep 15, 2021
1 parent 65b0f88 commit 23c02d9
Showing 1 changed file with 105 additions and 4 deletions.
109 changes: 105 additions & 4 deletions pubsublite/integration_test.go
Expand Up @@ -41,15 +41,21 @@ var (
pathCmpOptions = []cmp.Option{
cmpopts.IgnoreFields(wire.TopicPath{}, "Project"),
cmpopts.IgnoreFields(wire.SubscriptionPath{}, "Project"),
cmpopts.IgnoreFields(wire.ReservationPath{}, "Project"),
}
configCmpOptions = []cmp.Option{
cmp.Comparer(func(t1, t2 *TopicConfig) bool {
return cmp.Equal(t1, t2, cmpopts.IgnoreFields(TopicConfig{}, "Name")) && TopicPathsEqual(t1.Name, t2.Name)
return cmp.Equal(t1, t2, cmpopts.IgnoreFields(TopicConfig{}, "Name", "ThroughputReservation")) &&
TopicPathsEqual(t1.Name, t2.Name) && ReservationPathsEqual(t1.ThroughputReservation, t2.ThroughputReservation, true)
}),
cmp.Comparer(func(s1, s2 *SubscriptionConfig) bool {
return cmp.Equal(s1, s2, cmpopts.IgnoreFields(SubscriptionConfig{}, "Name", "Topic")) &&
TopicPathsEqual(s1.Topic, s2.Topic) && SubscriptionPathsEqual(s1.Name, s2.Name)
}),
cmp.Comparer(func(r1, r2 *ReservationConfig) bool {
return cmp.Equal(r1, r2, cmpopts.IgnoreFields(ReservationConfig{}, "Name")) &&
ReservationPathsEqual(r1.Name, r2.Name, false)
}),
}
)

Expand Down Expand Up @@ -77,6 +83,21 @@ func SubscriptionPathsEqual(subscription1, subscription2 string) bool {
return cmp.Equal(sp1, sp2, pathCmpOptions...)
}

func ReservationPathsEqual(reservation1, reservation2 string, allowEmpty bool) bool {
if allowEmpty && len(reservation1)+len(reservation2) == 0 {
return true
}
rp1, err := wire.ParseReservationPath(reservation1)
if err != nil {
return false
}
rp2, err := wire.ParseReservationPath(reservation2)
if err != nil {
return false
}
return cmp.Equal(rp1, rp2, pathCmpOptions...)
}

func initIntegrationTest(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
Expand Down Expand Up @@ -109,6 +130,12 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio
return admin
}

func cleanUpReservation(ctx context.Context, t *testing.T, admin *AdminClient, name string) {
if err := admin.DeleteReservation(ctx, name); err != nil {
t.Errorf("Failed to delete reservation %s: %v", name, err)
}
}

func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name string) {
if err := admin.DeleteTopic(ctx, name); err != nil {
t.Errorf("Failed to delete topic %s: %v", name, err)
Expand Down Expand Up @@ -159,11 +186,65 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) {
locationPath := wire.LocationPath{Project: proj, Location: zone}.String()
topicPath := wire.TopicPath{Project: proj, Zone: zone, TopicID: resourceID}.String()
subscriptionPath := wire.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}.String()
reservationPath := wire.ReservationPath{Project: proj, Region: region, ReservationID: resourceID}.String()
t.Logf("Topic path: %s", topicPath)

admin := adminClient(ctx, t, region)
defer admin.Close()

// Reservation admin operations.
newResConfig := &ReservationConfig{
Name: reservationPath,
ThroughputCapacity: 3,
}

gotResConfig, err := admin.CreateReservation(ctx, *newResConfig)
if err != nil {
t.Fatalf("Failed to create reservation: %v", err)
}
defer cleanUpReservation(ctx, t, admin, reservationPath)
if diff := testutil.Diff(gotResConfig, newResConfig, configCmpOptions...); diff != "" {
t.Errorf("CreateReservation() got: -, want: +\n%s", diff)
}

if gotResConfig, err := admin.Reservation(ctx, reservationPath); err != nil {
t.Errorf("Failed to get reservation: %v", err)
} else if diff := testutil.Diff(gotResConfig, newResConfig, configCmpOptions...); diff != "" {
t.Errorf("Reservation() got: -, want: +\n%s", diff)
}

resIt := admin.Reservations(ctx, wire.LocationPath{proj, region}.String())
var foundRes *ReservationConfig
for {
res, err := resIt.Next()
if err == iterator.Done {
break
}
if ReservationPathsEqual(res.Name, reservationPath, false) {
foundRes = res
break
}
}
if foundRes == nil {
t.Error("Reservations() did not return reservation config")
} else if diff := testutil.Diff(foundRes, newResConfig, configCmpOptions...); diff != "" {
t.Errorf("Reservations() found config: -, want: +\n%s", diff)
}

resUpdate := ReservationConfigToUpdate{
Name: reservationPath,
ThroughputCapacity: 4,
}
wantUpdatedResConfig := &ReservationConfig{
Name: reservationPath,
ThroughputCapacity: 4,
}
if gotResConfig, err := admin.UpdateReservation(ctx, resUpdate); err != nil {
t.Errorf("Failed to update reservation: %v", err)
} else if diff := testutil.Diff(gotResConfig, wantUpdatedResConfig, configCmpOptions...); diff != "" {
t.Errorf("UpdateReservation() got: -, want: +\n%s", diff)
}

// Topic admin operations.
newTopicConfig := &TopicConfig{
Name: topicPath,
Expand All @@ -172,6 +253,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) {
SubscribeCapacityMiBPerSec: 4,
PerPartitionBytes: 30 * gibi,
RetentionDuration: 24 * time.Hour,
ThroughputReservation: reservationPath,
}

gotTopicConfig, err := admin.CreateTopic(ctx, *newTopicConfig)
Expand Down Expand Up @@ -213,11 +295,28 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) {
t.Errorf("Topics() found config: -, want: +\n%s", diff)
}

topicPathIt := admin.ReservationTopics(ctx, reservationPath)
foundTopicPath := false
for {
path, err := topicPathIt.Next()
if err == iterator.Done {
break
}
if TopicPathsEqual(topicPath, path) {
foundTopicPath = true
break
}
}
if !foundTopicPath {
t.Error("ReservationTopics() did not return topic path")
}

topicUpdate1 := TopicConfigToUpdate{
Name: topicPath,
PartitionCount: 2,
PublishCapacityMiBPerSec: 6,
SubscribeCapacityMiBPerSec: 8,
ThroughputReservation: "",
}
wantUpdatedTopicConfig1 := &TopicConfig{
Name: topicPath,
Expand All @@ -234,9 +333,10 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) {
}

topicUpdate2 := TopicConfigToUpdate{
Name: topicPath,
PerPartitionBytes: 35 * gibi,
RetentionDuration: InfiniteRetention,
Name: topicPath,
PerPartitionBytes: 35 * gibi,
RetentionDuration: InfiniteRetention,
ThroughputReservation: reservationPath,
}
wantUpdatedTopicConfig2 := &TopicConfig{
Name: topicPath,
Expand All @@ -245,6 +345,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) {
SubscribeCapacityMiBPerSec: 8,
PerPartitionBytes: 35 * gibi,
RetentionDuration: InfiniteRetention,
ThroughputReservation: reservationPath,
}
if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate2); err != nil {
t.Errorf("Failed to update topic: %v", err)
Expand Down

0 comments on commit 23c02d9

Please sign in to comment.