Skip to content

Commit

Permalink
revert: add graceful streaming pull shutdown (#315)
Browse files Browse the repository at this point in the history
This reverts commit 00874fe.
  • Loading branch information
anguillanneuf committed Mar 10, 2021
1 parent 4080b0a commit 16bf588
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 390 deletions.
3 changes: 3 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Expand Up @@ -99,6 +99,9 @@ def dispatch_callback(self, items):
ValueError: If ``action`` isn't one of the expected actions
"ack", "drop", "lease", "modify_ack_deadline" or "nack".
"""
if not self._manager.is_active:
return

batched_commands = collections.defaultdict(list)

for item in items:
Expand Down
9 changes: 4 additions & 5 deletions google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py
Expand Up @@ -35,11 +35,10 @@ def __init__(self, manager, period=_DEFAULT_PERIOD):
self._period = period

def heartbeat(self):
"""Periodically send streaming pull heartbeats.
"""
while not self._stop_event.is_set():
if self._manager.heartbeat():
_LOGGER.debug("Sent heartbeat.")
"""Periodically send heartbeats."""
while self._manager.is_active and not self._stop_event.is_set():
self._manager.heartbeat()
_LOGGER.debug("Sent heartbeat.")
self._stop_event.wait(timeout=self._period)

_LOGGER.info("%s exiting.", _HEARTBEAT_WORKER_NAME)
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -126,7 +126,7 @@ def maintain_leases(self):
ack IDs, then waits for most of that time (but with jitter), and
repeats.
"""
while not self._stop_event.is_set():
while self._manager.is_active and not self._stop_event.is_set():
# Determine the appropriate duration for the lease. This is
# based off of how long previous messages have taken to ack, with
# a sensible default and within the ranges allowed by Pub/Sub.
Expand Down
Expand Up @@ -16,7 +16,6 @@

import collections
import functools
import itertools
import logging
import threading
import uuid
Expand Down Expand Up @@ -114,6 +113,10 @@ class StreamingPullManager(object):
scheduler will be used.
"""

_UNARY_REQUESTS = True
"""If set to True, this class will make requests over a separate unary
RPC instead of over the streaming RPC."""

def __init__(
self,
client,
Expand Down Expand Up @@ -289,9 +292,6 @@ def activate_ordering_keys(self, ordering_keys):
activate. May be empty.
"""
with self._pause_resume_lock:
if self._scheduler is None:
return # We are shutting down, don't try to dispatch any more messages.

self._messages_on_hold.activate_ordering_keys(
ordering_keys, self._schedule_message_on_hold
)
Expand Down Expand Up @@ -421,36 +421,37 @@ def send(self, request):
If a RetryError occurs, the manager shutdown is triggered, and the
error is re-raised.
"""
try:
self._send_unary_request(request)
except exceptions.GoogleAPICallError:
_LOGGER.debug(
"Exception while sending unary RPC. This is typically "
"non-fatal as stream requests are best-effort.",
exc_info=True,
)
except exceptions.RetryError as exc:
_LOGGER.debug(
"RetryError while sending unary RPC. Waiting on a transient "
"error resolution for too long, will now trigger shutdown.",
exc_info=False,
)
# The underlying channel has been suffering from a retryable error
# for too long, time to give up and shut the streaming pull down.
self._on_rpc_done(exc)
raise
if self._UNARY_REQUESTS:
try:
self._send_unary_request(request)
except exceptions.GoogleAPICallError:
_LOGGER.debug(
"Exception while sending unary RPC. This is typically "
"non-fatal as stream requests are best-effort.",
exc_info=True,
)
except exceptions.RetryError as exc:
_LOGGER.debug(
"RetryError while sending unary RPC. Waiting on a transient "
"error resolution for too long, will now trigger shutdown.",
exc_info=False,
)
# The underlying channel has been suffering from a retryable error
# for too long, time to give up and shut the streaming pull down.
self._on_rpc_done(exc)
raise

else:
self._rpc.send(request)

def heartbeat(self):
"""Sends an empty request over the streaming pull RPC.
Returns:
bool: If a heartbeat request has actually been sent.
This always sends over the stream, regardless of if
``self._UNARY_REQUESTS`` is set or not.
"""
if self._rpc is not None and self._rpc.is_active:
self._rpc.send(gapic_types.StreamingPullRequest())
return True

return False

def open(self, callback, on_callback_error):
"""Begin consuming messages.
Expand Down Expand Up @@ -512,7 +513,7 @@ def open(self, callback, on_callback_error):
# Start the stream heartbeater thread.
self._heartbeater.start()

def close(self, reason=None, await_msg_callbacks=False):
def close(self, reason=None):
"""Stop consuming messages and shutdown all helper threads.
This method is idempotent. Additional calls will have no effect.
Expand All @@ -521,15 +522,6 @@ def close(self, reason=None, await_msg_callbacks=False):
reason (Any): The reason to close this. If None, this is considered
an "intentional" shutdown. This is passed to the callbacks
specified via :meth:`add_close_callback`.
await_msg_callbacks (bool):
If ``True``, the method will wait until all scheduler threads terminate
and only then proceed with the shutdown with the remaining shutdown
tasks,
If ``False`` (default), the method will shut down the scheduler in a
non-blocking fashion, i.e. it will not wait for the currently executing
scheduler threads to terminate.
"""
with self._closing:
if self._closed:
Expand All @@ -543,9 +535,7 @@ def close(self, reason=None, await_msg_callbacks=False):

# Shutdown all helper threads
_LOGGER.debug("Stopping scheduler.")
dropped_messages = self._scheduler.shutdown(
await_msg_callbacks=await_msg_callbacks
)
self._scheduler.shutdown()
self._scheduler = None

# Leaser and dispatcher reference each other through the shared
Expand All @@ -559,23 +549,11 @@ def close(self, reason=None, await_msg_callbacks=False):
# because the consumer gets shut down first.
_LOGGER.debug("Stopping leaser.")
self._leaser.stop()

total = len(dropped_messages) + len(
self._messages_on_hold._messages_on_hold
)
_LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
messages_to_nack = itertools.chain(
dropped_messages, self._messages_on_hold._messages_on_hold
)
for msg in messages_to_nack:
msg.nack()

_LOGGER.debug("Stopping dispatcher.")
self._dispatcher.stop()
self._dispatcher = None
# dispatcher terminated, OK to dispose the leaser reference now
self._leaser = None

_LOGGER.debug("Stopping heartbeater.")
self._heartbeater.stop()
self._heartbeater = None
Expand Down
15 changes: 2 additions & 13 deletions google/cloud/pubsub_v1/subscriber/futures.py
Expand Up @@ -43,23 +43,12 @@ def _on_close_callback(self, manager, result):
else:
self.set_exception(result)

def cancel(self, await_msg_callbacks=False):
def cancel(self):
"""Stops pulling messages and shutdowns the background thread consuming
messages.
Args:
await_msg_callbacks (bool):
If ``True``, the method will block until the background stream and its
helper threads have has been terminated, as well as all currently
executing message callbacks are done processing.
If ``False`` (default), the method returns immediately after the
background stream and its helper threads have has been terminated, but
some of the message callback threads might still be running at that
point.
"""
self._cancelled = True
return self._manager.close(await_msg_callbacks=await_msg_callbacks)
return self._manager.close()

def cancelled(self):
"""
Expand Down
59 changes: 15 additions & 44 deletions google/cloud/pubsub_v1/subscriber/scheduler.py
Expand Up @@ -20,6 +20,7 @@

import abc
import concurrent.futures
import sys

import six
from six.moves import queue
Expand Down Expand Up @@ -57,29 +58,19 @@ def schedule(self, callback, *args, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def shutdown(self, await_msg_callbacks=False):
def shutdown(self):
"""Shuts down the scheduler and immediately end all pending callbacks.
Args:
await_msg_callbacks (bool):
If ``True``, the method will block until all currently executing
callbacks are done processing. If ``False`` (default), the
method will not wait for the currently running callbacks to complete.
Returns:
List[pubsub_v1.subscriber.message.Message]:
The messages submitted to the scheduler that were not yet dispatched
to their callbacks.
It is assumed that each message was submitted to the scheduler as the
first positional argument to the provided callback.
"""
raise NotImplementedError


def _make_default_thread_pool_executor():
return concurrent.futures.ThreadPoolExecutor(
max_workers=10, thread_name_prefix="ThreadPoolExecutor-ThreadScheduler"
)
# Python 2.7 and 3.6+ have the thread_name_prefix argument, which is useful
# for debugging.
executor_kwargs = {}
if sys.version_info[:2] == (2, 7) or sys.version_info >= (3, 6):
executor_kwargs["thread_name_prefix"] = "ThreadPoolExecutor-ThreadScheduler"
return concurrent.futures.ThreadPoolExecutor(max_workers=10, **executor_kwargs)


class ThreadScheduler(Scheduler):
Expand Down Expand Up @@ -119,35 +110,15 @@ def schedule(self, callback, *args, **kwargs):
"""
self._executor.submit(callback, *args, **kwargs)

def shutdown(self, await_msg_callbacks=False):
"""Shut down the scheduler and immediately end all pending callbacks.
Args:
await_msg_callbacks (bool):
If ``True``, the method will block until all currently executing
executor threads are done processing. If ``False`` (default), the
method will not wait for the currently running threads to complete.
Returns:
List[pubsub_v1.subscriber.message.Message]:
The messages submitted to the scheduler that were not yet dispatched
to their callbacks.
It is assumed that each message was submitted to the scheduler as the
first positional argument to the provided callback.
def shutdown(self):
"""Shuts down the scheduler and immediately end all pending callbacks.
"""
dropped_messages = []

# Drop all pending item from the executor. Without this, the executor will also
# try to process any pending work items before termination, which is undesirable.
#
# TODO: Replace the logic below by passing `cancel_futures=True` to shutdown()
# once we only need to support Python 3.9+.
# Drop all pending item from the executor. Without this, the executor
# will block until all pending items are complete, which is
# undesirable.
try:
while True:
work_item = self._executor._work_queue.get(block=False)
dropped_messages.append(work_item.args[0])
self._executor._work_queue.get(block=False)
except queue.Empty:
pass

self._executor.shutdown(wait=await_msg_callbacks)
return dropped_messages
self._executor.shutdown()

0 comments on commit 16bf588

Please sign in to comment.