Skip to content

Commit

Permalink
feat(pubsub): Enable server side flow control by default with the opt…
Browse files Browse the repository at this point in the history
…ion to turn it off (#3154)

* feat: Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server.
If ReceiveSettings.MaxOutstandingMessages > 0 or ReceiveSettings.MaxOutstandingBytes > 0,
flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and ReceiveSettings.UseLegacyFlowControl can be set for users
who would like to opt-out of this feature in case they encounter issues with server side flow
control.

* Update subscription.go

* use unexported field in pulloptions

* use short variable declaration

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
fayssalmartanigcp and hongalex committed Nov 11, 2020
1 parent e12046b commit e392e61
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pubsub/iterator.go
Expand Up @@ -76,7 +76,13 @@ type messageIterator struct {
func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator {
var ps *pullStream
if !po.synchronous {
ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.maxOutstandingMessages, po.maxOutstandingBytes)
maxMessages := po.maxOutstandingMessages
maxBytes := po.maxOutstandingBytes
if po.useLegacyFlowControl {
maxMessages = 0
maxBytes = 0
}
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes)
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
Expand Down
8 changes: 8 additions & 0 deletions pubsub/subscription.go
Expand Up @@ -489,6 +489,12 @@ type ReceiveSettings struct {
// for unprocessed messages.
MaxOutstandingBytes int

// UseLegacyFlowControl disables enforcing flow control settings at the Cloud
// PubSub server and the less accurate method of only enforcing flow control
// at the client side is used.
// The default is false.
UseLegacyFlowControl bool

// NumGoroutines is the number of goroutines that each datastructure along
// the Receive path will spawn. Adjusting this value adjusts concurrency
// along the receive path.
Expand Down Expand Up @@ -820,6 +826,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
synchronous: s.ReceiveSettings.Synchronous,
maxOutstandingMessages: maxCount,
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(maxCount, maxBytes)

Expand Down Expand Up @@ -950,4 +957,5 @@ type pullOptions struct {
synchronous bool
maxOutstandingMessages int
maxOutstandingBytes int
useLegacyFlowControl bool
}

0 comments on commit e392e61

Please sign in to comment.