Skip to content

Commit

Permalink
CAMEL-20299: camel-spring-rabbitmq - Add option to auto declare produ… (
Browse files Browse the repository at this point in the history
#13427)

CAMEL-20299: camel-spring-rabbitmq - Add option to auto declare producer.
  • Loading branch information
davsclaus committed Mar 10, 2024
1 parent 4b27439 commit 436e185
Show file tree
Hide file tree
Showing 13 changed files with 614 additions and 179 deletions.

Large diffs are not rendered by default.

Expand Up @@ -28,6 +28,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "args": target.setArgs(property(camelContext, java.util.Map.class, value)); return true;
case "autodeclare":
case "autoDeclare": target.setAutoDeclare(property(camelContext, boolean.class, value)); return true;
case "autodeclareproducer":
case "autoDeclareProducer": target.setAutoDeclareProducer(property(camelContext, boolean.class, value)); return true;
case "autostartup":
case "autoStartup": target.setAutoStartup(property(camelContext, boolean.class, value)); return true;
case "autowiredenabled":
Expand Down Expand Up @@ -98,6 +100,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "args": return java.util.Map.class;
case "autodeclare":
case "autoDeclare": return boolean.class;
case "autodeclareproducer":
case "autoDeclareProducer": return boolean.class;
case "autostartup":
case "autoStartup": return boolean.class;
case "autowiredenabled":
Expand Down Expand Up @@ -164,6 +168,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "args": return target.getArgs();
case "autodeclare":
case "autoDeclare": return target.isAutoDeclare();
case "autodeclareproducer":
case "autoDeclareProducer": return target.isAutoDeclareProducer();
case "autostartup":
case "autoStartup": return target.isAutoStartup();
case "autowiredenabled":
Expand Down
Expand Up @@ -30,6 +30,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "asyncConsumer": target.setAsyncConsumer(property(camelContext, boolean.class, value)); return true;
case "autodeclare":
case "autoDeclare": target.setAutoDeclare(property(camelContext, boolean.class, value)); return true;
case "autodeclareproducer":
case "autoDeclareProducer": target.setAutoDeclareProducer(property(camelContext, boolean.class, value)); return true;
case "autostartup":
case "autoStartup": target.setAutoStartup(property(camelContext, boolean.class, value)); return true;
case "bridgeerrorhandler":
Expand Down Expand Up @@ -105,6 +107,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "asyncConsumer": return boolean.class;
case "autodeclare":
case "autoDeclare": return boolean.class;
case "autodeclareproducer":
case "autoDeclareProducer": return boolean.class;
case "autostartup":
case "autoStartup": return boolean.class;
case "bridgeerrorhandler":
Expand Down Expand Up @@ -181,6 +185,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "asyncConsumer": return target.isAsyncConsumer();
case "autodeclare":
case "autoDeclare": return target.isAutoDeclare();
case "autodeclareproducer":
case "autoDeclareProducer": return target.isAutoDeclareProducer();
case "autostartup":
case "autoStartup": return target.isAutoStartup();
case "bridgeerrorhandler":
Expand Down
Expand Up @@ -21,12 +21,13 @@ public class SpringRabbitMQEndpointUriFactory extends org.apache.camel.support.c
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(38);
Set<String> props = new HashSet<>(39);
props.add("acknowledgeMode");
props.add("allowNullBody");
props.add("args");
props.add("asyncConsumer");
props.add("autoDeclare");
props.add("autoDeclareProducer");
props.add("autoStartup");
props.add("bridgeErrorHandler");
props.add("concurrentConsumers");
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -53,10 +53,14 @@ public class SpringRabbitMQComponent extends HeaderFilterStrategyComponent {
@Metadata(label = "consumer", defaultValue = "true",
description = "Specifies whether the consumer container should auto-startup.")
private boolean autoStartup = true;
@Metadata(label = "consumer", defaultValue = "false",
@Metadata(label = "consumer", defaultValue = "true",
description = "Specifies whether the consumer should auto declare binding between exchange, queue and routing key when starting."
+ " Enabling this can be good for development to make it easy to standup exchanges, queues and bindings on the broker.")
private boolean autoDeclare;
private boolean autoDeclare = true;
@Metadata(label = "producer", defaultValue = "false",
description = "Specifies whether the producer should auto declare binding between exchange, queue and routing key when starting."
+ " Enabling this can be good for development to make it easy to standup exchanges, queues and bindings on the broker.")
private boolean autoDeclareProducer;
@Metadata(label = "advanced",
description = "To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message.")
private MessageConverter messageConverter;
Expand Down Expand Up @@ -144,6 +148,7 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
endpoint.setMessagePropertiesConverter(messagePropertiesConverter);
endpoint.setAutoStartup(autoStartup);
endpoint.setAutoDeclare(autoDeclare);
endpoint.setAutoDeclareProducer(autoDeclareProducer);
endpoint.setDeadLetterExchange(deadLetterExchange);
endpoint.setDeadLetterExchangeType(deadLetterExchangeType);
endpoint.setDeadLetterQueue(deadLetterQueue);
Expand Down Expand Up @@ -228,6 +233,14 @@ public void setAutoDeclare(boolean autoDeclare) {
this.autoDeclare = autoDeclare;
}

public boolean isAutoDeclareProducer() {
return autoDeclareProducer;
}

public void setAutoDeclareProducer(boolean autoDeclareProducer) {
this.autoDeclareProducer = autoDeclareProducer;
}

public String getDeadLetterExchange() {
return deadLetterExchange;
}
Expand Down
Expand Up @@ -84,9 +84,9 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
@UriParam(label = "common",
description = "The connection factory to be use. A connection factory must be configured either on the component or endpoint.")
private ConnectionFactory connectionFactory;
@UriParam(label = "consumer",
description = "The queue(s) to use for consuming messages. Multiple queue names can be separated by comma."
+ " If none has been configured then Camel will generate an unique id as the queue name for the consumer.")
@UriParam(label = "common",
description = "The queue(s) to use for consuming or producing messages. Multiple queue names can be separated by comma."
+ " If none has been configured then Camel will generate an unique id as the queue name.")
private String queues;
@UriParam(label = "consumer", defaultValue = "true",
description = "Specifies whether the consumer container should auto-startup.")
Expand Down Expand Up @@ -130,13 +130,13 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
private boolean exclusive;
@UriParam(label = "consumer", description = "Set to true for an no-local consumer")
private boolean noLocal;
@UriParam(label = "consumer", description = "The name of the dead letter exchange")
@UriParam(label = "common", description = "The name of the dead letter exchange")
private String deadLetterExchange;
@UriParam(label = "consumer", description = "The name of the dead letter queue")
@UriParam(label = "common", description = "The name of the dead letter queue")
private String deadLetterQueue;
@UriParam(label = "consumer", description = "The routing key for the dead letter exchange")
@UriParam(label = "common", description = "The routing key for the dead letter exchange")
private String deadLetterRoutingKey;
@UriParam(label = "consumer", defaultValue = "direct", enums = "direct,fanout,headers,topic",
@UriParam(label = "common", defaultValue = "direct", enums = "direct,fanout,headers,topic",
description = "The type of the dead letter exchange")
private String deadLetterExchangeType = "direct";
@UriParam(label = "common",
Expand All @@ -146,6 +146,9 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
+ " handles the reply message. You can also use this option if you want to use Camel as a proxy between different"
+ " message brokers and you want to route message from one system to another.")
private boolean disableReplyTo;
@UriParam(label = "producer",
description = "Specifies whether the producer should auto declare binding between exchange, queue and routing key when starting.")
private boolean autoDeclareProducer;
@UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "30000",
description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply (InOut) messaging."
+ " The default value is 30 seconds. A negative value indicates an indefinite timeout (Beware that this will cause a memory leak if a reply is not received).")
Expand Down Expand Up @@ -244,6 +247,14 @@ public void setAutoDeclare(boolean autoDeclare) {
this.autoDeclare = autoDeclare;
}

public boolean isAutoDeclareProducer() {
return autoDeclareProducer;
}

public void setAutoDeclareProducer(boolean autoDeclareProducer) {
this.autoDeclareProducer = autoDeclareProducer;
}

public boolean isAsyncConsumer() {
return asyncConsumer;
}
Expand Down Expand Up @@ -610,10 +621,13 @@ protected String parseArgsString(Map<String, Object> args, String key, String de
}

public void declareElements(AbstractMessageListenerContainer container) {
AmqpAdmin admin = null;
if (container instanceof MessageListenerContainer) {
admin = ((MessageListenerContainer) container).getAmqpAdmin();
AmqpAdmin admin = ((MessageListenerContainer) container).getAmqpAdmin();
declareElements(container, admin);
}
}

public void declareElements(AbstractMessageListenerContainer container, AmqpAdmin admin) {
if (admin != null && autoDeclare) {
// bind dead letter exchange
if (deadLetterExchange != null) {
Expand Down Expand Up @@ -711,7 +725,7 @@ public void declareElements(AbstractMessageListenerContainer container) {
String qn = admin.declareQueue(rabbitQueue);

// if we auto created a new unique queue then the container needs to know the queue name
if (generateUniqueQueue) {
if (generateUniqueQueue && container != null) {
container.setQueueNames(qn);
}

Expand Down
Expand Up @@ -27,12 +27,14 @@
import org.apache.camel.support.DefaultAsyncProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class SpringRabbitMQProducer extends DefaultAsyncProducer {
Expand Down Expand Up @@ -80,6 +82,16 @@ protected void doStart() throws Exception {
if (getEndpoint().isTestConnectionOnStartup()) {
testConnectionOnStartup();
}
if (getEndpoint().isAutoDeclareProducer()) {
// auto declare but without spring
AmqpAdmin admin = getEndpoint().getComponent().getAmqpAdmin();
if (admin == null) {
RabbitAdmin ra = new RabbitAdmin(getEndpoint().getConnectionFactory());
ra.setIgnoreDeclarationExceptions(getEndpoint().getComponent().isIgnoreDeclarationExceptions());
admin = ra;
}
getEndpoint().declareElements(null, admin);
}
}

@Override
Expand Down
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.springrabbit.integration;

import java.nio.charset.Charset;
import java.util.Map;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class RabbitMQProducerAutoDeclareIT extends RabbitMQITSupport {

@Test
public void testProducer() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

template.sendBody("direct:start", "Hello World");

AmqpTemplate template = new RabbitTemplate(cf);
String out = (String) template.receiveAndConvert("myqueue");
Assertions.assertEquals("Hello World", out);
}

@Test
public void testProducerWithHeader() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

template.sendBodyAndHeader("direct:start", "Hello World", "cheese", "gouda");

AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");

byte[] body = out.getBody();
Assertions.assertNotNull(body, "The body should not be null");
Assertions.assertEquals("Hello World", new String(body));
Assertions.assertEquals("gouda", out.getMessageProperties().getHeader("cheese"));
}

@Test
public void testProducerWithMessage() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message body = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();

template.sendBody("direct:start", body);

AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");
Assertions.assertEquals("foo", new String(out.getBody()));
Assertions.assertEquals("baz", out.getMessageProperties().getHeader("bar"));
}

@Test
public void testProducerWithMessageProperties() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

template.sendBodyAndHeaders("direct:start", "<price>123</price>",
Map.of(SpringRabbitMQConstants.DELIVERY_MODE, MessageDeliveryMode.PERSISTENT,
SpringRabbitMQConstants.TYPE, "price",
SpringRabbitMQConstants.CONTENT_TYPE, "application/xml",
SpringRabbitMQConstants.MESSAGE_ID, "0fe9c142-f9c1-426f-9237-f5a4c988a8ae",
SpringRabbitMQConstants.PRIORITY, 1));

AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");

final MessageProperties messageProperties = out.getMessageProperties();
Assertions.assertNotNull(messageProperties, "The message properties should not be null");
String encoding = messageProperties.getContentEncoding();
Assertions.assertEquals(Charset.defaultCharset().name(), encoding);
Assertions.assertEquals("<price>123</price>", new String(out.getBody(), encoding));
Assertions.assertEquals(MessageDeliveryMode.PERSISTENT, messageProperties.getReceivedDeliveryMode());
Assertions.assertEquals("price", messageProperties.getType());
Assertions.assertEquals("application/xml", messageProperties.getContentType());
Assertions.assertEquals("0fe9c142-f9c1-426f-9237-f5a4c988a8ae", messageProperties.getMessageId());
Assertions.assertEquals(1, messageProperties.getPriority());
Assertions.assertEquals(0, messageProperties.getHeaders().size());
}

@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.to("spring-rabbitmq:foo?autoDeclareProducer=true&routingKey=foo.bar.#&queues=myqueue&exchangeType=topic");
}
};
}
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.springrabbit.integration;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Test;

public class RabbitMQProducerSimpleIT extends RabbitMQITSupport {

@Test
public void testProducer() throws Exception {
template.sendBody("direct:start", "Hello World");
}

@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.to("spring-rabbitmq:simple");
}
};
}
}

0 comments on commit 436e185

Please sign in to comment.