Skip to content

Commit

Permalink
CAMEL-20614: deep-copy output processors during instantiation of a ro…
Browse files Browse the repository at this point in the history
…ute template (#13824)

* CAMEL-20614: deep-copy output processors during instantiation of a route template

When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition.

CAMEL-20614: deep-copy output processors during instantiation of a route template

When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition.

* CAMEL-20545: Using replaceFromWith with camel-test and having route templates can lead to duplicate consumer on starutp error. (#13485)

(cherry picked from commit 8aab61a)

* CAMEL-20614: add shallowCopy as per code review comment

* CAMEL-20614: update access modifiers of copy constructors to be protected as per code review comment

* CAMEL-20614: add unit tests

---------

Co-authored-by: Claus Ibsen <claus.ibsen@gmail.com>
  • Loading branch information
bartoszpop and davsclaus committed Apr 18, 2024
1 parent 3ccf540 commit 63bd587
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 6 deletions.
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kamelet;

import java.util.concurrent.CountDownLatch;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

import static org.apache.camel.component.kamelet.Kamelet.templateToRoute;

public class KameletMultiThreadedTest extends CamelTestSupport {

@Test
public void createSameKameletTwiceInParallel_KameletConsumerNotAvailableExceptionThrown() throws InterruptedException {
var latch = new CountDownLatch(2);
context.addRouteTemplateDefinitionConverter("*", (in, parameters) -> {
try {
return templateToRoute(in, parameters);
} finally {
latch.countDown();
latch.await();
}
});
getMockEndpoint("mock:foo").expectedMessageCount(2);

template.sendBody("seda:route", null);
template.requestBody("seda:route", ((Object) null));

MockEndpoint.assertIsSatisfied(context);
}

// **********************************************
//
// test set-up
//
// **********************************************

@Override
protected RoutesBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("seda:route?concurrentConsumers=2")
.toD("kamelet:-");

routeTemplate("-"). // This is a workaround for "*" to be iterated before templateId at org.apache.camel.impl.DefaultModel#addRouteFromTemplate (line 460)
from("kamelet:source")
.to("mock:foo");
}
};
}
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.model;

/**
* This interface is used to copy {@link ProcessorDefinition ProcessorDefinitions} during instantiation of a route
* template.
*/
interface CopyableProcessorDefinition {
ProcessorDefinition<?> copy();
}
Expand Up @@ -66,6 +66,22 @@ public FromDefinition(EndpointConsumerBuilder endpointConsumerBuilder) {
setEndpointConsumerBuilder(endpointConsumerBuilder);
}

FromDefinition copy() {
FromDefinition copy = new FromDefinition();
copy.parent = this.parent;
copy.endpoint = this.endpoint;
copy.endpointConsumerBuilder = this.endpointConsumerBuilder;
copy.uri = this.uri;
copy.variableReceive = this.variableReceive;
copy.setCamelContext(this.getCamelContext());
copy.setId(this.getId());
copy.setCustomId(this.getCustomId());
copy.setDescription(this.getDescription());
copy.setLineNumber(this.getLineNumber());
copy.setLocation(this.getLocation());
return copy;
}

@Override
public String toString() {
return "From[" + getLabel() + "]";
Expand Down
Expand Up @@ -29,4 +29,10 @@ public List<ProcessorDefinition<?>> getOutputs() {
return Collections.emptyList();
}

public NoOutputDefinition() {
}

protected NoOutputDefinition(NoOutputDefinition source) {
super(source);
}
}
Expand Up @@ -45,6 +45,18 @@ public abstract class OptionalIdentifiedDefinition<T extends OptionalIdentifiedD
private int lineNumber = -1;
private String location;

public OptionalIdentifiedDefinition() {
}

protected OptionalIdentifiedDefinition(OptionalIdentifiedDefinition source) {
this.camelContext = source.camelContext;
this.id = source.id;
this.customId = source.customId;
this.description = source.description;
this.lineNumber = source.lineNumber;
this.location = source.location;
}

@Override
public CamelContext getCamelContext() {
return camelContext;
Expand Down
Expand Up @@ -102,6 +102,17 @@ protected ProcessorDefinition() {
index = COUNTER.getAndIncrement();
}

protected ProcessorDefinition(ProcessorDefinition source) {
super(source);
this.disabled = source.disabled;
this.inheritErrorHandler = source.inheritErrorHandler;
this.blocks.addAll(source.blocks);
this.parent = source.parent;
this.routeConfiguration = source.routeConfiguration;
this.interceptStrategies.addAll(source.interceptStrategies);
this.index = source.index;
}

private static <T extends ExpressionNode> ExpressionClause<T> createAndSetExpression(T result) {
ExpressionClause<T> clause = new ExpressionClause<>(result);
result.setExpression(clause);
Expand Down
Expand Up @@ -16,7 +16,12 @@
*/
package org.apache.camel.model;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNullElse;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -410,15 +415,16 @@ public RouteDefinition asRouteDefinition() {
copy.setDelayer(route.getDelayer());
copy.setGroup(route.getGroup());
copy.setInheritErrorHandler(route.isInheritErrorHandler());
copy.setInput(route.getInput());
// make a defensive copy of the input as input can be adviced during testing or other changes
copy.setInput(route.getInput().copy());
copy.setInputType(route.getInputType());
copy.setLogMask(route.getLogMask());
copy.setMessageHistory(route.getMessageHistory());
copy.setOutputType(route.getOutputType());
copy.setOutputs(route.getOutputs());
copy.setRoutePolicies(route.getRoutePolicies());
copy.setOutputs(copy(route.getOutputs()));
copy.setRoutePolicies(shallowCopy(route.getRoutePolicies()));
copy.setRoutePolicyRef(route.getRoutePolicyRef());
copy.setRouteProperties(route.getRouteProperties());
copy.setRouteProperties(shallowCopy(route.getRouteProperties()));
copy.setShutdownRoute(route.getShutdownRoute());
copy.setShutdownRunningTask(route.getShutdownRunningTask());
copy.setStartupOrder(route.getStartupOrder());
Expand All @@ -432,6 +438,27 @@ public RouteDefinition asRouteDefinition() {
}
copy.setPrecondition(route.getPrecondition());
copy.setRouteConfigurationId(route.getRouteConfigurationId());
copy.setTemplateParameters(shallowCopy(route.getTemplateParameters()));
return copy;
}

private <T> List<T> shallowCopy(List<T> list) {
return (list != null) ? new ArrayList<>(list) : null;
}

private <K, V> Map<K, V> shallowCopy(Map<K, V> map) {
return (map != null) ? new HashMap<>(map) : null;
}

private List<ProcessorDefinition<?>> copy(List<ProcessorDefinition<?>> outputs) {
var copy = new ArrayList<ProcessorDefinition<?>>();
for (var definition : outputs) {
if (definition instanceof CopyableProcessorDefinition copyable) {
copy.add(copyable.copy());
} else {
copy.add(definition);
}
}
return copy;
}

Expand Down
Expand Up @@ -51,6 +51,14 @@ public SendDefinition(String uri) {
this.uri = uri;
}

protected SendDefinition(SendDefinition source) {
super(source);
this.endpointUriToString = source.endpointUriToString;
this.endpoint = source.endpoint;
this.endpointProducerBuilder = source.endpointProducerBuilder;
this.uri = source.uri;
}

@Override
public String getEndpointUri() {
if (endpointProducerBuilder != null) {
Expand Down
Expand Up @@ -33,7 +33,7 @@
@Metadata(label = "eip,routing")
@XmlRootElement(name = "to")
@XmlAccessorType(XmlAccessType.FIELD)
public class ToDefinition extends SendDefinition<ToDefinition> {
public class ToDefinition extends SendDefinition<ToDefinition> implements CopyableProcessorDefinition {

@XmlAttribute
private String variableSend;
Expand Down Expand Up @@ -76,6 +76,13 @@ public ToDefinition(EndpointProducerBuilder endpoint, ExchangePattern pattern) {
this.pattern = pattern.name();
}

protected ToDefinition(ToDefinition source) {
super(source);
this.variableSend = source.variableSend;
this.variableReceive = source.variableReceive;
this.pattern = source.pattern;
}

@Override
public String getShortName() {
return "to";
Expand Down Expand Up @@ -128,4 +135,8 @@ public String getVariableReceive() {
public void setVariableReceive(String variableReceive) {
this.variableReceive = variableReceive;
}

public ToDefinition copy() {
return new ToDefinition(this);
}
}
Expand Up @@ -34,7 +34,7 @@
@Metadata(label = "eip,routing")
@XmlRootElement(name = "toD")
@XmlAccessorType(XmlAccessType.FIELD)
public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> {
public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> implements CopyableProcessorDefinition {

@XmlTransient
protected EndpointProducerBuilder endpointProducerBuilder;
Expand Down Expand Up @@ -69,6 +69,19 @@ public ToDynamicDefinition(String uri) {
this.uri = uri;
}

protected ToDynamicDefinition(ToDynamicDefinition source) {
super(source);
this.endpointProducerBuilder = source.endpointProducerBuilder;
this.uri = source.uri;
this.variableSend = source.variableSend;
this.variableReceive = source.variableReceive;
this.pattern = source.pattern;
this.cacheSize = source.cacheSize;
this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint;
this.allowOptimisedComponents = source.allowOptimisedComponents;
this.autoStartComponents = source.autoStartComponents;
}

@Override
public String getShortName() {
return "toD";
Expand Down Expand Up @@ -315,4 +328,8 @@ public String getAutoStartComponents() {
public void setAutoStartComponents(String autoStartComponents) {
this.autoStartComponents = autoStartComponents;
}

public ToDynamicDefinition copy() {
return new ToDynamicDefinition(this);
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.model;

import java.util.List;
import java.util.Map;

import org.apache.camel.support.RoutePolicySupport;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;

class RouteTemplateDefinitionTest {

@Test
void testDeepCopyMutableProperties() {
RouteDefinition route = new RouteDefinition();
route.setTemplateParameters(Map.of("parameter", "parameterValue"));
route.setRouteProperties(List.of(new PropertyDefinition("property", "propertyValue")));
route.setRoutePolicies(List.of(new RoutePolicySupport() {
}));
route.setInput(new FromDefinition("direct://fromEndpoint"));
route.setOutputs(List.of(new ToDefinition("direct://toEndpoint"), new SetHeaderDefinition("header", "headerValue")));
RouteTemplateDefinition routeTemplate = new RouteTemplateDefinition();
routeTemplate.setRoute(route);

RouteDefinition routeCopy = routeTemplate.asRouteDefinition();

assertNotSame(route.getTemplateParameters(), routeCopy.getTemplateParameters());
assertEquals(route.getTemplateParameters(), routeCopy.getTemplateParameters());
assertNotSame(route.getRouteProperties(), routeCopy.getRouteProperties());
assertEquals(route.getRouteProperties(), routeCopy.getRouteProperties());
assertNotSame(route.getRoutePolicies(), routeCopy.getRoutePolicies());
assertEquals(route.getRoutePolicies(), routeCopy.getRoutePolicies());
assertNotSame(route.getInput(), routeCopy.getInput());
assertEquals(route.getInput().getUri(), routeCopy.getInput().getUri());
assertNotSame(route.getOutputs(), routeCopy.getOutputs());
assertEquals(2, routeCopy.getOutputs().size());
assertNotSame(route.getOutputs().get(0), routeCopy.getOutputs().get(0));
assertInstanceOf(ToDefinition.class, route.getOutputs().get(0));
assertInstanceOf(ToDefinition.class, routeCopy.getOutputs().get(0));
assertEquals(((ToDefinition) route.getOutputs().get(0)).getUri(),
((ToDefinition) routeCopy.getOutputs().get(0)).getUri());
assertSame(route.getOutputs().get(1), routeCopy.getOutputs().get(1));
}
}

0 comments on commit 63bd587

Please sign in to comment.