Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: threads can skip the line in publisher flow controller #422

Merged
merged 5 commits into from Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
122 changes: 74 additions & 48 deletions google/cloud/pubsub_v1/publisher/flow_controller.py
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from collections import OrderedDict
import logging
import threading
import warnings
Expand All @@ -24,12 +24,21 @@
_LOGGER = logging.getLogger(__name__)


class _QuantityReservation(object):
"""A (partial) reservation of a quantifiable resource."""
class _QuantityReservation:
"""A (partial) reservation of quantifiable resources."""

def __init__(self, reserved, needed):
self.reserved = reserved
self.needed = needed
def __init__(self, bytes_reserved: int, bytes_needed: int, has_slot: bool):
self.bytes_reserved = bytes_reserved
self.bytes_needed = bytes_needed
self.has_slot = has_slot

def __repr__(self):
return (
f"{type(self).__name__}("
f"bytes_reserved={self.bytes_reserved}, "
f"bytes_needed={self.bytes_needed}, "
f"has_slot={self.has_slot})"
)


class FlowController(object):
Expand All @@ -48,14 +57,13 @@ def __init__(self, settings):
self._message_count = 0
self._total_bytes = 0

# A FIFO queue of threads blocked on adding a message, from first to last.
# A FIFO queue of threads blocked on adding a message that also tracks their
# reservations of available flow control bytes and message slots.
# Only relevant if the configured limit exceeded behavior is BLOCK.
self._waiting = deque()
self._waiting = OrderedDict()

# Reservations of available flow control bytes by the waiting threads.
# Each value is a _QuantityReservation instance.
self._byte_reservations = dict()
self._reserved_bytes = 0
self._reserved_slots = 0

# The lock is used to protect all internal state (message and byte count,
# waiting threads to add, etc.).
Expand Down Expand Up @@ -131,11 +139,13 @@ def add(self, message):
current_thread = threading.current_thread()

while self._would_overflow(message):
if current_thread not in self._byte_reservations:
self._waiting.append(current_thread)
self._byte_reservations[current_thread] = _QuantityReservation(
reserved=0, needed=message._pb.ByteSize()
if current_thread not in self._waiting:
reservation = _QuantityReservation(
bytes_reserved=0,
bytes_needed=message._pb.ByteSize(),
has_slot=False,
)
self._waiting[current_thread] = reservation # Will be placed last.

_LOGGER.debug(
"Blocking until there is enough free capacity in the flow - "
Expand All @@ -152,9 +162,9 @@ def add(self, message):
# Message accepted, increase the load and remove thread stats.
self._message_count += 1
self._total_bytes += message._pb.ByteSize()
self._reserved_bytes -= self._byte_reservations[current_thread].reserved
del self._byte_reservations[current_thread]
self._waiting.remove(current_thread)
self._reserved_bytes -= self._waiting[current_thread].bytes_reserved
self._reserved_slots -= 1
del self._waiting[current_thread]

def release(self, message):
"""Release a mesage from flow control.
Expand All @@ -180,39 +190,52 @@ def release(self, message):
self._message_count = max(0, self._message_count)
self._total_bytes = max(0, self._total_bytes)

self._distribute_available_bytes()
self._distribute_available_capacity()

# If at least one thread waiting to add() can be unblocked, wake them up.
if self._ready_to_unblock():
_LOGGER.debug("Notifying threads waiting to add messages to flow.")
self._has_capacity.notify_all()

def _distribute_available_bytes(self):
"""Distribute availalbe free capacity among the waiting threads in FIFO order.
def _distribute_available_capacity(self):
"""Distribute available capacity among the waiting threads in FIFO order.

The method assumes that the caller has obtained ``_operational_lock``.
"""
available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes
available_slots = (
self._settings.message_limit - self._message_count - self._reserved_slots
)
available_bytes = (
self._settings.byte_limit - self._total_bytes - self._reserved_bytes
)

for reservation in self._waiting.values():
if available_slots <= 0 and available_bytes <= 0:
break # Santa is now empty-handed, better luck next time.

for thread in self._waiting:
if available <= 0:
break
# Distribute any free slots.
if available_slots > 0 and not reservation.has_slot:
reservation.has_slot = True
self._reserved_slots += 1
available_slots -= 1

reservation = self._byte_reservations[thread]
still_needed = reservation.needed - reservation.reserved
# Distribute any free bytes.
if available_bytes <= 0:
continue

# Sanity check for any internal inconsistencies.
if still_needed < 0:
bytes_still_needed = reservation.bytes_needed - reservation.bytes_reserved

if bytes_still_needed < 0: # Sanity check for any internal inconsistencies.
msg = "Too many bytes reserved: {} / {}".format(
reservation.reserved, reservation.needed
reservation.bytes_reserved, reservation.bytes_needed
)
warnings.warn(msg, category=RuntimeWarning)
still_needed = 0
bytes_still_needed = 0

can_give = min(still_needed, available)
reservation.reserved += can_give
can_give = min(bytes_still_needed, available_bytes)
reservation.bytes_reserved += can_give
self._reserved_bytes += can_give
available -= can_give
available_bytes -= can_give

def _ready_to_unblock(self):
"""Determine if any of the threads waiting to add a message can proceed.
Expand All @@ -225,10 +248,10 @@ def _ready_to_unblock(self):
if self._waiting:
# It's enough to only check the head of the queue, because FIFO
# distribution of any free capacity.
reservation = self._byte_reservations[self._waiting[0]]
first_reservation = next(iter(self._waiting.values()))
return (
reservation.reserved >= reservation.needed
and self._message_count < self._settings.message_limit
first_reservation.bytes_reserved >= first_reservation.bytes_needed
and first_reservation.has_slot
)

return False
Expand All @@ -245,16 +268,22 @@ def _would_overflow(self, message):
Returns:
bool
"""
reservation = self._byte_reservations.get(threading.current_thread())
reservation = self._waiting.get(threading.current_thread())

if reservation:
enough_reserved = reservation.reserved >= reservation.needed
enough_reserved = reservation.bytes_reserved >= reservation.bytes_needed
has_slot = reservation.has_slot
else:
enough_reserved = False
has_slot = False

bytes_taken = self._total_bytes + self._reserved_bytes + message._pb.ByteSize()
size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
msg_count_overflow = self._message_count + 1 > self._settings.message_limit

msg_count_overflow = not has_slot and (
(self._message_count + self._reserved_slots + 1)
> self._settings.message_limit
)

return size_overflow or msg_count_overflow

Expand All @@ -275,18 +304,15 @@ def _load_info(self, message_count=None, total_bytes=None):
Returns:
str
"""
msg = "messages: {} / {}, bytes: {} / {} (reserved: {})"

if message_count is None:
message_count = self._message_count

if total_bytes is None:
total_bytes = self._total_bytes

return msg.format(
message_count,
self._settings.message_limit,
total_bytes,
self._settings.byte_limit,
self._reserved_bytes,
return (
f"messages: {message_count} / {self._settings.message_limit} "
f"(reserved: {self._reserved_slots}), "
f"bytes: {total_bytes} / {self._settings.byte_limit} "
f"(reserved: {self._reserved_bytes})"
)