From 63b317e7731510742a679f409449489656219da6 Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Sat, 23 Mar 2024 17:41:45 +0100 Subject: [PATCH 1/2] camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers --- .../apache/camel/component/jms/reply/QueueReplyManager.java | 3 +-- .../apache/camel/component/jms/reply/ReplyManagerSupport.java | 4 ++-- .../camel/component/jms/reply/TemporaryQueueReplyManager.java | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index c0697cf329d26..000350941d2ed 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -70,13 +70,12 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l @Override protected void handleReplyMessage(String correlationID, Message message, Session session) { - ReplyHandler handler = correlation.get(correlationID); + ReplyHandler handler = correlation.remove(correlationID); if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); } if (handler != null) { - correlation.remove(correlationID); handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 8118c78ce2731..b8e12e7207812 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jms.reply; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -250,12 +251,11 @@ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlation .build()) .build(); - return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null); + return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null); } private ReplyHandler getReplyHandler(String correlationID) { log.trace("Early reply not found handler. Waiting a bit longer."); - return correlation.get(correlationID); } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index fc715521c2616..181463055d9f6 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -88,13 +88,12 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l @Override protected void handleReplyMessage(String correlationID, Message message, Session session) { - ReplyHandler handler = correlation.get(correlationID); + ReplyHandler handler = correlation.remove(correlationID); if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); } if (handler != null) { - correlation.remove(correlationID); handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore From 5edc42ce7ef3e4e1b5a291be43922ad38ffd26fa Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Sat, 23 Mar 2024 18:00:21 +0100 Subject: [PATCH 2/2] camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers --- .../camel/component/jms/reply/ReplyManagerSupport.java | 10 +++++----- .../camel/component/sjms/reply/QueueReplyManager.java | 5 +---- .../sjms/reply/TemporaryQueueReplyManager.java | 3 +-- .../org/apache/camel/support/DefaultTimeoutMap.java | 5 +++++ 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index b8e12e7207812..31234500eee5e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -231,13 +231,13 @@ public void processReply(ReplyHolder holder) { /** * IMPORTANT: This logic is only being used due to high performance in-memory only testing using InOut over - * JMS. Its unlikely to happen in a real life situation with communication to a remote broker, which always will be - * slower to send back reply, before Camel had a chance to update it's internal correlation map. + * JMS. It is unlikely to happen in a real life situation with communication to a remote broker, which always will + * be slower to send back reply, before Camel had a chance to update the internal correlation map. */ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) { // race condition, when using messageID as correlationID then we store a provisional correlation id // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely - // event that the reply comes back really really fast, and the correlation map hasn't yet been updated + // event that the reply comes back really fast, and the correlation map hasn't yet been updated // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again. if (log.isWarnEnabled()) { log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message); @@ -255,8 +255,8 @@ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlation } private ReplyHandler getReplyHandler(String correlationID) { - log.trace("Early reply not found handler. Waiting a bit longer."); - return correlation.get(correlationID); + log.trace("Early reply not found. Waiting a bit longer."); + return correlation.remove(correlationID); // get and remove } @Override diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java index fb67ba0273999..662f40fa6cccd 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java @@ -45,16 +45,13 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l // should not happen that we can't find the handler return; } - correlation.put(newCorrelationId, handler, requestTimeout); } @Override protected void handleReplyMessage(String correlationID, Message message, Session session) { - ReplyHandler handler = correlation.get(correlationID); - + ReplyHandler handler = correlation.remove(correlationID); if (handler != null) { - correlation.remove(correlationID); handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java index 6c3ce420c1e06..98553e9a23a0f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java @@ -63,9 +63,8 @@ public void updateCorrelationId(String correlationId, String newCorrelationId, l @Override protected void handleReplyMessage(String correlationID, Message message, Session session) { - ReplyHandler handler = correlation.get(correlationID); + ReplyHandler handler = correlation.remove(correlationID); if (handler != null) { - correlation.remove(correlationID); handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java index 5be87e40ef0a6..b6cda4f3eccd2 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java @@ -134,6 +134,11 @@ public V putIfAbsent(K key, V value, long timeoutMillis) { @Override public V remove(K key) { + // if no contains, the lock is not necessary + if (!map.containsKey(key)) { + return null; + } + V value = null; lock.lock(); try {