reinhapa/rabbitmq-cdi

Provides a JavaEE Event ←→ RabbitMQ bridge.

Supported Versions:

Java 11, Java 17, Java 18

This project contains all needed classes to bind JavaEE enterprise events to a RabbitMQ exchange for outgoing events. Inbound events can also be bound to the respective queues and will be handed over to all JavaEE event observers.

The RabbitMQ message content is produced by JSON serialization of a Java Bean compatible POJO. Inbound messages are deserialized from JSON back into such objects.
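To make the bean-to-JSON mapping concrete, here is a minimal sketch that uses only the JDK. It is not the library's encoder, which this README does not show; the class `BeanJsonSketch`, its nested `Sample` bean and the `toJson` method are hypothetical names introduced for illustration, and only flat beans with boolean, numeric or String properties are handled.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of rabbitmq-cdi): renders bean properties as flat JSON. */
final class BeanJsonSketch {

  /** Minimal bean following the getter/setter convention used for events. */
  public static class Sample {
    private boolean enabled;

    public boolean isEnabled() {
      return enabled;
    }

    public void setEnabled(boolean enabled) {
      this.enabled = enabled;
    }
  }

  /** Supports boolean, finite numeric, String and null property values only. */
  static String toJson(Object bean) {
    Map<String, Object> properties = new TreeMap<>(); // sorted for stable output
    try {
      // Stopping at Object.class skips the "class" pseudo-property from getClass().
      BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
      for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
        Method getter = descriptor.getReadMethod();
        if (getter != null) {
          properties.put(descriptor.getName(), getter.invoke(bean));
        }
      }
    } catch (IntrospectionException | ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot read bean properties", e);
    }
    StringBuilder json = new StringBuilder("{");
    for (Map.Entry<String, Object> entry : properties.entrySet()) {
      if (json.length() > 1) {
        json.append(',');
      }
      json.append('"').append(entry.getKey()).append("\":");
      Object value = entry.getValue();
      if (value instanceof String) {
        // Escape backslashes and quotes so the string stays valid JSON.
        String escaped = ((String) value).replace("\\", "\\\\").replace("\"", "\\\"");
        json.append('"').append(escaped).append('"');
      } else {
        json.append(value); // booleans, numbers and null are appended as literals
      }
    }
    return json.append('}').toString();
  }

  public static void main(String[] args) {
    Sample event = new Sample();
    event.setEnabled(true);
    System.out.println(toJson(event)); // prints {"enabled":true}
  }
}
```

The property name in the JSON output comes from the getter name (`isEnabled` becomes `enabled`), which is why event classes need to follow the Java Bean naming convention.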

Usage example

First you need to define event objects using standard Java Bean syntax:

public class EventOne {
  private boolean enabled;

  public boolean isEnabled() {
    return enabled;
  }

  public void setEnabled(boolean enabled) {
    this.enabled = enabled;
  }
}

public class EventTwo {
  private String value;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

As a second step, define the bindings:

@Dependent
public class RabbitBinder extends EventBinder {
  @Override
  protected void bindEvents() {

    ExchangeDeclaration testExchangeOne = declarerFactory()
      .createExchangeDeclaration("test.exchange.one")
      .withExchangeType(BuiltinExchangeType.DIRECT)
      .withAutoDelete(false)
      .withDurable(false); // (1)

    QueueDeclaration testQueue = declarerFactory()
      .createQueueDeclaration("test.queue")
      .withAutoDelete(false)
      .withDurable(false)
      .withExclusiveAccess(false); // (2)

    bind(EventOne.class)
      .toExchange("test.exchange.one")
      .withDeclaration(testExchangeOne); // (3)
    bind(EventOne.class)
      .toExchange("test.exchange.two")
      .withRoutingKey("test.key")
      .withEncoder(new MyCustomEncoder()); // (4)

    bind(EventTwo.class)
      .toQueue("test.queue");
    bind(EventTwo.class)
      .toQueue("test.queue")
      .withDecoder(new MyCustomDecoder())
      .withDeclaration(testQueue)
      .autoAck(); // (5)
  }
}
  1. Creates an exchange declaration for the exchange named 'test.exchange.one' with type 'direct'

  2. Creates a queue declaration for the queue named 'test.queue'

  3. Uses an empty routing key for exchange 'test.exchange.one' and adds the exchange declaration to this publisher

  4. Specifies a custom event encoder

  5. Specifies a custom event decoder, enables auto acknowledge and adds the queue declaration to this consumer
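The binding example refers to MyCustomEncoder and MyCustomDecoder without showing them. The sketch below illustrates what such a pair might do, using a plain UTF-8 text payload instead of JSON. The interfaces `SketchEncoder` and `SketchDecoder` are hypothetical stand-ins declared locally so that the example compiles on its own; the real encoder and decoder contracts of the library may have different names and additional methods, so check its javadoc before adapting this.

```java
import java.nio.charset.StandardCharsets;

final class CustomCodecSketch {

  /** Assumed encoder contract; the real library interface may differ. */
  interface SketchEncoder<T> {
    byte[] encode(T event);
  }

  /** Assumed decoder contract; the real library interface may differ. */
  interface SketchDecoder<T> {
    T decode(byte[] body);
  }

  /** Same shape as the EventTwo bean from the usage example. */
  static class EventTwo {
    private String value;

    public String getValue() {
      return value;
    }

    public void setValue(String value) {
      this.value = value;
    }
  }

  /** Writes the event value as raw UTF-8 text; a null value becomes an empty body. */
  static final class PlainTextEncoder implements SketchEncoder<EventTwo> {
    @Override
    public byte[] encode(EventTwo event) {
      String value = event.getValue() == null ? "" : event.getValue();
      return value.getBytes(StandardCharsets.UTF_8);
    }
  }

  /** Reads the message body back as UTF-8 text into a new event. */
  static final class PlainTextDecoder implements SketchDecoder<EventTwo> {
    @Override
    public EventTwo decode(byte[] body) {
      EventTwo event = new EventTwo();
      event.setValue(new String(body, StandardCharsets.UTF_8));
      return event;
    }
  }

  public static void main(String[] args) {
    EventTwo outgoing = new EventTwo();
    outgoing.setValue("hello");
    byte[] wire = new PlainTextEncoder().encode(outgoing);
    EventTwo incoming = new PlainTextDecoder().decode(wire);
    System.out.println(incoming.getValue()); // prints hello
  }
}
```

Whatever format is chosen, the encoder on the publishing side and the decoder on the consuming side have to agree on it, since the broker only transports the raw bytes.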

Note on declarations:

Created declarations are not automatically declared on the broker side. Each consumer and producer uses its own channel, so every declaration a binding needs must be added to that binding via .withDeclaration(..). Only the declarations added this way are applied for that consumer or producer.

As a last step, initialize the binder in either a singleton startup bean or a servlet, after configuring the connection settings:

@Singleton
@Startup
public class BindingInitializer {
  @Inject
  private RabbitBinder binder;

  @PostConstruct
  public void initialize() {
    try {
      binder.configuration()
        .addHost("somehost.somedomain") // (1)
        .setUsername("myuser") // (2)
        .setPassword("mypassword") // (3)
        .setSecure(true) // (4)
        .setConnectTimeout(10000) // (5)
        .setConnectRetryWaitTime(10000) // (6)
        .setRequestedConnectionHeartbeatTimeout(3) // (7)
        .withPrefetchCount(5); // (8)
      binder.initialize();
    } catch (IOException e) {
      LoggerFactory.getLogger(getClass()).error("Unable to initialize", e);
    }
  }
}
  1. Specifies an AMQP host name (more than one host can be added)

  2. Specifies a connection user name

  3. Specifies a connection password

  4. Enables transport layer security (TLS)

  5. Sets the connect timeout to 10 sec

  6. Sets the time to wait between connection retries to 10 sec

  7. Sets the heartbeat timeout to 3 sec (to detect connection problems)

  8. Sets the prefetch count, which limits how many unacknowledged messages the broker delivers to a consumer at a time
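To illustrate what a wait time between connection retries means in practice, the following sketch implements a generic fixed-delay retry loop with the JDK only. It is not the library's reconnect logic, which this README does not describe; `ConnectRetrySketch`, `Attempt` and `connectWithRetry` are hypothetical names, and the bounded `maxAttempts` parameter is an assumption made to keep the example finite.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class ConnectRetrySketch {

  /** A connection attempt that may fail with an I/O error (hypothetical contract). */
  interface Attempt<T> {
    T run() throws IOException;
  }

  /** Runs the attempt up to maxAttempts times, sleeping waitMillis between failures. */
  static <T> T connectWithRetry(Attempt<T> attempt, int maxAttempts, long waitMillis) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    IOException lastFailure = null;
    for (int attemptNo = 1; attemptNo <= maxAttempts; attemptNo++) {
      try {
        return attempt.run();
      } catch (IOException e) {
        lastFailure = e; // remember the failure and try again after the wait
      }
      if (attemptNo < maxAttempts) {
        try {
          Thread.sleep(waitMillis);
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          throw new IllegalStateException("Interrupted while waiting to retry", interrupted);
        }
      }
    }
    throw new UncheckedIOException("All " + maxAttempts + " attempts failed", lastFailure);
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String result = connectWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("connection refused");
      }
      return "connected";
    }, 5, 10L);
    System.out.println(result + " after " + calls[0] + " attempts");
  }
}
```

A longer wait reduces load on a broker that is restarting, at the cost of a slower recovery once it is back.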

Alternative connection definition

Alternatively, the connection can be configured using a URI string:

@Singleton
@Startup
public class BindingInitializer {
  @Inject
  private RabbitBinder binder;

  @PostConstruct
  public void initialize() {
    try {
      binder.configuration()
        .setUri("amqp://user:mysecret@somehost.somedomain/virtualhost"); // (1)
      binder.initialize();
    } catch (IOException e) {
      LoggerFactory.getLogger(getClass()).error("Unable to initialize", e);
    }
  }
}
  1. Specifies an AMQP connection URI

More information about the URI format can be found in the RabbitMQ URI specification.
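To show which connection settings such a URI carries, here is a small sketch that splits the example URI into its parts with `java.net.URI`. It is independent of the library, whose own URI handling is not shown in this README; the class `AmqpUriSketch` and its fields are hypothetical. The default ports (5672 for amqp, 5671 for amqps) follow the RabbitMQ URI specification.

```java
import java.net.URI;

/** Hypothetical value holder for the parts of an AMQP URI (not a rabbitmq-cdi class). */
final class AmqpUriSketch {
  final String user;
  final String password;
  final String host;
  final int port;
  final String virtualHost;

  private AmqpUriSketch(String user, String password, String host, int port, String vhost) {
    this.user = user;
    this.password = password;
    this.host = host;
    this.port = port;
    this.virtualHost = vhost;
  }

  static AmqpUriSketch parse(String text) {
    URI uri = URI.create(text);
    String scheme = uri.getScheme();
    if (!"amqp".equals(scheme) && !"amqps".equals(scheme)) {
      throw new IllegalArgumentException("Not an AMQP URI: " + text);
    }
    // User info has the form "user:password"; either part may be absent.
    String user = null;
    String password = null;
    if (uri.getUserInfo() != null) {
      String[] credentials = uri.getUserInfo().split(":", 2);
      user = credentials[0];
      password = credentials.length > 1 ? credentials[1] : null;
    }
    // getPort() returns -1 when the URI names no port; fall back to the scheme default.
    int port = uri.getPort() != -1 ? uri.getPort() : ("amqps".equals(scheme) ? 5671 : 5672);
    // The path without its leading slash is the virtual host; null means none was given.
    String path = uri.getPath();
    String vhost = (path == null || path.isEmpty()) ? null : path.substring(1);
    return new AmqpUriSketch(user, password, uri.getHost(), port, vhost);
  }

  public static void main(String[] args) {
    AmqpUriSketch parsed = parse("amqp://user:mysecret@somehost.somedomain/virtualhost");
    System.out.println(parsed.user + "@" + parsed.host + ":" + parsed.port
        + " vhost=" + parsed.virtualHost);
  }
}
```

Using the `amqps` scheme instead of `amqp` selects a TLS connection in the URI form, which corresponds to the secure setting in the builder-style configuration.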

Multiple server connections

In case you have to support two different servers, create a binder implementation for each host and initialize them in one single binding initializer:

@Singleton
@Startup
public class BindingInitializer {
  @Inject
  private RabbitBinder binderOne;
  @Inject
  private RabbitBinder binderTwo;

  @PostConstruct
  public void initialize() {
    try {
      binderOne.configuration()
        .addHost("hostOne.somedomain")
        .setUsername("myuser")
        .setPassword("mypassword");
      binderTwo.configuration()
        .addHost("hostTwo.somedomain")
        .setUsername("myuser")
        .setPassword("mypassword");

      binderOne.initialize();
      binderTwo.initialize();
    } catch (IOException e) {
      LoggerFactory.getLogger(getClass()).error("Unable to initialize", e);
    }
  }
}

Usage in a container

Now the events can be used within your JavaEE container:

public class EventDemoBean {
  @Inject
  private Event<EventOne> eventOnes;

  public void submitEvent(boolean enabled) {
    EventOne eventOne = new EventOne();
    eventOne.setEnabled(enabled);
    eventOnes.fire(eventOne);
  }

  public void receiveEvent(@Observes EventTwo eventTwo) {
    String data = eventTwo.getValue();
    // do some work
  }
}

Contribute

Contributions are always welcome. Please format your changes according to the Google code style.

License

This project is licensed under the MIT license.