Use of actor reference on Async Actor handle #308

Open
Jonathas-Conceicao opened this issue Jan 8, 2020 · 42 comments

Comments

@Jonathas-Conceicao
Member

I'm trying to implement an asynchronous actor response that uses the actor's self reference on actix v0.9, but I can't get it to work on account of a lifetime bound error. I've put together a small example that reproduces the error:

use actix::prelude::*;

#[derive(Message)]
#[rtype(result = "Result<usize, ()>")]
struct Msg;

struct MyActor;

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
        Box::new(self.reply().into_actor(self))
    }
}

impl MyActor {
    async fn reply(&self) -> Result<usize, ()> {
        Ok(42)
    }
}

But this gives me the following error:

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
  --> src/main.rs:17:23
   |
17 |         Box::new(self.reply().into_actor(self))
   |                       ^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 16:5...
  --> src/main.rs:16:5
   |
16 | /     fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
17 | |         Box::new(self.reply().into_actor(self))
18 | |     }
   | |_____^
note: ...so that reference does not outlive borrowed content
  --> src/main.rs:17:18
   |
17 |         Box::new(self.reply().into_actor(self))
   |                  ^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
  --> src/main.rs:17:9
   |
17 |         Box::new(self.reply().into_actor(self))
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   = note: expected  `std::boxed::Box<(dyn actix::fut::ActorFuture<Actor = MyActor, Output = std::result::Result<usize, ()>> + 'static)>`
              found  `std::boxed::Box<dyn actix::fut::ActorFuture<Actor = MyActor, Output = std::result::Result<usize, ()>>>`

I've tried to implement this using ActorFuture and the then combinator, but I get similar errors. This is what I tried:

    fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
        Box::new(
            async {}
                .into_actor(self)
                .then(|_, act, _| actix::fut::wrap_future(act.reply())),
        )
    }

The problem seems to be that I can only have 'static references in the future I return, but since the future will be handled by the actor itself, shouldn't it be able to use its own reference?

Are there examples of handling messages asynchronously? I have only been able to find the doc example on ActorFuture, which is a little bit outdated; it's also a somewhat different context, since it doesn't use a reference to the actor in the futures it chains.

Jonathas-Conceicao pushed a commit to OSSystems/actix that referenced this issue Jan 8, 2020
refs: actix#308

Signed-off-by: Jonathas-Conceicao <jonathas.conceicao@ossystems.com.br>
JohnTitor pushed a commit that referenced this issue Jan 11, 2020
refs: #308

Signed-off-by: Jonathas-Conceicao <jonathas.conceicao@ossystems.com.br>
@lucab

lucab commented Jan 15, 2020

I think this is happening because there are no explicit lifetime constraints between the actor and the future, so it automatically gets a 'static one. On the other hand, the async fn(&self) automatically gets a shorter lifetime bound to self based on its signature.

I'm not sure if the former can be fixed by tweaking actix types with explicit lifetimes.

The latter however can be adjusted by using an explicit impl Future instead of async fn. For example:

use std::future::Future;

struct MyActor {
    label: String,
}

impl MyActor {
    // Clone what the future needs, so it no longer borrows `self`.
    fn reply(&self) -> impl Future<Output = Result<String, ()>> /* + 'static */ {
        let label = self.label.clone();
        async move { Ok(label) }
    }
}
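A std-only sketch of why the clone helps. The helpers `boxed_reply`, `block_on` and `NoopWake` are hypothetical, written only for this illustration, and the busy-polling `block_on` stands in for the actix runtime. Because `reply` clones `label` before building the future, the future owns everything it touches. That lets it satisfy the `'static` bound a boxed trait object carries by default, which is the same bound the compiler error above asks for.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct MyActor {
    label: String,
}

impl MyActor {
    // Clone what the future needs up front, so the future borrows nothing from `self`.
    fn reply(&self) -> impl Future<Output = Result<String, ()>> + 'static {
        let label = self.label.clone();
        async move { Ok(label) }
    }
}

// `Box<dyn Future<..>>` without an explicit lifetime defaults to `+ 'static`,
// the same bound the boxed `ActorFuture` in the compiler error above carries.
fn boxed_reply(actor: &MyActor) -> Pin<Box<dyn Future<Output = Result<String, ()>>>> {
    Box::pin(actor.reply())
}

/// A waker that does nothing; enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor for illustration only (not a real runtime).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Because the returned box borrows nothing, the actor can even be dropped before the future is polled.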

@Jonathas-Conceicao
Member Author

Jonathas-Conceicao commented Jan 15, 2020

I think this is happening because there are no explicit lifetime constraints between the actor and the future

Yes, this is what I have been able to understand from compiler messages and experimenting.

I'm not sure if the former can be fixed by tweaking actix types with explicit lifetimes.

I believe ActorFuture, or maybe WrapFuture, should have some sort of lifetime parameter to indicate that the future can have a lifetime associated with the actor and its Context. That way, I believe it would be possible to use methods on actors without the need for cloning. I have been trying to tweak definitions in actix::fut to add such lifetime parameters, but without any real success so far.

What I have been doing so far, in the project where I'm using Actix 0.9, is to clone the actor itself and wrap the method call. All my actors can be cloned, and I currently don't need to update internal state based on the messages handled, so this is a valid approach for this use case. The code goes something like this:

impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<()>>;

    fn handle(&mut self, msg: Msg, _: &mut Context<Self>) -> Self::Result {
        let this = self.clone();
        Box::new(actix::fut::wrap_future(this.async_handle_msg(msg)))
    }
}

impl MyActor {
    async fn async_handle_msg(self, msg: Msg) -> Result<()> {
        todo!("Impl some handling here")
    }
}

@idanarye
Contributor

The core problem that needs to be addressed is that while the future waits, other messages can be sent to the actor. So if the actor is currently borrowed by an async block or by a Future returned from an async function, these other messages cannot mutably borrow the actor.

ActorFuture solved this by not keeping the actor borrowed while the future waits: the actor gets reborrowed every time the future is awoken. Doing something like this with async/await is far from trivial...
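Here is a simplified, std-only model of that design. The names `ActorFut`, `TwoStep` and `drive_two` are hypothetical; this is not actix's actual `ActorFuture` definition. The future does not store the actor. Instead, whoever drives it lends `&mut A` again on each poll. Two pending futures can therefore both mutate the same actor between polls, which a borrow held inside an async block cannot do.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified take on the `ActorFuture` idea: the actor is not
/// stored in the future; the driver lends `&mut A` again on every poll.
trait ActorFut<A> {
    type Output;
    fn poll(&mut self, act: &mut A, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

struct Actor {
    counter: u32,
}

/// Needs two polls to finish and bumps the actor's counter on each one.
struct TwoStep {
    polled_once: bool,
}

impl ActorFut<Actor> for TwoStep {
    type Output = u32;

    fn poll(&mut self, act: &mut Actor, _cx: &mut Context<'_>) -> Poll<u32> {
        act.counter += 1; // the borrow of `act` ends when this call returns
        if self.polled_once {
            Poll::Ready(act.counter)
        } else {
            self.polled_once = true;
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drive two pending futures against the same actor, alternating polls.
fn drive_two(act: &mut Actor) -> (u32, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut first = TwoStep { polled_once: false };
    let mut second = TwoStep { polled_once: false };
    let (mut r1, mut r2) = (None, None);
    while r1.is_none() || r2.is_none() {
        if r1.is_none() {
            if let Poll::Ready(v) = first.poll(act, &mut cx) {
                r1 = Some(v);
            }
        }
        if r2.is_none() {
            if let Poll::Ready(v) = second.poll(act, &mut cx) {
                r2 = Some(v);
            }
        }
    }
    (r1.unwrap(), r2.unwrap())
}
```

Starting from `counter: 0`, the two futures finish with 3 and 4. Each future observes the changes the other made while it was pending, which is the flexibility (and the hazard) this design trades for.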

@fMeow
Member

fMeow commented Feb 1, 2020

I don't think it's possible. As @idanarye pointed out, using the actor reference in an async actor handler leads to multiple mutable references to the actor, which the design never allows.

I want to add that the returned future must be 'static, and the reference to self is never 'static. What this lifetime requirement really means is that a future has to take ownership of all variables used inside an async block. In the futures 0.1 days, this limitation caused the annoying pattern of cloning every variable outside a futures block. So, when a variable is used in multiple map functions, we have to clone it for every map function.

// futures 0.1
use futures::Future; // brings `map` into scope in futures 0.1

let non_copy_var = String::from("some text");
let non_copy_var1 = non_copy_var.clone();
let non_copy_var2 = non_copy_var.clone();

// Each `move` closure needs its own clone of the string.
futures::future::ok::<(), ()>(())
    .map(move |_| {
        println!("First usage of non_copy_var: {}", non_copy_var1);
    })
    .map(move |_| {
        println!("Second usage of non_copy_var: {}", non_copy_var2);
    });

In futures 0.3 today, this problem is alleviated in that we only need to move a variable once. But we still need to move variables into the async block, which can never happen with a self reference.
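For comparison, here is the futures 0.1 snippet above rewritten as a futures 0.3-style async block, std-only. The helpers `two_usages`, `block_on` and `NoopWake` are hypothetical, and `std::future::ready(())` stands in for real async work. A single `move` gives the block ownership of `non_copy_var`, which can then be used in several steps. The return type is a boxed `'static` future, which works only because nothing is borrowed from the enclosing function.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// The futures 0.1 snippet above, rewritten as one async block.
fn two_usages() -> Pin<Box<dyn Future<Output = Vec<String>>>> {
    let non_copy_var = String::from("some text");
    // A single `move`: the block now owns `non_copy_var` and can use it repeatedly.
    Box::pin(async move {
        let mut log = Vec::new();
        log.push(format!("First usage of non_copy_var: {}", non_copy_var));
        std::future::ready(()).await; // stand-in for real async work
        log.push(format!("Second usage of non_copy_var: {}", non_copy_var));
        log
    })
}

/// A waker that does nothing; enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor for illustration only (not a real runtime).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```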

For this reason, I don't think it's possible to use a mutable self reference in an async block. But you can move the required information into the async block and then update the data using act in the map function of FutureWrap (fut.into_actor(self).map(move |res, act, ctx| {})).

The following code passes the Rust compiler:

    use actix::prelude::*;

    #[derive(Message)]
    #[rtype(result = "Result<usize, ()>")]
    struct Msg;

    struct MyActor(usize);

    impl Actor for MyActor {
        type Context = Context<Self>;
    }

    impl Handler<Msg> for MyActor {
        type Result = ResponseActFuture<Self, Result<usize, ()>>;

        fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
            let result = self.reply();
            Box::new(
                async move { result }
                    .into_actor(self)
                    .map(move |res, act, ctx| {
                        // update self this way
                        act.0 = res.unwrap();
                        Ok(act.0)
                    }),
            )
        }
    }

    impl MyActor {
        fn reply(&mut self) -> Result<usize, ()> {
            Ok(42)
        }
    }

@Malanche

Malanche commented Feb 1, 2020

Would it be an option to try to implement that all actors, on creation, are an Arc<Mut<T>> to the actor? So that we could have a handle function that passes the locked actor, and an async handle that passes the Arc<Mut<T>>? We would still need to find a way to prevent double lock on the same thread (probably not detected by the compiler)... And I don't know how big a hit in performance this would be.

@Jonathas-Conceicao
Member Author

Would it be an option to try to implement that all actors, on creation, are an Arc<Mut<T>> to the actor? So that we could have a handle function that passes the locked actor, and an async handle that passes the Arc<Mut<T>>?

The system itself already ensures that only one message will be handled or polled at a time. What we need is a way to convince the compiler of that.

Meanwhile, another workaround I've adopted, so I can update internal state without having to clone or rework old code too much, was to use a raw (unsafe) pointer deref to access the actor's async methods. So I have something like this:

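impl Handler<Msg> for MyActor {
    type Result = ResponseFuture<Response>;

    fn handle(&mut self, msg: Msg, _: &mut Context<Self>) -> Self::Result {
        // The raw pointer escapes the borrow checker: this is only sound as long as
        // the actor outlives the future and nothing else mutates it meanwhile.
        let this: *mut MyActor = self;
        Box::pin(async move {
            // Calls MyActor's own `async fn handle(&mut self, msg)` (not shown).
            unsafe { (*this).handle(msg).await }
        })
    }
}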

@fMeow
Member

fMeow commented Feb 2, 2020

I think a raw pointer dereference can cause a data race.

Consider a long and complicated async method that edits fields of the actor:

use actix::clock::delay_for;
use std::time::Duration;

struct Foo(usize);

impl Foo {
    async fn handle(&mut self) -> Result<usize, ()> {
        self.0 = 30;
        if self.0 > 0 {
            // Yield point: another call may run and modify self.0 here.
            delay_for(Duration::from_secs(10)).await;
            self.0 -= 1;
        }
        Ok(self.0)
    }
}

When two calls to handle are running at the same time, it is certainly a buggy disaster.

@Jonathas-Conceicao
Member Author

@guoli-lyu yes, this could indeed lead to a data race. But other than this and cloning, I don't see any other way to use an actor's internal state in async message handling.

@Jonathas-Conceicao
Member Author

Also worth pointing out that the data race is not caused by the unsafe deref itself; consider the following handler:

impl Handler<Msg> for Foo {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
        println!("Received");
        self.0 = 30;
        Box::new(
            async {}
                .into_actor(self)
                .then(|_, this: &mut Self, _| delay_for(Duration::from_secs(10)).into_actor(this))
                .map(|_, this, _| {
                    this.0 -= 1;
                    Ok(this.0)
                }),
        )
    }
}

If Msg is sent twice to Foo, the response to the second message could be 28 instead of the expected 29. This happens because the system might handle the second message while the first has not yet resolved.
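The interleaving can be reproduced without actix. This std-only sketch makes three substitutions for illustration; none of them are actix internals. `Rc<RefCell<usize>>` stands in for the actor's `self.0`, `YieldOnce` stands in for `delay_for(..)`, and a round-robin loop stands in for the actor's context. Both handlers set the value to 30 before either one decrements it, so the first replies 29 and the second replies 28.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pending on the first poll, ready on the second: a stand-in for `delay_for(..)`.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Same steps as the handler above: set to 30, wait, decrement, reply.
async fn handle(state: Rc<RefCell<usize>>) -> usize {
    *state.borrow_mut() = 30;
    YieldOnce(false).await;
    let mut value = state.borrow_mut();
    *value -= 1;
    *value
}

/// Poll both handlers alternately, the way one actor context may interleave them.
fn run_interleaved() -> (usize, usize) {
    let state = Rc::new(RefCell::new(0));
    let mut first = Box::pin(handle(Rc::clone(&state)));
    let mut second = Box::pin(handle(Rc::clone(&state)));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut r1, mut r2) = (None, None);
    while r1.is_none() || r2.is_none() {
        if r1.is_none() {
            if let Poll::Ready(v) = first.as_mut().poll(&mut cx) {
                r1 = Some(v);
            }
        }
        if r2.is_none() {
            if let Poll::Ready(v) = second.as_mut().poll(&mut cx) {
                r2 = Some(v);
            }
        }
    }
    (r1.unwrap(), r2.unwrap())
}
```

No memory unsafety is involved (everything here is safe Rust); the 28 comes purely from the ordering of the steps around the yield point.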

@Jonathas-Conceicao
Member Author

As Actix's runtime is single-threaded, write operations on the actor's internal state should not lead to any memory hazard, but they can still lead to unexpected behavior if one is not careful.

@fMeow
Member

fMeow commented Feb 2, 2020

Maybe it's worthwhile to document this possible data race when modifying the actor's internal state in async code.

Well, this data race seems irrelevant to using the actor reference in an async actor handler. My point is that the handle function in Handler is synchronous code, while the async actor handler is async. Moving from sync code to async code requires move semantics for every variable used, including self.

What about rewriting the handle() function in the Handler<T> trait as an async function? That way, there would be no need to switch between sync and async code blocks. Though I don't know if this is possible.

@idanarye
Contributor

idanarye commented Feb 2, 2020

One option (based on a now-deleted Reddit comment) is to do something like this:

fn handle(&mut self, msg: Msg, ctx: &mut Context<Self>) -> Self::Result {
    // `ctx.with_actor` is the proposed API under discussion here.
    Box::new(async move {
        let first_step_result = do_something_with_msg(msg).await?;
        let mid_result_with_actor = ctx.with_actor(|this, _ctx| {
            this.do_some_computation_with(first_step_result)
        }).await?;
        final_computation(mid_result_with_actor).await
    })
}

The idea is that ctx.with_actor() gets a closure that gets the actor data and context and runs a synchronous block with them. Because it's synchronous, it doesn't have to keep the borrowed actor data and context while awaiting - because it does not await.

ctx.with_actor() itself returns a future, which gets woken when the Actix runtime decides the actor can be awoken. Since we are single-threaded, I assume that happens immediately? So maybe it can even be optimized away?

@Jonathas-Conceicao
Member Author

About the with_actor method

The ctx.with_actor idea looks OK to use, but I don't know how viable it is to implement in the current system. Context by itself wouldn't be able to execute the provided closure, because it doesn't hold a reference to the actor. Sending it as a future to be polled later wouldn't let us get a result out of it when with_actor returns. If with_actor returned just (), then I think it could be implemented as a method on Context along the lines of:

    pub fn with_actor<F>(&mut self, f: F)
    where
        F: Fn(&mut A, &mut Self) + 'static,
    {
        self.spawn(Box::new(
            crate::fut::wrap_future(async {}).map(move |_, act, ctx| f(act, ctx)),
        ));
    }

I've put it on a fork: actix/topic/with_actor.

About the race condition

On a second note, now about the race condition. The problem happens because the system might poll other futures while some computation in the ResponseActFuture is pending. What if we enforce exclusivity when handling a ResponseActFuture? I think this could be achieved by scheduling it with wait instead of spawning it into the context. So the MessageResponse implementation for ResponseActFuture would become:

    fn handle<R: ResponseChannel<M>>(self, ctx: &mut A::Context, tx: Option<R>) {
        ctx.wait(self.then(move |res, this, _| {
            if let Some(tx) = tx {
                tx.send(res);
            }
            async {}.into_actor(this)
        }));
    }

I have pushed such a modification to another branch on my fork: actix/topic/avoid_handle_data_race.
I'm not sure this doesn't block the system, but I do intend to test it further when I have some more time.
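The effect of that proposal can be modeled without actix. Suppose each handler's future is driven to completion before the next message is started, which is what routing ResponseActFuture through `ctx.wait` is meant to achieve. Then the 28-instead-of-29 interleaving described earlier in the thread goes away. This is a std-only sketch: `Rc<RefCell<usize>>`, `YieldOnce` and `run_serialized` are stand-ins for illustration, not actix APIs.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pending on the first poll, ready on the second: a stand-in for a delay.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Set to 30, wait, decrement, reply (same steps as the racy handler).
async fn handle(state: Rc<RefCell<usize>>) -> usize {
    *state.borrow_mut() = 30;
    YieldOnce(false).await;
    let mut value = state.borrow_mut();
    *value -= 1;
    *value
}

/// Run each handler to completion before starting the next one.
fn run_serialized(messages: usize) -> Vec<usize> {
    let state = Rc::new(RefCell::new(0));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut replies = Vec::new();
    for _ in 0..messages {
        let mut fut = Box::pin(handle(Rc::clone(&state)));
        // Poll only this future until it finishes; no other handler runs in between.
        loop {
            if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
                replies.push(v);
                break;
            }
        }
    }
    replies
}
```

The cost is throughput: while one handler is waiting, this actor makes no progress on any other message.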

@idanarye
Contributor

idanarye commented Feb 3, 2020

I just noticed that making with_actor a method of Context would mean we need to move the context into the async block, but ctx is also mutably borrowed... Maybe it can be a method of the address, or of some other object that's constructed from the context but does not depend on it? Maybe even a free function or a static method that always borrows the actor of the future that awaits on it?

Spawning it is not very helpful, not (just) because we can't get a result but because we cannot await for it to complete - so we can't use it to control the flow.

Can't we make a future out of it? I've managed to get this to work (see the full code in https://gist.github.com/idanarye/46cf23613d1a789c7ebf219a5225e86a)

    fn handle(&mut self, _msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
        let addr = ctx.address();
        let original_number = self.num;
        Box::new(async move {

            let current_number = addr.send(with_actor(|this: &mut Self, _| {
                this.num
            })).await.unwrap().unwrap();
            println!("Current number is {}", current_number);
            assert_eq!(current_number, original_number);

            addr.send(Increment).await.unwrap();
            addr.send(Increment).await.unwrap();
            addr.send(Increment).await.unwrap();
            let current_number = addr.send(Increment).await.unwrap();
            println!("Number incremented to {}", current_number);
            assert_eq!(current_number, original_number + 4);

            {
                let original_number = original_number;
                addr.send(with_actor(move |this: &mut Self, _| {
                    this.num *= 2;
                    println!("Multiplying by 2 to {}", this.num);
                    assert_eq!(this.num, (original_number + 4) * 2);
                })).await.unwrap().unwrap();
            }

            let current_number = addr.send(Increment).await.unwrap();
            println!("Number incremented to {}", current_number);
            assert_eq!(current_number, (original_number + 4) * 2 + 1);

            Ok(())
        }.into_actor(self))
    }

(I tried to make with_actor accept the addr, but couldn't get it to build. This is just a PoC anyway)

with_actor is a helper function to create an instance of WithHelper, a message that contains an FnOnce that borrows the actor data and context and returns some value. I made my actor handle it by invoking the function and returning its result as the message result.

Now I just have to move an Addr into the async block. Whenever I need to access the actor's state, I send it a WithHelper with a function that modifies and/or reads it, and await the result of that function.

Ugly, and probably not very efficient, but works and can maybe give you (or someone else who's familiar with the internals of Actix) some inspiration for a better way to do this?
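Here is the same idea reduced to a std-only sketch. `Mailbox`, `send_with` and `Counter` are hypothetical names, not the PoC's code or an actix API. A message carries a `FnOnce(&mut A) -> R`, the actor runs queued closures one at a time, and each result comes back over a channel. Each closure holds `&mut A` only while it runs, which is why such a borrow never spans an await.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

/// A queued message: a closure that gets exclusive access to the actor while it runs.
type Job<A> = Box<dyn FnOnce(&mut A)>;

struct Mailbox<A> {
    actor: A,
    queue: VecDeque<Job<A>>,
}

impl<A: 'static> Mailbox<A> {
    fn new(actor: A) -> Self {
        Mailbox { actor, queue: VecDeque::new() }
    }

    /// Queue `f` and return a receiver for its result.
    fn send_with<R: 'static>(
        &mut self,
        f: impl FnOnce(&mut A) -> R + 'static,
    ) -> mpsc::Receiver<R> {
        let (tx, rx) = mpsc::channel();
        self.queue.push_back(Box::new(move |actor: &mut A| {
            // If the receiver was dropped, the result is simply discarded.
            let _ = tx.send(f(actor));
        }));
        rx
    }

    /// Process queued messages strictly in order, one at a time.
    fn run(&mut self) {
        while let Some(job) = self.queue.pop_front() {
            job(&mut self.actor);
        }
    }
}

struct Counter {
    num: u64,
}
```

Unlike the PoC, nothing here is async. In actix, the round trip would go through `addr.send(..).await`, and that await is where other messages may be processed between two such closures.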

@thedodd

thedodd commented Feb 20, 2020

Hey everyone, just wanted to chime in here. Obviously, there has already been a lot of discussion along these lines, but I think something which actix needs right now is the following signature for message handlers:

impl Handler<Msg> for Foo {
    type Result = ...;

    async fn handle(&mut self, _: Msg, ...) -> Self::Result {
      //..
    }
}

The major change here is the async fn handle. Now, obviously there has been a lot of discussion above about concurrent (not parallel) mutations of the actor state when a yield point is hit. TO BE SURE this is already the case in actix. If you have an actor which handles multiple concurrent messages, and the handling logic is something like the following:

1. mutate internal state
2. yield (due to network request or the like)
3. use state again

in such cases, you stand to run into problems. In other contexts, this would be referred to as a "critical section", and exclusion is guaranteed by the actor ... as long as you don't yield in that critical section.

So, I don't think we should attempt to "re-solve" that problem. It is the case today, and it needs to be in the future as well, because that is what enables concurrent processing in general.

So, all in all, if we can migrate the actix code base to embrace async/await at its core, and just help to teach users about concurrency and the fact that concurrent (not parallel) mutation is possible, and needs to be in order to support high-throughput, then we should be in good shape. We can teach users how to craft concurrent algorithms which embrace these facts and how to work and succeed within these constraints.

As an example, I wrote the actix-raft crate using actix, and there are plenty of cases where concurrent mutation takes place in the various actors, and these conditions need to be checked and accounted for.

I know there are complexities with the context system and other bits like that. However, actix having async/await support at its core would be such a game changer! Worth some design discussion and such, IMHO. Thoughts?

Cheers!

@thedodd

thedodd commented Feb 20, 2020

Essentially, the ActorFuture was a way to be able to gain access to the actor from within a futures chain. Before async/await was around, that was difficult. Now that we have async/await and futures can hold references, we get that easily. What are the other things which ActorFuture is even needed for?

@idanarye
Contributor

Now that we have async/await and futures can hold references

Futures' ability to hold references was not changed - they could hold a reference before, and it still counts as a borrow: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=89a6b843b87d04ca269f713f47184b6d

@idanarye
Contributor

Also, Rust doesn't have async trait methods yet, and crates like async-trait merely rewrite them to return a pinned async block.

@thedodd

thedodd commented Feb 20, 2020

@idanarye re async-trait: true, true. I'm wondering if that would be viable for now.

As far as references, I should have been more clear with what I said. I was more referring to the limitations around #[await] back in the day. Building the state machine with & or &mut was not allowed back in the day before pinning was around. You had to use smart pointers.

With async/await of today (thanks Pin) we can use & and &mut in async fn declarations and the like. And in this case that directly applies to the &mut self of the actor in the async fn handle(..) mentioned above.

@idanarye
Contributor

@thedodd #[await] was never stable and stable actix was using callbacks - which do support passing references in the closure. ActorFuture was still necessary because:

  1. The closure - and therefore the future that contains it - would then borrow the actor, and it would not be usable again until the future is resolved.
  2. If you need to chain callbacks, only one of them can borrow the actor.

The second problem does get solved by using async - though one could argue it could be solved by nesting the callbacks instead of chaining them. The first problem is not solved by async, and would not be solved even if we had async traits.

@thedodd

thedodd commented Feb 20, 2020

#[await] was never stable and stable

Hmm, I never said it was. My point has clearly been missed, but that's ok. Your response about ActorFuture is pertinent, so I'll focus on that.

I agree with your statements about point 2. As for point 1, I'll put together a gist of my thoughts for further discussion.

@thedodd

thedodd commented Feb 20, 2020

@idanarye what if we use a signature like the following instead (&self instead of &mut self) :

impl Handler<Msg> for Foo {
    type Result = ...;

    async fn handle(&self, _: Msg, ...) -> Self::Result {
      //..
    }
}

This would mitigate issue 1. as discussed above. We gain ergonomics. Mutations would have to be done by way of interior mutability (tokio has a nice RwLock). I'm not too concerned about that last point because that is what one needs to do in other areas of the Actix stack. Eg, in actix-web, when setting up an HttpServer, if one needs to mutate some shared state, interior mutability needs to be used.

Actix itself isn't 1.0 yet, so perhaps this would be a big win if we are able to implement a pattern like this. Thoughts?

@thedodd

thedodd commented Feb 20, 2020

To be sure, in case it isn't clear from the above, doing so would allow us to remove the ActorFuture construct.

  • every message handler would only take a shared ref to the actor (so any number of these can be leased out by the compiler).
  • mutability would be shifted over to the responsibility of the caller via interior mutability. There is plenty of support code for this already available in the ecosystem (tokio::sync::RwLock &c).
  • this may quite likely help to improve the overall throughput of the Actix system, as nothing requires exclusive refs to the actor, allowing for more tasks to be processed concurrently.
  • should work beautifully with a new handler signature like async fn handle(&self, ....
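Here is a std-only sketch of the `&self` handler idea, built on three stand-ins that are not actix APIs. `RefCell` stands in for an async lock such as `tokio::sync::RwLock`, `YieldOnce` stands in for a network call, and a round-robin loop stands in for the actor's context. Each in-flight handler holds only `&Foo`, so two of them can be pending at once. Mutation goes through short borrows that end before any `.await`.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pending on the first poll, ready on the second: a stand-in for real I/O.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct Foo {
    // Interior mutability; `RefCell` stands in for e.g. `tokio::sync::RwLock`.
    seen: RefCell<Vec<String>>,
}

impl Foo {
    /// Handler that takes `&self`: any number of these can be in flight at once.
    async fn handle(&self, msg: String) -> usize {
        // The mutable borrow is a temporary that ends with this statement.
        self.seen.borrow_mut().push(msg);
        YieldOnce(false).await;
        self.seen.borrow().len()
    }
}

/// Poll two handlers on the same `Foo` alternately, like two concurrent messages.
fn two_concurrent(foo: &Foo) -> (usize, usize) {
    let mut a = Box::pin(foo.handle("a".to_string()));
    let mut b = Box::pin(foo.handle("b".to_string()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut ra, mut rb) = (None, None);
    while ra.is_none() || rb.is_none() {
        if ra.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
                ra = Some(v);
            }
        }
        if rb.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
                rb = Some(v);
            }
        }
    }
    (ra.unwrap(), rb.unwrap())
}
```

Holding the `borrow_mut()` across the `.await` would still compile. However, the second handler would then panic at runtime with `RefCell`, or wait with an async lock. The exclusion moves from the compiler to runtime checks.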

@Jonathas-Conceicao @lucab @fMeow @Malanche @JohnTitor (pinging folks here based on comments on this thread :))

@idanarye
Contributor

Won't this kind of defeat the whole purpose of having an actor system?

@thedodd

thedodd commented Feb 21, 2020

Would it? I'm not sure. I'm definitely just spitballing on design ideas and such.

  • state & functionality would still be encapsulated within each actor.
  • functionality would still be driven via message passing.
  • mutation would still be possible only from within the actor itself, as it is today. This would just require an added step of let state = self.state.write().await? (or the like) before being able to mutate.

Perhaps the fact that mutations would require synchronization is a major deterrent. I'll have to think it over a bit more. However, the ActorFuture is in essence a synchronization primitive. It guarantees exclusive access to the actor. As such, it is semantically a Mutex (reading & writing is excluded while the borrow [the lock in this case] is held). So perhaps this is tantamount to the same thing, but actually an improvement, because full mutual exclusion wouldn't be required; users could use RwLocks, which would allow for concurrent execution of messages per actor, only syncing when interior mutability is required by the user.

Thoughts?

@Jonathas-Conceicao
Member Author

I too believe that having an async method on the Handler trait would make things simpler, but I'm not sure how viable it would be given the current ecosystem.

Changing the handle function to use an immutable reference seems like a step back to me. As @thedodd has said, ActorFuture does work like a synchronization primitive. Removing it would require users who want to mutate their actor's internal state to implement synchronization themselves, and sharing state is one of the essentials of the actor model.

@balsa02

balsa02 commented Feb 24, 2020

Hi all!
I think the idea Jonathas proposed before (controlled multiple mutable references to the actor and ctx) is the ideal solution. It would bring the same concurrency model to the actor as node.js has, or as ActorFuture already has with its combinators. If I must choose between the node.js model and interior mutability/locking, I will prefer the former. Implementing deadlocks is so easy (at least for me), and Rust will not protect you from them.
The futures 0.3 system has such weak combinators for the Result/Future transition (compared to futures 0.1), and the async/await system with the ?/try! macro is so powerful, that the changeover to async/await is not in question, only its direction.

I implemented an unsafe PoC that has the following signatures:

#[async_trait(?Send)]
pub trait AsyncHandler<Msg>
where
    Self: Actor,
    Msg: Message,
{
    type Result: MessageResponse<Self, Msg>;
    async fn handle(&mut self, msg: Msg, ctx: &mut Self::Context) -> Self::Result;
}

#[async_trait(?Send)]
impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<()>;

    async fn handle(&mut self, _msg: MyMessage, _ctx: &mut <Self as Actor>::Context) -> Self::Result {
        Ok(())
    }
}
impl_handler!(MyMessage, MyActor);

@idanarye
Contributor

@balsa02 What happens if I have something like this:

struct MyActor {
    data: std::collections::HashMap<Foo, Bar>,
}

#[async_trait(?Send)]
impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<()>;

    async fn handle(&mut self, msg: MyMessage, _ctx: &mut <Self as Actor>::Context) -> Self::Result {
        // `bar` points into the map and stays alive across the `.await` below.
        let bar = self.data.entry(msg.foo).or_default();
        *bar = bar.something_something().await?;
        Ok(())
    }
}

and I send two MyMessages to the same MyActor? Wouldn't it invalidate the hashmap?

The first message will create some entry, run something_something(), and await on it. Then the second message will create another entry, which can potentially cause a rehash. When the first message returns from the await, it'll store the data in the wrong place, because it doesn't know there was a rehash. HashMap does not handle this scenario because it's impossible to do this in safe Rust.

(I don't really know if HashMap works like that - but it's allowed to, and even if it doesn't surely many other things do rely on these safety guarantees)
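One way to avoid holding a map entry across an await is to copy the value out, release the borrow, await, then look the key up again. This is a std-only sketch: `RefCell` stands in for the actor's `&mut self`, and `something_something` is a hypothetical async step. Each lookup sees the map as it is at that moment, so a rehash caused by another message in between does no harm.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pending on the first poll, ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Hypothetical async step standing in for `bar.something_something()`.
async fn something_something(bar: u32) -> u32 {
    YieldOnce(false).await;
    bar + 1
}

async fn handle(data: &RefCell<HashMap<String, u32>>, foo: String) {
    // Copy the current value out; the borrow ends with this statement.
    let bar = *data.borrow_mut().entry(foo.clone()).or_default();
    let new_bar = something_something(bar).await;
    // Look the key up again after the await instead of reusing an old reference.
    *data.borrow_mut().entry(foo).or_default() = new_bar;
}

/// Run two handlers for the given keys, interleaved, and return the final map.
fn run_two(keys: [&str; 2]) -> HashMap<String, u32> {
    let data = RefCell::new(HashMap::new());
    {
        let mut a = Box::pin(handle(&data, keys[0].to_string()));
        let mut b = Box::pin(handle(&data, keys[1].to_string()));
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let (mut done_a, mut done_b) = (false, false);
        while !(done_a && done_b) {
            if !done_a {
                done_a = a.as_mut().poll(&mut cx).is_ready();
            }
            if !done_b {
                done_b = b.as_mut().poll(&mut cx).is_ready();
            }
        }
    }
    data.into_inner()
}
```

This only removes the dangling-reference problem. If both messages use the same key, both reads happen before either write, and one increment is lost. That is the same kind of logical race as the `Foo` example earlier in the thread.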

@thedodd

thedodd commented Feb 24, 2020

Based on a quick look through the code, it looks like per actor:

  • all wait futures are polled to completion first.
  • then the mailbox is drained.
  • then any other non-wait futures are polled to completion, one at a time. If one of those futures spawns a new wait future then that will be driven to completion first.

So, @idanarye it doesn't look like there will be a problem in your above code example. Also, please correct me if I am wrong. My statement above is just based on tracing through the code based on sending a message and then following the process of execution, which boils down to some code in the contextimpl.rs module, and the impl<A, C> Future for ContextFut<A, C> bit.

@balsa02

balsa02 commented Feb 24, 2020

[...] does not handle this scenario because its impossible to do this in safe Rust.

You can do this with one mutable reference too.

        let mut bar1 = self.data.entry(&msg1.foo).or_default();
        let mut bar2 = self.data.entry(&msg2.foo).or_default();
        *bar1 = bar1.something_something();

If a struct would be vulnerable to two single-threaded mutable references, it would be vulnerable with one too.

@idanarye
Contributor

@thedodd wait seems like a special case, not the norm, seeing how it blocks the thread and all.

@thedodd

thedodd commented Feb 24, 2020

@idanarye agreed. It forces the actor context to execute only the wait futures. It is definitely not the norm. I've used actix quite a lot, and have never used the ctx.wait() method before :).

@idanarye
Contributor

@thedodd Meaning it doesn't really solve the race I described.

@thedodd

thedodd commented Feb 24, 2020

They don't block the thread, they just don't allow any of the other futures which are part of the given context to be polled until all of the wait futures are polled to completion (there may be MANY other actors running concurrently on the same runtime). Once all of the wait futures are polled to completion, then the other futures are polled to completion.

However, your example above does not even have a race condition at all. Not without further information on what your bar.something_something() actually does.

Either way, such issues exist in the current implementation of actix, regardless of async fns being used or not. Here is the code: https://github.com/actix/actix/blob/master/src/contextimpl.rs#L379-L420

Also, I'm sorry if it seems like I am sparring. I don't intend to. This has been a good discussion, I am going to go experiment with async fn handle patterns to find the rough edges.

@idanarye
Contributor

wait has to block the thread because it doesn't return a future. It doesn't return anything. This is not the same kind of "blocking" as blocking IO, though, so let's drop it; it's not really relevant.

However, your example above does not even have a race condition at all. Not without further information on what your bar.something_something() actually does.

It doesn't really matter what bar.something_something() does: we await it, which means the executor is allowed to switch context. Maybe it doesn't have to, because the result is available immediately. Maybe bar.something_something() is sending an HTTP request to Pluto and the executor has to switch context to avoid blocking for 10 hours. But it can switch, and that makes it a race condition.
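A small std-only demonstration of that interleaving, assuming a hypothetical handler body that reads actor state, awaits, and writes back (the `YieldOnce` future stands in for `bar.something_something().await` not being ready immediately). Two such handlers polled round-robin on a single thread lose an update, with no threads or locks involved.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Returns Pending once, like awaiting something whose result is not ready yet.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing; the toy executor below simply polls in a loop.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Hypothetical handler body: read, await, then write back.
async fn increment(state: Rc<RefCell<u32>>) {
    let before = *state.borrow(); // read the actor's state
    YieldOnce(false).await; // the executor may switch to another handler here
    *state.borrow_mut() = before + 1; // write back a value based on a stale read
}

/// Single-threaded round-robin executor: polls every task until all are done.
fn run_interleaved(mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

/// Runs two increments concurrently on one thread and returns the final value.
fn lost_update_demo() -> u32 {
    let state = Rc::new(RefCell::new(0));
    let mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = Vec::new();
    tasks.push(Box::pin(increment(Rc::clone(&state))));
    tasks.push(Box::pin(increment(Rc::clone(&state))));
    run_interleaved(tasks);
    let value = *state.borrow();
    value
}

fn main() {
    // Two increments ran, but one update was lost.
    println!("{}", lost_update_demo()); // prints 1, not 2
}
```

Running the two handlers one after the other instead would yield 2, which is what an atomic (one message at a time) handler guarantees.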

Either way, such issues exist in the current implementation of actix, regardless of async fns being used or not. Here is the code: https://github.com/actix/actix/blob/master/src/contextimpl.rs#L379-L420

How? The code you linked to just picks futures and polls them. Unless some of these futures are racing each other, that's not a race condition.

@balsa02

balsa02 commented Feb 25, 2020

Sorry, you are right; I take my words back.
Then I opt for a with_actor-like solution. Somehow we should take advantage of the single-threaded model to avoid locking.

@najamelan
Contributor

I have basically implemented what you are discussing here, with one twist. Take Carl Hewitt's word for it[1]: an actor only processes one message at a time, and that is for good reasons, namely linear logic flow. While you have mutable access to something, nothing else is mutating it from underneath you. Take my advice and stick to that. Even if you could make it work somehow (e.g. get the compiler to accept it), you would be creating a wild variety of footguns in terms of logic errors.

I have yet to see a problem for which a design that requires an actor to process several messages concurrently while having mutable state is not an antipattern. If someone wants to challenge that, describe the simplest problem for which you think this is needed and I will write you an implementation that only processes one message at a time.

Once you let go of the antipattern, it's trivial (async fn handle, I mean). I'm working on an actor library: thespis (interface, reference implementation). It's not ready for publication yet, so don't expect nice guide-level docs or API stability just yet, but if you want to see the implementation, have a look. There are examples and tests, and the whole thing is about 600 SLOC compared to actix's 5000 (not counting actix_rt). The mailbox is a simple while msg.next().await loop. The mailbox owns the actor, so it can give &mut self to the handle function. It awaits the returned future and then processes the next message. Simple like a brick: it lets you build a house, even a palace if you want. Less complexity, less code, less instructions -> faster.
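A minimal sketch of the mailbox shape described above, assuming hypothetical `Counter`, `Msg`, and `Mailbox` types; it is not thespis's actual API. A `VecDeque` stands in for the message stream and a busy-polling `block_on` stands in for a real executor. Because the mailbox owns the actor and awaits each handler before taking the next message, `handle` can take `&mut self` even across await points.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

enum Msg {
    Add(u64),
    Double,
}

/// Hypothetical actor with plain mutable state.
struct Counter {
    total: u64,
}

impl Counter {
    /// The handler holds `&mut self` for the whole future, across every await point.
    async fn handle(&mut self, msg: Msg) {
        std::future::ready(()).await; // an await point; no other message can interleave here
        match msg {
            Msg::Add(n) => self.total += n,
            Msg::Double => self.total *= 2,
        }
    }
}

/// Hypothetical mailbox that owns its actor and handles one message at a time.
struct Mailbox {
    actor: Counter,
    inbox: VecDeque<Msg>, // stand-in for a message stream
}

impl Mailbox {
    async fn run(mut self) -> Counter {
        // Stand-in for `while let Some(msg) = msgs.next().await`.
        while let Some(msg) = self.inbox.pop_front() {
            // The next message is not taken until this handler's future completes.
            self.actor.handle(msg).await;
        }
        self.actor
    }
}

/// A waker that does nothing; `block_on` below just polls in a loop.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Toy executor: busy-polls one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_counter(msgs: Vec<Msg>) -> u64 {
    let mailbox = Mailbox { actor: Counter { total: 0 }, inbox: msgs.into() };
    block_on(mailbox.run()).total
}

fn main() {
    // (0 + 3) * 2 + 1
    println!("{}", run_counter(vec![Msg::Add(3), Msg::Double, Msg::Add(1)])); // prints 7
}
```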

[1] When he says factorial can process many messages at a time, that's because factorial doesn't have mutable state. Actix supports that through sync actors, although making these actors always run in a dedicated thread probably mixes concerns. You can also do it by putting an actor in front of a pool of actors and using load-balancing logic to dispatch to them. These can still share mutable state if they synchronize properly (e.g. locks, atomics, etc.).
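The pool approach from the footnote can be sketched with std threads and channels standing in for actors (`Job` and `run_pool` are hypothetical): a front dispatcher hands messages round-robin to the workers, each worker still handles its own messages one at a time, and the only shared mutable state is an atomic.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical message type for the workers.
struct Job(u64);

/// Hypothetical front "actor": dispatches jobs round-robin to `workers` worker threads
/// that share one atomic accumulator. Returns the final sum.
fn run_pool(workers: usize, jobs: Vec<u64>) -> u64 {
    assert!(workers > 0, "need at least one worker");
    let shared = Arc::new(AtomicU64::new(0));
    let mut senders = Vec::with_capacity(workers);
    let mut handles = Vec::with_capacity(workers);
    for _ in 0..workers {
        let (tx, rx) = mpsc::channel::<Job>();
        let shared = Arc::clone(&shared);
        // Each worker still handles its own messages one at a time.
        handles.push(thread::spawn(move || {
            for Job(n) in rx {
                // Shared mutable state is synchronized with an atomic instead of a lock.
                shared.fetch_add(n, Ordering::Relaxed);
            }
        }));
        senders.push(tx);
    }
    // Round-robin load balancing in front of the pool.
    for (i, n) in jobs.into_iter().enumerate() {
        senders[i % workers].send(Job(n)).expect("worker thread alive");
    }
    drop(senders); // closing the channels ends each worker's loop
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    shared.load(Ordering::Relaxed)
}

fn main() {
    println!("{}", run_pool(4, (1..=100).collect())); // prints 5050
}
```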

@Jonathas-Conceicao
Member Author

One idea I have been thinking about lately is to make the context implementation handle two separate kinds of 'async' code, which we already have as Future and ActorFuture. Only one ActorFuture would be polled at a time, and if it yields for some reason, the Actor could try polling some of its queued-up Futures. That way, futures that might access the actor's mutable state would always have exclusive access to it, while other spawned futures would be free to be polled, since they shouldn't interfere with the actor's internal state. The advantage I see in spawning a normal Future on the Actor instead of on the Runtime itself is that the future would be bound to the Actor's lifetime: if the Actor stops for some reason, these futures would be dropped. I am not sure how common a use case it would be to have these normal Futures running on the Actor instead of the Runtime, though.
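A rough std-only model of this two-queue idea, under heavy assumptions: `ActorFut`, `ToyActorContext`, `TwoStepAdd`, and `Actor` are hypothetical names rather than actix's `ActorFuture` or its context internals, and the executor is a busy loop with a no-op waker. Only the front `ActorFut` is polled, with `&mut` access to the actor; whenever it yields, the plain futures get polled instead, so two actor futures never interleave.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical analogue of an ActorFuture: it sees `&mut A` only while being polled.
trait ActorFut<A> {
    fn poll_with(&mut self, actor: &mut A, cx: &mut Context<'_>) -> Poll<()>;
}

/// Hypothetical context: at most one ActorFut in progress, plus plain futures.
struct ToyActorContext<A> {
    actor: A,
    actor_futs: VecDeque<Box<dyn ActorFut<A>>>,
    plain_futs: Vec<Pin<Box<dyn Future<Output = ()>>>>,
}

impl<A> ToyActorContext<A> {
    fn run(&mut self) {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        while !self.actor_futs.is_empty() || !self.plain_futs.is_empty() {
            if let Some(front) = self.actor_futs.front_mut() {
                // Only the front ActorFut is polled, so it has exclusive access to the actor.
                if front.poll_with(&mut self.actor, &mut cx).is_ready() {
                    self.actor_futs.pop_front();
                    continue;
                }
            }
            // The front ActorFut yielded (or none is queued): poll plain futures meanwhile.
            self.plain_futs.retain_mut(|f| f.as_mut().poll(&mut cx).is_pending());
        }
    }
}

fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Hypothetical actor; its log is shared with a plain future only for demonstration.
struct Actor {
    value: u32,
    log: Rc<RefCell<Vec<&'static str>>>,
}

/// A hypothetical ActorFut that mutates the actor, yields once, then mutates it again.
struct TwoStepAdd {
    started: bool,
}

impl ActorFut<Actor> for TwoStepAdd {
    fn poll_with(&mut self, actor: &mut Actor, cx: &mut Context<'_>) -> Poll<()> {
        actor.value += 1;
        if !self.started {
            self.started = true;
            actor.log.borrow_mut().push("actor:start");
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            actor.log.borrow_mut().push("actor:end");
            Poll::Ready(())
        }
    }
}

/// Two ActorFuts and one plain future; returns the final value and the event log.
fn demo() -> (u32, Vec<&'static str>) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let plain_log = Rc::clone(&log);
    let mut ctx = ToyActorContext {
        actor: Actor { value: 0, log: Rc::clone(&log) },
        actor_futs: VecDeque::new(),
        plain_futs: Vec::new(),
    };
    ctx.actor_futs.push_back(Box::new(TwoStepAdd { started: false }));
    ctx.actor_futs.push_back(Box::new(TwoStepAdd { started: false }));
    ctx.plain_futs.push(Box::pin(async move {
        plain_log.borrow_mut().push("plain"); // never touches the actor
    }));
    ctx.run();
    let events = log.borrow().clone();
    (ctx.actor.value, events)
}

fn main() {
    println!("{:?}", demo());
}
```

In this model the plain future runs while the first actor future is suspended, but the second actor future only starts after the first has finished.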

I'm also not against the idea of making all the futures handled by an Actor atomic. I have created PR #357 (still pending review at the moment), which offers a non-breaking way of handling messages atomically if the user so desires.

@idanarye
Contributor

idanarye commented Mar 4, 2020

Less complexity, less code, less instructions -> faster.

As intuitive as this rule is, it is not automatically true. There are many examples where more complex code composed of more commands is actually faster:

  • SQL transactions.
  • SQL prepared statements instead of composing.
  • Doing IO in multiple threads instead of a single for loop.
  • Async IO.
  • Conditional jumps.
  • Quicksort is more complex than insertion sort.

But your solution is not even simpler. Sure, it makes the framework implementation simpler, but only because it pushes that complexity into user code. If I want to send a message from actor Foo to actor Bar and do something with its result in Foo's context, without locking Foo while Bar does its thing, I need two messages and two impl Handlers instead of one, because I need to make Bar send another message to Foo once it's done. If I want to send the same message to Bar from two different places in Foo, I need four(!) messages instead of one.

And of course, all these messages become considerably more complex because I can't use the closures of the futures to pass data. So now Bar's handlers need to know about the internal implementation details of Foo's handlers so they can pass them back to Foo in the response messages.

Simplicity at its finest.

@najamelan
Contributor

najamelan commented Mar 4, 2020

As intuitive as this rule is, it is not automatically true.

Sure, it's not always true, but if people are proposing this because they are worried about hypothetical throughput, since an actor sometimes has to wait for another before processing the next message, note that the extra complexity in the framework might well cancel out that performance gain in the first place, because you pay that price everywhere for the rare case where this is hypothetically not an antipattern and you end up using it.

If anything speaks for this, I think it's the fact that it allows cyclic dependencies while avoiding deadlocks. That is a much better argument, but I think the price is still too high (solving deadlocks this way actually increases the potential for footguns), and I hope we find a better way to avoid deadlocks.

do something with its result in Foo's context

This is the essence of it. Yes, if you need mutable access to Foo's state when the result comes back, then keep Foo from processing other messages, lest there be dragons. If you don't need mutable state, e.g. you just need to read some field, clone it, move it into an async task and spawn that, so you don't need to block the actor while the task runs.

If it's completely unrelated to the earlier state but does need mutable access, spawn a task, move foo_addr into it, await the response from Bar, and then send a message to Foo from the spawned task. No need to change Bar.

However the original conception of the actor model would have you send your address along if the other side had to respond to you, and you process the response at some point later. By leveraging the rust async model to be able to do a request->response type operation and awaiting the response (for ergonomics, less boilerplate) we now get the deadlock problem. The original model never deadlocks. By introducing non-atomic processing on top of that, we just go further down the rabbit hole in my opinion...
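A std-only sketch of the address-passing style described above, with threads and channels standing in for actors; `FooMsg`, `BarMsg`, `spawn_bar`, and `run_foo` are hypothetical. Foo never waits on Bar, so there is no deadlock cycle, but the reply becomes a separate message type that Foo has to handle in its own match arm, which is the extra boilerplate discussed earlier in the thread.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Messages Bar handles; a request carries the address to reply to.
enum BarMsg {
    Square { n: u64, reply_to: Sender<FooMsg> },
}

/// Messages Foo handles; Bar's answer arrives as an ordinary message.
enum FooMsg {
    Start(u64),
    Squared(u64),
}

/// Spawns a hypothetical Bar actor that processes one message at a time.
fn spawn_bar() -> Sender<BarMsg> {
    let (tx, rx) = mpsc::channel::<BarMsg>();
    thread::spawn(move || {
        for msg in rx {
            match msg {
                BarMsg::Square { n, reply_to } => {
                    // Reply by sending a new message to the address we were given.
                    let _ = reply_to.send(FooMsg::Squared(n * n));
                }
            }
        }
    });
    tx
}

/// Runs a hypothetical Foo actor: it never waits on Bar, it handles the reply later.
fn run_foo(inputs: Vec<u64>) -> Vec<u64> {
    if inputs.is_empty() {
        return Vec::new();
    }
    let bar = spawn_bar();
    let (foo_addr, foo_inbox) = mpsc::channel::<FooMsg>();
    for n in inputs {
        foo_addr.send(FooMsg::Start(n)).unwrap();
    }
    let mut results = Vec::new();
    let mut outstanding = 0;
    // Foo's mailbox loop: one message at a time.
    for msg in foo_inbox.iter() {
        match msg {
            FooMsg::Start(n) => {
                outstanding += 1;
                let request = BarMsg::Square { n, reply_to: foo_addr.clone() };
                bar.send(request).unwrap();
            }
            FooMsg::Squared(v) => {
                outstanding -= 1;
                results.push(v);
                if outstanding == 0 {
                    break;
                }
            }
        }
    }
    results
}

fn main() {
    println!("{:?}", run_foo(vec![1, 2, 3])); // prints [1, 4, 9]
}
```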

There are many ways to skin a cat, all with some trade-offs, but non-atomic message handling with mutable state should really be at the very bottom of your list, and I would say an actor framework shouldn't try to support it. Of course, I'm neither developing nor using actix, so I don't care that much, but I just wanted to warn you that I think you're making a mistake. Think seriously about this.

@fakeshadow
Contributor

fakeshadow commented Nov 8, 2020

My take on this matter: below is a minimal working actix actor pattern that is async/await friendly, as I imagine it.
https://gist.github.com/fakeshadow/2297324322c084752731f66a6fc5b704

Move away from leaf futures for most of the implementation.

Handler::handle would take &Actor and &Context<Actor> so we keep the ability for concurrent messages.

Add Handler::handle_wait, which is basically the old handle method: &mut Actor and &mut Context<Actor> can be accessed in an async block, and it has exclusive access to the actor and context. By default, this method would fall back to Handler::handle.

Addr<Actor>::send would call Handler::handle, and Addr<Actor>::wait would call Handler::handle_wait.

The context would process incoming messages in order. If multiple messages passed via Addr<Actor>::send are next to one another, it would process them concurrently. If a message is passed via Addr<Actor>::wait, it would block the actor until it's resolved.
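A synchronous sketch of the shape of this proposal, assuming hypothetical names: the `Handler` trait here is not actix's, and `Delivery`, `Step`, and `plan` are illustrative. It shows `handle_wait` falling back to `handle` by default, and how a context could group consecutive send messages into concurrent batches while each wait message runs on its own. Actually running a batch concurrently would need an async executor, which is left out.

```rust
/// Hypothetical handler trait shaped after the proposal (not actix's real Handler).
trait Handler<M> {
    type Result;

    /// Shared access: several of these may run concurrently.
    fn handle(&self, msg: M) -> Self::Result;

    /// Exclusive access; falls back to `handle` unless overridden.
    fn handle_wait(&mut self, msg: M) -> Self::Result {
        self.handle(msg)
    }
}

/// How a message was delivered: via a `send`-like or a `wait`-like method.
enum Delivery {
    Send(u32),
    Wait(u32),
}

/// One step of the context's processing plan.
#[derive(Debug, PartialEq)]
enum Step {
    /// Consecutive `send` messages, which may be processed concurrently under `&self`.
    Concurrent(Vec<u32>),
    /// A single `wait` message that blocks the actor until it resolves.
    Exclusive(u32),
}

/// Groups an in-order message queue into concurrent batches and exclusive steps.
fn plan(queue: &[Delivery]) -> Vec<Step> {
    let mut steps: Vec<Step> = Vec::new();
    for delivery in queue {
        match delivery {
            Delivery::Send(id) => {
                if let Some(Step::Concurrent(batch)) = steps.last_mut() {
                    batch.push(*id);
                    continue;
                }
                steps.push(Step::Concurrent(vec![*id]));
            }
            Delivery::Wait(id) => steps.push(Step::Exclusive(*id)),
        }
    }
    steps
}

/// Hypothetical actor that only implements `handle`.
struct Echo;

impl Handler<u32> for Echo {
    type Result = u32;
    fn handle(&self, msg: u32) -> u32 {
        msg * 10
    }
}

fn main() {
    let mut echo = Echo;
    println!("{}", echo.handle_wait(4)); // falls back to handle: prints 40
    let queue = [Delivery::Send(1), Delivery::Send(2), Delivery::Wait(3), Delivery::Send(4)];
    println!("{:?}", plan(&queue));
}
```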
