Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-20299: camel-spring-rabbitmq - Add option to auto declare produ… #13427

Merged
merged 2 commits into from Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

Expand Up @@ -28,6 +28,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "args": target.setArgs(property(camelContext, java.util.Map.class, value)); return true;
case "autodeclare":
case "autoDeclare": target.setAutoDeclare(property(camelContext, boolean.class, value)); return true;
case "autodeclareproducer":
case "autoDeclareProducer": target.setAutoDeclareProducer(property(camelContext, boolean.class, value)); return true;
case "autostartup":
case "autoStartup": target.setAutoStartup(property(camelContext, boolean.class, value)); return true;
case "autowiredenabled":
Expand Down Expand Up @@ -98,6 +100,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "args": return java.util.Map.class;
case "autodeclare":
case "autoDeclare": return boolean.class;
case "autodeclareproducer":
case "autoDeclareProducer": return boolean.class;
case "autostartup":
case "autoStartup": return boolean.class;
case "autowiredenabled":
Expand Down Expand Up @@ -164,6 +168,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "args": return target.getArgs();
case "autodeclare":
case "autoDeclare": return target.isAutoDeclare();
case "autodeclareproducer":
case "autoDeclareProducer": return target.isAutoDeclareProducer();
case "autostartup":
case "autoStartup": return target.isAutoStartup();
case "autowiredenabled":
Expand Down
Expand Up @@ -30,6 +30,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "asyncConsumer": target.setAsyncConsumer(property(camelContext, boolean.class, value)); return true;
case "autodeclare":
case "autoDeclare": target.setAutoDeclare(property(camelContext, boolean.class, value)); return true;
case "autodeclareproducer":
case "autoDeclareProducer": target.setAutoDeclareProducer(property(camelContext, boolean.class, value)); return true;
case "autostartup":
case "autoStartup": target.setAutoStartup(property(camelContext, boolean.class, value)); return true;
case "bridgeerrorhandler":
Expand Down Expand Up @@ -105,6 +107,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "asyncConsumer": return boolean.class;
case "autodeclare":
case "autoDeclare": return boolean.class;
case "autodeclareproducer":
case "autoDeclareProducer": return boolean.class;
case "autostartup":
case "autoStartup": return boolean.class;
case "bridgeerrorhandler":
Expand Down Expand Up @@ -181,6 +185,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "asyncConsumer": return target.isAsyncConsumer();
case "autodeclare":
case "autoDeclare": return target.isAutoDeclare();
case "autodeclareproducer":
case "autoDeclareProducer": return target.isAutoDeclareProducer();
case "autostartup":
case "autoStartup": return target.isAutoStartup();
case "bridgeerrorhandler":
Expand Down
Expand Up @@ -21,12 +21,13 @@ public class SpringRabbitMQEndpointUriFactory extends org.apache.camel.support.c
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(38);
Set<String> props = new HashSet<>(39);
props.add("acknowledgeMode");
props.add("allowNullBody");
props.add("args");
props.add("asyncConsumer");
props.add("autoDeclare");
props.add("autoDeclareProducer");
props.add("autoStartup");
props.add("bridgeErrorHandler");
props.add("concurrentConsumers");
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -53,10 +53,14 @@ public class SpringRabbitMQComponent extends HeaderFilterStrategyComponent {
@Metadata(label = "consumer", defaultValue = "true",
description = "Specifies whether the consumer container should auto-startup.")
private boolean autoStartup = true;
@Metadata(label = "consumer", defaultValue = "false",
@Metadata(label = "consumer", defaultValue = "true",
description = "Specifies whether the consumer should auto declare binding between exchange, queue and routing key when starting."
+ " Enabling this can be good for development to make it easy to standup exchanges, queues and bindings on the broker.")
private boolean autoDeclare;
private boolean autoDeclare = true;
@Metadata(label = "producer", defaultValue = "false",
description = "Specifies whether the producer should auto declare binding between exchange, queue and routing key when starting."
+ " Enabling this can be good for development to make it easy to standup exchanges, queues and bindings on the broker.")
private boolean autoDeclareProducer;
@Metadata(label = "advanced",
description = "To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message.")
private MessageConverter messageConverter;
Expand Down Expand Up @@ -144,6 +148,7 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
endpoint.setMessagePropertiesConverter(messagePropertiesConverter);
endpoint.setAutoStartup(autoStartup);
endpoint.setAutoDeclare(autoDeclare);
endpoint.setAutoDeclareProducer(autoDeclareProducer);
endpoint.setDeadLetterExchange(deadLetterExchange);
endpoint.setDeadLetterExchangeType(deadLetterExchangeType);
endpoint.setDeadLetterQueue(deadLetterQueue);
Expand Down Expand Up @@ -228,6 +233,14 @@ public void setAutoDeclare(boolean autoDeclare) {
this.autoDeclare = autoDeclare;
}

public boolean isAutoDeclareProducer() {
return autoDeclareProducer;
}

public void setAutoDeclareProducer(boolean autoDeclareProducer) {
this.autoDeclareProducer = autoDeclareProducer;
}

public String getDeadLetterExchange() {
return deadLetterExchange;
}
Expand Down
Expand Up @@ -84,9 +84,9 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
@UriParam(label = "common",
description = "The connection factory to be use. A connection factory must be configured either on the component or endpoint.")
private ConnectionFactory connectionFactory;
@UriParam(label = "consumer",
description = "The queue(s) to use for consuming messages. Multiple queue names can be separated by comma."
+ " If none has been configured then Camel will generate an unique id as the queue name for the consumer.")
@UriParam(label = "common",
description = "The queue(s) to use for consuming or producing messages. Multiple queue names can be separated by comma."
+ " If none has been configured then Camel will generate an unique id as the queue name.")
private String queues;
@UriParam(label = "consumer", defaultValue = "true",
description = "Specifies whether the consumer container should auto-startup.")
Expand Down Expand Up @@ -130,13 +130,13 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
private boolean exclusive;
@UriParam(label = "consumer", description = "Set to true for an no-local consumer")
private boolean noLocal;
@UriParam(label = "consumer", description = "The name of the dead letter exchange")
@UriParam(label = "common", description = "The name of the dead letter exchange")
private String deadLetterExchange;
@UriParam(label = "consumer", description = "The name of the dead letter queue")
@UriParam(label = "common", description = "The name of the dead letter queue")
private String deadLetterQueue;
@UriParam(label = "consumer", description = "The routing key for the dead letter exchange")
@UriParam(label = "common", description = "The routing key for the dead letter exchange")
private String deadLetterRoutingKey;
@UriParam(label = "consumer", defaultValue = "direct", enums = "direct,fanout,headers,topic",
@UriParam(label = "common", defaultValue = "direct", enums = "direct,fanout,headers,topic",
description = "The type of the dead letter exchange")
private String deadLetterExchangeType = "direct";
@UriParam(label = "common",
Expand All @@ -146,6 +146,9 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp
+ " handles the reply message. You can also use this option if you want to use Camel as a proxy between different"
+ " message brokers and you want to route message from one system to another.")
private boolean disableReplyTo;
@UriParam(label = "producer",
description = "Specifies whether the producer should auto declare binding between exchange, queue and routing key when starting.")
private boolean autoDeclareProducer;
@UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "30000",
description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply (InOut) messaging."
+ " The default value is 30 seconds. A negative value indicates an indefinite timeout (Beware that this will cause a memory leak if a reply is not received).")
Expand Down Expand Up @@ -244,6 +247,14 @@ public void setAutoDeclare(boolean autoDeclare) {
this.autoDeclare = autoDeclare;
}

public boolean isAutoDeclareProducer() {
return autoDeclareProducer;
}

public void setAutoDeclareProducer(boolean autoDeclareProducer) {
this.autoDeclareProducer = autoDeclareProducer;
}

public boolean isAsyncConsumer() {
return asyncConsumer;
}
Expand Down Expand Up @@ -610,10 +621,13 @@ protected String parseArgsString(Map<String, Object> args, String key, String de
}

public void declareElements(AbstractMessageListenerContainer container) {
AmqpAdmin admin = null;
if (container instanceof MessageListenerContainer) {
admin = ((MessageListenerContainer) container).getAmqpAdmin();
AmqpAdmin admin = ((MessageListenerContainer) container).getAmqpAdmin();
declareElements(container, admin);
}
}

public void declareElements(AbstractMessageListenerContainer container, AmqpAdmin admin) {
if (admin != null && autoDeclare) {
// bind dead letter exchange
if (deadLetterExchange != null) {
Expand Down Expand Up @@ -711,7 +725,7 @@ public void declareElements(AbstractMessageListenerContainer container) {
String qn = admin.declareQueue(rabbitQueue);

// if we auto created a new unique queue then the container needs to know the queue name
if (generateUniqueQueue) {
if (generateUniqueQueue && container != null) {
container.setQueueNames(qn);
}

Expand Down
Expand Up @@ -27,12 +27,14 @@
import org.apache.camel.support.DefaultAsyncProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class SpringRabbitMQProducer extends DefaultAsyncProducer {
Expand Down Expand Up @@ -80,6 +82,16 @@ protected void doStart() throws Exception {
if (getEndpoint().isTestConnectionOnStartup()) {
testConnectionOnStartup();
}
if (getEndpoint().isAutoDeclareProducer()) {
// auto declare but without spring
AmqpAdmin admin = getEndpoint().getComponent().getAmqpAdmin();
if (admin == null) {
RabbitAdmin ra = new RabbitAdmin(getEndpoint().getConnectionFactory());
ra.setIgnoreDeclarationExceptions(getEndpoint().getComponent().isIgnoreDeclarationExceptions());
admin = ra;
}
getEndpoint().declareElements(null, admin);
}
}

@Override
Expand Down
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.springrabbit.integration;

import java.nio.charset.Charset;
import java.util.Map;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class RabbitMQProducerAutoDeclareIT extends RabbitMQITSupport {

@Test
public void testProducer() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

template.sendBody("direct:start", "Hello World");

AmqpTemplate template = new RabbitTemplate(cf);
String out = (String) template.receiveAndConvert("myqueue");
Assertions.assertEquals("Hello World", out);
}

@Test
public void testProducerWithHeader() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

template.sendBodyAndHeader("direct:start", "Hello World", "cheese", "gouda");

AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");

byte[] body = out.getBody();
Assertions.assertNotNull(body, "The body should not be null");
Assertions.assertEquals("Hello World", new String(body));
Assertions.assertEquals("gouda", out.getMessageProperties().getHeader("cheese"));
}

@Test
public void testProducerWithMessage() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message body = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();

template.sendBody("direct:start", body);

AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");
Assertions.assertEquals("foo", new String(out.getBody()));
Assertions.assertEquals("baz", out.getMessageProperties().getHeader("bar"));
}

@Test
public void testProducerWithMessageProperties() throws Exception {
ConnectionFactory cf = context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);

template.sendBodyAndHeaders("direct:start", "<price>123</price>",
Map.of(SpringRabbitMQConstants.DELIVERY_MODE, MessageDeliveryMode.PERSISTENT,
SpringRabbitMQConstants.TYPE, "price",
SpringRabbitMQConstants.CONTENT_TYPE, "application/xml",
SpringRabbitMQConstants.MESSAGE_ID, "0fe9c142-f9c1-426f-9237-f5a4c988a8ae",
SpringRabbitMQConstants.PRIORITY, 1));

AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");

final MessageProperties messageProperties = out.getMessageProperties();
Assertions.assertNotNull(messageProperties, "The message properties should not be null");
String encoding = messageProperties.getContentEncoding();
Assertions.assertEquals(Charset.defaultCharset().name(), encoding);
Assertions.assertEquals("<price>123</price>", new String(out.getBody(), encoding));
Assertions.assertEquals(MessageDeliveryMode.PERSISTENT, messageProperties.getReceivedDeliveryMode());
Assertions.assertEquals("price", messageProperties.getType());
Assertions.assertEquals("application/xml", messageProperties.getContentType());
Assertions.assertEquals("0fe9c142-f9c1-426f-9237-f5a4c988a8ae", messageProperties.getMessageId());
Assertions.assertEquals(1, messageProperties.getPriority());
Assertions.assertEquals(0, messageProperties.getHeaders().size());
}

@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.to("spring-rabbitmq:foo?autoDeclareProducer=true&routingKey=foo.bar.#&queues=myqueue&exchangeType=topic");
}
};
}
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.springrabbit.integration;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Test;

public class RabbitMQProducerSimpleIT extends RabbitMQITSupport {

@Test
public void testProducer() throws Exception {
template.sendBody("direct:start", "Hello World");
}

@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.to("spring-rabbitmq:simple");
}
};
}
}