aiormq 6.2.3 can leak a socket if cancelled during channel.open #139

Open
TBBle opened this issue Apr 12, 2022 · 0 comments
TBBle commented Apr 12, 2022

Tracing a leaked socket in our unit tests, I have observed that in a task of the form

            self.connection = await aio_pika.connect_robust(rmq_url)
            async with self.connection.channel() as channel:  # task is cancelled while entering this block
                await channel.set_qos(prefetch_count=1)
                queue = await channel.declare_queue(self.rmq_queue_name, durable=True)

                async for message in queue:
                    yield message

If we cancel the task while it is awaiting the second line (async with self.connection.channel() as channel:), a socket is leaked without being closed. Port 34496 is the AMQP port we're connecting to.

[2022-04-12T09:49:48.925Z] >               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
[2022-04-12T09:49:48.925Z] E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <socket.socket fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>
[2022-04-12T09:49:48.925Z] E               
[2022-04-12T09:49:48.925Z] E               Traceback (most recent call last):
[2022-04-12T09:49:48.925Z] E                 File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiohttp/test_utils.py", line 560, in teardown_test_loop
[2022-04-12T09:49:48.925Z] E                   gc.collect()
[2022-04-12T09:49:48.925Z] E               ResourceWarning: unclosed <socket.socket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 60574), raddr=('127.0.0.1', 34496)>
[2022-04-12T09:49:48.925Z] 
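For readers unfamiliar with this kind of warning, here is a minimal sketch of how CPython reports a socket that is garbage-collected without being closed. It uses only the standard library and has nothing to do with aiormq; the helper names leak_socket and count_unclosed_socket_warnings are made up for illustration.

```python
import gc
import socket
import warnings


def leak_socket() -> None:
    """Create a socket and drop the last reference without closing it."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    del sock  # no close(): the finalizer emits a ResourceWarning


def count_unclosed_socket_warnings() -> int:
    """Return how many 'unclosed' ResourceWarnings the leak produced."""
    with warnings.catch_warnings(record=True) as caught:
        # ResourceWarning is ignored by default, so enable it explicitly.
        warnings.simplefilter("always", ResourceWarning)
        leak_socket()
        gc.collect()  # what aiohttp's teardown_test_loop does in the log above
    return sum(
        1
        for w in caught
        if issubclass(w.category, ResourceWarning) and "unclosed" in str(w.message)
    )


leaked = count_unclosed_socket_warnings()
```

In the report above, pytest turns the same warning into PytestUnraisableExceptionWarning because it fires during gc.collect() in test teardown.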

Tracing through, the problem appears to be that in aiormq.Connection.channel, the call to await channel.open can block and later be cancelled. This only seems to reproduce in our test suite when the test is very quick, as the debug logs show:

[2022-04-12T09:49:43.109Z] DEBUG    aio_pika.connection:connection.py:201 Creating AMQP channel for connection: <RobustConnection: "amqp://guest:******@127.0.0.1:34496" 0 channels>
[2022-04-12T09:49:43.109Z] DEBUG    aio_pika.connection:connection.py:210 Channel created: <RobustChannel #Not initialized channel "None">
[2022-04-12T09:49:43.109Z] -------------------------------- live log call ---------------------------------
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x7f938da8dfc0>], drain_future=None)
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Channel.Open object at 0x7f938da8dfc0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
<some test output>
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Channel.OpenOk object at 0x7f938da92400> in channel #1 weight=16 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x7f938da8d540>], drain_future=None)
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Confirm.Select object at 0x7f938da8d540> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
<more test output>
[2022-04-12T09:49:43.109Z] PASSED                                                                   [ 16%]
[2022-04-12T09:49:43.109Z] ------------------------------ live log teardown -------------------------------
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Confirm.SelectOk object at 0x7f938dac6e80> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.109Z] <cancellation of the above task here>
[2022-04-12T09:49:43.109Z] <await the task>
[2022-04-12T09:49:43.109Z] WARNING  aiormq.channel:channel.py:179 Closing channel <Channel: "1" at 0x7f938d9672c0> because RPC call <Confirm.Select object at 0x7f938da8d540> cancelled
<we close the whole connection>
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:625 Closing connection <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0> cause: <class 'asyncio.exceptions.CancelledError'>
[2022-04-12T09:49:43.109Z] DEBUG    aio_pika.robust_connection:robust_connection.py:68 Closing AMQP connection <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Close object at 0x7f938d947680>], drain_future=None)
[2022-04-12T09:49:43.109Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Channel.Close object at 0x7f938d947680> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.110Z] DEBUG    aiormq.connection:connection.py:594 Writer exited for <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>
[2022-04-12T09:49:43.110Z] DEBUG    aiormq.connection:connection.py:463 Reader exited for <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938dc32cc0>

In contrast, the following flow for basically the same test doesn't leak a socket:

[2022-04-12T09:49:44.053Z] DEBUG    aio_pika.connection:connection.py:201 Creating AMQP channel for connection: <RobustConnection: "amqp://guest:******@127.0.0.1:34496" 0 channels>
[2022-04-12T09:49:44.053Z] DEBUG    aio_pika.connection:connection.py:210 Channel created: <RobustChannel #Not initialized channel "None">
[2022-04-12T09:49:44.053Z] -------------------------------- live log call ---------------------------------
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x7f938dc0b2c0>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Channel.Open object at 0x7f938dc0b2c0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
<some test output>
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Channel.OpenOk object at 0x7f938d855780> in channel #1 weight=16 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x7f938dc1a8c0>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Confirm.Select object at 0x7f938dc1a8c0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Confirm.SelectOk object at 0x7f938d8e5280> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Qos object at 0x7f938d8a4db0>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Basic.Qos object at 0x7f938d8a4db0> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Basic.QosOk object at 0x7f938d85c580> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.053Z] DEBUG    aio_pika.queue:queue.py:91 Declaring queue: <RobustQueue(PSResponses): auto_delete=False, durable=True, exclusive=False, arguments=None
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Queue.Declare object at 0x7f938d8a8c10>], drain_future=None)
[2022-04-12T09:49:44.053Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Queue.Declare object at 0x7f938d8a8c10> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
<more test output>
[2022-04-12T09:49:44.054Z] PASSED                                                                   [ 33%]
[2022-04-12T09:49:44.054Z] ------------------------------ live log teardown -------------------------------
[2022-04-12T09:49:44.054Z] <cancellation of the above task here>
[2022-04-12T09:49:44.054Z] <await the task>
[2022-04-12T09:49:44.054Z] WARNING  aiormq.channel:channel.py:179 Closing channel <Channel: "1" at 0x7f938dc01590> because RPC call <Queue.Declare object at 0x7f938d8a8c10> cancelled
[2022-04-12T09:49:44.054Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Close object at 0x7f938dc0a950>], drain_future=None)
[2022-04-12T09:49:44.054Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Channel.Close object at 0x7f938dc0a950> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.315Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Queue.DeclareOk object at 0x7f938dc01860> in channel #1 weight=32 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.315Z] DEBUG    aiormq.connection:connection.py:542 Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Close object at 0x7f938dbff400>], drain_future=None)
[2022-04-12T09:49:44.315Z] DEBUG    aiormq.connection:connection.py:547 Sending frame <Channel.Close object at 0x7f938dbff400> in channel #1 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>
[2022-04-12T09:49:44.315Z] DEBUG    aio_pika.robust_channel:robust_channel.py:107 Robust channel <RobustChannel #Not initialized channel "None"> has been closed.
[2022-04-12T09:49:44.315Z] DEBUG    aiormq.connection:connection.py:484 Received frame <Channel.CloseOk object at 0x7f938d85d5e0> in channel #1 weight=12 on <Connection: "amqp://guest:******@127.0.0.1:34496" at 0x7f938db3bc20>

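In the failing case, async with self.connection.channel() as channel: never finishes entering: the cancellation arrives while channel.open is still waiting on the Confirm.Select RPC. In the success case, you can see we got through channel.set_qos and were inside channel.declare_queue when the cancellation arrived.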
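One detail that matters for the failing case: when a task is cancelled while still inside __aenter__, Python never calls __aexit__, so any cleanup that lives only in __aexit__ is skipped. The sketch below shows this with a toy context manager. ToyChannel is a hypothetical stand-in, not aio_pika's channel class, and the long sleep only simulates waiting for a broker reply.

```python
import asyncio


class ToyChannel:
    """Hypothetical stand-in for a channel context manager."""

    def __init__(self) -> None:
        self.events = []

    async def __aenter__(self) -> "ToyChannel":
        self.events.append("enter started")
        await asyncio.sleep(3600)  # simulates waiting on a broker reply
        self.events.append("enter finished")
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.events.append("exit")


async def consume(channel: ToyChannel) -> None:
    async with channel:
        channel.events.append("body")


async def main() -> list:
    channel = ToyChannel()
    task = asyncio.create_task(consume(channel))
    await asyncio.sleep(0)  # let the task reach the await inside __aenter__
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return channel.events


events = asyncio.run(main())
```

Only "enter started" is recorded; neither the body nor __aexit__ runs. Whether this is what happens inside aio_pika here is not confirmed, but it matches the observation that the async with never completes.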

The call-stack for the failure case:

[2022-04-12T09:03:08.983Z] Traceback (most recent call last):
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/channel.py", line 190, in rpc
[2022-04-12T09:03:08.983Z]     raise e
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/channel.py", line 169, in rpc
[2022-04-12T09:03:08.983Z]     result = await countdown(self.rpc_frames.get())
[2022-04-12T09:03:08.983Z]   File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
[2022-04-12T09:03:08.983Z]     await getter
[2022-04-12T09:03:08.983Z] asyncio.exceptions.CancelledError
[2022-04-12T09:03:08.983Z] 
[2022-04-12T09:03:08.983Z] During handling of the above exception, another exception occurred:
[2022-04-12T09:03:08.983Z] 
[2022-04-12T09:03:08.983Z] Traceback (most recent call last):
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/abc.py", line 40, in __inner
[2022-04-12T09:03:08.983Z]     return await self.task
[2022-04-12T09:03:08.983Z] asyncio.exceptions.CancelledError
[2022-04-12T09:03:08.983Z] 
[2022-04-12T09:03:08.983Z] The above exception was the direct cause of the following exception:
[2022-04-12T09:03:08.983Z] 
[2022-04-12T09:03:08.983Z] Traceback (most recent call last):
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/src/webservice/status_update_api.py", line 33, in message_dispatcher
[2022-04-12T09:03:08.983Z]     async for message in app["rmq"].message_queue():
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/lib/shared/src/ps/shared/rabbitmq.py", line 68, in message_queue
[2022-04-12T09:03:08.983Z]     async with self.connection.channel() as channel:
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aio_pika/channel.py", line 142, in __aenter__
[2022-04-12T09:03:08.983Z]     await self.initialize()
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aio_pika/channel.py", line 213, in initialize
[2022-04-12T09:03:08.983Z]     channel: aiormq.abc.AbstractChannel = await self._create_channel(
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aio_pika/channel.py", line 200, in _create_channel
[2022-04-12T09:03:08.983Z]     return await self.connection.connection.channel(
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/connection.py", line 704, in channel
[2022-04-12T09:03:08.983Z]     await channel.open(timeout=timeout)
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/channel.py", line 198, in open
[2022-04-12T09:03:08.983Z]     await self.rpc(spec.Confirm.Select())
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/base.py", line 162, in wrap
[2022-04-12T09:03:08.983Z]     return await self.create_task(func(self, *args, **kwargs))
[2022-04-12T09:03:08.983Z]   File "/var/jenkins/workspace/ices_upgrade-aio-pika-and-aiormq/projects/webservice/.venv/lib/python3.9/site-packages/aiormq/abc.py", line 42, in __inner
[2022-04-12T09:03:08.983Z]     raise self.exception from e
[2022-04-12T09:03:08.983Z] asyncio.exceptions.CancelledError

My suspicion is that when the call to channel.open fails in Connection.channel, the channel is not stored in self.channels, but I'm not sure why that leads to a leaked socket.
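To make the suspicion concrete, here is a toy model of it, assuming a connection that registers a channel only after open() returns. This is not aiormq's actual code: ToyConnection, ToyChannel, and both channel_* methods are hypothetical. It shows how a cancellation during open() can leave an object that nothing tracks, and one possible guard. It does not explain the socket leak itself.

```python
import asyncio


class ToyChannel:
    def __init__(self) -> None:
        self.closed = False

    async def open(self) -> None:
        await asyncio.sleep(3600)  # simulates waiting on the broker's reply

    async def close(self) -> None:
        self.closed = True


class ToyConnection:
    def __init__(self) -> None:
        self.channels = {}  # channels the connection would clean up on close
        self.created = []   # every channel ever created, for inspection only

    async def channel_unguarded(self) -> ToyChannel:
        channel = ToyChannel()
        self.created.append(channel)
        await channel.open()        # a cancellation here skips the registration
        self.channels[1] = channel
        return channel

    async def channel_guarded(self) -> ToyChannel:
        channel = ToyChannel()
        self.created.append(channel)
        try:
            await channel.open()
        except BaseException:       # includes asyncio.CancelledError
            await channel.close()   # clean up before propagating
            raise
        self.channels[1] = channel
        return channel


async def cancel_during_open(factory) -> None:
    task = asyncio.create_task(factory())
    await asyncio.sleep(0)          # let the task block inside open()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def main():
    unguarded, guarded = ToyConnection(), ToyConnection()
    await cancel_during_open(unguarded.channel_unguarded)
    await cancel_during_open(guarded.channel_guarded)
    return unguarded, guarded


unguarded, guarded = asyncio.run(main())
```

In the unguarded variant the channel ends up neither registered nor closed; in the guarded variant it is closed before the CancelledError propagates.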
