Skip to content

Commit

Permalink
Add flow control support for multiple add() threads
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 11, 2020
1 parent 7e78317 commit f729fdf
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 96 deletions.
4 changes: 4 additions & 0 deletions google/cloud/pubsub_v1/publisher/exceptions.py
Expand Up @@ -42,6 +42,10 @@ class FlowControlLimitError(Exception):
"""An action resulted in exceeding the flow control limits."""


class PermanentlyBlockedError(FlowControlLimitError):
"""A message exceeds *total* flow control limits and would block forever."""


__all__ = (
"FlowControlLimitError",
"MessageTooLargeError",
Expand Down
153 changes: 130 additions & 23 deletions google/cloud/pubsub_v1/publisher/flow_controller.py
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
import logging
import threading
import warnings
Expand All @@ -23,6 +24,14 @@
_LOGGER = logging.getLogger(__name__)


class _QuantityReservation(object):
"""A (partial) reservation of a quantifiable resource."""

def __init__(self, reserved, needed):
self.reserved = reserved
self.needed = needed


class FlowController(object):
"""A class used to control the flow of messages passing through it.
Expand All @@ -34,10 +43,22 @@ class FlowController(object):
def __init__(self, settings):
self._settings = settings

# Load statistics. They represent the number of messages added, but not
# yet released (and their total size).
self._message_count = 0
self._total_bytes = 0

# The lock is used to protect the internal state (message and byte count).
# A FIFO queue of threads blocked on adding a message, from first to last.
# Only relevant if the configured limit exceeded behavior is BLOCK.
self._waiting = deque()

# Reservations of available flow control bytes by the waiting threads.
# Each value is a _QuantityReservation instance.
self._byte_reservations = dict()
self._reserved_bytes = 0

# The lock is used to protect all internal state (message and byte count,
# waiting threads to add, etc.).
self._operational_lock = threading.Lock()

# The condition for blocking the flow if capacity is exceeded.
Expand All @@ -62,46 +83,80 @@ def add(self, message):
return

with self._operational_lock:
self._message_count += 1
self._total_bytes += message.ByteSize()

if not self._is_overflow():
if not self._would_overflow(message):
self._message_count += 1
self._total_bytes += message.ByteSize()
return

# We have an overflow, react.
# Adding a message would overflow, react.
if (
self._settings.limit_exceeded_behavior
== types.LimitExceededBehavior.ERROR
):
# Raising an error means rejecting a message, thus we do not
# add anything to the existing load, but we do report the would-be
# load if we accepted the message.
msg = (
"Flow control limits exceeded "
"Flow control limits would be exceeded "
"(messages: {} / {}, bytes: {} / {})."
).format(
self._message_count,
self._message_count + 1,
self._settings.message_limit,
self._total_bytes,
self._total_bytes + message.ByteSize(),
self._settings.byte_limit,
)
error = exceptions.FlowControlLimitError(msg)

# Raising an error means rejecting a message, thus we need to deduct
# the latter's contribution to the total load.
self._message_count -= 1
self._total_bytes -= message.ByteSize()
raise error

assert (
self._settings.limit_exceeded_behavior
== types.LimitExceededBehavior.BLOCK
)

while self._is_overflow():
# Sanity check - if a message exceeds total flow control limits all
# by itself, it would block forever, thus raise error.
if (
message.ByteSize() > self._settings.byte_limit
or self._settings.message_limit < 1
):
error_msg = (
"Flow control limits too low for the message. "
"(messages: {} / {}, bytes: {} / {})."
).format(
1,
self._settings.message_limit,
message.ByteSize(),
self._settings.byte_limit,
)
raise exceptions.PermanentlyBlockedError(error_msg)

current_thread = threading.current_thread()

while self._would_overflow(message):
if current_thread not in self._byte_reservations:
self._waiting.append(current_thread)
self._byte_reservations[current_thread] = _QuantityReservation(
reserved=0, needed=message.ByteSize()
)

_LOGGER.debug(
"Blocking until there is enough free capacity in the flow."
)
self._has_capacity.wait()
_LOGGER.debug("Woke up from waiting on free capacity in the flow.")

# Message accepted, increase the load and remove thread stats if
# they exist in the waiting queue.
self._message_count += 1
self._total_bytes += message.ByteSize()

reservation = self._byte_reservations.get(current_thread)
if reservation:
self._reserved_bytes -= reservation.reserved
del self._byte_reservations[current_thread]
self._waiting.remove(current_thread)

def release(self, message):
"""Release a mesage from flow control.
Expand All @@ -113,8 +168,7 @@ def release(self, message):
return

with self._operational_lock:
was_overflow = self._is_overflow()

# Releasing a message decreases the load.
self._message_count -= 1
self._total_bytes -= message.ByteSize()

Expand All @@ -127,19 +181,72 @@ def release(self, message):
self._message_count = max(0, self._message_count)
self._total_bytes = max(0, self._total_bytes)

if was_overflow and not self._is_overflow():
self._distribute_available_bytes()

# If at least one thread waiting to add() can be unblocked, wake them up.
if self._ready_to_unblock():
_LOGGER.debug("Notifying threads waiting to add messages to flow.")
self._has_capacity.notify_all()

def _is_overflow(self):
"""Determine if the current message load exceeds flow control limits.
def _distribute_available_bytes(self):
"""Distribute availalbe free capacity among the waiting threads in FIFO order.
The method assumes that the caller has obtained ``_operational_lock``.
"""
available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes

for thread in self._waiting:
if available <= 0:
break

reservation = self._byte_reservations[thread]
still_needed = reservation.needed - reservation.reserved
can_give = min(still_needed, available)

reservation.reserved += can_give
self._reserved_bytes += can_give
available -= can_give

def _ready_to_unblock(self):
"""Determine if any of the threads waiting to add a message can proceed.
The method assumes that the caller has obtained ``_operational_lock``.
Returns:
bool
"""
return (
self._message_count > self._settings.message_limit
or self._total_bytes > self._settings.byte_limit
)
if self._waiting:
# It's enough to only check the head of the queue, because FIFO
# distribution of any free capacity.
reservation = self._byte_reservations[self._waiting[0]]
return (
reservation.reserved >= reservation.needed
and self._message_count < self._settings.message_limit
)

return False

def _would_overflow(self, message):
"""Determine if accepting a message would exceed flow control limits.
The method assumes that the caller has obtained ``_operational_lock``.
Args:
message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
The message entering the flow control.
Returns:
bool
"""
reservation = self._byte_reservations.get(threading.current_thread())

if reservation:
enough_reserved = reservation.reserved >= reservation.needed
else:
enough_reserved = False

bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize()
size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
msg_count_overflow = self._message_count + 1 > self._settings.message_limit

return size_overflow or msg_count_overflow

0 comments on commit f729fdf

Please sign in to comment.