Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CAMEL-20614: deep-copy output processors during instantiation of a ro…
…ute template (#13824) * CAMEL-20614: deep-copy output processors during instantiation of a route template When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition. CAMEL-20614: deep-copy output processors during instantiation of a route template When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition. * CAMEL-20545: Using replaceFromWith with camel-test and having route templates can lead to duplicate consumer on starutp error. (#13485) (cherry picked from commit 8aab61a) * CAMEL-20614: add shallowCopy as per code review comment * CAMEL-20614: update access modifiers of copy constructors to be protected as per code review comment * CAMEL-20614: add unit tests --------- Co-authored-by: Claus Ibsen <claus.ibsen@gmail.com>
- Loading branch information
1 parent
90aed8b
commit 4627a25
Showing
11 changed files
with
269 additions
and
6 deletions.
There are no files selected for viewing
70 changes: 70 additions & 0 deletions
70
...el-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.component.kamelet; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
|
||
import org.apache.camel.RoutesBuilder; | ||
import org.apache.camel.builder.RouteBuilder; | ||
import org.apache.camel.component.mock.MockEndpoint; | ||
import org.apache.camel.test.junit5.CamelTestSupport; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import static org.apache.camel.component.kamelet.Kamelet.templateToRoute; | ||
|
||
public class KameletMultiThreadedTest extends CamelTestSupport { | ||
|
||
@Test | ||
public void createSameKameletTwiceInParallel_KameletConsumerNotAvailableExceptionThrown() throws InterruptedException { | ||
var latch = new CountDownLatch(2); | ||
context.addRouteTemplateDefinitionConverter("*", (in, parameters) -> { | ||
try { | ||
return templateToRoute(in, parameters); | ||
} finally { | ||
latch.countDown(); | ||
latch.await(); | ||
} | ||
}); | ||
getMockEndpoint("mock:foo").expectedMessageCount(2); | ||
|
||
template.sendBody("seda:route", null); | ||
template.requestBody("seda:route", ((Object) null)); | ||
|
||
MockEndpoint.assertIsSatisfied(context); | ||
} | ||
|
||
// ********************************************** | ||
// | ||
// test set-up | ||
// | ||
// ********************************************** | ||
|
||
@Override | ||
protected RoutesBuilder createRouteBuilder() { | ||
return new RouteBuilder() { | ||
@Override | ||
public void configure() { | ||
from("seda:route?concurrentConsumers=2") | ||
.toD("kamelet:-"); | ||
|
||
routeTemplate("-"). // This is a workaround for "*" to be iterated before templateId at org.apache.camel.impl.DefaultModel#addRouteFromTemplate (line 460) | ||
from("kamelet:source") | ||
.to("mock:foo"); | ||
} | ||
}; | ||
} | ||
} |
25 changes: 25 additions & 0 deletions
25
core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.model; | ||
|
||
/** | ||
* This interface is used to copy {@link ProcessorDefinition ProcessorDefinitions} during instantiation of a route | ||
* template. | ||
*/ | ||
interface CopyableProcessorDefinition { | ||
ProcessorDefinition<?> copy(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.model; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.apache.camel.support.RoutePolicySupport; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertInstanceOf; | ||
import static org.junit.jupiter.api.Assertions.assertNotSame; | ||
import static org.junit.jupiter.api.Assertions.assertSame; | ||
|
||
class RouteTemplateDefinitionTest { | ||
|
||
@Test | ||
void testDeepCopyMutableProperties() { | ||
RouteDefinition route = new RouteDefinition(); | ||
route.setTemplateParameters(Map.of("parameter", "parameterValue")); | ||
route.setRouteProperties(List.of(new PropertyDefinition("property", "propertyValue"))); | ||
route.setRoutePolicies(List.of(new RoutePolicySupport() { | ||
})); | ||
route.setInput(new FromDefinition("direct://fromEndpoint")); | ||
route.setOutputs(List.of(new ToDefinition("direct://toEndpoint"), new SetHeaderDefinition("header", "headerValue"))); | ||
RouteTemplateDefinition routeTemplate = new RouteTemplateDefinition(); | ||
routeTemplate.setRoute(route); | ||
|
||
RouteDefinition routeCopy = routeTemplate.asRouteDefinition(); | ||
|
||
assertNotSame(route.getTemplateParameters(), routeCopy.getTemplateParameters()); | ||
assertEquals(route.getTemplateParameters(), routeCopy.getTemplateParameters()); | ||
assertNotSame(route.getRouteProperties(), routeCopy.getRouteProperties()); | ||
assertEquals(route.getRouteProperties(), routeCopy.getRouteProperties()); | ||
assertNotSame(route.getRoutePolicies(), routeCopy.getRoutePolicies()); | ||
assertEquals(route.getRoutePolicies(), routeCopy.getRoutePolicies()); | ||
assertNotSame(route.getInput(), routeCopy.getInput()); | ||
assertEquals(route.getInput().getUri(), routeCopy.getInput().getUri()); | ||
assertNotSame(route.getOutputs(), routeCopy.getOutputs()); | ||
assertEquals(2, routeCopy.getOutputs().size()); | ||
assertNotSame(route.getOutputs().get(0), routeCopy.getOutputs().get(0)); | ||
assertInstanceOf(ToDefinition.class, route.getOutputs().get(0)); | ||
assertInstanceOf(ToDefinition.class, routeCopy.getOutputs().get(0)); | ||
assertEquals(((ToDefinition) route.getOutputs().get(0)).getUri(), | ||
((ToDefinition) routeCopy.getOutputs().get(0)).getUri()); | ||
assertSame(route.getOutputs().get(1), routeCopy.getOutputs().get(1)); | ||
} | ||
} |