Skip to content

Commit

Permalink
Var fail (#13597)
Browse files Browse the repository at this point in the history
CAMEL-20607: camel-core - Using variableReceive should only set result if exchange was process succesfully
  • Loading branch information
davsclaus committed Mar 23, 2024
1 parent eb36ce0 commit 41adc3a
Show file tree
Hide file tree
Showing 11 changed files with 1,075 additions and 21 deletions.
Expand Up @@ -222,12 +222,12 @@ public void done(boolean doneSync) {

Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
if (variableReceive != null) {
if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) {
// result should be stored in variable instead of message body
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive,
aggregatedExchange.getMessage());
aggregatedExchange.getMessage().setBody(originalBody);
aggregatedExchange.getMessage().setHeaders(originalHeaders);
}
// copy aggregation result onto original exchange (preserving pattern)
copyResultsWithoutCorrelationId(exchange, aggregatedExchange);
Expand Down
Expand Up @@ -350,11 +350,12 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
// must catch any exception from aggregation
Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
if (variableReceive != null) {
if (ExchangeHelper.shouldSetVariableResult(aggregatedExchange, variableReceive)) {
// result should be stored in variable instead of message body
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
ExchangeHelper.setVariableFromMessageBodyAndHeaders(aggregatedExchange, variableReceive,
aggregatedExchange.getMessage());
aggregatedExchange.getMessage().setBody(originalBody);
aggregatedExchange.getMessage().setHeaders(originalHeaders);
}
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
Expand Down
Expand Up @@ -237,10 +237,10 @@ public boolean process(Exchange exchange, final AsyncCallback callback) {
ServiceHelper.stopAndShutdownService(endpoint);
}
// result should be stored in variable instead of message body
if (variableReceive != null) {
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) {
ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive, target.getMessage());
target.getMessage().setBody(originalBody);
target.getMessage().setHeaders(originalHeaders);
}
// signal we are done
c.done(doneSync);
Expand Down
Expand Up @@ -179,11 +179,11 @@ public boolean process(Exchange exchange, final AsyncCallback callback) {
ac = doneSync -> {
try {
// result should be stored in variable instead of message body/headers
if (variableReceive != null) {
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) {
ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive,
target.getMessage());
target.getMessage().setBody(originalBody);
target.getMessage().setHeaders(originalHeaders);
}
// restore previous MEP
target.setPattern(existingPattern);
Expand All @@ -202,8 +202,6 @@ public boolean process(Exchange exchange, final AsyncCallback callback) {
if (variableSend != null) {
Object value = ExchangeHelper.getVariable(exchange, variableSend);
exchange.getMessage().setBody(value);
// TODO: empty headers or

}

LOG.debug(">>>> {} {}", destination, exchange);
Expand Down Expand Up @@ -240,7 +238,7 @@ public boolean process(Exchange exchange, final AsyncCallback callback) {
// restore previous MEP
exchange.setPattern(existingPattern);
// result should be stored in variable instead of message body/headers
if (variableReceive != null) {
if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) {
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
exchange.getMessage());
exchange.getMessage().setBody(originalBody);
Expand Down
@@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class EnrichVariableErrorTest extends ContextTestSupport {

@Override
public boolean isUseRouteBuilder() {
return false;
}

@Test
public void testThrowException() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.throwException(new IllegalArgumentException("Forced"));
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertTrue(out.isFailed());
Assertions.assertFalse(out.hasVariables());
// TODO: should this be World or Bye World?
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testTryCatch() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.doTry()
.throwException(new IllegalArgumentException("Forced"))
.doCatch(Exception.class)
.setBody(simple("Catch: ${body}"))
.end();
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(1);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertFalse(out.isFailed());
Assertions.assertTrue(out.hasVariables());
Assertions.assertEquals("World", out.getMessage().getBody());
Assertions.assertEquals("Catch: Bye World", out.getVariable("bye"));
assertMockEndpointsSatisfied();
}

@Test
public void testOnExceptionHandled() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class)
.handled(true)
.setBody(simple("Error: ${body}"));

from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.throwException(new IllegalArgumentException("Forced"));
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertFalse(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Error: Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testOnExceptionNotHandled() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class)
.handled(false)
.setBody(simple("Error: ${body}"));

from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.throwException(new IllegalArgumentException("Forced"));
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertTrue(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Error: Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testDeadLetterChannel() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("mock:dead"));

from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.throwException(new IllegalArgumentException("Forced"));
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
getMockEndpoint("mock:dead").expectedMessageCount(1);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertFalse(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testDefaultErrorHandler() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
errorHandler(defaultErrorHandler());

from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.throwException(new IllegalArgumentException("Forced"));
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertTrue(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testStop() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.stop();
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertFalse(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testRollback() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.rollback();
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertTrue(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testMarkRollbackLast() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.markRollbackOnly();
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertFalse(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

@Test
public void testMarkRollbackOnlyLast() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:receive")
.enrich().constant("direct:foo").variableReceive("bye")
.to("mock:result");

from("direct:foo")
.transform().simple("Bye ${body}")
.markRollbackOnlyLast();
}
});
context.start();

getMockEndpoint("mock:result").expectedMessageCount(0);
Exchange out = template.request("direct:receive", e -> e.getMessage().setBody("World"));
Assertions.assertFalse(out.isFailed());
Assertions.assertFalse(out.hasVariables());
Assertions.assertEquals("Bye World", out.getMessage().getBody());
assertMockEndpointsSatisfied();
}

}

0 comments on commit 41adc3a

Please sign in to comment.