diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index fa3fac6d3..f899f4d08 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import deque +from collections import OrderedDict import logging import threading import warnings @@ -24,12 +24,21 @@ _LOGGER = logging.getLogger(__name__) -class _QuantityReservation(object): - """A (partial) reservation of a quantifiable resource.""" +class _QuantityReservation: + """A (partial) reservation of quantifiable resources.""" - def __init__(self, reserved, needed): - self.reserved = reserved - self.needed = needed + def __init__(self, bytes_reserved: int, bytes_needed: int, has_slot: bool): + self.bytes_reserved = bytes_reserved + self.bytes_needed = bytes_needed + self.has_slot = has_slot + + def __repr__(self): + return ( + f"{type(self).__name__}(" + f"bytes_reserved={self.bytes_reserved}, " + f"bytes_needed={self.bytes_needed}, " + f"has_slot={self.has_slot})" + ) class FlowController(object): @@ -48,14 +57,13 @@ def __init__(self, settings): self._message_count = 0 self._total_bytes = 0 - # A FIFO queue of threads blocked on adding a message, from first to last. + # A FIFO queue of threads blocked on adding a message that also tracks their + # reservations of available flow control bytes and message slots. # Only relevant if the configured limit exceeded behavior is BLOCK. - self._waiting = deque() + self._waiting = OrderedDict() - # Reservations of available flow control bytes by the waiting threads. - # Each value is a _QuantityReservation instance. - self._byte_reservations = dict() self._reserved_bytes = 0 + self._reserved_slots = 0 # The lock is used to protect all internal state (message and byte count, # waiting threads to add, etc.). 
@@ -131,11 +139,13 @@ def add(self, message): current_thread = threading.current_thread() while self._would_overflow(message): - if current_thread not in self._byte_reservations: - self._waiting.append(current_thread) - self._byte_reservations[current_thread] = _QuantityReservation( - reserved=0, needed=message._pb.ByteSize() + if current_thread not in self._waiting: + reservation = _QuantityReservation( + bytes_reserved=0, + bytes_needed=message._pb.ByteSize(), + has_slot=False, ) + self._waiting[current_thread] = reservation # Will be placed last. _LOGGER.debug( "Blocking until there is enough free capacity in the flow - " @@ -152,9 +162,9 @@ def add(self, message): # Message accepted, increase the load and remove thread stats. self._message_count += 1 self._total_bytes += message._pb.ByteSize() - self._reserved_bytes -= self._byte_reservations[current_thread].reserved - del self._byte_reservations[current_thread] - self._waiting.remove(current_thread) + self._reserved_bytes -= self._waiting[current_thread].bytes_reserved + self._reserved_slots -= 1 + del self._waiting[current_thread] def release(self, message): """Release a mesage from flow control. @@ -180,39 +190,52 @@ def release(self, message): self._message_count = max(0, self._message_count) self._total_bytes = max(0, self._total_bytes) - self._distribute_available_bytes() + self._distribute_available_capacity() # If at least one thread waiting to add() can be unblocked, wake them up. if self._ready_to_unblock(): _LOGGER.debug("Notifying threads waiting to add messages to flow.") self._has_capacity.notify_all() - def _distribute_available_bytes(self): - """Distribute availalbe free capacity among the waiting threads in FIFO order. + def _distribute_available_capacity(self): + """Distribute available capacity among the waiting threads in FIFO order. The method assumes that the caller has obtained ``_operational_lock``. 
""" - available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes + available_slots = ( + self._settings.message_limit - self._message_count - self._reserved_slots + ) + available_bytes = ( + self._settings.byte_limit - self._total_bytes - self._reserved_bytes + ) + + for reservation in self._waiting.values(): + if available_slots <= 0 and available_bytes <= 0: + break # Santa is now empty-handed, better luck next time. - for thread in self._waiting: - if available <= 0: - break + # Distribute any free slots. + if available_slots > 0 and not reservation.has_slot: + reservation.has_slot = True + self._reserved_slots += 1 + available_slots -= 1 - reservation = self._byte_reservations[thread] - still_needed = reservation.needed - reservation.reserved + # Distribute any free bytes. + if available_bytes <= 0: + continue - # Sanity check for any internal inconsistencies. - if still_needed < 0: + bytes_still_needed = reservation.bytes_needed - reservation.bytes_reserved + + if bytes_still_needed < 0: # Sanity check for any internal inconsistencies. msg = "Too many bytes reserved: {} / {}".format( - reservation.reserved, reservation.needed + reservation.bytes_reserved, reservation.bytes_needed ) warnings.warn(msg, category=RuntimeWarning) - still_needed = 0 + bytes_still_needed = 0 - can_give = min(still_needed, available) - reservation.reserved += can_give + can_give = min(bytes_still_needed, available_bytes) + reservation.bytes_reserved += can_give self._reserved_bytes += can_give - available -= can_give + available_bytes -= can_give def _ready_to_unblock(self): """Determine if any of the threads waiting to add a message can proceed. @@ -225,10 +248,10 @@ def _ready_to_unblock(self): if self._waiting: # It's enough to only check the head of the queue, because FIFO # distribution of any free capacity. 
- reservation = self._byte_reservations[self._waiting[0]] + first_reservation = next(iter(self._waiting.values())) return ( - reservation.reserved >= reservation.needed - and self._message_count < self._settings.message_limit + first_reservation.bytes_reserved >= first_reservation.bytes_needed + and first_reservation.has_slot ) return False @@ -245,16 +268,22 @@ def _would_overflow(self, message): Returns: bool """ - reservation = self._byte_reservations.get(threading.current_thread()) + reservation = self._waiting.get(threading.current_thread()) if reservation: - enough_reserved = reservation.reserved >= reservation.needed + enough_reserved = reservation.bytes_reserved >= reservation.bytes_needed + has_slot = reservation.has_slot else: enough_reserved = False + has_slot = False bytes_taken = self._total_bytes + self._reserved_bytes + message._pb.ByteSize() size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved - msg_count_overflow = self._message_count + 1 > self._settings.message_limit + + msg_count_overflow = not has_slot and ( + (self._message_count + self._reserved_slots + 1) + > self._settings.message_limit + ) return size_overflow or msg_count_overflow @@ -275,18 +304,15 @@ def _load_info(self, message_count=None, total_bytes=None): Returns: str """ - msg = "messages: {} / {}, bytes: {} / {} (reserved: {})" - if message_count is None: message_count = self._message_count if total_bytes is None: total_bytes = self._total_bytes - return msg.format( - message_count, - self._settings.message_limit, - total_bytes, - self._settings.byte_limit, - self._reserved_bytes, + return ( + f"messages: {message_count} / {self._settings.message_limit} " + f"(reserved: {self._reserved_slots}), " + f"bytes: {total_bytes} / {self._settings.byte_limit} " + f"(reserved: {self._reserved_bytes})" ) diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py index 5e9d6c3ae..ee923a435 100644 --- 
a/tests/unit/pubsub_v1/publisher/test_flow_controller.py +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -16,10 +16,14 @@ import threading import time +from typing import Callable +from typing import Sequence +from typing import Union import warnings import pytest +import google from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher.flow_controller import FlowController @@ -27,25 +31,20 @@ def _run_in_daemon( - flow_controller, - action, - messages, - all_done_event, - error_event=None, - action_pause=None, + action: Callable[["google.cloud.pubsub_v1.types.PubsubMessage"], None], + messages: Sequence["google.cloud.pubsub_v1.types.PubsubMessage"], + all_done_event: threading.Event, + error_event: threading.Event = None, + action_pause: Union[int, float] = None, ): - """Run flow controller action (add or remove messages) in a daemon thread. - """ - assert action in ("add", "release") + """Run flow controller action (add or remove messages) in a daemon thread.""" def run_me(): - method = getattr(flow_controller, action) - try: for msg in messages: if action_pause is not None: time.sleep(action_pause) - method(msg) + action(msg) except Exception: if error_event is not None: # pragma: NO COVER error_event.set() @@ -227,7 +226,7 @@ def test_blocking_on_overflow_until_free_capacity(): releasing_x_done = threading.Event() # Adding a message with free capacity should not block. - _run_in_daemon(flow_controller, "add", [msg1], adding_1_done) + _run_in_daemon(flow_controller.add, [msg1], adding_1_done) if not adding_1_done.wait(timeout=0.1): pytest.fail( # pragma: NO COVER "Adding a message with enough flow capacity blocked or errored." @@ -235,21 +234,21 @@ def test_blocking_on_overflow_until_free_capacity(): # Adding messages when there is not enough capacity should block, even if # added through multiple threads. 
- _run_in_daemon(flow_controller, "add", [msg2], adding_2_done) + _run_in_daemon(flow_controller.add, [msg2], adding_2_done) if adding_2_done.wait(timeout=0.1): pytest.fail("Adding a message on overflow did not block.") # pragma: NO COVER - _run_in_daemon(flow_controller, "add", [msg3], adding_3_done) + _run_in_daemon(flow_controller.add, [msg3], adding_3_done) if adding_3_done.wait(timeout=0.1): pytest.fail("Adding a message on overflow did not block.") # pragma: NO COVER - _run_in_daemon(flow_controller, "add", [msg4], adding_4_done) + _run_in_daemon(flow_controller.add, [msg4], adding_4_done) if adding_4_done.wait(timeout=0.1): pytest.fail("Adding a message on overflow did not block.") # pragma: NO COVER # After releasing one message, there should be room for a new message, which # should result in unblocking one of the waiting threads. - _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done) + _run_in_daemon(flow_controller.release, [msg1], releasing_1_done) if not releasing_1_done.wait(timeout=0.1): pytest.fail("Releasing a message blocked or errored.") # pragma: NO COVER @@ -266,7 +265,7 @@ def test_blocking_on_overflow_until_free_capacity(): # Release another message and verify that yet another thread gets unblocked. added_msg = [msg2, msg3, msg4][done_status.index(True)] - _run_in_daemon(flow_controller, "release", [added_msg], releasing_x_done) + _run_in_daemon(flow_controller.release, [added_msg], releasing_x_done) if not releasing_x_done.wait(timeout=0.1): pytest.fail("Releasing messages blocked or errored.") # pragma: NO COVER @@ -293,7 +292,7 @@ def test_error_if_mesage_would_block_indefinitely(): adding_done = threading.Event() error_event = threading.Event() - _run_in_daemon(flow_controller, "add", [msg], adding_done, error_event=error_event) + _run_in_daemon(flow_controller.add, [msg], adding_done, error_event=error_event) assert error_event.wait(timeout=0.1), "No error on adding too large a message." 
@@ -329,20 +328,20 @@ def test_threads_posting_large_messages_do_not_starve(): # enough messages should eventually allow the large message to come through, even # if more messages are added after it (those should wait for the large message). initial_messages = [grpc_types.PubsubMessage(data=b"x" * 10)] * 5 - _run_in_daemon(flow_controller, "add", initial_messages, adding_initial_done) + _run_in_daemon(flow_controller.add, initial_messages, adding_initial_done) assert adding_initial_done.wait(timeout=0.1) - _run_in_daemon(flow_controller, "add", [large_msg], adding_large_done) + _run_in_daemon(flow_controller.add, [large_msg], adding_large_done) # Continuously keep adding more messages after the large one. messages = [grpc_types.PubsubMessage(data=b"x" * 10)] * 10 - _run_in_daemon(flow_controller, "add", messages, adding_busy_done, action_pause=0.1) + _run_in_daemon(flow_controller.add, messages, adding_busy_done, action_pause=0.1) # At the same time, gradually keep releasing the messages - the freeed up # capacity should be consumed by the large message, not the other small messages # being added after it. _run_in_daemon( - flow_controller, "release", messages, releasing_busy_done, action_pause=0.1 + flow_controller.release, messages, releasing_busy_done, action_pause=0.1 ) # Sanity check - releasing should have completed by now. @@ -359,7 +358,7 @@ def test_threads_posting_large_messages_do_not_starve(): # Releasing the large message should unblock adding the remaining "busy" messages # that have not been added yet. 
- _run_in_daemon(flow_controller, "release", [large_msg], releasing_large_done) + _run_in_daemon(flow_controller.release, [large_msg], releasing_large_done) if not releasing_large_done.wait(timeout=0.1): pytest.fail("Releasing a message blocked or errored.") # pragma: NO COVER @@ -367,6 +366,41 @@ def test_threads_posting_large_messages_do_not_starve(): pytest.fail("Adding messages blocked or errored.") # pragma: NO COVER +def test_blocked_messages_are_accepted_in_fifo_order(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=1_000_000, # Unlimited for practical purposes in the test. + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + # It's OK if the message instance is shared, as flow controller is only concerned + # with byte sizes and counts, and not with particular message instances. + message = grpc_types.PubsubMessage(data=b"x") + + adding_done_events = [threading.Event() for _ in range(10)] + releasing_done_events = [threading.Event() for _ in adding_done_events] + + # Add messages. The first one will be accepted, and the rest should queue behind. + for adding_done in adding_done_events: + _run_in_daemon(flow_controller.add, [message], adding_done) + time.sleep(0.1) + + if not adding_done_events[0].wait(timeout=0.1): # pragma: NO COVER + pytest.fail("The first message unexpectedly got blocked on adding.") + + # For each message, check that it has indeed been added to the flow controller. + # Then release it to make room for the next message in line, and repeat the check. 
+ enumeration = enumerate(zip(adding_done_events, releasing_done_events)) + for i, (adding_done, releasing_done) in enumeration: + if not adding_done.wait(timeout=0.1): # pragma: NO COVER + pytest.fail(f"Queued message still blocked on adding (i={i}).") + + _run_in_daemon(flow_controller.release, [message], releasing_done) + if not releasing_done.wait(timeout=0.1): # pragma: NO COVER + pytest.fail(f"Queued message was not released in time (i={i}).") + + def test_warning_on_internal_reservation_stats_error_when_unblocking(): settings = types.PublishFlowControl( message_limit=1, @@ -387,7 +421,7 @@ def test_warning_on_internal_reservation_stats_error_when_unblocking(): releasing_1_done = threading.Event() # Adding a message with free capacity should not block. - _run_in_daemon(flow_controller, "add", [msg1], adding_1_done) + _run_in_daemon(flow_controller.add, [msg1], adding_1_done) if not adding_1_done.wait(timeout=0.1): pytest.fail( # pragma: NO COVER "Adding a message with enough flow capacity blocked or errored." @@ -395,17 +429,17 @@ def test_warning_on_internal_reservation_stats_error_when_unblocking(): # Adding messages when there is not enough capacity should block, even if # added through multiple threads. - _run_in_daemon(flow_controller, "add", [msg2], adding_2_done) + _run_in_daemon(flow_controller.add, [msg2], adding_2_done) if adding_2_done.wait(timeout=0.1): pytest.fail("Adding a message on overflow did not block.") # pragma: NO COVER # Intentionally corrupt internal stats - reservation = next(iter(flow_controller._byte_reservations.values()), None) + reservation = next(iter(flow_controller._waiting.values()), None) assert reservation is not None, "No messages blocked by flow controller." 
- reservation.reserved = reservation.needed + 1 + reservation.bytes_reserved = reservation.bytes_needed + 1 with warnings.catch_warnings(record=True) as warned: - _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done) + _run_in_daemon(flow_controller.release, [msg1], releasing_1_done) if not releasing_1_done.wait(timeout=0.1): pytest.fail("Releasing a message blocked or errored.") # pragma: NO COVER