From f729fdf85e95032a0a13a4e387e586e0c099c8c6 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Fri, 8 May 2020 16:02:36 +0200
Subject: [PATCH] Add flow control support for multiple add() threads

---
 .../cloud/pubsub_v1/publisher/exceptions.py  |   4 +
 .../pubsub_v1/publisher/flow_controller.py   | 153 ++++++++--
 .../publisher/test_flow_controller.py        | 268 +++++++++++++-----
 3 files changed, 329 insertions(+), 96 deletions(-)

diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py
index 89b3790a0..d34515c60 100644
--- a/google/cloud/pubsub_v1/publisher/exceptions.py
+++ b/google/cloud/pubsub_v1/publisher/exceptions.py
@@ -42,6 +42,10 @@ class FlowControlLimitError(Exception):
     """An action resulted in exceeding the flow control limits."""
 
 
+class PermanentlyBlockedError(FlowControlLimitError):
+    """A message exceeds *total* flow control limits and would block forever."""
+
+
 __all__ = (
     "FlowControlLimitError",
     "MessageTooLargeError",
diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py
index ece6b160e..7032dacf1 100644
--- a/google/cloud/pubsub_v1/publisher/flow_controller.py
+++ b/google/cloud/pubsub_v1/publisher/flow_controller.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import deque
 import logging
 import threading
 import warnings
@@ -23,6 +24,14 @@
 
 _LOGGER = logging.getLogger(__name__)
 
 
+class _QuantityReservation(object):
+    """A (partial) reservation of a quantifiable resource."""
+
+    def __init__(self, reserved, needed):
+        self.reserved = reserved
+        self.needed = needed
+
+
 class FlowController(object):
     """A class used to control the flow of messages passing through it.
 
@@ -34,10 +43,22 @@ class FlowController(object):
     def __init__(self, settings):
         self._settings = settings
 
+        # Load statistics. They represent the number of messages added, but not
+        # yet released (and their total size).
         self._message_count = 0
         self._total_bytes = 0
 
-        # The lock is used to protect the internal state (message and byte count).
+        # A FIFO queue of threads blocked on adding a message, from first to last.
+        # Only relevant if the configured limit exceeded behavior is BLOCK.
+        self._waiting = deque()
+
+        # Reservations of available flow control bytes by the waiting threads.
+        # Each value is a _QuantityReservation instance.
+        self._byte_reservations = dict()
+        self._reserved_bytes = 0
+
+        # The lock is used to protect all internal state (message and byte count,
+        # waiting threads to add, etc.).
         self._operational_lock = threading.Lock()
 
         # The condition for blocking the flow if capacity is exceeded.
@@ -62,32 +83,30 @@ def add(self, message):
             return
 
         with self._operational_lock:
-            self._message_count += 1
-            self._total_bytes += message.ByteSize()
-
-            if not self._is_overflow():
+            if not self._would_overflow(message):
+                self._message_count += 1
+                self._total_bytes += message.ByteSize()
                 return
 
-            # We have an overflow, react.
+            # Adding a message would overflow, react.
             if (
                 self._settings.limit_exceeded_behavior
                 == types.LimitExceededBehavior.ERROR
             ):
+                # Raising an error means rejecting a message, thus we do not
+                # add anything to the existing load, but we do report the would-be
+                # load if we accepted the message.
                 msg = (
-                    "Flow control limits exceeded "
+                    "Flow control limits would be exceeded "
                     "(messages: {} / {}, bytes: {} / {})."
                ).format(
-                    self._message_count,
+                    self._message_count + 1,
                     self._settings.message_limit,
-                    self._total_bytes,
+                    self._total_bytes + message.ByteSize(),
                     self._settings.byte_limit,
                 )
                 error = exceptions.FlowControlLimitError(msg)
-                # Raising an error means rejecting a message, thus we need to deduct
-                # the latter's contribution to the total load.
-                self._message_count -= 1
-                self._total_bytes -= message.ByteSize()
                 raise error
 
             assert (
@@ -95,13 +114,49 @@ def add(self, message):
                 == types.LimitExceededBehavior.BLOCK
             )
 
-            while self._is_overflow():
+            # Sanity check - if a message exceeds total flow control limits all
+            # by itself, it would block forever, thus raise error.
+            if (
+                message.ByteSize() > self._settings.byte_limit
+                or self._settings.message_limit < 1
+            ):
+                error_msg = (
+                    "Flow control limits too low for the message. "
+                    "(messages: {} / {}, bytes: {} / {})."
+                ).format(
+                    1,
+                    self._settings.message_limit,
+                    message.ByteSize(),
+                    self._settings.byte_limit,
+                )
+                raise exceptions.PermanentlyBlockedError(error_msg)
+
+            current_thread = threading.current_thread()
+
+            while self._would_overflow(message):
+                if current_thread not in self._byte_reservations:
+                    self._waiting.append(current_thread)
+                    self._byte_reservations[current_thread] = _QuantityReservation(
+                        reserved=0, needed=message.ByteSize()
+                    )
+
                 _LOGGER.debug(
                     "Blocking until there is enough free capacity in the flow."
                 )
                 self._has_capacity.wait()
                 _LOGGER.debug("Woke up from waiting on free capacity in the flow.")
 
+            # Message accepted, increase the load and remove thread stats if
+            # they exist in the waiting queue.
+            self._message_count += 1
+            self._total_bytes += message.ByteSize()
+
+            reservation = self._byte_reservations.get(current_thread)
+            if reservation:
+                self._reserved_bytes -= reservation.reserved
+                del self._byte_reservations[current_thread]
+                self._waiting.remove(current_thread)
+
     def release(self, message):
         """Release a mesage from flow control.
 
@@ -113,8 +168,7 @@ def release(self, message):
             return
 
         with self._operational_lock:
-            was_overflow = self._is_overflow()
-
+            # Releasing a message decreases the load.
             self._message_count -= 1
             self._total_bytes -= message.ByteSize()
 
@@ -127,19 +181,72 @@ def release(self, message):
             self._message_count = max(0, self._message_count)
             self._total_bytes = max(0, self._total_bytes)
 
-            if was_overflow and not self._is_overflow():
+            self._distribute_available_bytes()
+
+            # If at least one thread waiting to add() can be unblocked, wake them up.
+            if self._ready_to_unblock():
                 _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                 self._has_capacity.notify_all()
 
-    def _is_overflow(self):
-        """Determine if the current message load exceeds flow control limits.
+    def _distribute_available_bytes(self):
+        """Distribute available free capacity among the waiting threads in FIFO order.
+
+        The method assumes that the caller has obtained ``_operational_lock``.
+        """
+        available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes
+
+        for thread in self._waiting:
+            if available <= 0:
+                break
+
+            reservation = self._byte_reservations[thread]
+            still_needed = reservation.needed - reservation.reserved
+            can_give = min(still_needed, available)
+
+            reservation.reserved += can_give
+            self._reserved_bytes += can_give
+            available -= can_give
+
+    def _ready_to_unblock(self):
+        """Determine if any of the threads waiting to add a message can proceed.
 
         The method assumes that the caller has obtained ``_operational_lock``.
         Returns:
             bool
         """
-        return (
-            self._message_count > self._settings.message_limit
-            or self._total_bytes > self._settings.byte_limit
-        )
+        if self._waiting:
+            # It's enough to only check the head of the queue, because any free
+            # capacity is distributed in FIFO order.
+            reservation = self._byte_reservations[self._waiting[0]]
+            return (
+                reservation.reserved >= reservation.needed
+                and self._message_count < self._settings.message_limit
+            )
+
+        return False
+
+    def _would_overflow(self, message):
+        """Determine if accepting a message would exceed flow control limits.
+
+        The method assumes that the caller has obtained ``_operational_lock``.
+
+        Args:
+            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
+                The message entering the flow control.
+
+        Returns:
+            bool
+        """
+        reservation = self._byte_reservations.get(threading.current_thread())
+
+        if reservation:
+            enough_reserved = reservation.reserved >= reservation.needed
+        else:
+            enough_reserved = False
+
+        bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize()
+        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
+        msg_count_overflow = self._message_count + 1 > self._settings.message_limit
+
+        return size_overflow or msg_count_overflow
diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py
index 3e4349f62..a4e848aeb 100644
--- a/tests/unit/pubsub_v1/publisher/test_flow_controller.py
+++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py
@@ -15,6 +15,7 @@
 from __future__ import absolute_import
 
 import threading
+import time
 import warnings
 
 import pytest
@@ -24,18 +25,49 @@
 from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
 
 
+def _run_in_daemon(
+    flow_controller,
+    action,
+    messages,
+    all_done_event,
+    error_event=None,
+    action_pause=None,
+):
+    """Run flow controller action (add or remove messages) in a daemon thread.
+    """
+    assert action in ("add", "release")
+
+    def run_me():
+        method = getattr(flow_controller, action)
+
+        try:
+            for msg in messages:
+                if action_pause is not None:
+                    time.sleep(action_pause)
+                method(msg)
+        except Exception:
+            if error_event is not None:
+                error_event.set()
+        else:
+            all_done_event.set()
+
+    thread = threading.Thread(target=run_me)
+    thread.daemon = True
+    thread.start()
+
+
 def test_no_overflow_no_error():
     settings = types.PublishFlowControl(
         message_limit=100,
         byte_limit=10000,
         limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     # there should be no errors
     for data in (b"foo", b"bar", b"baz"):
         msg = types.PubsubMessage(data=data)
-        instance.add(msg)
+        flow_controller.add(msg)
 
 
 def test_overflow_no_error_on_ignore():
@@ -44,11 +76,11 @@ def test_overflow_no_error_on_ignore():
         byte_limit=2,
         limit_exceeded_behavior=types.LimitExceededBehavior.IGNORE,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     # there should be no overflow errors
-    instance.add(types.PubsubMessage(data=b"foo"))
-    instance.add(types.PubsubMessage(data=b"bar"))
+    flow_controller.add(types.PubsubMessage(data=b"foo"))
+    flow_controller.add(types.PubsubMessage(data=b"bar"))
 
 
 def test_message_count_overflow_error():
@@ -57,11 +89,11 @@ def test_message_count_overflow_error():
         byte_limit=10000,
         limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
-    instance.add(types.PubsubMessage(data=b"foo"))
+    flow_controller.add(types.PubsubMessage(data=b"foo"))
     with pytest.raises(exceptions.FlowControlLimitError) as error:
-        instance.add(types.PubsubMessage(data=b"bar"))
+        flow_controller.add(types.PubsubMessage(data=b"bar"))
 
     assert "messages: 2 / 1" in str(error.value)
 
 
 def test_byte_size_overflow_error():
@@ -72,7 +104,7 @@ def test_byte_size_overflow_error():
         byte_limit=199,
         limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     # Since the message data itself occupies 100 bytes, it means that both
     # messages combined will exceed the imposed byte limit of 199, but a single
     # message will not.
     msg1 = types.PubsubMessage(data=b"x" * 100)
     msg2 = types.PubsubMessage(data=b"y" * 100)
 
-    instance.add(msg1)
+    flow_controller.add(msg1)
     with pytest.raises(exceptions.FlowControlLimitError) as error:
-        instance.add(msg2)
+        flow_controller.add(msg2)
 
     total_size = msg1.ByteSize() + msg2.ByteSize()
     expected_info = "bytes: {} / 199".format(total_size)
@@ -95,7 +127,7 @@ def test_no_error_on_moderate_message_flow():
         byte_limit=250,
         limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     msg1 = types.PubsubMessage(data=b"x" * 100)
     msg2 = types.PubsubMessage(data=b"y" * 100)
 
     # The flow control settings will accept two in-flight messages, but not three.
     # If releasing messages works correctly, the sequence below will not raise errors.
-    instance.add(msg1)
-    instance.add(msg2)
-    instance.release(msg1)
-    instance.add(msg3)
-    instance.release(msg2)
-    instance.release(msg3)
+    flow_controller.add(msg1)
+    flow_controller.add(msg2)
+    flow_controller.release(msg1)
+    flow_controller.add(msg3)
+    flow_controller.release(msg2)
+    flow_controller.release(msg3)
 
 
 def test_rejected_messages_do_not_increase_total_load():
@@ -117,21 +149,21 @@ def test_rejected_messages_do_not_increase_total_load():
         byte_limit=150,
         limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
    )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     msg1 = types.PubsubMessage(data=b"x" * 100)
     msg2 = types.PubsubMessage(data=b"y" * 100)
 
-    instance.add(msg1)
+    flow_controller.add(msg1)
 
     for _ in range(5):
         with pytest.raises(exceptions.FlowControlLimitError):
-            instance.add(types.PubsubMessage(data=b"z" * 100))
+            flow_controller.add(types.PubsubMessage(data=b"z" * 100))
 
     # After releasing a message we should again be able to add another one, despite
     # previously trying to add a lot of other messages.
-    instance.release(msg1)
-    instance.add(msg2)
+    flow_controller.release(msg1)
+    flow_controller.add(msg2)
 
 
 def test_incorrectly_releasing_too_many_messages():
@@ -140,7 +172,7 @@ def test_incorrectly_releasing_too_many_messages():
         byte_limit=150,
         limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     msg1 = types.PubsubMessage(data=b"x" * 100)
     msg2 = types.PubsubMessage(data=b"y" * 100)
 
     # Releasing a message that would make the load negative should result in a warning.
     with warnings.catch_warnings(record=True) as warned:
-        instance.release(msg1)
+        flow_controller.release(msg1)
 
     assert len(warned) == 1
     assert issubclass(warned[0].category, RuntimeWarning)
 
     # Incorrectly removing a message does not mess up internal stats, we can
     # still only add a single message at a time to this flow.
-    instance.add(msg2)
+    flow_controller.add(msg2)
 
     with pytest.raises(exceptions.FlowControlLimitError) as error:
-        instance.add(msg3)
+        flow_controller.add(msg3)
 
     error_msg = str(error.value)
     assert "messages: 2 / 1" in error_msg
 
 
 def test_blocking_on_overflow_until_free_capacity():
@@ -171,11 +203,11 @@ def test_blocking_on_overflow_until_free_capacity():
-        message_limit=2,
-        byte_limit=250,
+        message_limit=1,
+        byte_limit=150,
         limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
     )
-    instance = FlowController(settings)
+    flow_controller = FlowController(settings)
 
     msg1 = types.PubsubMessage(data=b"x" * 100)
     msg2 = types.PubsubMessage(data=b"y" * 100)
@@ -184,58 +216,148 @@
 
     # If there is a concurrency bug in FlowController, we do not want to block
     # the main thread running the tests, thus we delegate all add/release
-    # operations to daemon threads.
-    adding_123_done = threading.Event()
+    # operations to daemon threads and check the outcome (blocked/not blocked)
+    # through Events.
+    adding_1_done = threading.Event()
+    adding_2_done = threading.Event()
+    adding_3_done = threading.Event()
     adding_4_done = threading.Event()
-    releasing_12_done = threading.Event()
+    releasing_1_done = threading.Event()
+    releasing_x_done = threading.Event()
 
-    def add_messages(messages, all_done_event):
-        try:
-            for msg in messages:
-                instance.add(msg)
-        except Exception:
-            return
-        else:
-            all_done_event.set()
+    # Adding a message with free capacity should not block.
+    _run_in_daemon(flow_controller, "add", [msg1], adding_1_done)
+    if not adding_1_done.wait(timeout=0.1):
+        pytest.fail("Adding a message with enough flow capacity blocked or errored.")
 
-    def release_messages(messages, all_done_event):
-        try:
-            for msg in messages:
-                instance.release(msg)
-        except Exception:
-            return
-        else:
-            all_done_event.set()
+    # Adding messages when there is not enough capacity should block, even if
+    # added through multiple threads.
+    _run_in_daemon(flow_controller, "add", [msg2], adding_2_done)
+    if adding_2_done.wait(timeout=0.1):
+        pytest.fail("Adding a message on overflow did not block.")
 
-    # The thread should block on adding the 3rd message.
-    adder_thread_123 = threading.Thread(
-        target=add_messages, args=([msg1, msg2, msg3], adding_123_done)
-    )
-    adder_thread_123.daemon = True
-    adder_thread_123.start()
+    _run_in_daemon(flow_controller, "add", [msg3], adding_3_done)
+    if adding_3_done.wait(timeout=0.1):
+        pytest.fail("Adding a message on overflow did not block.")
 
-    all_added = adding_123_done.wait(timeout=0.1)
-    if all_added:
+    _run_in_daemon(flow_controller, "add", [msg4], adding_4_done)
+    if adding_4_done.wait(timeout=0.1):
         pytest.fail("Adding a message on overflow did not block.")
 
-    # Start adding another message, but in the meantime also start freeing up
-    # enough flow capacity for it.
-    adder_thread_4 = threading.Thread(target=add_messages, args=([msg4], adding_4_done))
-    adder_thread_4.daemon = True
-    adder_thread_4.start()
+    # After releasing one message, there should be room for a new message, which
+    # should result in unblocking one of the waiting threads.
+    _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done)
+    if not releasing_1_done.wait(timeout=0.1):
+        pytest.fail("Releasing a message blocked or errored.")
+
+    done_status = [
+        adding_2_done.wait(timeout=0.1),
+        adding_3_done.wait(timeout=0.1),
+        adding_4_done.wait(timeout=0.1),
+    ]
+
+    # In sum() we use the fact that True==1 and False==0, and that Event.wait()
+    # returns False only if it times out, i.e. its internal flag has not been set.
+    done_count = sum(done_status)
+    assert done_count == 1, "Exactly one thread should have been unblocked."
+
+    # Release another message and verify that yet another thread gets unblocked.
+    added_msg = [msg2, msg3, msg4][done_status.index(True)]
+    _run_in_daemon(flow_controller, "release", [added_msg], releasing_x_done)
+
+    if not releasing_x_done.wait(timeout=0.1):
+        pytest.fail("Releasing messages blocked or errored.")
+
+    released_count = sum(
+        (
+            adding_2_done.wait(timeout=0.1),
+            adding_3_done.wait(timeout=0.1),
+            adding_4_done.wait(timeout=0.1),
+        )
+    )
+    assert released_count == 2, "Exactly two threads should have been unblocked."
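
The FIFO hand-out of freed capacity that the test above exercises can be sketched in isolation. The snippet below is a minimal, self-contained model of the byte-reservation bookkeeping (the `distribute` helper and its data layout are illustrative only, not part of the library); it shows how freed bytes always go to the head of the waiting queue first, so only the first fully reserved waiter can proceed after a single release:

# Illustrative sketch (not library code): FIFO distribution of free flow control bytes.
def distribute(byte_limit, total_bytes, waiters):
    """Grant free bytes to waiting entries in FIFO order; partial grants are kept."""
    reserved_total = sum(entry["reserved"] for entry in waiters.values())
    available = byte_limit - total_bytes - reserved_total
    for entry in waiters.values():  # dict preserves insertion (FIFO) order
        if available <= 0:
            break
        grant = min(entry["needed"] - entry["reserved"], available)
        entry["reserved"] += grant
        available -= grant

# Three blocked threads, each needing 103 bytes, against a 150-byte limit right
# after the only in-flight 103-byte message was released (load back to zero).
waiters = {name: {"needed": 103, "reserved": 0} for name in ("t2", "t3", "t4")}
distribute(byte_limit=150, total_bytes=0, waiters=waiters)
for name, entry in waiters.items():
    print(name, entry)
# t2 {'needed': 103, 'reserved': 103}  <- head of the queue is satisfied first
# t3 {'needed': 103, 'reserved': 47}   <- partial reservation, still blocked
# t4 {'needed': 103, 'reserved': 0}
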
+
+
+def test_error_if_message_would_block_indefinitely():
+    settings = types.PublishFlowControl(
+        message_limit=0,  # simulate non-sane settings
+        byte_limit=1,
+        limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
+    )
+    flow_controller = FlowController(settings)
+
+    msg = types.PubsubMessage(data=b"xyz")
+    adding_done = threading.Event()
+    error_event = threading.Event()
+
+    _run_in_daemon(flow_controller, "add", [msg], adding_done, error_event=error_event)
+
+    assert error_event.wait(timeout=0.1), "No error on adding too large a message."
+
+    # Now that we know that an error occurs, we can check its type directly
+    # without the fear of blocking indefinitely.
+    flow_controller = FlowController(settings)  # we want a fresh controller
+    with pytest.raises(exceptions.PermanentlyBlockedError) as error_info:
+        flow_controller.add(msg)
-    releaser_thread = threading.Thread(
-        target=release_messages, args=([msg1, msg2], releasing_12_done)
+    error_msg = str(error_info.value)
+    assert "messages: 1 / 0" in error_msg
+    assert "bytes: {} / 1".format(msg.ByteSize()) in error_msg
+
+
+def test_threads_posting_large_messages_do_not_starve():
+    settings = types.PublishFlowControl(
+        message_limit=100,
+        byte_limit=110,
+        limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
+    )
+    flow_controller = FlowController(settings)
+
+    large_msg = types.PubsubMessage(data=b"x" * 100)  # close to entire byte limit
+
+    adding_initial_done = threading.Event()
+    adding_large_done = threading.Event()
+    adding_busy_done = threading.Event()
+    releasing_busy_done = threading.Event()
+    releasing_large_done = threading.Event()
+
+    # Occupy some of the flow capacity, then try to add a large message. Releasing
+    # enough messages should eventually allow the large message to come through, even
+    # if more messages are added after it (those should wait for the large message).
+    initial_messages = [types.PubsubMessage(data=b"x" * 10)] * 5
+    _run_in_daemon(flow_controller, "add", initial_messages, adding_initial_done)
+    assert adding_initial_done.wait(timeout=0.1)
+
+    _run_in_daemon(flow_controller, "add", [large_msg], adding_large_done)
+
+    # Continuously keep adding more messages after the large one.
+    messages = [types.PubsubMessage(data=b"x" * 10)] * 10
+    _run_in_daemon(flow_controller, "add", messages, adding_busy_done, action_pause=0.1)
+
+    # At the same time, gradually keep releasing the messages - the freed up
+    # capacity should be consumed by the large message, not the other small messages
+    # being added after it.
+    _run_in_daemon(
+        flow_controller, "release", messages, releasing_busy_done, action_pause=0.1
+    )
-    releaser_thread.daemon = True
-    releaser_thread.start()
 
-    all_released = releasing_12_done.wait(timeout=0.1)
-    if not all_released:
+    # Sanity check - releasing should have completed by now.
+    if not releasing_busy_done.wait(timeout=1.1):
         pytest.fail("Releasing messages blocked or errored.")
 
-    # After releasing two messages, adding a new one (msg4) should not block, even
-    # if msg3 has not been released yet.
-    all_added = adding_4_done.wait(timeout=0.1)
-    if not all_added:
-        pytest.fail("Adding a message with enough flow capacity blocked or errored.")
+    # Enough messages released, the large message should have come through in
+    # the meantime.
+    if not adding_large_done.wait(timeout=0.1):
+        pytest.fail("A thread adding a large message starved.")
+
+    if adding_busy_done.wait(timeout=0.1):
+        pytest.fail("Adding multiple small messages did not block.")
+
+    # Releasing the large message should unblock adding the remaining "busy" messages
+    # that have not been added yet.
+    _run_in_daemon(flow_controller, "release", [large_msg], releasing_large_done)
+    if not releasing_large_done.wait(timeout=0.1):
+        pytest.fail("Releasing a message blocked or errored.")
+
+    if not adding_busy_done.wait(timeout=1.0):
+        pytest.fail("Adding messages blocked or errored.")
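
With these changes the controller can be shared by many publishing threads at once. A minimal usage sketch of the resulting behavior follows; the wrapping of an actual publish call is hypothetical (this patch only changes the controller itself), while the add/release pattern and the settings types are taken from the code above:

import threading

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController

settings = types.PublishFlowControl(
    message_limit=10,
    byte_limit=10 * 1024,
    limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
)
flow_controller = FlowController(settings)


def publish_one(data):
    msg = types.PubsubMessage(data=data)
    flow_controller.add(msg)  # blocks while the configured limits are exhausted
    try:
        pass  # hypothetical: hand the message over to the publish machinery here
    finally:
        flow_controller.release(msg)  # frees capacity and wakes waiting add() calls


threads = [
    threading.Thread(target=publish_one, args=(b"x" * 500,)) for _ in range(100)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

Because the behavior is BLOCK, the hundred threads above never raise FlowControlLimitError; at most ten messages (and at most ~10 KiB) are counted as in-flight at any moment, and the remaining threads queue up in FIFO order inside add().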