diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 0a25d4625..2c3e51fee 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -18,6 +18,7 @@ import functools import logging import threading +import uuid import grpc import six @@ -116,6 +117,12 @@ def __init__( self._closed = False self._close_callbacks = [] + # Generate a random client id tied to this object. All streaming pull + # connections (initial and re-connects) will then use the same client + # id. Doing so lets the server establish affinity even across stream + # disconncetions. + self._client_id = str(uuid.uuid4()) + if scheduler is None: self._scheduler = ( google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler() @@ -567,6 +574,7 @@ def _get_initial_request(self, stream_ack_deadline_seconds): modify_deadline_seconds=[self.ack_deadline] * len(lease_ids), stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, + client_id=self._client_id, ) # Return the initial request. diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 70f320fcc..0475aaf6e 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -96,6 +96,7 @@ def test_constructor_and_default_state(): assert manager._subscription == mock.sentinel.subscription assert manager._scheduler is not None assert manager._messages_on_hold is not None + assert manager._client_id is not None def test_constructor_with_options(): @@ -144,6 +145,20 @@ def test_ack_deadline(): assert manager.ack_deadline == 20 +def test_client_id(): + manager1 = make_manager() + request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10) + client_id_1 = request1.client_id + assert client_id_1 + + manager2 = make_manager() + request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10) + client_id_2 = request2.client_id + assert client_id_2 + + assert client_id_1 != client_id_2 + + def test_ack_deadline_with_max_duration_per_lease_extension(): manager = make_manager() manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)