Skip to content

Commit

Permalink
fix(pubsub): mitigate race in checking ordering config (#4602)
Browse files Browse the repository at this point in the history
We use `sync.once.Do(s.checkOrdering)` to perform one config check for a subscription's message ordering field when a message with an ordering key is received. This way, we know if we should deliver messages in order. However, since `sub.Receive` generates multiple goroutines, `s.enableOrdering` is not guaranteed to be read after the first invocation of `s.checkOrdering`. This PR makes this race happen less often by making the config check closer to when the message is received.

This cannot be fully eliminated without making the config call at the start of every single `Receive`. This behavior is less desirable, as we want to make the config call only when we have to, that is when we receive a message with an ordering key. 

Fixes #3626
  • Loading branch information
hongalex committed Aug 12, 2021
1 parent ff488c8 commit 112eea2
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pubsub/subscription.go
Expand Up @@ -897,6 +897,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}
for i, msg := range msgs {
msg := msg
if msg.OrderingKey != "" {
s.once.Do(s.checkOrdering)
}
// TODO(jba): call acquire closer to when the message is allocated.
if err := fc.acquire(ctx, len(msg.Data)); err != nil {
// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
Expand All @@ -914,9 +917,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old(ackID, ack, receiveTime)
}
wg.Add(1)
if msg.OrderingKey != "" {
s.once.Do(s.checkOrdering)
}
// Make sure the subscription has ordering enabled before adding to scheduler.
var key string
if s.enableOrdering {
Expand Down

0 comments on commit 112eea2

Please sign in to comment.