Skip to content

Commit

Permalink
jms inout concurrency issue (#13598)
Browse files Browse the repository at this point in the history
CAMEL-20493: camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers
  • Loading branch information
davsclaus committed Mar 28, 2024
1 parent 3facd2d commit 630b74d
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 17 deletions.
Expand Up @@ -70,13 +70,12 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);
ReplyHandler handler = correlation.remove(correlationID);
if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
}

if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.component.jms.reply;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -230,13 +231,13 @@ public void processReply(ReplyHolder holder) {

/**
* <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only testing using InOut over
* JMS. Its unlikely to happen in a real life situation with communication to a remote broker, which always will be
* slower to send back reply, before Camel had a chance to update it's internal correlation map.
* JMS. It is unlikely to happen in a real life situation with communication to a remote broker, which always will
* be slower to send back reply, before Camel had a chance to update the internal correlation map.
*/
protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
// race condition, when using messageID as correlationID then we store a provisional correlation id
// at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
// event that the reply comes back really really fast, and the correlation map hasn't yet been updated
// event that the reply comes back really fast, and the correlation map hasn't yet been updated
// from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
if (log.isWarnEnabled()) {
log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
Expand All @@ -250,13 +251,12 @@ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlation
.build())
.build();

return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null);
return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
}

private ReplyHandler getReplyHandler(String correlationID) {
log.trace("Early reply not found handler. Waiting a bit longer.");

return correlation.get(correlationID);
log.trace("Early reply not found. Waiting a bit longer.");
return correlation.remove(correlationID); // get and remove
}

@Override
Expand Down
Expand Up @@ -88,13 +88,12 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);
ReplyHandler handler = correlation.remove(correlationID);
if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
}

if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -45,16 +45,13 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l
// should not happen that we can't find the handler
return;
}

correlation.put(newCorrelationId, handler, requestTimeout);
}

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);

ReplyHandler handler = correlation.remove(correlationID);
if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -63,9 +63,8 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);
ReplyHandler handler = correlation.remove(correlationID);
if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -134,6 +134,11 @@ public V putIfAbsent(K key, V value, long timeoutMillis) {

@Override
public V remove(K key) {
// if no contains, the lock is not necessary
if (!map.containsKey(key)) {
return null;
}

V value = null;
lock.lock();
try {
Expand Down

0 comments on commit 630b74d

Please sign in to comment.