Skip to content

Stop your Streams

Compare
Choose a tag to compare
@asonix asonix released this 04 Jan 23:00

The v0.1.2 release introduces a new trait, EndHandler, which can be used to handle End messages that are sent over the same socket as the data.

crates.io
docs.rs

Here's an example pulled from the documentation. This stream stops producing values after 10 multiparts have been received.

#![feature(try_from)]

extern crate zmq;
extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;

use std::rc::Rc;
use std::convert::TryInto;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::async::Multipart;
use tokio_zmq::{Socket, Sub};

/// Counter-backed state used as the end handler for the subscriber stream.
struct Stop {
    /// Running count maintained by the end-handler logic; starts at zero.
    count: usize,
}

impl Stop {
    /// Builds a fresh handler with its counter reset to zero.
    fn new() -> Self {
        Self { count: 0 }
    }
}

impl EndHandler for Stop {
    /// Reports whether the stream should end.
    ///
    /// Returns `false` for the first ten calls, incrementing the counter on
    /// each of them, and `true` on every call after that. The multipart's
    /// contents are never inspected; only the number of calls matters.
    fn should_stop(&mut self, _: &Multipart) -> bool {
        let limit_reached = self.count >= 10;

        // Only keep counting while still below the limit, so the counter
        // settles at 10 and never grows further.
        if !limit_reached {
            self.count += 1;
        }

        limit_reached
    }
}

/// Connects a `Sub` socket to `tcp://localhost:5569`, prints every string
/// frame of each received multipart, and runs until the stream built with the
/// `Stop` end handler completes.
///
/// Panics (via `unwrap`) if the reactor cannot be created, the socket cannot
/// be built, or the stream finishes with an error.
fn main() {
    // `Core::run` borrows the reactor mutably, so the binding must be `mut`.
    let mut core = Core::new().unwrap();
    let context = Rc::new(zmq::Context::new());
    // The filter is the empty byte string; presumably this subscribes to all
    // messages, as with a plain ZeroMQ empty subscription prefix.
    let sub: Sub = Socket::new(context, core.handle())
        .connect("tcp://localhost:5569")
        .filter(b"")
        .try_into()
        .unwrap();

    let fut = sub.stream_with_end(Stop::new()).for_each(|multipart| {
        for msg in multipart {
            // Frames for which `as_str()` returns `None` are silently skipped.
            if let Some(msg) = msg.as_str() {
                println!("Message: {}", msg);
            }
        }
        Ok(())
    });

    // Drive the stream future to completion on the reactor.
    core.run(fut).unwrap();
}