Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): Enable server side flow control by default with the option to turn it off #3154

Merged
merged 11 commits into from Nov 11, 2020
8 changes: 7 additions & 1 deletion pubsub/iterator.go
Expand Up @@ -76,7 +76,13 @@ type messageIterator struct {
func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator {
var ps *pullStream
if !po.synchronous {
ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.maxOutstandingMessages, po.maxOutstandingBytes)
maxMessages = po.maxOutstandingMessages
maxBytes = po.maxOutstandingBytes
hongalex marked this conversation as resolved.
Show resolved Hide resolved
if po.UseLegacyFlowControl {
hongalex marked this conversation as resolved.
Show resolved Hide resolved
maxMessages = 0
maxBytes = 0
}
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes)
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
Expand Down
8 changes: 8 additions & 0 deletions pubsub/subscription.go
Expand Up @@ -489,6 +489,12 @@ type ReceiveSettings struct {
// for unprocessed messages.
MaxOutstandingBytes int

// UseLegacyFlowControl disables enforcing flow control settings at the Cloud
// PubSub server and the less accurate method of only enforcing flow control
// at the client side is used.
// The default is false.
UseLegacyFlowControl bool

// NumGoroutines is the number of goroutines that each datastructure along
// the Receive path will spawn. Adjusting this value adjusts concurrency
// along the receive path.
Expand Down Expand Up @@ -820,6 +826,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
synchronous: s.ReceiveSettings.Synchronous,
maxOutstandingMessages: maxCount,
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(maxCount, maxBytes)

Expand Down Expand Up @@ -950,4 +957,5 @@ type pullOptions struct {
synchronous bool
maxOutstandingMessages int
maxOutstandingBytes int
useLegacyFlowControl bool
}