diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index b60379444..5830680da 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -130,8 +130,8 @@ def maintain_leases(self): # Determine the appropriate duration for the lease. This is # based off of how long previous messages have taken to ack, with # a sensible default and within the ranges allowed by Pub/Sub. - p99 = self._manager.ack_histogram.percentile(99) - _LOGGER.debug("The current p99 value is %d seconds.", p99) + deadline = self._manager.ack_deadline + _LOGGER.debug("The current deadline value is %d seconds.", deadline) # Make a copy of the leased messages. This is needed because it's # possible for another thread to modify the dictionary while @@ -173,7 +173,7 @@ def maintain_leases(self): # way for ``send_request`` to fail when the consumer # is inactive. self._manager.dispatcher.modify_ack_deadline( - [requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids] + [requests.ModAckRequest(ack_id, deadline) for ack_id in ack_ids] ) # Now wait an appropriate period of time and do this again. @@ -182,7 +182,7 @@ def maintain_leases(self): # period between 0 seconds and 90% of the lease. This use of # jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases # where there are many clients. - snooze = random.uniform(0.0, p99 * 0.9) + snooze = random.uniform(0.0, deadline * 0.9) _LOGGER.debug("Snoozing lease management for %f seconds.", snooze) self._stop_event.wait(timeout=snooze) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 9b3f8d5fe..0a25d4625 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -191,9 +191,19 @@ def ack_deadline(self): Returns: int: The ack deadline. """ - target = min([self._last_histogram_size * 2, self._last_histogram_size + 100]) - if len(self.ack_histogram) > target: + target_size = min( + self._last_histogram_size * 2, self._last_histogram_size + 100 + ) + hist_size = len(self.ack_histogram) + + if hist_size > target_size: + self._last_histogram_size = hist_size self._ack_deadline = self.ack_histogram.percentile(percent=99) + + if self.flow_control.max_duration_per_lease_extension > 0: + self._ack_deadline = min( + self._ack_deadline, self.flow_control.max_duration_per_lease_extension + ) return self._ack_deadline @property diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 28019f478..eb4f00681 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -87,12 +87,19 @@ # these settings can be altered to tweak Pub/Sub behavior. # The defaults should be fine for most use cases. FlowControl = collections.namedtuple( - "FlowControl", ["max_bytes", "max_messages", "max_lease_duration"] + "FlowControl", + [ + "max_bytes", + "max_messages", + "max_lease_duration", + "max_duration_per_lease_extension", + ], ) FlowControl.__new__.__defaults__ = ( 100 * 1024 * 1024, # max_bytes: 100mb 1000, # max_messages: 1000 1 * 60 * 60, # max_lease_duration: 1 hour. + 0, # max_duration_per_lease_extension: disabled ) if sys.version_info >= (3, 5): @@ -112,6 +119,11 @@ "The maximum amount of time in seconds to hold a lease on a message " "before dropping it from the lease management." ) + FlowControl.max_duration_per_lease_extension.__doc__ = ( + "The max amount of time in seconds for a single lease extension attempt. " + "Bounds the delay before a message redelivery if the subscriber " + "fails to extend the deadline." + ) _shared_modules = [ diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index ec954b89d..17409cb3f 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -84,6 +84,7 @@ def create_manager(flow_control=types.FlowControl()): manager.is_active = True manager.flow_control = flow_control manager.ack_histogram = histogram.Histogram() + manager.ack_deadline = 10 return manager diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 0886a4508..70f320fcc 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -144,6 +144,17 @@ def test_ack_deadline(): assert manager.ack_deadline == 20 +def test_ack_deadline_with_max_duration_per_lease_extension(): + manager = make_manager() + manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5) + + assert manager.ack_deadline == 5 + for _ in range(5): + manager.ack_histogram.add(20) + + assert manager.ack_deadline == 5 + + def test_maybe_pause_consumer_wo_consumer_set(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000)