From a1614db35a51d21c52bcba5e805071381d8f5133 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 23 Apr 2021 14:25:13 -0700 Subject: [PATCH] fix(pubsub): make config call permission error in Receive transparent (#3985) * fix(pubsub): make config call permission error in Receive transparent fix(pubsub): make config call permission error in Receive transparent remove extra whitespace clarify doc comments fix typos fix formatting * make call to config only if we see an ordering key * move comment to method doc --- pubsub/subscription.go | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 982743b71a0..5eca508ee23 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -48,6 +48,7 @@ type Subscription struct { mu sync.Mutex receiveActive bool + once sync.Once enableOrdering bool } @@ -224,7 +225,17 @@ type SubscriptionConfig struct { // The set of labels for the subscription. Labels map[string]string - // EnableMessageOrdering enables message ordering. + // EnableMessageOrdering enables message ordering on this subscription. + // This value is only used for subscription creation and update, and + // is not read locally in calls like Subscription.Receive(). + // + // If set to false, even if messages are published with ordering keys, + // messages will not be delivered in order. + // + // When calling Subscription.Receive(), the client will check this + // value with a call to Subscription.Config(), which requires the + // roles/viewer or roles/pubsub.viewer role on your service account. + // If that call fails, mesages with ordering keys will be delivered in order. EnableMessageOrdering bool // DeadLetterPolicy specifies the conditions for dead lettering messages in @@ -774,14 +785,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() - // Check config to check EnableMessageOrdering field. - // See: https://github.com/googleapis/google-cloud-go/issues/3884 - cfg, err := s.Config(ctx) - if err != nil { - return fmt.Errorf("sub.Config err: %v", err) - } - s.enableOrdering = cfg.EnableMessageOrdering - maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { maxCount = DefaultReceiveSettings.MaxOutstandingMessages @@ -911,6 +914,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes old(ackID, ack, receiveTime) } wg.Add(1) + if msg.OrderingKey != "" { + s.once.Do(s.checkOrdering) + } // Make sure the subscription has ordering enabled before adding to scheduler. var key string if s.enableOrdering { @@ -948,6 +954,21 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return group.Wait() } +// checkOrdering calls Config to check theEnableMessageOrdering field. +// If this call fails (e.g. because the service account doesn't have +// the roles/viewer or roles/pubsub.viewer role) we will assume +// EnableMessageOrdering to be true. +// See: https://github.com/googleapis/google-cloud-go/issues/3884 +func (s *Subscription) checkOrdering() { + ctx := context.Background() + cfg, err := s.Config(ctx) + if err != nil { + s.enableOrdering = true + } else { + s.enableOrdering = cfg.EnableMessageOrdering + } +} + type pullOptions struct { maxExtension time.Duration // the maximum time to extend a message's ack deadline in total maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc