Skip to content

Messaging blocking or long running tasks

Daniel Kec edited this page Jul 28, 2022 · 1 revision

MicroProfile Reactive Messaging - blocking tasks

Generally blocking in Reactive messaging is a bad thing. Thread invoking the messaging method can be a worker from event loop and we can risk deadlocks or degraded overall performance. We don't need to however spin off our own threads.

MicroProfile Reactive Messaging integrates with MicroProfile Fault Tolerance so it is possible to use @Asynchronous for offloading blocking tasks from upstream threads.

    @Incoming("fromKafka")
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    @Asynchronous
    @Timeout(4000)
    @Fallback(fallbackMethod = "toDLQ", applyOn = {RuntimeException.class, IllegalStateException.class})
    public CompletionStage<Void> receiveMessage(Message<String> msg) {
        System.out.println("Consuming " + msg.getPayload());
        if ("3".equals(msg.getPayload())) throw new RuntimeException("BOOM!");
        System.out.println("Consumed " + msg.getPayload() + " successfully!");
        return msg.ack();
    }

    CompletionStage<Void> toDLQ(Message<String> msg) {
        System.out.println("Sending " + msg.getPayload() + " to DLQ");
        publisher.emit(msg); // Don't ack manually, outgoing connector does that when broker acks reception
        return CompletableFuture.completedFuture(null);
    }