Detect streaming client disconnects #202

Open
powturns opened this issue Jul 6, 2023 · 0 comments

powturns commented Jul 6, 2023

Description of problem

The async server does not detect when a client of a streaming API disconnects ungracefully. Suppose a client subscribes to updates streamed from the server: the connection is established and the server starts streaming updates to the client. If the client then dies, the server does not immediately realize this and continues to send events.

To reproduce, adapt the async-stream-server example so that it reads only the first message from the client, then echoes back an increasing sequence number forever:

    async fn echo_stream(
        &self,
        _ctx: &::ttrpc::r#async::TtrpcContext,
        mut s: ::ttrpc::r#async::ServerStream<streaming::EchoPayload, streaming::EchoPayload>,
    ) -> ::ttrpc::Result<()> {
        let mut e = s.recv().await?.unwrap();
        loop {
            e.seq += 1;

            println!("Sleep 1 second to simulate waiting for an event");
            sleep(std::time::Duration::from_secs(1)).await;

            println!("sending: {}", e.seq);
            let result = s.send(&e).await;

            if result.is_err() {
                println!("error sending item!");
            }
        }
    }

With this client implementation:

async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
    let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

    let echo = streaming::EchoPayload {
        seq: 0,
        msg: "Echo in a stream".to_string(),
        ..Default::default()
    };
    stream.send(&echo).await.unwrap();

    while let Ok(resp) = stream.recv().await {
        println!("{}", resp);
    }
}

Expected result

I'm not sure what would be best; some options are:

  1. Cancel the async future that the service method is executing in (as non-streaming async already does when the client dies).
  2. Provide some kind of token that the service method can select on (as requested in #180, "Need a mechanism which can help method.handler to realize the client has been closed").
  3. Propagate the write error back to the service method, so that the failure can be detected when calling s.send(&e).await. I think this makes sense to do regardless, but it is a lot more convenient if the client disconnect can be detected before trying to send something to it.
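To make option 2 concrete, here is a minimal sketch of the shape it could take. It is not ttrpc API: the function names, the `()` disconnect signal, and the use of a std channel as the "token" are all assumptions made for illustration. It uses threads and `recv_timeout` as a stand-in for what would, in async code, more likely be a `tokio::select!` over the token and the event source.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for a streaming handler (not ttrpc API).
/// It waits for either the next event (modelled as a timeout) or a
/// disconnect signal, whichever comes first, and returns the last
/// sequence number it sent.
pub fn stream_until_disconnected(
    disconnect: &Receiver<()>,
    interval: Duration,
    mut send: impl FnMut(u64),
) -> u64 {
    let mut seq = 0;
    loop {
        match disconnect.recv_timeout(interval) {
            // No disconnect within the interval: an "event" is ready, send it.
            Err(RecvTimeoutError::Timeout) => {
                seq += 1;
                send(seq);
            }
            // Explicit signal, or the signalling side was dropped:
            // treat both as "client is gone" and stop streaming.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return seq,
        }
    }
}

/// Usage sketch: the simulated client disappears after the third event.
pub fn demo_token() -> u64 {
    let (tx, rx) = mpsc::channel();
    stream_until_disconnected(&rx, Duration::from_millis(10), |seq| {
        println!("sending: {seq}");
        if seq == 3 {
            // In a real server the transport would raise this signal.
            let _ = tx.send(());
        }
    })
}
```

The point of the design is that the handler stops while it is waiting for the next event, without first having to attempt a write to the dead client.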

Actual result

Client output:

seq: 5 msg: "Echo in a stream"
seq: 6 msg: "Echo in a stream"
^C  <-- client killed

Server output:

... snip ....
Sleep 1 second to simulate waiting for an event
sending: 6
Sleep 1 second to simulate waiting for an event
sending: 7
Sleep 1 second to simulate waiting for an event
[00:00:19.969] (7ffb7073b640) ERROR  write_message got error: Socket("Broken pipe (os error 32)")
sending: 8
Sleep 1 second to simulate waiting for an event
[00:00:20.970] (7ffb7073b640) ERROR  write_message got error: Socket("Broken pipe (os error 32)")
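
In the log above the transport reports the broken pipe, but the handler never prints "error sending item!", so it keeps looping. As a rough illustration of what option 3 would enable, the sketch below models the stream with a std rendezvous channel, whose `send` does return an error once the receiving side is gone. The `EchoPayload` struct and both function names are stand-ins defined here, not the generated ttrpc types.

```rust
use std::sync::mpsc::{self, SyncSender};
use std::thread;

/// Stand-in for the generated payload type (assumption, not the real one).
#[derive(Clone, Debug)]
pub struct EchoPayload {
    pub seq: u32,
    pub msg: String,
}

/// Hypothetical handler loop: keep echoing increasing sequence numbers
/// until a send fails, then stop and return the sequence number that
/// could not be delivered.
pub fn echo_until_send_fails(out: &SyncSender<EchoPayload>, mut e: EchoPayload) -> u32 {
    loop {
        e.seq += 1;
        if let Err(err) = out.send(e.clone()) {
            // The write error reaches the handler, so it can exit the loop.
            eprintln!("client gone, stopping at seq {}: {err}", e.seq);
            return e.seq;
        }
    }
}

/// Usage sketch: a simulated client reads three messages and then dies.
pub fn demo_send_error() -> u32 {
    // Capacity 0 makes every send wait until the client actually receives it.
    let (tx, rx) = mpsc::sync_channel(0);
    let client = thread::spawn(move || {
        for _ in 0..3 {
            let _ = rx.recv();
        }
        // `rx` is dropped here, which models the client being killed.
    });
    let first = EchoPayload { seq: 0, msg: "Echo in a stream".to_string() };
    let failed_at = echo_until_send_fails(&tx, first);
    client.join().unwrap();
    failed_at
}
```

Under these assumptions the handler only learns about the disconnect on its next send, which is the inconvenience noted in option 3.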

Tim-Zhang added this to the 0.8.0 milestone on Jan 3, 2024