diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 49a3742d3c3..8f1d9316bbf 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -76,7 +76,13 @@ type messageIterator struct { func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator { var ps *pullStream if !po.synchronous { - ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.maxOutstandingMessages, po.maxOutstandingBytes) + maxMessages := po.maxOutstandingMessages + maxBytes := po.maxOutstandingBytes + if po.useLegacyFlowControl { + maxMessages = 0 + maxBytes = 0 + } + ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes) } // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending // the first keepAlive halfway towards the minimum ack deadline. diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0af3238cbab..59f296e04cf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -489,6 +489,12 @@ type ReceiveSettings struct { // for unprocessed messages. MaxOutstandingBytes int + // UseLegacyFlowControl disables enforcing flow control settings at the Cloud + // PubSub server and the less accurate method of only enforcing flow control + // at the client side is used. + // The default is false. + UseLegacyFlowControl bool + // NumGoroutines is the number of goroutines that each datastructure along // the Receive path will spawn. Adjusting this value adjusts concurrency // along the receive path. @@ -820,6 +826,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes synchronous: s.ReceiveSettings.Synchronous, maxOutstandingMessages: maxCount, maxOutstandingBytes: maxBytes, + useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } fc := newFlowController(maxCount, maxBytes) @@ -950,4 +957,5 @@ type pullOptions struct { synchronous bool maxOutstandingMessages int maxOutstandingBytes int + useLegacyFlowControl bool }