From 63bd587da526c7f681dcef6aba322ae4e275f3d4 Mon Sep 17 00:00:00 2001 From: Bartosz Popiela Date: Thu, 18 Apr 2024 13:39:58 +0200 Subject: [PATCH] CAMEL-20614: deep-copy output processors during instantiation of a route template (#13824) * CAMEL-20614: deep-copy output processors during instantiation of a route template When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition. CAMEL-20614: deep-copy output processors during instantiation of a route template When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition. * CAMEL-20545: Using replaceFromWith with camel-test and having route templates can lead to duplicate consumer on starutp error. (#13485) (cherry picked from commit 8aab61a7a286f6c0ed34433c068ca065084dc67d) * CAMEL-20614: add shallowCopy as per code review comment * CAMEL-20614: update access modifiers of copy constructors to be protected as per code review comment * CAMEL-20614: add unit tests --------- Co-authored-by: Claus Ibsen --- .../kamelet/KameletMultiThreadedTest.java | 70 +++++++++++++++++++ .../model/CopyableProcessorDefinition.java | 25 +++++++ .../apache/camel/model/FromDefinition.java | 16 +++++ .../camel/model/NoOutputDefinition.java | 6 ++ .../model/OptionalIdentifiedDefinition.java | 12 ++++ .../camel/model/ProcessorDefinition.java | 11 +++ .../camel/model/RouteTemplateDefinition.java | 35 ++++++++-- .../apache/camel/model/SendDefinition.java | 8 +++ .../org/apache/camel/model/ToDefinition.java | 13 +++- .../camel/model/ToDynamicDefinition.java | 19 ++++- .../model/RouteTemplateDefinitionTest.java | 63 +++++++++++++++++ 11 files changed, 272 insertions(+), 6 deletions(-) create mode 100644 components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java create mode 100644 core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java create mode 100644 core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java new file mode 100644 index 0000000000000..73d47e2cd6f6f --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import java.util.concurrent.CountDownLatch; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.component.kamelet.Kamelet.templateToRoute; + +public class KameletMultiThreadedTest extends CamelTestSupport { + + @Test + public void createSameKameletTwiceInParallel_KameletConsumerNotAvailableExceptionThrown() throws InterruptedException { + var latch = new CountDownLatch(2); + context.addRouteTemplateDefinitionConverter("*", (in, parameters) -> { + try { + return templateToRoute(in, parameters); + } finally { + latch.countDown(); + latch.await(); + } + }); + getMockEndpoint("mock:foo").expectedMessageCount(2); + + template.sendBody("seda:route", null); + template.requestBody("seda:route", ((Object) null)); + + MockEndpoint.assertIsSatisfied(context); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("seda:route?concurrentConsumers=2") + .toD("kamelet:-"); + + routeTemplate("-"). // This is a workaround for "*" to be iterated before templateId at org.apache.camel.impl.DefaultModel#addRouteFromTemplate (line 460) + from("kamelet:source") + .to("mock:foo"); + } + }; + } +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java new file mode 100644 index 0000000000000..cf770d7515c5a --- /dev/null +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +/** + * This interface is used to copy {@link ProcessorDefinition ProcessorDefinitions} during instantiation of a route + * template. + */ +interface CopyableProcessorDefinition { + ProcessorDefinition copy(); +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java index dc5d4b95665b4..4117dc5efc15a 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java @@ -66,6 +66,22 @@ public FromDefinition(EndpointConsumerBuilder endpointConsumerBuilder) { setEndpointConsumerBuilder(endpointConsumerBuilder); } + FromDefinition copy() { + FromDefinition copy = new FromDefinition(); + copy.parent = this.parent; + copy.endpoint = this.endpoint; + copy.endpointConsumerBuilder = this.endpointConsumerBuilder; + copy.uri = this.uri; + copy.variableReceive = this.variableReceive; + copy.setCamelContext(this.getCamelContext()); + copy.setId(this.getId()); + copy.setCustomId(this.getCustomId()); + copy.setDescription(this.getDescription()); + copy.setLineNumber(this.getLineNumber()); + copy.setLocation(this.getLocation()); + return copy; + } + @Override public String toString() { return "From[" + getLabel() + "]"; diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java index fdd21dd2e8af2..92dc058a0885a 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java @@ -29,4 +29,10 @@ public List> getOutputs() { return Collections.emptyList(); } + public NoOutputDefinition() { + } + + protected NoOutputDefinition(NoOutputDefinition source) { + super(source); + } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java index f8c42b3e2465e..767f40181b4ae 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java @@ -45,6 +45,18 @@ public abstract class OptionalIdentifiedDefinition ExpressionClause createAndSetExpression(T result) { ExpressionClause clause = new ExpressionClause<>(result); result.setExpression(clause); diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java index 44802872d58e0..1bf2eeae867b1 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java @@ -16,7 +16,12 @@ */ package org.apache.camel.model; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Objects.requireNonNullElse; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -410,15 +415,16 @@ public RouteDefinition asRouteDefinition() { copy.setDelayer(route.getDelayer()); copy.setGroup(route.getGroup()); copy.setInheritErrorHandler(route.isInheritErrorHandler()); - copy.setInput(route.getInput()); + // make a defensive copy of the input as input can be adviced during testing or other changes + copy.setInput(route.getInput().copy()); copy.setInputType(route.getInputType()); copy.setLogMask(route.getLogMask()); copy.setMessageHistory(route.getMessageHistory()); copy.setOutputType(route.getOutputType()); - copy.setOutputs(route.getOutputs()); - copy.setRoutePolicies(route.getRoutePolicies()); + copy.setOutputs(copy(route.getOutputs())); + copy.setRoutePolicies(shallowCopy(route.getRoutePolicies())); copy.setRoutePolicyRef(route.getRoutePolicyRef()); - copy.setRouteProperties(route.getRouteProperties()); + copy.setRouteProperties(shallowCopy(route.getRouteProperties())); copy.setShutdownRoute(route.getShutdownRoute()); copy.setShutdownRunningTask(route.getShutdownRunningTask()); copy.setStartupOrder(route.getStartupOrder()); @@ -432,6 +438,27 @@ public RouteDefinition asRouteDefinition() { } copy.setPrecondition(route.getPrecondition()); copy.setRouteConfigurationId(route.getRouteConfigurationId()); + copy.setTemplateParameters(shallowCopy(route.getTemplateParameters())); + return copy; + } + + private List shallowCopy(List list) { + return (list != null) ? new ArrayList<>(list) : null; + } + + private Map shallowCopy(Map map) { + return (map != null) ? new HashMap<>(map) : null; + } + + private List> copy(List> outputs) { + var copy = new ArrayList>(); + for (var definition : outputs) { + if (definition instanceof CopyableProcessorDefinition copyable) { + copy.add(copyable.copy()); + } else { + copy.add(definition); + } + } return copy; } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java index 21392205b790b..5e3a9725ae33f 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java @@ -51,6 +51,14 @@ public SendDefinition(String uri) { this.uri = uri; } + protected SendDefinition(SendDefinition source) { + super(source); + this.endpointUriToString = source.endpointUriToString; + this.endpoint = source.endpoint; + this.endpointProducerBuilder = source.endpointProducerBuilder; + this.uri = source.uri; + } + @Override public String getEndpointUri() { if (endpointProducerBuilder != null) { diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java index fed74435cd4fd..469ab1ea8b977 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java @@ -33,7 +33,7 @@ @Metadata(label = "eip,routing") @XmlRootElement(name = "to") @XmlAccessorType(XmlAccessType.FIELD) -public class ToDefinition extends SendDefinition { +public class ToDefinition extends SendDefinition implements CopyableProcessorDefinition { @XmlAttribute private String variableSend; @@ -76,6 +76,13 @@ public ToDefinition(EndpointProducerBuilder endpoint, ExchangePattern pattern) { this.pattern = pattern.name(); } + protected ToDefinition(ToDefinition source) { + super(source); + this.variableSend = source.variableSend; + this.variableReceive = source.variableReceive; + this.pattern = source.pattern; + } + @Override public String getShortName() { return "to"; @@ -128,4 +135,8 @@ public String getVariableReceive() { public void setVariableReceive(String variableReceive) { this.variableReceive = variableReceive; } + + public ToDefinition copy() { + return new ToDefinition(this); + } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java index 3daac2cbf52be..b1495e9fa8c18 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java @@ -34,7 +34,7 @@ @Metadata(label = "eip,routing") @XmlRootElement(name = "toD") @XmlAccessorType(XmlAccessType.FIELD) -public class ToDynamicDefinition extends NoOutputDefinition { +public class ToDynamicDefinition extends NoOutputDefinition implements CopyableProcessorDefinition { @XmlTransient protected EndpointProducerBuilder endpointProducerBuilder; @@ -69,6 +69,19 @@ public ToDynamicDefinition(String uri) { this.uri = uri; } + protected ToDynamicDefinition(ToDynamicDefinition source) { + super(source); + this.endpointProducerBuilder = source.endpointProducerBuilder; + this.uri = source.uri; + this.variableSend = source.variableSend; + this.variableReceive = source.variableReceive; + this.pattern = source.pattern; + this.cacheSize = source.cacheSize; + this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint; + this.allowOptimisedComponents = source.allowOptimisedComponents; + this.autoStartComponents = source.autoStartComponents; + } + @Override public String getShortName() { return "toD"; @@ -315,4 +328,8 @@ public String getAutoStartComponents() { public void setAutoStartComponents(String autoStartComponents) { this.autoStartComponents = autoStartComponents; } + + public ToDynamicDefinition copy() { + return new ToDynamicDefinition(this); + } } diff --git a/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java b/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java new file mode 100644 index 0000000000000..0c74b35a18140 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import java.util.List; +import java.util.Map; + +import org.apache.camel.support.RoutePolicySupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +class RouteTemplateDefinitionTest { + + @Test + void testDeepCopyMutableProperties() { + RouteDefinition route = new RouteDefinition(); + route.setTemplateParameters(Map.of("parameter", "parameterValue")); + route.setRouteProperties(List.of(new PropertyDefinition("property", "propertyValue"))); + route.setRoutePolicies(List.of(new RoutePolicySupport() { + })); + route.setInput(new FromDefinition("direct://fromEndpoint")); + route.setOutputs(List.of(new ToDefinition("direct://toEndpoint"), new SetHeaderDefinition("header", "headerValue"))); + RouteTemplateDefinition routeTemplate = new RouteTemplateDefinition(); + routeTemplate.setRoute(route); + + RouteDefinition routeCopy = routeTemplate.asRouteDefinition(); + + assertNotSame(route.getTemplateParameters(), routeCopy.getTemplateParameters()); + assertEquals(route.getTemplateParameters(), routeCopy.getTemplateParameters()); + assertNotSame(route.getRouteProperties(), routeCopy.getRouteProperties()); + assertEquals(route.getRouteProperties(), routeCopy.getRouteProperties()); + assertNotSame(route.getRoutePolicies(), routeCopy.getRoutePolicies()); + assertEquals(route.getRoutePolicies(), routeCopy.getRoutePolicies()); + assertNotSame(route.getInput(), routeCopy.getInput()); + assertEquals(route.getInput().getUri(), routeCopy.getInput().getUri()); + assertNotSame(route.getOutputs(), routeCopy.getOutputs()); + assertEquals(2, routeCopy.getOutputs().size()); + assertNotSame(route.getOutputs().get(0), routeCopy.getOutputs().get(0)); + assertInstanceOf(ToDefinition.class, route.getOutputs().get(0)); + assertInstanceOf(ToDefinition.class, routeCopy.getOutputs().get(0)); + assertEquals(((ToDefinition) route.getOutputs().get(0)).getUri(), + ((ToDefinition) routeCopy.getOutputs().get(0)).getUri()); + assertSame(route.getOutputs().get(1), routeCopy.getOutputs().get(1)); + } +}