From e392e6157ee02a344528de63ab16baba61470b24 Mon Sep 17 00:00:00 2001 From: fayssalmartanigcp <73672393+fayssalmartanigcp@users.noreply.github.com> Date: Wed, 11 Nov 2020 04:45:34 -0500 Subject: [PATCH] feat(pubsub): Enable server side flow control by default with the option to turn it off (#3154) * feat: Enable server side flow control by default with the option to turn it off This change enables sending flow control settings automatically to the server. If ReceiveSettings.MaxOutstandingMessages > 0 or ReceiveSettings.MaxOutstandingBytes > 0, flow control will be enforced at the server side (in addition to the client side). This behavior is enabled by default and ReceiveSettings.UseLegacyFlowControl can be set for users who would like to opt-out of this feature in case they encounter issues with server side flow control. * Update subscription.go * use unexported field in pulloptions * use short variable declaration Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com> --- pubsub/iterator.go | 8 +++++++- pubsub/subscription.go | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 49a3742d3c3..8f1d9316bbf 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -76,7 +76,13 @@ type messageIterator struct { func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator { var ps *pullStream if !po.synchronous { - ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.maxOutstandingMessages, po.maxOutstandingBytes) + maxMessages := po.maxOutstandingMessages + maxBytes := po.maxOutstandingBytes + if po.useLegacyFlowControl { + maxMessages = 0 + maxBytes = 0 + } + ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes) } // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending // the first keepAlive halfway towards the minimum ack deadline. diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0af3238cbab..59f296e04cf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -489,6 +489,12 @@ type ReceiveSettings struct { // for unprocessed messages. MaxOutstandingBytes int + // UseLegacyFlowControl disables enforcing flow control settings at the Cloud + // PubSub server and the less accurate method of only enforcing flow control + // at the client side is used. + // The default is false. + UseLegacyFlowControl bool + // NumGoroutines is the number of goroutines that each datastructure along // the Receive path will spawn. Adjusting this value adjusts concurrency // along the receive path. @@ -820,6 +826,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes synchronous: s.ReceiveSettings.Synchronous, maxOutstandingMessages: maxCount, maxOutstandingBytes: maxBytes, + useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } fc := newFlowController(maxCount, maxBytes) @@ -950,4 +957,5 @@ type pullOptions struct { synchronous bool maxOutstandingMessages int maxOutstandingBytes int + useLegacyFlowControl bool }