aio_pika_client@1.6.0: Exception from close() after connect() #91

Open
bernhardkaindl opened this issue Oct 5, 2022 · 3 comments
@bernhardkaindl
Contributor

bernhardkaindl commented Oct 5, 2022

import asyncio

from pjrpc.client.backend import aio_pika as pjrpc_client


async def main():
    client = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
    await client.connect()
    await client.close()


if __name__ == "__main__":
    asyncio.run(main())

With 1.6.0, this results in:

  File "aio_pika_client-close-test.py", line 9, in main
    await client.close()
  File "/home/autonoma/flowhub/pjrpc/pjrpc/client/backend/aio_pika.py", line 86, in close
    assert self._result_queue is not None, "client is not initialized"
AssertionError: client is not initialized

self._result_queue is always None unless the Client is constructed to use a fixed result queue, like this:

    kwargs["result_queue_name"] = "jsonrpc-responses"
    kwargs["result_queue_args"] = {
        "exclusive": False,
        "auto_delete": False,
        "durable": True,
        "arguments": None,
    }
    client = xjsonrpc_client.Client(
        URL("amqp://guest:guest@localhost:5672/"),
        "jsonrpc-requests",
        **kwargs,
    )

For most uses, however, a fixed result queue is not needed, and a temporary result queue is used for each call (which is easier if you want to run more than one RPC client).
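The failure mode described above can be reproduced without a broker. This is only a minimal sketch: `StubClient` and `try_close` are hypothetical stand-ins, not pjrpc code. The sketch mirrors the attribute names from the traceback and assumes, as reported, that the result queue is only created when a fixed queue name is passed.

```python
import asyncio
from typing import Optional


class StubClient:
    """Hypothetical stand-in for the aio_pika client; not pjrpc code."""

    def __init__(self, result_queue_name: Optional[str] = None) -> None:
        self._result_queue_name = result_queue_name
        self._connection: Optional[object] = None
        self._result_queue: Optional[object] = None

    async def connect(self) -> None:
        self._connection = object()
        # Assumption taken from the report: the result queue is only
        # set up when a fixed queue name was given to the constructor.
        if self._result_queue_name:
            self._result_queue = object()

    async def close(self) -> None:
        # Same assertion as the one shown in the traceback.
        assert self._result_queue is not None, "client is not initialized"
        self._connection = None


async def try_close(client: StubClient) -> str:
    """Connect, then close, and report how close() ended."""
    await client.connect()
    try:
        await client.close()
    except AssertionError as exc:
        return f"AssertionError: {exc}"
    return "closed"


if __name__ == "__main__":
    print(asyncio.run(try_close(StubClient())))
    print(asyncio.run(try_close(StubClient("jsonrpc-responses"))))
```

With the default constructor the assertion fires even though connect() succeeded; only the variant with a fixed queue name closes cleanly.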

@bernhardkaindl
Contributor Author

This assertion was added by #74 to fix a mypy warning:

assert self._result_queue is not None, "client is not initialized"

Context: git log -p -U11 |grep -C7 "+.*assert self._result_queue"

     async def close(self) -> None:
         """
         Closes current broker connection.
         """
 
+        assert self._channel is not None, "client is not initialized"
+        assert self._connection is not None, "client is not initialized"
+        assert self._result_queue is not None, "client is not initialized"
+
         if self._consumer_tag:
             await self._result_queue.cancel(self._consumer_tag)
             self._consumer_tag = None
 
         await self._channel.close()
         await self._connection.close()

@bernhardkaindl
Contributor Author

bernhardkaindl commented Oct 5, 2022

Proposed fix:

-        assert self._result_queue is not None, "client is not initialized"

-        if self._consumer_tag:
+        if self._consumer_tag and self._result_queue:
             await self._result_queue.cancel(self._consumer_tag)
             self._consumer_tag = None

         await self._channel.close()
         await self._connection.close()

Reason: self._result_queue is not initialized by default, only when a fixed result queue is used (see the description).

It should also be possible to close the connection (self._channel and self._connection) even when one of them is not set.

This can happen, for example, when a KeyboardInterrupt or another terminating event arrives while a connection is being established or closed. In that case it is better to close whatever is open and finish any pending futures than to throw an AssertionError.
Thus I'll also treat these two cases without throwing an AssertionError. I've opened #92 with these proposed fixes.
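As a rough illustration of the idea (not the actual change in #92), a close() that skips whatever is unset could look like the sketch below. `FakeResource` and `TolerantClient` are hypothetical names used only to make the example runnable without RabbitMQ. Resetting the attributes to None after closing is an extra assumption of this sketch, added so that a second close() call is a no-op.

```python
import asyncio
from typing import List, Optional


class FakeResource:
    """Hypothetical stand-in for an aio_pika channel, connection, or queue."""

    def __init__(self, name: str, log: List[str]) -> None:
        self._name = name
        self._log = log

    async def cancel(self, consumer_tag: str) -> None:
        self._log.append(f"{self._name}.cancel({consumer_tag})")

    async def close(self) -> None:
        self._log.append(f"{self._name}.close()")


class TolerantClient:
    """Hypothetical client whose close() tolerates unset attributes."""

    def __init__(
        self,
        channel: Optional[FakeResource] = None,
        connection: Optional[FakeResource] = None,
        result_queue: Optional[FakeResource] = None,
        consumer_tag: Optional[str] = None,
    ) -> None:
        self._channel = channel
        self._connection = connection
        self._result_queue = result_queue
        self._consumer_tag = consumer_tag

    async def close(self) -> None:
        # Cancel the consumer only if both the tag and the queue exist.
        if self._consumer_tag and self._result_queue is not None:
            await self._result_queue.cancel(self._consumer_tag)
            self._consumer_tag = None

        # Close whatever is open; the `is not None` checks also narrow
        # the Optional types for mypy, so no assert is needed.
        if self._channel is not None:
            await self._channel.close()
            self._channel = None
        if self._connection is not None:
            await self._connection.close()
            self._connection = None


if __name__ == "__main__":
    log: List[str] = []
    client = TolerantClient(
        channel=FakeResource("channel", log),
        connection=FakeResource("connection", log),
    )
    asyncio.run(client.close())
    print(log)
```

The explicit None checks serve the same purpose for the type checker as the assertions did, but they let the function continue and close the remaining resources instead of aborting.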

bernhardkaindl pushed a commit that referenced this issue Oct 5, 2022
Issue #91 shows an AssertionError when a connected aio_pika client
which didn't need to open a result queue calls client.close().

It should be possible to call client.close() in this case as well,
to close the connection to the RabbitMQ server, because when the
client was either interrupted or had nothing to do before closing,
this is the normal state.

Also replace the other assertions in the aio_pika client's close()
with simple checks which don't abort the function to close any
other remaining connection elements or futures which are not
properly handled when simply aborting with an assertion.

For details and a test case, see issue #91.
@bernhardkaindl
Contributor Author

@dapper91 ping
