diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index 5dcabc96facd2..9424fe89a616e 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -222,12 +222,12 @@ public void done(boolean doneSync) { Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); if (aggregatedExchange != null) { - if (variableReceive != null) { + if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) { // result should be stored in variable instead of message body - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, - exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive, + aggregatedExchange.getMessage()); + aggregatedExchange.getMessage().setBody(originalBody); + aggregatedExchange.getMessage().setHeaders(originalHeaders); } // copy aggregation result onto original exchange (preserving pattern) copyResultsWithoutCorrelationId(exchange, aggregatedExchange); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index d939963dd268e..e13ec57ed88ff 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -350,11 +350,12 @@ public boolean process(Exchange exchange, AsyncCallback callback) { // must catch any exception from aggregation Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); if (aggregatedExchange != null) { - if (variableReceive != null) { + if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) { // result should be stored in variable instead of message body - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive, + aggregatedExchange.getMessage()); + aggregatedExchange.getMessage().setBody(originalBody); + aggregatedExchange.getMessage().setHeaders(originalHeaders); } // copy aggregation result onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, aggregatedExchange); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index d877f180684dc..decf336be5764 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -237,10 +237,10 @@ public boolean process(Exchange exchange, final AsyncCallback callback) { ServiceHelper.stopAndShutdownService(endpoint); } // result should be stored in variable instead of message body - if (variableReceive != null) { - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); + if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) { + ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive, target.getMessage()); + target.getMessage().setBody(originalBody); + target.getMessage().setHeaders(originalHeaders); } // signal we are done c.done(doneSync); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java index 048402859800f..37dd575fccdb3 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -179,11 +179,11 @@ public boolean process(Exchange exchange, final AsyncCallback callback) { ac = doneSync -> { try { // result should be stored in variable instead of message body/headers - if (variableReceive != null) { - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, - exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); + if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) { + ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive, + target.getMessage()); + target.getMessage().setBody(originalBody); + target.getMessage().setHeaders(originalHeaders); } // restore previous MEP target.setPattern(existingPattern); @@ -202,8 +202,6 @@ public boolean process(Exchange exchange, final AsyncCallback callback) { if (variableSend != null) { Object value = ExchangeHelper.getVariable(exchange, variableSend); exchange.getMessage().setBody(value); - // TODO: empty headers or - } LOG.debug(">>>> {} {}", destination, exchange); @@ -240,7 +238,7 @@ public boolean process(Exchange exchange, final AsyncCallback callback) { // restore previous MEP exchange.setPattern(existingPattern); // result should be stored in variable instead of message body/headers - if (variableReceive != null) { + if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) { ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage()); exchange.getMessage().setBody(originalBody); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java new file mode 100644 index 0000000000000..3c199057eae16 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableErrorTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class EnrichVariableErrorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testThrowException() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + // TODO: should this be World or Bye World? + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTryCatch() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .doTry() + .throwException(new IllegalArgumentException("Forced")) + .doCatch(Exception.class) + .setBody(simple("Catch: ${body}")) + .end(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertTrue(out.hasVariables()); + Assertions.assertEquals("World", out.getMessage().getBody()); + Assertions.assertEquals("Catch: Bye World", out.getVariable("bye")); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(true) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionNotHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(false) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDeadLetterChannel() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDefaultErrorHandler() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler()); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .stop(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testRollback() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .rollback(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnly(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackOnlyLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnlyLast(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java new file mode 100644 index 0000000000000..c1dd22ff71529 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableErrorTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.seda.SedaEndpoint; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PollEnrichVariableErrorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testThrowException() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:result"); + } + }); + context.start(); + + template.send("seda:foo", e -> { + e.getMessage().setBody("Bye World"); + e.setException(new IllegalArgumentException()); + }); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:result"); + } + }); + context.start(); + + SedaEndpoint se = context.getEndpoint("seda:foo", SedaEndpoint.class); + Exchange ex = se.createExchange(); + ex.getMessage().setBody("Bye World"); + ex.setRouteStop(true); + se.getQueue().add(ex); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertTrue(out.isRouteStop()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testRollbackOnly() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:result"); + } + }); + context.start(); + + SedaEndpoint se = context.getEndpoint("seda:foo", SedaEndpoint.class); + Exchange ex = se.createExchange(); + ex.getMessage().setBody("Bye World"); + ex.setRollbackOnly(true); + se.getQueue().add(ex); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertTrue(out.isRollbackOnly()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicVariableErrorTest.java new file mode 100644 index 0000000000000..00bc2a10fe404 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicVariableErrorTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ToDynamicVariableErrorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testThrowException() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + // TODO: should this be World or Bye World? + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTryCatch() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .doTry() + .throwException(new IllegalArgumentException("Forced")) + .doCatch(Exception.class) + .setBody(simple("Catch: ${body}")) + .end(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertFalse(out.isFailed()); + Assertions.assertTrue(out.hasVariables()); + Assertions.assertEquals("World", out.getMessage().getBody()); + Assertions.assertEquals("Catch: Bye World", out.getVariable("bye")); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(true) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionNotHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(false) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDeadLetterChannel() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDefaultErrorHandler() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler()); + + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .stop(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testRollback() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .rollback(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnly(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackOnlyLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toD("direct:${header.where}", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnlyLast(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> { + e.getMessage().setHeader("where", "foo"); + e.getMessage().setBody("World"); + }); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableErrorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableErrorTest.java new file mode 100644 index 0000000000000..f3351a2941dea --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableErrorTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ToVariableErrorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testThrowException() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + // TODO: should this be World or Bye World? + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTryCatch() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .doTry() + .throwException(new IllegalArgumentException("Forced")) + .doCatch(Exception.class) + .setBody(simple("Catch: ${body}")) + .end(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertTrue(out.hasVariables()); + Assertions.assertEquals("World", out.getMessage().getBody()); + Assertions.assertEquals("Catch: Bye World", out.getVariable("bye")); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(true) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testOnExceptionNotHandled() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .handled(false) + .setBody(simple("Error: ${body}")); + + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Error: Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDeadLetterChannel() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testDefaultErrorHandler() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler()); + + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .stop(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testRollback() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .rollback(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertTrue(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnly(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + + @Test + public void testMarkRollbackOnlyLast() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .toV("direct:foo", null, "bye") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .markRollbackOnlyLast(); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(0); + Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World")); + Assertions.assertFalse(out.isFailed()); + Assertions.assertFalse(out.hasVariables()); + Assertions.assertEquals("Bye World", out.getMessage().getBody()); + assertMockEndpointsSatisfied(); + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 2cdf7071450bf..a73db916db942 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -1156,6 +1156,29 @@ public static void setVariableFromMessageBodyAndHeaders(Exchange exchange, Strin } } + /** + * Whether the processing of the {@link Exchange} was success and that the result should be stored in variable. + * + * @param exchange the exchange + * @param name the variable name + * @return true to call setVariableFromMessageBodyAndHeaders to set the result after-wards + */ + public static boolean shouldSetVariableResult(Exchange exchange, String name) { + if (name == null) { + return false; + } + // same logic as in Pipeline/PipelineHelper + boolean stop + = exchange.isRouteStop() || exchange.isFailed() || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast() + || exchange.getExchangeExtension().isErrorHandlerHandledSet() + && exchange.getExchangeExtension().isErrorHandlerHandled(); + if (stop) { + return false; + } + // success + return true; + } + /** * Gets the variable * diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc new file mode 100644 index 0000000000000..d297df9c99e1e --- /dev/null +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc @@ -0,0 +1,16 @@ += Apache Camel 4.x Upgrade Guide + +This document is for helping you upgrade your Apache Camel application +from Camel 4.x to 4.y. For example, if you are upgrading Camel 4.0 to 4.2, then you should follow the guides +from both 4.0 to 4.1 and 4.1 to 4.2. + +== Upgrading Camel 4.5 to 4.6 + +=== variables + +When using `variableReceive` then the variable is only set if processing the `Exchange` was completely successfully. + +For example calling a route that fails due to an exception being thrown (even if `onException` or `errorHandler` are in use) +then the variable is no longer set. Also, if the route is marked for rollback, or to stop continue routing with `.stop()`. + +This is the same logic that the routing engine uses, whether to continue routing the `Exchange` or not. diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc index d82ec3704e4c1..903ec6b126e38 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc @@ -14,4 +14,5 @@ You can find upgrade guide for each release in the following pages: - xref:camel-4x-upgrade-guide-4_3.adoc[Upgrade guide 4.2 -> 4.3] - xref:camel-4x-upgrade-guide-4_4.adoc[Upgrade guide 4.3 -> 4.4] - xref:camel-4x-upgrade-guide-4_5.adoc[Upgrade guide 4.4 -> 4.5] +- xref:camel-4x-upgrade-guide-4_5.adoc[Upgrade guide 4.5 -> 4.6]