Skip to content

Commit

Permalink
feat: update aqmp config (#310)
Browse files Browse the repository at this point in the history
* Updated config file for amqp

* Added publisher service

* Added listeners and publisher code

* Updated AMQP config and added sample config files to test

* Updated amqp config

* Segregated declarables for exchanges and queues

* Updated test snapshots

* fix double dependency

* fix snapshots

---------

Co-authored-by: Semen <tenischev.semen@gmail.com>
  • Loading branch information
VaishnaviNandakumar and Tenischev committed Oct 7, 2023
1 parent f3219d3 commit 6c611bc
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 74 deletions.
78 changes: 26 additions & 52 deletions partials/AmqpConfig.java
Expand Up @@ -6,17 +6,11 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

import javax.annotation.processing.Generated;

Expand All @@ -36,16 +30,22 @@ public class Config {
@Value("${amqp.broker.password}")
private String password;

{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
@Value("${amqp.exchange.{{- channelName -}}}")

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;

{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${amqp.queue.{{- channelName -}}}")
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endif %}

{% if channel.hasPublish() %}
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;
{% endif %}

{% endif %}{% endfor %}
{% endfor %}

@Bean
public ConnectionFactory connectionFactory() {
Expand All @@ -56,65 +56,39 @@ public ConnectionFactory connectionFactory() {
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public Declarables exchanges() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

@Bean
public Declarables queues() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

// consumer

@Autowired
MessageHandlerService messageHandlerService;
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}

@Bean
public IntegrationFlow {{channelName | camelCase}}Flow() {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory(), {{channelName}}Queue))
.handle(messageHandlerService::handle{{channelName | upperFirst}})
.get();
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
{% endif %}{% endfor %}

// publisher

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}

@Bean
public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() {
return new DirectChannel();
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
@ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel")
public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExchangeName({{channelName}}Exchange);
outbound.setRoutingKey("#");
return outbound;
public AmqpTemplate template() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
{% endif %}{% endfor %}
}
{% endmacro %}
43 changes: 43 additions & 0 deletions partials/AmqpPublisher.java
@@ -0,0 +1,43 @@
{% macro amqpPublisher(asyncapi, params) %}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}


@Service
public class PublisherService {
@Autowired
private RabbitTemplate template;

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endif %}
{% endfor %}

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
public void {{channel.subscribe().id() | camelCase}}(){
{{schemaName}} {{channelName}}Payload = new {{schemaName}}();
template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload);
}

{% endif %}
{% endfor %}

}

{% endmacro %}
Expand Up @@ -21,7 +21,8 @@ public void run(String... args) {
{%- for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %});
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}()
{% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %});
{% endfor -%}
{% endif -%}
{%- endfor %}
Expand Down
Expand Up @@ -27,6 +27,16 @@
{%- endif %}
{%- endfor %}
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
import javax.annotation.processing.Generated;

@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}")
Expand Down Expand Up @@ -55,9 +65,21 @@ public class MessageHandlerService {
}
{%- endif %}
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
@RabbitListener(queues = "${amqp.{{- channelName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){
LOGGER.info("Message received from {{- channelName -}} : " + {{channelName}}Payload);
}
{% endif %}
{% endfor %}

{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{% if channel.hasPublish() %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
Expand Down
@@ -1,8 +1,11 @@
package {{ params['userJavaPackage'] }}.service;
{%- from "partials/CommonPublisher.java" import commonPublisher -%}
{%- from "partials/KafkaPublisher.java" import kafkaPublisher -%}
{%- from "partials/AmqpPublisher.java" import amqpPublisher -%}
{%- if asyncapi | isProtocol('kafka') -%}
{{- kafkaPublisher(asyncapi, params) -}}
{%- elif asyncapi | isProtocol('amqp') -%}
{{- amqpPublisher(asyncapi, params) -}}
{%- else -%}
{{- commonPublisher(asyncapi) -}}
{%- endif -%}
22 changes: 8 additions & 14 deletions template/src/main/resources/application.yml
Expand Up @@ -9,27 +9,21 @@
{%- endif -%}
{%- endfor -%}

{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
amqp:
broker: {% for line in server.description() | splitByLines %}
# {{line | safe}}{% endfor %}
host: {{server.url() | replace(':{port}', '') }}
host: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %}
port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %}
username:
username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %}
password:
exchange:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() and channel.subscribe().binding('amqp') %}
{{channelName}}: {{channel.subscribe().binding('amqp').exchange.name}}
{% endif %}
{% endfor %}
queue:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() and channel.publish().binding('amqp') %}
{{channelName}}: {{channel.publish().binding('amqp').queue.name}}
{% endif %}
{{channelName}}:
{% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %}
{% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %}
{% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %}
{% endfor %}
{% endif %}
{% endif %}

{% if server.protocol() == 'mqtt' %}
mqtt:
Expand Down
2 changes: 2 additions & 0 deletions tests/__snapshots__/kafka.test.js.snap
Expand Up @@ -142,6 +142,7 @@ import org.springframework.messaging.handler.annotation.Payload;
import com.asyncapi.model.LightMeasuredPayload;
import javax.annotation.processing.Generated;
@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate")
Expand All @@ -161,6 +162,7 @@ public class MessageHandlerService {
}
}
"
`;
Expand Down
14 changes: 8 additions & 6 deletions tests/__snapshots__/mqtt.test.js.snap
Expand Up @@ -213,6 +213,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import javax.annotation.processing.Generated;
@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate")
Expand All @@ -222,7 +223,7 @@ public class MessageHandlerService {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class);
/**
* The topic on which measured values may be produced and consumed.
*/
Expand All @@ -232,11 +233,11 @@ public class MessageHandlerService {
}
}
Expand Down Expand Up @@ -1087,6 +1088,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import javax.annotation.processing.Generated;
@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate")
Expand All @@ -1096,7 +1098,7 @@ public class MessageHandlerService {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class);
/**
* The topic on which measured values may be produced and consumed.
*/
Expand All @@ -1106,7 +1108,7 @@ public class MessageHandlerService {
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests/__snapshots__/oneOf.test.js.snap
Expand Up @@ -16,6 +16,7 @@ import org.springframework.messaging.handler.annotation.Payload;
import com.asyncapi.model.AnonymousSchema1;
import com.asyncapi.model.AnonymousSchema7;
import javax.annotation.processing.Generated;
@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate")
Expand All @@ -35,6 +36,7 @@ public class MessageHandlerService {
}
}
"
`;
Expand Down
2 changes: 2 additions & 0 deletions tests/__snapshots__/parameters.test.js.snap
Expand Up @@ -149,6 +149,7 @@ import org.springframework.messaging.handler.annotation.Payload;
import com.asyncapi.model.LightMeasuredPayload;
import javax.annotation.processing.Generated;
@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate")
Expand All @@ -168,6 +169,7 @@ public class MessageHandlerService {
}
}
"
`;
Expand Down

0 comments on commit 6c611bc

Please sign in to comment.