Fixed memory object stream sometimes dropping sent items #735

Merged
agronholm merged 8 commits into master from fix-memobjectstream
May 26, 2024

Conversation

agronholm
Owner

@agronholm agronholm commented May 14, 2024

Changes

Fixes #728.

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.

Updating the changelog

If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:

* Fix big bad boo-boo in task groups (`#123 <https://github.com/agronholm/anyio/issues/123>`_; PR by Yourname)

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

If possible, use your real name in the changelog entry. If not, use your GitHub
username.

@agronholm agronholm requested a review from gschaffner May 14, 2024 18:21
Collaborator

@Zac-HD Zac-HD left a comment

This looks good to me, modulo the single concern about a possible uncancel edge case.

I'm not sure whether that even can happen, but it seems worth checking. If it can, I'd probably just add a code comment + note in the docs and merge anyway.

Comment on lines 66 to 70
- :return: a list of task info objects
+ :return: a sequence of task info objects
Collaborator

If we want this to be a general Sequence, let's also update the type annotation above and cast below.

Owner Author

That was my first thought too, but:

  1. it is factually returning a list
  2. a Sequence (or even a MutableSequence) would narrow the return type, thus breaking backwards compatibility, albeit only from the type checker standpoint.

I see no ideal solution here, so I chose the least disruptive way forward. Thoughts?
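
To make the type-checker concern concrete, here is a minimal sketch. The two function names are hypothetical stand-ins, not anyio's API. The point is only that a Sequence annotation hides list-only methods such as append from callers, even though the object returned at runtime is unchanged.

```python
from typing import List, Sequence


def get_tasks_as_list() -> List[str]:
    # Hypothetical stand-in for a function annotated as returning a list.
    return ["task-a", "task-b"]


def get_tasks_as_sequence() -> Sequence[str]:
    # Same runtime behaviour, but the annotation promises less to callers.
    return ["task-a", "task-b"]


tasks = get_tasks_as_list()
tasks.append("task-c")  # accepted by a type checker: list has append()

seq = get_tasks_as_sequence()
# A type checker would reject seq.append("task-c") here, because Sequence
# declares no append(), even though seq is still a plain list at runtime.
print(tasks, type(seq).__name__)
```

Existing callers that mutate the result would therefore start failing type checks after such a change, which is the backwards compatibility issue raised above.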

Collaborator

Let's just say "list" instead of "sequence" here in the docstring.

As a broader comment, I tend to treat type annotations as documentation rather than a stable API, and thus don't mind updating them in minor versions if that otherwise makes sense. But continuing to use list here seems best anyway.

Owner Author

I've reverted the wording.

Comment on lines +209 to +210
while self._state.waiting_receivers:
    receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
Collaborator

I suspect that this can deadlock if the receiving task is cancelled, then send_nowait is invoked (removing it from waiting_receivers), and then the task is uncancelled before the event loop actually kicks it out of await receive.

Of course this is a pretty nasty use of the already brittle uncancel semantics, but we probably want a comment here to make the possibility salient to future readers? Or maybe I'm wrong about this.
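
For context on the uncancel semantics mentioned here: on Python 3.11+, Task.cancel() increments a per-task count of cancellation requests, and Task.uncancel() decrements it and returns the remaining count. The sketch below uses only the standard library and shows the counter alone. Whether the already requested cancellation is still delivered to the task is a separate question and is deliberately not asserted here. The function name is made up for illustration.

```python
import asyncio


async def cancel_then_uncancel():
    # Task.uncancel() and Task.cancelling() only exist on Python 3.11+.
    if not hasattr(asyncio.Task, "uncancel"):
        return None

    task = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the task start and block inside sleep()

    task.cancel()
    after_cancel = task.cancelling()  # number of pending cancel requests
    after_uncancel = task.uncancel()  # remaining requests after decrementing

    # Clean up: cancel again and wait for the task to finish either way.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return after_cancel, after_uncancel


counts = asyncio.run(cancel_then_uncancel())
print(counts)
```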

Owner Author

@agronholm agronholm May 25, 2024

I was not able to create a situation where an uncancel task would get its foot in the door to prevent the delivery of the cancellation. Here's my best attempt so far:

import asyncio
from asyncio import CancelledError, current_task, get_running_loop

from anyio import create_memory_object_stream, create_task_group


async def receiver(receive, task_status):
    with receive:
        task_status.started(current_task())
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
            raise

        print("got", item)


async def main():
    send, receive = create_memory_object_stream()
    async with create_task_group() as tg:
        task = await tg.start(receiver, receive)
        get_running_loop().call_soon(task.uncancel)
        task.cancel()
        await send.send(6)


asyncio.run(main())

Owner Author

@agronholm agronholm May 25, 2024

Looks like I wasn't trying hard enough. I was able to repro the situation using just asyncio's task APIs:

import asyncio
from asyncio import CancelledError, create_task

from anyio import create_memory_object_stream


async def receiver(receive):
    with receive:
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
            raise

        print("got", item)


async def uncanceller(task):
    task.uncancel()
    print("uncancelled, cancelling =", task.cancelling())


async def main():
    send, receive = create_memory_object_stream()
    task = create_task(receiver(receive))
    uncancel_task = create_task(uncanceller(task))
    task.cancel()
    await send.send(6)


asyncio.run(main())

Owner Author

Scratch that too, because the task hadn't even started and that's why the sender was blocking.

Owner Author

Ok, here's my third attempt which seems to confirm that the cancellation is scheduled ahead of everything else:

import asyncio
from asyncio import CancelledError, create_task, sleep

from anyio import create_memory_object_stream


async def receiver(receive):
    with receive:
        print("waiting for item")
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
        else:
            print("got", item)


async def uncanceller(task):
    task.uncancel()
    print("uncancelled, cancelling =", task.cancelling())


async def main():
    send, receive = create_memory_object_stream(1)
    receive_task = create_task(receiver(receive))
    await sleep(0)
    uncancel_task = create_task(uncanceller(receive_task))
    receive_task.cancel()
    await send.send(6)
    await uncancel_task
    await receive_task


asyncio.run(main())

Output:

waiting for item
cancelled
uncancelled, cancelling = 0
Traceback (most recent call last):
...

Interestingly, if I cancel the receiver task and uncancel it right after without yielding to another task, I get different results based on the Python version. On 3.13.0b1, the task is scheduled normally and receives the item:

import asyncio
from asyncio import CancelledError, create_task, sleep

from anyio import create_memory_object_stream


async def receiver(receive):
    with receive:
        print("waiting for item")
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
        else:
            print("got", item)


async def main():
    send, receive = create_memory_object_stream()
    receive_task = create_task(receiver(receive))
    await sleep(0)
    receive_task.cancel()
    receive_task.uncancel()
    await send.send(6)
    await receive_task


asyncio.run(main())

Owner Author

I suppose this is the effect of Guido's recent cancellation PR.

Comment on lines +346 to +348
cancel_scope.cancel()
send.send_nowait("item")

Collaborator

From the comment above: uncancelling that scope is my maybe-deadlock.

@agronholm
Owner Author

I've now added an asyncio-only test that ensures that a native cancellation won't cause an item to be dropped. I've verified that the test fails on current master.
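
For readers without the diff at hand, the failure mode can be modelled with the standard library alone. This is a simplified sketch, not anyio's implementation: ToyChannel, its skip_cancelled flag and demo() are hypothetical names, and the toy parks receivers on plain asyncio futures. It shows how handing an item to a receiver whose cancellation is already pending loses the item, and how skipping such receivers keeps it in the buffer.

```python
import asyncio
from collections import deque


class ToyChannel:
    """Hypothetical, simplified model of a memory channel (not anyio code)."""

    def __init__(self, skip_cancelled):
        self.skip_cancelled = skip_cancelled
        self.waiters = deque()  # (future, slot) pairs of blocked receivers
        self.buffer = deque()   # items not yet handed to any receiver

    def send_nowait(self, item):
        while self.waiters:
            fut, slot = self.waiters.popleft()
            if self.skip_cancelled and fut.cancelled():
                continue  # this receiver will never read its slot
            slot.append(item)
            if not fut.done():
                fut.set_result(None)
            return
        self.buffer.append(item)

    async def receive(self):
        if self.buffer:
            return self.buffer.popleft()
        fut = asyncio.get_running_loop().create_future()
        slot = []
        self.waiters.append((fut, slot))
        await fut  # raises CancelledError if the task was cancelled
        return slot[0]


async def demo(skip_cancelled):
    chan = ToyChannel(skip_cancelled)
    task = asyncio.ensure_future(chan.receive())
    await asyncio.sleep(0)  # let the receiver start and block
    task.cancel()           # cancellation is now pending, not yet delivered
    chan.send_nowait("item")
    await asyncio.gather(task, return_exceptions=True)
    return list(chan.buffer)  # items still available to other receivers


lost = asyncio.run(demo(skip_cancelled=False))
kept = asyncio.run(demo(skip_cancelled=True))
print(lost, kept)
```

Without the check the item is written into the slot of a receiver that then exits with CancelledError, so the buffer ends up empty. With the check the item stays in the buffer for the next receiver.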

@agronholm
Owner Author

Huh, now it's failing on py < 3.11. I need to investigate.

@agronholm agronholm requested a review from Zac-HD May 26, 2024 10:08
Collaborator

@Zac-HD Zac-HD left a comment

Native-cancellation handling looks good. I'm looking forward to using this!

@agronholm agronholm merged commit e7f750b into master May 26, 2024
16 checks passed
@agronholm agronholm deleted the fix-memobjectstream branch May 26, 2024 21:41

Successfully merging this pull request may close these issues.

MemoryObjectStream can drop items when the receiving end is cancelled
2 participants