Navigation Menu

Skip to content

Commit

Permalink
fix(pubsub): make config call permission error in Receive transparent (
Browse files Browse the repository at this point in the history
…#3985)

* fix(pubsub): make config call permission error in Receive transparent

fix(pubsub): make config call permission error in Receive transparent

remove extra whitespace

clarify doc comments

fix typos

fix formatting

* make call to config only if we see an ordering key

* move comment to method doc
  • Loading branch information
hongalex committed Apr 23, 2021
1 parent 9e42639 commit a1614db
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions pubsub/subscription.go
Expand Up @@ -48,6 +48,7 @@ type Subscription struct {
mu sync.Mutex
receiveActive bool

once sync.Once
enableOrdering bool
}

Expand Down Expand Up @@ -224,7 +225,17 @@ type SubscriptionConfig struct {
// The set of labels for the subscription.
Labels map[string]string

// EnableMessageOrdering enables message ordering.
// EnableMessageOrdering enables message ordering on this subscription.
// This value is only used for subscription creation and update, and
// is not read locally in calls like Subscription.Receive().
//
// If set to false, even if messages are published with ordering keys,
// messages will not be delivered in order.
//
// When calling Subscription.Receive(), the client will check this
// value with a call to Subscription.Config(), which requires the
// roles/viewer or roles/pubsub.viewer role on your service account.
// If that call fails, mesages with ordering keys will be delivered in order.
EnableMessageOrdering bool

// DeadLetterPolicy specifies the conditions for dead lettering messages in
Expand Down Expand Up @@ -774,14 +785,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

// Check config to check EnableMessageOrdering field.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
cfg, err := s.Config(ctx)
if err != nil {
return fmt.Errorf("sub.Config err: %v", err)
}
s.enableOrdering = cfg.EnableMessageOrdering

maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
Expand Down Expand Up @@ -911,6 +914,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old(ackID, ack, receiveTime)
}
wg.Add(1)
if msg.OrderingKey != "" {
s.once.Do(s.checkOrdering)
}
// Make sure the subscription has ordering enabled before adding to scheduler.
var key string
if s.enableOrdering {
Expand Down Expand Up @@ -948,6 +954,21 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return group.Wait()
}

// checkOrdering calls Config to check theEnableMessageOrdering field.
// If this call fails (e.g. because the service account doesn't have
// the roles/viewer or roles/pubsub.viewer role) we will assume
// EnableMessageOrdering to be true.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
func (s *Subscription) checkOrdering() {
ctx := context.Background()
cfg, err := s.Config(ctx)
if err != nil {
s.enableOrdering = true
} else {
s.enableOrdering = cfg.EnableMessageOrdering
}
}

type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
Expand Down

0 comments on commit a1614db

Please sign in to comment.