Skip to content

Commit

Permalink
Add flow control logic to publisher client
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 7, 2020
1 parent e6d27b1 commit 7e78317
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 4 deletions.
23 changes: 22 additions & 1 deletion google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -34,6 +34,7 @@
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController

__version__ = pkg_resources.get_distribution("google-cloud-pubsub").version

Expand Down Expand Up @@ -93,7 +94,11 @@ class Client(object):
# Optional
publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=False
enable_message_ordering=False,
flow_control=pubsub_v1.types.PublishFlowControl(
message_limit=2000,
limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
),
),
# Optional
Expand Down Expand Up @@ -198,6 +203,9 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs):
# Thread created to commit all sequencers after a timeout.
self._commit_thread = None

# The object controlling the message publishing flow
self._flow_controller = FlowController(self.publisher_options.flow_control)

@classmethod
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
"""Creates an instance of this client using the provided credentials
Expand Down Expand Up @@ -333,6 +341,11 @@ def publish(self, topic, data, ordering_key="", **attrs):
pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
the ``message`` would exceed the max size limit on the backend.
:exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
If publishing a new message would exceed the publish flow control
limits and the desired action on overflow is
:attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`.
"""
# Sanity check: Is the data being sent as a bytestring?
# If it is literally anything else, complain loudly about it.
Expand Down Expand Up @@ -364,6 +377,13 @@ def publish(self, topic, data, ordering_key="", **attrs):
data=data, ordering_key=ordering_key, attributes=attrs
)

# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
self._flow_controller.add(message)

def on_publish_done(future):
self._flow_controller.release(message)

with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")
Expand All @@ -372,6 +392,7 @@ def publish(self, topic, data, ordering_key="", **attrs):

# Delegate the publishing to the sequencer.
future = sequencer.publish(message)
future.add_done_callback(on_publish_done)

# Create a timer thread if necessary to enforce the batching
# timeout.
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub_v1/publisher/exceptions.py
Expand Up @@ -38,7 +38,12 @@ def __init__(self, ordering_key):
super(PublishToPausedOrderingKeyException, self).__init__()


class FlowControlLimitError(Exception):
    """Raised when an action would exceed the configured flow control limits."""


__all__ = (
"FlowControlLimitError",
"MessageTooLargeError",
"PublishError",
"TimeoutError",
Expand Down
145 changes: 145 additions & 0 deletions google/cloud/pubsub_v1/publisher/flow_controller.py
@@ -0,0 +1,145 @@
# Copyright 2020, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
import warnings

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions


_LOGGER = logging.getLogger(__name__)


class FlowController(object):
    """A class used to control the flow of messages passing through it.

    Every message is registered with :meth:`add` before it is published and
    unregistered with :meth:`release` once publishing is done. The controller
    keeps track of the number and the total size of the messages admitted in
    between, and reacts according to ``settings.limit_exceeded_behavior``
    when a new message does not fit into the configured limits.

    Args:
        settings (~google.cloud.pubsub_v1.types.PublishFlowControl):
            Desired flow control configuration.
    """

    def __init__(self, settings):
        self._settings = settings

        # Load of the messages that were admitted and not yet released.
        # Messages still waiting for capacity are deliberately NOT included.
        self._message_count = 0
        self._total_bytes = 0

        # The lock is used to protect the internal state (message and byte count).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message):
        """Add a message to flow control.

        If the message fits into the limits, the internal load statistics are
        updated and the method returns immediately. Otherwise the action taken
        depends on the flow control settings: the message is rejected with an
        error, or the calling thread blocks until enough capacity is freed by
        :meth:`release`. With the ``IGNORE`` behavior nothing is tracked at all.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                If adding a message exceeds flow control limits and the desired
                action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`,
                or if the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`
                but the message exceeds the total limits all by itself and
                could thus never be admitted.
        """
        behavior = self._settings.limit_exceeded_behavior
        if behavior == types.LimitExceededBehavior.IGNORE:
            return

        # Computed once - the size is needed for every check below.
        message_bytes = message.ByteSize()

        with self._operational_lock:
            if not self._would_overflow(message_bytes):
                self._admit(message_bytes)
                return

            # We have an overflow, react.
            if behavior == types.LimitExceededBehavior.ERROR:
                # The reported load includes the rejected message itself.
                msg = (
                    "Flow control limits exceeded "
                    "(messages: {} / {}, bytes: {} / {})."
                ).format(
                    self._message_count + 1,
                    self._settings.message_limit,
                    self._total_bytes + message_bytes,
                    self._settings.byte_limit,
                )
                raise exceptions.FlowControlLimitError(msg)

            assert behavior == types.LimitExceededBehavior.BLOCK

            # A message that does not fit even into an empty flow would make
            # the caller wait forever, thus reject it instead of blocking.
            if (
                self._settings.message_limit < 1
                or message_bytes > self._settings.byte_limit
            ):
                msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever (messages: 1 / {}, bytes: {} / {})."
                ).format(
                    self._settings.message_limit,
                    message_bytes,
                    self._settings.byte_limit,
                )
                raise exceptions.FlowControlLimitError(msg)

            # The load of a waiting message is only counted once the message
            # is admitted. Counting it up front could keep the totals above
            # the limits even after every in-flight message had been released,
            # leaving all waiting threads blocked with nobody to wake them up.
            while self._would_overflow(message_bytes):
                _LOGGER.debug(
                    "Blocking until there is enough free capacity in the flow."
                )
                self._has_capacity.wait()
                _LOGGER.debug("Woke up from waiting on free capacity in the flow.")

            self._admit(message_bytes)

    def release(self, message):
        """Release a message from flow control.

        The message's contribution is deducted from the load statistics and
        any threads blocked in :meth:`add` are woken up to re-check whether
        their message fits now. With the ``IGNORE`` behavior this is a no-op.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        message_bytes = message.ByteSize()

        with self._operational_lock:
            self._message_count -= 1
            self._total_bytes -= message_bytes

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                # Do not let a bogus release corrupt the statistics for good.
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            # Any freed capacity might be enough for some of the waiting
            # messages, thus let every waiting thread re-evaluate its own
            # condition. This is cheap if nobody is waiting.
            self._has_capacity.notify_all()

    def _admit(self, message_bytes):
        """Account for an admitted message of the given size.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_bytes (int): The size of the admitted message.
        """
        self._message_count += 1
        self._total_bytes += message_bytes

    def _would_overflow(self, message_bytes):
        """Determine if admitting one more message would exceed the limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_bytes (int): The size of the message to be admitted.

        Returns:
            bool
        """
        return (
            self._message_count + 1 > self._settings.message_limit
            or self._total_bytes + message_bytes > self._settings.byte_limit
        )

    def _is_overflow(self):
        """Determine if the current message load exceeds flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Returns:
            bool
        """
        return (
            self._message_count > self._settings.message_limit
            or self._total_bytes > self._settings.byte_limit
        )

0 comments on commit 7e78317

Please sign in to comment.