Use of actor reference on Async Actor handle #308
refs: actix#308 Signed-off-by: Jonathas-Conceicao <jonathas.conceicao@ossystems.com.br>
refs: #308 Signed-off-by: Jonathas-Conceicao <jonathas.conceicao@ossystems.com.br>
I think this is happening because there are no explicit lifetime constraints between the actor and the future, so it automatically gets a … I'm not sure if the former can be fixed by tweaking … The latter, however, can be adjusted by using an explicit …:

```rust
use std::future::Future;

struct MyActor {
    label: String,
}

impl MyActor {
    fn reply(&self) -> impl Future<Output = Result<String, ()>> /* + 'static */ {
        // Clone the field so the future owns its data instead of borrowing `self`.
        let label = self.label.clone();
        async move { Ok(label) }
    }
}
```
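To make the ownership point concrete without pulling in actix, here is a std-only sketch. `reply` clones the field and returns a pinned, boxed future with an explicit `'static` bound (similar in shape to actix's `ResponseFuture`), so the future owns everything it uses and can outlive the actor. The tiny busy-polling `block_on` is an assumption added only so the example runs; it is not part of actix.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct MyActor {
    label: String,
}

impl MyActor {
    // The returned future owns a clone of `label`, so it borrows nothing from
    // `self` and satisfies the `'static` bound spelled out in the type.
    fn reply(&self) -> Pin<Box<dyn Future<Output = Result<String, ()>> + 'static>> {
        let label = self.label.clone();
        Box::pin(async move { Ok(label) })
    }
}

// Waker that does nothing; enough for the busy-polling demo executor below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Because the future holds no borrow of the actor, the actor can be dropped before the future is driven to completion.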
Yes, this is what I have been able to understand from compiler messages and experimenting.

What I have been doing so far in the project where I'm using Actix 0.9 is to clone the actor itself and wrap the method call. Since all my actors can be cloned and I currently don't need to update internal state based on the handled messages, this is a valid approach for this use case. The code goes something like this:

```rust
impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<()>>;

    fn handle(&mut self, msg: Msg, _: &mut Context<Self>) -> Self::Result {
        // The clone is moved into the future, so nothing borrows `self`.
        let this = self.clone();
        Box::new(actix::fut::wrap_future(this.async_handle_msg(msg)))
    }
}

impl MyActor {
    async fn async_handle_msg(self, msg: Msg) -> Result<()> {
        todo!("Impl some handling here")
    }
}
```
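A std-only sketch of the same clone-and-move idea, with `u32` standing in for `Msg` and a `count` field invented for illustration. It also shows the trade-off mentioned above: the handler mutates its clone, so the original actor's state is not updated. `block_on` is again a minimal demo executor, not an actix API.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical actor state; `count` is illustrative only.
#[derive(Clone)]
struct MyActor {
    count: u32,
}

impl MyActor {
    // Taking `self` by value makes the returned future own this copy of the
    // actor, so it holds no borrow of the original.
    async fn async_handle_msg(mut self, msg: u32) -> u32 {
        self.count += msg;
        self.count
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The update lands on the clone, which is why this only works when handlers don't need to change the actor's own state.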
The core problem that needs to be addressed is that while the future waits, other messages can be sent to the actor. So if the actor is currently borrowed by an …
I don't think it's possible. As @idanarye pointed out, using the actor reference in an async actor handler leads to multiple mutable references to the actor, which the design never allows. I want to add that the future returned from the handler must be `'static`, and the reference to `self` is never `'static`. What this lifetime really means is that a future has to take ownership of all variables inside an async block. Back in the futures 0.1 days, this limitation caused the annoying pattern where every variable had to be cloned outside a futures block. So, when a variable was used in multiple closures, it had to be cloned once for each of them:

```rust
// futures 0.1
use futures::Future;

let non_copy_var = String::from("some text");
let non_copy_var1 = non_copy_var.clone();
let non_copy_var2 = non_copy_var.clone();
futures::future::ok::<(), ()>(())
    .map(move |_| {
        println!("First usage of non_copy_var: {}", non_copy_var1);
    })
    .map(move |_| {
        println!("Second usage of non_copy_var: {}", non_copy_var2);
    });
```

In futures 0.3 today, this problem is alleviated: we only need to move a variable once. But we still need to move variables into the async block, which can never happen with a reference to `self`. For this reason, I don't think it's possible to use a mutable `self` reference in an async block. But you can move the required information into the async block and finally update the data in an `ActorFuture` combinator such as `map`, which receives mutable access to the actor. The following code compiles:

```rust
use actix::prelude::*;

#[derive(Message)]
#[rtype(result = "Result<usize, ()>")]
struct Msg;

struct MyActor(usize);

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Msg> for MyActor {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _msg: Msg, _: &mut Context<Self>) -> Self::Result {
        // Use `&mut self` synchronously, then move only the result into the future.
        let result = self.reply();
        Box::new(
            async move { result }
                .into_actor(self)
                .map(move |res, act, _ctx| {
                    // `act` is `&mut MyActor`: update the actor's state this way
                    act.0 = res.unwrap();
                    Ok(act.0)
                }),
        )
    }
}

impl MyActor {
    fn reply(&mut self) -> Result<usize, ()> {
        Ok(42)
    }
}
```
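The same split can be modeled without actix: the async part owns its inputs, and only a continuation that runs afterwards gets `&mut` access to the actor, which is the role the `act` argument plays in `.into_actor(self).map(...)`. `run_with_actor` below is a hypothetical helper that simply drives the future to completion before calling the continuation; in actix, the actor's context does the driving.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct MyActor(usize);

impl MyActor {
    fn reply(&mut self) -> Result<usize, ()> {
        Ok(42)
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical helper: the future never sees the actor; the continuation
// receives the future's output plus a mutable borrow of the actor.
fn run_with_actor<A, T, O>(
    actor: &mut A,
    fut: impl Future<Output = T>,
    then: impl FnOnce(T, &mut A) -> O,
) -> O {
    let value = block_on(fut);
    then(value, actor)
}
```

The key design point is that the `&mut` borrow only exists inside `then`, never across an await point.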
Would it be an option to implement it so that all actors, on creation, are an …
The system itself already ensures that only one message is handled or polled at a time. What we need is a way to convince the compiler of that. Meanwhile, another workaround I've adopted, to be able to update internal state without having to clone or change old code too much, is to use an unsafe raw-pointer dereference to access the actor's async methods. So I have something like this:

```rust
impl Handler<Msg> for MyActor {
    type Result = ResponseFuture<Response>;

    fn handle(&mut self, msg: Msg, _: &mut Context<Self>) -> Self::Result {
        // Erase the borrow's lifetime so the future can be `'static`.
        let this: *mut MyActor = self;
        Box::pin(async move {
            // Calls MyActor's inherent async `handle`, not `Handler::handle`.
            // Relies on the actor outliving the future and on no other
            // handler touching the actor while this future is pending.
            unsafe { (*this).handle(msg).await }
        })
    }
}
```
I think a raw pointer dereference can cause a data race. Consider two long and complicated async methods that edit fields of the same actor:

```rust
use actix::clock::delay_for;
use std::time::Duration;

struct Foo(usize);

impl Foo {
    async fn handle(&mut self) -> Result<usize, ()> {
        self.0 = 30;
        if self.0 > 0 {
            // Another handler may run while this one is suspended here.
            delay_for(Duration::from_secs(10)).await;
            self.0 -= 1;
        }
        Ok(self.0)
    }
}
```

When two `handle` calls are running at the same time, it is certainly a buggy disaster.
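The hazard can be reproduced deterministically on a single thread, without actix and without `unsafe`. In this sketch, `Rc<Cell<usize>>` stands in for the actor's field, `YieldOnce` stands in for the `delay_for(...).await` (it is pending exactly once), and `run_interleaved` is a hypothetical round-robin executor. Run one after the other, each call would return 29; interleaved, the second call observes the first one's write.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Returns Pending exactly once, standing in for `delay_for(...).await`.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Same shape as `Foo::handle` above, with `Rc<Cell<usize>>` as the actor field.
async fn handle_msg(state: Rc<Cell<usize>>) -> usize {
    state.set(30);
    if state.get() > 0 {
        YieldOnce(false).await;
        state.set(state.get() - 1);
    }
    state.get()
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical single-threaded executor: polls every unfinished future once
// per round, so their await points interleave.
fn run_interleaved(mut futs: Vec<Pin<Box<dyn Future<Output = usize>>>>) -> Vec<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut results: Vec<Option<usize>> = vec![None; futs.len()];
    while results.iter().any(Option::is_none) {
        for (slot, fut) in results.iter_mut().zip(futs.iter_mut()) {
            if slot.is_none() {
                if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
                    *slot = Some(v);
                }
            }
        }
    }
    results.into_iter().map(Option::unwrap).collect()
}
```

No memory is corrupted here; the problem is purely logical, which is why it also shows up with safe code.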
@guoli-lyu yes, this could indeed lead to a data race. But other than this and cloning, I don't see any other way to use an actor's internal state in async message handling.
It's also worth pointing out that the data race is not caused by the unsafe dereference itself; consider the following handler:

```rust
impl Handler<Msg> for Foo {
    type Result = ResponseActFuture<Self, Result<usize, ()>>;

    fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
        println!("Received");
        self.0 = 30;
        Box::new(
            async {}
                .into_actor(self)
                // Other messages can be handled during this delay.
                .then(|_, this: &mut Self, _| delay_for(Duration::from_secs(10)).into_actor(this))
                .map(|_, this, _| {
                    this.0 -= 1;
                    Ok(this.0)
                }),
        )
    }
}
```

If …
As Actix's runtime is single-threaded, the write operations on the actor's internal state should not lead to any memory hazard, but they can still lead to unexpected behavior if one is not careful.
Maybe it's worthwhile to document this possible data race for code that modifies the actor's internal state asynchronously. That said, this data race seems irrelevant to using the actor reference in an async actor handler. My point is, the … What about rewriting …
One option (based on a now-deleted Reddit comment) is to do something like this:

```rust
fn handle(&mut self, msg: Msg, ctx: &mut Context<Self>) -> Self::Result {
    Box::new(async move {
        let first_step_result = do_something_with_msg(msg).await?;
        // `with_actor` is the proposed API: run a closure with `&mut` access to the actor
        let mid_result_with_actor = ctx.with_actor(|this, _ctx| {
            this.do_some_computation_with_(first_step_result)
        }).await?;
        final_computation(mid_result_with_actor).await
    })
}
```

The idea is that …
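One way to picture this proposal, sketched with std only: instead of holding `&mut self` across an `.await`, the async code enqueues closures, and the actor applies them between polls, when it has exclusive access to its own state. `ActorQueue`, `with_actor` and `drain` here are hypothetical names for illustration, not actix APIs, and a real implementation would also need to route each closure's return value back to the waiting future.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// A closure waiting for exclusive access to an actor of type `A`.
type Job<A> = Box<dyn FnOnce(&mut A)>;

// Hypothetical shared queue; clones share the same underlying deque.
struct ActorQueue<A> {
    jobs: Rc<RefCell<VecDeque<Job<A>>>>,
}

impl<A> Clone for ActorQueue<A> {
    fn clone(&self) -> Self {
        ActorQueue { jobs: Rc::clone(&self.jobs) }
    }
}

impl<A> ActorQueue<A> {
    fn new() -> Self {
        ActorQueue { jobs: Rc::new(RefCell::new(VecDeque::new())) }
    }

    // Called from handler code that does not hold `&mut A`.
    fn with_actor<F: FnOnce(&mut A) + 'static>(&self, f: F) {
        self.jobs.borrow_mut().push_back(Box::new(f));
    }

    // Called by the actor's own loop, which does hold `&mut A`.
    // Returns how many queued closures were applied, in FIFO order.
    fn drain(&self, actor: &mut A) -> usize {
        let mut ran = 0;
        loop {
            // Pop first so the RefCell borrow is released while the job runs.
            let job = self.jobs.borrow_mut().pop_front();
            match job {
                Some(job) => {
                    job(actor);
                    ran += 1;
                }
                None => return ran,
            }
        }
    }
}
```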
About the `with_actor` method

The …

```rust
pub fn with_actor<F>(&mut self, f: F)
where
    F: Fn(&mut A, &mut Self) + 'static,
{
    // Schedule `f` to run on the actor's context once the empty future resolves.
    self.spawn(Box::new(
        crate::fut::wrap_future(async {}).map(move |_, act, ctx| f(act, ctx)),
    ));
}
```

I've put it on a fork: actix/topic/with_actor.

About the race condition

On a second note, now about the race condition. The problem happens because the system might poll other futures while some computation on the …

```rust
fn handle<R: ResponseChannel<M>>(self, ctx: &mut A::Context, tx: Option<R>) {
    // `wait` stops the context from polling other futures until this one completes.
    ctx.wait(self.then(move |res, this, _| {
        if let Some(tx) = tx {
            tx.send(res);
        }
        async {}.into_actor(this)
    }));
}
```

I have pushed such a modification to another branch on my fork: actix/topic/avoid_handle_data_race.
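To see why running each handler future to completion (the effect of `ctx.wait` above) removes the interleaving, here is a std-only model of that scheduling policy. `run_sequential` is a hypothetical name, `YieldOnce` simulates an await point, and `Rc<Cell<usize>>` stands in for actor state; this models the policy, not actix's actual context implementation. With the same two `handle_msg` calls, both now return 29.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Returns Pending exactly once, simulating an await point.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Sets the "actor field", awaits, then decrements it.
async fn handle_msg(state: Rc<Cell<usize>>) -> usize {
    state.set(30);
    if state.get() > 0 {
        YieldOnce(false).await;
        state.set(state.get() - 1);
    }
    state.get()
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical policy mirroring `ctx.wait`: each handler future is polled to
// completion before the next one is started.
fn run_sequential(futs: Vec<Pin<Box<dyn Future<Output = usize>>>>) -> Vec<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut results = Vec::with_capacity(futs.len());
    for mut fut in futs {
        let value = loop {
            if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
                break v;
            }
        };
        results.push(value);
    }
    results
}
```

The price of this policy is throughput: while one handler waits, no other message for this actor makes progress.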
I just noticed that making … Spawning it is not very helpful, not (just) because we can't get a result but because we cannot … Can't we make a future out of it? I've managed to get this to work (see the full code in https://gist.github.com/idanarye/46cf23613d1a789c7ebf219a5225e86a):

```rust
fn handle(&mut self, _msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
    let addr = ctx.address();
    let original_number = self.num;
    Box::new(async move {
        // `with_actor` is defined in the linked gist.
        let current_number = addr.send(with_actor(|this: &mut Self, _| {
            this.num
        })).await.unwrap().unwrap();
        println!("Current number is {}", current_number);
        assert_eq!(current_number, original_number);

        addr.send(Increment).await.unwrap();
        addr.send(Increment).await.unwrap();
        addr.send(Increment).await.unwrap();
        let current_number = addr.send(Increment).await.unwrap();
        println!("Number incremented to {}", current_number);
        assert_eq!(current_number, original_number + 4);

        {
            let original_number = original_number;
            addr.send(with_actor(move |this: &mut Self, _| {
                this.num *= 2;
                println!("Multiplying by 2 to {}", this.num);
                assert_eq!(this.num, (original_number + 4) * 2);
            })).await.unwrap().unwrap();
        }

        let current_number = addr.send(Increment).await.unwrap();
        println!("Number incremented to {}", current_number);
        assert_eq!(current_number, (original_number + 4) * 2 + 1);
        Ok(())
    }.into_actor(self))
}
```

(I tried to make …)

Now I just have to … Ugly, and probably not very efficient, but it works, and maybe it can give you (or someone else who's familiar with the internals of Actix) some inspiration for a better way to do this?
Hey everyone, just wanted to chime in here. Obviously, there has already been a lot of discussion along these lines, but I think something which actix needs right now is the following signature for message handlers:

```rust
impl Handler<Msg> for Foo {
    type Result = ...;
    async fn handle(&mut self, _: Msg, ...) -> Self::Result {
        //..
    }
}
```

The major change here is the …

… in such cases, you stand to run into problems. In other contexts, this would be referred to as a "critical section", and exclusion is guaranteed by the actor ... as long as you don't yield in that critical section. So, I don't think we should attempt to "re-solve" that problem. It is the case today, and it needs to be in the future as well, because that is what enables concurrent processing in general. So, all in all, if we can migrate the actix code base to embrace async/await at its core, and help teach users about concurrency and the fact that concurrent (not parallel) mutation is possible, and needs to be in order to support high throughput, then we should be in good shape. We can teach users how to craft concurrent algorithms which embrace these facts and how to work and succeed within these constraints. As an example, I wrote the actix-raft crate using actix, and there are plenty of cases where concurrent mutation takes place in the various actors, and these conditions need to be checked and accounted for. I know there are complexities with the context system and other bits like that. However, actix having async/await support at its core would be such a game changer! Worth some design discussion and such, IMHO. Thoughts? Cheers!
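A common way to live with concurrent mutation like this is to snapshot whatever the async work depends on before the await point and re-check it before mutating afterwards. The sketch below uses a raft-style `term` counter; `Node`, `begin` and `finish` are hypothetical names, not taken from actix or actix-raft, and the await point is simulated by changing `term` in between the two calls.

```rust
// Hypothetical raft-like node state, for illustration only.
struct Node {
    term: u64,
    committed: Vec<u64>,
}

impl Node {
    // Before the await point: snapshot the state the async work depends on.
    fn begin(&self) -> u64 {
        self.term
    }

    // After the await point: re-check the snapshot before mutating, because
    // other messages may have been handled while this one was suspended.
    fn finish(&mut self, started_in_term: u64, value: u64) -> bool {
        if self.term != started_in_term {
            // Stale result: drop it instead of applying it.
            return false;
        }
        self.committed.push(value);
        true
    }
}
```

The check turns a silent logical race into an explicit, testable branch.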
Essentially, the …
Futures' ability to hold references was not changed - they could hold a reference before, and it still counts as a borrow: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=89a6b843b87d04ca269f713f47184b6d
Also, Rust doesn't have async trait methods yet, and crates like async-trait merely rewrite them to return a pinned …
@idanarye re async-trait: true, true. I'm wondering if that would be viable for now. As far as references go, I should have been clearer about what I said. I was referring more to the limitations around … With async/await of today (thanks, Pin) we can use …
@thedodd
The second problem does get solved by using …
Hmm, I never said it was. My point has clearly been missed, but that's ok. Your response about … I agree with your statements about …
@idanarye what if we use a signature like the following instead (…):

```rust
impl Handler<Msg> for Foo {
    type Result = ...;
    async fn handle(&self, _: Msg, ...) -> Self::Result {
        //..
    }
}
```

This would mitigate issue … Actix itself isn't 1.0 yet, so perhaps this would be a big win if we are able to implement a pattern like this. Thoughts?
To be sure, in case it isn't clear from the above, doing so would allow us to remove the …
@Jonathas-Conceicao @lucab @fMeow @Malanche @JohnTitor (pinging folks here based on comments on this thread)
Won't this kind of defeat the whole purpose of having an actor system?
Would it? I'm not sure. I'm definitely just spitballing on design ideas and such.
Perhaps the fact that mutations would require synchronization is a major deterrent. I'll have to think it over a bit more. However, the … Thoughts?
I too believe that having an async method on the … Changing the handle function to use an immutable reference seems like a step back to me. As @thedodd has said, …
Hi all! I implemented an unsafe PoC, which has the following signatures:

```rust
#[async_trait(?Send)]
pub trait AsyncHandler<Msg>
where
    Self: Actor,
    Msg: Message,
{
    type Result: MessageResponse<Self, Msg>;
    async fn handle(&mut self, msg: Msg, ctx: &mut Self::Context) -> Self::Result;
}

#[async_trait(?Send)]
impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<()>;
    async fn handle(&mut self, _msg: MyMessage, _ctx: &mut <Self as Actor>::Context) -> Self::Result {
        Ok(())
    }
}

impl_handler!(MyMessage, MyActor);
```
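For readers unfamiliar with the macro: `#[async_trait(?Send)]` rewrites an `async fn` in a trait into a method returning a pinned, boxed, non-`Send` future whose lifetime is tied to the borrowed arguments (without `?Send`, the boxed future is additionally `Send`). The hand-written sketch below approximates that shape for a simplified trait (no `Actor`/`Message` bounds, no context argument); it is not the macro's exact output, and `block_on` is a demo executor only.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified, hand-desugared `async fn handle(&mut self, msg)`: the returned
// future may borrow `self` for the lifetime `'a`.
trait AsyncHandler<Msg> {
    type Result;
    fn handle<'a>(&'a mut self, msg: Msg) -> Pin<Box<dyn Future<Output = Self::Result> + 'a>>;
}

struct MyActor {
    handled: u32,
}

struct MyMessage(u32);

impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<u32, ()>;

    fn handle<'a>(&'a mut self, msg: MyMessage) -> Pin<Box<dyn Future<Output = Self::Result> + 'a>> {
        Box::pin(async move {
            // The `&mut self` borrow lives exactly as long as this future.
            self.handled += msg.0;
            Ok(self.handled)
        })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Because the future is not `'static`, whoever drives it must keep the actor borrowed until it completes, which is exactly the constraint discussed in the replies below.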
@balsa02 What happens if I have something like this:

```rust
use std::collections::HashMap;

struct MyActor {
    data: HashMap<Foo, Bar>,
}

#[async_trait(?Send)]
impl AsyncHandler<MyMessage> for MyActor {
    type Result = Result<()>;

    async fn handle(&mut self, msg: MyMessage, _ctx: &mut <Self as Actor>::Context) -> Self::Result {
        // `bar` borrows `self.data` across the `.await` below.
        let bar = self.data.entry(msg.foo).or_default();
        *bar = bar.something_something().await?;
        Ok(())
    }
}
```

and I send two … The first message will create some entry, run … (I don't really know if …)
Based on a quick look through the code, it looks like per actor:
So, @idanarye, it doesn't look like there will be a problem in your code example above. Also, please correct me if I am wrong. My statement above is just based on tracing through the code: sending a message and then following the process of execution, which boils down to some code in the …
You can do this with one mutable reference too:

```rust
let mut bar1 = self.data.entry(&msg1.foo).or_default();
let mut bar2 = self.data.entry(&msg2.foo).or_default();
*bar1 = bar1.something_something();
```

If a struct would be vulnerable with two single-threaded mutable references, it would be with one too.
@balsa02 No, you can't - the borrow checker will not allow that.
@thedodd …
@idanarye agreed. It forces the actor context to execute only the …
@thedodd Meaning it doesn't really solve the race I described.
They don't block the thread; they just don't allow any of the other futures which are part of the given context to be polled until all of the wait futures are polled to completion (there may be MANY other actors running concurrently on the same runtime). Once all of the wait futures are polled to completion, the other futures are polled to completion. **However, your example above does not even have a race condition at all.** Not without further information on what your … Either way, such issues exist in the current implementation of actix, regardless of … Also, I'm sorry if it seems like I am sparring. I don't intend to. This has been a good discussion; I am going to go experiment with …
It doesn't really matter what …
How? The code you linked to is just picking futures and polling them - unless some of these futures are racing, that's not a race condition.
Sorry. You are right, I take my words back.
I have basically implemented what you are discussing here, with one twist. Take Carl Hewitt's word for it[1]: an actor only processes one message at a time. That is for good reasons: linear logic flow. While you have mutable access to something, nothing else is mutating it from underneath you. Take my advice and stick to that. Even if you could make it work somehow (e.g. the compiler accepts it), you would be creating a wild variety of footguns in terms of logical errors. I have yet to see any problem for which a design that requires an actor to process several messages concurrently and have mutable state is not an antipattern. If someone wants to challenge this, describe the simplest problem for which you think this is needed and I will make you an implementation that only processes one message at a time. Once you let go of the antipattern, it's trivial (async fn handle, I mean).

I'm working on an actor library: thespis (interface, reference implementation). It's not ready for publication yet, so don't expect nice guide-level docs or API stability just yet, but if you want to see the implementation, have a look. There are examples and tests, and the whole thing is about 600 SLOC compared to about 5000 for actix (not counting actix_rt). The mailbox is a simple …

[1] When he says factorial can process many messages at a time, that's because factorial doesn't have mutable state. Actix supports that through sync actors, although making these actors always run in a dedicated thread probably mixes concerns. You can also do it by creating an actor in front of a pool of actors and using load-balancing-like logic to dispatch to them. These can still share mutable state if they synchronize properly (e.g. locks, atomics, etc.).
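A minimal, std-only sketch of the one-message-at-a-time model described above: because the mailbox loop awaits each handler to completion before taking the next message, the handler can hold `&mut self` across its own await points without any other handler observing intermediate state. `Counter`, `Msg`, `YieldOnce` and `run_mailbox` are hypothetical names, and `block_on` is a demo executor; this is not thespis or actix code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Pending exactly once, simulating a wait on another actor or on I/O.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct Counter {
    value: i64,
    log: Vec<i64>,
}

enum Msg {
    Add(i64),
    Double,
}

impl Counter {
    // `&mut self` is held across the await point; this compiles and cannot
    // race, because only one call is in flight at a time.
    async fn handle(&mut self, msg: Msg) {
        let before = self.value;
        YieldOnce(false).await;
        self.value = match msg {
            Msg::Add(n) => before + n,
            Msg::Double => before * 2,
        };
        self.log.push(self.value);
    }
}

// Processes messages strictly in order, one at a time.
async fn run_mailbox(actor: &mut Counter, mailbox: Vec<Msg>) {
    for msg in mailbox {
        actor.handle(msg).await;
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The cost of this design is that a slow handler delays every later message for the same actor.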
One idea I had been thinking about lately was to make the context implementation handle two separate kinds of 'async' code, which we already have as … I'm also not against the idea of making all the futures handled by an actor atomic. Right now I have created PR #357 (still pending some review at the moment), which offers a non-breaking way of handling messages atomically if the user so desires.
As intuitive as this rule is, it is not automatically true. There are many examples where more complex code composed of more commands is actually faster:
But your solution is not even simpler. Sure, it makes the framework implementation simpler, but that's only because it pushes that complexity to user code. If I want to send a message from actor … And of course, all these messages become considerably more complex because I can't use the closures of the futures to pass data. So now … Simplicity at its finest.
Sure, it's not always true, but if people are proposing this because they are worried about hypothetical throughput, since an actor sometimes has to wait for another before processing the next message, note that more complexity in the framework might well offset that performance gain in the first place, because you pay that price everywhere for the rare case where this might hypothetically not be an antipattern and you end up using it. If anything speaks for this, I think it's the fact that it allows cyclic dependencies while avoiding deadlocks, which is a much better argument, but I think the price is still too high (solving deadlocks this way actually increases the potential for footguns) and I hope we find a better way to avoid deadlocks.
This is the essence of it. Yes, if you need mutable access to Foo's state when the result comes back, then keep it from processing other messages, lest there be dragons. If you don't need mutable state, e.g. you just need to read some field, clone it, send it into an async task and spawn that, so you don't need to block the actor while it processes. If it's completely unrelated to the state before, but it does need mutable access, spawn a task, send … However, the original conception of the actor model would have you send your address along if the other side had to respond to you, and you would process the response at some point later. By leveraging the Rust async model to do a request->response type operation and awaiting the response (for ergonomics, less boilerplate), we now get the deadlock problem. The original model never deadlocks. By introducing non-atomic processing on top of that, we just go further down the rabbit hole, in my opinion... There are many ways to skin a cat, all with some trade-offs, but non-atomic message handling with mutable state should really be at the very bottom of your list here, and I would say an actor framework shouldn't try to support it. Of course, I'm not developing or using actix, so I don't care that much, but I just wanted to warn you that I think you're making a mistake. Think seriously about this.
My try on this matter. Below is a minimal working actix actor pattern with the async/await-friendly … I imagine. Move away from leaf futures for most of the implementation.
Add …
… context would process incoming messages in order. If multiple messages passed by …
I'm trying to implement an asynchronous actor response that uses the actor's `self` reference on actix v0.9, but I can't get it to work on account of a lifetime bounds error. I've separated a small example that contains the error:
But this gives me the following error:
I've tried to implement this using `ActorFuture` and the `then` combinator, but I get similar errors. This is what I tried:
The problem seems to be that I can only have `'static` references in the future I return, but since the future will be handled by the actor itself, shouldn't it be able to use its own reference?
Are there examples for asynchronously handling messages? I have only been able to find the doc example on `ActorFuture`, which is even a little bit outdated; and it's a slightly different context, since it doesn't use a reference to the actor in the futures it chains.