Skip to content

Releases: asonix/tokio-zmq

Pinned dependencies?

06 Apr 20:46
Compare
Choose a tag to compare

Public-facing APIs have not changed at all.

tokio-zmq-derive was rewritten, and futures 0.2.0-beta has been pinned, as tokio does not yet support futures 0.2.0 stable

crates.io
docs.rs

Bump dependencies?

06 Apr 16:04
Compare
Choose a tag to compare

This release sees a cleaning of Tokio ZMQ's dependencies. It now relies on sub-crates of Futures and of Tokio, so it doesn't pull more than it needs to. I also bumped tokio-file-unix, which recently released full tokio-support, so it doesn't pull tokio-core anymore.

There will probably be another beta release before 0.4 hits. I'm waiting on tokio to fully adopt futures2 support, so I can drop my own fork of tokio-timer. And since I messed up the versioning for tokio-zmq-derive already, the first version of 0.4 might be 0.4.1 or 0.4.2. We'll just have to see.

crate: crates.io
docs: docs.rs

The 0.4.0 beta!

25 Mar 23:07
Compare
Choose a tag to compare

Wow! A lot has happened!

Changes:

  • Tokio ZMQ doesn't use Rc<> in the sockets anymore. Instead, they get passed around where they're needed.
  • .send(), .recv(), .stream(), and .sink() now consume the calling socket, meaning if you want to use a Stream and a Sink, you need to use the newly introduced .sink_stream() and then call .split() on it. WARNING: if you try to spawn a future with one part of the sink-stream, but not the other, you will experience a Panic. This is because the underlying datastructure used to represent the sink and stream is Send but not Sync. Sending part of the sink-stream without the other opens the possibility of simultaneous access from multiple threads. See this example for a way to avoid doing this.
  • Tokio ZMQ now uses Tokio 0.1.4 and Futures 0.2.0-beta

crate: crates.io
docs: docs.rs

Proper Errors!

17 Mar 03:01
17b142e
Compare
Choose a tag to compare

This crate's Error type now implements std::error::Error in order to be better compatible with other error-handling libraries. This change is much needed, since the adoption of crates like failure depend on errors at least implementing StdError.

In the future, this library may be updated to implement Fail instead of implementing StdError directly.

crate: crates.io
docs: docs.rs

Version 0.3.0! Already!

11 Jan 03:55
Compare
Choose a tag to compare

This release sees the introduction of the TimeoutStream<S: Stream<Error = Error>> struct. This type is particularly useful since you can wrap a MultipartStream, a ControlledStream, or an EndingStream in a TimeoutStream and receive an Either<A: S::Item, B: Timeout> when the stream is ready. This means that you either get a Timeout notification, or you get the value the stream was intending to produce.

tokio_zmq::TimeoutStream differs from tokio_timer::TimeoutStream in that tokio_timer's stream will error on timeout, while tokio_zmq's stream will produce an Either.

Example with TimeoutStream

#![feature(try_from)]

extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;
extern crate zmq;

use std::rc::Rc;
use std::convert::TryInto;
use std::time::Duration;

use futures::future::Either;
use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Sub};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let ctx = Rc::new(zmq::Context::new());
    let sub: Sub = Socket::builder(ctx, &handle)
        .connect("tcp://localhost:5556")
        .filter(b"")
        .try_into()
        .unwrap();

    let consumer = sub
        .stream()
        .timeout(Duration::from_secs(30))
        .filter(|either| {
            Either::A(multipart) => Some(multipart),
            Either::B(_) => {
                println!("Operation timed out");
                None
            }
        })
        .for_each(|multipart| {
            for msg in multipart {
                if let Some(msg) = msg.as_str() {
                    println!("Received: {}", msg);
                }
            }

            Ok(())
        });

    core.run(consumer).unwrap();
}

API Changes

  • Introduce streams with timeouts
  • Rename Socket::create to Socket::builder to reflect that it returns a SocketBuilder.
  • Do builder notation for MultipartStreams for control, ending, and timers. Remove the separate Controlled variants from src/socket/types.rs.
  • Remove AsControlledSocket, ControlledStreamSocket, ControlledSinkSocket, and IntoControlledSocket.
  • Remove Controlled macros.
  • Remove Option<EndHandler> from MultipartStream.
  • Remove the DefaultHandler concept, since EndHandlers are only in the type if they're in use.
  • Remove stream_with_end(end_handler: EndHandler) in favor of stream().with_end(end_handler: EndHandler).
  • Remove ControlledSocket socket type in favor of socket.stream().controlled(control: impl StreamSocket, handler: impl ControlHandler).
  • Update all examples to work with changes.

Links

crates.io
docs.rs

Version 0.2.0

07 Jan 22:41
Compare
Choose a tag to compare

This version doesn't have a whole lot of difference from 0.1.3, other than it should be completely working now. Some socket types were fine with read and write methods intermingling need_read and need_write, but some socket types (namely router) really didn't like that.

API Changes:

  • Req now implements StreamSocket and SinkSocket instead of FutureSocket. All the same functionality is provided in addition to Req now having a controlled variant.
  • The FutureSocket trait no longer exists
  • SockConfigStart has been renamed SocketBuilder
  • To build sockets, the call is now Socket::create instead of Socket::new
  • SocketBuilder::new now accepts a reference to a tokio_core::reactor::Handle instead of the handle itself.
  • SocketBuilder now has an identity method to set the socket's identity.
  • PairConfigStart has been removed in favour of SocketBuilder::pair(self, addr: &str, bind: bool)
  • Multipart is it's own type now, wrapping VecDeque. Multipart implements From<zmq::Message> and From<Vec<zmq::Message>>

Other changes:

  • Added example for load-balancing req workers and req clients with routers

crates.io
docs.rs

Drive! Drive! Derive!

05 Jan 23:50
Compare
Choose a tag to compare

This release sees no noticeable changes to the API (unless you really cared about the ControlledStreamSocket having an H: ControlHandler type, because now that's specified in the call to .stream()), although some of the re-exports were moved around. If you were manually importing *Controlled sockets, you'll now need to get them from tokio_zmq::socket::types.

We derive the traits in tokio_zmq::prelude for all the types in tokio_zmq::socket::types now, and the derivation code is nested within this crate as the tokio-zmq-derive crate. I should probably add a readme and documentation for that, but it shouldn't be used outside the context of tokio-zmq.

tokio-zmq on crates.io
tokio-zmq on docs.rs

Stop your Streams

04 Jan 23:00
Compare
Choose a tag to compare

The v0.1.2 release comes with a new trait called EndHandler, which can be used to handle End messages that are sent in the same socket that data is sent through.

crates.io
docs.rs

Here's an example pulled from the documentation. This stream stops producing values after 10 multiparts have been received.

#![feature(try_from)]

extern crate zmq;
extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;

use std::rc::Rc;
use std::convert::TryInto;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::async::Multipart;
use tokio_zmq::{Socket, Sub};

struct Stop {
    count: usize,
}

impl Stop {
    fn new() -> Self {
        Stop {
            count: 0,
        }
    }
}

impl EndHandler for Stop {
    fn should_stop(&mut self, _: &Multipart) -> bool {
        if self.count < 10 {
            self.count += 1;

            false
        } else {
            true
        }
    }
}

fn main() {
    let core = Core::new().unwrap();
    let context = Rc::new(zmq::Context::new());
    let sub: Sub = Socket::new(context, core.handle())
        .connect("tcp://localhost:5569")
        .filter(b"")
        .try_into()
        .unwrap();

    let fut = sub.stream_with_end(Stop::new()).for_each(|multipart| {
        for msg in multipart {
            if let Some(msg) = msg.as_str() {
                println!("Message: {}", msg);
            }
        }
        Ok(())
    });

    core.run(fut).unwrap();
}

Dealers, Routers, Restructures, oh my!

03 Jan 23:57
Compare
Choose a tag to compare

To clean the documentation, this release moves all the socket wrapper types into a single file. It also moves to make MultipartSink and MultipartStream use the zmq socket logic from MultipartRequest and MultipartResponse from the async::future module. This simplifies the library and ensures any change to sending or receiving zmq messages affects streams and futures.

This release also brings the Dealer and Router socket types, to allow creation of brokers on the Tokio event loop.

crates.io
docs.rs

Initial Release!

02 Jan 02:51
Compare
Choose a tag to compare