Skip to content

Oracle Advanced Queuing

Daniel Kec edited this page Jun 20, 2022 · 4 revisions

Consuming multi consumer queue as named recipient

Since: 2.3.0

Helidon AQ messaging connector is using JMS api where following logic applies, JMS Queue maps to an AQ single-consumer queue, and Topic maps to a multiconsumer queue. (see docs for more details)

When enqueueing message to an AQ multi-consumer queue, keep in mind it behaves as topic when dequeued with JMS client as non-durable consumer, that means message is sent to all active subscribers. Messages wont be sent to any consumer which subscribe in the future.

A consumer that is added as a subscriber to a queue is only able to dequeue messages that are enqueued after the subscriber is added.

(see docs for more details)

⚠️ But this behavior changes when message is enqueued with recipient list:

Each message remains in the queue until it is consumed by all its intended consumers. (see docs for more details)

That means, messages are treated as topics when enqueued to multi-consumer queue without recipient list, but with recipient list it behaves like a multiple small queues for each recipient.

Example

Required depencencies:

        <dependency>
            <groupId>io.helidon.microprofile.messaging</groupId>
            <artifactId>helidon-microprofile-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>io.helidon.messaging.aq</groupId>
            <artifactId>helidon-messaging-aq</artifactId>
        </dependency>
        <!-- When using Oracle UCP for database connection -->
        <dependency>
            <groupId>io.helidon.integrations.cdi</groupId>
            <artifactId>helidon-integrations-cdi-datasource-ucp</artifactId>
            <scope>runtime</scope>
        </dependency>

Helidon config:

javax.sql.DataSource:
  aq-test-ds: 
    connectionFactoryClassName: oracle.jdbc.pool.OracleDataSource
    URL: jdbc:oracle:thin:@localhost:1521:XE
    user: frank
    password: frank

mp.messaging:
  connector:
    helidon-aq: 
      acknowledge-mode: CLIENT_ACKNOWLEDGE
      data-source: aq-test-ds
      
  incoming:
    
    from-multi-consumer-queue-red:
      connector: helidon-aq
      destination: MULTI_CONSUMER_QUEUE
      type: topic            # AQ multi consumer queue is mapped to JMS topic
      subscriber-name: RED   # AQ multi consumer queue recipient name
      durable: true          # needs to be durable for named consumer

    from-multi-consumer-queue-blue: 
      connector: helidon-aq
      destination: MULTI_CONSUMER_QUEUE
      type: topic
      subscriber-name: BLUE
      durable: true
      
    from-multi-consumer-queue-anonymous:
      connector: helidon-aq
      destination: MULTI_CONSUMER_QUEUE
      type: topic

Consuming Java bean:

@ApplicationScoped
public class RedBlueService {
   
    @Incoming("from-multi-consumer-queue-red")
    public void red(AqMessage<String> msg) {
        System.out.println("RED> " + msg.getPayload());
    }

    @Incoming("from-multi-consumer-queue-blue")
    public void blue(AqMessage<String> msg) {
        System.out.println("BLUE> " + msg.getPayload());
    }

    @Incoming("from-multi-consumer-queue-anonymous")
    public void anonymous(AqMessage<String> msg) {
        System.out.println("ANONYMOUS> " + msg.getPayload());
    }
}

Prepare multi consumer queue:

DECLARE
    queue_name         VARCHAR2(32);
    queue_tab          VARCHAR2(32);
BEGIN
    queue_name := 'FRANK.MULTI_CONSUMER_QUEUE';
    queue_tab :=  queue_name || '_TAB';
    DBMS_AQADM.CREATE_QUEUE_TABLE(
            queue_table => queue_tab,
            multiple_consumers => TRUE,
            queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
    DBMS_AQADM.CREATE_QUEUE(queue_name, queue_tab);
    DBMS_AQADM.START_QUEUE(queue_name);
    
    -- Register named subscribers in advance 
    -- so messages with recipient list can be queued for them
    DBMS_AQADM.ADD_SUBSCRIBER(queue_name, sys.aq$_agent('YELLOW', NULL, NULL));
    DBMS_AQADM.ADD_SUBSCRIBER(queue_name, sys.aq$_agent('RED', NULL, NULL));
    DBMS_AQADM.ADD_SUBSCRIBER(queue_name, sys.aq$_agent('BLUE', NULL, NULL));
END;

Enqueue message without recipient list (topic to all non-durable subscribers, queued for already subscribed YELLOW, RED and BLUE):

DECLARE
    enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    recipients         DBMS_AQ.AQ$_RECIPIENT_LIST_T;
    message_handle     RAW(16);
    msg                SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
    msg := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
    msg.set_text('for all ' || CURRENT_TIMESTAMP);
    DBMS_AQ.ENQUEUE(
            queue_name => 'FRANK.MULTI_CONSUMER_QUEUE',
            enqueue_options => enqueue_options,
            message_properties => message_properties,
            payload => msg,
            msgid => message_handle);
    COMMIT;
END;

Now start the Helidon app with our AQ consumers prepared above:

java -jar ./target/quickstart-mp.jar
...
... : Server started on http://localhost:8080 (and all other host addresses) in 1433 milliseconds (since JVM startup).
...
RED> for all 13-MAY-21 10.29.48.208405000 AM UTC
BLUE> for all 13-MAY-21 10.29.48.208405000 AM UTC

☝️ You can see there is no message for ANONYMOUS, because for it multi consumer queue behaves as topic, but RED and BLUE got the message which waited for them enqueued respectively.