From 8cfcf53d03b9b442e7f0bc1c1b20c791e31c07b0 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 13 Aug 2021 10:40:29 -0700 Subject: [PATCH] fix(pubsub): always make config check to prevent race (#4606) In the previous iteration of this PR, #4602, the assumption was made that making the config check earlier will prevent race conditions in the test, which turned out to be incorrect. In addition, although previously ruled out as a solution, this PR makes it so that a config check is always made on the first call to `Receive`. I originally thought this would result in a poor experience for those users who don't have the `roles/pubsub.viewer` permission, but we default `enableOrdering` to `true` anyway if the config call fails due to lack of permissions. Using a default of `true` only negatively impacts those who don't want ordering on their subscription, and this behavior is [already documented](https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.14.0/pubsub/subscription.go#L235-L239). Fixes #3626 --- pubsub/subscription.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index d228ed12c33..5fa080724a3 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -48,7 +48,6 @@ type Subscription struct { mu sync.Mutex receiveActive bool - once sync.Once enableOrdering bool } @@ -785,6 +784,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() + s.checkOrdering() + maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { maxCount = DefaultReceiveSettings.MaxOutstandingMessages @@ -897,9 +898,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } for i, msg := range msgs { msg := msg - if msg.OrderingKey != "" { - s.once.Do(s.checkOrdering) - } // TODO(jba): call acquire closer to when the message is allocated. if err := fc.acquire(ctx, len(msg.Data)); err != nil { // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.