diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index caa784407..8dbbea634 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -31,9 +31,12 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.gapic import publisher_client from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport +from google.cloud.pubsub_v1.publisher import exceptions +from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer +from google.cloud.pubsub_v1.publisher.flow_controller import FlowController __version__ = pkg_resources.get_distribution("google-cloud-pubsub").version @@ -93,7 +96,11 @@ class Client(object): # Optional publisher_options = pubsub_v1.types.PublisherOptions( - enable_message_ordering=False + enable_message_ordering=False, + flow_control=pubsub_v1.types.PublishFlowControl( + message_limit=2000, + limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK, + ), ), # Optional @@ -198,6 +205,9 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs): # Thread created to commit all sequencers after a timeout. self._commit_thread = None + # The object controlling the message publishing flow + self._flow_controller = FlowController(self.publisher_options.flow_control) + @classmethod def from_service_account_file(cls, filename, batch_settings=(), **kwargs): """Creates an instance of this client using the provided credentials @@ -364,6 +374,18 @@ def publish(self, topic, data, ordering_key="", **attrs): data=data, ordering_key=ordering_key, attributes=attrs ) + # Messages should go through flow control to prevent excessive + # queuing on the client side (depending on the settings). 
+ try: + self._flow_controller.add(message) + except exceptions.FlowControlLimitError as exc: + future = futures.Future() + future.set_exception(exc) + return future + + def on_publish_done(future): + self._flow_controller.release(message) + with self._batch_lock: if self._is_stopped: raise RuntimeError("Cannot publish on a stopped publisher.") @@ -372,6 +394,7 @@ def publish(self, topic, data, ordering_key="", **attrs): # Delegate the publishing to the sequencer. future = sequencer.publish(message) + future.add_done_callback(on_publish_done) # Create a timer thread if necessary to enforce the batching # timeout. diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py index 856be955a..89b3790a0 100644 --- a/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/google/cloud/pubsub_v1/publisher/exceptions.py @@ -38,7 +38,12 @@ def __init__(self, ordering_key): super(PublishToPausedOrderingKeyException, self).__init__() +class FlowControlLimitError(Exception): + """An action resulted in exceeding the flow control limits.""" + + __all__ = ( + "FlowControlLimitError", "MessageTooLargeError", "PublishError", "TimeoutError", diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py new file mode 100644 index 000000000..c10fadcef --- /dev/null +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -0,0 +1,297 @@ +# Copyright 2020, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
import logging
import threading
import warnings

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions


_LOGGER = logging.getLogger(__name__)


class _QuantityReservation(object):
    """A (partial) reservation of a quantifiable resource.

    Args:
        reserved (int):
            The amount that has already been set aside for the owner.
        needed (int):
            The total amount the owner needs before it can proceed.
    """

    def __init__(self, reserved, needed):
        self.reserved = reserved
        self.needed = needed


class FlowController(object):
    """A class used to control the flow of messages passing through it.

    Args:
        settings (~google.cloud.pubsub_v1.types.PublishFlowControl):
            Desired flow control configuration.
    """

    def __init__(self, settings):
        self._settings = settings

        # Load statistics. They represent the number of messages added, but not
        # yet released (and their total size).
        self._message_count = 0
        self._total_bytes = 0

        # A FIFO queue of threads blocked on adding a message, from first to last.
        # Only relevant if the configured limit exceeded behavior is BLOCK.
        self._waiting = deque()

        # Reservations of available flow control bytes by the waiting threads.
        # Each value is a _QuantityReservation instance.
        self._byte_reservations = {}
        self._reserved_bytes = 0

        # The lock is used to protect all internal state (message and byte count,
        # waiting threads to add, etc.).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message):
        """Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                Raised when the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
                the message would exceed flow control limits, or when the desired action
                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
                the message would block forever against the flow control limits.
        """
        behavior = self._settings.limit_exceeded_behavior

        if behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            # The serialized size is needed several times below, compute it once.
            message_size = message.ByteSize()

            if not self._would_overflow(message):
                self._message_count += 1
                self._total_bytes += message_size
                return

            # Adding a message would overflow, react.
            if behavior == types.LimitExceededBehavior.ERROR:
                # Raising an error means rejecting a message, thus we do not
                # add anything to the existing load, but we do report the would-be
                # load if we accepted the message.
                load_info = self._load_info(
                    message_count=self._message_count + 1,
                    total_bytes=self._total_bytes + message_size,
                )
                error_msg = "Flow control limits would be exceeded - {}.".format(
                    load_info
                )
                raise exceptions.FlowControlLimitError(error_msg)

            assert behavior == types.LimitExceededBehavior.BLOCK

            # Sanity check - if a message exceeds total flow control limits all
            # by itself, it would block forever, thus raise error.
            if (
                message_size > self._settings.byte_limit
                or self._settings.message_limit < 1
            ):
                load_info = self._load_info(
                    message_count=1, total_bytes=message_size
                )
                error_msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever - {}.".format(load_info)
                )
                raise exceptions.FlowControlLimitError(error_msg)

            current_thread = threading.current_thread()

            try:
                while self._would_overflow(message):
                    if current_thread not in self._byte_reservations:
                        # First time around - join the end of the FIFO queue.
                        self._waiting.append(current_thread)
                        self._byte_reservations[current_thread] = _QuantityReservation(
                            reserved=0, needed=message_size
                        )

                    _LOGGER.debug(
                        "Blocking until there is enough free capacity in the flow - "
                        "%s.",
                        self._load_info(),
                    )

                    self._has_capacity.wait()

                    _LOGGER.debug(
                        "Woke up from waiting on free capacity in the flow - %s.",
                        self._load_info(),
                    )
            except BaseException:
                # The wait was interrupted (e.g. KeyboardInterrupt). The lock is
                # held again at this point, because Condition.wait() re-acquires
                # it before propagating. Give back whatever this thread reserved,
                # otherwise it would stay in the queue forever and starve all
                # the threads queued behind it.
                self._abandon_reservation(current_thread)
                raise

            # Message accepted, increase the load and remove thread stats.
            self._message_count += 1
            self._total_bytes += message_size
            self._reserved_bytes -= self._byte_reservations[current_thread].reserved
            del self._byte_reservations[current_thread]
            self._waiting.remove(current_thread)

    def release(self, message):
        """Release a message from flow control.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            # Releasing a message decreases the load.
            self._message_count -= 1
            self._total_bytes -= message.ByteSize()

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                # Clamp the statistics so that a bogus release cannot grant more
                # capacity than the configured limits.
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            self._distribute_available_bytes()

            # If at least one thread waiting to add() can be unblocked, wake them up.
            if self._ready_to_unblock():
                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                self._has_capacity.notify_all()

    def _abandon_reservation(self, thread):
        """Drop a waiting thread from the queue and free its reserved bytes.

        The freed bytes are handed over to the remaining waiting threads, which
        are woken up if the new head of the queue can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            thread (threading.Thread):
                The thread that stopped waiting without adding its message.
        """
        reservation = self._byte_reservations.pop(thread, None)
        if reservation is None:
            return  # The thread never got to enqueue itself, nothing to undo.

        self._reserved_bytes -= reservation.reserved
        self._waiting.remove(thread)

        self._distribute_available_bytes()
        if self._ready_to_unblock():
            self._has_capacity.notify_all()

    def _distribute_available_bytes(self):
        """Distribute available free capacity among the waiting threads in FIFO order.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes

        for thread in self._waiting:
            if available <= 0:
                break

            reservation = self._byte_reservations[thread]
            still_needed = reservation.needed - reservation.reserved

            # Sanity check for any internal inconsistencies.
            if still_needed < 0:
                msg = "Too many bytes reserved: {} / {}".format(
                    reservation.reserved, reservation.needed
                )
                warnings.warn(msg, category=RuntimeWarning)
                still_needed = 0

            can_give = min(still_needed, available)
            reservation.reserved += can_give
            self._reserved_bytes += can_give
            available -= can_give

    def _ready_to_unblock(self):
        """Determine if any of the threads waiting to add a message can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.

        Returns:
            bool
        """
        if self._waiting:
            # It's enough to only check the head of the queue, because of the
            # FIFO distribution of any free capacity.
            reservation = self._byte_reservations[self._waiting[0]]
            return (
                reservation.reserved >= reservation.needed
                and self._message_count < self._settings.message_limit
            )

        return False

    def _would_overflow(self, message):
        """Determine if accepting a message would exceed flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Returns:
            bool
        """
        reservation = self._byte_reservations.get(threading.current_thread())

        if reservation:
            enough_reserved = reservation.reserved >= reservation.needed
        else:
            enough_reserved = False

        # A thread with a fully satisfied reservation already "owns" its bytes
        # (they are counted in ``_reserved_bytes``), thus it must not be
        # rejected on size grounds.
        bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize()
        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
        msg_count_overflow = self._message_count + 1 > self._settings.message_limit

        return size_overflow or msg_count_overflow

    def _load_info(self, message_count=None, total_bytes=None, reserved_bytes=None):
        """Return the current flow control load information.

        The caller can optionally adjust some of the values to fit its reporting
        needs.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_count (Optional[int]):
                The value to override the current message count with.
            total_bytes (Optional[int]):
                The value to override the current total bytes with.
            reserved_bytes (Optional[int]):
                The value to override the current number of reserved bytes with.

        Returns:
            str
        """
        msg = "messages: {} / {}, bytes: {} / {} (reserved: {})"

        if message_count is None:
            message_count = self._message_count

        if total_bytes is None:
            total_bytes = self._total_bytes

        if reserved_bytes is None:
            reserved_bytes = self._reserved_bytes

        return msg.format(
            message_count,
            self._settings.message_limit,
            total_bytes,
            self._settings.byte_limit,
            reserved_bytes,
        )
# # This class is used when creating a publisher or subscriber client, and @@ -81,6 +64,63 @@ ) +class LimitExceededBehavior(str, enum.Enum): + """The possible actions when exceeding the publish flow control limits.""" + + IGNORE = "ignore" + BLOCK = "block" + ERROR = "error" + + +PublishFlowControl = collections.namedtuple( + "PublishFlowControl", ["message_limit", "byte_limit", "limit_exceeded_behavior"] +) +PublishFlowControl.__new__.__defaults__ = ( + 10 * BatchSettings.__new__.__defaults__[2], # message limit + 10 * BatchSettings.__new__.__defaults__[0], # byte limit + LimitExceededBehavior.IGNORE, # desired behavior +) + +if sys.version_info >= (3, 5): + PublishFlowControl.__doc__ = ( + "The client flow control settings for message publishing." + ) + PublishFlowControl.message_limit.__doc__ = ( + "The maximum number of messages awaiting to be published." + ) + PublishFlowControl.byte_limit.__doc__ = ( + "The maximum total size of messages awaiting to be published." + ) + PublishFlowControl.limit_exceeded_behavior.__doc__ = ( + "The action to take when publish flow control limits are exceeded." + ) + +# Define the default publisher options. +# +# This class is used when creating a publisher client to pass in options +# to enable/disable features. +PublisherOptions = collections.namedtuple( + "PublisherConfig", ["enable_message_ordering", "flow_control"] +) +PublisherOptions.__new__.__defaults__ = ( + False, # enable_message_ordering: False + PublishFlowControl(), # default flow control settings +) + +if sys.version_info >= (3, 5): + PublisherOptions.__doc__ = "The options for the publisher client." + PublisherOptions.enable_message_ordering.__doc__ = ( + "Whether to order messages in a batch by a supplied ordering key." + "EXPERIMENTAL: Message ordering is an alpha feature that requires " + "special permissions to use. Please contact the Cloud Pub/Sub team for " + "more information." 
+ ) + PublisherOptions.flow_control.__doc__ = ( + "Flow control settings for message publishing by the client. By default " + "the publisher client does not do any throttling." + ) + + # Define the type class and default values for flow control settings. # # This class is used when creating a publisher or subscriber client, and diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py new file mode 100644 index 000000000..26a61663b --- /dev/null +++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py @@ -0,0 +1,409 @@ +# Copyright 2020, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import threading +import time +import warnings + +import pytest + +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.publisher import exceptions +from google.cloud.pubsub_v1.publisher.flow_controller import FlowController + + +def _run_in_daemon( + flow_controller, + action, + messages, + all_done_event, + error_event=None, + action_pause=None, +): + """Run flow controller action (add or remove messages) in a daemon thread. 
+ """ + assert action in ("add", "release") + + def run_me(): + method = getattr(flow_controller, action) + + try: + for msg in messages: + if action_pause is not None: + time.sleep(action_pause) + method(msg) + except Exception: + if error_event is not None: + error_event.set() + else: + all_done_event.set() + + thread = threading.Thread(target=run_me) + thread.daemon = True + thread.start() + + +def test_no_overflow_no_error(): + settings = types.PublishFlowControl( + message_limit=100, + byte_limit=10000, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + flow_controller = FlowController(settings) + + # there should be no errors + for data in (b"foo", b"bar", b"baz"): + msg = types.PubsubMessage(data=data) + flow_controller.add(msg) + + +def test_overflow_no_error_on_ignore(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=2, + limit_exceeded_behavior=types.LimitExceededBehavior.IGNORE, + ) + flow_controller = FlowController(settings) + + # there should be no overflow errors + flow_controller.add(types.PubsubMessage(data=b"foo")) + flow_controller.add(types.PubsubMessage(data=b"bar")) + + +def test_message_count_overflow_error(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=10000, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + flow_controller = FlowController(settings) + + flow_controller.add(types.PubsubMessage(data=b"foo")) + with pytest.raises(exceptions.FlowControlLimitError) as error: + flow_controller.add(types.PubsubMessage(data=b"bar")) + + assert "messages: 2 / 1" in str(error.value) + + +def test_byte_size_overflow_error(): + settings = types.PublishFlowControl( + message_limit=10000, + byte_limit=199, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + flow_controller = FlowController(settings) + + # Since the message data itself occupies 100 bytes, it means that both + # messages combined will exceed the imposed byte limit of 199, but a single + # 
message will not (the message size overhead is way lower than data size). + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + + flow_controller.add(msg1) + with pytest.raises(exceptions.FlowControlLimitError) as error: + flow_controller.add(msg2) + + total_size = msg1.ByteSize() + msg2.ByteSize() + expected_info = "bytes: {} / 199".format(total_size) + assert expected_info in str(error.value) + + +def test_no_error_on_moderate_message_flow(): + settings = types.PublishFlowControl( + message_limit=2, + byte_limit=250, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + flow_controller = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + msg3 = types.PubsubMessage(data=b"z" * 100) + + # The flow control settings will accept two in-flight messages, but not three. + # If releasing messages works correctly, the sequence below will not raise errors. + flow_controller.add(msg1) + flow_controller.add(msg2) + flow_controller.release(msg1) + flow_controller.add(msg3) + flow_controller.release(msg2) + flow_controller.release(msg3) + + +def test_rejected_messages_do_not_increase_total_load(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + flow_controller = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + + flow_controller.add(msg1) + + for _ in range(5): + with pytest.raises(exceptions.FlowControlLimitError): + flow_controller.add(types.PubsubMessage(data=b"z" * 100)) + + # After releasing a message we should again be able to add another one, despite + # previously trying to add a lot of other messages. 
+ flow_controller.release(msg1) + flow_controller.add(msg2) + + +def test_incorrectly_releasing_too_many_messages(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + flow_controller = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + msg3 = types.PubsubMessage(data=b"z" * 100) + + # Releasing a message that would make the load negative should result in a warning. + with warnings.catch_warnings(record=True) as warned: + flow_controller.release(msg1) + + assert len(warned) == 1 + assert issubclass(warned[0].category, RuntimeWarning) + warning_msg = str(warned[0].message) + assert "never added or already released" in warning_msg + + # Incorrectly removing a message does not mess up internal stats, we can + # still only add a single message at a time to this flow. + flow_controller.add(msg2) + + with pytest.raises(exceptions.FlowControlLimitError) as error: + flow_controller.add(msg3) + + error_msg = str(error.value) + assert "messages: 2 / 1" in error_msg + total_size = msg2.ByteSize() + msg3.ByteSize() + expected_size_info = "bytes: {} / 150".format(total_size) + assert expected_size_info in error_msg + + +def test_blocking_on_overflow_until_free_capacity(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + msg3 = types.PubsubMessage(data=b"z" * 100) + msg4 = types.PubsubMessage(data=b"w" * 100) + + # If there is a concurrency bug in FlowController, we do not want to block + # the main thread running the tests, thus we delegate all add/release + # operations to daemon threads and check the outcome (blocked/not blocked) + # through Events. 
+ adding_1_done = threading.Event() + adding_2_done = threading.Event() + adding_3_done = threading.Event() + adding_4_done = threading.Event() + releasing_1_done = threading.Event() + releasing_x_done = threading.Event() + + # Adding a message with free capacity should not block. + _run_in_daemon(flow_controller, "add", [msg1], adding_1_done) + if not adding_1_done.wait(timeout=0.1): + pytest.fail("Adding a message with enough flow capacity blocked or errored.") + + # Adding messages when there is not enough capacity should block, even if + # added through multiple threads. + _run_in_daemon(flow_controller, "add", [msg2], adding_2_done) + if adding_2_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") + + _run_in_daemon(flow_controller, "add", [msg3], adding_3_done) + if adding_3_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") + + _run_in_daemon(flow_controller, "add", [msg4], adding_4_done) + if adding_4_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") + + # After releasing one message, there should be room for a new message, which + # should result in unblocking one of the waiting threads. + _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done) + if not releasing_1_done.wait(timeout=0.1): + pytest.fail("Releasing a message blocked or errored.") + + done_status = [ + adding_2_done.wait(timeout=0.1), + adding_3_done.wait(timeout=0.1), + adding_4_done.wait(timeout=0.1), + ] + + # In sum() we use the fact that True==1 and False==0, and that Event.wait() + # returns False only if it times out, i.e. its internal flag has not been set. + done_count = sum(done_status) + assert done_count == 1, "Exactly one thread should have been unblocked." + + # Release another message and verify that yet another thread gets unblocked. 
+ added_msg = [msg2, msg3, msg4][done_status.index(True)] + _run_in_daemon(flow_controller, "release", [added_msg], releasing_x_done) + + if not releasing_x_done.wait(timeout=0.1): + pytest.fail("Releasing messages blocked or errored.") + + released_count = sum( + ( + adding_2_done.wait(timeout=0.1), + adding_3_done.wait(timeout=0.1), + adding_4_done.wait(timeout=0.1), + ) + ) + assert released_count == 2, "Exactly two threads should have been unblocked." + + +def test_error_if_mesage_would_block_indefinitely(): + settings = types.PublishFlowControl( + message_limit=0, # simulate non-sane settings + byte_limit=1, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + msg = types.PubsubMessage(data=b"xyz") + adding_done = threading.Event() + error_event = threading.Event() + + _run_in_daemon(flow_controller, "add", [msg], adding_done, error_event=error_event) + + assert error_event.wait(timeout=0.1), "No error on adding too large a message." + + # Now that we know that an error occurs, we can check its type directly + # without the fear of blocking indefinitely. 
+ flow_controller = FlowController(settings) # we want a fresh controller + with pytest.raises(exceptions.FlowControlLimitError) as error_info: + flow_controller.add(msg) + + error_msg = str(error_info.value) + assert "would block forever" in error_msg + assert "messages: 1 / 0" in error_msg + assert "bytes: {} / 1".format(msg.ByteSize()) in error_msg + + +def test_threads_posting_large_messages_do_not_starve(): + settings = types.PublishFlowControl( + message_limit=100, + byte_limit=110, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + large_msg = types.PubsubMessage(data=b"x" * 100) # close to entire byte limit + + adding_initial_done = threading.Event() + adding_large_done = threading.Event() + adding_busy_done = threading.Event() + releasing_busy_done = threading.Event() + releasing_large_done = threading.Event() + + # Occupy some of the flow capacity, then try to add a large message. Releasing + # enough messages should eventually allow the large message to come through, even + # if more messages are added after it (those should wait for the large message). + initial_messages = [types.PubsubMessage(data=b"x" * 10)] * 5 + _run_in_daemon(flow_controller, "add", initial_messages, adding_initial_done) + assert adding_initial_done.wait(timeout=0.1) + + _run_in_daemon(flow_controller, "add", [large_msg], adding_large_done) + + # Continuously keep adding more messages after the large one. + messages = [types.PubsubMessage(data=b"x" * 10)] * 10 + _run_in_daemon(flow_controller, "add", messages, adding_busy_done, action_pause=0.1) + + # At the same time, gradually keep releasing the messages - the freeed up + # capacity should be consumed by the large message, not the other small messages + # being added after it. + _run_in_daemon( + flow_controller, "release", messages, releasing_busy_done, action_pause=0.1 + ) + + # Sanity check - releasing should have completed by now. 
+ if not releasing_busy_done.wait(timeout=1.1): + pytest.fail("Releasing messages blocked or errored.") + + # Enough messages released, the large message should have come through in + # the meantime. + if not adding_large_done.wait(timeout=0.1): + pytest.fail("A thread adding a large message starved.") + + if adding_busy_done.wait(timeout=0.1): + pytest.fail("Adding multiple small messages did not block.") + + # Releasing the large message should unblock adding the remaining "busy" messages + # that have not been added yet. + _run_in_daemon(flow_controller, "release", [large_msg], releasing_large_done) + if not releasing_large_done.wait(timeout=0.1): + pytest.fail("Releasing a message blocked or errored.") + + if not adding_busy_done.wait(timeout=1.0): + pytest.fail("Adding messages blocked or errored.") + + +def test_warning_on_internal_reservation_stats_error_when_unblocking(): + settings = types.PublishFlowControl( + message_limit=1, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK, + ) + flow_controller = FlowController(settings) + + msg1 = types.PubsubMessage(data=b"x" * 100) + msg2 = types.PubsubMessage(data=b"y" * 100) + + # If there is a concurrency bug in FlowController, we do not want to block + # the main thread running the tests, thus we delegate all add/release + # operations to daemon threads and check the outcome (blocked/not blocked) + # through Events. + adding_1_done = threading.Event() + adding_2_done = threading.Event() + releasing_1_done = threading.Event() + + # Adding a message with free capacity should not block. + _run_in_daemon(flow_controller, "add", [msg1], adding_1_done) + if not adding_1_done.wait(timeout=0.1): + pytest.fail("Adding a message with enough flow capacity blocked or errored.") + + # Adding messages when there is not enough capacity should block, even if + # added through multiple threads. 
+ _run_in_daemon(flow_controller, "add", [msg2], adding_2_done) + if adding_2_done.wait(timeout=0.1): + pytest.fail("Adding a message on overflow did not block.") + + # Intentionally corrupt internal stats + reservation = next(iter(flow_controller._byte_reservations.values()), None) + assert reservation is not None, "No messages blocked by flow controller." + reservation.reserved = reservation.needed + 1 + + with warnings.catch_warnings(record=True) as warned: + _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done) + if not releasing_1_done.wait(timeout=0.1): + pytest.fail("Releasing a message blocked or errored.") + + matches = [warning for warning in warned if warning.category is RuntimeWarning] + assert len(matches) == 1 + assert "too many bytes reserved" in str(matches[0].message).lower() diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 4ca979892..4e3a3870f 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -25,6 +25,7 @@ from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer @@ -125,11 +126,17 @@ def test_publish(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds) + future1 = mock.sentinel.future1 + future2 = mock.sentinel.future2 + future1.add_done_callback = mock.Mock(spec=["__call__"]) + future2.add_done_callback = mock.Mock(spec=["__call__"]) + # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) + # Set the mock up to claim indiscriminately that it accepts all messages. 
batch.will_accept.return_value = True - batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2) + batch.publish.side_effect = (future1, future2) topic = "topic/path" client._set_batch(topic, batch) @@ -150,6 +157,30 @@ def test_publish(): ) +def test_publish_error_exceeding_flow_control_limits(): + creds = mock.Mock(spec=credentials.Credentials) + publisher_options = types.PublisherOptions( + flow_control=types.PublishFlowControl( + message_limit=10, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + ) + client = publisher.Client(credentials=creds, publisher_options=publisher_options) + + mock_batch = mock.Mock(spec=client._batch_class) + mock_batch.will_accept.return_value = True + topic = "topic/path" + client._set_batch(topic, mock_batch) + + future1 = client.publish(topic, b"a" * 100) + future2 = client.publish(topic, b"b" * 100) + + future1.result() # no error, still within flow control limits + with pytest.raises(exceptions.FlowControlLimitError): + future2.result() + + def test_publish_data_not_bytestring_error(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds) @@ -208,10 +239,13 @@ def test_publish_new_batch_needed(): # Use mocks in lieu of the actual batch class. batch1 = mock.Mock(spec=client._batch_class) batch2 = mock.Mock(spec=client._batch_class) + # Set the first mock up to claim indiscriminately that it rejects all # messages and the second accepts all. + future = mock.sentinel.future + future.add_done_callback = mock.Mock(spec=["__call__"]) batch1.publish.return_value = None - batch2.publish.return_value = mock.sentinel.future + batch2.publish.return_value = future topic = "topic/path" client._set_batch(topic, batch1) @@ -390,9 +424,15 @@ def test_publish_with_ordering_key(): # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) + # Set the mock up to claim indiscriminately that it accepts all messages. 
+ future1 = mock.sentinel.future1 + future2 = mock.sentinel.future2 + future1.add_done_callback = mock.Mock(spec=["__call__"]) + future2.add_done_callback = mock.Mock(spec=["__call__"]) + batch.will_accept.return_value = True - batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2) + batch.publish.side_effect = (future1, future2) topic = "topic/path" ordering_key = "k1"