Skip to content

Commit

Permalink
fix: ACK deadline set for received messages can be too low (#416)
Browse files Browse the repository at this point in the history
Fixes #413.

This PR makes sure that the ACK deadline set for the received messages is always consistent with what the leaser uses internally when extending the ACK deadlines for the leased messages.

See the issue description and a comment on issue #413 explaining a possible sequence of events that leads to the bug.

**PR checklist**
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
  • Loading branch information
plamut committed May 26, 2021
1 parent de5429a commit e907f6e
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 64 deletions.
37 changes: 22 additions & 15 deletions google/cloud/pubsub_v1/subscriber/_protocol/histogram.py
Expand Up @@ -15,6 +15,10 @@
from __future__ import absolute_import, division


MIN_ACK_DEADLINE = 10
MAX_ACK_DEADLINE = 600


class Histogram(object):
"""Representation of a single histogram.
Expand All @@ -27,8 +31,9 @@ class Histogram(object):
are free to use a different formula.
The precision of data stored is to the nearest integer. Additionally,
values outside the range of ``10 <= x <= 600`` are stored as ``10`` or
``600``, since these are the boundaries of leases in the actual API.
values outside the range of ``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE`` are stored
as ``MIN_ACK_DEADLINE`` or ``MAX_ACK_DEADLINE``, since these are the boundaries of
leases in the actual API.
"""

def __init__(self, data=None):
Expand Down Expand Up @@ -83,41 +88,43 @@ def __repr__(self):
def max(self):
"""Return the maximum value in this histogram.
If there are no values in the histogram at all, return 600.
If there are no values in the histogram at all, return ``MAX_ACK_DEADLINE``.
Returns:
int: The maximum value in the histogram.
"""
if len(self._data) == 0:
return 600
return MAX_ACK_DEADLINE
return next(iter(reversed(sorted(self._data.keys()))))

@property
def min(self):
"""Return the minimum value in this histogram.
If there are no values in the histogram at all, return 10.
If there are no values in the histogram at all, return ``MIN_ACK_DEADLINE``.
Returns:
int: The minimum value in the histogram.
"""
if len(self._data) == 0:
return 10
return MIN_ACK_DEADLINE
return next(iter(sorted(self._data.keys())))

def add(self, value):
"""Add the value to this histogram.
Args:
value (int): The value. Values outside of ``10 <= x <= 600``
will be raised to ``10`` or reduced to ``600``.
value (int): The value. Values outside of
``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE``
will be raised to ``MIN_ACK_DEADLINE`` or reduced to
``MAX_ACK_DEADLINE``.
"""
# If the value is out of bounds, bring it in bounds.
value = int(value)
if value < 10:
value = 10
if value > 600:
value = 600
if value < MIN_ACK_DEADLINE:
value = MIN_ACK_DEADLINE
elif value > MAX_ACK_DEADLINE:
value = MAX_ACK_DEADLINE

# Add the value to the histogram's data dictionary.
self._data.setdefault(value, 0)
Expand All @@ -129,7 +136,7 @@ def percentile(self, percent):
Args:
percent (Union[int, float]): The percentile being sought. The
default consumer implementations use consistently use ``99``.
default consumer implementations consistently use ``99``.
Returns:
int: The value corresponding to the requested percentile.
Expand All @@ -150,5 +157,5 @@ def percentile(self, percent):
return k

# The only way to get here is if there was no data.
# In this case, just return 10 seconds.
return 10
# In this case, just return the shortest possible deadline.
return MIN_ACK_DEADLINE
4 changes: 3 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -128,7 +128,9 @@ def maintain_leases(self):
# Determine the appropriate duration for the lease. This is
# based off of how long previous messages have taken to ack, with
# a sensible default and within the ranges allowed by Pub/Sub.
deadline = self._manager.ack_deadline
# Also update the deadline currently used if enough new ACK data has been
# gathered since the last deadline update.
deadline = self._manager._obtain_ack_deadline(maybe_update=True)
_LOGGER.debug("The current deadline value is %d seconds.", deadline)

# Make a copy of the leased messages. This is needed because it's
Expand Down
Expand Up @@ -143,7 +143,7 @@ def __init__(
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline = 10
self._ack_deadline = histogram.MIN_ACK_DEADLINE
self._rpc = None
self._callback = None
self._closing = threading.Lock()
Expand Down Expand Up @@ -179,6 +179,11 @@ def __init__(
# currently on hold.
self._pause_resume_lock = threading.Lock()

# A lock protecting the current ACK deadline used in the lease management. This
# value can be potentially updated both by the leaser thread and by the message
# consumer thread when invoking the internal _on_response() callback.
self._ack_deadline_lock = threading.Lock()

# The threads created in ``.open()``.
self._dispatcher = None
self._leaser = None
Expand Down Expand Up @@ -223,29 +228,49 @@ def ack_histogram(self):

@property
def ack_deadline(self):
"""Return the current ack deadline based on historical time-to-ack.
This method is "sticky". It will only perform the computations to
check on the right ack deadline if the histogram has gained a
significant amount of new information.
"""Return the current ACK deadline based on historical data without updating it.
Returns:
int: The ack deadline.
"""
target_size = min(
self._last_histogram_size * 2, self._last_histogram_size + 100
)
hist_size = len(self.ack_histogram)
return self._obtain_ack_deadline(maybe_update=False)

def _obtain_ack_deadline(self, maybe_update: bool) -> int:
"""The actual `ack_deadline` implementation.
This method is "sticky". It will only perform the computations to check on the
right ACK deadline if explicitly requested AND if the histogram with past
time-to-ack data has gained a significant amount of new information.
Args:
maybe_update (bool):
If ``True``, also update the current ACK deadline before returning it if
enough new ACK data has been gathered.
if hist_size > target_size:
self._last_histogram_size = hist_size
self._ack_deadline = self.ack_histogram.percentile(percent=99)
Returns:
int: The current ACK deadline in seconds to use.
"""
with self._ack_deadline_lock:
if not maybe_update:
return self._ack_deadline

if self.flow_control.max_duration_per_lease_extension > 0:
self._ack_deadline = min(
self._ack_deadline, self.flow_control.max_duration_per_lease_extension
target_size = min(
self._last_histogram_size * 2, self._last_histogram_size + 100
)
return self._ack_deadline
hist_size = len(self.ack_histogram)

if hist_size > target_size:
self._last_histogram_size = hist_size
self._ack_deadline = self.ack_histogram.percentile(percent=99)

if self.flow_control.max_duration_per_lease_extension > 0:
# The setting in flow control could be too low, adjust if needed.
flow_control_setting = max(
self.flow_control.max_duration_per_lease_extension,
histogram.MIN_ACK_DEADLINE,
)
self._ack_deadline = min(self._ack_deadline, flow_control_setting)
return self._ack_deadline

@property
def load(self):
Expand Down Expand Up @@ -490,7 +515,7 @@ def open(self, callback, on_callback_error):
)

# Create the RPC
stream_ack_deadline_seconds = self.ack_histogram.percentile(99)
stream_ack_deadline_seconds = self.ack_deadline

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
Expand Down Expand Up @@ -688,7 +713,7 @@ def _on_response(self, response):
# modack the messages we received, as this tells the server that we've
# received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
requests.ModAckRequest(message.ack_id, self.ack_deadline)
for message in received_messages
]
self._dispatcher.modify_ack_deadline(items)
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -152,7 +152,8 @@ class LimitExceededBehavior(str, enum.Enum):
FlowControl.max_duration_per_lease_extension.__doc__ = (
"The max amount of time in seconds for a single lease extension attempt. "
"Bounds the delay before a message redelivery if the subscriber "
"fails to extend the deadline."
"fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored "
"if set to 0."
)


Expand Down
19 changes: 11 additions & 8 deletions tests/unit/pubsub_v1/subscriber/test_histogram.py
Expand Up @@ -33,7 +33,7 @@ def test_contains():

def test_max():
histo = histogram.Histogram()
assert histo.max == 600
assert histo.max == histogram.MAX_ACK_DEADLINE
histo.add(120)
assert histo.max == 120
histo.add(150)
Expand All @@ -44,7 +44,7 @@ def test_max():

def test_min():
histo = histogram.Histogram()
assert histo.min == 10
assert histo.min == histogram.MIN_ACK_DEADLINE
histo.add(60)
assert histo.min == 60
histo.add(30)
Expand All @@ -63,20 +63,23 @@ def test_add():

def test_add_lower_limit():
histo = histogram.Histogram()
histo.add(5)
assert 5 not in histo
assert 10 in histo
low_value = histogram.MIN_ACK_DEADLINE - 1
histo.add(low_value)
assert low_value not in histo
assert histogram.MIN_ACK_DEADLINE in histo


def test_add_upper_limit():
histo = histogram.Histogram()
histo.add(12000)
assert 12000 not in histo
assert 600 in histo
high_value = histogram.MAX_ACK_DEADLINE + 1
histo.add(high_value)
assert high_value not in histo
assert histogram.MAX_ACK_DEADLINE in histo


def test_percentile():
histo = histogram.Histogram()
assert histo.percentile(42) == histogram.MIN_ACK_DEADLINE # default when empty
[histo.add(i) for i in range(101, 201)]
assert histo.percentile(100) == 200
assert histo.percentile(101) == 200
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_leaser.py
Expand Up @@ -84,7 +84,7 @@ def create_manager(flow_control=types.FlowControl()):
manager.is_active = True
manager.flow_control = flow_control
manager.ack_histogram = histogram.Histogram()
manager.ack_deadline = 10
manager._obtain_ack_deadline.return_value = 10
return manager


Expand Down

0 comments on commit e907f6e

Please sign in to comment.