diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 3159ba848..e8a4a8caf 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -105,6 +105,9 @@ class StreamingPullManager(object): ``projects/{project}/subscriptions/{subscription}``. flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow control settings. + use_legacy_flow_control (bool): Disables enforcing flow control settings + at the Cloud PubSub server and uses the less accurate method of only + enforcing flow control at the client side. scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler to use to process messages. If not provided, a thread pool-based scheduler will be used. @@ -115,11 +118,17 @@ class StreamingPullManager(object): RPC instead of over the streaming RPC.""" def __init__( - self, client, subscription, flow_control=types.FlowControl(), scheduler=None + self, + client, + subscription, + flow_control=types.FlowControl(), + scheduler=None, + use_legacy_flow_control=False, ): self._client = client self._subscription = subscription self._flow_control = flow_control + self._use_legacy_flow_control = use_legacy_flow_control self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 self._ack_deadline = 10 @@ -587,8 +596,12 @@ def _get_initial_request(self, stream_ack_deadline_seconds): stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, client_id=self._client_id, - max_outstanding_messages=self._flow_control.max_messages, - max_outstanding_bytes=self._flow_control.max_bytes, + max_outstanding_messages=( + 0 if self._use_legacy_flow_control else self._flow_control.max_messages + ), + max_outstanding_bytes=( + 0 if self._use_legacy_flow_control else self._flow_control.max_bytes + ), ) # Return the initial request. diff --git a/google/cloud/pubsub_v1/subscriber/client.py b/google/cloud/pubsub_v1/subscriber/client.py index e33a0e2e6..937be1552 100644 --- a/google/cloud/pubsub_v1/subscriber/client.py +++ b/google/cloud/pubsub_v1/subscriber/client.py @@ -157,7 +157,14 @@ def api(self): """The underlying gapic API client.""" return self._api - def subscribe(self, subscription, callback, flow_control=(), scheduler=None): + def subscribe( + self, + subscription, + callback, + flow_control=(), + scheduler=None, + use_legacy_flow_control=False, + ): """Asynchronously start receiving messages on a given subscription. This method starts a background thread to begin pulling messages from @@ -179,6 +186,10 @@ def subscribe(self, subscription, callback, flow_control=(), scheduler=None): settings may lead to faster throughput for messages that do not take a long time to process. + The ``use_legacy_flow_control`` argument disables enforcing flow control + settings at the Cloud PubSub server and uses the less accurate method of + only enforcing flow control at the client side. + This method starts the receiver in the background and returns a *Future* representing its execution. Waiting on the future (calling ``result()``) will block forever or until a non-recoverable error @@ -238,7 +249,11 @@ def callback(message): flow_control = types.FlowControl(*flow_control) manager = streaming_pull_manager.StreamingPullManager( - self, subscription, flow_control=flow_control, scheduler=scheduler + self, + subscription, + flow_control=flow_control, + scheduler=scheduler, + use_legacy_flow_control=use_legacy_flow_control, ) future = futures.StreamingPullFuture(manager) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index d3eb4351b..242c0804a 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -170,6 +170,16 @@ def test_streaming_flow_control(): assert request.max_outstanding_bytes == 1000 +def test_streaming_flow_control_use_legacy_flow_control(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000), + use_legacy_flow_control=True, + ) + request = manager._get_initial_request(stream_ack_deadline_seconds=10) + assert request.max_outstanding_messages == 0 + assert request.max_outstanding_bytes == 0 + + def test_ack_deadline_with_max_duration_per_lease_extension(): manager = make_manager() manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)