Fair handling of async sub-tasks in ContextFut #520

Open
bmuddha opened this issue Feb 2, 2022 · 0 comments

The implementation of the Future trait for actix::contextimpl::ContextFut contains a section that handles mailbox processing:

    // process mailbox
    this.mailbox.poll(&mut this.act, &mut this.ctx, cx);
    if !this.wait.is_empty() && !this.stopping() {
        continue;
    }
    // process items

The implementation itself is fine, except when the mailbox is constantly flooded with new messages:

    pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, task: &mut task::Context<'_>) {
        #[cfg(feature = "mailbox_assert")]
        let mut n_polls = 0u16;
        // Keep handling messages until the context starts waiting or the mailbox has nothing ready.
        while !ctx.waiting() {
            match Pin::new(&mut self.msgs).poll_next(task) {
                Poll::Ready(Some(mut msg)) => {
                    msg.handle(act, ctx);
                    #[cfg(feature = "mailbox_assert")]
                    {
                        n_polls += 1;
                        // Maximum number of consecutive polls in a loop is 256.
                        assert!(n_polls < 256u16, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");
                    }
                }
                Poll::Ready(None) | Poll::Pending => return,
            }
        }
    }

The poll method of Mailbox<A> gets stuck in the while loop indefinitely, because AddressReceiver::poll_next keeps producing values whenever the message consumption rate is equal to or lower than the production rate. As a result, ContextFut cannot make progress on its other sub-tasks (including Actor termination, if one is requested).
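
To illustrate the shape of the problem without depending on actix, here is a minimal std-only sketch. `FloodedMailbox` and `drain_unbounded` are hypothetical names, not actix APIs. The refill counter stands in for producers that keep up with the consumer, and it is finite only so that the example terminates.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a mailbox whose producers refill it
/// as fast as the consumer drains it.
struct FloodedMailbox {
    queue: VecDeque<u32>,
    /// How many more messages the "producers" will still enqueue.
    remaining_refills: u32,
}

impl FloodedMailbox {
    /// Pops one message; while refills remain, a new message is
    /// enqueued for every message taken out.
    fn next(&mut self) -> Option<u32> {
        let msg = self.queue.pop_front()?;
        if self.remaining_refills > 0 {
            self.remaining_refills -= 1;
            self.queue.push_back(msg + 1);
        }
        Some(msg)
    }
}

/// Same shape as the `while` loop above: keep handling messages until
/// the source reports that nothing is ready. Returns the number of
/// messages handled in this single call.
fn drain_unbounded(mailbox: &mut FloodedMailbox) -> usize {
    let mut handled = 0;
    while let Some(_msg) = mailbox.next() {
        handled += 1;
    }
    handled
}
```

Starting with one queued message and 999 refills, a single call to `drain_unbounded` handles all 1000 messages before returning, so nothing else gets a turn in between. With real producers the refills never run out, and the call never returns.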

Expected Behavior

All sub-tasks should have a chance to make progress in one iteration of the loop, even if the mailbox always has new messages to process.

Current Behavior

<ContextFut as Future>::poll gets stuck at mailbox processing if the message production rate is higher than the consumption rate.

Possible Solution

Perhaps introducing an extra optional parameter that limits the number of consecutively processed mailbox messages would help, for example:

    pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, task: &mut task::Context<'_>, max_consecutive: Option<usize>) {
        // The counter is a usize so it can be compared with `max_consecutive` directly.
        let mut n_polls = 0usize;

        while !ctx.waiting() {
            match Pin::new(&mut self.msgs).poll_next(task) {
                Poll::Ready(Some(mut msg)) => {
                    msg.handle(act, ctx);
                    n_polls += 1;
                    // Maximum number of consecutive polls in a loop is passed as parameter.
                    match max_consecutive {
                        Some(max) if n_polls >= max => return,
                        _ => continue,
                    }
                }
                Poll::Ready(None) | Poll::Pending => return,
            }
        }
    }
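
One detail a budget like this probably has to account for: if the mailbox phase stops early while messages are still queued and the enclosing future then returns Poll::Pending, nothing new may arrive to wake it, so it would need to schedule its own wake-up. The following std-only sketch models that under those assumptions. `BudgetedActor`, `CountingWaker` and `run_to_completion` are hypothetical names for illustration, not actix types, and the real ContextFut may need something different.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an actor future with a mailbox and one other sub-task.
struct BudgetedActor {
    mailbox: VecDeque<u32>,
    max_consecutive: Option<usize>,
    handled: usize,
    other_turns: usize,
}

impl Future for BudgetedActor {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        let mut n_polls = 0usize;
        // Mailbox phase: stop once the budget (if any) is used up.
        while let Some(_msg) = this.mailbox.pop_front() {
            this.handled += 1;
            n_polls += 1;
            if matches!(this.max_consecutive, Some(max) if n_polls >= max) {
                break;
            }
        }
        // Every other sub-task gets a turn on each poll.
        this.other_turns += 1;
        if this.mailbox.is_empty() {
            Poll::Ready(())
        } else {
            // Messages are still queued: ask to be polled again, since
            // already-queued messages will not trigger a wake-up themselves.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the actor until it completes; returns (polls, wake-ups).
fn run_to_completion(actor: &mut BudgetedActor) -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut *actor).poll(&mut cx).is_ready() {
            break;
        }
    }
    (polls, counter.0.load(Ordering::SeqCst))
}
```

With 10 queued messages and a budget of `Some(4)`, the model needs three polls and the other sub-task runs three times. With `None`, everything is handled in a single poll and the other sub-task runs only once, after the mailbox is empty.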

Steps to Reproduce

  1. Create a consumer Actor
  2. Create multiple producer Actors
  3. Send messages from the producers to the consumer (try_send works as well)
  4. Make sure that the production rate is higher than the consumption rate
  5. After some time, try to terminate the consumer from within a message handler. If the mailbox is always at full capacity, the consumer Actor will never be able to terminate

Context

I was using actix-web-actors for websocket connection management; the task was to forward data from a pubsub service to clients over websocket connections. The websocket connection manager is implemented as a separate Actor, which receives messages from other Actors in the system and forwards them to clients. If messages are produced faster than the websocket connection manager can consume them, its mailbox fills up and stays full, because message production never stops (there is no back-pressure). In the meantime, the messages are handled by the websocket connection actor and added to the queue in WebsocketContext::messages in order to be sent to the client:
https://github.com/actix/actix-web/blob/fc5ecdc30bb1b964b512686bff3eaf83b7271cf5/actix-web-actors/src/ws.rs#L370-L377
But because mailbox processing never stops, the VecDeque grows without bound: there is no opportunity to drain it and send the queued messages to the client. Eventually, as one might guess, the application crashes after running out of memory.
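
The memory growth can be modelled roughly as follows. This is a simplified std-only sketch, not the actual WebsocketContext code: `peak_outgoing_len` is a hypothetical helper, and it assumes the outgoing queue is flushed only after the mailbox phase returns.

```rust
use std::collections::VecDeque;

/// Forwards `total` incoming messages to an outgoing queue and returns the
/// peak length that queue reaches. The outgoing queue is flushed only when
/// the mailbox phase returns, either because the mailbox is empty or
/// because the optional budget has been used up.
fn peak_outgoing_len(total: usize, max_consecutive: Option<usize>) -> usize {
    let mut incoming: VecDeque<usize> = (0..total).collect();
    let mut outgoing: VecDeque<usize> = VecDeque::new();
    let mut peak = 0;

    while !incoming.is_empty() {
        let mut n_polls = 0usize;
        // Mailbox phase: every handled message is queued for the client.
        while let Some(msg) = incoming.pop_front() {
            outgoing.push_back(msg);
            peak = peak.max(outgoing.len());
            n_polls += 1;
            if matches!(max_consecutive, Some(max) if n_polls >= max) {
                break;
            }
        }
        // Flush phase: stands in for sending the queued messages to the client.
        outgoing.clear();
    }
    peak
}
```

In this model, `peak_outgoing_len(10_000, None)` peaks at the full 10,000 queued messages, while `peak_outgoing_len(10_000, Some(16))` never holds more than 16. In the real scenario the incoming side never runs dry, so the unbounded case keeps growing until memory is exhausted.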

  • Rust Version (i.e., output of rustc -V): 1.56.1
  • Actix Version: 0.12.0
  • Actix-web-actors: 4.0.0-beta.10
robjtede added the C-bug (Category: bug) and help wanted labels on Feb 3, 2022