Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jms inout concurrency issue #13598

Merged
merged 2 commits into from Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -70,13 +70,12 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);
ReplyHandler handler = correlation.remove(correlationID);
if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
}

if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.component.jms.reply;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -230,13 +231,13 @@ public void processReply(ReplyHolder holder) {

/**
* <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only testing using InOut over
* JMS. Its unlikely to happen in a real life situation with communication to a remote broker, which always will be
* slower to send back reply, before Camel had a chance to update it's internal correlation map.
* JMS. It is unlikely to happen in a real life situation with communication to a remote broker, which always will
* be slower to send back reply, before Camel had a chance to update the internal correlation map.
*/
protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
// race condition, when using messageID as correlationID then we store a provisional correlation id
// at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
// event that the reply comes back really really fast, and the correlation map hasn't yet been updated
// event that the reply comes back really fast, and the correlation map hasn't yet been updated
// from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
if (log.isWarnEnabled()) {
log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
Expand All @@ -250,13 +251,12 @@ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlation
.build())
.build();

return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null);
return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
}

private ReplyHandler getReplyHandler(String correlationID) {
log.trace("Early reply not found handler. Waiting a bit longer.");

return correlation.get(correlationID);
log.trace("Early reply not found. Waiting a bit longer.");
return correlation.remove(correlationID); // get and remove
}

@Override
Expand Down
Expand Up @@ -88,13 +88,12 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);
ReplyHandler handler = correlation.remove(correlationID);
if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
}

if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -45,16 +45,13 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l
// should not happen that we can't find the handler
return;
}

correlation.put(newCorrelationId, handler, requestTimeout);
}

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);

ReplyHandler handler = correlation.remove(correlationID);
if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -63,9 +63,8 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l

@Override
protected void handleReplyMessage(String correlationID, Message message, Session session) {
ReplyHandler handler = correlation.get(correlationID);
ReplyHandler handler = correlation.remove(correlationID);
if (handler != null) {
correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching request and therefore
Expand Down
Expand Up @@ -134,6 +134,11 @@ public V putIfAbsent(K key, V value, long timeoutMillis) {

@Override
public V remove(K key) {
// if no contains, the lock is not necessary
if (!map.containsKey(key)) {
return null;
}

V value = null;
lock.lock();
try {
Expand Down