From 223a986d9394302af12e5eb1940625a6a12bb456 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 20 May 2021 15:16:23 +0200 Subject: [PATCH 1/6] fix: use the same initial ACK deadline as leaser When a message is received, the initial MODACK value used should be the same value currently used by leaser. This makes sure the leaser will wake up and extend the ACK deadline before the initial one expires. --- .../_protocol/streaming_pull_manager.py | 41 +++++++++++-------- .../subscriber/test_streaming_pull_manager.py | 39 +++++++++++++++++- 2 files changed, 61 insertions(+), 19 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 2112ce0db..f80dfb1b1 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -179,6 +179,11 @@ def __init__( # currently on hold. self._pause_resume_lock = threading.Lock() + # A lock protecting the current ACK deadline used in the lease management. This + # value can be potentially updated both by the leaser thread and by the message + # consumer thread when invoking the internal _on_response() callback. + self._ack_deadline_lock = threading.Lock() + # The threads created in ``.open()``. self._dispatcher = None self._leaser = None @@ -223,29 +228,31 @@ def ack_histogram(self): @property def ack_deadline(self): - """Return the current ack deadline based on historical time-to-ack. + """Return and possibly update the current ACK deadline based on historical data. - This method is "sticky". It will only perform the computations to - check on the right ack deadline if the histogram has gained a + This method is "sticky". It will only perform the computations to check on the + right ACK deadline if the histogram with past time-to-ack data has gained a significant amount of new information. Returns: int: The ack deadline. """ - target_size = min( - self._last_histogram_size * 2, self._last_histogram_size + 100 - ) - hist_size = len(self.ack_histogram) + with self._ack_deadline_lock: + target_size = min( + self._last_histogram_size * 2, self._last_histogram_size + 100 + ) + hist_size = len(self.ack_histogram) - if hist_size > target_size: - self._last_histogram_size = hist_size - self._ack_deadline = self.ack_histogram.percentile(percent=99) + if hist_size > target_size: + self._last_histogram_size = hist_size + self._ack_deadline = self.ack_histogram.percentile(percent=99) - if self.flow_control.max_duration_per_lease_extension > 0: - self._ack_deadline = min( - self._ack_deadline, self.flow_control.max_duration_per_lease_extension - ) - return self._ack_deadline + if self.flow_control.max_duration_per_lease_extension > 0: + self._ack_deadline = min( + self._ack_deadline, + self.flow_control.max_duration_per_lease_extension, + ) + return self._ack_deadline @property def load(self): @@ -490,7 +497,7 @@ def open(self, callback, on_callback_error): ) # Create the RPC - stream_ack_deadline_seconds = self.ack_histogram.percentile(99) + stream_ack_deadline_seconds = self.ack_deadline get_initial_request = functools.partial( self._get_initial_request, stream_ack_deadline_seconds @@ -688,7 +695,7 @@ def _on_response(self, response): # modack the messages we received, as this tells the server that we've # received them. items = [ - requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) + requests.ModAckRequest(message.ack_id, self.ack_deadline) for message in received_messages ] self._dispatcher.modify_ack_deadline(items) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 25ab4f0ae..09cfd0aea 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -476,7 +476,10 @@ def test_heartbeat_inactive(): def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): manager = make_manager() - manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) + with mock.patch.object( + type(manager), "ack_deadline", new=mock.PropertyMock(return_value=18) + ): + manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) heartbeater.assert_called_once_with(manager) heartbeater.return_value.start.assert_called_once() @@ -503,7 +506,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] assert initial_request_arg.func == manager._get_initial_request - assert initial_request_arg.args[0] == 10 # the default stream ACK timeout + assert initial_request_arg.args[0] == 18 assert not manager._client.api.get_subscription.called resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with( @@ -774,6 +777,38 @@ def test__on_response_delivery_attempt(): assert msg2.delivery_attempt == 6 +def test__on_response_modifies_ack_deadline(): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack_1", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="ack_2", + message=gapic_types.PubsubMessage(data=b"bar", message_id="2"), + ), + ] + ) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=80) + + # Actually run the method and chack that correct MODACK value is used. + with mock.patch.object( + type(manager), "ack_deadline", new=mock.PropertyMock(return_value=18) + ): + manager._on_response(response) + + dispatcher.modify_ack_deadline.assert_called_once_with( + [requests.ModAckRequest("ack_1", 18), requests.ModAckRequest("ack_2", 18)] + ) + + def test__on_response_no_leaser_overload(): manager, _, dispatcher, leaser, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback From dc2b2337e6e44073bdfe42d6d679da75290dc88e Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 21 May 2021 12:11:37 +0200 Subject: [PATCH 2/6] Make max_duration_per_lease_extension high enough If flow_control.max_duration_per_lease_extension is set to too low a value, it is adjusted to the minimum ACK deadline. --- .../subscriber/_protocol/histogram.py | 32 ++++++---- .../_protocol/streaming_pull_manager.py | 8 ++- google/cloud/pubsub_v1/types.py | 3 +- .../subscriber/test_streaming_pull_manager.py | 59 +++++++++++++------ 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py index 29ee6fc61..80bfd311e 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py @@ -15,6 +15,10 @@ from __future__ import absolute_import, division +MIN_ACK_DEADLINE = 10 +MAX_ACK_DEADLINE = 600 + + class Histogram(object): """Representation of a single histogram. @@ -83,41 +87,43 @@ def __repr__(self): def max(self): """Return the maximum value in this histogram. - If there are no values in the histogram at all, return 600. + If there are no values in the histogram at all, return ``MAX_ACK_DEADLINE``. Returns: int: The maximum value in the histogram. """ if len(self._data) == 0: - return 600 + return MAX_ACK_DEADLINE return next(iter(reversed(sorted(self._data.keys())))) @property def min(self): """Return the minimum value in this histogram. - If there are no values in the histogram at all, return 10. + If there are no values in the histogram at all, return the min default. Returns: int: The minimum value in the histogram. """ if len(self._data) == 0: - return 10 + return MIN_ACK_DEADLINE return next(iter(sorted(self._data.keys()))) def add(self, value): """Add the value to this histogram. Args: - value (int): The value. Values outside of ``10 <= x <= 600`` - will be raised to ``10`` or reduced to ``600``. + value (int): The value. Values outside of + ``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE`` + will be raised to ``MIN_ACK_DEADLINE`` or reduced to + ``MAX_ACK_DEADLINE``. """ # If the value is out of bounds, bring it in bounds. value = int(value) - if value < 10: - value = 10 - if value > 600: - value = 600 + if value < MIN_ACK_DEADLINE: + value = MIN_ACK_DEADLINE + elif value > MAX_ACK_DEADLINE: + value = MAX_ACK_DEADLINE # Add the value to the histogram's data dictionary. self._data.setdefault(value, 0) @@ -129,7 +135,7 @@ def percentile(self, percent): Args: percent (Union[int, float]): The precentile being sought. The - default consumer implementations use consistently use ``99``. + default consumer implementations consistently use ``99``. Returns: int: The value corresponding to the requested percentile. @@ -150,5 +156,5 @@ def percentile(self, percent): return k # The only way to get here is if there was no data. - # In this case, just return 10 seconds. - return 10 + # In this case, just return the shortest possible deadline. + return MIN_ACK_DEADLINE diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f80dfb1b1..58b9646db 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -143,7 +143,7 @@ def __init__( self._await_callbacks_on_shutdown = await_callbacks_on_shutdown self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 - self._ack_deadline = 10 + self._ack_deadline = histogram.MIN_ACK_DEADLINE self._rpc = None self._callback = None self._closing = threading.Lock() @@ -248,10 +248,12 @@ def ack_deadline(self): self._ack_deadline = self.ack_histogram.percentile(percent=99) if self.flow_control.max_duration_per_lease_extension > 0: - self._ack_deadline = min( - self._ack_deadline, + # The setting in flow control could be too low, adjust if needed. + flow_control_setting = max( self.flow_control.max_duration_per_lease_extension, + histogram.MIN_ACK_DEADLINE, ) + self._ack_deadline = min(self._ack_deadline, flow_control_setting) return self._ack_deadline @property diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 677e4774f..d72541a3b 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -152,7 +152,8 @@ class LimitExceededBehavior(str, enum.Enum): FlowControl.max_duration_per_lease_extension.__doc__ = ( "The max amount of time in seconds for a single lease extension attempt. " "Bounds the delay before a message redelivery if the subscriber " - "fails to extend the deadline." + "fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored " + "if set to 0." ) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 09cfd0aea..eed30df54 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -139,13 +139,49 @@ def fake_add(self, items): leaser.add = stdlib_types.MethodType(fake_add, leaser) -def test_ack_deadline(): +def test_ack_deadline_no_custom_flow_control_setting(): + from google.cloud.pubsub_v1.subscriber._protocol import histogram + manager = make_manager() - assert manager.ack_deadline == 10 - manager.ack_histogram.add(20) - assert manager.ack_deadline == 20 - manager.ack_histogram.add(10) - assert manager.ack_deadline == 20 + + # Make sure that max_duration_per_lease_extension is disabled. + manager._flow_control = types.FlowControl(max_duration_per_lease_extension=0) + + assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + + # When we get some historical data, the deadline is adjusted. + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2) + assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2 + + # Adding just a single additional data point does not yet change the deadline. + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE) + assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2 + + +def test_ack_deadline_with_max_duration_per_lease_extension(): + from google.cloud.pubsub_v1.subscriber._protocol import histogram + + manager = make_manager() + manager._flow_control = types.FlowControl( + max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1 + ) + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large + + # The deadline configured in flow control should prevail. + assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + 1 + + +def test_ack_deadline_with_max_duration_per_lease_extension_too_low(): + from google.cloud.pubsub_v1.subscriber._protocol import histogram + + manager = make_manager() + manager._flow_control = types.FlowControl( + max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE - 1 + ) + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large + + # The deadline configured in flow control should be adjusted to the minimum allowed. + assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE def test_client_id(): @@ -181,17 +217,6 @@ def test_streaming_flow_control_use_legacy_flow_control(): assert request.max_outstanding_bytes == 0 -def test_ack_deadline_with_max_duration_per_lease_extension(): - manager = make_manager() - manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5) - - assert manager.ack_deadline == 5 - for _ in range(5): - manager.ack_histogram.add(20) - - assert manager.ack_deadline == 5 - - def test_maybe_pause_consumer_wo_consumer_set(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000) From aed7784f444400200e1f86c578e01bb76c55bf0c Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 21 May 2021 13:08:21 +0200 Subject: [PATCH 3/6] Bump histogram test coverage to 100% --- .../pubsub_v1/subscriber/test_histogram.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_histogram.py b/tests/unit/pubsub_v1/subscriber/test_histogram.py index d3c15cdce..aacdc3050 100644 --- a/tests/unit/pubsub_v1/subscriber/test_histogram.py +++ b/tests/unit/pubsub_v1/subscriber/test_histogram.py @@ -33,7 +33,7 @@ def test_contains(): def test_max(): histo = histogram.Histogram() - assert histo.max == 600 + assert histo.max == histogram.MAX_ACK_DEADLINE histo.add(120) assert histo.max == 120 histo.add(150) @@ -44,7 +44,7 @@ def test_max(): def test_min(): histo = histogram.Histogram() - assert histo.min == 10 + assert histo.min == histogram.MIN_ACK_DEADLINE histo.add(60) assert histo.min == 60 histo.add(30) @@ -63,20 +63,23 @@ def test_add(): def test_add_lower_limit(): histo = histogram.Histogram() - histo.add(5) - assert 5 not in histo - assert 10 in histo + low_value = histogram.MIN_ACK_DEADLINE - 1 + histo.add(low_value) + assert low_value not in histo + assert histogram.MIN_ACK_DEADLINE in histo def test_add_upper_limit(): histo = histogram.Histogram() - histo.add(12000) - assert 12000 not in histo - assert 600 in histo + high_value = histogram.MAX_ACK_DEADLINE + 1 + histo.add(high_value) + assert high_value not in histo + assert histogram.MAX_ACK_DEADLINE in histo def test_percentile(): histo = histogram.Histogram() + assert histo.percentile(42) == histogram.MIN_ACK_DEADLINE # default when empty [histo.add(i) for i in range(101, 201)] assert histo.percentile(100) == 200 assert histo.percentile(101) == 200 From ee29da44f565b1df53e4bbdff67a69671df891a2 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 21 May 2021 18:22:12 +0200 Subject: [PATCH 4/6] Only update ACK deadline through leaser The ACK deadline is dynamically being adjusted based on ACK data, but an updated deadline value must not be used for the new messages that arrive while the leaser thread is still sleeping. Doing so could result in the messages' ACK deadlines expiring before the leaser wakes up and has a chance of extending the deadlines for these messages. An updated ACK deadline value for new messages is thus only used when the leaser starts using it, too. --- .../pubsub_v1/subscriber/_protocol/leaser.py | 4 +- .../_protocol/streaming_pull_manager.py | 24 ++++++++-- .../unit/pubsub_v1/subscriber/test_leaser.py | 2 +- .../subscriber/test_streaming_pull_manager.py | 45 +++++++++++++++---- 4 files changed, 61 insertions(+), 14 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index c1f8b46d2..8fd067aaf 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -128,7 +128,9 @@ def maintain_leases(self): # Determine the appropriate duration for the lease. This is # based off of how long previous messages have taken to ack, with # a sensible default and within the ranges allowed by Pub/Sub. - deadline = self._manager.ack_deadline + # Also update the deadline currently used if enough new ACK data has been + # gathered since the last deadline update. + deadline = self._manager._obtain_ack_deadline(maybe_update=True) _LOGGER.debug("The current deadline value is %d seconds.", deadline) # Make a copy of the leased messages. This is needed because it's diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 58b9646db..da027fcbe 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -228,16 +228,32 @@ def ack_histogram(self): @property def ack_deadline(self): - """Return and possibly update the current ACK deadline based on historical data. + """Return the current ACK deadline based on historical data without updating it. + + Returns: + int: The ack deadline. + """ + return self._obtain_ack_deadline(maybe_update=False) + + def _obtain_ack_deadline(self, maybe_update: bool) -> int: + """The actual `ack_deadline` implementation. This method is "sticky". It will only perform the computations to check on the - right ACK deadline if the histogram with past time-to-ack data has gained a - significant amount of new information. + right ACK deadline if explicitly requested AND if the histogram with past + time-to-ack data has gained a significant amount of new information. + + Args: + maybe_update (bool): + If ``True``, also update the current ACK deadline before returning it if + enough new ACK data has been gathered. Returns: - int: The ack deadline. + int: The current ACK deadline in seconds to use. """ with self._ack_deadline_lock: + if not maybe_update: + return self._ack_deadline + target_size = min( self._last_histogram_size * 2, self._last_histogram_size + 100 ) diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index 2ecc0b9f3..f389e5205 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -84,7 +84,7 @@ def create_manager(flow_control=types.FlowControl()): manager.is_active = True manager.flow_control = flow_control manager.ack_histogram = histogram.Histogram() - manager.ack_deadline = 10 + manager._obtain_ack_deadline.return_value = 10 return manager diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index eed30df54..8e4f6daf0 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -139,7 +139,7 @@ def fake_add(self, items): leaser.add = stdlib_types.MethodType(fake_add, leaser) -def test_ack_deadline_no_custom_flow_control_setting(): +def test__obtain_ack_deadline_no_custom_flow_control_setting(): from google.cloud.pubsub_v1.subscriber._protocol import histogram manager = make_manager() @@ -147,18 +147,21 @@ def test_ack_deadline_no_custom_flow_control_setting(): # Make sure that max_duration_per_lease_extension is disabled. manager._flow_control = types.FlowControl(max_duration_per_lease_extension=0) - assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MIN_ACK_DEADLINE # When we get some historical data, the deadline is adjusted. manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2) - assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2 + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MIN_ACK_DEADLINE * 2 # Adding just a single additional data point does not yet change the deadline. manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE) - assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2 + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MIN_ACK_DEADLINE * 2 -def test_ack_deadline_with_max_duration_per_lease_extension(): +def test__obtain_ack_deadline_with_max_duration_per_lease_extension(): from google.cloud.pubsub_v1.subscriber._protocol import histogram manager = make_manager() @@ -168,10 +171,11 @@ def test_ack_deadline_with_max_duration_per_lease_extension(): manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large # The deadline configured in flow control should prevail. - assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + 1 + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MIN_ACK_DEADLINE + 1 -def test_ack_deadline_with_max_duration_per_lease_extension_too_low(): +def test__obtain_ack_deadline_with_max_duration_per_lease_extension_too_low(): from google.cloud.pubsub_v1.subscriber._protocol import histogram manager = make_manager() @@ -181,7 +185,32 @@ def test_ack_deadline_with_max_duration_per_lease_extension_too_low(): manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large # The deadline configured in flow control should be adjusted to the minimum allowed. - assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MIN_ACK_DEADLINE + + +def test__obtain_ack_deadline_no_value_update(): + manager = make_manager() + + # Make sure that max_duration_per_lease_extension is disabled. + manager._flow_control = types.FlowControl(max_duration_per_lease_extension=0) + + manager.ack_histogram.add(21) + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == 21 + + for _ in range(5): + manager.ack_histogram.add(35) # Gather some new ACK data. + + deadline = manager._obtain_ack_deadline(maybe_update=False) + assert deadline == 21 # still the same + + # Accessing the value through the ack_deadline property has no side effects either. + assert manager.ack_deadline == 21 + + # Updating the ack deadline is reflected on ack_deadline wrapper, too. + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert manager.ack_deadline == deadline == 35 def test_client_id(): From 64e4d7a12273eec4dd05f3f5ef4f51a270166b8a Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 26 May 2021 13:41:08 +0200 Subject: [PATCH 5/6] Be more explicit about the default Histogram.min --- google/cloud/pubsub_v1/subscriber/_protocol/histogram.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py index 80bfd311e..80c5b459b 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py @@ -100,7 +100,7 @@ def max(self): def min(self): """Return the minimum value in this histogram. - If there are no values in the histogram at all, return the min default. + If there are no values in the histogram at all, return ``MIN_ACK_DEADLINE``. Returns: int: The minimum value in the histogram. From d772cab394c66d269fd9759e2590852282cc3472 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 26 May 2021 16:14:28 +0200 Subject: [PATCH 6/6] Avoid hardcoded values in Histogram docstring --- google/cloud/pubsub_v1/subscriber/_protocol/histogram.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py index 80c5b459b..0a4a81746 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py @@ -31,8 +31,9 @@ class Histogram(object): are free to use a different formula. The precision of data stored is to the nearest integer. Additionally, - values outside the range of ``10 <= x <= 600`` are stored as ``10`` or - ``600``, since these are the boundaries of leases in the actual API. + values outside the range of ``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE`` are stored + as ``MIN_ACK_DEADLINE`` or ``MAX_ACK_DEADLINE``, since these are the boundaries of + leases in the actual API. """ def __init__(self, data=None):