From e6d27b1f503a3ffd98e1b9a023dcf72482c86278 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 5 May 2020 16:56:13 +0200 Subject: [PATCH 01/11] feat: add publish flow control settings --- google/cloud/pubsub_v1/types.py | 78 +++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index eb4f00681..b52b3ea60 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -13,7 +13,9 @@ # limitations under the License. from __future__ import absolute_import + import collections +import enum import sys from google.api import http_pb2 @@ -30,25 +32,6 @@ from google.cloud.pubsub_v1.proto import pubsub_pb2 -# Define the default publisher options. -# -# This class is used when creating a publisher client to pass in options -# to enable/disable features. -PublisherOptions = collections.namedtuple( - "PublisherConfig", ["enable_message_ordering"] -) -PublisherOptions.__new__.__defaults__ = (False,) # enable_message_ordering: False - -if sys.version_info >= (3, 5): - PublisherOptions.__doc__ = "The options for the publisher client." - PublisherOptions.enable_message_ordering.__doc__ = ( - "Whether to order messages in a batch by a supplied ordering key." - "EXPERIMENTAL: Message ordering is an alpha feature that requires " - "special permissions to use. Please contact the Cloud Pub/Sub team for " - "more information." - ) - - # Define the default values for batching. # # This class is used when creating a publisher or subscriber client, and @@ -81,6 +64,63 @@ ) +class LimitExceededBehavior(str, enum.Enum): + """The possible actions when exceeding the publish flow control limits.""" + + IGNORE = "ignore" + BLOCK = "block" + ERROR = "error" + + +PublishFlowControl = collections.namedtuple( + "PublishFlowControl", ["message_limit", "byte_limit", "limit_exceeded_behavior"] +) +PublishFlowControl.__new__.__defaults__ = ( + 10 * BatchSettings.__new__.__defaults__[2], # message limit + 10 * BatchSettings.__new__.__defaults__[0], # byte limit + LimitExceededBehavior.IGNORE, # desired behavior +) + +if sys.version_info >= (3, 5): + PublishFlowControl.__doc__ = ( + "The client flow control settings for message publishing." + ) + PublishFlowControl.message_limit.__doc__ = ( + "The maximum number of messages awaiting to be published." + ) + PublishFlowControl.byte_limit.__doc__ = ( + "The maximum total size of messages awaiting to be published." + ) + PublishFlowControl.limit_exceeded_behavior.__doc__ = ( + "The action to take when publish flow control limits are exceeded." + ) + +# Define the default publisher options. +# +# This class is used when creating a publisher client to pass in options +# to enable/disable features. +PublisherOptions = collections.namedtuple( + "PublisherConfig", ["enable_message_ordering", "flow_control"] +) +PublisherOptions.__new__.__defaults__ = ( + False, # enable_message_ordering: False + PublishFlowControl(), # default flow control settings +) + +if sys.version_info >= (3, 5): + PublisherOptions.__doc__ = "The options for the publisher client." + PublisherOptions.enable_message_ordering.__doc__ = ( + "Whether to order messages in a batch by a supplied ordering key." + "EXPERIMENTAL: Message ordering is an alpha feature that requires " + "special permissions to use. Please contact the Cloud Pub/Sub team for " + "more information." + ) + PublisherOptions.flow_control.__doc__ = ( + "Flow control settings for message publishing by the client. By default " + "the publisher client does not do any throttling." + ) + + # Define the type class and default values for flow control settings. # # This class is used when creating a publisher or subscriber client, and From 7e78317b336a1691d4fa71961f2c1573f9cbf8fd Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 5 May 2020 20:16:41 +0200 Subject: [PATCH 02/11] Add flow control logic to publisher client --- google/cloud/pubsub_v1/publisher/client.py | 23 +- .../cloud/pubsub_v1/publisher/exceptions.py | 5 + .../pubsub_v1/publisher/flow_controller.py | 145 +++++++++++ .../publisher/test_flow_controller.py | 241 ++++++++++++++++++ .../publisher/test_publisher_client.py | 21 +- 5 files changed, 431 insertions(+), 4 deletions(-) create mode 100644 google/cloud/pubsub_v1/publisher/flow_controller.py create mode 100644 tests/unit/pubsub_v1/publisher/test_flow_controller.py diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index caa784407..5ab7ce4d4 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -34,6 +34,7 @@ from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer +from google.cloud.pubsub_v1.publisher.flow_controller import FlowController __version__ = pkg_resources.get_distribution("google-cloud-pubsub").version @@ -93,7 +94,11 @@ class Client(object): # Optional publisher_options = pubsub_v1.types.PublisherOptions( - enable_message_ordering=False + enable_message_ordering=False, + flow_control=pubsub_v1.types.PublishFlowControl( + message_limit=2000, + limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK, + ), ), # Optional @@ -198,6 +203,9 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs): # Thread created to commit all sequencers after a timeout. self._commit_thread = None + # The object controlling the message publishing flow + self._flow_controller = FlowController(self.publisher_options.flow_control) + @classmethod def from_service_account_file(cls, filename, batch_settings=(), **kwargs): """Creates an instance of this client using the provided credentials @@ -333,6 +341,11 @@ def publish(self, topic, data, ordering_key="", **attrs): pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing the ``message`` would exceed the max size limit on the backend. + + :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`: + If publishing a new message would exceed the publish flow control + limits and the desired action on overflow is + :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`. """ # Sanity check: Is the data being sent as a bytestring? # If it is literally anything else, complain loudly about it. @@ -364,6 +377,13 @@ def publish(self, topic, data, ordering_key="", **attrs): data=data, ordering_key=ordering_key, attributes=attrs ) + # Messages should go through flow control to prevent excessive + # queuing on the client side (depending on the settings). + self._flow_controller.add(message) + + def on_publish_done(future): + self._flow_controller.release(message) + with self._batch_lock: if self._is_stopped: raise RuntimeError("Cannot publish on a stopped publisher.") @@ -372,6 +392,7 @@ def publish(self, topic, data, ordering_key="", **attrs): # Delegate the publishing to the sequencer. future = sequencer.publish(message) + future.add_done_callback(on_publish_done) # Create a timer thread if necessary to enforce the batching # timeout. diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py index 856be955a..89b3790a0 100644 --- a/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/google/cloud/pubsub_v1/publisher/exceptions.py @@ -38,7 +38,12 @@ def __init__(self, ordering_key): super(PublishToPausedOrderingKeyException, self).__init__() +class FlowControlLimitError(Exception): + """An action resulted in exceeding the flow control limits.""" + + __all__ = ( + "FlowControlLimitError", "MessageTooLargeError", "PublishError", "TimeoutError", diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py new file mode 100644 index 000000000..ece6b160e --- /dev/null +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -0,0 +1,145 @@ +# Copyright 2020, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +import warnings + +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.publisher import exceptions + + +_LOGGER = logging.getLogger(__name__) + + +class FlowController(object): + """A class used to control the flow of messages passing through it. + + Args: + settings (~google.cloud.pubsub_v1.types.PublishFlowControl): + Desired flow control configuration. + """ + + def __init__(self, settings): + self._settings = settings + + self._message_count = 0 + self._total_bytes = 0 + + # The lock is used to protect the internal state (message and byte count). + self._operational_lock = threading.Lock() + + # The condition for blocking the flow if capacity is exceeded. + self._has_capacity = threading.Condition(lock=self._operational_lock) + + def add(self, message): + """Add a message to flow control. + + Adding a message updates the internal load statistics, and an action is + taken if these limits are exceeded (depending on the flow control settings). + + Args: + message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`): + The message entering the flow control. + + Raises: + :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`: + If adding a message exceeds flow control limits and the desired + action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`. + """ + if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE: + return + + with self._operational_lock: + self._message_count += 1 + self._total_bytes += message.ByteSize() + + if not self._is_overflow(): + return + + # We have an overflow, react. + if ( + self._settings.limit_exceeded_behavior + == types.LimitExceededBehavior.ERROR + ): + msg = ( + "Flow control limits exceeded " + "(messages: {} / {}, bytes: {} / {})." + ).format( + self._message_count, + self._settings.message_limit, + self._total_bytes, + self._settings.byte_limit, + ) + error = exceptions.FlowControlLimitError(msg) + + # Raising an error means rejecting a message, thus we need to deduct + # the latter's contribution to the total load. + self._message_count -= 1 + self._total_bytes -= message.ByteSize() + raise error + + assert ( + self._settings.limit_exceeded_behavior + == types.LimitExceededBehavior.BLOCK + ) + + while self._is_overflow(): + _LOGGER.debug( + "Blocking until there is enough free capacity in the flow." + ) + self._has_capacity.wait() + _LOGGER.debug("Woke up from waiting on free capacity in the flow.") + + def release(self, message): + """Release a mesage from flow control. + + Args: + message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`): + The message entering the flow control. + """ + if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE: + return + + with self._operational_lock: + was_overflow = self._is_overflow() + + self._message_count -= 1 + self._total_bytes -= message.ByteSize() + + if self._message_count < 0 or self._total_bytes < 0: + warnings.warn( + "Releasing a message that was never added or already released.", + category=RuntimeWarning, + stacklevel=2, + ) + self._message_count = max(0, self._message_count) + self._total_bytes = max(0, self._total_bytes) + + if was_overflow and not self._is_overflow(): + _LOGGER.debug("Notifying threads waiting to add messages to flow.") + self._has_capacity.notify_all() + + def _is_overflow(self): + """Determine if the current message load exceeds flow control limits. + + The method assumes that the caller has obtained ``_operational_lock``. + + Returns: + bool + """ + return ( + self._message_count > self._settings.message_limit + or self._total_bytes > self._settings.byte_limit + ) diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py new file mode 100644 index 000000000..3e4349f62 --- /dev/null +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -0,0 +1,241 @@ +# Copyright 2020, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import threading +import warnings + +import pytest + +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.publisher import exceptions +from google.cloud.pubsub_v1.publisher.flow_controller import FlowController + + +def test_no_overflow_no_error(): + settings = types.PublishFlowControl( + message_limit=100, + byte_limit=10000, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + instance = FlowController(settings) + + # there should be no errors + for data in (b"foo", b"bar", b"baz"): + msg = types.PubsubMessage(data=data) + instance.add(msg) + + +def test_overflow_no_error_on_ignore(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=2, + limit_exceeded_behavior=types.LimitExceededBehavior.IGNORE, + ) + instance = FlowController(settings) + + # there should be no overflow errors + instance.add(types.PubsubMessage(data=b"foo")) + instance.add(types.PubsubMessage(data=b"bar")) + + +def test_message_count_overflow_error(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=10000, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + instance = FlowController(settings) + + instance.add(types.PubsubMessage(data=b"foo")) + with pytest.raises(exceptions.FlowControlLimitError) as error: + instance.add(types.PubsubMessage(data=b"bar")) + + assert "messages: 2 / 1" in str(error.value) + + +def test_byte_size_overflow_error(): + settings = types.PublishFlowControl( + message_limit=10000, + byte_limit=199, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + instance = FlowController(settings) + + # Since the message data itself occupies 100 bytes, it means that both + # messages combined will exceed the imposed byte limit of 199, but a single + # message will not (the message size overhead is way lower than data size). + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + + instance.add(msg1) + with pytest.raises(exceptions.FlowControlLimitError) as error: + instance.add(msg2) + + total_size = msg1.ByteSize() + msg2.ByteSize() + expected_info = "bytes: {} / 199".format(total_size) + assert expected_info in str(error.value) + + +def test_no_error_on_moderate_message_flow(): + settings = types.PublishFlowControl( + message_limit=2, + byte_limit=250, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + instance = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + msg3 = types.PubsubMessage(data=b"z" * 100) + + # The flow control settings will accept two in-flight messages, but not three. + # If releasing messages works correctly, the sequence below will not raise errors. + instance.add(msg1) + instance.add(msg2) + instance.release(msg1) + instance.add(msg3) + instance.release(msg2) + instance.release(msg3) + + +def test_rejected_messages_do_not_increase_total_load(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + instance = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + + instance.add(msg1) + + for _ in range(5): + with pytest.raises(exceptions.FlowControlLimitError): + instance.add(types.PubsubMessage(data=b"z" * 100)) + + # After releasing a message we should again be able to add another one, despite + # previously trying to add a lot of other messages. + instance.release(msg1) + instance.add(msg2) + + +def test_incorrectly_releasing_too_many_messages(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + instance = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + msg3 = types.PubsubMessage(data=b"z" * 100) + + # Releasing a message that would make the load negative should result in a warning. + with warnings.catch_warnings(record=True) as warned: + instance.release(msg1) + + assert len(warned) == 1 + assert issubclass(warned[0].category, RuntimeWarning) + warning_msg = str(warned[0].message) + assert "never added or already released" in warning_msg + + # Incorrectly removing a message does not mess up internal stats, we can + # still only add a single message at a time to this flow. + instance.add(msg2) + + with pytest.raises(exceptions.FlowControlLimitError) as error: + instance.add(msg3) + + error_msg = str(error.value) + assert "messages: 2 / 1" in error_msg + total_size = msg2.ByteSize() + msg3.ByteSize() + expected_size_info = "bytes: {} / 150".format(total_size) + assert expected_size_info in error_msg + + +def test_blocking_on_overflow_until_free_capacity(): + settings = types.PublishFlowControl( + message_limit=2, + byte_limit=250, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + instance = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + msg3 = types.PubsubMessage(data=b"z" * 100) + msg4 = types.PubsubMessage(data=b"w" * 100) + + # If there is a concurrency bug in FlowController, we do not want to block + # the main thread running the tests, thus we delegate all add/release + # operations to daemon threads. + adding_123_done = threading.Event() + adding_4_done = threading.Event() + releasing_12_done = threading.Event() + + def add_messages(messages, all_done_event): + try: + for msg in messages: + instance.add(msg) + except Exception: + return + else: + all_done_event.set() + + def release_messages(messages, all_done_event): + try: + for msg in messages: + instance.release(msg) + except Exception: + return + else: + all_done_event.set() + + # The thread should block on adding the 3rd message. + adder_thread_123 = threading.Thread( + target=add_messages, args=([msg1, msg2, msg3], adding_123_done) + ) + adder_thread_123.daemon = True + adder_thread_123.start() + + all_added = adding_123_done.wait(timeout=0.1) + if all_added: + pytest.fail("Adding a message on overflow did not block.") + + # Start adding another message, but in the meantime also start freeing up + # enough flow capacity for it. + adder_thread_4 = threading.Thread(target=add_messages, args=([msg4], adding_4_done)) + adder_thread_4.daemon = True + adder_thread_4.start() + + releaser_thread = threading.Thread( + target=release_messages, args=([msg1, msg2], releasing_12_done) + ) + releaser_thread.daemon = True + releaser_thread.start() + + all_released = releasing_12_done.wait(timeout=0.1) + if not all_released: + pytest.fail("Releasing messages blocked or errored.") + + # After releasing two messages, adding a new one (msg4) should not block, even + # if msg3 has not been released yet. + all_added = adding_4_done.wait(timeout=0.1) + if not all_added: + pytest.fail("Adding a message with enough flow capacity blocked or errored.") diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 4ca979892..2d345821c 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -125,11 +125,17 @@ def test_publish(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds) + future1 = mock.sentinel.future1 + future2 = mock.sentinel.future2 + future1.add_done_callback = mock.Mock(spec=["__call__"]) + future2.add_done_callback = mock.Mock(spec=["__call__"]) + # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) + # Set the mock up to claim indiscriminately that it accepts all messages. batch.will_accept.return_value = True - batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2) + batch.publish.side_effect = (future1, future2) topic = "topic/path" client._set_batch(topic, batch) @@ -208,10 +214,13 @@ def test_publish_new_batch_needed(): # Use mocks in lieu of the actual batch class. batch1 = mock.Mock(spec=client._batch_class) batch2 = mock.Mock(spec=client._batch_class) + # Set the first mock up to claim indiscriminately that it rejects all # messages and the second accepts all. + future = mock.sentinel.future + future.add_done_callback = mock.Mock(spec=["__call__"]) batch1.publish.return_value = None - batch2.publish.return_value = mock.sentinel.future + batch2.publish.return_value = future topic = "topic/path" client._set_batch(topic, batch1) @@ -390,9 +399,15 @@ def test_publish_with_ordering_key(): # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) + # Set the mock up to claim indiscriminately that it accepts all messages. + future1 = mock.sentinel.future1 + future2 = mock.sentinel.future2 + future1.add_done_callback = mock.Mock(spec=["__call__"]) + future2.add_done_callback = mock.Mock(spec=["__call__"]) + batch.will_accept.return_value = True - batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2) + batch.publish.side_effect = (future1, future2) topic = "topic/path" ordering_key = "k1" From 2fe776f7b3e2d7e3c0d7779bb995c45c35d0debb Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 8 May 2020 16:02:36 +0200 Subject: [PATCH 03/11] Add flow control support for multiple add() threads --- .../cloud/pubsub_v1/publisher/exceptions.py | 4 + .../pubsub_v1/publisher/flow_controller.py | 157 ++++++++-- .../publisher/test_flow_controller.py | 268 +++++++++++++----- 3 files changed, 333 insertions(+), 96 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py index 89b3790a0..d34515c60 100644 --- a/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/google/cloud/pubsub_v1/publisher/exceptions.py @@ -42,6 +42,10 @@ class FlowControlLimitError(Exception): """An action resulted in exceeding the flow control limits.""" +class PermanentlyBlockedError(FlowControlLimitError): + """A message exceeds *total* flow control limits and would block forever.""" + + __all__ = ( "FlowControlLimitError", "MessageTooLargeError", diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index ece6b160e..8340c778a 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import deque import logging import threading import warnings @@ -23,6 +24,14 @@ _LOGGER = logging.getLogger(__name__) +class _QuantityReservation(object): + """A (partial) reservation of a quantifiable resource.""" + + def __init__(self, reserved, needed): + self.reserved = reserved + self.needed = needed + + class FlowController(object): """A class used to control the flow of messages passing through it. @@ -34,10 +43,22 @@ class FlowController(object): def __init__(self, settings): self._settings = settings + # Load statistics. They represent the number of messages added, but not + # yet released (and their total size). self._message_count = 0 self._total_bytes = 0 - # The lock is used to protect the internal state (message and byte count). + # A FIFO queue of threads blocked on adding a message, from first to last. + # Only relevant if the configured limit exceeded behavior is BLOCK. + self._waiting = deque() + + # Reservations of available flow control bytes by the waiting threads. + # Each value is a _QuantityReservation instance. + self._byte_reservations = dict() + self._reserved_bytes = 0 + + # The lock is used to protect all internal state (message and byte count, + # waiting threads to add, etc.). self._operational_lock = threading.Lock() # The condition for blocking the flow if capacity is exceeded. @@ -57,37 +78,39 @@ def add(self, message): :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`: If adding a message exceeds flow control limits and the desired action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`. + :exception:`~pubsub_v1.publisher.exceptions.PermanentlyBlockedError`: + If adding a message exceeds total flow control limits and would + always overflow on its own, and the desired action is + :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`. """ if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE: return with self._operational_lock: - self._message_count += 1 - self._total_bytes += message.ByteSize() - - if not self._is_overflow(): + if not self._would_overflow(message): + self._message_count += 1 + self._total_bytes += message.ByteSize() return - # We have an overflow, react. + # Adding a message would overflow, react. if ( self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.ERROR ): + # Raising an error means rejecting a message, thus we do not + # add anything to the existing load, but we do report the would-be + # load if we accepted the message. msg = ( - "Flow control limits exceeded " + "Flow control limits would be exceeded " "(messages: {} / {}, bytes: {} / {})." ).format( - self._message_count, + self._message_count + 1, self._settings.message_limit, - self._total_bytes, + self._total_bytes + message.ByteSize(), self._settings.byte_limit, ) error = exceptions.FlowControlLimitError(msg) - # Raising an error means rejecting a message, thus we need to deduct - # the latter's contribution to the total load. - self._message_count -= 1 - self._total_bytes -= message.ByteSize() raise error assert ( @@ -95,13 +118,49 @@ def add(self, message): == types.LimitExceededBehavior.BLOCK ) - while self._is_overflow(): + # Sanity check - if a message exceeds total flow control limits all + # by itself, it would block forever, thus raise error. + if ( + message.ByteSize() > self._settings.byte_limit + or self._settings.message_limit < 1 + ): + error_msg = ( + "Flow control limits too low for the message. " + "(messages: {} / {}, bytes: {} / {})." + ).format( + 1, + self._settings.message_limit, + message.ByteSize(), + self._settings.byte_limit, + ) + raise exceptions.PermanentlyBlockedError(error_msg) + + current_thread = threading.current_thread() + + while self._would_overflow(message): + if current_thread not in self._byte_reservations: + self._waiting.append(current_thread) + self._byte_reservations[current_thread] = _QuantityReservation( + reserved=0, needed=message.ByteSize() + ) + _LOGGER.debug( "Blocking until there is enough free capacity in the flow." ) self._has_capacity.wait() _LOGGER.debug("Woke up from waiting on free capacity in the flow.") + # Message accepted, increase the load and remove thread stats if + # they exist in the waiting queue. + self._message_count += 1 + self._total_bytes += message.ByteSize() + + reservation = self._byte_reservations.get(current_thread) + if reservation: + self._reserved_bytes -= reservation.reserved + del self._byte_reservations[current_thread] + self._waiting.remove(current_thread) + def release(self, message): """Release a mesage from flow control. @@ -113,8 +172,7 @@ def release(self, message): return with self._operational_lock: - was_overflow = self._is_overflow() - + # Releasing a message decreases the load. self._message_count -= 1 self._total_bytes -= message.ByteSize() @@ -127,19 +185,72 @@ def release(self, message): self._message_count = max(0, self._message_count) self._total_bytes = max(0, self._total_bytes) - if was_overflow and not self._is_overflow(): + self._distribute_available_bytes() + + # If at least one thread waiting to add() can be unblocked, wake them up. + if self._ready_to_unblock(): _LOGGER.debug("Notifying threads waiting to add messages to flow.") self._has_capacity.notify_all() - def _is_overflow(self): - """Determine if the current message load exceeds flow control limits. + def _distribute_available_bytes(self): + """Distribute availalbe free capacity among the waiting threads in FIFO order. + + The method assumes that the caller has obtained ``_operational_lock``. + """ + available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes + + for thread in self._waiting: + if available <= 0: + break + + reservation = self._byte_reservations[thread] + still_needed = reservation.needed - reservation.reserved + can_give = min(still_needed, available) + + reservation.reserved += can_give + self._reserved_bytes += can_give + available -= can_give + + def _ready_to_unblock(self): + """Determine if any of the threads waiting to add a message can proceed. The method assumes that the caller has obtained ``_operational_lock``. Returns: bool """ - return ( - self._message_count > self._settings.message_limit - or self._total_bytes > self._settings.byte_limit - ) + if self._waiting: + # It's enough to only check the head of the queue, because FIFO + # distribution of any free capacity. + reservation = self._byte_reservations[self._waiting[0]] + return ( + reservation.reserved >= reservation.needed + and self._message_count < self._settings.message_limit + ) + + return False + + def _would_overflow(self, message): + """Determine if accepting a message would exceed flow control limits. + + The method assumes that the caller has obtained ``_operational_lock``. + + Args: + message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`): + The message entering the flow control. + + Returns: + bool + """ + reservation = self._byte_reservations.get(threading.current_thread()) + + if reservation: + enough_reserved = reservation.reserved >= reservation.needed + else: + enough_reserved = False + + bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize() + size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved + msg_count_overflow = self._message_count + 1 > self._settings.message_limit + + return size_overflow or msg_count_overflow diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py index 3e4349f62..a4e848aeb 100644 --- a/tests/unit/pubsub_v1/publisher/test_flow_controller.py +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import threading +import time import warnings import pytest @@ -24,18 +25,49 @@ from google.cloud.pubsub_v1.publisher.flow_controller import FlowController +def _run_in_daemon( + flow_controller, + action, + messages, + all_done_event, + error_event=None, + action_pause=None, +): + """Run flow controller action (add or remove messages) in a daemon thread. + """ + assert action in ("add", "release") + + def run_me(): + method = getattr(flow_controller, action) + + try: + for msg in messages: + if action_pause is not None: + time.sleep(action_pause) + method(msg) + except Exception: + if error_event is not None: + error_event.set() + else: + all_done_event.set() + + thread = threading.Thread(target=run_me) + thread.daemon = True + thread.start() + + def test_no_overflow_no_error(): settings = types.PublishFlowControl( message_limit=100, byte_limit=10000, limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) # there should be no errors for data in (b"foo", b"bar", b"baz"): msg = types.PubsubMessage(data=data) - instance.add(msg) + flow_controller.add(msg) def test_overflow_no_error_on_ignore(): @@ -44,11 +76,11 @@ def test_overflow_no_error_on_ignore(): byte_limit=2, limit_exceeded_behavior=types.LimitExceededBehavior.IGNORE, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) # there should be no overflow errors - instance.add(types.PubsubMessage(data=b"foo")) - instance.add(types.PubsubMessage(data=b"bar")) + flow_controller.add(types.PubsubMessage(data=b"foo")) + flow_controller.add(types.PubsubMessage(data=b"bar")) def test_message_count_overflow_error(): @@ -57,11 +89,11 @@ def test_message_count_overflow_error(): byte_limit=10000, limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) - instance.add(types.PubsubMessage(data=b"foo")) + flow_controller.add(types.PubsubMessage(data=b"foo")) with pytest.raises(exceptions.FlowControlLimitError) as error: - instance.add(types.PubsubMessage(data=b"bar")) + flow_controller.add(types.PubsubMessage(data=b"bar")) assert "messages: 2 / 1" in str(error.value) @@ -72,7 +104,7 @@ def test_byte_size_overflow_error(): byte_limit=199, limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) # Since the message data itself occupies 100 bytes, it means that both # messages combined will exceed the imposed byte limit of 199, but a single @@ -80,9 +112,9 @@ def test_byte_size_overflow_error(): msg1 = types.PubsubMessage(data=b"x" * 100) msg2 = types.PubsubMessage(data=b"y" * 100) - instance.add(msg1) + flow_controller.add(msg1) with pytest.raises(exceptions.FlowControlLimitError) as error: - instance.add(msg2) + flow_controller.add(msg2) total_size = msg1.ByteSize() + msg2.ByteSize() expected_info = "bytes: {} / 199".format(total_size) @@ -95,7 +127,7 @@ def test_no_error_on_moderate_message_flow(): byte_limit=250, limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) msg1 = types.PubsubMessage(data=b"x" * 100) msg2 = types.PubsubMessage(data=b"y" * 100) @@ -103,12 +135,12 @@ def test_no_error_on_moderate_message_flow(): # The flow control settings will accept two in-flight messages, but not three. # If releasing messages works correctly, the sequence below will not raise errors. - instance.add(msg1) - instance.add(msg2) - instance.release(msg1) - instance.add(msg3) - instance.release(msg2) - instance.release(msg3) + flow_controller.add(msg1) + flow_controller.add(msg2) + flow_controller.release(msg1) + flow_controller.add(msg3) + flow_controller.release(msg2) + flow_controller.release(msg3) def test_rejected_messages_do_not_increase_total_load(): @@ -117,21 +149,21 @@ def test_rejected_messages_do_not_increase_total_load(): byte_limit=150, limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) msg1 = types.PubsubMessage(data=b"x" * 100) msg2 = types.PubsubMessage(data=b"y" * 100) - instance.add(msg1) + flow_controller.add(msg1) for _ in range(5): with pytest.raises(exceptions.FlowControlLimitError): - instance.add(types.PubsubMessage(data=b"z" * 100)) + flow_controller.add(types.PubsubMessage(data=b"z" * 100)) # After releasing a message we should again be able to add another one, despite # previously trying to add a lot of other messages. - instance.release(msg1) - instance.add(msg2) + flow_controller.release(msg1) + flow_controller.add(msg2) def test_incorrectly_releasing_too_many_messages(): @@ -140,7 +172,7 @@ def test_incorrectly_releasing_too_many_messages(): byte_limit=150, limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) msg1 = types.PubsubMessage(data=b"x" * 100) msg2 = types.PubsubMessage(data=b"y" * 100) @@ -148,7 +180,7 @@ def test_incorrectly_releasing_too_many_messages(): # Releasing a message that would make the load negative should result in a warning. with warnings.catch_warnings(record=True) as warned: - instance.release(msg1) + flow_controller.release(msg1) assert len(warned) == 1 assert issubclass(warned[0].category, RuntimeWarning) @@ -157,10 +189,10 @@ def test_incorrectly_releasing_too_many_messages(): # Incorrectly removing a message does not mess up internal stats, we can # still only add a single message at a time to this flow. - instance.add(msg2) + flow_controller.add(msg2) with pytest.raises(exceptions.FlowControlLimitError) as error: - instance.add(msg3) + flow_controller.add(msg3) error_msg = str(error.value) assert "messages: 2 / 1" in error_msg @@ -171,11 +203,11 @@ def test_incorrectly_releasing_too_many_messages(): def test_blocking_on_overflow_until_free_capacity(): settings = types.PublishFlowControl( - message_limit=2, - byte_limit=250, + message_limit=1, + byte_limit=150, limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, ) - instance = FlowController(settings) + flow_controller = FlowController(settings) msg1 = types.PubsubMessage(data=b"x" * 100) msg2 = types.PubsubMessage(data=b"y" * 100) @@ -184,58 +216,148 @@ def test_blocking_on_overflow_until_free_capacity(): # If there is a concurrency bug in FlowController, we do not want to block # the main thread running the tests, thus we delegate all add/release - # operations to daemon threads. - adding_123_done = threading.Event() + # operations to daemon threads and check the outcome (blocked/not blocked) + # through Events. + adding_1_done = threading.Event() + adding_2_done = threading.Event() + adding_3_done = threading.Event() adding_4_done = threading.Event() - releasing_12_done = threading.Event() + releasing_1_done = threading.Event() + releasing_x_done = threading.Event() - def add_messages(messages, all_done_event): - try: - for msg in messages: - instance.add(msg) - except Exception: - return - else: - all_done_event.set() + # Adding a message with free capacity should not block. + _run_in_daemon(flow_controller, "add", [msg1], adding_1_done) + if not adding_1_done.wait(timeout=0.1): + pytest.fail("Adding a message with enough flow capacity blocked or errored.") - def release_messages(messages, all_done_event): - try: - for msg in messages: - instance.release(msg) - except Exception: - return - else: - all_done_event.set() + # Adding messages when there is not enough capacity should block, even if + # added through multiple threads. + _run_in_daemon(flow_controller, "add", [msg2], adding_2_done) + if adding_2_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") - # The thread should block on adding the 3rd message. - adder_thread_123 = threading.Thread( - target=add_messages, args=([msg1, msg2, msg3], adding_123_done) - ) - adder_thread_123.daemon = True - adder_thread_123.start() + _run_in_daemon(flow_controller, "add", [msg3], adding_3_done) + if adding_3_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") - all_added = adding_123_done.wait(timeout=0.1) - if all_added: + _run_in_daemon(flow_controller, "add", [msg4], adding_4_done) + if adding_4_done.wait(timeout=0.1): pytest.fail("Adding a message on overflow did not block.") - # Start adding another message, but in the meantime also start freeing up - # enough flow capacity for it. - adder_thread_4 = threading.Thread(target=add_messages, args=([msg4], adding_4_done)) - adder_thread_4.daemon = True - adder_thread_4.start() + # After releasing one message, there should be room for a new message, which + # should result in unblocking one of the waiting threads. + _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done) + if not releasing_1_done.wait(timeout=0.1): + pytest.fail("Releasing a message blocked or errored.") + + done_status = [ + adding_2_done.wait(timeout=0.1), + adding_3_done.wait(timeout=0.1), + adding_4_done.wait(timeout=0.1), + ] + + # In sum() we use the fact that True==1 and False==0, and that Event.wait() + # returns False only if it times out, i.e. its internal flag has not been set. + done_count = sum(done_status) + assert done_count == 1, "Exactly one thread should have been unblocked." + + # Release another message and verify that yet another thread gets unblocked. + added_msg = [msg2, msg3, msg4][done_status.index(True)] + _run_in_daemon(flow_controller, "release", [added_msg], releasing_x_done) + + if not releasing_x_done.wait(timeout=0.1): + pytest.fail("Releasing messages blocked or errored.") + + released_count = sum( + ( + adding_2_done.wait(timeout=0.1), + adding_3_done.wait(timeout=0.1), + adding_4_done.wait(timeout=0.1), + ) + ) + assert released_count == 2, "Exactly two threads should have been unblocked." + + +def test_error_if_mesage_would_block_indefinitely(): + settings = types.PublishFlowControl( + message_limit=0, # simulate non-sane settings + byte_limit=1, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + msg = types.PubsubMessage(data=b"xyz") + adding_done = threading.Event() + error_event = threading.Event() + + _run_in_daemon(flow_controller, "add", [msg], adding_done, error_event=error_event) + + assert error_event.wait(timeout=0.1), "No error on adding too large a message." + + # Now that we know that an error occurs, we can check its type directly + # without the fear of blocking indefinitely. + flow_controller = FlowController(settings) # we want a fresh controller + with pytest.raises(exceptions.PermanentlyBlockedError) as error_info: + flow_controller.add(msg) - releaser_thread = threading.Thread( - target=release_messages, args=([msg1, msg2], releasing_12_done) + error_msg = str(error_info.value) + assert "messages: 1 / 0" in error_msg + assert "bytes: {} / 1".format(msg.ByteSize()) in error_msg + + +def test_threads_posting_large_messages_do_not_starve(): + settings = types.PublishFlowControl( + message_limit=100, # simulate non-sane settings + byte_limit=110, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + large_msg = types.PubsubMessage(data=b"x" * 100) # close to entire byte limit + + adding_initial_done = threading.Event() + adding_large_done = threading.Event() + adding_busy_done = threading.Event() + releasing_busy_done = threading.Event() + releasing_large_done = threading.Event() + + # Occupy some of the flow capacity, then try to add a large message. Releasing + # enough messages should eventually allow the large message to come through, even + # if more messages are added after it (those should wait for the large message). + initial_messages = [types.PubsubMessage(data=b"x" * 10)] * 5 + _run_in_daemon(flow_controller, "add", initial_messages, adding_initial_done) + assert adding_initial_done.wait(timeout=0.1) + + _run_in_daemon(flow_controller, "add", [large_msg], adding_large_done) + + # Continuously keep adding more messages after the large one. + messages = [types.PubsubMessage(data=b"x" * 10)] * 10 + _run_in_daemon(flow_controller, "add", messages, adding_busy_done, action_pause=0.1) + + # At the same time, gradually keep releasing the messages - the freeed up + # capacity should be consumed by the large message, not the other small messages + # being added after it. + _run_in_daemon( + flow_controller, "release", messages, releasing_busy_done, action_pause=0.1 ) - releaser_thread.daemon = True - releaser_thread.start() - all_released = releasing_12_done.wait(timeout=0.1) - if not all_released: + # Sanity check - releasing should have completed by now. + if not releasing_busy_done.wait(timeout=1.1): pytest.fail("Releasing messages blocked or errored.") - # After releasing two messages, adding a new one (msg4) should not block, even - # if msg3 has not been released yet. - all_added = adding_4_done.wait(timeout=0.1) - if not all_added: - pytest.fail("Adding a message with enough flow capacity blocked or errored.") + # Enough messages released, the large message should have come through in + # the meantime. + if not adding_large_done.wait(timeout=0.1): + pytest.fail("A thread adding a large message starved.") + + if adding_busy_done.wait(timeout=0.1): + pytest.fail("Adding multiple small messages did not block.") + + # Releasing the large message should unblock adding the remaining "busy" messages + # that have not been added yet. + _run_in_daemon(flow_controller, "release", [large_msg], releasing_large_done) + if not releasing_large_done.wait(timeout=0.1): + pytest.fail("Releasing a message blocked or errored.") + + if not adding_busy_done.wait(timeout=1.0): + pytest.fail("Adding messages blocked or errored.") From ac9af40ad96a3a54fc57bde56c0aeb2b7902195a Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 12 May 2020 09:52:51 +0200 Subject: [PATCH 04/11] Raise publish flow control errors through futures --- google/cloud/pubsub_v1/publisher/client.py | 9 ++++++- .../publisher/test_publisher_client.py | 25 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 5ab7ce4d4..d97c0d11d 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -31,6 +31,8 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.gapic import publisher_client from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport +from google.cloud.pubsub_v1.publisher import exceptions +from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer @@ -379,7 +381,12 @@ def publish(self, topic, data, ordering_key="", **attrs): # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). - self._flow_controller.add(message) + try: + self._flow_controller.add(message) + except exceptions.FlowControlLimitError as exc: + future = futures.Future() + future.set_exception(exc) + return future def on_publish_done(future): self._flow_controller.release(message) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 2d345821c..4e3a3870f 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -25,6 +25,7 @@ from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer @@ -156,6 +157,30 @@ def test_publish(): ) +def test_publish_error_exceeding_flow_control_limits(): + creds = mock.Mock(spec=credentials.Credentials) + publisher_options = types.PublisherOptions( + flow_control=types.PublishFlowControl( + message_limit=10, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + ) + client = publisher.Client(credentials=creds, publisher_options=publisher_options) + + mock_batch = mock.Mock(spec=client._batch_class) + mock_batch.will_accept.return_value = True + topic = "topic/path" + client._set_batch(topic, mock_batch) + + future1 = client.publish(topic, b"a" * 100) + future2 = client.publish(topic, b"b" * 100) + + future1.result() # no error, still within flow control limits + with pytest.raises(exceptions.FlowControlLimitError): + future2.result() + + def test_publish_data_not_bytestring_error(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds) From 6d79cbac9cf3d158d1d3a02a7904908ec7f6736b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 12 May 2020 11:06:07 +0200 Subject: [PATCH 05/11] Include load info in debug log messages --- .../pubsub_v1/publisher/flow_controller.py | 79 ++++++++++++++----- 1 file changed, 58 insertions(+), 21 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index 8340c778a..ecabf0102 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -100,18 +100,14 @@ def add(self, message): # Raising an error means rejecting a message, thus we do not # add anything to the existing load, but we do report the would-be # load if we accepted the message. - msg = ( - "Flow control limits would be exceeded " - "(messages: {} / {}, bytes: {} / {})." - ).format( - self._message_count + 1, - self._settings.message_limit, - self._total_bytes + message.ByteSize(), - self._settings.byte_limit, + load_info = self._load_info( + message_count=self._message_count + 1, + total_bytes=self._total_bytes + message.ByteSize(), ) - error = exceptions.FlowControlLimitError(msg) - - raise error + error_msg = "Flow control limits would be exceeded - {}.".format( + load_info + ) + raise exceptions.FlowControlLimitError(error_msg) assert ( self._settings.limit_exceeded_behavior @@ -124,14 +120,11 @@ def add(self, message): message.ByteSize() > self._settings.byte_limit or self._settings.message_limit < 1 ): - error_msg = ( - "Flow control limits too low for the message. " - "(messages: {} / {}, bytes: {} / {})." - ).format( - 1, - self._settings.message_limit, - message.ByteSize(), - self._settings.byte_limit, + load_info = self._load_info( + message_count=1, total_bytes=message.ByteSize() + ) + error_msg = "Flow control limits too low for the message - {}.".format( + load_info ) raise exceptions.PermanentlyBlockedError(error_msg) @@ -145,10 +138,16 @@ def add(self, message): ) _LOGGER.debug( - "Blocking until there is enough free capacity in the flow." + "Blocking until there is enough free capacity in the flow - " + "{}.".format(self._load_info()) ) + self._has_capacity.wait() - _LOGGER.debug("Woke up from waiting on free capacity in the flow.") + + _LOGGER.debug( + "Woke up from waiting on free capacity in the flow - " + "{}.".format(self._load_info()) + ) # Message accepted, increase the load and remove thread stats if # they exist in the waiting queue. @@ -254,3 +253,41 @@ def _would_overflow(self, message): msg_count_overflow = self._message_count + 1 > self._settings.message_limit return size_overflow or msg_count_overflow + + def _load_info(self, message_count=None, total_bytes=None, reserved_bytes=None): + """Return the current flow control load information. + + The caller can optionally adjust some of the values to fit its reporting + needs. + + The method assumes that the caller has obtained ``_operational_lock``. + + Args: + message_count (Optional[int]): + The value to override the current message count with. + total_bytes (Optional[int]): + The value to override the current total bytes with. + reserved_bytes (Optional[int]): + The value to override the current number of reserved bytes with. + + Returns: + str + """ + msg = "messages: {} / {}, bytes: {} / {} (reserved: {})" + + if message_count is None: + message_count = self._message_count + + if total_bytes is None: + total_bytes = self._total_bytes + + if reserved_bytes is None: + reserved_bytes = self._reserved_bytes + + return msg.format( + message_count, + self._settings.message_limit, + total_bytes, + self._settings.byte_limit, + reserved_bytes, + ) From 64fd9a16209c07dda42fdbe106c484d2b598044c Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 12 May 2020 13:13:07 +0200 Subject: [PATCH 06/11] Remove incorrect comment in a test --- tests/unit/pubsub_v1/publisher/test_flow_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py index a4e848aeb..94a2a5086 100644 --- a/tests/unit/pubsub_v1/publisher/test_flow_controller.py +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -307,7 +307,7 @@ def test_error_if_mesage_would_block_indefinitely(): def test_threads_posting_large_messages_do_not_starve(): settings = types.PublishFlowControl( - message_limit=100, # simulate non-sane settings + message_limit=100, byte_limit=110, limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, ) From b7a40f80c37e567f29416a3090745ba5fd624c3b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 22 May 2020 11:07:54 +0200 Subject: [PATCH 07/11] Remove comment about an error not directly raised --- google/cloud/pubsub_v1/publisher/client.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index d97c0d11d..8dbbea634 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -343,11 +343,6 @@ def publish(self, topic, data, ordering_key="", **attrs): pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing the ``message`` would exceed the max size limit on the backend. - - :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`: - If publishing a new message would exceed the publish flow control - limits and the desired action on overflow is - :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`. """ # Sanity check: Is the data being sent as a bytestring? # If it is literally anything else, complain loudly about it. From 6bcd9cedec39047353cb7f9b4814746f8085f128 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 29 May 2020 10:40:03 +0200 Subject: [PATCH 08/11] Remove redundant check for reservation exsistence --- google/cloud/pubsub_v1/publisher/flow_controller.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index ecabf0102..d2511177a 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -149,16 +149,12 @@ def add(self, message): "{}.".format(self._load_info()) ) - # Message accepted, increase the load and remove thread stats if - # they exist in the waiting queue. + # Message accepted, increase the load and remove thread stats. self._message_count += 1 self._total_bytes += message.ByteSize() - - reservation = self._byte_reservations.get(current_thread) - if reservation: - self._reserved_bytes -= reservation.reserved - del self._byte_reservations[current_thread] - self._waiting.remove(current_thread) + self._reserved_bytes -= self._byte_reservations[current_thread].reserved + del self._byte_reservations[current_thread] + self._waiting.remove(current_thread) def release(self, message): """Release a mesage from flow control. From 36892283df0afacbfee1b58cccf1cf6325979797 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 29 May 2020 11:06:14 +0200 Subject: [PATCH 09/11] Change exception for publishing too large a message --- .../cloud/pubsub_v1/publisher/exceptions.py | 4 ---- .../pubsub_v1/publisher/flow_controller.py | 19 ++++++++++--------- .../publisher/test_flow_controller.py | 3 ++- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py index d34515c60..89b3790a0 100644 --- a/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/google/cloud/pubsub_v1/publisher/exceptions.py @@ -42,10 +42,6 @@ class FlowControlLimitError(Exception): """An action resulted in exceeding the flow control limits.""" -class PermanentlyBlockedError(FlowControlLimitError): - """A message exceeds *total* flow control limits and would block forever.""" - - __all__ = ( "FlowControlLimitError", "MessageTooLargeError", diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index d2511177a..4f7e58412 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -76,12 +76,12 @@ def add(self, message): Raises: :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`: - If adding a message exceeds flow control limits and the desired - action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`. - :exception:`~pubsub_v1.publisher.exceptions.PermanentlyBlockedError`: - If adding a message exceeds total flow control limits and would - always overflow on its own, and the desired action is - :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`. + If adding a message would exceed flow control limits and the desired + action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`, + or if a message would always exceed total flow control limits on + its own and the desired action is + :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`, + meaning that the message would block forever. """ if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE: return @@ -123,10 +123,11 @@ def add(self, message): load_info = self._load_info( message_count=1, total_bytes=message.ByteSize() ) - error_msg = "Flow control limits too low for the message - {}.".format( - load_info + error_msg = ( + "Total flow control limits too low for the message, " + "would block forever - {}.".format(load_info) ) - raise exceptions.PermanentlyBlockedError(error_msg) + raise exceptions.FlowControlLimitError(error_msg) current_thread = threading.current_thread() diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py index 94a2a5086..c9c70865e 100644 --- a/tests/unit/pubsub_v1/publisher/test_flow_controller.py +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -297,10 +297,11 @@ def test_error_if_mesage_would_block_indefinitely(): # Now that we know that an error occurs, we can check its type directly # without the fear of blocking indefinitely. flow_controller = FlowController(settings) # we want a fresh controller - with pytest.raises(exceptions.PermanentlyBlockedError) as error_info: + with pytest.raises(exceptions.FlowControlLimitError) as error_info: flow_controller.add(msg) error_msg = str(error_info.value) + assert "would block forever" in error_msg assert "messages: 1 / 0" in error_msg assert "bytes: {} / 1".format(msg.ByteSize()) in error_msg From a0c771a61108dbca2be4a4613488e32f37e823c6 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 29 May 2020 13:45:26 +0200 Subject: [PATCH 10/11] Add internal sanity check for byte reservations --- .../pubsub_v1/publisher/flow_controller.py | 10 ++++- .../publisher/test_flow_controller.py | 45 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index 4f7e58412..eb74d1038 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -201,8 +201,16 @@ def _distribute_available_bytes(self): reservation = self._byte_reservations[thread] still_needed = reservation.needed - reservation.reserved - can_give = min(still_needed, available) + # Sanity check for any internal inconsistencies. + if still_needed < 0: + msg = "Too many bytes reserved: {} / {}".format( + reservation.reserved, reservation.needed + ) + warnings.warn(msg, category=RuntimeWarning) + still_needed = 0 + + can_give = min(still_needed, available) reservation.reserved += can_give self._reserved_bytes += can_give available -= can_give diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py index c9c70865e..26a61663b 100644 --- a/tests/unit/pubsub_v1/publisher/test_flow_controller.py +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -362,3 +362,48 @@ def test_threads_posting_large_messages_do_not_starve(): if not adding_busy_done.wait(timeout=1.0): pytest.fail("Adding messages blocked or errored.") + + +def test_warning_on_internal_reservation_stats_error_when_unblocking(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + + # If there is a concurrency bug in FlowController, we do not want to block + # the main thread running the tests, thus we delegate all add/release + # operations to daemon threads and check the outcome (blocked/not blocked) + # through Events. + adding_1_done = threading.Event() + adding_2_done = threading.Event() + releasing_1_done = threading.Event() + + # Adding a message with free capacity should not block. + _run_in_daemon(flow_controller, "add", [msg1], adding_1_done) + if not adding_1_done.wait(timeout=0.1): + pytest.fail("Adding a message with enough flow capacity blocked or errored.") + + # Adding messages when there is not enough capacity should block, even if + # added through multiple threads. + _run_in_daemon(flow_controller, "add", [msg2], adding_2_done) + if adding_2_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") + + # Intentionally corrupt internal stats + reservation = next(iter(flow_controller._byte_reservations.values()), None) + assert reservation is not None, "No messages blocked by flow controller." + reservation.reserved = reservation.needed + 1 + + with warnings.catch_warnings(record=True) as warned: + _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done) + if not releasing_1_done.wait(timeout=0.1): + pytest.fail("Releasing a message blocked or errored.") + + matches = [warning for warning in warned if warning.category is RuntimeWarning] + assert len(matches) == 1 + assert "too many bytes reserved" in str(matches[0].message).lower() From f8d6c6039d237f99f8b461730f1bd2f0763ff6df Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 1 Jun 2020 08:48:03 +0200 Subject: [PATCH 11/11] Reword the docstring on flow control limits error --- google/cloud/pubsub_v1/publisher/flow_controller.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index eb74d1038..c10fadcef 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -76,12 +76,11 @@ def add(self, message): Raises: :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`: - If adding a message would exceed flow control limits and the desired - action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`, - or if a message would always exceed total flow control limits on - its own and the desired action is - :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`, - meaning that the message would block forever. + Raised when the desired action is + :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and + the message would exceed flow control limits, or when the desired action + is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and + the message would block forever against the flow control limits. """ if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE: return