Skip to content

Commit

Permalink
fix(pubsub): pass context into checkOrdering to allow cancel (#5316)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Jan 24, 2022
1 parent f9608d4 commit fc08c49
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
5 changes: 2 additions & 3 deletions pubsub/subscription.go
Expand Up @@ -840,7 +840,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

s.checkOrdering()
s.checkOrdering(ctx)

maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
Expand Down Expand Up @@ -1033,8 +1033,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// the roles/viewer or roles/pubsub.viewer role) we will assume
// EnableMessageOrdering to be true.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
func (s *Subscription) checkOrdering() {
ctx := context.Background()
func (s *Subscription) checkOrdering(ctx context.Context) {
cfg, err := s.Config(ctx)
if err != nil {
s.enableOrdering = true
Expand Down
7 changes: 7 additions & 0 deletions pubsub/subscription_test.go
Expand Up @@ -427,4 +427,11 @@ func TestOrdering_CreateSubscription(t *testing.T) {
if !cfg.EnableMessageOrdering {
t.Fatalf("Expected EnableMessageOrdering to be true in %s", orderSub.String())
}

// Test cancellation works as intended with ordering enabled.
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
orderSub.Receive(ctx, func(ctx context.Context, msg *Message) {
msg.Ack()
})
}

0 comments on commit fc08c49

Please sign in to comment.