Skip to content

Commit

Permalink
Make max_duration_per_lease_extension high enough
Browse files: browse the repository at this point in the history
If flow_control.max_duration_per_lease_extension is set to too low a
value, it is adjusted to the minimum ACK deadline.
  • Loading branch information
plamut committed May 21, 2021
1 parent 223a986 commit 5407e01
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 32 deletions.
28 changes: 17 additions & 11 deletions google/cloud/pubsub_v1/subscriber/_protocol/histogram.py
Expand Up @@ -15,6 +15,10 @@
from __future__ import absolute_import, division


MIN_ACK_DEADLINE = 10
MAX_ACK_DEADLINE = 600


class Histogram(object):
"""Representation of a single histogram.
Expand Down Expand Up @@ -96,28 +100,30 @@ def max(self):
def min(self):
"""Return the minimum value in this histogram.
If there are no values in the histogram at all, return 10.
If there are no values in the histogram at all, return the min default.
Returns:
int: The minimum value in the histogram.
"""
if len(self._data) == 0:
return 10
return MIN_ACK_DEADLINE
return next(iter(sorted(self._data.keys())))

def add(self, value):
"""Add the value to this histogram.
Args:
value (int): The value. Values outside of ``10 <= x <= 600``
will be raised to ``10`` or reduced to ``600``.
value (int): The value. Values outside of
``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE``
will be raised to ``MIN_ACK_DEADLINE`` or reduced to
``MAX_ACK_DEADLINE``.
"""
# If the value is out of bounds, bring it in bounds.
value = int(value)
if value < 10:
value = 10
if value > 600:
value = 600
if value < MIN_ACK_DEADLINE:
value = MIN_ACK_DEADLINE
elif value > MAX_ACK_DEADLINE:
value = MAX_ACK_DEADLINE

# Add the value to the histogram's data dictionary.
self._data.setdefault(value, 0)
Expand All @@ -129,7 +135,7 @@ def percentile(self, percent):
Args:
percent (Union[int, float]): The percentile being sought. The
default consumer implementations use consistently use ``99``.
default consumer implementations consistently use ``99``.
Returns:
int: The value corresponding to the requested percentile.
Expand All @@ -150,5 +156,5 @@ def percentile(self, percent):
return k

# The only way to get here is if there was no data.
# In this case, just return 10 seconds.
return 10
# In this case, just return the shortest possible deadline.
return MIN_ACK_DEADLINE
Expand Up @@ -143,7 +143,7 @@ def __init__(
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline = 10
self._ack_deadline = histogram.MIN_ACK_DEADLINE
self._rpc = None
self._callback = None
self._closing = threading.Lock()
Expand Down Expand Up @@ -248,10 +248,12 @@ def ack_deadline(self):
self._ack_deadline = self.ack_histogram.percentile(percent=99)

if self.flow_control.max_duration_per_lease_extension > 0:
self._ack_deadline = min(
self._ack_deadline,
# The setting in flow control could be too low, adjust if needed.
flow_control_setting = max(
self.flow_control.max_duration_per_lease_extension,
histogram.MIN_ACK_DEADLINE,
)
self._ack_deadline = min(self._ack_deadline, flow_control_setting)
return self._ack_deadline

@property
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -152,7 +152,8 @@ class LimitExceededBehavior(str, enum.Enum):
FlowControl.max_duration_per_lease_extension.__doc__ = (
"The max amount of time in seconds for a single lease extension attempt. "
"Bounds the delay before a message redelivery if the subscriber "
"fails to extend the deadline."
"fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored "
"if set to 0."
)


Expand Down
59 changes: 42 additions & 17 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Expand Up @@ -139,13 +139,49 @@ def fake_add(self, items):
leaser.add = stdlib_types.MethodType(fake_add, leaser)


def test_ack_deadline():
def test_ack_deadline_no_custom_flow_control_setting():
from google.cloud.pubsub_v1.subscriber._protocol import histogram

manager = make_manager()
assert manager.ack_deadline == 10
manager.ack_histogram.add(20)
assert manager.ack_deadline == 20
manager.ack_histogram.add(10)
assert manager.ack_deadline == 20

# Make sure that max_duration_per_lease_extension is disabled.
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=0)

assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE

# When we get some historical data, the deadline is adjusted.
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2)
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2

# Adding just a single additional data point does not yet change the deadline.
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE)
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2


def test_ack_deadline_with_max_duration_per_lease_extension():
from google.cloud.pubsub_v1.subscriber._protocol import histogram

manager = make_manager()
manager._flow_control = types.FlowControl(
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1
)
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large

# The deadline configured in flow control should prevail.
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + 1


def test_ack_deadline_with_max_duration_per_lease_extension_too_low():
from google.cloud.pubsub_v1.subscriber._protocol import histogram

manager = make_manager()
manager._flow_control = types.FlowControl(
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE - 1
)
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large

# The deadline configured in flow control should be adjusted to the minimum allowed.
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE


def test_client_id():
Expand Down Expand Up @@ -181,17 +217,6 @@ def test_streaming_flow_control_use_legacy_flow_control():
assert request.max_outstanding_bytes == 0


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)

assert manager.ack_deadline == 5
for _ in range(5):
manager.ack_histogram.add(20)

assert manager.ack_deadline == 5


def test_maybe_pause_consumer_wo_consumer_set():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
Expand Down

0 comments on commit 5407e01

Please sign in to comment.