Streaming pull shutdown hangs indefinitely on error in callback #394
Comments
@Sam-Persoon Interesting, that ... In any case, the library should handle this gracefully, thanks for the report!
@Sam-Persoon BTW, are you sure you are actually using graceful shutdown? It is disabled by default (to match the previous default behavior) and can be enabled when starting the subscription by passing await_callbacks_on_shutdown=True:
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, await_callbacks_on_shutdown=True
)
Edit: Ah, missed it, it happens regardless of that flag.
@Sam-Persoon I was not able to reproduce the deadlock locally, but I did reproduce the error:
As suspected, this happens when the executor is already in shutdown mode and puts None into its work queue. BTW, any errors raised from within the callback are propagated up through ...
Edit:
diff --git google/cloud/pubsub_v1/subscriber/scheduler.py google/cloud/pubsub_v1/subscriber/scheduler.py
index a11ca49..b8f2b59 100644
--- google/cloud/pubsub_v1/subscriber/scheduler.py
+++ google/cloud/pubsub_v1/subscriber/scheduler.py
@@ -21,6 +21,7 @@ each message.
 import abc
 import concurrent.futures
 import queue
+import warnings
 
 
 class Scheduler(metaclass=abc.ABCMeta):
@@ -114,7 +115,14 @@ class ThreadScheduler(Scheduler):
         Returns:
             None
         """
-        self._executor.submit(callback, *args, **kwargs)
+        try:
+            self._executor.submit(callback, *args, **kwargs)
+        except RuntimeError:
+            warnings.warn(
+                "Scheduling a callback after executor shutdown.",
+                category=RuntimeWarning,
+                stacklevel=2,
+            )
 
     def shutdown(self, await_msg_callbacks=False):
         """Shut down the scheduler and immediately end all pending callbacks.
@@ -142,6 +150,8 @@ class ThreadScheduler(Scheduler):
         try:
             while True:
                 work_item = self._executor._work_queue.get(block=False)
+                if work_item is None:  # Executor in shutdown mode.
+                    continue
                 dropped_messages.append(work_item.args[0])
         except queue.Empty:
             pass
Edit 2: Or just use the PR branch directly.
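The first hunk of the patch guards against scheduling work on an executor that has already been shut down. As a rough, standalone illustration of that behavior (not the library's code): `ThreadPoolExecutor.submit()` raises `RuntimeError` after `shutdown()`, and the sketch below turns that into a warning. The helper name `schedule` and its `None` return value are assumptions made for this example only.

```python
import concurrent.futures
import warnings


def schedule(executor, callback, *args, **kwargs):
    """Submit a callback; return its future, or None if the executor is shut down.

    Hypothetical helper for illustration, not part of google-cloud-pubsub.
    """
    try:
        return executor.submit(callback, *args, **kwargs)
    except RuntimeError:
        # submit() raises RuntimeError once shutdown() has been called.
        warnings.warn(
            "Scheduling a callback after executor shutdown.",
            category=RuntimeWarning,
            stacklevel=2,
        )
        return None


executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
accepted = schedule(executor, lambda x: x * 2, 21)  # executor still running
executor.shutdown(wait=True)

# Record warnings so the rejected submission can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    rejected = schedule(executor, lambda x: x * 2, 21)  # executor shut down
```

Here `accepted` is a regular future, while the second call yields `None` and a `RuntimeWarning` instead of an unhandled exception in the scheduling thread.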
@Sam-Persoon There's a chance #399 fixed the issue, but I cannot verify that for sure (it does fix a few possible errors on shutdown). We'll release the fix soon, but if it turns out that it's not sufficient and the hangs still occur, please re-open the issue (with any new info, if available). Thanks!
Hi @plamut, is there a branch with this fix I can use? Or how would I go about ignoring this issue? I'm getting it on my ACK() and it's stopping my Kubernetes worker instances. In trying to solve my issue (#593 (comment)), I was asked to add "min_duration_per_lease_extension" and also enable exactly-once message delivery. One of these has caused this NoneType issue to occur (about 1 in 5 times). Thanks
@plamut thanks for the help and so far so good!
Environment details
google-cloud-pubsub version: 2.4.1
Steps to reproduce
When an exception occurs in the callback, the subscriber hangs indefinitely in some cases (about 1 in 5). It seems the queue returns None instead of raising queue.Empty here:
python-pubsub/google/cloud/pubsub_v1/subscriber/scheduler.py
Lines 142 to 147 in 5812018
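A minimal sketch of the failure mode described here, assuming the queue can hold a `None` sentinel alongside real work items. It uses a plain `queue.Queue` instead of the executor's private work queue, and `WorkItem`, `drain`, and `make_queue` are hypothetical names for illustration, not the library's code.

```python
import collections
import queue

# Hypothetical stand-in for the executor's internal work item.
WorkItem = collections.namedtuple("WorkItem", ["fn", "args"])


def drain(work_queue, skip_sentinel):
    """Empty the queue without blocking; return the first arg of each item."""
    dropped = []
    try:
        while True:
            item = work_queue.get(block=False)
            if skip_sentinel and item is None:
                continue  # ignore the shutdown sentinel
            dropped.append(item.args[0])
    except queue.Empty:
        pass  # queue fully drained
    return dropped


def make_queue():
    q = queue.Queue()
    q.put(WorkItem(fn=print, args=("msg-1",)))
    q.put(None)  # sentinel an executor in shutdown mode might leave behind
    q.put(WorkItem(fn=print, args=("msg-2",)))
    return q


# Without the check, the sentinel is treated as a work item and fails.
try:
    drain(make_queue(), skip_sentinel=False)
    failure = None
except AttributeError as exc:
    failure = exc

# With the check, both real items are recovered.
recovered = drain(make_queue(), skip_sentinel=True)
```

Without the `None` check the loop fails with an `AttributeError` on the sentinel, which is not caught by `except queue.Empty`; with the check it returns both pending items.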
This happens both with await_callbacks_on_shutdown=True and await_callbacks_on_shutdown=False.
Code example
Stack trace