VictorGaiva/rabbitmq-stream

RabbitMQStream

Elixir Client for RabbitMQ Streams Protocol.

Overview

RabbitMQ 3.9 introduced Streams as an alternative to Queues. They differ mainly by implementing "non-destructive consumer semantics": a consumer can read messages starting at any offset while still receiving new messages.

While this feature is available when using the existing Queues, it shines when used with its dedicated protocol, which allows messages to be consumed extremely fast in comparison to Queues.

This library aims to be a client for the Streams protocol, managing connections and providing an idiomatic way of interacting with all the features available for this functionality.
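
To make the "non-destructive consumer semantics" mentioned above more concrete, here is a minimal conceptual sketch in plain Elixir. It models a stream as an append-only list and is not part of this library: the module name `OffsetLogSketch` and its functions are hypothetical and exist only for illustration.

```elixir
defmodule OffsetLogSketch do
  @moduledoc "Toy append-only log. Hypothetical; not part of rabbitmq_stream."

  # Append a message. Existing entries are never removed.
  def append(log, message), do: log ++ [message]

  # Return every message at or after `offset`. Reading leaves the log untouched.
  def read_from(log, offset) when is_integer(offset) and offset >= 0 do
    Enum.drop(log, offset)
  end
end

log =
  []
  |> OffsetLogSketch.append("a")
  |> OffsetLogSketch.append("b")
  |> OffsetLogSketch.append("c")

# Two independent readers start at different offsets over the same data.
reader_one = OffsetLogSketch.read_from(log, 0)
reader_two = OffsetLogSketch.read_from(log, 1)
```

Because reading never removes entries, both readers see the messages they asked for, regardless of what the other reader has already consumed.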

Features

Installation

The package can be installed by adding rabbitmq_stream to your list of dependencies in mix.exs:

def deps do
  [
    {:rabbitmq_stream, "~> 0.4.1"},
    # ...
  ]
end

Producing

The RabbitMQ Streams protocol requires a static :reference_name per producer, which is used to prevent message duplication. For this reason, each stream currently needs a static module to publish messages, which keeps track of its own publishing_id.

You can define a Producer module like this:

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "stream-01",
    connection: MyApp.MyConnection
end

Then you can publish messages to the stream:

MyApp.MyProducer.publish("Hello World")
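
The role that :reference_name and publishing_id play in preventing duplicates can be illustrated with a small conceptual sketch. This is a simplified model of the idea, assuming the broker remembers the highest publishing id it has accepted per reference name; it is not this library's code, and `DedupSketch` is a hypothetical module.

```elixir
defmodule DedupSketch do
  @moduledoc "Toy model of deduplication by publishing id. Hypothetical; not the library's implementation."

  # `state` maps each reference name to the highest publishing id accepted so far.
  def publish(state, reference_name, publishing_id, _message) do
    last_id = Map.get(state, reference_name, -1)

    if publishing_id > last_id do
      {:accepted, Map.put(state, reference_name, publishing_id)}
    else
      # The same producer re-sent an id that was already seen: treat it as a duplicate.
      {:duplicate, state}
    end
  end
end

{:accepted, state1} = DedupSketch.publish(%{}, "producer-a", 1, "Hello")
# A retry with the same id is ignored instead of being stored twice.
{:duplicate, state2} = DedupSketch.publish(state1, "producer-a", 1, "Hello")
{:accepted, state3} = DedupSketch.publish(state2, "producer-a", 2, "World")
```

This is why the reference name has to stay the same across restarts: under this model, a producer that came back with a new name would start with no history, and a retried message could be accepted again.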

Consuming

First, define a connection:

defmodule MyApp.MyConnection do
  use RabbitMQStream.Connection
end

You can then declare a consumer module with RabbitMQStream.Consumer:

defmodule MyApp.MyConsumer do
  use RabbitMQStream.Consumer,
    connection: MyApp.MyConnection,
    stream_name: "my_stream",
    initial_offset: :first

  @impl true
  def handle_message(_message) do
    :ok
  end
end

Alternatively, you can consume from the stream manually:

{:ok, _subscription_id} = MyApp.MyConnection.subscribe("stream-01", self(), :next, 999)

The caller process will then start receiving messages in the format {:deliver, %RabbitMQStream.Message.Types.DeliverData{} = deliver_data}:

def handle_info({:deliver, %RabbitMQStream.Message.Types.DeliverData{} = deliver_data}, state) do
  # do something with message
  {:noreply, state}
end
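
Putting the manual subscription and the handle_info clause together, a process that subscribes on startup might look like the sketch below. It assumes only the subscribe/4 call shape shown above; `ManualConsumerSketch`, its options, and the `delivered` counter are hypothetical names introduced for illustration. The connection module is passed in as an option so the sketch does not depend on a running broker, and the handler matches on any `{:deliver, data}` tuple rather than on the DeliverData struct for the same reason.

```elixir
defmodule ManualConsumerSketch do
  @moduledoc "Hypothetical GenServer that subscribes on startup and counts deliveries."
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # e.g. connection: MyApp.MyConnection, stream: "stream-01"
    connection = Keyword.fetch!(opts, :connection)
    stream = Keyword.fetch!(opts, :stream)

    # Same arguments as the manual subscribe call shown earlier (:next, 999).
    {:ok, subscription_id} = connection.subscribe(stream, self(), :next, 999)

    {:ok, %{subscription_id: subscription_id, delivered: 0}}
  end

  @impl true
  def handle_info({:deliver, _deliver_data}, state) do
    # Do something with the delivered data; here we only count deliveries.
    {:noreply, %{state | delivered: state.delivered + 1}}
  end
end
```

In an application, this process would typically be started under a supervisor, for example with `{ManualConsumerSketch, connection: MyApp.MyConnection, stream: "stream-01"}` as a child spec.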

SuperStreams

A super stream is a logical stream made of individual, regular streams.

You can declare SuperStreams with:

:ok = MyApp.MyConnection.create_super_stream("my_super_stream", "route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"])

And you can consume from it with:

defmodule MyApp.MySuperConsumer do
  use RabbitMQStream.SuperConsumer,
    initial_offset: :next,
    super_stream: "my_super_stream"

  @impl true
  def handle_message(_message) do
    # ...
    :ok
  end
end

Configuration

The configuration for the connection can be set in your config.exs file:

config :rabbitmq_stream, MyApp.MyConnection,
  username: "guest",
  password: "guest"
  # ...
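
If you prefer not to hard-code credentials, the same keys can be read from the environment at boot time. The following is a sketch for config/runtime.exs; the environment variable names RABBITMQ_USERNAME and RABBITMQ_PASSWORD are assumptions chosen for this example, not names the library defines.

```elixir
# config/runtime.exs
import Config

config :rabbitmq_stream, MyApp.MyConnection,
  # Fall back to "guest" when the (hypothetical) variables are not set.
  username: System.get_env("RABBITMQ_USERNAME", "guest"),
  password: System.get_env("RABBITMQ_PASSWORD", "guest")
```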

You can configure a default Serializer module by passing it to the :defaults configuration option:

config :rabbitmq_stream, :defaults,
  serializer: Jason

TLS

You can configure RabbitMQStream to use TLS connections:

config :rabbitmq_stream, :defaults,
  connection: [
    transport: :ssl,
    ssl_opts: [
      keyfile: "services/cert/client_box_key.pem",
      certfile: "services/cert/client_box_certificate.pem",
      cacertfile: "services/cert/ca_certificate.pem"
    ]
  ]

For more information, check the documentation.