Skip to content

Version 0.3.0! Already!

Compare
Choose a tag to compare
@asonix asonix released this 11 Jan 03:55

This release sees the introduction of the TimeoutStream<S: Stream<Error = Error>> struct. This type is particularly useful since you can wrap a MultipartStream, a ControlledStream, or an EndingStream in a TimeoutStream and receive an Either<A: S::Item, B: Timeout> when the stream is ready. This means that you either get a Timeout notification, or you get the value the stream was intending to produce.

tokio_zmq::TimeoutStream differs from tokio_timer::TimeoutStream in that tokio_timer's stream will error on timeout, while tokio_zmq's stream will produce an Either.

Example with TimeoutStream

#![feature(try_from)]

extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;
extern crate zmq;

use std::rc::Rc;
use std::convert::TryInto;
use std::time::Duration;

use futures::future::Either;
use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Sub};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let ctx = Rc::new(zmq::Context::new());
    let sub: Sub = Socket::builder(ctx, &handle)
        .connect("tcp://localhost:5556")
        .filter(b"")
        .try_into()
        .unwrap();

    let consumer = sub
        .stream()
        .timeout(Duration::from_secs(30))
        .filter(|either| {
            Either::A(multipart) => Some(multipart),
            Either::B(_) => {
                println!("Operation timed out");
                None
            }
        })
        .for_each(|multipart| {
            for msg in multipart {
                if let Some(msg) = msg.as_str() {
                    println!("Received: {}", msg);
                }
            }

            Ok(())
        });

    core.run(consumer).unwrap();
}

API Changes

  • Introduce streams with timeouts
  • Rename Socket::create to Socket::builder to reflect that it returns a SocketBuilder.
  • Do builder notation for MultipartStreams for control, ending, and timers. Remove the separate Controlled variants from src/socket/types.rs.
  • Remove AsControlledSocket, ControlledStreamSocket, ControlledSinkSocket, and IntoControlledSocket.
  • Remove Controlled macros.
  • Remove Option<EndHandler> from MultipartStream.
  • Remove the DefaultHandler concept, since EndHandlers are only in the type if they're in use.
  • Remove stream_with_end(end_handler: EndHandler) in favor of stream().with_end(end_handler: EndHandler).
  • Remove ControlledSocket socket type in favor of socket.stream().controlled(control: impl StreamSocket, handler: impl ControlHandler).
  • Update all examples to work with changes.

Links

crates.io
docs.rs