Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addr::send does not respect Mailbox size #515

Open
thalesfragoso opened this issue Nov 4, 2021 · 2 comments
Open

Addr::send does not respect Mailbox size #515

thalesfragoso opened this issue Nov 4, 2021 · 2 comments
Labels
A-actix Crate: actix C-bug Category: bug

Comments

@thalesfragoso
Copy link
Member

thalesfragoso commented Nov 4, 2021

Addr::send will actually check if the Mailbox is full, but the future created by it uses a clone of AddressSender, and due to how AddressSender::clone is implemented, the newly cloned AddressSender will always bypass the Mailbox size check, which will cause MsgRequest to push to the queue on the first poll, independent if the queue is full or not.

AddressSender::clone:

return AddressSender {
inner: Arc::clone(&self.inner),
sender_task: Arc::new(Mutex::new(SenderTask::new())),
maybe_parked: Arc::new(AtomicBool::new(false)),
};

Note that it uses an Arc for maybe_parked, but instead of cloning the existing Arc it creates a new one. That Arc seems to never be used.

However, here comes the catch, the docs of channel seem to imply that this is the intended behavior, i.e. clone an AddressSender and you can bypass the queue by one message, but it's counter-intuitive that Addr::send would behave like that, the docs specifically mention a bounded channel.

I think we should change this behavior somehow or update the docs of Addr::send to reflect the current behavior.

Expected Behavior

The future returned by Addr::send (MsgRequest) should hold on to the item if the queue is full.

Current Behavior

MsgRequest always pushes the message to the Mailbox on the first poll, which might flood the actor and block ContextFut indefinitely, given that it tries to process all mailbox messages on a single poll.

Possible Solution

MsgRequest shouldn't have this power to always bypass queue line, we can change the behavior of AddressSender::clone or use another thing to implement MsgRequest. We also need to fix MsgRequest's Future implementation, since it returns Poll::Pending without registering a waker:

if let Some((sender, msg)) = this.info.take() {
match sender.send(msg) {
Ok(rx) => *this.rx = Some(rx),
Err(SendError::Full(msg)) => {
*this.info = Some((sender, msg));
return Poll::Pending;
}
Err(SendError::Closed(_)) => return Poll::Ready(Err(MailboxError::Closed)),
}
}

Today this isn't a problem, since it bypasses the queue which never returns SendError::Full.

Steps to Reproduce (for bugs)

The following example will panic due to:

assert!(n_polls < 256u16, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");

/// [dependencies]
/// actix = { version = "0.12.0", features = ["mailbox_assert"] }
/// actix-rt = "2.3.0"
/// tokio = { version = "1.13.0", default-features = false, features = ["signal"] }
/// futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] }

use actix::{Actor, Context, Handler, Message};
use futures_util::stream::{FuturesUnordered, StreamExt};

struct MyMsg;

impl Message for MyMsg {
    type Result = ();
}

struct MyActor;

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<MyMsg> for MyActor {
    type Result = ();

    fn handle(&mut self, _msg: MyMsg, _ctx: &mut Self::Context) -> Self::Result {}
}

#[actix_rt::main]
async fn main() {
    let addr = MyActor.start();
    let mut futs = FuturesUnordered::new();

    for _ in 0..300 {
        let request = addr.send(MyMsg);
        futs.push(request);
    }

    actix_rt::spawn(async move {
        while futs.next().await.is_some() {}
        println!("Done");
    });

    tokio::signal::ctrl_c().await.unwrap();
}

Context

The wanted solution is to be able to have a future that will await for the Mailbox to have enough space for the message before enqueuing, which in turn will allow me send several messages without being afraid of blocking ContextFut::poll for too long.

Your Environment

  • Rust Version (I.e, output of rustc -V): 1.55.0
  • Actix Version: 0.12.0
@fakeshadow
Copy link
Contributor

An easy solution would be use an unbounded channel with an async semaphore. For example:

pub struct Channel {
    chan: UnboundedChannel<(Option<Permit>, u8)>,
    permits: Semaphore,
}

impl Channel {
    pub fn do_send(&self, num: u8) {
        let permit = self.permits.try_acquire(1);
        self.chan.do_send((permit, num))
    }    
    
    pub async fn send(&self, num: u8) {
        let permit = self.permits.acquire(1).await;
        self.chan.do_send((Some(permit), num))
    }    
}

This would have some additional cost compared to seal the logic inside homebrew channel impl but in return we can use mature and popular async channel crates that are well maintained.
That said this external channel crate approach would need to figure out the WeakSender part too.

@thalesfragoso
Copy link
Member Author

I would also want to know if we want to continue with the current behavior of Addr, where if you clone it you get a free spot on the queue (as described in channel).

IMO it seems a bit error prone, and it's way worse in MsgRequest, because you can blow up the queue with just a single Addr, which kinda makes Addr::send behave like Addr::do_send.

I like the idea of using popular async channel crates, but I would also like to propose a less "radical" solution:

  • Modify AddressSender::clone to use the maybe_parked Arc of the original instance, i.e. use Arc::clone instead of Arc::new. AddressSender created by other methods would still have the old behavior, e.g. ctx.address() actually calls AddressSenderProducer::sender.
  • Modify AddressSender::send (breaking) or create a new method (non-breaking) to receive a &mut std::task::Context, that way MsgRequest can register its waker.

We will probably need to change a few other things, but I think that this route could also solve this problem, I don't claim it's the best route, but at least it isn't "incisive".

@Sytten Sytten added C-bug Category: bug A-actix Crate: actix labels May 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-actix Crate: actix C-bug Category: bug
Projects
None yet
Development

No branches or pull requests

3 participants