
Messaging redelivery


MicroProfile Reactive Messaging 1.0 defines the Message.ack() method for acknowledging to the producer/upstream that a message has been successfully consumed. Messaging methods support 4 ack strategies (see the sketch after the list):

  • MANUAL - you have to call ack() manually
  • PRE_PROCESSING - the message gets acked before your messaging method is invoked
  • POST_PROCESSING - the message gets acked after your messaging method has been invoked successfully
  • NONE - no ack is performed
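
A minimal sketch of how a strategy is picked with the @Acknowledgment annotation, assuming Helidon 2.x / MP Reactive Messaging 1.0; the channel names are made up for illustration:

import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class AckStrategies {

    // MANUAL - nothing happens until you call ack() yourself
    @Incoming("manual-channel") // hypothetical channel name
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> manual(Message<String> msg) {
        // ... business logic ...
        return msg.ack();
    }

    // POST_PROCESSING - acked by the runtime after this method returns without exception
    @Incoming("post-channel") // hypothetical channel name
    @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
    public void post(String payload) {
        // ... business logic ...
    }
}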

No ack because of failure

In case of the catastrophic scenario 💥 when message consumption wasn't successful, redelivery can be achieved ... by restarting your POD. Kubernetes will do that for you if your POD is reported as crashed by its health-check probe. From the Helidon side you need to add the messaging health check:

<dependency>
   <groupId>io.helidon.microprofile.messaging</groupId>
   <artifactId>helidon-microprofile-messaging-health</artifactId>
</dependency>

It will add another check to the health report of your app:

{
    "name": "messaging",
    "state": "UP",
    "status": "UP",
    "data": {
        "my-channel-1": "UP",
        "my-channel-2": "UP"
    }
}

When your messaging channel dies/completes (uncaught exception, errored reactive stream), the health check reports overall health as DOWN and K8s restarts the POD for you, as it considers it crashed.

@Incoming("fromKafka")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)// Message gets acked if method is invoked successfully
public void receiveMessage(String payload) {
	if("exception".equals(payload)) {
		throw new RuntimeException("Kill the stream!");// Uncaught exception kills the messaging channel
	}
}

Why so complicated?

Messaging works with various connectors, and each of them maps the ack call to its underlying client differently. For Kafka, the .ack() method call actually commits the offset of the consumed message. When you call .ack() on message number 9, you are actually saying: Hey Kafka, I got all the messages up to and including number 9. Even if you didn't ack number 8, it gets acked too. Since acking in reactive messaging is asynchronous, there is no "right time" to seek backward on the partition offset and achieve redelivery. We also can't continue with the next messages without committing the offset further. The clean way out is to kill the stream and restart the POD.
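
For illustration only, this is roughly what that commit looks like with the plain Kafka consumer client (broker address, group id, topic name and offset are made up; this is not the connector's internal code):

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // made-up broker address
        props.put("group.id", "my-group");                // made-up consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Committing offset 10 tells Kafka that everything up to and including
            // message number 9 has been consumed; number 8 is implicitly acked too
            TopicPartition partition = new TopicPartition("my-topic", 0);
            consumer.commitSync(Map.of(partition, new OffsetAndMetadata(10)));
        }
    }
}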

The Oracle AQ connector acks by committing a DB transaction; in the end it is quite the same as the offset commit in the Kafka example.

How to do redelivery right

When redelivery is needed, order of consumption is not required, and no catastrophic scenario is happening, you can re-enqueue your message to the queue again with your own logic. It seems weird, but you can keep your own redelivery count within the message and have your own logic based on that (see the sketch after the example below). Using error queues is also a battle-proven practice:

    private final SubmissionPublisher<Message<CloudEvent>> errorQueueEmitter = new SubmissionPublisher<>();

    @Incoming("fromKafka")
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> consume(Message<CloudEvent> message) {
        CloudEvent event = message.getPayload();
        if (isPayloadValid(event)) {
            // do some business here
            return message.ack();
        } else {
            // send it to the error queue
            errorQueueEmitter.submit(Message.of(
                    CloudEventBuilder.from(event)
                            .withExtension("cause", "invalid payload") // add some extra metadata
                            .build(),
                    message::ack // Ack incoming message after error queue broker acks reception
            ));
            return CompletableFuture.completedStage(null);
        }
    }

    @Outgoing("to-error-queue")
    public PublisherBuilder<Message<CloudEvent>> errorQueuePub() {
        return ReactiveStreams
                .fromPublisher(FlowAdapters.toPublisher(errorQueueEmitter));
    }
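
And a minimal sketch of the redelivery-count variant mentioned above, continuing in the same class as the error-queue example; the channel names, the redeliveries extension, the MAX_REDELIVERIES cap and the processedSuccessfully helper are all made up for illustration:

    private static final int MAX_REDELIVERIES = 3; // made-up cap

    private final SubmissionPublisher<Message<CloudEvent>> retryEmitter = new SubmissionPublisher<>();

    @Incoming("fromKafkaRetry")
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> consumeWithRetry(Message<CloudEvent> message) {
        CloudEvent event = message.getPayload();
        Object counter = event.getExtension("redeliveries"); // made-up extension name
        int attempts = counter == null ? 0 : Integer.parseInt(counter.toString());
        if (processedSuccessfully(event)) { // hypothetical business call that may fail transiently
            return message.ack();
        }
        if (attempts >= MAX_REDELIVERIES) {
            // Give up after a few attempts - ack anyway, or hand it over to the error queue above
            return message.ack();
        }
        // Re-enqueue the same event with an incremented redelivery counter
        retryEmitter.submit(Message.of(
                CloudEventBuilder.from(event)
                        .withExtension("redeliveries", String.valueOf(attempts + 1))
                        .build(),
                message::ack // Ack incoming message after the broker acks the re-enqueued one
        ));
        return CompletableFuture.completedStage(null);
    }

    @Outgoing("to-retry-queue")
    public PublisherBuilder<Message<CloudEvent>> retryQueuePub() {
        return ReactiveStreams
                .fromPublisher(FlowAdapters.toPublisher(retryEmitter));
    }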

NACK - Not acknowledging

Message.nack() is a way of telling the message producer that it is no longer possible to ack the message. That will fix the above issue, because Kafka/AQ/... connectors will be able to seek backward on the offset or do a rollback, as the exact moment of the nack is known. Connectors will be able to solve redelivery much more gracefully. MP Reactive Messaging 2.0 introduced .nack(), but Helidon 2.x implements MP Reactive Messaging 1.0 - no nack yet, sorry guys. MP Reactive Messaging 3.0 is planned for Helidon 3 in the near future; we are skipping 2.0 because of the Jakartification.
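
For completeness, a sketch of what nack-based handling could look like with MP Reactive Messaging 2.0+; this will not compile against Helidon 2.x, which is still on 1.0:

@Incoming("fromKafka")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consume(Message<String> message) {
    if ("exception".equals(message.getPayload())) {
        // Tell the connector the message can't be processed, so it can seek back / roll back
        return message.nack(new IllegalStateException("Can't process this one"));
    }
    return message.ack();
}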