
Cancelling a notify_later() future sometimes fails, eventually delivering the cancelled message. #284

Open
vincentdephily opened this issue Sep 17, 2019 · 3 comments

Comments

@vincentdephily

This happens pretty reliably in my code once enough messages are scheduled:

  • create a timeout handler using self.timeouts.insert(msgid, ctx.notify_later(MsgTimeout{msgid}, delay))
  • then, a few ms later when the reply arrives, call ctx.cancel_future(*self.timeouts.get(&msgid).unwrap())
  • but I still receive the MsgTimeout message a few seconds later
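The steps above rely on one contract: once a handle has been cancelled, the scheduled message must never be delivered. The std-only sketch below models that contract with a toy timer queue on a virtual millisecond clock. TimerQueue, Handle, schedule, cancel and poll are hypothetical names for illustration only; this is not how actix implements notify_later/cancel_future, and it does not reproduce the bug.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, HashMap};

/// Hypothetical handle type; stands in for actix's SpawnHandle in this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Handle(u64);

/// Toy delayed-message queue driven by a virtual clock in milliseconds.
/// It only models the expected contract: a cancelled entry is never delivered.
struct TimerQueue<M> {
    next_id: u64,
    // Min-heap ordered by deadline, then by id for deterministic ordering.
    heap: BinaryHeap<Reverse<(u64, u64)>>,
    // Entries that have neither fired nor been cancelled.
    live: HashMap<u64, M>,
}

impl<M> TimerQueue<M> {
    fn new() -> Self {
        TimerQueue { next_id: 0, heap: BinaryHeap::new(), live: HashMap::new() }
    }

    /// Schedule `msg` for delivery at virtual time `now_ms + delay_ms`.
    fn schedule(&mut self, now_ms: u64, delay_ms: u64, msg: M) -> Handle {
        let id = self.next_id;
        self.next_id += 1;
        self.heap.push(Reverse((now_ms + delay_ms, id)));
        self.live.insert(id, msg);
        Handle(id)
    }

    /// Cancel a pending entry; returns false if it already fired or was cancelled.
    fn cancel(&mut self, h: Handle) -> bool {
        self.live.remove(&h.0).is_some()
    }

    /// Deliver every live entry whose deadline is <= `now_ms`, in deadline order.
    fn poll(&mut self, now_ms: u64) -> Vec<M> {
        let mut due = Vec::new();
        while let Some(&Reverse((deadline, id))) = self.heap.peek() {
            if deadline > now_ms {
                break;
            }
            self.heap.pop();
            // Lazy deletion: cancelled ids are no longer in `live`, so they are skipped.
            if let Some(msg) = self.live.remove(&id) {
                due.push(msg);
            }
        }
        due
    }
}

fn main() {
    let mut timers = TimerQueue::new();
    // msgid -> handle, like `pending` in the reproducer below.
    let mut pending: BTreeMap<u64, Handle> = BTreeMap::new();
    for msgid in 0..5u64 {
        pending.insert(msgid, timers.schedule(0, 500, msgid));
    }
    // Replies arrive a few ms later: cancel each matching timeout.
    for msgid in 0..5u64 {
        let h = pending.remove(&msgid).expect("pending entry");
        assert!(timers.cancel(h));
    }
    // Long after the 500 ms deadline, nothing must be delivered.
    assert!(timers.poll(2_000).is_empty());
    println!("no cancelled timeout delivered");
}
```

The report describes actix violating exactly the invariant that the final poll checks.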

There shouldn't be a race condition, since the timeout arrives many seconds after it was supposedly cancelled. I've triple-checked that I'm cancelling the right SpawnHandle. I've tried to create a reduced test case that I can share, but haven't succeeded. I don't think it's the same as issue #206, since there's no UDP socket involved (there is TCP, though).

This is a problem both because the unexpected timeout leads to spurious handling and because the actor stays alive while waiting for that message.

@ousado

ousado commented Sep 17, 2019

A test case, even one that fails to demonstrate the problem, can still be useful to illustrate the overall setup.

@vincentdephily
Author

I finally managed to reproduce this with a reduced test case by bringing in a TCP connection. So it might actually be the same bug as #206, just not specifically tied to UDP. It may not even be tied to a network connection: any kind of stream handling might trigger it.

Run a local echo server on port 6142, for example with nc -l -p 6142 -e /bin/cat, then cargo run the code below. You'll see a MsgTimeout triggered after 500 ms, even though the corresponding SpawnHandle was cancelled a couple of milliseconds after being scheduled.
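If your netcat build lacks the -e option (the OpenBSD variant, for example, does not support it), a small std-only Rust echo server can stand in. This is a sketch under that assumption; spawn_echo_server is a hypothetical helper and not part of the reproducer.

```rust
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::thread;

/// Bind an echo server on `addr` and serve each connection on its own thread.
/// Returns the bound address (useful when `addr` uses port 0).
fn spawn_echo_server(addr: &str) -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind(addr)?;
    let local = listener.local_addr()?;
    thread::spawn(move || {
        for stream in listener.incoming() {
            if let Ok(mut s) = stream {
                thread::spawn(move || {
                    let mut buf = [0u8; 4096];
                    loop {
                        match s.read(&mut buf) {
                            // Peer closed the connection, or a read error occurred.
                            Ok(0) | Err(_) => break,
                            // Write back exactly the bytes that were read.
                            Ok(n) => {
                                if s.write_all(&buf[..n]).is_err() {
                                    break;
                                }
                            }
                        }
                    }
                });
            }
        }
    });
    Ok(local)
}

fn main() -> std::io::Result<()> {
    let addr = spawn_echo_server("127.0.0.1:6142")?;
    println!("echo server listening on {}", addr);
    // Keep the process alive; stop it with Ctrl-C.
    loop {
        thread::park();
    }
}
```

Like the netcat command, it echoes bytes verbatim, which is all the reproducer's U64Codec needs.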

Environment: 64-bit Linux, actix v0.8.3, tokio 0.1.22, rustc 1.37.

//[package]
//name = "actix_undead_notify"
//version = "0.1.0"
//edition = "2018"
//
//[dependencies]
//actix = { version = "~0.8", default-features = false }
//time = "0.1.42"
//tokio = "0.1.22"
//bytes = "0.4.12"

use actix::{fut::wrap_future, prelude::*};
use bytes::BytesMut;
use std::{collections::BTreeMap,
          convert::TryInto,
          fmt::Display,
          io::Write,
          sync::atomic::{AtomicU64, Ordering},
          thread::sleep,
          time::{Duration, Instant}};
use tokio::{codec::{Decoder, FramedRead},
            io::{AsyncRead, WriteHalf},
            net::TcpStream};

static MSGID: AtomicU64 = AtomicU64::new(0);
/// Request sent to the server, including an id and a timeout. Server should just reply with the
/// same bytes. Use netcat as your server: `while nc -l -p 6142 -e /bin/cat; do sleep 1;done`.
#[derive(Debug, Message)]
struct MsgRequest(u64, u64);
impl MsgRequest {
    fn new() -> Self {
        MsgRequest(MSGID.fetch_add(1, Ordering::AcqRel), 500)
    }
}

/// Keep track of the pending requests by sending ourselves an MsgTimeout after a delay, and
/// cancelling the corresponding SpawnHandle if we get the reply in time.
#[derive(Debug, Message)]
struct MsgTimeout(u64);

/// Inbound data type and associated codec.
#[derive(Debug, Message)]
struct MsgTcpIn(u64);
struct U64Codec;
impl Decoder for U64Codec {
    type Item = MsgTcpIn;
    type Error = std::io::Error;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 8 {
            return Ok(None);
        }
        Ok(Some(MsgTcpIn(u64::from_le_bytes(src.split_to(8).as_ref().try_into().unwrap()))))
    }
}

/// Forcibly shut down the system.
#[derive(Debug, Message)]
struct MsgShutdown;

/// Format time.
fn now() -> String {
    let t = time::now_utc();
    format!("{}.{:09.09}Z", t.strftime("%FT%T").unwrap(), t.tm_nsec)
}

/// Stop the system with a message.
fn system_stop(c: i32, s: impl Display) {
    if c == 0 {
        println!("{} Normal system stop", now());
        System::current().stop();
    } else {
        println!("{} !!!!!!!!!!!!!!!! system stop: {}", now(), s);
        System::current().stop_with_code(c);
    }
}

struct Client {
    /// Socket to server
    writer: Option<WriteHalf<TcpStream>>,
    /// Handles to pending MsgTimeouts
    pending: BTreeMap<u64, SpawnHandle>,
    /// Stop the system after a while
    watchdog: Addr<Watchdog>,
}
impl Client {
    fn new(watchdog: Addr<Watchdog>) -> Self {
        Self { writer: None, pending: BTreeMap::new(), watchdog }
    }
}
impl Actor for Client {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Context<Self>) {
        println!("{} client starting", now());
        let con = wrap_future(TcpStream::connect(&"127.0.0.1:6142".parse().unwrap())).map(
            |s: TcpStream, a: &mut Client, c| {
                println!("{} connected", now());
                let (r, w) = s.split();
                a.writer = Some(w);
                c.add_stream(FramedRead::new(r, U64Codec{}));
            },
        ).map_err(|e,_a,_c|{
            system_stop(1, format!("network error: {}", e))
        });
        ctx.spawn(con);
    }
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        println!("{} client stopping {:?}", now(), self.pending);
        Running::Stop
    }
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        println!("{} client stopped {:?}", now(), self.pending);
        if !self.pending.is_empty() {
            system_stop(1, "still have some reqs pending")
        }
    }
}
/// Send request {msg.0} to server and expect a reply within {msg.1} ms. If the connection isn't
/// ready yet, retry later.
impl Handler<MsgRequest> for Client {
    type Result = ();
    fn handle(&mut self, msg: MsgRequest, ctx: &mut Context<Self>) -> Self::Result {
        match &mut self.writer {
            Some(w) => {
                w.write_all(&msg.0.to_le_bytes()).expect("write");
                let h = ctx.notify_later(MsgTimeout(msg.0), Duration::from_millis(msg.1));
                self.pending.insert(msg.0, h);
                println!("{} {:?} sent {:?} {:?}", now(), msg, h, self.pending);
            },
            None => {
                let d = Duration::from_millis(10 + msg.0);
                println!("{} {:?} delayed {:?}", now(), msg, d);
                ctx.notify_later(msg, d);
            },
        }
    }
}
/// Got reply from server. Check that it's an expected reply and cancel the timeout.
impl StreamHandler<MsgTcpIn, std::io::Error> for Client {
    fn handle(&mut self, pkt: MsgTcpIn, ctx: &mut Context<Client>) {
        println!("{} client {:?} {:?}", now(), pkt, self.pending);
        match self.pending.remove(&pkt.0) {
            Some(h) => {
                println!("{} canceling {:?}", now(), h);
                ctx.cancel_future(h);
            },
            None => system_stop(1, "unexpected MsgTcpIn"),
        }
    }
}
/// Got a reply timeout, which shouldn't happen in these test conditions (network echo server
/// running on localhost). Guess `ctx.cancel_future()` didn't do its job properly.
impl Handler<MsgTimeout> for Client {
    type Result = ();
    fn handle(&mut self, msg: MsgTimeout, _ctx: &mut Context<Self>) -> Self::Result {
        system_stop(1, format!("{:?} unexpected {:?}", msg, self.pending));
    }
}

/// Keep test duration in check.
struct Watchdog {
    start: Instant,
}
impl Watchdog {
    fn new() -> Self {
        Watchdog { start: Instant::now() }
    }
}
impl Actor for Watchdog {
    type Context = Context<Self>;
    /// Time out the whole process after 2s, long after the MsgTimeout should have triggered.
    fn started(&mut self, ctx: &mut Context<Self>) {
        ctx.notify_later(MsgShutdown, Duration::from_secs(2));
    }
    /// Connecting to a server and receiving 5 replies should take no more than ~50ms
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        if self.start.elapsed() < Duration::from_millis(100) {
            system_stop(0, "normal")
        } else {
            system_stop(1, format!("took {:?}", self.start.elapsed()))
        }
    }
}
impl Handler<MsgShutdown> for Watchdog {
    type Result = ();
    fn handle(&mut self, _msg: MsgShutdown, ctx: &mut Context<Self>) -> Self::Result {
        ctx.stop();
    }
}

fn main() {
    System::run(|| {
        let wd = Watchdog::start_in_arbiter(&Arbiter::new(), |_ctx| Watchdog::new());
        let cl = Client::start_in_arbiter(&Arbiter::new(), |_ctx| Client::new(wd));
        for _ in 0..5 {
            cl.do_send(MsgRequest::new());
        }
    }).expect("should exit cleanly");
    // Give some time to print the last output.
    sleep(Duration::from_millis(20));
}
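For reference, the wire format used above is one 8-byte little-endian u64 per request, echoed back verbatim by the server. The sketch below restates U64Codec's framing with std only (a Vec<u8> in place of bytes::BytesMut), which is equivalent to calling U64Codec::decode until it returns Ok(None). decode_u64_frames is a hypothetical helper; it shows that a reply split across TCP reads is still decoded as one whole id, so a partial read should not be what leaves a timeout uncancelled.

```rust
/// Split complete 8-byte little-endian frames off the front of `buf`,
/// leaving any partial trailing frame in place for the next read.
fn decode_u64_frames(buf: &mut Vec<u8>) -> Vec<u64> {
    let mut out = Vec::new();
    while buf.len() >= 8 {
        let mut frame = [0u8; 8];
        frame.copy_from_slice(&buf[..8]);
        // Remove the consumed frame from the front of the buffer.
        buf.drain(..8);
        out.push(u64::from_le_bytes(frame));
    }
    out
}

fn main() {
    // Requests 0..5 encoded the same way Client::handle writes them.
    let mut wire: Vec<u8> = (0u64..5).flat_map(|id| id.to_le_bytes()).collect();
    // Simulate a TCP read boundary in the middle of the last frame (40 bytes total).
    let tail = wire.split_off(37);
    let mut buf = wire;
    assert_eq!(decode_u64_frames(&mut buf), vec![0, 1, 2, 3]);
    assert_eq!(buf.len(), 5); // 5 of the 8 bytes of id 4 are buffered
    buf.extend_from_slice(&tail);
    assert_eq!(decode_u64_frames(&mut buf), vec![4]);
    assert!(buf.is_empty());
    println!("all ids decoded");
}
```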

@alexlapa

@vincentdephily,

Might be fixed in #484
