Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Future is potentially leaked in basic_ack, basic_nack, basic_reject, basic_publish #176

Open
axnsan12 opened this issue Mar 29, 2023 · 0 comments

Comments

@axnsan12
Copy link

In Channel methods that use the write_queue with the drain_future pattern, the drain_future will be leaked if self.write_queue.put throws an exception (for example, asyncio.CancelledError when shutting down the loop).

ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=Exception() created at C:\Python\Python310\lib\asyncio\base_events.py:424>
source_traceback: Object created at (most recent call last):
  File "main.py", line 277, in <module>
    asyncio.run(main())
  File "C:\Python\Python310\lib\asyncio\runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "C:\Python\Python310\lib\asyncio\runners.py", line 63, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 628, in run_until_complete
    self.run_forever()
  File "C:\Python\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
    super().run_forever()
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 595, in run_forever
    self._run_once()
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 1873, in _run_once
    handle._run()
  File "C:\Python\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  [...]
  File "C:\Python\Python310\lib\site-packages\aio_pika\message.py", line 604, in __aexit__
    await self.message.reject(requeue=self.requeue)
  File "C:\Python\Python310\lib\site-packages\aio_pika\message.py", line 501, in reject
    await self.__channel.basic_reject(
  File "C:\Python\Python310\lib\site-packages\aiormq\channel.py", line 590, in basic_reject
    drain_future = self.create_future()
  File "C:\Python\Python310\lib\site-packages\aiormq\base.py", line 115, in create_future
    return self.__future_store.create_future()
  File "C:\Python\Python310\lib\site-packages\aiormq\base.py", line 72, in create_future
    future = self.loop.create_future()
  File "C:\Python\Python310\lib\asyncio\base_events.py", line 424, in create_future
    return futures.Future(loop=self)

Example offending code snippet:

aiormq/aiormq/channel.py

Lines 540 to 556 in 5b9c88d

drain_future = self.create_future() if wait else None
await self.write_queue.put(
ChannelFrame.marshall(
frames=[
spec.Basic.Ack(
delivery_tag=delivery_tag,
multiple=multiple,
),
],
channel_number=self.number,
drain_future=drain_future,
),
)
if drain_future is not None:
await drain_future

Not sure what a robust solution would be here, maybe try-except the queue op and drain_future.cancel() on execption.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant