Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: send client id with StreamingPullRequest (#58)
- The client id is created randomly for each StreamingPullManager and is used to establish affinity across stream disconnections/retries.
- Server-client affinity is important for ordering keys, where the backend tries to send the same keys to the same client.

Fixes #62
  • Loading branch information
pradn committed Apr 22, 2020
1 parent c67f36e commit 9f8acfa
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
Expand Up @@ -18,6 +18,7 @@
import functools
import logging
import threading
import uuid

import grpc
import six
Expand Down Expand Up @@ -116,6 +117,12 @@ def __init__(
self._closed = False
self._close_callbacks = []

# Generate a random client id tied to this object. All streaming pull
# connections (initial and re-connects) will then use the same client
# id. Doing so lets the server establish affinity even across stream
# disconncetions.
self._client_id = str(uuid.uuid4())

if scheduler is None:
self._scheduler = (
google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler()
Expand Down Expand Up @@ -567,6 +574,7 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
modify_deadline_seconds=[self.ack_deadline] * len(lease_ids),
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
)

# Return the initial request.
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -96,6 +96,7 @@ def test_constructor_and_default_state():
assert manager._subscription == mock.sentinel.subscription
assert manager._scheduler is not None
assert manager._messages_on_hold is not None
assert manager._client_id is not None


def test_constructor_with_options():
Expand Down Expand Up @@ -144,6 +145,20 @@ def test_ack_deadline():
assert manager.ack_deadline == 20


def test_client_id():
manager1 = make_manager()
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10)
client_id_1 = request1.client_id
assert client_id_1

manager2 = make_manager()
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10)
client_id_2 = request2.client_id
assert client_id_2

assert client_id_1 != client_id_2


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)
Expand Down

0 comments on commit 9f8acfa

Please sign in to comment.