Handle exceptions in created tasks #163

Open
wants to merge 3 commits into
base: master
@torarvid torarvid commented Jan 5, 2023

When create_task is called, the done_callback should check whether or not an exception was raised, and if so, re-raise it.

@torarvid
Author

torarvid commented Jan 5, 2023

Hi @mosquito 👋🏻 I'm not 100% certain this change is the right fix, but I wanted to try and of course you can judge whether or not you think it makes sense.

In an application at my job that uses aio-pika, we've seen quite a few instances of Task exceptions not being handled. Typically, they look like this:

Task exception was never retrieved
future: <Task finished name='Task-2437' coro=<consumer() done, defined at /Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aio_pika/queue.py:24> exception=[...blablabla...]

Adding this change makes my logs spit out this instead:

Exception in callback FutureStore.__on_task_done.<locals>.remover(<Task finishe...ven uuid."}')>) at /Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aiormq/base.py:33
handle: <Handle FutureStore.__on_task_done.<locals>.remover(<Task finishe...ven uuid."}')>) at /Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aiormq/base.py:33>

One reason I'm not sure this is the right fix is that I now get two exception log outputs instead of one, with nearly identical stack traces. They start like this:

Traceback (most recent call last):
  File "/Users/tor/.pyenv/versions/3.10.1/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aiormq/base.py", line 43, in remover
    raise exc
  File "/Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aio_pika/queue.py", line 30, in consumer
    return await create_task(callback, message)
  # and more that is identical in both cases...
Traceback (most recent call last):
  File "/Users/tor/.pyenv/versions/3.10.1/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aiormq/base.py", line 43, in remover
    raise exc
  File "/Users/tor/.pyenv/versions/3.10.1/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aiormq/base.py", line 43, in remover
    raise exc
  File "/Users/tor/oda/trex/.venv/lib/python3.10/site-packages/aio_pika/queue.py", line 30, in consumer
    return await create_task(callback, message)
  # and more that is identical in both cases...

I don't know if you can make better sense of this than me, but at least it seems appropriate to raise task exceptions when they happen 🙂
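For readers following along, here is a minimal sketch of why the log line changes from "Task exception was never retrieved" to "Exception in callback ...". It assumes nothing about aiormq: `failing`, `reraise`, and `main` are hypothetical names. Calling `task.exception()` marks the exception as retrieved, and an exception raised inside a done callback is handed to the event loop's exception handler rather than to any awaiting code.

```python
import asyncio


async def failing() -> None:
    raise ValueError("boom")


def reraise(task: asyncio.Task) -> None:
    # Retrieving the exception marks it as "retrieved" for asyncio.
    exc = task.exception()
    if exc is not None:
        # Raised inside a callback, so the loop's exception handler gets it.
        raise exc


async def main() -> list:
    reported = []
    loop = asyncio.get_running_loop()
    # Collect what the loop would otherwise write to the log.
    loop.set_exception_handler(lambda _loop, context: reported.append(context))

    task = asyncio.create_task(failing())
    task.add_done_callback(reraise)
    # Let the task and its callback run without ever awaiting the task.
    await asyncio.sleep(0.01)
    return reported


reported = asyncio.run(main())
print(reported[0]["message"])
```

Note that the re-raised exception still does not reach a parent task this way; it only changes which message the loop reports.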

@torarvid
Author

torarvid commented Feb 6, 2023

Hello, @mosquito. Have you had any time to look at this and see whether you think it makes sense or not? 🙂

@mosquito
Owner

mosquito commented Feb 6, 2023

@torarvid Hi, I haven't looked closely yet. All my free time is taken up by PR #164. If you have any ideas on how to fix this case, feel free to open a PR.

@mosquito
Owner

mosquito commented Feb 6, 2023

Oh, I answered first and only then realized that this already is a PR.

@mosquito
Owner

mosquito commented Feb 6, 2023

This patch should fix your issue I guess
diff --git a/aiormq/abc.py b/aiormq/abc.py
index 751385f..7c192ae 100644
--- a/aiormq/abc.py
+++ b/aiormq/abc.py
@@ -27,24 +27,24 @@ ExceptionType = Union[BaseException, Type[BaseException]]

 # noinspection PyShadowingNames
 class TaskWrapper:
-    __slots__ = "exception", "task"
+    __slots__ = "_exception", "task"

-    exception: Union[BaseException, Type[BaseException]]
+    _exception: Union[BaseException, Type[BaseException]]
     task: asyncio.Task

     def __init__(self, task: asyncio.Task):
         self.task = task
-        self.exception = asyncio.CancelledError
+        self._exception = asyncio.CancelledError

     def throw(self, exception: ExceptionType) -> None:
-        self.exception = exception
+        self._exception = exception
         self.task.cancel()

     async def __inner(self) -> Any:
         try:
             return await self.task
         except asyncio.CancelledError as e:
-            raise self.exception from e
+            raise self._exception from e

     def __await__(self, *args: Any, **kwargs: Any) -> Any:
         return self.__inner().__await__()

@torarvid
Author

torarvid commented Feb 7, 2023

This patch should fix your issue I guess

[patch omitted]

Good morning, @mosquito. Thanks for the reply, but I don't think I understand your suggestion... To my eyes, it seems it just renames exception to _exception, but perhaps there is some underlying magic that I don't understand (if for example _exception is somehow a "special name").

My premise for this PR is that it's possible for a task (child) to raise an exception, and for that exception to not be propagated to the awaiting task (parent), because the child task's exception is not re-raised in the done_callback.

@mosquito
Owner

mosquito commented Feb 7, 2023

The main idea is that TaskWrapper.__getattr__ will start returning the task's exception method instead of the wrapper's own attribute.
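A minimal sketch of that lookup behaviour, assuming the wrapper forwards unknown attributes to its task through `__getattr__` as described. `Wrapper` is a hypothetical stand-in, not aiormq's actual class. Because `__getattr__` is only consulted when normal lookup fails, a slot named `exception` would shadow the task's method, while a slot named `_exception` leaves `wrapper.exception` free to resolve to `task.exception`.

```python
import asyncio
from typing import Any


class Wrapper:
    """Hypothetical stand-in that forwards unknown attributes to its task."""

    __slots__ = ("_exception", "task")

    def __init__(self, task: asyncio.Task):
        self.task = task
        self._exception = asyncio.CancelledError

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails, e.g. for "exception".
        return getattr(self.task, name)


async def failing() -> None:
    raise RuntimeError("child failed")


async def main() -> Any:
    wrapper = Wrapper(asyncio.create_task(failing()))
    await asyncio.wait([wrapper.task])
    # Resolves to task.exception() because the wrapper has no such attribute.
    return wrapper.exception()


result = asyncio.run(main())
print(repr(result))
```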

@torarvid
Author

torarvid commented Feb 7, 2023

@mosquito Ah, so you mean with that patch, I could simply add

exc = future.exception()
if exc is not None:
    raise exc

and skip the if isinstance(future, TaskWrapper) stuff?

@mosquito
Owner

mosquito commented Feb 9, 2023

please rebase this against master branch

@torarvid
Author

torarvid commented Feb 9, 2023

@mosquito rebased now ✅

aiormq/base.py Outdated
@@ -35,6 +35,10 @@ def remover(*_: Any) -> None:
        if future in self.futures:
            self.futures.remove(future)

        exc = future.exception()
Contributor


Is the asyncio.CancelledError exception intentionally not handled here?

Author


No, that was not intentional (I was just not aware this needed any special handling). I have now tried adding a workaround there.
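To illustrate the point raised in this review comment: `Future.exception()` itself raises `CancelledError` when the future was cancelled, so a done callback has to check `cancelled()` first. The sketch below is one possible shape for such a guard, not the code in this PR; `failure_of` and `reraise_unless_cancelled` are hypothetical names.

```python
import asyncio
from typing import List, Optional


def failure_of(future: asyncio.Future) -> Optional[BaseException]:
    """Return the exception of a finished future, or None if it
    succeeded or was cancelled."""
    # exception() would raise CancelledError here, so check first.
    if future.cancelled():
        return None
    return future.exception()


def reraise_unless_cancelled(future: asyncio.Future) -> None:
    # Possible done callback: re-raise real failures, ignore cancellation.
    exc = failure_of(future)
    if exc is not None:
        raise exc


async def main() -> List[Optional[BaseException]]:
    loop = asyncio.get_running_loop()
    cancelled, failed, ok = (loop.create_future() for _ in range(3))
    cancelled.cancel()
    failed.set_exception(KeyError("missing"))
    ok.set_result(42)
    return [failure_of(f) for f in (cancelled, failed, ok)]


results = asyncio.run(main())
print(results)
```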

@mosquito
Owner

@torarvid could you please double check this on latest release?

@torarvid
Author

torarvid commented Feb 20, 2023

@torarvid could you please double check this on latest release?

@mosquito Well, I have the problem that I don't know how 😬 🙈 We have seen some issues in production that occur if there is ever "a blip in the network", but we have not been able to reproduce these issues locally on developer machines.

So this PR is not the result of me "finding a specific bug". Rather, we've seen cases (both in our own code and in the code of libraries that we use) where code calls asyncio.create_task without adding a done_callback that re-raises any errors that occur in the underlying task.

I consider it a good rule of thumb that if you create a task t and cannot guarantee that it is always awaited

t = asyncio.create_task(some_coroutine())
# ...
await t # <-- crucial step

... then it's good hygiene to always add a done_callback that re-raises any exception from some_coroutine (except perhaps CancelledError, as @decaz pointed out in a comment above).
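The rule of thumb above could be packaged as a small helper. This is only a sketch of one variation: instead of re-raising inside the callback, it passes the exception to a caller-supplied `on_error` function. `spawn`, `on_error`, `failing`, and `fine` are hypothetical names, not part of asyncio or aiormq.

```python
import asyncio
from typing import Any, Callable, Coroutine, List


def spawn(
    coro: Coroutine[Any, Any, Any],
    on_error: Callable[[BaseException], None],
) -> asyncio.Task:
    """Create a task whose failure is reported even if nobody awaits it."""

    def _done(task: asyncio.Task) -> None:
        if task.cancelled():
            return  # cancellation is not treated as a failure here
        exc = task.exception()
        if exc is not None:
            on_error(exc)

    task = asyncio.create_task(coro)
    task.add_done_callback(_done)
    return task


async def failing() -> None:
    raise ValueError("not awaited, but still reported")


async def fine() -> int:
    return 1


async def main() -> List[BaseException]:
    errors: List[BaseException] = []
    # Keep references so the tasks are not garbage collected early.
    tasks = [spawn(failing(), errors.append), spawn(fine(), errors.append)]
    await asyncio.sleep(0.01)  # the tasks are never awaited directly
    assert all(t.done() for t in tasks)
    return errors


errors = asyncio.run(main())
print(errors)
```

Whether to re-raise, log, or collect the exception is a design choice; the common part is that the callback retrieves it so it cannot be silently dropped.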

I don't consider myself an expert in the asyncio domain though, so I'm interested to hear if there are other arguments here that are relevant 🙂

@mosquito
Owner

@torarvid I'm glad to know that it does no harm in your production environment. To be honest, though, we also have cases in our production where connections break; the network is generally not a very reliable thing, and I have been struggling with problems of this kind for several releases. It gets better with every release, of course, and I encourage you to try updating to the latest version of the library.

@torarvid
Author

@mosquito yeah, we have noticed improved resilience towards network blips with the later releases, so kudos for all the recent improvements 🎉

@mosquito
Owner

@torarvid I think these changes break something. Somewhere it is important to receive a CancelledError, and that is why the tests are broken. I don't yet understand exactly where, though.

@torarvid
Author

@mosquito that could be true, but remember: before I made the most recent change today, only a single commit was in the PR, and it didn't have any special handling of CancelledError, so it could possibly be something else?

@torarvid
Author

@mosquito I found out some details by hacking channel_max to be 2 (instead of 2047) here. Looks like there's some wrapping happening that probably gets worse and worse when channel_max is 2047? (I'm on very thin ice here, not quite sure what I'm doing 🤷🏻‍♂️). My pytest output for the first of these tests below:

~/dev/aiormq on check-for-task-exceptions [!] via 🐍 pyenv 3.11.1
15:02:49 ❯ poetry run pytest -vv tests/test_connection.py -k test_no_free_channels -s -rP
======================================================================================= test session starts ========================================================================================
platform darwin -- Python 3.11.1, pytest-7.2.0, pluggy-1.0.0 -- /Users/tor/dev/aiormq/.venv/bin/python
cachedir: .pytest_cache
rootdir: /Users/tor/dev/aiormq
plugins: rst-0.0.7, cov-4.0.0, aiomisc-16.2.10, pylama-8.4.1
collected 237 items / 233 deselected / 4 selected                                                                                                                                                  

tests/test_connection.py::test_no_free_channels[amqp] Unexpected connection close from remote "amqp://guest:******@localhost:5672/", Connection.Close(reply_code=530, reply_text='NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)')
NoneType: None
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb4c0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb600>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092ebd80>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092e9bc0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb740>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092ebba0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb7e0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb880>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
    raise exc
  File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
    return await self.task
           ^^^^^^^^^^^^^^^
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
    await handler(frame)
  File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
    raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
PASSED
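One plausible reason the traces above get longer each time, offered as an assumption rather than a diagnosis of aiormq: re-raising the same exception object with `raise exc` adds new frames to its existing `__traceback__` instead of starting a fresh one. The sketch below shows that effect in isolation; `tb_depth` and `reraise` are hypothetical helpers.

```python
import traceback
from typing import List


def tb_depth(exc: BaseException) -> int:
    # Number of frames currently recorded on the exception's traceback.
    return len(traceback.extract_tb(exc.__traceback__))


def reraise(exc: BaseException) -> None:
    raise exc


def main() -> List[int]:
    try:
        raise RuntimeError("connection closed")
    except RuntimeError as caught:
        exc = caught

    depths = [tb_depth(exc)]
    for _ in range(3):
        try:
            reraise(exc)  # same object every time, like repeated callbacks
        except RuntimeError:
            pass
        depths.append(tb_depth(exc))
    return depths


depths = main()
print(depths)
```

If several done callbacks each re-raise the one exception stored on a future, every later log entry would carry the frames added by the earlier ones.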

@mosquito mosquito self-assigned this Feb 20, 2023
@mosquito
Owner

@torarvid If I understood your idea correctly, the result should be retrieved only from tasks.

@torarvid
Author

@mosquito I think my idea is mostly that we should (both for Futures and Tasks) ensure that if the caller does not await the future, we don't swallow exceptions. And as far as I know, the only way to do that is to have a done_callback that checks whether or not there are any exceptions.

@torarvid
Author

@mosquito I see that there is a FutureStore instance, and it also has a parent FutureStore. It looks like a single future could be added both to the child and the parent FutureStore (possibly more than 2 as well in theory, as long as parent != None?). In those cases I believe we will now have a single future with one done_callback for each FutureStore, which means one exception will be raised per callback.

I don't understand the code too well, but it feels like the concept of "remove the future from the self.futures set" (you probably want to do this through the whole chain of futurestore.parent.parent.parent...) is mixed up with "ensuring coroutine exceptions are raised to the caller" (you probably want to do this once regardless of the number of future stores in the .parent.parent... chain)
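A toy model of the separation described above, under the assumption that a store registers each future with its parent as well. `ToyStore`, `track`, and `report` are hypothetical and much simpler than aiormq's FutureStore. Removal runs once per store in the chain, while exception reporting is attached exactly once at the point where the future enters the chain.

```python
import asyncio
from typing import Callable, List, Optional, Set, Tuple


class ToyStore:
    def __init__(self, parent: Optional["ToyStore"] = None):
        self.parent = parent
        self.futures: Set[asyncio.Future] = set()

    def add(self, future: asyncio.Future) -> None:
        # Bookkeeping only: one removal callback per store in the chain.
        self.futures.add(future)
        future.add_done_callback(self.futures.discard)
        if self.parent is not None:
            self.parent.add(future)

    def track(self, future: asyncio.Future,
              report: Callable[[asyncio.Future], None]) -> None:
        # Entry point: register in the whole chain, report only once.
        self.add(future)
        future.add_done_callback(report)


async def main() -> Tuple[List[int], List[int], List[BaseException]]:
    root = ToyStore()
    middle = ToyStore(root)
    leaf = ToyStore(middle)
    reported: List[BaseException] = []

    def report(fut: asyncio.Future) -> None:
        if not fut.cancelled() and fut.exception() is not None:
            reported.append(fut.exception())

    fut = asyncio.get_running_loop().create_future()
    leaf.track(fut, report)
    registered = [len(s.futures) for s in (leaf, middle, root)]

    fut.set_exception(RuntimeError("lost connection"))
    await asyncio.sleep(0)  # let the scheduled done callbacks run
    remaining = [len(s.futures) for s in (leaf, middle, root)]
    return registered, remaining, reported


registered, remaining, reported = asyncio.run(main())
print(registered, remaining, len(reported))
```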

@torarvid
Author

I did a bit more debugging.

  1. I set channel_max = 20
  2. I added a counter that prints every time we run the TaskWrapper ctor (in total, 264 tasks are created)
  3. I added a counter that prints every time we run the FutureStore ctor (in total, 88 stores are created)
  4. I added a (global) counter that prints every time we run the FutureStore.add method (in total, 684 calls)
  5. I added a (global) counter that prints every time we run the FutureStore.__on_task_done method (in total, 684 calls)

(My command line: poetry run pytest -vv tests/test_connection.py -k test_no_free_channels -s)
