QueueIterator raises StopAsyncIteration when iterator/channel is closed. #615

Open
wants to merge 2 commits into
base: master
from

Conversation

Darsstar
Contributor

@Darsstar Darsstar commented Jan 17, 2024

See #358

Currently QueueIterator never raises a StopAsyncIteration exception: not when the channel is closed, and not after QueueIterator's close method is called.
This means that an async for message in queue.iterator(): loop, once started, keeps running "forever" even if no new message will ever arrive. ("Forever" because the asyncio task can still be canceled, etc.)

This PR fixes that in a backwards-compatible way. Some tests are refactored to rely on this implicitly, and new tests have been added that explicitly check that QueueIterator.__anext__() raises certain exceptions.
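To illustrate the behaviour described above, here is a minimal sketch of an async iterator that ends an async for loop by raising StopAsyncIteration once it has been closed. ClosableIterator, its sentinel, and consume() are hypothetical names built on a plain asyncio.Queue; this is not aio-pika's QueueIterator and makes no claim about how the PR implements it.

```python
import asyncio


class ClosableIterator:
    """Hypothetical stand-in for a queue iterator; not aio-pika's class."""

    _CLOSED = object()  # sentinel that wakes a waiting consumer on close

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    def put(self, item) -> None:
        self._queue.put_nowait(item)

    def close(self) -> None:
        self._closed = True
        self._queue.put_nowait(self._CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Already closed and drained: stop immediately instead of blocking.
        if self._closed and self._queue.empty():
            raise StopAsyncIteration
        item = await self._queue.get()
        if item is self._CLOSED:
            raise StopAsyncIteration
        return item


async def consume() -> list:
    iterator = ClosableIterator()
    for n in range(3):
        iterator.put(n)
    iterator.close()
    # The loop ends on its own once the iterator reports it is closed.
    return [item async for item in iterator]


result = asyncio.run(consume())
print(result)
```

Without the StopAsyncIteration branches, the final __anext__() call would wait on an empty queue indefinitely, which is the "forever" loop the description refers to.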

@coveralls

coveralls commented Jan 17, 2024

Coverage Status

coverage: 92.045% (+3.9%) from 88.125%
when pulling 9abe45f on Darsstar:QueueIterator-raises-StopAsyncIterator
into 848c025 on mosquito:master.

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 3 times, most recently from a5180c0 to 15ca5ee Compare January 17, 2024 01:00
@@ -201,8 +204,7 @@ async def ready(self) -> None:
def __del__(self) -> None:
if (
self.is_closed or
self.loop.is_closed() or
not hasattr(self, "connection")
Contributor Author

@Darsstar Darsstar Jan 17, 2024

If I overlooked some way self could end up with a connection attribute, I will undo this change.

@Darsstar Darsstar changed the title from "QueueIterator raised StopAsyncIteration when channel is closed." to "QueueIterator raises StopAsyncIteration when channel is closed." Jan 17, 2024
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 6 times, most recently from 9ea9416 to f12ecfb Compare January 17, 2024 15:25
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 2 times, most recently from a2e3647 to 5652d2d Compare January 20, 2024 09:52
@Darsstar Darsstar changed the title from "QueueIterator raises StopAsyncIteration when channel is closed." to "QueueIterator raises StopAsyncIteration when iterator/channel is closed." Jan 24, 2024
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from 5652d2d to 71fc125 Compare January 29, 2024 15:09
@Darsstar
Contributor Author

@mosquito ping

@mosquito
Owner

mosquito commented Mar 4, 2024

@Darsstar this request contains a lot of changes that could potentially break backward compatibility. I'm still thinking about how to test it so that I can tell whether I need to make a separate major release or can make do with a minor one.

@Darsstar
Contributor Author

Darsstar commented Mar 4, 2024

this request contains a lot of changes that could potentially break backward compatibility.

I assume you are referring to:

  • new abstract methods
  • some 'protected' properties changing type (bool -> asyncio.Event)

9.4.0 already dropped Python 3.7. The changes taking advantage of 3.8, all contained in a single commit, were made in a way that should be backward compatible.

It probably should be a new major version due to the protected properties.

Although, I think those could be rewritten as @property attributes with setters which notify an asyncio.Condition ...
Then I think it boils down to whether you are supporting people inheriting from the abstract base classes without inheriting from the concrete classes aio-pika provides.
If you do, the major version should be bumped.
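The property idea mentioned above could look roughly like the following sketch. It is an assumption-laden illustration, not the PR's code: Closable and wait_closed are hypothetical names, and an asyncio.Event is used in place of the asyncio.Condition mentioned in the comment to keep the example short. The point is only that existing code reading or assigning a bool keeps working while waiters get notified.

```python
import asyncio


class Closable:
    """Hypothetical class; only illustrates the property idea."""

    def __init__(self) -> None:
        self._closed_event = asyncio.Event()

    @property
    def _closed(self) -> bool:
        # Old code that reads `obj._closed` still sees a bool.
        return self._closed_event.is_set()

    @_closed.setter
    def _closed(self, value: bool) -> None:
        # Old code that assigns `obj._closed = True` now wakes waiters.
        if value:
            self._closed_event.set()
        else:
            self._closed_event.clear()

    async def wait_closed(self) -> None:
        await self._closed_event.wait()


async def demo() -> tuple:
    obj = Closable()
    before = obj._closed
    waiter = asyncio.create_task(obj.wait_closed())
    await asyncio.sleep(0)  # let the waiter start waiting
    obj._closed = True      # plain bool assignment, as a subclass might do
    await asyncio.wait_for(waiter, timeout=1)
    return before, obj._closed


states = asyncio.run(demo())
print(states)
```

This only covers subclasses of the concrete class; code that implements the abstract base classes from scratch would not inherit such a property, which is the compatibility question raised above.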

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 4 times, most recently from 709ecc8 to 8155901 Compare March 4, 2024 20:50
@Darsstar
Contributor Author

Darsstar commented Mar 5, 2024

I rebased, which added the 3.12 tests, and now tests/test_amqp_robust_proxy.py::test_channel_reconnect_stairway[[0] 0.1-128] (consistently) fails only on 3.12. Time for me to debug the issue...

Owner

@mosquito mosquito left a comment

Thank you for not giving up on this improvement, I'm sorry that I don't have much time to devote to it right now. But I'm open to discussion.

if self._closed.is_set():
raise StopAsyncIteration

message = asyncio.create_task(
Owner

Creating a task for each message is significantly slower. Task creation works fine when the coroutine does some long job or waits on the network.

Here, as many as three tasks are created that must not only be queued for execution, but also canceled upon the occurrence of one of the events.

Contributor Author

As of Python 3.11, passing coroutines to asyncio.wait() is forbidden. So I reduced it to one task per message, instead of three.

I don't think it is possible to remove that last task without effectively reimplementing asyncio.wait().
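As a rough illustration of the "one task per message" pattern discussed here, the sketch below races a single task wrapping queue.get() against a future that resolves on close, using asyncio.wait(). The names next_or_stop, closed, and demo are hypothetical, and a plain asyncio.Queue stands in for the real message queue; it is a sketch of the technique under those assumptions, not the PR's implementation.

```python
import asyncio


async def next_or_stop(queue: asyncio.Queue, closed: asyncio.Future):
    """Return the next item, or raise StopAsyncIteration once `closed` resolves."""
    if closed.done():
        raise StopAsyncIteration
    # The one task per message; `closed` is already a Future, so it needs no task.
    getter = asyncio.create_task(queue.get())
    done, _ = await asyncio.wait(
        {getter, closed}, return_when=asyncio.FIRST_COMPLETED
    )
    if getter in done:
        return getter.result()
    getter.cancel()  # closed first: drop the pending get
    raise StopAsyncIteration


async def demo():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    closed = loop.create_future()
    queue.put_nowait("msg")
    first = await next_or_stop(queue, closed)
    # Simulate the channel closing shortly after, while a get is pending.
    loop.call_later(0.01, closed.set_result, None)
    try:
        await next_or_stop(queue, closed)
        stopped = False
    except StopAsyncIteration:
        stopped = True
    return first, stopped


outcome = asyncio.run(demo())
print(outcome)
```

The task around queue.get() is what the review comment flags as overhead: asyncio.wait() accepts tasks and futures but not bare coroutines, so the coroutine has to be wrapped before it can be raced against the close signal.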

@Darsstar
Contributor Author

Small update: I am currently under the impression this PR isn't at fault.

I branched from upstream/master and altered the reconnect_stairway test to start at 60 and go all the way up to 4096, instead of every power of 2 in the range [64, 4096]. The tests fail on all Python versions, so it seems to be some sort of race condition based on how tasks end up being scheduled...?
The larger the stair value, the less likely it is to fail.

I don't have a strong lead, but I started looking at aiomisc and aiormq as well. Hopefully I'll stumble on the cause in the next two weeks or so...

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from 6ec53f1 to 0d2ae5b Compare March 19, 2024 16:02
@Darsstar
Contributor Author

The tests are passing again :)

Public backward incompatible API changes:

  • QueueIterator.close() no longer accepts arguments

Internal backward incompatible API changes:

  • Connection._closed changed from being of type bool to type asyncio.Future
  • Channel._closed changed from being of type bool to type asyncio.Future

Those are all I found going over all the changes again.

My preference would be leaving it as is and reflecting that in the version number. Say the word and I will turn Connection._closed and Channel._closed into properties and undo the QueueIterator.close() change.

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from c5626ad to 9abe45f Compare April 16, 2024 13:12
@Darsstar Darsstar requested a review from mosquito April 18, 2024 07:36
@LockedThread

Hey @Darsstar, I am coming from #623 and I have a question about how we should be handling the TimeoutError being raised from the application perspective? Recently, this exception has been throwing periodically in my production code and I am unsure of how I should be handling it... I thought the RobustConnection would handle this.

@LockedThread

LockedThread commented Apr 25, 2024

I just re-read the issue and this PR may also be fixing a memory leak I have been facing for a while. I am going to pull this version down and see if it fixes it. Will report back.

@LockedThread

LockedThread commented Apr 25, 2024

This PR also seems to fix my memory leak. @mosquito Do we have an ETA on when this could be merged? I am going to start using @Darsstar's version now.

@Darsstar
Contributor Author

Darsstar commented Apr 26, 2024

Hey @Darsstar, I am coming from #623 and I have a question about how we should be handling the TimeoutError being raised from the application perspective? Recently, this exception has been throwing periodically in my production code and I am unsure of how I should be handling it... I thought the RobustConnection would handle this.

This branch should only throw TimeoutError if you pass a non-None value as the timeout keyword argument to Queue.iterator().
If you do, presumably you did so for a reason.

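import asyncio

iterator = queue.iterator(timeout=60)
while True:
    try:
        # An async iterator is advanced with __anext__(); it is not awaitable itself.
        message = await iterator.__anext__()
    except StopAsyncIteration:
        # The iterator or channel was closed: stop consuming.
        break
    except asyncio.TimeoutError:
        # do whatever you want to do
        continue
    # handle message here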

Alternatively you could create a wrapper

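import asyncio
from typing import AsyncIterator


class Wrapper:
    def __init__(self, iterator: AsyncIterator):
        self.iterator = iterator

    def __aiter__(self):
        # Required so the wrapper can be used in an async for loop.
        return self

    async def __anext__(self):
        while True:
            try:
                # StopAsyncIteration propagates and ends the async for loop.
                return await self.iterator.__anext__()
            except asyncio.TimeoutError:
                # do whatever you want to do, then keep waiting
                continue

async for message in Wrapper(queue.iterator(timeout=60)):
    ...  # handle message here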

@mosquito
Owner

@LockedThread @Darsstar sorry, there are lots of changes; I am planning to retest all of these myself.

@LockedThread

@LockedThread @Darsstar sorry, there are lots of changes; I am planning to retest all of these myself.

All good, we appreciate your diligent work in supporting this project.

@Darsstar
Contributor Author

Unsurprisingly, fixing this requires more code and therefore increases the runtime.

@gglluukk's benchmark (thanks!) shows about 10% runtime degradation:

before your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 4.905
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 4.503
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 4.809
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 4.870
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 4.741
Average: 4.766

after your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 5.481
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 5.123
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 5.397
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 5.437
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 5.128
Average: 5.313
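For reference, the relative slowdown can be recomputed from the ten timings listed above. This is only arithmetic on the numbers as posted; the variable names are illustrative, and the unit of the timings is not stated in the benchmark output.

```python
from statistics import mean

# Timings copied from the benchmark output above.
before = [4.905, 4.503, 4.809, 4.870, 4.741]
after = [5.481, 5.123, 5.397, 5.437, 5.128]

mean_before = mean(before)
mean_after = mean(after)
# Relative increase of the average runtime, in percent.
slowdown_pct = (mean_after / mean_before - 1) * 100

print(
    f"before: {mean_before:.3f}  after: {mean_after:.3f}  "
    f"slowdown: {slowdown_pct:.1f}%"
)
```

The averages match the reported 4.766 and 5.313, and the ratio works out to about 11.5%, in line with the "about 10%" figure quoted.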

@LockedThread

Unsurprisingly, fixing this requires more code and therefore increases the runtime.

@gglluukk's benchmark (thanks!) shows about 10% runtime degradation:

before your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 4.905
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 4.503
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 4.809
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 4.870
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 4.741
Average: 4.766

after your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 5.481
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 5.123
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 5.397
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 5.437
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 5.128
Average: 5.313

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

@Darsstar
Contributor Author

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.)

@LockedThread

LockedThread commented May 4, 2024

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.)

Hopefully PyO3 gets better support for async/await, while maintaining interoperability with asyncio. There is a lot of active work on this right now. Once that happens, using Rust will be feasible.

In the meantime, losing 10% in performance is worthwhile to make sure I don't get OOM kills.

@mosquito
Owner

mosquito commented May 5, 2024

You should do a performance test, with cProfile for example. The last time I did this, marshalling in pamqp was the slowest part.
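A profiling run along the lines suggested here might be set up as in the sketch below, using only the standard library's cProfile and pstats. hot_path is a hypothetical placeholder for whatever code is being measured (for example a consume loop); nothing here is specific to aio-pika or pamqp.

```python
import cProfile
import io
import pstats


def hot_path(n: int) -> int:
    """Hypothetical stand-in for the code under test (e.g. a consume loop)."""
    return sum(i * i for i in range(n))


profiler = cProfile.Profile()
profiler.enable()
hot_path(100_000)
profiler.disable()

# Sort by cumulative time and show the ten most expensive entries.
buffer = io.StringIO()
stats = pstats.Stats(profiler, stream=buffer)
stats.sort_stats("cumulative").print_stats(10)
report = buffer.getvalue()
print(report)
```

Comparing such a report before and after the patch would show whether the added task creation or something else, such as message marshalling, dominates the runtime.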


4 participants