Skip to content

Commit

Permalink
fix: move await_msg_callbacks flag to subscribe() method (#320)
Browse files Browse the repository at this point in the history
* Revert "revert: add graceful streaming pull shutdown (#315)"

This reverts commit 16bf588.

* Move await_msg_callbacks to subscribe() method

This is to keep the StreamingPullFuture's surface intact for
compatibility with PubSub Lite client.

* Make streaming pull close() method non-blocking

* Add a blocking streaming pull shutdown sample

* Refine docs on awaiting callbacks on shutdown
  • Loading branch information
plamut committed Mar 24, 2021
1 parent 0a662a6 commit d40d027
Show file tree
Hide file tree
Showing 15 changed files with 533 additions and 106 deletions.
3 changes: 0 additions & 3 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Expand Up @@ -99,9 +99,6 @@ def dispatch_callback(self, items):
ValueError: If ``action`` isn't one of the expected actions
"ack", "drop", "lease", "modify_ack_deadline" or "nack".
"""
if not self._manager.is_active:
return

batched_commands = collections.defaultdict(list)

for item in items:
Expand Down
9 changes: 5 additions & 4 deletions google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py
Expand Up @@ -35,10 +35,11 @@ def __init__(self, manager, period=_DEFAULT_PERIOD):
self._period = period

def heartbeat(self):
"""Periodically send heartbeats."""
while self._manager.is_active and not self._stop_event.is_set():
self._manager.heartbeat()
_LOGGER.debug("Sent heartbeat.")
"""Periodically send streaming pull heartbeats.
"""
while not self._stop_event.is_set():
if self._manager.heartbeat():
_LOGGER.debug("Sent heartbeat.")
self._stop_event.wait(timeout=self._period)

_LOGGER.info("%s exiting.", _HEARTBEAT_WORKER_NAME)
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -124,7 +124,7 @@ def maintain_leases(self):
ack IDs, then waits for most of that time (but with jitter), and
repeats.
"""
while self._manager.is_active and not self._stop_event.is_set():
while not self._stop_event.is_set():
# Determine the appropriate duration for the lease. This is
# based off of how long previous messages have taken to ack, with
# a sensible default and within the ranges allowed by Pub/Sub.
Expand Down
108 changes: 77 additions & 31 deletions google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
Expand Up @@ -16,6 +16,7 @@

import collections
import functools
import itertools
import logging
import threading
import uuid
Expand All @@ -36,6 +37,7 @@
from google.pubsub_v1 import types as gapic_types

_LOGGER = logging.getLogger(__name__)
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
exceptions.DeadlineExceeded,
Expand Down Expand Up @@ -110,24 +112,34 @@ class StreamingPullManager(object):
scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler
to use to process messages. If not provided, a thread pool-based
scheduler will be used.
await_callbacks_on_shutdown (bool):
If ``True``, the shutdown thread will wait until all scheduler threads
terminate and only then proceed with shutting down the remaining running
helper threads.
If ``False`` (default), the shutdown thread will shut the scheduler down,
but it will not wait for the currently executing scheduler threads to
terminate.
This setting affects when the on close callbacks get invoked, and
consequently, when the StreamingPullFuture associated with the stream gets
resolved.
"""

_UNARY_REQUESTS = True
"""If set to True, this class will make requests over a separate unary
RPC instead of over the streaming RPC."""

def __init__(
self,
client,
subscription,
flow_control=types.FlowControl(),
scheduler=None,
use_legacy_flow_control=False,
await_callbacks_on_shutdown=False,
):
self._client = client
self._subscription = subscription
self._flow_control = flow_control
self._use_legacy_flow_control = use_legacy_flow_control
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline = 10
Expand Down Expand Up @@ -291,6 +303,9 @@ def activate_ordering_keys(self, ordering_keys):
activate. May be empty.
"""
with self._pause_resume_lock:
if self._scheduler is None:
return # We are shutting down, don't try to dispatch any more messages.

self._messages_on_hold.activate_ordering_keys(
ordering_keys, self._schedule_message_on_hold
)
Expand Down Expand Up @@ -420,37 +435,36 @@ def send(self, request):
If a RetryError occurs, the manager shutdown is triggered, and the
error is re-raised.
"""
if self._UNARY_REQUESTS:
try:
self._send_unary_request(request)
except exceptions.GoogleAPICallError:
_LOGGER.debug(
"Exception while sending unary RPC. This is typically "
"non-fatal as stream requests are best-effort.",
exc_info=True,
)
except exceptions.RetryError as exc:
_LOGGER.debug(
"RetryError while sending unary RPC. Waiting on a transient "
"error resolution for too long, will now trigger shutdown.",
exc_info=False,
)
# The underlying channel has been suffering from a retryable error
# for too long, time to give up and shut the streaming pull down.
self._on_rpc_done(exc)
raise

else:
self._rpc.send(request)
try:
self._send_unary_request(request)
except exceptions.GoogleAPICallError:
_LOGGER.debug(
"Exception while sending unary RPC. This is typically "
"non-fatal as stream requests are best-effort.",
exc_info=True,
)
except exceptions.RetryError as exc:
_LOGGER.debug(
"RetryError while sending unary RPC. Waiting on a transient "
"error resolution for too long, will now trigger shutdown.",
exc_info=False,
)
# The underlying channel has been suffering from a retryable error
# for too long, time to give up and shut the streaming pull down.
self._on_rpc_done(exc)
raise

def heartbeat(self):
"""Sends an empty request over the streaming pull RPC.
This always sends over the stream, regardless of whether
``self._UNARY_REQUESTS`` is set or not.
Returns:
bool: Whether a heartbeat request has actually been sent.
"""
if self._rpc is not None and self._rpc.is_active:
self._rpc.send(gapic_types.StreamingPullRequest())
return True

return False

def open(self, callback, on_callback_error):
"""Begin consuming messages.
Expand Down Expand Up @@ -517,11 +531,29 @@ def close(self, reason=None):
This method is idempotent. Additional calls will have no effect.
The method does not block, it delegates the shutdown operations to a background
thread.
Args:
reason (Any): The reason to close this. If None, this is considered
reason (Any): The reason to close this. If ``None``, this is considered
an "intentional" shutdown. This is passed to the callbacks
specified via :meth:`add_close_callback`.
"""
thread = threading.Thread(
name=_REGULAR_SHUTDOWN_THREAD_NAME,
daemon=True,
target=self._shutdown,
kwargs={"reason": reason},
)
thread.start()

def _shutdown(self, reason=None):
"""Run the actual shutdown sequence (stop the stream and all helper threads).
Args:
reason (Any): The reason to close the stream. If ``None``, this is
considered an "intentional" shutdown.
"""
with self._closing:
if self._closed:
return
Expand All @@ -534,7 +566,9 @@ def close(self, reason=None):

# Shutdown all helper threads
_LOGGER.debug("Stopping scheduler.")
self._scheduler.shutdown()
dropped_messages = self._scheduler.shutdown(
await_msg_callbacks=self._await_callbacks_on_shutdown
)
self._scheduler = None

# Leaser and dispatcher reference each other through the shared
Expand All @@ -548,11 +582,23 @@ def close(self, reason=None):
# because the consumer gets shut down first.
_LOGGER.debug("Stopping leaser.")
self._leaser.stop()

total = len(dropped_messages) + len(
self._messages_on_hold._messages_on_hold
)
_LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
messages_to_nack = itertools.chain(
dropped_messages, self._messages_on_hold._messages_on_hold
)
for msg in messages_to_nack:
msg.nack()

_LOGGER.debug("Stopping dispatcher.")
self._dispatcher.stop()
self._dispatcher = None
# dispatcher terminated, OK to dispose the leaser reference now
self._leaser = None

_LOGGER.debug("Stopping heartbeater.")
self._heartbeater.stop()
self._heartbeater = None
Expand Down Expand Up @@ -722,7 +768,7 @@ def _on_rpc_done(self, future):
_LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
error = _wrap_as_exception(future)
thread = threading.Thread(
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": error}
name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
)
thread.daemon = True
thread.start()
13 changes: 13 additions & 0 deletions google/cloud/pubsub_v1/subscriber/client.py
Expand Up @@ -122,6 +122,7 @@ def subscribe(
flow_control=(),
scheduler=None,
use_legacy_flow_control=False,
await_callbacks_on_shutdown=False,
):
"""Asynchronously start receiving messages on a given subscription.
Expand Down Expand Up @@ -199,6 +200,17 @@ def callback(message):
*scheduler* to use when executing the callback. This controls
how callbacks are executed concurrently. This object must not be shared
across multiple SubscriberClients.
await_callbacks_on_shutdown (bool):
If ``True``, after canceling the returned future, the latter's
``result()`` method will block until the background stream and its
helper threads have been terminated, and all currently executing message
callbacks are done processing.
If ``False`` (default), the returned future's ``result()`` method will
not block after canceling the future. The method will instead return
immediately after the background stream and its helper threads have been
terminated, but some of the message callback threads might still be
running at that point.
Returns:
A :class:`~google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture`
Expand All @@ -212,6 +224,7 @@ def callback(message):
flow_control=flow_control,
scheduler=scheduler,
use_legacy_flow_control=use_legacy_flow_control,
await_callbacks_on_shutdown=await_callbacks_on_shutdown,
)

future = futures.StreamingPullFuture(manager)
Expand Down
49 changes: 41 additions & 8 deletions google/cloud/pubsub_v1/subscriber/scheduler.py
Expand Up @@ -54,8 +54,21 @@ def schedule(self, callback, *args, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def shutdown(self):
def shutdown(self, await_msg_callbacks=False):
"""Shut down the scheduler and immediately end all pending callbacks.
Args:
await_msg_callbacks (bool):
If ``True``, the method will block until all currently executing
callbacks are done processing. If ``False`` (default), the
method will not wait for the currently running callbacks to complete.
Returns:
List[pubsub_v1.subscriber.message.Message]:
The messages submitted to the scheduler that were not yet dispatched
to their callbacks.
It is assumed that each message was submitted to the scheduler as the
first positional argument to the provided callback.
"""
raise NotImplementedError

Expand Down Expand Up @@ -103,15 +116,35 @@ def schedule(self, callback, *args, **kwargs):
"""
self._executor.submit(callback, *args, **kwargs)

def shutdown(self):
"""Shuts down the scheduler and immediately end all pending callbacks.
def shutdown(self, await_msg_callbacks=False):
"""Shut down the scheduler and immediately end all pending callbacks.
Args:
await_msg_callbacks (bool):
If ``True``, the method will block until all currently executing
executor threads are done processing. If ``False`` (default), the
method will not wait for the currently running threads to complete.
Returns:
List[pubsub_v1.subscriber.message.Message]:
The messages submitted to the scheduler that were not yet dispatched
to their callbacks.
It is assumed that each message was submitted to the scheduler as the
first positional argument to the provided callback.
"""
# Drop all pending item from the executor. Without this, the executor
# will block until all pending items are complete, which is
# undesirable.
dropped_messages = []

# Drop all pending items from the executor. Without this, the executor will also
# try to process any pending work items before termination, which is undesirable.
#
# TODO: Replace the logic below by passing `cancel_futures=True` to shutdown()
# once we only need to support Python 3.9+.
try:
while True:
self._executor._work_queue.get(block=False)
work_item = self._executor._work_queue.get(block=False)
dropped_messages.append(work_item.args[0])
except queue.Empty:
pass
self._executor.shutdown()

self._executor.shutdown(wait=await_msg_callbacks)
return dropped_messages

0 comments on commit d40d027

Please sign in to comment.