Skip to content

gregbugaj/hoplin.io

Repository files navigation

Build Status

hoplin.io

A lightweight RabbitMQ client for Java (built on top of rabittmq java client)

To make working with RabbitMQ as simple as possible with minimum dependencies.

Quick start

Add dependency to your project

Apache Maven

	<dependency>
		<groupId>io.hoplin</groupId>
		<artifactId>hoplin-client</artifactId>
		<version>1.1.4-SNAPSHOT</version>
	</dependency>

Gradle Groovy DSL

implementation 'io.hoplin:hoplin-client:1.1.4-SNAPSHOT'

Gradle Kotlin DSL

implementation("io.hoplin:hoplin-client:1.1.4-SNAPSHOT")

Download directly from Maven Central

https://repo1.maven.org/maven2/io/hoplin/

Connecting to a broker

Minimal example that will bootstrap a direct exchange client and connect to local instance of RabbitMQ using sensible defaults.

  ExchangeClient client = ExchangeClient.direct(RabbitMQOptions.defaults(), "awesome-exchange");

At this point your are able to publish and create subscriptions to the awesome-exchange

The RabbitMQOptions can be created in couple different ways.

Default

Using defaults will connect to localhost instance of RabbitMQ, great way to get up and running quickly during development not something we want to use for production.

  RabbitMQOptions options = RabbitMQOptions.defaults();

Connection String

Parse the connection string in format key1=value;key2=value

 RabbitMQOptions options = RabbitMQOptions.from("host=localhost;virtualHost=vhost1;username=user;password=secret");

Supported properties

    host
    virtualhost
    username
    password
    requestedheartbeat
    timeout
    product
    platform
    connectionretries
    connectionretrydelay

RabbitMQOptions Object

Providing options directly gives the most flexibility.

    /**
     * Create 'default' connection options
     * @return
     */
    protected static RabbitMQOptions options(final String host)
    {
        final RabbitMQOptions options =  new RabbitMQOptions();
        options.setConnectionRetries(3);
        options.setConnectionRetryDelay(250L);
        options.setHost(host);

        return options;
    }

Exchange Clients

Support for common topologies have been abstracted into an ExchangeClient

Topologies

    direct
    fanout
    header
    topic
    rpc   - custom
    batch - custom

Clients are crated providing a Binding or parameters directly.

Binding represent exchange/queue topology

Basic fanout exchange

    final Binding binding = BindingBuilder
        .bind()
        .to(new FanoutExchange(EXCHANGE));

Topic binding with custom options

    final Binding binding = BindingBuilder
        .bind()
        .to(new TopicExchange(EXCHANGE))
        .withAutoAck(true)
        .withPrefetchCount(1)
        .withPublisherConfirms(true)
        .build();

Exchange Client (Direct / Fanout / Topic)

Creating Exchange Client is almost identical for most of the topologies, big different is the way the queue is bound to exchange. Check out the 'examples' for all examples and definitions.

Publisher

   private static final String EXCHANGE = "direct_logs";
    public static void main(final String... args) throws InterruptedException {
      log.info("Starting producer on exchange : {}", EXCHANGE);
      final ExchangeClient client = ExchangeClient.direct(options(), EXCHANGE);
      client.publish(createMessage("info"), "info");

    }
  
    private static LogDetail createMessage(final String level) {
      return new LogDetail("Msg : " + System.nanoTime(), level);
    }

Subscriber

private static final String EXCHANGE = "direct_logs";

  public static void main(final String... args) throws InterruptedException {
    final ExchangeClient client = informative();
    final SubscriptionResult subscription = client
        .subscribe("test", LogDetail.class, msg -> {log.info("Message received [{}]", msg);});

    info(subscription);
    Thread.currentThread().join();
  }

  private static ExchangeClient critical() {
    return ExchangeClient
        .direct(options(), EXCHANGE, "log.critical", "error");
  }

  private static ExchangeClient informative() {
    return ExchangeClient
        .direct(options(), EXCHANGE, "log.informative", "info");
  }

Exchange Client (Header)

Header exchange shows how messages can be customized via the Configuration argument.

Publisher

  private static final String EXCHANGE = "header_logs";

  public static void main(final String... args) throws InterruptedException {
    log.info("Starting header producer for exchange : {}", EXCHANGE);
    final ExchangeClient client = ExchangeClient.header(options(), EXCHANGE);

    client.publish(createMessage("info"), cfg ->
    {
      cfg.addHeader("type", "info");
      cfg.addHeader("category", "service-xyz");
    });
  }

  private static LogDetail createMessage(final String level) {
    return new LogDetail("Msg : " + System.nanoTime(), level);
  }

Subscriber

  private static final String EXCHANGE = "header_logs";

  public static void main(final String... args) throws InterruptedException {
    log.info("Starting header consumer for exchange : {}", EXCHANGE);
    final ExchangeClient client = clientFromBinding(EXCHANGE, "info", "service-xyz");
    client.subscribe("test", LogDetail.class, ReceiveLogHeader::handler);

    Thread.currentThread().join();
  }

  private static void handler(final LogDetail detail) {
    log.info("Message received :  {} ", detail);
  }

  private static ExchangeClient clientFromBinding(String exchange, String type, String category) {
    final Binding binding = BindingBuilder
        .bind("header_log_info_queue")
        .to(new HeaderExchange(exchange))
        .withArgument("x-match", "all")
        .withArgument("type", type)
        .withArgument("category", category)
        .build();

    return ExchangeClient.header(options(), binding);
  }

Message customization

Messages can be customized before they are published.

    client.publish(createMessage("info"), cfg ->
    {
      cfg.addHeader("type", "info");
      cfg.addHeader("category", "service-xyz");
    });

Multiple message handlers / Polymorphic messages

Multiple message types can be published to the same exchange.

Publisher

  private static final String EXCHANGE = "mh_logs";

  public static void main(final String... args) throws InterruptedException {
    final Binding binding = bind();
    log.info("Binding : {}", binding);
    final ExchangeClient client = ExchangeClient.fanout(options(), binding);

    client.publish(new LogDetail("DetailType A", "info"));
    client.publish(new LogDetailType2("DetailType B", "info"));
  }

  private static Binding bind() {
    return BindingBuilder
        .bind()
        .to(new FanoutExchange(EXCHANGE));
  }

Subscriber

  private static final String EXCHANGE = "mh_logs";

  public static void main(final String... args) throws InterruptedException {
    final ExchangeClient client = FanoutExchangeClient.create(options(), EXCHANGE);

    client.subscribe("test", LogDetail.class, MultipleTypesReceiver::handle1);
    client.subscribe("test", LogDetailType2.class, MultipleTypesReceiver::handle2);

    Thread.currentThread().join();
  }

  private static void handle1(final LogDetail msg, final MessageContext context) {
    log.info("Handler-1  >  {}", msg);
  }

  private static void handle2(final LogDetailType2 msg, final MessageContext context) {
    log.info("Handler-2  >  {}", msg);
  }
}

Same Producer/Consumer

Client can serve both as a Producer and Consumer

  private static final String EXCHANGE = "topic_logs";

  public static void main(final String... args) throws InterruptedException {
    log.info("Starting producer/consumer for exchange : {}", EXCHANGE);
    final ExchangeClient client = ExchangeClient.topic(options(), EXCHANGE);
    client.subscribe("Test", LogDetail.class, SamePublisherConsumerExample::handle);

    for (int i = 0; i < 5; ++i) {
      client.publish(createMessage("info"), "log.info.info");
      client.publish(createMessage("debug"), "log.info.debug");

      Thread.sleep(1000L);
    }
  }

  private static void handle(final LogDetail msg) {
    log.info("Incoming msg : {}", msg);
  }

  private static LogDetail createMessage(final String level) {
    return new LogDetail("Msg : " + System.nanoTime(), level);
  }

Message Context

RPC Client / Server

Client supports both Direct-Reply and Queue per Request/Response patterns.

For Direct-Reply leave blank or use amq.rabbitmq.reply-to If direct-reply is not used new queue will be create in format {queumame}.response.{UUID} Queue = Name of the queue we will use for 'Reply-To' messages Exchange = name of the exchange we want to bind our queue to

  private static Binding bind() {    
    return BindingBuilder
        .bind("rpc.request.log")
        .to(new DirectExchange("exchange.rpc.logs"))
        .build()
        ;
  }

Typical structure of RPC client and server

    RpcClient<LogDetailRequest, LogDetailResponse> client = DefaultRpcClient.create(options(), bind());
    
    // rpc response
    client.respondAsync((request)->
    {   
        // do some heavy lifting
        return new LogDetailResponse("Response message", "info");
    });
    
    // rpc request
    final LogDetailResponse response = client.request(new LogDetailRequest("Request message", "info"));
    log.info("RPC response : {} ", response);

RPC client supports both synchronous and asynchronous processing, all methods have a corresponding async method that will return a CompletableFuture

Synchronous

    final LogDetailResponse response = client.request(new LogDetailRequest("Request message", "info"));
    log.info("RPC response : {} ", response);

Asynchronous

  client
      .requestAsync(new LogDetail("Msg : " + System.nanoTime(), "info"))
      .whenComplete((reply, t) -> {        
        log.info("RPC response : {} ", reply);
        latch.countDown();
      });

RPC client example

 
  public static void main(final String... args) throws IOException {
    final Binding binding = bind();
    log.info("Binding : {}", binding);

    // Blocking
    final RpcClient<LogDetailRequest, LogDetailResponse> client = DefaultRpcClient
        .create(options(), binding);

    final LogDetailResponse response = client
        .request(new LogDetailRequest("Request message 1", "info"));
  }

  private static Binding bind() {
    return BindingBuilder
        .bind("rpc.request.log")
        .to(new DirectExchange("exchange.rpc.logs"))
        .build()
        ;
  }

RPC server example

  public static void main(final String... args) throws InterruptedException {
    final Binding binding = bind();
    log.info("Binding : {}", binding);

    final RpcServer<LogDetailRequest, LogDetailResponse> server = DefaultRpcServer
        .create(options(), binding);
    server.respondAsync(RpcServerExample::handler);
    Thread.currentThread().join();
  }

  private static LogDetailResponse handler(final LogDetailRequest log) {
    return new LogDetailResponse("response", "info");
  }

  private static Binding bind() {
    return BindingBuilder
        .bind("rpc.request.log")
        .to(new DirectExchange("exchange.rpc.logs"))
        .build()
        ;
  }

RabittMQ batch message processing

There are times when we want to fire set of jobs and be notified when all of them complete. This can be easily accomplished with the latest version of hoplin.io RabbitMQ client.

Under the hood the batches are tracked via two custom properties x-batch-id and x-batch-correlationId,

Batch job publisher

We start by creating a new client and then enqueuing number of jobs to process, upon completion we display the time it took to complete all jobs. Client will attempt to use Direct-Reply queue if available.

    public static void main(final String... args) throws InterruptedException {
      final CountDownLatch latch = new CountDownLatch(1);
      final BatchClient client = new DefaultBatchClient(options(), bind());
  
      client.startNew(context ->
      {
        for (int i = 0; i < 2; ++i) {
          context.enqueue(() -> new LogDetail("Msg >> " + System.nanoTime(), "info"));
          context.enqueue(() -> new LogDetail("Msg >> " + System.nanoTime(), "warn"));
        }
      })
      .whenComplete((context, throwable) ->
      {
        log.info("Batch completed in : {}", context.duration());
        latch.countDown();
      });
  
      latch.await();
    }
  
    private static Binding bind() {
      return BindingBuilder
          .bind("batch.documents")
          .to(new DirectExchange("exchange.batch"))
          .build()
          ;
    }

Batch job receiver

The subscriber is just a binding to a queue. The key is the return type of the handler for Reply<LogDetail>. Currently, the value has to be wrapped with the Reply objects.

public class ReceiveBatchJob extends BaseExample {

  private static final Logger log = LoggerFactory.getLogger(ReceiveBatchJob.class);

  private static final String EXCHANGE = "exchange.batch";

  public static void main(final String... args) throws InterruptedException {
    final ExchangeClient client = DirectExchangeClient.create(options(), EXCHANGE);

    client.subscribe("test", LogDetail.class, ReceiveBatchJob::handleWithReply);
    Thread.currentThread().join();
  }

  private static Reply<LogDetail> handleWithReply(final LogDetail msg,
      final MessageContext context) {
    final LogDetail reply = new LogDetail("Reply Message > " + System.nanoTime(), "WARN");
    log.info("Processing message : {} , {}", msg, context);

    return Reply.with(reply);
  }

}

Error handling

Error handling is build into the client directly. When new subscription is created an error exchange and queue are defined for tracking processing errors and can be accessed from the returned Subscriptoin

    final SubscriptionResult subscription = client
        .subscribe("test", LogDetail.class, ReceiveBatchJob::handleWithReply);

    log.info("Subscription : {}", subscription);
SubscriptionResult
{
    exchange='exchange.batch', 
    queue='test:exchange.batch:examples.LogDetail', 
    errorExchangeName='hoplin.error.exchange',
    errorQueueName='test:exchange.batch:examples.LogDetail.error'
}

Messages are routed to default error exchange : hoplin.error.exchange

Sample MessageError with original payload.

{
  "exchange": "exchange.batch",
  "queue": "test:exchange.batch:examples.LogDetail",
  "routingKey": "",
  "creationTime": 1582627056671,
  "exception": "examples.batch.ReceiveBatchJob.handleWithReturn(ReceiveBatchJob.java:34)\n",
  "body": "{\n  \"status\": 0,\n  \"payload\": {\n    \"msg\": \"Msg \\u003e\\u003e 738745336710241\",\n    \"level\": \"info\"\n  },\n  \"type\": \"examples.LogDetail\",\n  \"ctime\": 1582627056560,\n  \"_payload_type_\": \"examples.LogDetail\"\n}",
  "properties": {
    "headers": {
      "x-batch-id": {
        "bytes": "ODljOGIyOGUtYWYyZS00NGM2LWIzNWQtNDU2NzA2YTI2YTVj"
      },
      "x-batch-correlationId": "77646972-e269-42f5-9b5b-388c27c13a86"
    },
    "correlationId": "77646972-e269-42f5-9b5b-388c27c13a86",
    "replyTo": "batch.documents.reply-to.dd846234-bcce-466a-a841-360e9f9aa356",
    "bodySize": 204
  }
}

Metrics

Hoplin does not have any dependencies on any existing metrics libraries but rather it provides a way to hook into the underlying metrics via MetricsPublisher interface. Metrics expose a number of key/value pairs that are updated and send to metrics consumers.

Depending on which client we use the metrics key will be different and it is up to the consumer to normalize the name. Data is packed into a Map<String, Map<String,String>> structure, this allows us to add metrics easily without breaking any API.

Sample Payload

 {exchange.rpc.logs-rpc.request.log={received.size=211, sent.size=204, received.count=1, sent.count=1}}
Metrics Key      = exchange.rpc.logs-rpc.request.log 
received.size    = Amount of data received by this client
received.count   = Number of messages received

sent.size        = Amount of data sent by this client
sent.count       = Number of messages sent

Instantiating metrics consumer

    FunctionMetricsPublisher
        .consumer(EmitLogTopic::metrics)
        .withInterval(1, TimeUnit.SECONDS)
        .withResetOnReporting(false)
        .build()
        .start();

Signature for the reporting method

  void metrics(final Map<String, Map<String, String>> o) {
    System.out.println(String.format("Metrics Info : %s", o));
  }

Client Interoperability

The client is able to communicate between different RabbitMQ client (C#, Python, Ruby, etc...) by using reflection based message parsing. This means that there is no need for MessagePayload envelope, except when using RPC client. Trade of here is speed and lack of message polymorphism. Messages wrapped in envelope do not need to perform type determination.

With envelope

{
  "status": 0,
  "payload": {
               "msg": "Msg : 27819763881153",
               "level": "info"
             },
  "type": "io.hoplin.model.LogDetail",
  "ctime": 0,
  "_payload_type_": "io.hoplin.model.LogDetail"
}

Without envelope

{
  "msg": "Msg : 27819763881153",
  "level": "info"
}
public class LogDetail {
  private final String msg;
  private final String level;

  public LogDetail(final String msg, final String level) {
    this.msg = msg;
    this.level = level;
  }
}

Publishing a message without envelope. For Consumer<MessageConfiguration> setNativeMessageFormat(true).

// configuring publish without envelope
ExchangeClient client = clientFromExchange();
LogDetail detail = getLogDetail();
client.publish(detail, cfg -> cfg.setNativeMessageFormat(true));

Closing clients

Client can be closed couple ways. First by calling close method on the client itself.

  ExchangeClient client = clientFromExchange();
  // do something here 
  client.close();

Second method would to be get the client wrapped in AutoCloseable as CloseableExchangeClient and using try-with-resources

    try (CloseableExchangeClient client = clientFromExchange().asClosable()) {
       // do something here  
    }

Both of this methods will invoke the same close method. Deciding which method suit best depends on the use case of how the client is used.

Notes

Checking number of active rabbitmq connections

Method 1

    sudo netstat -anp | grep :5672 | grep ESTABLISHED | wc -l

Method 2

rabbitmqctl list_connections

hoplin_default_error_queue

Release Notes

Version 1.1.5

  • RPC support to handle multiple messages
  • Prefetch Size (QOS) can be set for the RPC Server
  • Custom executor can be supplied to RPC server

Code Style

Google Style Guides

Versioning

Following Semantic Versioning 2.0.0 guideline.

Resources

RabbitMQ

RawRabbit

EasyNetQ

OSSRH-43588

Maven version upgrade

mvn versions:set -DnewVersion=1.1.7
mvn versions:commit