Streaming pull shutdown hangs indefinitely on error in callback #394

Closed
Sam-Persoon opened this issue Apr 22, 2021 · 7 comments · Fixed by #399
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@Sam-Persoon

Sam-Persoon commented Apr 22, 2021

Environment details

  • OS type and version: 4.19.121-linuxkit
  • Python version: 3.8.9
  • pip version: 21.0.1
  • google-cloud-pubsub version: 2.4.1

Steps to reproduce

  1. Use the StreamingPull subscriber
  2. Throw an exception in the callback function
  3. In some cases, the subscriber hangs indefinitely

When an exception occurs in the callback, the subscriber hangs indefinitely in some cases (about 1 in 5 runs). It seems the queue returns None here instead of raising queue.Empty:

try:
    while True:
        work_item = self._executor._work_queue.get(block=False)
        dropped_messages.append(work_item.args[0])
except queue.Empty:
    pass

This happens both with await_callbacks_on_shutdown=True and await_callbacks_on_shutdown=False.
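The AttributeError in the stack trace below can be reproduced without Pub/Sub. This sketch assumes CPython's ThreadPoolExecutor internals, which the library's loop also relies on: `shutdown()` puts a `None` wake-up sentinel on the private `_work_queue`, and `get(block=False)` returns that `None` like any other item rather than raising `queue.Empty`. `drain_like_issue` is a hypothetical helper that copies the loop above.

```python
import concurrent.futures
import queue


def drain_like_issue(executor):
    """Hypothetical copy of the drain loop quoted above (no None check)."""
    dropped_messages = []
    try:
        while True:
            # _work_queue is a private CPython attribute, used as in the library.
            work_item = executor._work_queue.get(block=False)
            dropped_messages.append(work_item.args[0])
    except queue.Empty:
        pass
    return dropped_messages


executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# shutdown() enqueues a None wake-up sentinel; since no task was ever
# submitted, no worker thread exists to consume it, so it stays queued.
executor.shutdown(wait=False)

error = None
try:
    drain_like_issue(executor)
except AttributeError as exc:
    # The sentinel is returned by get(), and None has no .args attribute.
    error = exc
print(repr(error))
```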

Code example

from concurrent.futures import TimeoutError
from google.cloud.pubsub import SubscriberClient

project_id = "your-project-id"
subscription_id = "your-subscription-id"

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    # Deliberately fail on every message to trigger the bug.
    raise Exception()

    message.ack()  # Never reached.

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

with subscriber:
    try:
        # No timeout is passed, so TimeoutError is never raised here; the
        # callback's exception propagates out of result() instead.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()

Stack trace

Exception in thread Thread-OnRpcTerminated:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 569, in _shutdown
    dropped_messages = self._scheduler.shutdown(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/scheduler.py", line 145, in shutdown
    dropped_messages.append(work_item.args[0])
AttributeError: 'NoneType' object has no attribute 'args'
product-auto-label bot added the api: pubsub label Apr 22, 2021
@plamut
Contributor

plamut commented Apr 22, 2021

@Sam-Persoon Interesting. That None might be a poison pill (a sentinel value) that the executor puts into the queue when it detects an exception in the callback (just a guess).

In any case, the library should handle this gracefully, thanks for the report!

plamut self-assigned this Apr 22, 2021
plamut added the priority: p2 and type: bug labels Apr 22, 2021
@plamut
Contributor

plamut commented Apr 22, 2021

@Sam-Persoon BTW, are you sure you are actually using graceful shutdown? It is disabled by default (to match the previous default behavior), and can be enabled when starting the subscription by setting await_callbacks_on_shutdown to True:

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, await_callbacks_on_shutdown=True
)

Edit: Ah, I missed it; as the report says, it happens regardless of that flag:

This happens both with await_callbacks_on_shutdown=True and await_callbacks_on_shutdown=False.

plamut changed the title from "Streaming graceful shutdown hangs indefinitely" to "Streaming pull shutdown hangs indefinitely on error in callback" Apr 22, 2021
@plamut
Contributor

plamut commented Apr 27, 2021

@Sam-Persoon I was not able to reproduce the deadlock locally, but I did reproduce the error:

Exception in thread Thread-OnRpcTerminated:
Traceback (most recent call last):
    ...
    dropped_messages.append(work_item.args[0])
AttributeError: 'NoneType' object has no attribute 'args'

As suspected, this happens when the executor is already in shutdown mode and puts None value(s) into its internal queue. I'll make the internal scheduler handle this case more gracefully. I can't say if this will fix the hanging behavior, but it's a start (the shutdown will proceed without errors).

BTW, any errors raised from within the callback are propagated up through streaming_pull_future.result(). If you expect such errors, you might want to catch them in your top-level code, in the try/except block wrapping the .result() call.
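A sketch of that suggestion, not library code: `wait_for_streaming_pull` is a hypothetical helper, and it assumes only that the streaming pull future supports the standard `concurrent.futures.Future` methods `result(timeout=...)` and `cancel()`, which the issue's example already calls. Plain futures stand in for a live subscription in the demo.

```python
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


def wait_for_streaming_pull(future, timeout=None):
    """Wait on a streaming pull future; return the error that ended it, if any.

    Hypothetical helper: `future` only needs result(timeout=...) and cancel().
    """
    try:
        future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Nothing failed; we just stopped waiting, so stop the stream ourselves.
        future.cancel()
        return None
    except Exception as exc:
        # Per the comment above, an error raised inside the message callback
        # surfaces here; log it instead of letting it escape the top level.
        logger.error("Streaming pull stopped: %r", exc)
        return exc
    return None


# Demo with plain futures instead of a real subscription.
failed = concurrent.futures.Future()
failed.set_exception(ValueError("callback failed"))
idle = concurrent.futures.Future()

failed_result = wait_for_streaming_pull(failed, timeout=1)
idle_result = wait_for_streaming_pull(idle, timeout=0.01)
print(repr(failed_result), idle_result, idle.cancelled())
```

In the issue's example, the try block inside `with subscriber:` could then become a single call such as `error = wait_for_streaming_pull(streaming_pull_future)`.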

Edit:
If you feel like it, you can apply the following patch and see if it fixes the hanging shutdown (I could not reproduce that, unfortunately):

diff --git google/cloud/pubsub_v1/subscriber/scheduler.py google/cloud/pubsub_v1/subscriber/scheduler.py
index a11ca49..b8f2b59 100644
--- google/cloud/pubsub_v1/subscriber/scheduler.py
+++ google/cloud/pubsub_v1/subscriber/scheduler.py
@@ -21,6 +21,7 @@ each message.
 import abc
 import concurrent.futures
 import queue
+import warnings


 class Scheduler(metaclass=abc.ABCMeta):
@@ -114,7 +115,14 @@ class ThreadScheduler(Scheduler):
         Returns:
             None
         """
-        self._executor.submit(callback, *args, **kwargs)
+        try:
+            self._executor.submit(callback, *args, **kwargs)
+        except RuntimeError:
+            warnings.warn(
+                "Scheduling a callback after executor shutdown.",
+                category=RuntimeWarning,
+                stacklevel=2,
+            )

     def shutdown(self, await_msg_callbacks=False):
         """Shut down the scheduler and immediately end all pending callbacks.
@@ -142,6 +150,8 @@ class ThreadScheduler(Scheduler):
         try:
             while True:
                 work_item = self._executor._work_queue.get(block=False)
+                if work_item is None:  # Executor in shutdown mode.
+                    continue
                 dropped_messages.append(work_item.args[0])
         except queue.Empty:
             pass
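To see both halves of the patch in isolation, here is a sketch rather than the library's class: `SketchScheduler` is a hypothetical, trimmed-down stand-in that wraps a `concurrent.futures.ThreadPoolExecutor` the way the diff does. Like the library code, it assumes CPython internals (the private `_work_queue` and the `None` sentinel that `shutdown()` enqueues).

```python
import concurrent.futures
import queue
import threading
import warnings


class SketchScheduler:
    """Hypothetical, simplified stand-in for the patched ThreadScheduler."""

    def __init__(self, max_workers=1):
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def schedule(self, callback, *args, **kwargs):
        try:
            self._executor.submit(callback, *args, **kwargs)
        except RuntimeError:
            # ThreadPoolExecutor.submit() raises RuntimeError after shutdown();
            # report it as a warning instead of crashing the caller.
            warnings.warn(
                "Scheduling a callback after executor shutdown.",
                category=RuntimeWarning,
                stacklevel=2,
            )

    def shutdown(self, await_msg_callbacks=False):
        dropped_messages = []
        try:
            while True:
                # Private CPython attribute, as in the library code above.
                work_item = self._executor._work_queue.get(block=False)
                if work_item is None:  # Wake-up sentinel: executor shutting down.
                    continue
                dropped_messages.append(work_item.args[0])
        except queue.Empty:
            pass
        self._executor.shutdown(wait=await_msg_callbacks)
        return dropped_messages


# Demo 1: callbacks still waiting in the queue are returned as dropped.
started, release = threading.Event(), threading.Event()


def busy(_msg):
    started.set()   # the single worker thread is now occupied...
    release.wait()  # ...until the demo lets it finish


sched = SketchScheduler(max_workers=1)
sched.schedule(busy, "in-flight")
started.wait(timeout=5)
sched.schedule(print, "m1")  # these two stay in the work queue
sched.schedule(print, "m2")
dropped = sched.shutdown()
release.set()
print(dropped)  # ['m1', 'm2']

# Demo 2: the executor is already shut down, so a None sentinel is queued.
late = SketchScheduler()
late._executor.shutdown(wait=False)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    late.schedule(print, "late")  # emits a RuntimeWarning instead of raising
late_dropped = late.shutdown()  # skips the sentinel, no AttributeError
print(late_dropped, [w.category.__name__ for w in caught])
```

The second demo mirrors the situation in the traceback: the executor is already in shutdown mode, so the drain loop meets a `None` and, with the patch applied, simply skips it.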

Edit 2: Or just use the PR branch directly.

@plamut
Contributor

plamut commented May 18, 2021

@Sam-Persoon There's a chance #399 fixed the issue, but I cannot verify for sure (it does fix a few possible errors on shutdown).

We'll release the fix soon, but if it turns out that it's not sufficient and the hangs still occur, please re-open the issue (with any new info, if available). Thanks!

@jtressle

jtressle commented Apr 6, 2022

Hi @plamut, is there a branch with this fix I can use? Or how would I go about ignoring this issue? I'm getting it on my ACK() and it's stopping my Kubernetes worker instances.

In trying to solve my issue (#593 (comment)), I was asked to add "min_duration_per_lease_extension" and also enable exactly-once delivery. One of these changes has caused this NoneType issue to occur (about 1 in 5 times).

Thanks

@plamut
Copy link
Contributor

plamut commented Apr 7, 2022

@jtressle As of 2022 I am no longer maintaining this library, so I'm a bit out of the loop.

I read that a new version has been released, so you can try that first, but if it doesn't help, it's best to bring it to the current maintainers' attention.

Fingers crossed that upgrading will help, though. 🤞

@jtressle
Copy link

jtressle commented Apr 7, 2022

@plamut thanks for the help and so far so good!


3 participants