Skip to content

Commit

Permalink
test(pubsublite): fix flaky PartitionAssignment test (#3510)
Browse files Browse the repository at this point in the history
Wait for goroutines to finish before returning, otherwise the subscriber clients may still be terminating, even though all messages have been received.

Fixes #3507.
  • Loading branch information
tmdiep committed Jan 11, 2021
1 parent 78ee29b commit ac85c2e
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions pubsublite/ps/integration_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"

vkit "cloud.google.com/go/pubsublite/apiv1"
Expand Down Expand Up @@ -599,19 +600,24 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
}

cctx, stopSubscribers := context.WithTimeout(context.Background(), defaultTestTimeout)
g, _ := errgroup.WithContext(ctx)
for i := 0; i < subscriberCount; i++ {
// Subscribers must be started in a goroutine as Receive() blocks.
go func() {
g.Go(func() error {
subscriber := subscriberClient(cctx, t, DefaultReceiveSettings, subscriptionPath)
if err := subscriber.Receive(cctx, messageReceiver); err != nil {
err := subscriber.Receive(cctx, messageReceiver)
if err != nil {
t.Errorf("Receive() got err: %v", err)
}
}()
return err
})
}

// Wait until all messages have been received.
msgTracker.Wait(defaultTestTimeout)
stopSubscribers()
// Wait until all subscribers have terminated.
g.Wait()
})
}

Expand Down

0 comments on commit ac85c2e

Please sign in to comment.