From ac85c2e6eda061aaa3fcea37e513b3add8220909 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 12 Jan 2021 07:02:04 +1100 Subject: [PATCH] test(pubsublite): fix flaky PartitionAssignment test (#3510) Wait for goroutines to finish before returning, otherwise the subscriber clients may still be terminating, even though all messages have been received. Fixes https://github.com/googleapis/google-cloud-go/issues/3507. --- pubsublite/ps/integration_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pubsublite/ps/integration_test.go b/pubsublite/ps/integration_test.go index 992876d70dd..ff95e533d13 100644 --- a/pubsublite/ps/integration_test.go +++ b/pubsublite/ps/integration_test.go @@ -28,6 +28,7 @@ import ( "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/internal/test" "github.com/google/go-cmp/cmp/cmpopts" + "golang.org/x/sync/errgroup" "google.golang.org/api/option" vkit "cloud.google.com/go/pubsublite/apiv1" @@ -599,19 +600,24 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) { } cctx, stopSubscribers := context.WithTimeout(context.Background(), defaultTestTimeout) + g, _ := errgroup.WithContext(ctx) for i := 0; i < subscriberCount; i++ { // Subscribers must be started in a goroutine as Receive() blocks. - go func() { + g.Go(func() error { subscriber := subscriberClient(cctx, t, DefaultReceiveSettings, subscriptionPath) - if err := subscriber.Receive(cctx, messageReceiver); err != nil { + err := subscriber.Receive(cctx, messageReceiver) + if err != nil { t.Errorf("Receive() got err: %v", err) } - }() + return err + }) } // Wait until all messages have been received. msgTracker.Wait(defaultTestTimeout) stopSubscribers() + // Wait until all subscribers have terminated. + g.Wait() }) }