Using channel after CancelledError exception in basic_consume callback can cause connection to close #70

Open
moznuy opened this issue Jan 24, 2020 · 4 comments

moznuy commented Jan 24, 2020

Python 3.8.1

aio-pika 6.4.1
aiormq 3.2.0

Consider callback example (using aio_pika):

async def callback(message: IncomingMessage):
    async with message.process():
        await asyncio.sleep(0.1)

Or plain aiormq:

async def callback(message: DeliveredMessage):
    try:
        await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        await message.channel.basic_nack(message.delivery.delivery_tag)
    else:
        await message.channel.basic_ack(message.delivery.delivery_tag)

and this section:

aiormq/aiormq/base.py

Lines 139 to 143 in 63a8b0d

with suppress(Exception):
    await self._on_close(exc)
with suppress(Exception):
    await self._cancel_tasks(exc)

Those two examples give me:

"CHANNEL_ERROR - expected 'channel.open'"

in Connection.add_close_callback (Connection.closing.add_done_callback for aiormq) after await channel.close()

I understand that the second example is far-fetched: it's not strictly necessary to send a nack/reject on CancelledError. But I ran into something similar to the first example and spent a long time debugging what causes nacks to be sent after the channel has been closed. I thought that I simply can't send anything after close, so I needed to wrap the callback code in some logic to prevent sending messages after close. Then I found that channel.close waits for all subtasks (including on_message callbacks?), but for some reason only after it has already sent the Channel.Close method via AMQP.
So my question is the following: is closing the channel first and then cancelling subtasks intentional?
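Why the first callback sends anything at all on cancellation can be sketched with a toy context manager. This is not aio_pika's implementation: `process` below is a hypothetical stand-in that assumes the real `message.process()` acknowledges on a clean exit and rejects on any exception, which is consistent with the Basic.Reject frames in the capture further down.

```python
import asyncio
from contextlib import asynccontextmanager

# Frames the toy "channel" would write, in order.
sent = []


@asynccontextmanager
async def process(tag):
    """Hypothetical stand-in for message.process(): ack on success, reject on error."""
    try:
        yield
    except BaseException:
        # CancelledError also lands here, so a cancelled callback still rejects.
        sent.append(("reject", tag))
        raise
    else:
        sent.append(("ack", tag))


async def callback(tag, delay):
    async with process(tag):
        await asyncio.sleep(delay)


async def demo():
    fast = asyncio.create_task(callback(1, 0))
    slow = asyncio.create_task(callback(2, 3600))
    await fast          # completes normally, so it acks
    slow.cancel()       # interrupted mid-sleep, so it rejects
    await asyncio.gather(slow, return_exceptions=True)


asyncio.run(demo())
print(sent)
```

If the cancellation is delivered only after the channel has been closed, the reject in the `except` branch is exactly the kind of late frame described above.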

One full example:

import asyncio
import logging
from typing import Optional

import aio_pika
from aio_pika import IncomingMessage

import sys


logging.basicConfig(format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s', level=logging.DEBUG, stream=sys.stdout)
queue_name = 'test_queue'


async def callback(message: IncomingMessage):
    try:
        async with message.process():
            await asyncio.sleep(0.1)
    finally:
        print('--- Callback finished')


def close_callback(reason):
    logging.warning('CONNECTION CLOSED %s', str(reason))


async def consumer():
    connection: Optional[aio_pika.Connection] = None
    channel: Optional[aio_pika.channel.Channel] = None
    tag: Optional[str] = None
    q: Optional[aio_pika.Queue] = None

    try:
        connection = await aio_pika.connect('amqp://guest:guest@localhost')
        connection.add_close_callback(close_callback)

        channel = await connection.channel()
        q = await channel.declare_queue(queue_name, durable=True)
        await channel.set_qos(prefetch_count=3)
        tag = await q.consume(callback)

        await asyncio.sleep(1.2)
        logging.info('Starting to close')
    finally:
        if q and tag:
            await q.cancel(tag)
            logging.info('Queue consume canceled')

        if channel:
            await channel.close()
            logging.info('Channel closed')

        await asyncio.sleep(2)

        if connection:
            # Guarded: connection is still None if aio_pika.connect() raised.
            logging.info('After 2 seconds: Connection.is_closed == %s ', connection.is_closed)
            logging.info('Before connection.close()')
            await connection.close()
            logging.info('After connection.close()')


if __name__ == '__main__':
    asyncio.run(consumer())

Output:

   51.30 -              asyncio -    DEBUG - Using selector: EpollSelector
   56.96 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   57.17 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   58.63 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
  194.32 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
 1396.74 -                 root -     INFO - Starting to close
 1398.72 -                 root -     INFO - Queue consume canceled
--- __CLOSER <Channel: "1">
--- _on_close ISSUED <Channel: "1">
--- _on_close FINISHED <Channel: "1">
--- _cancel_tasks ISSUED <Channel: "1">
--- Callback finished
--- Callback finished
--- Callback finished
--- _cancel_tasks FINISHED <Channel: "1">
 1402.86 -                 root -     INFO - Channel closed
--- __CLOSER <Connection: "amqp://guest:******@localhost">
--- _on_close ISSUED <Connection: "amqp://guest:******@localhost">
--- _cancel_tasks ISSUED <Connection: "amqp://guest:******@localhost">
 1406.87 -                 root -  WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
 1407.04 -  aio_pika.connection -    DEBUG - Closing AMQP connection None
 1407.40 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 385, in __reader
    return await self.close(
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/base.py", line 154, in close
    await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError
 3404.40 -                 root -     INFO - After 2 seconds: Connection.is_closed == False 
 3404.56 -                 root -     INFO - Before connection.close()
 3404.63 -                 root -     INFO - After connection.close()

P.S. I added custom prints to __closer in aiormq:

    async def __closer(self, exc):
        print("--- __CLOSER", repr(self))
        if self.is_closed:  # pragma: no cover
            return

        with suppress(Exception):
            print('--- _on_close ISSUED', repr(self))
            await self._on_close(exc)
            print('--- _on_close FINISHED', repr(self))

        with suppress(Exception):
            print('--- _cancel_tasks ISSUED', repr(self))
            await self._cancel_tasks(exc)
            print('--- _cancel_tasks FINISHED', repr(self))

P.P.S. Connection.is_closed still equals False in the log 2 seconds after the close callback fired.


ghost commented Jan 31, 2020

I'm getting this exact issue also. It would be good to have some clarity over this functionality.


ghost commented Jan 31, 2020

I had a look at the commit history to try and understand why that is the case.

It looks as though one day he fixed it by cancelling tasks before closing the channel: 4ee42e2#diff-03ab1aa1a76f0fbf4c092ce774a54d08

However the next commit on the same day looks to revert that change as part of another change: ed55461#diff-03ab1aa1a76f0fbf4c092ce774a54d08

Based on the commit message, I suspect he accidentally committed old files and overwrote his initial change.

Is this correct?

@mosquito
Owner

Let's look at the aiormq.base._cancel_tasks method. It cancels all tasks in the FutureStore. So you are totally right, it's useless to send frames for a closed channel, but actually you don't send anything, because the channel instance has no writer to write frames with.

AMQP is totally asynchronous, and receiving a Channel.Close frame is a terminal situation for the channel. It means you must stop sending anything on this channel, but you don't know the future state of the user code. So the last chance to handle this situation is to send the Channel.CloseOk frame as soon as possible and move the channel instance to the closed state. After that, the channel's methods will raise exceptions.

But I might have misunderstood your problem 🤔.

If you have a solution (or any proposal) for this problem, don't stay silent.
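The "closed state, so methods raise" idea can be sketched with a toy channel. Everything here (`ToyChannel`, `ChannelClosed`, `_write`) is a hypothetical illustration and not aiormq's actual classes or behavior; it only shows a write path that refuses frames once the channel is marked closed.

```python
class ChannelClosed(Exception):
    """Hypothetical error raised when writing to a closed toy channel."""


class ToyChannel:
    def __init__(self):
        self.is_closed = False
        self.written = []  # frames that actually reached the "wire"

    def _write(self, frame):
        # Single choke point: nothing is written once the channel is closed.
        if self.is_closed:
            raise ChannelClosed(f"cannot send {frame!r} on a closed channel")
        self.written.append(frame)

    def basic_ack(self, tag):
        self._write(("ack", tag))

    def close(self):
        self._write(("close",))
        self.is_closed = True


channel = ToyChannel()
channel.basic_ack(1)
channel.close()

try:
    channel.basic_ack(2)  # late frame from a cancelled callback
    late_ack_refused = False
except ChannelClosed:
    late_ack_refused = True

print(channel.written, late_ack_refused)
```

With a guard like this, a late ack or reject surfaces as a local exception in the callback instead of reaching the broker.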

Author

moznuy commented Feb 15, 2020

The issue for me (and the reason for writing) is that this happens:

1406.87 -                 root -  WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'

because we tried to send something after the channel had been closed.
Suppose we have multiple channels that consume from queues like this:

async def callback(message: IncomingMessage):
    async with message.process():
        await asyncio.sleep(0.1)

and then we issue await queue.cancel(tag) and await channel.close() on one channel.
This causes the whole connection to close (because of an error at the AMQP level).
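The escalation from one channel to the whole connection can be modeled with a toy broker. This is a sketch of the behavior seen in the log above, not RabbitMQ's implementation; `ToyBroker` and its method names are hypothetical.

```python
class ToyBroker:
    """Hypothetical model: a frame on a channel that is not open kills the connection."""

    def __init__(self):
        self.open_channels = set()
        self.connection_open = True
        self.close_reason = None

    def receive(self, channel_id, method):
        if not self.connection_open:
            return "dropped"
        if method == "channel.open":
            self.open_channels.add(channel_id)
            return "channel.open-ok"
        if channel_id not in self.open_channels:
            # Connection-level error: every channel goes down with it.
            self.connection_open = False
            self.open_channels.clear()
            self.close_reason = "CHANNEL_ERROR - expected 'channel.open'"
            return "connection.close"
        if method == "channel.close":
            self.open_channels.discard(channel_id)
            return "channel.close-ok"
        return "ok"


broker = ToyBroker()
replies = [
    broker.receive(1, "channel.open"),
    broker.receive(2, "channel.open"),
    broker.receive(1, "channel.close"),
    broker.receive(1, "basic.reject"),  # late frame on the closed channel
    broker.receive(2, "basic.ack"),     # unrelated channel is affected too
]
print(replies, broker.close_reason)
```

In this model the single late `basic.reject` on channel 1 is enough to stop channel 2 as well.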

Consider this example:

import asyncio  
import logging  
import sys  
from typing import Optional  
  
import aio_pika  
from aio_pika import IncomingMessage  
  
logging.basicConfig(
    format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s',
    level=logging.DEBUG,
    stream=sys.stdout
)
N = 10  
RABBIT_URL = 'amqp://guest:guest@localhost'  
QUEUE_NAMES = ['test_queue{}'.format(i) for i in range(N)]  
  
  
def callback_wrapper(queue_name: str):  
    async def callback(message: IncomingMessage):  
        try:  
            async with message.process():  
                await asyncio.sleep(0.1)  
        finally:  
            print('--- Callback finished queue:', queue_name)  
  
    return callback  
  
  
def close_callback(reason):  
    logging.warning('CONNECTION CLOSED %s', str(reason))  
  
  
async def consumer(connection: aio_pika.Connection, i: int, queue_name: str):
    channel: Optional[aio_pika.channel.Channel] = None
    tag: Optional[str] = None
    queue: Optional[aio_pika.Queue] = None

    try:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name)
        await channel.set_qos(prefetch_count=3)
        tag = await queue.consume(callback_wrapper(queue_name))

        # Only the first consumer stops after 2 seconds; the rest keep running.
        await asyncio.sleep(3600 if i > 0 else 2)
    except:
        logging.exception('Consumer exception on queue: %s', queue_name)
    finally:
        if queue and tag:
            logging.info('Before canceling %s', queue_name)
            await queue.cancel(tag)
            logging.info('Queue %s consume canceled', queue_name)

        if channel:
            await channel.close()
            logging.info('Channel closed (Queue: %s)', queue_name)
  
  
async def main():
    connection: Optional[aio_pika.Connection] = None

    try:
        connection = await aio_pika.connect(RABBIT_URL)
        connection.add_close_callback(close_callback)

        tasks = [
            asyncio.create_task(consumer(connection, i, queue_name))
            for i, queue_name in enumerate(QUEUE_NAMES)
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        await asyncio.sleep(2)

        if connection:
            # Guarded: connection is still None if aio_pika.connect() raised.
            logging.info('After 2 seconds: Connection.is_closed == %s ', connection.is_closed)
            await connection.close()
            logging.info('Connection closed')
  
  
if __name__ == '__main__':  
    asyncio.run(main())

Queue filler code for testing:

import asyncio  
from typing import Awaitable, Callable  
  
import aio_pika  
  
N = 10  
RABBIT_URL = 'amqp://guest:guest@localhost'  
QUEUE_NAMES = ['test_queue{}'.format(i) for i in range(N)]  
  
  
async def purge(connection: aio_pika.Connection, queue_name: str):  
    channel: aio_pika.Channel = await connection.channel(publisher_confirms=False)  
    queue: aio_pika.Queue = await channel.declare_queue(queue_name)  
    await queue.purge()  
  
  
async def push(connection: aio_pika.Connection, queue_name: str):  
    channel: aio_pika.Channel = await connection.channel(publisher_confirms=False)  
    await channel.declare_queue(queue_name)  
  
    for i in range(100000):
        await channel.default_exchange.publish(
            aio_pika.Message(body=f'Q:{queue_name} Message: {i}'.encode()),
            routing_key=queue_name,
        )
  
  
async def main(fill=True):  
    connection: aio_pika.Connection = await aio_pika.connect(RABBIT_URL)  
  
    action: Callable[[aio_pika.Connection, str], Awaitable] = push if fill else purge  
  
    tasks = [asyncio.create_task(action(connection, queue_name)) for queue_name in QUEUE_NAMES]  
    await asyncio.gather(*tasks, return_exceptions=True)  
    await connection.close()  
  
if __name__ == '__main__':  
    asyncio.run(main(fill=True))

Output that I get:

/home/lamar/.envs/issue_aiormq/bin/python /home/lamar/PycharmProjects/issue_aiormq/test.py
   68.57 -              asyncio -    DEBUG - Using selector: EpollSelector
   74.49 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   74.67 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   74.83 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   74.89 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   75.01 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   75.07 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   75.18 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   75.25 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   75.35 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   75.41 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   76.31 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   76.39 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   76.52 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   76.59 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   76.69 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   76.75 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   76.88 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   76.94 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   77.05 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   77.10 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   81.54 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue0): auto_delete=False, durable=None, exclusive=False, arguments=None>
   81.77 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue3): auto_delete=False, durable=None, exclusive=False, arguments=None>
   81.97 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue4): auto_delete=False, durable=None, exclusive=False, arguments=None>
   82.07 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue1): auto_delete=False, durable=None, exclusive=False, arguments=None>
   82.17 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue5): auto_delete=False, durable=None, exclusive=False, arguments=None>
   82.27 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue6): auto_delete=False, durable=None, exclusive=False, arguments=None>
   82.35 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue2): auto_delete=False, durable=None, exclusive=False, arguments=None>
   82.83 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue7): auto_delete=False, durable=None, exclusive=False, arguments=None>
   82.95 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue8): auto_delete=False, durable=None, exclusive=False, arguments=None>
   84.06 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue9): auto_delete=False, durable=None, exclusive=False, arguments=None>
  141.61 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue0): auto_delete=False, durable=None, exclusive=False, arguments=None>
  147.72 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue1): auto_delete=False, durable=None, exclusive=False, arguments=None>
  197.64 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue8): auto_delete=False, durable=None, exclusive=False, arguments=None>
  207.51 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue2): auto_delete=False, durable=None, exclusive=False, arguments=None>
  227.28 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue9): auto_delete=False, durable=None, exclusive=False, arguments=None>
  232.13 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue5): auto_delete=False, durable=None, exclusive=False, arguments=None>
  232.26 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue3): auto_delete=False, durable=None, exclusive=False, arguments=None>
  232.34 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue6): auto_delete=False, durable=None, exclusive=False, arguments=None>
  232.68 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue4): auto_delete=False, durable=None, exclusive=False, arguments=None>
  236.56 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue7): auto_delete=False, durable=None, exclusive=False, arguments=None>
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue3
...
<It goes for 2 seconds>
...
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
 2146.89 -                 root -     INFO - Before canceling test_queue0
 2147.87 -                 root -     INFO - Queue test_queue0 consume canceled
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
 2150.29 -                 root -     INFO - Channel closed (Queue: test_queue0)
 2153.73 -              asyncio -  WARNING - socket.send() raised exception.
 2153.89 -              asyncio -  WARNING - socket.send() raised exception.
 2154.02 -              asyncio -  WARNING - socket.send() raised exception.
 2154.14 -              asyncio -  WARNING - socket.send() raised exception.
 2154.25 -              asyncio -  WARNING - socket.send() raised exception.
 2154.36 -              asyncio -  WARNING - socket.send() raised exception.
 2154.74 -              asyncio -  WARNING - socket.send() raised exception.
 2154.85 -              asyncio -  WARNING - socket.send() raised exception.
 2154.95 -              asyncio -  WARNING - socket.send() raised exception.
 2155.06 -              asyncio -  WARNING - socket.send() raised exception.
 2155.17 -              asyncio -  WARNING - socket.send() raised exception.
 2155.31 -              asyncio -  WARNING - socket.send() raised exception.
 2155.47 -              asyncio -  WARNING - socket.send() raised exception.
 2155.58 -              asyncio -  WARNING - socket.send() raised exception.
 2155.68 -              asyncio -  WARNING - socket.send() raised exception.
 2155.79 -              asyncio -  WARNING - socket.send() raised exception.
 2155.89 -              asyncio -  WARNING - socket.send() raised exception.
 2156.00 -              asyncio -  WARNING - socket.send() raised exception.
 2156.10 -              asyncio -  WARNING - socket.send() raised exception.
 2156.21 -              asyncio -  WARNING - socket.send() raised exception.
 2156.31 -                 root -  WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
 2156.36 -  aio_pika.connection -    DEBUG - Closing AMQP connection None
 2156.49 -              asyncio -  WARNING - socket.send() raised exception.
 2156.61 -              asyncio -  WARNING - socket.send() raised exception.
 2156.73 -              asyncio -  WARNING - socket.send() raised exception.
 2157.30 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/home/lamar/.envs/issue_aiormq/lib/python3.8/site-packages/aiormq/connection.py", line 385, in __reader
    return await self.close(
  File "/home/lamar/.envs/issue_aiormq/lib/python3.8/site-packages/aiormq/base.py", line 156, in close
    await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError
--- Callback finished queue: test_queue4
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue4
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue4
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue9

<Output stops completely here>

The program is still running because of await asyncio.sleep(3600 if i > 0 else 2), but the other channels no longer receive anything because the connection is already closed:

No.     Time           Source                Destination           Protocol Length Info
   1443 18.289208205   127.0.0.1             127.0.0.1             AMQP     118    Basic.Cancel 
   1444 18.289475910   127.0.0.1             127.0.0.1             AMQP     117    Basic.Cancel-Ok 
   1445 18.289862357   127.0.0.1             127.0.0.1             AMQP     85     Channel.Close reply= 
   1446 18.290321621   127.0.0.1             127.0.0.1             AMQP     78     Channel.Close-Ok 
   1447 18.290765956   127.0.0.1             127.0.0.1             AMQP     87     Basic.Reject 
   1448 18.290827337   127.0.0.1             127.0.0.1             AMQP     87     Basic.Reject 
   1449 18.290845098   127.0.0.1             127.0.0.1             TCP      66     5672 → 42554 [ACK] Seq=102188 Ack=13864 Win=65536 Len=0 TSval=2578187892 TSecr=2578187892
   1450 18.290929211   127.0.0.1             127.0.0.1             AMQP     87     Basic.Reject 
   1451 18.292394659   127.0.0.1             127.0.0.1             AMQP     124    Connection.Close reply=CHANNEL_ERROR - expected 'channel.open' 
   1452 18.292680238   127.0.0.1             127.0.0.1             AMQP     78     Connection.Close-Ok 
   1453 18.292863989   127.0.0.1             127.0.0.1             TCP      66     42554 → 5672 [FIN, ACK] Seq=13897 Ack=102246 Win=65536 Len=0 TSval=2578187894 TSecr=2578187894
   1454 18.292885056   127.0.0.1             127.0.0.1             TCP      66     5672 → 42554 [FIN, ACK] Seq=102246 Ack=13898 Win=65536 Len=0 TSval=2578187894 TSecr=2578187894
   1455 18.292895202   127.0.0.1             127.0.0.1             TCP      66     42554 → 5672 [ACK] Seq=13898 Ack=102247 Win=65536 Len=0 TSval=2578187894 TSecr=2578187894

Then I tried to reverse the order of

with suppress(Exception):  
    await self._on_close(exc)  
  
with suppress(Exception):  
    await self._cancel_tasks(exc)

in aiormq.base.__closer.
The output is quite similar, except that:

  1. No WARNING - socket.send() raised exception.
  2. No WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
  3. No DEBUG - Reader task cancelled: <exception>
  4. Other channels continued to receive data and spit it out.

Then I sent SIGINT; some KeyboardInterrupt and cancellation exceptions were thrown and the program hung, but that is not related to this issue; it is related to mosquito/aio-pika#253

So my reasoning is that trying to write to a closed channel should not affect the connection or other channels in such a drastic manner. Maybe reversing the order solves this issue completely, or the library code could prevent sending anything after channel.close has been issued.
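The effect of the ordering can be reproduced without a broker. The sketch below is a toy model, not aiormq's code: `ToyChannel` is hypothetical, and only the step names `_on_close` and `_cancel_tasks` are borrowed from `__closer`. It counts how many frames a cancelled callback would emit after the close has been sent, for both orders.

```python
import asyncio
from contextlib import suppress


class ToyChannel:
    """Hypothetical stand-in for a channel; only the step names mirror __closer."""

    def __init__(self, cancel_first: bool):
        self.cancel_first = cancel_first
        self.close_sent = False
        self.late_frames = 0
        self.tasks = []

    async def reject(self):
        # Count frames that would reach the broker after Channel.Close.
        if self.close_sent:
            self.late_frames += 1

    async def _on_close(self):
        self.close_sent = True

    async def _cancel_tasks(self):
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)

    async def close(self):
        steps = [self._on_close, self._cancel_tasks]
        if self.cancel_first:
            steps.reverse()
        for step in steps:
            with suppress(Exception):
                await step()


async def callback(channel):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        await channel.reject()  # what a reject-on-error callback does when cancelled
        raise


async def count_late_frames(cancel_first):
    channel = ToyChannel(cancel_first)
    channel.tasks = [asyncio.create_task(callback(channel)) for _ in range(3)]
    await asyncio.sleep(0)  # let every callback start before closing
    await channel.close()
    return channel.late_frames


original_order = asyncio.run(count_late_frames(cancel_first=False))
reversed_order = asyncio.run(count_late_frames(cancel_first=True))
print(original_order, reversed_order)
```

In this model, closing first leaves three late frames and cancelling first leaves none, which matches the difference described in the list above.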
