Skip to content

Commit

Permalink
feat: Enable server side flow control by default with the option to t…
Browse files Browse the repository at this point in the history
…urn it off (#231)

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server. If flow_control.max_messages > 0 or flow_control.max_bytes > 0, flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature --in case they encouter issues with server side flow control-- can pass in use_legacy_flow_control=True in SubscriberClient.subscribe().

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server.
If flow_control.max_messages > 0 or flow_control.max_bytes > 0, flow control will be enforced
at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature
--in case they encouter issues with server side flow control-- can pass in
use_legacy_flow_control=true in subscriberclient.subscribe().

Co-authored-by: Tianzi Cai <tianzi@google.com>
  • Loading branch information
fayssalmartanigcp and anguillanneuf committed Nov 10, 2020
1 parent b6d9bd7 commit 94d738c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
Expand Up @@ -105,6 +105,9 @@ class StreamingPullManager(object):
``projects/{project}/subscriptions/{subscription}``.
flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow
control settings.
use_legacy_flow_control (bool): Disables enforcing flow control settings
at the Cloud PubSub server and uses the less accurate method of only
enforcing flow control at the client side.
scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler
to use to process messages. If not provided, a thread pool-based
scheduler will be used.
Expand All @@ -115,11 +118,17 @@ class StreamingPullManager(object):
RPC instead of over the streaming RPC."""

def __init__(
self, client, subscription, flow_control=types.FlowControl(), scheduler=None
self,
client,
subscription,
flow_control=types.FlowControl(),
scheduler=None,
use_legacy_flow_control=False,
):
self._client = client
self._subscription = subscription
self._flow_control = flow_control
self._use_legacy_flow_control = use_legacy_flow_control
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline = 10
Expand Down Expand Up @@ -587,8 +596,12 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
max_outstanding_messages=self._flow_control.max_messages,
max_outstanding_bytes=self._flow_control.max_bytes,
max_outstanding_messages=(
0 if self._use_legacy_flow_control else self._flow_control.max_messages
),
max_outstanding_bytes=(
0 if self._use_legacy_flow_control else self._flow_control.max_bytes
),
)

# Return the initial request.
Expand Down
19 changes: 17 additions & 2 deletions google/cloud/pubsub_v1/subscriber/client.py
Expand Up @@ -157,7 +157,14 @@ def api(self):
"""The underlying gapic API client."""
return self._api

def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
def subscribe(
self,
subscription,
callback,
flow_control=(),
scheduler=None,
use_legacy_flow_control=False,
):
"""Asynchronously start receiving messages on a given subscription.
This method starts a background thread to begin pulling messages from
Expand All @@ -179,6 +186,10 @@ def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
settings may lead to faster throughput for messages that do not take
a long time to process.
The ``use_legacy_flow_control`` argument disables enforcing flow control
settings at the Cloud PubSub server and uses the less accurate method of
only enforcing flow control at the client side.
This method starts the receiver in the background and returns a
*Future* representing its execution. Waiting on the future (calling
``result()``) will block forever or until a non-recoverable error
Expand Down Expand Up @@ -238,7 +249,11 @@ def callback(message):
flow_control = types.FlowControl(*flow_control)

manager = streaming_pull_manager.StreamingPullManager(
self, subscription, flow_control=flow_control, scheduler=scheduler
self,
subscription,
flow_control=flow_control,
scheduler=scheduler,
use_legacy_flow_control=use_legacy_flow_control,
)

future = futures.StreamingPullFuture(manager)
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -170,6 +170,16 @@ def test_streaming_flow_control():
assert request.max_outstanding_bytes == 1000


def test_streaming_flow_control_use_legacy_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000),
use_legacy_flow_control=True,
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
assert request.max_outstanding_messages == 0
assert request.max_outstanding_bytes == 0


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)
Expand Down

0 comments on commit 94d738c

Please sign in to comment.