Skip to content

Commit

Permalink
CAMEL-20614: deep-copy output processors during instantiation of a ro…
Browse files Browse the repository at this point in the history
…ute template

When multiple threads try to instantiate and send an exchange to the same kamelet in parallel, org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be thrown because the underlying RouteTemplateDefinition is shallow-copied and changes to the RouteDefinition are reflected in the RouteTemplateDefinition.
  • Loading branch information
bartoszpop committed Apr 17, 2024
1 parent a43e85e commit 8d02b75
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.model;

/**
* This interface is used to copy {@link ProcessorDefinition ProcessorDefinitions} during instantiation of a route
* template.
*/
interface CopyableProcessorDefinition {
ProcessorDefinition<?> copy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ public List<ProcessorDefinition<?>> getOutputs() {
return Collections.emptyList();
}

public NoOutputDefinition() {
}

NoOutputDefinition(NoOutputDefinition source) {
super(source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ public abstract class OptionalIdentifiedDefinition<T extends OptionalIdentifiedD
private int lineNumber = -1;
private String location;

public OptionalIdentifiedDefinition() {
}

OptionalIdentifiedDefinition(OptionalIdentifiedDefinition source) {
this.camelContext = source.camelContext;
this.id = source.id;
this.customId = source.customId;
this.description = source.description;
this.lineNumber = source.lineNumber;
this.location = source.location;
}

@Override
public CamelContext getCamelContext() {
return camelContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ protected ProcessorDefinition() {
index = COUNTER.getAndIncrement();
}

ProcessorDefinition(ProcessorDefinition source) {
super(source);
this.disabled = source.disabled;
this.inheritErrorHandler = source.inheritErrorHandler;
this.blocks.addAll(source.blocks);
this.parent = source.parent;
this.routeConfiguration = source.routeConfiguration;
this.interceptStrategies.addAll(source.interceptStrategies);
this.index = source.index;
}

private static <T extends ExpressionNode> ExpressionClause<T> createAndSetExpression(T result) {
ExpressionClause<T> clause = new ExpressionClause<>(result);
result.setExpression(clause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
*/
package org.apache.camel.model;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNullElse;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand All @@ -35,6 +40,10 @@
import org.apache.camel.spi.AsEndpointUri;
import org.apache.camel.spi.Metadata;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNullElse;

/**
* Defines a route template (parameterized routes)
*/
Expand Down Expand Up @@ -405,10 +414,10 @@ public RouteDefinition asRouteDefinition() {
copy.setLogMask(route.getLogMask());
copy.setMessageHistory(route.getMessageHistory());
copy.setOutputType(route.getOutputType());
copy.setOutputs(route.getOutputs());
copy.setRoutePolicies(route.getRoutePolicies());
copy.setOutputs(copy(route.getOutputs()));
copy.setRoutePolicies(new ArrayList<>(requireNonNullElse(route.getRoutePolicies(), emptyList())));
copy.setRoutePolicyRef(route.getRoutePolicyRef());
copy.setRouteProperties(route.getRouteProperties());
copy.setRouteProperties(new ArrayList<>(requireNonNullElse(route.getRouteProperties(), emptyList())));
copy.setShutdownRoute(route.getShutdownRoute());
copy.setShutdownRunningTask(route.getShutdownRunningTask());
copy.setStartupOrder(route.getStartupOrder());
Expand All @@ -422,6 +431,19 @@ public RouteDefinition asRouteDefinition() {
}
copy.setPrecondition(route.getPrecondition());
copy.setRouteConfigurationId(route.getRouteConfigurationId());
copy.setTemplateParameters(new HashMap<>(requireNonNullElse(route.getTemplateParameters(), emptyMap())));
return copy;
}

private List<ProcessorDefinition<?>> copy(List<ProcessorDefinition<?>> outputs) {
var copy = new ArrayList<ProcessorDefinition<?>>();
for (var definition : outputs) {
if (definition instanceof CopyableProcessorDefinition) {
copy.add(((CopyableProcessorDefinition)definition).copy());
} else {
copy.add(definition);
}
}
return copy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public SendDefinition(String uri) {
this.uri = uri;
}

SendDefinition(SendDefinition source) {
super(source);
this.endpointUriToString = source.endpointUriToString;
this.endpoint = source.endpoint;
this.endpointProducerBuilder = source.endpointProducerBuilder;
this.uri = source.uri;
}

@Override
public String getEndpointUri() {
if (endpointProducerBuilder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Metadata(label = "eip,routing")
@XmlRootElement(name = "to")
@XmlAccessorType(XmlAccessType.FIELD)
public class ToDefinition extends SendDefinition<ToDefinition> {
public class ToDefinition extends SendDefinition<ToDefinition> implements CopyableProcessorDefinition {

@XmlAttribute
@Metadata(label = "advanced", javaType = "org.apache.camel.ExchangePattern", enums = "InOnly,InOut,InOptionalOut")
Expand Down Expand Up @@ -71,6 +71,11 @@ public ToDefinition(EndpointProducerBuilder endpoint, ExchangePattern pattern) {
this.pattern = pattern.name();
}

ToDefinition(ToDefinition source) {
super(source);
this.pattern = source.pattern;
}

@Override
public String getShortName() {
return "to";
Expand All @@ -93,4 +98,7 @@ public void setPattern(String pattern) {
this.pattern = pattern;
}

public ToDefinition copy() {
return new ToDefinition(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@Metadata(label = "eip,routing")
@XmlRootElement(name = "toD")
@XmlAccessorType(XmlAccessType.FIELD)
public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> {
public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> implements CopyableProcessorDefinition {

@XmlTransient
protected EndpointProducerBuilder endpointProducerBuilder;
Expand Down Expand Up @@ -64,6 +64,17 @@ public ToDynamicDefinition(String uri) {
this.uri = uri;
}

ToDynamicDefinition(ToDynamicDefinition source) {
super(source);
this.endpointProducerBuilder = source.endpointProducerBuilder;
this.uri = source.uri;
this.pattern = source.pattern;
this.cacheSize = source.cacheSize;
this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint;
this.allowOptimisedComponents = source.allowOptimisedComponents;
this.autoStartComponents = source.autoStartComponents;
}

@Override
public String getShortName() {
return "toD";
Expand Down Expand Up @@ -269,4 +280,8 @@ public String getAutoStartComponents() {
public void setAutoStartComponents(String autoStartComponents) {
this.autoStartComponents = autoStartComponents;
}

public ToDynamicDefinition copy() {
return new ToDynamicDefinition(this);
}
}

0 comments on commit 8d02b75

Please sign in to comment.