Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): make config call permission error in Receive transparent #3985

Merged
merged 8 commits into from Apr 23, 2021
39 changes: 30 additions & 9 deletions pubsub/subscription.go
Expand Up @@ -48,6 +48,7 @@ type Subscription struct {
mu sync.Mutex
receiveActive bool

once sync.Once
enableOrdering bool
}

Expand Down Expand Up @@ -224,7 +225,17 @@ type SubscriptionConfig struct {
// The set of labels for the subscription.
Labels map[string]string

// EnableMessageOrdering enables message ordering.
// EnableMessageOrdering enables message ordering on this subscription.
// This value is only used for subscription creation and update, and
// is not read locally in calls like Subscription.Receive().
//
// If set to false, even if messages are published with ordering keys,
// messages will not be delivered in order.
//
// When calling Subscription.Receive(), the client will check this
// value with a call to Subscription.Config(), which requires the
// roles/viewer or roles/pubsub.viewer role on your service account.
// If that call fails, mesages with ordering keys will be delivered in order.
EnableMessageOrdering bool

// DeadLetterPolicy specifies the conditions for dead lettering messages in
Expand Down Expand Up @@ -774,14 +785,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

// Check config to check EnableMessageOrdering field.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
cfg, err := s.Config(ctx)
if err != nil {
return fmt.Errorf("sub.Config err: %v", err)
}
s.enableOrdering = cfg.EnableMessageOrdering

maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
Expand Down Expand Up @@ -911,6 +914,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
old(ackID, ack, receiveTime)
}
wg.Add(1)
if msg.OrderingKey != "" {
s.once.Do(s.checkOrdering)
}
// Make sure the subscription has ordering enabled before adding to scheduler.
var key string
if s.enableOrdering {
Expand Down Expand Up @@ -948,6 +954,21 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return group.Wait()
}

// checkOrdering calls Config to check theEnableMessageOrdering field.
// If this call fails (e.g. because the service account doesn't have
// the roles/viewer or roles/pubsub.viewer role) we will assume
// EnableMessageOrdering to be true.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
func (s *Subscription) checkOrdering() {
ctx := context.Background()
cfg, err := s.Config(ctx)
if err != nil {
s.enableOrdering = true
} else {
s.enableOrdering = cfg.EnableMessageOrdering
}
}

type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
Expand Down