Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: threads can skip the line in publisher flow controller (#422)
* Add publisher flow controller test for FIFO order

* Simplify _run_in_daemon() test helper

* Fix message slots not acquired in FIFO order

* Unify the logic for distributing any free capacity

* Use OrderedDict for the FIFO queue

This allows to hold the queue of threads and their reservation data
in a single structure, no need for the separate deque and reservations
dict.
  • Loading branch information
plamut committed Jun 17, 2021
1 parent 0df7c96 commit ef89f55
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 77 deletions.
122 changes: 74 additions & 48 deletions google/cloud/pubsub_v1/publisher/flow_controller.py
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from collections import OrderedDict
import logging
import threading
import warnings
Expand All @@ -24,12 +24,21 @@
_LOGGER = logging.getLogger(__name__)


class _QuantityReservation(object):
"""A (partial) reservation of a quantifiable resource."""
class _QuantityReservation:
"""A (partial) reservation of quantifiable resources."""

def __init__(self, reserved, needed):
self.reserved = reserved
self.needed = needed
def __init__(self, bytes_reserved: int, bytes_needed: int, has_slot: bool):
self.bytes_reserved = bytes_reserved
self.bytes_needed = bytes_needed
self.has_slot = has_slot

def __repr__(self):
return (
f"{type(self).__name__}("
f"bytes_reserved={self.bytes_reserved}, "
f"bytes_needed={self.bytes_needed}, "
f"has_slot={self.has_slot})"
)


class FlowController(object):
Expand All @@ -48,14 +57,13 @@ def __init__(self, settings):
self._message_count = 0
self._total_bytes = 0

# A FIFO queue of threads blocked on adding a message, from first to last.
# A FIFO queue of threads blocked on adding a message that also tracks their
# reservations of available flow control bytes and message slots.
# Only relevant if the configured limit exceeded behavior is BLOCK.
self._waiting = deque()
self._waiting = OrderedDict()

# Reservations of available flow control bytes by the waiting threads.
# Each value is a _QuantityReservation instance.
self._byte_reservations = dict()
self._reserved_bytes = 0
self._reserved_slots = 0

# The lock is used to protect all internal state (message and byte count,
# waiting threads to add, etc.).
Expand Down Expand Up @@ -131,11 +139,13 @@ def add(self, message):
current_thread = threading.current_thread()

while self._would_overflow(message):
if current_thread not in self._byte_reservations:
self._waiting.append(current_thread)
self._byte_reservations[current_thread] = _QuantityReservation(
reserved=0, needed=message._pb.ByteSize()
if current_thread not in self._waiting:
reservation = _QuantityReservation(
bytes_reserved=0,
bytes_needed=message._pb.ByteSize(),
has_slot=False,
)
self._waiting[current_thread] = reservation # Will be placed last.

_LOGGER.debug(
"Blocking until there is enough free capacity in the flow - "
Expand All @@ -152,9 +162,9 @@ def add(self, message):
# Message accepted, increase the load and remove thread stats.
self._message_count += 1
self._total_bytes += message._pb.ByteSize()
self._reserved_bytes -= self._byte_reservations[current_thread].reserved
del self._byte_reservations[current_thread]
self._waiting.remove(current_thread)
self._reserved_bytes -= self._waiting[current_thread].bytes_reserved
self._reserved_slots -= 1
del self._waiting[current_thread]

def release(self, message):
"""Release a mesage from flow control.
Expand All @@ -180,39 +190,52 @@ def release(self, message):
self._message_count = max(0, self._message_count)
self._total_bytes = max(0, self._total_bytes)

self._distribute_available_bytes()
self._distribute_available_capacity()

# If at least one thread waiting to add() can be unblocked, wake them up.
if self._ready_to_unblock():
_LOGGER.debug("Notifying threads waiting to add messages to flow.")
self._has_capacity.notify_all()

def _distribute_available_bytes(self):
"""Distribute availalbe free capacity among the waiting threads in FIFO order.
def _distribute_available_capacity(self):
"""Distribute available capacity among the waiting threads in FIFO order.
The method assumes that the caller has obtained ``_operational_lock``.
"""
available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes
available_slots = (
self._settings.message_limit - self._message_count - self._reserved_slots
)
available_bytes = (
self._settings.byte_limit - self._total_bytes - self._reserved_bytes
)

for reservation in self._waiting.values():
if available_slots <= 0 and available_bytes <= 0:
break # Santa is now empty-handed, better luck next time.

for thread in self._waiting:
if available <= 0:
break
# Distribute any free slots.
if available_slots > 0 and not reservation.has_slot:
reservation.has_slot = True
self._reserved_slots += 1
available_slots -= 1

reservation = self._byte_reservations[thread]
still_needed = reservation.needed - reservation.reserved
# Distribute any free bytes.
if available_bytes <= 0:
continue

# Sanity check for any internal inconsistencies.
if still_needed < 0:
bytes_still_needed = reservation.bytes_needed - reservation.bytes_reserved

if bytes_still_needed < 0: # Sanity check for any internal inconsistencies.
msg = "Too many bytes reserved: {} / {}".format(
reservation.reserved, reservation.needed
reservation.bytes_reserved, reservation.bytes_needed
)
warnings.warn(msg, category=RuntimeWarning)
still_needed = 0
bytes_still_needed = 0

can_give = min(still_needed, available)
reservation.reserved += can_give
can_give = min(bytes_still_needed, available_bytes)
reservation.bytes_reserved += can_give
self._reserved_bytes += can_give
available -= can_give
available_bytes -= can_give

def _ready_to_unblock(self):
"""Determine if any of the threads waiting to add a message can proceed.
Expand All @@ -225,10 +248,10 @@ def _ready_to_unblock(self):
if self._waiting:
# It's enough to only check the head of the queue, because FIFO
# distribution of any free capacity.
reservation = self._byte_reservations[self._waiting[0]]
first_reservation = next(iter(self._waiting.values()))
return (
reservation.reserved >= reservation.needed
and self._message_count < self._settings.message_limit
first_reservation.bytes_reserved >= first_reservation.bytes_needed
and first_reservation.has_slot
)

return False
Expand All @@ -245,16 +268,22 @@ def _would_overflow(self, message):
Returns:
bool
"""
reservation = self._byte_reservations.get(threading.current_thread())
reservation = self._waiting.get(threading.current_thread())

if reservation:
enough_reserved = reservation.reserved >= reservation.needed
enough_reserved = reservation.bytes_reserved >= reservation.bytes_needed
has_slot = reservation.has_slot
else:
enough_reserved = False
has_slot = False

bytes_taken = self._total_bytes + self._reserved_bytes + message._pb.ByteSize()
size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
msg_count_overflow = self._message_count + 1 > self._settings.message_limit

msg_count_overflow = not has_slot and (
(self._message_count + self._reserved_slots + 1)
> self._settings.message_limit
)

return size_overflow or msg_count_overflow

Expand All @@ -275,18 +304,15 @@ def _load_info(self, message_count=None, total_bytes=None):
Returns:
str
"""
msg = "messages: {} / {}, bytes: {} / {} (reserved: {})"

if message_count is None:
message_count = self._message_count

if total_bytes is None:
total_bytes = self._total_bytes

return msg.format(
message_count,
self._settings.message_limit,
total_bytes,
self._settings.byte_limit,
self._reserved_bytes,
return (
f"messages: {message_count} / {self._settings.message_limit} "
f"(reserved: {self._reserved_slots}), "
f"bytes: {total_bytes} / {self._settings.byte_limit} "
f"(reserved: {self._reserved_bytes})"
)

0 comments on commit ef89f55

Please sign in to comment.