Skip to content

Commit

Permalink
feat(pubsub): implement max_duration_per_lease_extension option (#38)
Browse files Browse the repository at this point in the history
* feat(pubsub): implement max_duration_per_lease_extension

* comment change

* simplify ack_deadline()
  • Loading branch information
Gurov Ilya committed Mar 3, 2020
1 parent 4a7211b commit d911a2d
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
8 changes: 4 additions & 4 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -130,8 +130,8 @@ def maintain_leases(self):
# Determine the appropriate duration for the lease. This is
# based off of how long previous messages have taken to ack, with
# a sensible default and within the ranges allowed by Pub/Sub.
p99 = self._manager.ack_histogram.percentile(99)
_LOGGER.debug("The current p99 value is %d seconds.", p99)
deadline = self._manager.ack_deadline
_LOGGER.debug("The current deadline value is %d seconds.", deadline)

# Make a copy of the leased messages. This is needed because it's
# possible for another thread to modify the dictionary while
Expand Down Expand Up @@ -173,7 +173,7 @@ def maintain_leases(self):
# way for ``send_request`` to fail when the consumer
# is inactive.
self._manager.dispatcher.modify_ack_deadline(
[requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids]
[requests.ModAckRequest(ack_id, deadline) for ack_id in ack_ids]
)

# Now wait an appropriate period of time and do this again.
Expand All @@ -182,7 +182,7 @@ def maintain_leases(self):
# period between 0 seconds and 90% of the lease. This use of
# jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases
# where there are many clients.
snooze = random.uniform(0.0, p99 * 0.9)
snooze = random.uniform(0.0, deadline * 0.9)
_LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
self._stop_event.wait(timeout=snooze)

Expand Down
Expand Up @@ -191,9 +191,19 @@ def ack_deadline(self):
Returns:
int: The ack deadline.
"""
target = min([self._last_histogram_size * 2, self._last_histogram_size + 100])
if len(self.ack_histogram) > target:
target_size = min(
self._last_histogram_size * 2, self._last_histogram_size + 100
)
hist_size = len(self.ack_histogram)

if hist_size > target_size:
self._last_histogram_size = hist_size
self._ack_deadline = self.ack_histogram.percentile(percent=99)

if self.flow_control.max_duration_per_lease_extension > 0:
self._ack_deadline = min(
self._ack_deadline, self.flow_control.max_duration_per_lease_extension
)
return self._ack_deadline

@property
Expand Down
14 changes: 13 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -87,12 +87,19 @@
# these settings can be altered to tweak Pub/Sub behavior.
# The defaults should be fine for most use cases.
FlowControl = collections.namedtuple(
"FlowControl", ["max_bytes", "max_messages", "max_lease_duration"]
"FlowControl",
[
"max_bytes",
"max_messages",
"max_lease_duration",
"max_duration_per_lease_extension",
],
)
FlowControl.__new__.__defaults__ = (
100 * 1024 * 1024, # max_bytes: 100mb
1000, # max_messages: 1000
1 * 60 * 60, # max_lease_duration: 1 hour.
0, # max_duration_per_lease_extension: disabled
)

if sys.version_info >= (3, 5):
Expand All @@ -112,6 +119,11 @@
"The maximum amount of time in seconds to hold a lease on a message "
"before dropping it from the lease management."
)
FlowControl.max_duration_per_lease_extension.__doc__ = (
"The max amount of time in seconds for a single lease extension attempt. "
"Bounds the delay before a message redelivery if the subscriber "
"fails to extend the deadline."
)


_shared_modules = [
Expand Down
1 change: 1 addition & 0 deletions tests/unit/pubsub_v1/subscriber/test_leaser.py
Expand Up @@ -84,6 +84,7 @@ def create_manager(flow_control=types.FlowControl()):
manager.is_active = True
manager.flow_control = flow_control
manager.ack_histogram = histogram.Histogram()
manager.ack_deadline = 10
return manager


Expand Down
11 changes: 11 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -144,6 +144,17 @@ def test_ack_deadline():
assert manager.ack_deadline == 20


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)

assert manager.ack_deadline == 5
for _ in range(5):
manager.ack_histogram.add(20)

assert manager.ack_deadline == 5


def test_maybe_pause_consumer_wo_consumer_set():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
Expand Down

0 comments on commit d911a2d

Please sign in to comment.