[Question] Message deduplication #42
-
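Hi folks, I'm working on the rabbit-stream client in Rust, specifically on the message batching feature. In my branch, when automatic deduplication is enabled (by setting the producer name), some messages are not delivered to the consumer even though their confirmation status is "confirmed". I was able to reproduce the same behavior with the Java client. My question: with message deduplication, does the producer need to be used from a single thread, or be manually synchronized in a multi-threaded environment? For reference, here is the test I used to reproduce it. Thanks!

```java
// Imports added for completeness (assuming JUnit 5 and AssertJ). `environment`
// (a com.rabbitmq.stream.Environment) and `stream` (the stream name) are fields
// of the enclosing test class.
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;

@Test
void sendAndConsume() throws Exception {
  int batchSize = 10;
  int messageCount = batchSize;
  CountDownLatch publishLatch = new CountDownLatch(messageCount);
  // Setting a producer name enables deduplication.
  Producer producer = environment.producerBuilder()
      .stream(stream)
      .name("sendAndConsume")
      .batchSize(batchSize)
      .build();
  AtomicLong count = new AtomicLong(0);
  // Each send() runs on a pool thread, so the calls can interleave.
  ExecutorService service = Executors.newFixedThreadPool(batchSize);
  IntStream.range(0, messageCount).forEach(i -> {
    service.execute(() -> {
      producer.send(producer.messageBuilder().addData("".getBytes()).build(), confirmationStatus -> {
        if (confirmationStatus.isConfirmed()) {
          count.incrementAndGet();
          publishLatch.countDown();
        }
      });
    });
  });
  // Every message is reported as confirmed...
  boolean completed = publishLatch.await(10, TimeUnit.SECONDS);
  assertThat(completed).isTrue();
  CountDownLatch consumeLatch = new CountDownLatch(messageCount);
  AtomicLong chunkTimestamp = new AtomicLong();
  Consumer consumer = environment.consumerBuilder()
      .stream(stream)
      .offset(OffsetSpecification.first())
      .messageHandler((context, message) -> {
        chunkTimestamp.set(context.timestamp());
        consumeLatch.countDown();
      })
      .build();
  // ...but in the failing runs this times out: some messages never reach the consumer.
  assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
  assertThat(chunkTimestamp.get()).isNotZero();
  consumer.close();
  // Release the remaining resources.
  producer.close();
  service.shutdown();
}
```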
Replies: 2 comments
-
I will convert this issue to a GitHub discussion. Currently GitHub will automatically close and lock the issue even though your question will be transferred and responded to elsewhere. This is to let you know that we do not intend to ignore this, but this is how the current GitHub conversion mechanism makes it seem to users :(
-
Publishing from several threads can lead to out-of-order publishing IDs. It's like publishing message 5 and then message 2 from a single thread: the broker will ignore message 2, because deduplication drops any message whose publishing ID is not greater than the last one stored for that producer name. There's not much we can do here, but we have to document this clearly. Thanks for the precise report, the test worked out of the box. :-)
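To make the explanation above concrete, here is a minimal, self-contained Java simulation. It is not the broker's or the Java client's code: `DedupFilter`, `SerializedProducer`, `storedCount` and `sendConcurrently` are hypothetical names. It assumes the broker keeps the highest stored publishing ID per producer name and drops anything at or below it, while still confirming it (which matches the "confirmed but not delivered" symptom in the report). It also sketches one possible meaning of "manually protected": assigning the publishing ID and enqueueing the message under a single lock, so enqueue order always follows ID order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simulation only: NOT the RabbitMQ broker or client implementation.
final class DedupSketch {

  // Hypothetical broker-side filter: remembers, per producer name,
  // the highest publishing ID stored so far.
  static final class DedupFilter {
    private final Map<String, Long> lastStored = new HashMap<>();

    // Returns true if the message is stored, false if it is dropped.
    // (Assumption: the real broker confirms the message in both cases.)
    boolean accept(String producerName, long publishingId) {
      Long last = lastStored.get(producerName);
      if (last != null && publishingId <= last) {
        return false;
      }
      lastStored.put(producerName, publishingId);
      return true;
    }
  }

  // Counts how many messages survive deduplication for a given arrival order.
  static int storedCount(String producerName, long... arrivalOrder) {
    DedupFilter filter = new DedupFilter();
    int stored = 0;
    for (long id : arrivalOrder) {
      if (filter.accept(producerName, id)) {
        stored++;
      }
    }
    return stored;
  }

  // Hypothetical workaround: ID assignment and enqueueing happen under one lock,
  // so the outgoing order always matches the publishing ID order.
  static final class SerializedProducer {
    private long nextId = 0;
    private final List<Long> outgoing = new ArrayList<>();

    synchronized void send() {
      outgoing.add(nextId++);
    }

    synchronized long[] drain() {
      return outgoing.stream().mapToLong(Long::longValue).toArray();
    }
  }

  // Calls send() from a thread pool, like the test in the question does.
  static long[] sendConcurrently(int threads, int messages) {
    SerializedProducer producer = new SerializedProducer();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < messages; i++) {
      pool.execute(producer::send);
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("sends did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return producer.drain();
  }

  public static void main(String[] args) {
    System.out.println(storedCount("sendAndConsume", 0, 1, 2, 3, 4));    // 5: in order
    System.out.println(storedCount("sendAndConsume", 0, 1, 5, 2, 3, 4)); // 3: 2, 3, 4 dropped
    System.out.println(storedCount("sendAndConsume", 0, 1, 3, 2, 4));    // 4: 2 dropped
    long[] ids = sendConcurrently(10, 10);
    System.out.println(storedCount("sendAndConsume", ids));              // 10: none dropped
  }
}
```

A single swap between two threads (3 arriving before 2) is enough to lose one message, and one early high ID (5) silently drops every lower ID that follows. Whether a lock around `producer.send()` is sufficient with the real Java client depends on how the client assigns IDs and batches internally; publishing from a single thread is the simpler option to check against the client documentation.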