Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send client id with StreamingPullRequest #58

Merged
merged 7 commits into from Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,6 +18,7 @@
import functools
import logging
import threading
import uuid

import grpc
import six
Expand Down Expand Up @@ -116,6 +117,12 @@ def __init__(
self._closed = False
self._close_callbacks = []

# Generate a random client id tied to this object. All streaming pull
# connections (initial and re-connects) will then use the same client
# id. Doing so lets the server establish affinity even across stream
# disconncetions.
self._client_id = str(uuid.uuid4())

if scheduler is None:
self._scheduler = (
google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler()
Expand Down Expand Up @@ -567,6 +574,7 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
modify_deadline_seconds=[self.ack_deadline] * len(lease_ids),
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
)

# Return the initial request.
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -96,6 +96,7 @@ def test_constructor_and_default_state():
assert manager._subscription == mock.sentinel.subscription
assert manager._scheduler is not None
assert manager._messages_on_hold is not None
assert manager._client_id is not None


def test_constructor_with_options():
Expand Down Expand Up @@ -144,6 +145,20 @@ def test_ack_deadline():
assert manager.ack_deadline == 20


def test_client_id():
manager1 = make_manager()
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10)
client_id_1 = request1.client_id
assert client_id_1

manager2 = make_manager()
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10)
client_id_2 = request2.client_id
assert client_id_2

assert client_id_1 != client_id_2


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)
Expand Down