diff --git a/pubsub/subscription.go b/pubsub/subscription.go index d228ed12c33..5fa080724a3 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -48,7 +48,6 @@ type Subscription struct { mu sync.Mutex receiveActive bool - once sync.Once enableOrdering bool } @@ -785,6 +784,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() + s.checkOrdering() + maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { maxCount = DefaultReceiveSettings.MaxOutstandingMessages @@ -897,9 +898,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } for i, msg := range msgs { msg := msg - if msg.OrderingKey != "" { - s.once.Do(s.checkOrdering) - } // TODO(jba): call acquire closer to when the message is allocated. if err := fc.acquire(ctx, len(msg.Data)); err != nil { // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.