From b1e3eb298ad13ad045cdd4ed8350c068103019d5 Mon Sep 17 00:00:00 2001 From: Semen Date: Wed, 13 Dec 2023 12:07:05 +0200 Subject: [PATCH] feat: add support for channel parameters (#352) * add pattern support for kafka * add pattern support for mqtt message handling * use mqtt specific wildcard * mqtt publish implementation * adopt interface for other protocols * update tests * general amqp routing key support * fix current amqp methods * provide arbitrary methods for amqp * provide arbitrary methods for mqtt * update tests * provide arbitrary methods for kafka in message handler * provide arbitrary methods for kafka in publisher * update tests * update readme, resolve missing features, add attention bar --- README.md | 14 +- filters/all.js | 41 ++- partials/AmqpConfig.java | 60 ++-- partials/AmqpPublisher.java | 38 +- partials/AmqpPublisherImpl.java | 81 +++++ partials/CommonPublisher.java | 32 +- partials/CommonPublisherImpl.java | 85 +++++ partials/KafkaConfig.java | 26 +- partials/KafkaPublisher.java | 20 +- partials/KafkaPublisherImpl.java | 69 ++++ partials/MqttConfig.java | 5 +- .../com/asyncapi/model/$$parameter$$.java | 194 ++++++++++ .../service/CommandLinePublisher.java | 9 +- .../service/MessageHandlerService.java | 207 ++++++++--- .../asyncapi/service/PublisherService.java | 2 +- .../service/PublisherServiceImpl.java | 11 + template/src/main/resources/application.yml | 16 +- .../java/com/asyncapi/SimpleKafkaTest.java | 4 +- .../com/asyncapi/TestcontainerKafkaTest.java | 4 +- .../com/asyncapi/TestcontainerMqttTest.java | 2 +- tests/__snapshots__/kafka.test.js.snap | 271 ++++++++++++-- tests/__snapshots__/mqtt.test.js.snap | 335 +++++++++++++++--- tests/__snapshots__/oneOf.test.js.snap | 17 +- tests/__snapshots__/parameters.test.js.snap | 17 +- tests/kafka.test.js | 29 +- tests/mocks/amqp.yml | 120 +++++++ tests/mocks/kafka-with-parameters.yml | 66 ++++ tests/mocks/mqtt.yml | 14 +- tests/mqtt.test.js | 6 +- 29 files changed, 1538 insertions(+), 257 deletions(-) create mode 100644 partials/AmqpPublisherImpl.java create mode 100644 partials/CommonPublisherImpl.java create mode 100644 partials/KafkaPublisherImpl.java create mode 100644 template/src/main/java/com/asyncapi/model/$$parameter$$.java create mode 100644 template/src/main/java/com/asyncapi/service/PublisherServiceImpl.java create mode 100644 tests/mocks/amqp.yml create mode 100644 tests/mocks/kafka-with-parameters.yml diff --git a/README.md b/README.md index a6c40050c..b10923335 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,12 @@ Java Spring template for the [AsyncAPI Generator](https://github.com/asyncapi/ge +---- +## Attention, AsyncAPI v3 is not currently supported by this template + + +---- + ## Usage Install AsyncAPI CLI, for details follow the [guide](https://www.asyncapi.com/tools/cli). @@ -140,15 +146,13 @@ Preferably generated tests should pass. See the list of features that are still missing in the component: - [ ] support of Kafka is done based on clear "spring-kafka" library without integration like for mqtt or amqp -- [ ] generated code for protocol `amqp` could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java) +- [x] generated code for protocol `amqp` could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java) - [ ] tests for protocol `amqp` are not provided -- [x] add annotation to the [model generation](template/src/main/java/com/asyncapi/model). Consider "@Valid", "@JsonProperty", "@Size", "@NotNull" e.t.c. -- [ ] [`parameters`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#parametersObject) for topics are not supported +- [x] [`parameters`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#parametersObject) for topics are not supported - [ ] [`server variables`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#serverVariableObject) are not entirely supported - [ ] [`security schemas`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#securitySchemeObject) are not supported -- [ ] [`traits`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#operationTraitObject) are not supported +- [x] [`traits`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#operationTraitObject) are not supported - [ ] Json serializer/deserializer is used always, without taking into account real [`content type`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#default-content-type) -- [x] client side generation mode (in general just flip subscribe and publish channels) - [ ] template generation of docker-compose depending on protocol of server, now the rabbitmq is hardcoded If you want to help us develop them, feel free to contribute. diff --git a/filters/all.js b/filters/all.js index 316824cff..ba06ce182 100644 --- a/filters/all.js +++ b/filters/all.js @@ -190,4 +190,43 @@ function addBackSlashToPattern(val) { } filter.addBackSlashToPattern = addBackSlashToPattern; -filter.currentTime = () => (new Date(Date.now())).toISOString(); \ No newline at end of file +filter.currentTime = () => (new Date(Date.now())).toISOString(); + +function replaceAll(originalStr, replacePattern, replaceString) { + return originalStr.replaceAll(replacePattern, replaceString) +} +filter.replaceAll = replaceAll; + +function toTopicString(channelName, hasParameters, parameters, convertDots, replaceParameterValue, replaceDots = "\\\\.") { + if (hasParameters) { + let topicName = channelName + if (convertDots) { + topicName = replaceAll(topicName, ".", replaceDots) + } + Object.keys(parameters).forEach(value => topicName = topicName.replace("{" + value + "}", replaceParameterValue)) + return topicName + } else { + return channelName + } +} + +function toKafkaTopicString(channelName, hasParameters, parameters) { + return toTopicString(channelName, hasParameters, parameters, true, ".*") +} +filter.toKafkaTopicString = toKafkaTopicString + +function toMqttTopicString(channelName, hasParameters, parameters) { + return toTopicString(channelName, hasParameters, parameters, false, "+") +} + +filter.toMqttTopicString = toMqttTopicString + +function toAmqpNeutral(channelName, hasParameters, parameters) { + return toTopicString(_.camelCase(channelName), hasParameters, parameters, true, "", "") +} +filter.toAmqpNeutral = toAmqpNeutral + +function toAmqpKey(channelName, hasParameters, parameters) { + return toTopicString(channelName, hasParameters, parameters, true, "*") +} +filter.toAmqpKey = toAmqpKey \ No newline at end of file diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 7b18ccfc6..d9e1956cb 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -32,20 +32,44 @@ public class Config { {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasSubscribe() %} - @Value("${amqp.{{- channelName -}}.exchange}") - private String {{channelName}}Exchange; + {% set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %} + {% if channel.binding('amqp') and channel.binding('amqp').exchange %} + @Value("${amqp.{{- varName -}}.exchange}") + private String {{varName}}Exchange; + {% endif %} + + @Value("${amqp.{{- varName -}}.routingKey}") + private String {{varName}}RoutingKey; - @Value("${amqp.{{- channelName -}}.routingKey}") - private String {{channelName}}RoutingKey; + {% if channel.binding('amqp') and channel.binding('amqp').queue %} + @Value("${amqp.{{- varName -}}.queue}") + private String {{varName}}Queue; {% endif %} - {% if channel.hasPublish() %} - @Value("${amqp.{{- channelName -}}.queue}") - private String {{channelName}}Queue; + {% set name = varName | camelCase %} + {% if channel.binding('amqp') and channel.binding('amqp').exchange %} + {% if channel.binding('amqp').exchange.type and channel.binding('amqp').exchange.type !== 'default' %}{% set type = channel.binding('amqp').exchange.type | camelCase %}{% else %}{% set type = 'Topic' %}{% endif %} + {% set type = type + 'Exchange' %} + @Bean + public {{type}} {{name}}Exchange() { + return new {{type}}({{varName}}Exchange, {% if channel.binding('amqp').exchange.durable %}{{channel.binding('amqp').exchange.durable}}{% else %}true{% endif%}, {% if channel.binding('amqp').exchange.exclusive %}{{channel.binding('amqp').exchange.exclusive}}{% else %}false{% endif%}); + } + + {% if channel.binding('amqp') and channel.binding('amqp').queue %} + @Bean + public Binding binding{{name | upperFirst}}({{type}} {{name}}Exchange, Queue {{name}}Queue) { + return BindingBuilder.bind({{name}}Queue).to({{name}}Exchange){% if channel.binding('amqp').exchange.type !== 'fanout' %}.with({{varName}}RoutingKey){% endif %}; + } + {% endif %}{% endif %} + + {% if channel.binding('amqp') and channel.binding('amqp').queue %} + @Bean + public Queue {{name}}Queue() { + return new Queue({{varName}}Queue, {% if channel.binding('amqp').queue.durable %}{{channel.binding('amqp').queue.durable}}{% else %}true{% endif%}, {% if channel.binding('amqp').queue.exclusive %}{{channel.binding('amqp').queue.exclusive}}{% else %}false{% endif%}, {% if channel.binding('amqp').queue.autoDelete %}{{channel.binding('amqp').queue.autoDelete}}{% else %}false{% endif%}); + } {% endif %} + {%- endfor %} - {% endfor %} @Bean public ConnectionFactory connectionFactory() { @@ -56,24 +80,6 @@ public ConnectionFactory connectionFactory() { return connectionFactory; } - @Bean - public Declarables exchanges() { - return new Declarables( - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %} - {% endif %}{% endfor %} - ); - } - - @Bean - public Declarables queues() { - return new Declarables( - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} - new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %} - {% endif %}{% endfor %} - ); - - @Bean public MessageConverter converter() { return new Jackson2JsonMessageConverter(); diff --git a/partials/AmqpPublisher.java b/partials/AmqpPublisher.java index 411ce9735..a4d08e353 100644 --- a/partials/AmqpPublisher.java +++ b/partials/AmqpPublisher.java @@ -1,9 +1,5 @@ {% macro amqpPublisher(asyncapi, params) %} -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; {% for channelName, channel in asyncapi.channels() %} {%- if channel.hasSubscribe() %} {%- for message in channel.subscribe().messages() %} @@ -11,30 +7,26 @@ {%- endfor -%} {% endif -%} {% endfor %} +import javax.annotation.processing.Generated; - -@Service -public class PublisherService { - @Autowired - private RabbitTemplate template; +@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") +public interface PublisherService { {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} - @Value("${amqp.{{- channelName -}}.exchange}") - private String {{channelName}}Exchange; - @Value("${amqp.{{- channelName -}}.routingKey}") - private String {{channelName}}RoutingKey; + {%- if channel.subscribe().hasMultipleMessages() %} + {%- set varName = "object" %} + {%- else %} + {%- set varName = channel.subscribe().message().payload().uid() | camelCase %} + {%- endif %} + {% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %} + * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} + * {{line | safe}}{% endfor %} + */{% endif %} + void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} payload); + {%- if channel.hasParameters() %} + void {{channel.subscribe().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{varName | upperFirst}} payload); {% endif %} - {% endfor %} - - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasSubscribe() %} - {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} - public void {{channel.subscribe().id() | camelCase}}(){ - {{schemaName}} {{channelName}}Payload = new {{schemaName}}(); - template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload); - } - {% endif %} {% endfor %} diff --git a/partials/AmqpPublisherImpl.java b/partials/AmqpPublisherImpl.java new file mode 100644 index 000000000..a1b679740 --- /dev/null +++ b/partials/AmqpPublisherImpl.java @@ -0,0 +1,81 @@ +{% macro amqpPublisherImpl(asyncapi, params) %} + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} +{% endfor %} + +import javax.annotation.processing.Generated; + +@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") +@Service +public class PublisherServiceImpl implements PublisherService { + @Autowired + private RabbitTemplate template; + + {% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %} + @Value("${amqp.{{- varName -}}.exchange}") + private String {{varName}}Exchange; + @Value("${amqp.{{- varName -}}.routingKey}") + private String {{varName}}RoutingKey; + {%- endif %} + {% endfor %} + + {%- set anyChannelHasParameter = false %} + {% for channelName, channel in asyncapi.channels() %} + {%- set anyChannelHasParameter = anyChannelHasParameter or channel.hasParameters() %} + {%- if channel.hasSubscribe() %} + {%- if channel.subscribe().hasMultipleMessages() %} + {%- set varName = "object" %} + {%- else %} + {%- set varName = channel.subscribe().message().payload().uid() | camelCase %} + {%- endif %} + {%- set channelVariable = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %} + {% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %} + * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} + * {{line | safe}}{% endfor %} + */{% endif %} + public void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} payload){ + template.convertAndSend({{channelVariable}}Exchange, {{channelVariable}}RoutingKey, payload); + } + + {%- if channel.hasParameters() %} + public void {{channel.subscribe().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{varName | upperFirst}} payload) { + String compiledRoutingKey = compileRoutingKey({{channelVariable}}RoutingKey, {% for parameterName, parameter in channel.parameters() %}{{parameterName}}{% if not loop.last %}, {% endif %}{%- endfor %}); + template.convertAndSend({{channelVariable}}Exchange, compiledRoutingKey, payload); + } + {% endif %} + + {% endif %} + {% endfor %} + + {%- if anyChannelHasParameter %} + private String compileRoutingKey(String routingKeyTemplate, String... parameters) { + StringBuilder result = new StringBuilder(); + int routeKeyPossition = 0; + int parametersIndex = 0; + while (routeKeyPossition < routingKeyTemplate.length()) { + while (routingKeyTemplate.charAt(routeKeyPossition) != '*') { + routeKeyPossition++; + result.append(routingKeyTemplate.charAt(routeKeyPossition)); + } + routeKeyPossition++; + String parameter = parameters[parametersIndex++]; + result.append(parameter != null ? parameter : "*"); + } + return result.toString(); + } + {%- endif %} + +} + +{% endmacro %} \ No newline at end of file diff --git a/partials/CommonPublisher.java b/partials/CommonPublisher.java index 9fed54fde..1baa0b2df 100644 --- a/partials/CommonPublisher.java +++ b/partials/CommonPublisher.java @@ -1,22 +1,36 @@ -{% macro commonPublisher(asyncapi) %} - -import org.springframework.integration.annotation.Gateway; -import org.springframework.integration.annotation.MessagingGateway; +{% macro commonPublisher(asyncapi, params) %} +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} + {%- if channel.hasParameters() %} + {%- for parameterName, parameter in channel.parameters() %} + {%- if parameter.schema().type() === 'object' %} +import {{params['userJavaPackage']}}.model.{{parameterName | camelCase | upperFirst}}; + {%- endif %} + {%- endfor -%} + {% endif -%} +{% endfor %} import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") -@MessagingGateway public interface PublisherService { {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasSubscribe() %} + {%- if channel.hasSubscribe() %} + {%- if channel.subscribe().hasMultipleMessages() %} + {%- set varName = "object" %} + {%- else %} + {%- set varName = channel.subscribe().message().payload().uid() | camelCase %} + {%- endif %} {% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %} * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} * {{line | safe}}{% endfor %} - */{% endif %} - @Gateway(requestChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") - void {{channel.subscribe().id() | camelCase}}(String data); + */{% endif %}{% set hasParameters = channel.hasParameters() %} + void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}); {% endif %} {% endfor %} } diff --git a/partials/CommonPublisherImpl.java b/partials/CommonPublisherImpl.java new file mode 100644 index 000000000..92ec419a5 --- /dev/null +++ b/partials/CommonPublisherImpl.java @@ -0,0 +1,85 @@ +{% macro commonPublisherImpl(asyncapi, params) %} + +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} + {%- if channel.hasParameters() %} + {%- for parameterName, parameter in channel.parameters() %} + {%- if parameter.schema().type() === 'object' %} +import {{params['userJavaPackage']}}.model.{{parameterName | camelCase | upperFirst}}; + {%- endif %} + {%- endfor -%} + {% endif -%} +{% endfor %} + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +import javax.annotation.processing.Generated; +import java.util.HashMap; +import java.util.Map; + +@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") +@Service +public class PublisherServiceImpl implements PublisherService { + + {%- for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + @Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}") + private String {{channel.subscribe().id() | camelCase-}}Topic; + + @Autowired + private MessageHandler {{channel.subscribe().id() | camelCase}}Outbound; + + {%- endif %} + {%- endfor %} + {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} + {%- if channel.subscribe().hasMultipleMessages() %} + {%- set varName = "object" %} + {%- else %} + {%- set varName = channel.subscribe().message().payload().uid() | camelCase %} + {%- endif %} + {% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %} + * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} + * {{line | safe}}{% endfor %} + */{% endif %}{% set hasParameters = channel.hasParameters() %} + public void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) { + Map headers = new HashMap<>(); + headers.put(MqttHeaders.TOPIC, get{{channel.subscribe().id() | camelCase-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %})); + Message<{{varName | upperFirst}}> message = new GenericMessage<>({{varName}}, headers); + {{channel.subscribe().id() | camelCase}}Outbound.handleMessage(message); + } + + private String get{{channel.subscribe().id() | camelCase-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) { + Map parameters = {% if hasParameters %}new HashMap<>(){% else %}null{% endif %}; + {%- if hasParameters %} + {%- for parameterName, parameter in channel.parameters() %} + parameters.put("{{parameterName}}", {{parameterName | camelCase}}{% if parameter.schema().type() === 'object'%}.toString(){% endif %}); + {%- endfor %} + {%- endif %} + return replaceParameters({{channel.subscribe().id() | camelCase-}}Topic, parameters); + } + {% endif %} + {% endfor %} + + private String replaceParameters(String topic, Map parameters) { + if (parameters != null) { + String compiledTopic = topic; + for (String key : parameters.keySet()) { + compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key)); + } + return compiledTopic; + } + return topic; + } +} +{% endmacro %} \ No newline at end of file diff --git a/partials/KafkaConfig.java b/partials/KafkaConfig.java index d5e46676d..1000c9d23 100644 --- a/partials/KafkaConfig.java +++ b/partials/KafkaConfig.java @@ -1,6 +1,7 @@ {% macro kafkaConfig(asyncapi, params) %} {%- set hasSubscribe = false -%} {%- set hasPublish = false -%} +{%- set hasParameters = false -%} {%- for channelName, channel in asyncapi.channels() -%} {%- if channel.hasPublish() -%} {%- set hasPublish = true -%} @@ -8,6 +9,9 @@ {%- if channel.hasSubscribe() -%} {%- set hasSubscribe = true -%} {%- endif -%} + {%- if channel.hasParameters() -%} + {%- set hasParameters = true -%} + {%- endif -%} {%- endfor %} {%- set securityProtocol = "PLAINTEXT" -%} @@ -76,8 +80,10 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import javax.annotation.processing.Generated; +{% if hasParameters %}import java.util.LinkedHashMap;{% endif %} import java.util.HashMap; import java.util.Map; +{% if hasParameters %}import java.util.regex.Pattern;{% endif %} @Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") @Configuration @@ -93,6 +99,24 @@ public class Config { @Value("${spring.kafka.listener.concurrency}") private int concurrency;{% endif %} {%- if hasSubscribe %} +{% if hasParameters %} + @Bean + public RoutingKafkaTemplate kafkaTemplate() { + ProducerFactory producerFactory = producerFactory(); + + Map> map = new LinkedHashMap<>(); + {%- for channelName, channel in asyncapi.channels() %} + {%- set route = channelName | toKafkaTopicString(channel.hasParameters(), channel.parameters()) | safe %} + map.put(Pattern.compile("{{route}}"), producerFactory); + {%- endfor %} + return new RoutingKafkaTemplate(map); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } +{% else %} @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); @@ -102,7 +126,7 @@ public KafkaTemplate kafkaTemplate() { public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } - +{% endif %} @Bean public Map producerConfigs() { Map props = new HashMap<>(); diff --git a/partials/KafkaPublisher.java b/partials/KafkaPublisher.java index 96d66458d..77b008eb0 100644 --- a/partials/KafkaPublisher.java +++ b/partials/KafkaPublisher.java @@ -1,11 +1,5 @@ {% macro kafkaPublisher(asyncapi, params) %} -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Service; {% for channelName, channel in asyncapi.channels() %} {%- if channel.hasSubscribe() %} {%- for message in channel.subscribe().messages() %} @@ -16,13 +10,11 @@ import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") -@Service -public class PublisherService { +public interface PublisherService { - @Autowired - private KafkaTemplate kafkaTemplate; {% for channelName, channel in asyncapi.channels() %} {%- if channel.hasSubscribe() %} + {%- set hasParameters = channel.hasParameters() %} {%- if channel.subscribe().hasMultipleMessages() %} {%- set varName = "object" %} {%- else %} @@ -32,13 +24,7 @@ public class PublisherService { * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - public void {{channel.subscribe().id() | camelCase}}(Integer key, {{varName | upperFirst}} {{varName}}) { - Message<{{varName | upperFirst}}> message = MessageBuilder.withPayload({{varName}}) - .setHeader(KafkaHeaders.TOPIC, "{{channelName}}") - .setHeader(KafkaHeaders.{%- if params.springBoot2 %}MESSAGE_KEY{% else %}KEY{% endif -%}, key) - .build(); - kafkaTemplate.send(message); - } + public void {{channel.subscribe().id() | camelCase}}(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}); {%- endif %} {%- endfor %} } diff --git a/partials/KafkaPublisherImpl.java b/partials/KafkaPublisherImpl.java new file mode 100644 index 000000000..fa5539d33 --- /dev/null +++ b/partials/KafkaPublisherImpl.java @@ -0,0 +1,69 @@ +{% macro kafkaPublisherImpl(asyncapi, params) %} + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} +{% endfor %} +import javax.annotation.processing.Generated; +import java.util.HashMap; +import java.util.Map; + +@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") +@Service +public class PublisherServiceImpl implements PublisherService { + + @Autowired + private KafkaTemplate kafkaTemplate; +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- set hasParameters = channel.hasParameters() %} + {%- set methodName = channel.subscribe().id() | camelCase %} + {%- if channel.subscribe().hasMultipleMessages() %} + {%- set varName = "object" %} + {%- else %} + {%- set varName = channel.subscribe().message().payload().uid() | camelCase %} + {%- endif %} + {% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %} + * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} + * {{line | safe}}{% endfor %} + */{% endif %} + public void {{methodName}}(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) { + Message<{{varName | upperFirst}}> message = MessageBuilder.withPayload({{varName}}) + .setHeader(KafkaHeaders.TOPIC, get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %})) + .setHeader(KafkaHeaders.{%- if params.springBoot2 %}MESSAGE_KEY{% else %}KEY{% endif -%}, key) + .build(); + kafkaTemplate.send(message); + } + + private String get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) { + Map parameters = {% if hasParameters %}new HashMap<>(){% else %}null{% endif %}; + {%- if hasParameters %} + {%- for parameterName, parameter in channel.parameters() %} + parameters.put("{{parameterName}}", {{parameterName | camelCase}}{% if parameter.schema().type() !== 'string'%}.toString(){% endif %}); + {%- endfor %} + {%- endif %} + return replaceParameters("{{channelName}}", parameters); + } + {%- endif %} +{%- endfor %} + private String replaceParameters(String topic, Map parameters) { + if (parameters != null) { + String compiledTopic = topic; + for (String key : parameters.keySet()) { + compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key)); + } + return compiledTopic; + } + return topic; + } +} +{% endmacro %} \ No newline at end of file diff --git a/partials/MqttConfig.java b/partials/MqttConfig.java index 61dcc16d9..7c0f64629 100644 --- a/partials/MqttConfig.java +++ b/partials/MqttConfig.java @@ -9,7 +9,6 @@ import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @@ -102,7 +101,7 @@ public MqttPahoClientFactory mqttClientFactory() { {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} @Bean public IntegrationFlow {{channel.publish().id() | camelCase}}Flow() { - return IntegrationFlows.from({{channel.publish().id() | camelCase}}Inbound()) + return IntegrationFlow.from({{channel.publish().id() | camelCase}}Inbound()) .handle(messageHandlerService::handle{{channel.publish().id() | camelCase | upperFirst}}) .get(); } @@ -125,7 +124,7 @@ public MqttPahoClientFactory mqttClientFactory() { } @Bean - @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") + @ServiceActivator(outputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") public MessageHandler {{channel.subscribe().id() | camelCase}}Outbound() { MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); diff --git a/template/src/main/java/com/asyncapi/model/$$parameter$$.java b/template/src/main/java/com/asyncapi/model/$$parameter$$.java new file mode 100644 index 000000000..0e5b14618 --- /dev/null +++ b/template/src/main/java/com/asyncapi/model/$$parameter$$.java @@ -0,0 +1,194 @@ +package {{ params['userJavaPackage'] }}.model; + +{% if params.springBoot2 -%} +import javax.validation.constraints.*; +import javax.validation.Valid; +{% else %} +import jakarta.validation.constraints.*; +import jakarta.validation.Valid; +{%- endif %} + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; + +import javax.annotation.processing.Generated; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +{% if parameter.hasDescription() %}/**{% for line in parameter.description() | splitByLines %} + * {{ line | safe}}{% endfor %} + */{% endif %} +@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") +public class {{parameterName | camelCase | upperFirst}} { +{% set schema = parameter.schema() %} +{% for propName, prop in schema.properties() %} + {%- set isRequired = propName | isRequired(schema.required()) %} + {%- if prop.additionalProperties() %} + {%- if prop.additionalProperties() === true %} + private @Valid Map {{propName | camelCase}}; + {%- elif prop.additionalProperties().type() === 'object' %} + private @Valid Map {{propName | camelCase}}; + {%- elif prop.additionalProperties().format() %} + private @Valid Map {{propName | camelCase}}; + {%- elif prop.additionalProperties().type() %} + private @Valid Map {{propName | camelCase}}; + {%- endif %} + {%- elif prop.type() === 'object' %} + private @Valid {{prop.uid() | camelCase | upperFirst}} {{propName | camelCase}}; + {%- elif prop.type() === 'array' %} + {%- if prop.items().type() === 'object' %} + private @Valid List<{{prop.items().uid() | camelCase | upperFirst}}> {{propName | camelCase}}; + {%- elif prop.items().format() %} + private @Valid List<{{prop.items().format() | toJavaType | toClass}}> {{propName | camelCase}}; + {%- else %} + private @Valid List<{{prop.items().type() | toJavaType | toClass}}> {{propName | camelCase}}; + {%- endif %} + {%- elif prop.enum() and (prop.type() === 'string' or prop.type() === 'integer') %} + public enum {{propName | camelCase | upperFirst}}Enum { + {% for e in prop.enum() %} + {%- if prop.type() === 'string'%} + {{e | upper | createEnum}}(String.valueOf("{{e}}")){% if not loop.last %},{% else %};{% endif %} + {%- else %} + NUMBER_{{e}}({{e}}){% if not loop.last %},{% else %};{% endif %} + {%- endif %} + {% endfor %} + private {% if prop.type() === 'string'%}String{% else %}Integer{% endif %} value; + + {{propName | camelCase | upperFirst}}Enum ({% if prop.type() === 'string'%}String{% else %}Integer{% endif %} v) { + value = v; + } + + public {% if prop.type() === 'string'%}String{% else %}Integer{% endif %} value() { + return value; + } + + @Override + @JsonValue + public String toString() { + return String.valueOf(value); + } + + @JsonCreator + public static {{propName | camelCase | upperFirst}}Enum fromValue({% if prop.type() === 'string'%}String{% else %}Integer{% endif %} value) { + for ( {{propName | camelCase | upperFirst}}Enum b : {{propName | camelCase | upperFirst}}Enum.values()) { + if (Objects.equals(b.value, value)) { + return b; + } + } + throw new IllegalArgumentException("Unexpected value '" + value + "'"); + } + } + + private @Valid {{propName | camelCase | upperFirst}}Enum {{propName | camelCase}}; + {%- elif prop.anyOf() or prop.oneOf() %} + {%- set propType = 'OneOf' %}{%- set hasPrimitive = false %} + {%- for obj in prop.anyOf() %} + {%- set hasPrimitive = hasPrimitive or obj.type() !== 'object' %} + {%- set propType = propType + obj.uid() | camelCase | upperFirst %} + {%- endfor %} + {%- for obj in prop.oneOf() %} + {%- set hasPrimitive = hasPrimitive or obj.type() !== 'object' %} + {%- set propType = propType + obj.uid() | camelCase | upperFirst %} + {%- endfor %} + {%- if hasPrimitive %} + {%- set propType = 'Object' %} + {%- else %} + public interface {{propType}} { + + } + {%- endif %} + private @Valid {{propType}} {{propName | camelCase}}; + {%- elif prop.allOf() %} + {%- set allName = 'AllOf' %} + {%- for obj in prop.allOf() %} + {%- set allName = allName + obj.uid() | camelCase | upperFirst %} + {%- endfor %} + public class {{allName}} { + {%- for obj in prop.allOf() %} + {%- set varName = obj.uid() | camelCase %} + {%- set className = obj.uid() | camelCase | upperFirst %} + {%- set propType = obj | defineType(obj.uid()) | safe %} + + private @Valid {{propType}} {{varName}}; + + public {{propType}} get{{className}}() { + return {{varName}}; + } + + public void set{{className}}({{propType}} {{varName}}) { + this.{{varName}} = {{varName}}; + } + {%- endfor %} + } + + private @Valid {{allName}} {{propName | camelCase}}; + {%- else %} + {%- if prop.format() %} + private @Valid {{prop.format() | toJavaType(isRequired)}} {{propName | camelCase}}; + {%- else %} + private @Valid {{prop.type() | toJavaType(isRequired)}} {{propName | camelCase}}; + {%- endif %} + {%- endif %} +{% endfor %} + +{% for propName, prop in schema.properties() %} + {%- set varName = propName | camelCase %} + {%- set className = propName | camelCase | upperFirst %} + {%- set propType = prop | defineType(propName) | safe %} + + {% if prop.description() or prop.examples()%}/**{% for line in prop.description() | splitByLines %} + * {{ line | safe}}{% endfor %}{% if prop.examples() %} + * Examples: {{prop.examples() | examplesToString | safe}}{% endif %} + */{% endif %} + @JsonProperty("{{propName}}") + {%- if propName | isRequired(schema.required()) %}@NotNull{% endif %} + {%- if prop.minLength() or prop.maxLength() or prop.maxItems() or prop.minItems() %}@Size({% if prop.minLength() or prop.minItems() %}min = {{prop.minLength()}}{{prop.minItems()}}{% endif %}{% if prop.maxLength() or prop.maxItems() %}{% if prop.minLength() or prop.minItems() %},{% endif %}max = {{prop.maxLength()}}{{prop.maxItems()}}{% endif %}){% endif %} + {%- if prop.pattern() %}@Pattern(regexp="{{prop.pattern() | addBackSlashToPattern}}"){% endif %} + {%- if prop.minimum() %}@Min({{prop.minimum()}}){% endif %}{% if prop.exclusiveMinimum() %}@Min({{prop.exclusiveMinimum() + 1}}){% endif %} + {%- if prop.maximum() %}@Max({{prop.maximum()}}){% endif %}{% if prop.exclusiveMaximum() %}@Max({{prop.exclusiveMaximum() + 1}}){% endif %} + public {{propType}} get{{className}}() { + return {{varName}}; + } + + public void set{{className}}({{propType}} {{varName}}) { + this.{{varName}} = {{varName}}; + } +{% endfor %} + {% if params.disableEqualsHashCode === 'false' %}@Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + {{parameterName | camelCase | upperFirst}} {{parameterName | camelCase}} = ({{parameterName | camelCase | upperFirst}}) o; + return {% for propName, prop in schema.properties() %}{% set varName = propName | camelCase %} + Objects.equals(this.{{varName}}, {{parameterName | camelCase}}.{{varName}}){% if not loop.last %} &&{% else %};{% endif %}{% endfor %} + } + + @Override + public int hashCode() { + return Objects.hash({% for propName, prop in schema.properties() %}{{propName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}); + }{% endif %} + + @Override + public String toString() { + return "class {{parameterName | camelCase | upperFirst}} {\n" + + {% for propName, prop in schema.properties() %}{% set varName = propName | camelCase %} + " {{varName}}: " + toIndentedString({{varName}}) + "\n" +{% endfor %} + "}"; + } + + /** + * Convert the given object to string with each line indented by 4 spaces (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} \ No newline at end of file diff --git a/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java b/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java index 867b4a4a1..80379dfc1 100644 --- a/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java +++ b/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java @@ -19,10 +19,13 @@ public void run(String... args) { System.out.println("******* Sending message: *******"); {%- for channelName, channel in asyncapi.channels() %} - {%- if channel.hasSubscribe() %} + {%- if channel.hasSubscribe() %}{% set hasParameters = channel.hasParameters() %} {%- for message in channel.subscribe().messages() %} - publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}() - {% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %}); + {%- set payloadType = params['userJavaPackage'] + '.model.' + message.payload().uid() | camelCase | upperFirst %} + publisherService.{{channel.subscribe().id() | camelCase}}( + {%- if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{payloadType}}(){% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, new {% if parameter.schema().type() === 'object'%}{{payloadType}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %}(){% endfor %}{% endif %} + {%- elif asyncapi | isProtocol('amqp') %}new {{payloadType}}() + {%- else %}new {{payloadType}}(){% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, new {% if parameter.schema().type() === 'object'%}{{payloadType}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %}(){% endfor %}{% endif %}{% endif %}); {% endfor -%} {% endif -%} {%- endfor %} diff --git a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java index 21dd43052..f22707f61 100644 --- a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java +++ b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java @@ -12,83 +12,188 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; -{% if asyncapi | isProtocol('kafka') and hasPublish %} +{%- if asyncapi | isProtocol('kafka') and hasPublish %} import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; - {% for channelName, channel in asyncapi.channels() %} - {%- if channel.hasPublish() %} - {%- for message in channel.publish().messages() %} -import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}; - {%- endfor %} - {%- endif %} - {%- endfor %} -{% endif %} -{% if asyncapi | isProtocol('amqp') and hasPublish %} +import java.util.regex.Matcher; +import java.util.regex.Pattern; +{%- endif %} +{%- if asyncapi | isProtocol('mqtt') and hasPublish %} +import org.springframework.integration.mqtt.support.MqttHeaders; +{%- endif %} +{%- if asyncapi | isProtocol('amqp') and hasPublish %} import org.springframework.amqp.rabbit.annotation.RabbitListener; - {% for channelName, channel in asyncapi.channels() %} - {%- if channel.hasPublish() %} - {%- for message in channel.publish().messages() %} +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.messaging.handler.annotation.Header; +{%- endif %} +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasPublish() %} + {%- for message in channel.publish().messages() %} import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}; {%- endfor %} - {%- endif %} - {%- endfor %} - {% endif %} + {%- endif %} +{% endfor %} import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; @Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") @Service public class MessageHandlerService { +{%- set anyChannelHasParameter = false %} private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); -{% if asyncapi | isProtocol('kafka') %} - {% for channelName, channel in asyncapi.channels() %} - {%- if channel.hasPublish() %} - {%- if channel.publish().hasMultipleMessages() %} - {%- set typeName = "Object" %} - {%- else %} - {%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %} - {%- endif %} - {% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %} - * {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %} - * {{line | safe}}{% endfor %} - */{% endif %} - @KafkaListener(topics = "{{channelName}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %}) - public void {{channel.publish().id() | camelCase}}(@Payload {{typeName}} payload, + +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasPublish() %} + {%- set hasParameters = channel.hasParameters() %} + {%- set anyChannelHasParameter = anyChannelHasParameter or hasParameters %} + {%- set methodName = channel.publish().id() | camelCase%} + {%- if channel.publish().hasMultipleMessages() %} + {%- set typeName = "Object" %} + {%- else %} + {%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %} + {%- endif %} + {% set javaDoc = '' %} + {% if channel.description() or channel.publish().description() %} + {%- set javaDoc = javaDoc + '/**\n' %} + {%- for line in channel.description() | splitByLines %} + {%- set javaDoc = javaDoc + ' * ' + (line | safe) %} + {%- set javaDoc = javaDoc + '\n' %} + {%- endfor %} + {%- for line in channel.publish().description() | splitByLines %} + {%- set javaDoc = javaDoc + ' * ' + (line | safe) %} + {%- set javaDoc = javaDoc + '\n' %} + {%- endfor %} + {%- set javaDoc = javaDoc + ' */' %} + {% endif %} + {%- if asyncapi | isProtocol('kafka') %} + {%- set route = channelName | toKafkaTopicString(channel.hasParameters(), channel.parameters()) | safe %} + {{javaDoc}} + @KafkaListener({% if hasParameters %}topicPattern{% else %}topics{% endif %} = "{{route}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %}) + public void {{methodName}}(@Payload {{typeName}} payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_MESSAGE_KEY{% else %}RECEIVED_KEY{% endif -%}) Integer key, @Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_PARTITION_ID{% else %}RECEIVED_PARTITION{% endif -%}) int partition, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) { LOGGER.info("Key: " + key + ", Payload: " + payload.toString() + ", Timestamp: " + timestamp + ", Partition: " + partition); + {%- if hasParameters %} + List parameters = decompileTopic("{{route}}", topic); + {{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}payload, topic, key, partition, timestamp); + {%- endif %} + } + {%- if hasParameters %} + {{javaDoc}} + public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload, + String topic, Integer key, int partition, long timestamp) { + // parametrized listener + } + {%- endif %} + {% elif asyncapi | isProtocol('amqp') %} + {%- set propertyValueName = channelName | toAmqpNeutral(hasParameters, channel.parameters()) %} + {%- if hasParameters %} + @Value("${amqp.{{- propertyValueName -}}.routingKey}") + private String {{propertyValueName}}RoutingKey; + {% endif %} + {{javaDoc}} + @RabbitListener(queues = "${amqp.{{- propertyValueName -}}.queue}") + public void {{methodName}}({{typeName}} payload, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routKey) { + {%- if hasParameters %} + List parameters = decompileRoutingKey({{propertyValueName}}RoutingKey, routKey); + {{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}payload); + {% endif %} + LOGGER.info("Message received from {{- propertyValueName -}} : " + payload); + } + {% if hasParameters %} + {{javaDoc}} + public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload) { + // parametrized listener } {%- endif %} - {% endfor %} + {%- else %} + {%- if hasParameters %} + @Value("${mqtt.topic.{{-methodName-}}}") + private String {{methodName}}Topic; - {% elif asyncapi | isProtocol('amqp') %} - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() %} - {%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %} - @RabbitListener(queues = "${amqp.{{- channelName -}}.queue}") - public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){ - LOGGER.info("Message received from {{- channelName -}} : " + {{channelName}}Payload); + {%- endif %} + {{javaDoc}} + public void handle{{methodName | upperFirst}}(Message message) { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + {%- if hasParameters %} + List parameters = decodeTopic({{methodName}}Topic, topic); + {%- endif %} + {{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}({{typeName}}) message.getPayload()); + } + {{javaDoc}} + public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload) { + LOGGER.info("handler {{channelName}}"); + LOGGER.info(String.valueOf(payload.toString())); } {% endif %} - {% endfor %} +{% endif %} +{% endfor %} -{% else %} - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() %} - {% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %} - * {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %} - * {{line | safe}}{% endfor %} - */{% endif %} - public void handle{{channel.publish().id() | camelCase | upperFirst}}(Message message) { - LOGGER.info("handler {{channelName}}"); - LOGGER.info(String.valueOf(message.getPayload().toString())); +{%- if anyChannelHasParameter %} + {%- if asyncapi | isProtocol('kafka') %} + private List decompileTopic(String topicPattern, String topic) { + topicPattern = topicPattern.replaceAll("\\.\\*", "(.*)"); + List parameters = new ArrayList<>(); + Pattern pattern = Pattern.compile(topicPattern); + Matcher matcher = pattern.matcher(topic); + if (matcher.find()) { + for (int i = 0; i < matcher.groupCount(); i++) { + parameters.add(matcher.group(i + 1)); + } + } + return parameters; + } + {%- elif asyncapi | isProtocol('amqp') %} + private List decompileRoutingKey(String pattern, String routKey) { + List parameters = new ArrayList<>(); + int routeKeyPossition = 0; + int patternPosition = 0; + while (routeKeyPossition < routKey.length()) { + while (pattern.charAt(patternPosition) == routKey.charAt(routeKeyPossition)) { + routeKeyPossition++; + patternPosition++; + } + routeKeyPossition++; + patternPosition += 2; // skip .* + StringBuilder parameter = new StringBuilder(); + while (pattern.charAt(patternPosition) != routKey.charAt(routeKeyPossition)) { + parameter.append(routKey.charAt(routeKeyPossition)); + routeKeyPossition++; + } + parameters.add(parameter.toString()); + } + return parameters; + } + {%- else %} + private List decodeTopic(String topicPattern, String topic) { + List parameters = new ArrayList<>(); + int topicPossition = 0; + int patternPosition = 0; + while (topicPossition < topic.length()) { + while (topicPattern.charAt(patternPosition) == topic.charAt(topicPossition)) { + topicPossition++; + patternPosition++; + } + topicPossition++; + patternPosition += 2; // skip + + StringBuilder parameter = new StringBuilder(); + while (topicPattern.charAt(patternPosition) != topic.charAt(topicPossition)) { + parameter.append(topic.charAt(topicPossition)); + topicPossition++; + } + parameters.add(parameter.toString()); + } + return parameters; } - {% endif %} - {% endfor %} + {%- endif %} {% endif %} } diff --git a/template/src/main/java/com/asyncapi/service/PublisherService.java b/template/src/main/java/com/asyncapi/service/PublisherService.java index 044148da5..0555fa59e 100644 --- a/template/src/main/java/com/asyncapi/service/PublisherService.java +++ b/template/src/main/java/com/asyncapi/service/PublisherService.java @@ -7,5 +7,5 @@ {%- elif asyncapi | isProtocol('amqp') -%} {{- amqpPublisher(asyncapi, params) -}} {%- else -%} -{{- commonPublisher(asyncapi) -}} +{{- commonPublisher(asyncapi, params) -}} {%- endif -%} \ No newline at end of file diff --git a/template/src/main/java/com/asyncapi/service/PublisherServiceImpl.java b/template/src/main/java/com/asyncapi/service/PublisherServiceImpl.java new file mode 100644 index 000000000..065aef77b --- /dev/null +++ b/template/src/main/java/com/asyncapi/service/PublisherServiceImpl.java @@ -0,0 +1,11 @@ +package {{ params['userJavaPackage'] }}.service; +{%- from "partials/CommonPublisherImpl.java" import commonPublisherImpl -%} +{%- from "partials/KafkaPublisherImpl.java" import kafkaPublisherImpl -%} +{%- from "partials/AmqpPublisherImpl.java" import amqpPublisherImpl -%} +{%- if asyncapi | isProtocol('kafka') -%} +{{- kafkaPublisherImpl(asyncapi, params) -}} +{%- elif asyncapi | isProtocol('amqp') -%} +{{- amqpPublisherImpl(asyncapi, params) -}} +{%- else -%} +{{- commonPublisherImpl(asyncapi, params) -}} +{%- endif -%} \ No newline at end of file diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index b9575fcf1..8f9796a44 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -18,10 +18,10 @@ amqp: username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %} password: {% for channelName, channel in asyncapi.channels() %} - {{channelName}}: - {% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %} - {% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %} - {% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %} + {{channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters())}}: + {% if channel.binding('amqp') and channel.binding('amqp').exchange %}exchange: {{channel.binding('amqp').exchange.name}}{% endif %} + routingKey: {{channelName | toAmqpKey(channel.hasParameters(), channel.parameters())}} + {% if channel.binding('amqp') and channel.binding('amqp').queue %}queue: {{channel.binding('amqp').queue.name}}{% endif %} {% endfor %} {% endif %} @@ -52,11 +52,11 @@ mqtt: connection: {{params.connectionTimeout}} {% if server.binding('mqtt') and server.binding('mqtt').keepAlive | isDefined %}keepAlive: {{server.binding('mqtt').keepAlive}}{% endif %} topic: - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} - {{channel.publish().id() | camelCase}}: {{channelName}} - {% elif channel.hasSubscribe() %} + {%- for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + {{channel.publish().id() | camelCase}}: {{channelName | toMqttTopicString(channel.hasParameters(), channel.parameters()) }} + {%- elif channel.hasSubscribe() %} {{channel.subscribe().id() | camelCase}}: {{channelName}} - {% endif %}{% endfor %} + {%- endif %}{% endfor %} {% endif %}{% endfor %} {%- if asyncapi | isProtocol('kafka') %} diff --git a/template/src/test/java/com/asyncapi/SimpleKafkaTest.java b/template/src/test/java/com/asyncapi/SimpleKafkaTest.java index e5ba389d0..a7de033cd 100644 --- a/template/src/test/java/com/asyncapi/SimpleKafkaTest.java +++ b/template/src/test/java/com/asyncapi/SimpleKafkaTest.java @@ -104,7 +104,7 @@ public void init() { producer = new DefaultKafkaProducerFactory<>(producerConfigs, new IntegerSerializer(), new JsonSerializer()).createProducer(); {% endif %} } - {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} + {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}{% set hasParameters = channel.hasParameters() %} @Test public void {{channel.subscribe().id() | camelCase}}ProducerTest() { {%- if channel.subscribe().hasMultipleMessages() %} {% set typeName = "Object" %} {% else %} {% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} {% endif %} @@ -113,7 +113,7 @@ public void init() { KafkaTestUtils.getRecords(consumer{{ channelName | camelCase | upperFirst}}); - publisherService.{{channel.subscribe().id() | camelCase}}(key, payload); + publisherService.{{channel.subscribe().id() | camelCase}}(key, payload{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, new {% if parameter.schema().type() === 'object'%}{{payloadType}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %}(){% endfor %}{% endif %}); ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer{{ channelName | camelCase | upperFirst}}, {{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC); diff --git a/template/src/test/java/com/asyncapi/TestcontainerKafkaTest.java b/template/src/test/java/com/asyncapi/TestcontainerKafkaTest.java index d98d47b50..3bf15b316 100644 --- a/template/src/test/java/com/asyncapi/TestcontainerKafkaTest.java +++ b/template/src/test/java/com/asyncapi/TestcontainerKafkaTest.java @@ -72,7 +72,7 @@ public class TestcontainerKafkaTest { public static void kafkaProperties(DynamicPropertyRegistry registry) { registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); } - {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} + {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}{% set hasParameters = channel.hasParameters() %} {%- if channel.subscribe().hasMultipleMessages() %}{% set typeName = "Object" %}{% else %}{% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}{% endif %} @Test public void {{channel.subscribe().id() | camelCase}}ProducerTestcontainers() { @@ -82,7 +82,7 @@ public static void kafkaProperties(DynamicPropertyRegistry registry) { consumeMessages({{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC); - publisherService.{{channel.subscribe().id() | camelCase}}(key, payload); + publisherService.{{channel.subscribe().id() | camelCase}}(key, payload{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, new {% if parameter.schema().type() === 'object'%}{{payloadType}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %}(){% endfor %}{% endif %}); ConsumerRecord consumedMessage = consumeMessage({{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC); diff --git a/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java b/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java index a60f228c2..163a3edcc 100644 --- a/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java +++ b/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java @@ -91,7 +91,7 @@ public void after() throws MqttException { receivedMessages.add(message); }); - publisherService.{{channel.subscribe().id() | camelCase}}(payload.toString()); + publisherService.{{channel.subscribe().id() | camelCase}}(payload{% if channel.hasParameters() %}{%for parameterName, parameter in channel.parameters() %}, new {% if parameter.schema().type() === 'object'%}{{ params['userJavaPackage'] }}.model.{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %}(){% endfor %}{% endif %}); MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); diff --git a/tests/__snapshots__/kafka.test.js.snap b/tests/__snapshots__/kafka.test.js.snap index 000fbdc4d..f9bb69983 100644 --- a/tests/__snapshots__/kafka.test.js.snap +++ b/tests/__snapshots__/kafka.test.js.snap @@ -1,6 +1,6 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 1`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 1`] = ` "package com.asyncapi.infrastructure; import org.apache.kafka.clients.CommonClientConfigs; @@ -21,9 +21,11 @@ import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import javax.annotation.processing.Generated; + import java.util.HashMap; import java.util.Map; + @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Configuration @EnableKafka @@ -37,6 +39,7 @@ public class Config { @Value("\${spring.kafka.listener.concurrency}") private int concurrency; + @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); @@ -95,66 +98,56 @@ public class Config { " `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 2`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 2`] = ` "package com.asyncapi.service; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Service; import com.asyncapi.model.LightMeasuredPayload; import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") -@Service -public class PublisherService { +public interface PublisherService { - @Autowired - private KafkaTemplate kafkaTemplate; - public void updateLightMeasurement(Integer key, LightMeasuredPayload lightMeasuredPayload) { - Message message = MessageBuilder.withPayload(lightMeasuredPayload) - .setHeader(KafkaHeaders.TOPIC, "event.lighting.measured") - .setHeader(KafkaHeaders.KEY, key) - .build(); - kafkaTemplate.send(message); - } + public void updateLightMeasurement(Integer key, LightMeasuredPayload lightMeasuredPayload); } " `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 3`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 3`] = ` "package com.asyncapi.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; - import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; - -import com.asyncapi.model.LightMeasuredPayload; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import com.asyncapi.model.LightMeasuredPayload; import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Service public class MessageHandlerService { - private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + + + @KafkaListener(topics = "event.lighting.measured", groupId = "my-group") public void readLightMeasurement(@Payload LightMeasuredPayload payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) { @@ -162,12 +155,12 @@ public class MessageHandlerService { } - + } " `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 4`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 4`] = ` "package com.asyncapi.model; @@ -259,7 +252,7 @@ public class LightMeasuredPayload { }" `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 5`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 5`] = ` "package com.asyncapi.model; import javax.annotation.processing.Generated; @@ -318,7 +311,7 @@ public class LightMeasured { }" `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 6`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 6`] = ` "package com.asyncapi; @@ -482,7 +475,7 @@ public class TestcontainerKafkaTest { " `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 7`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 7`] = ` "plugins { id 'org.springframework.boot' version "$springBootVersion" id 'io.spring.dependency-management' version "$springDependencyManager" @@ -512,7 +505,223 @@ dependencies { " `; -exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided kafka 8`] = ` +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 8`] = ` "springBootVersion=3.1.3 springDependencyManager=1.1.3" `; + +exports[`template integration tests for generated files using the generator and kafka example should generate proper config, services and DTOs files for provided kafka 9`] = ` +"package com.asyncapi.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import com.asyncapi.model.LightMeasuredPayload; +import javax.annotation.processing.Generated; +import java.util.HashMap; +import java.util.Map; + +@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") +@Service +public class PublisherServiceImpl implements PublisherService { + + @Autowired + private KafkaTemplate kafkaTemplate; + + + public void updateLightMeasurement(Integer key, LightMeasuredPayload lightMeasuredPayload) { + Message message = MessageBuilder.withPayload(lightMeasuredPayload) + .setHeader(KafkaHeaders.TOPIC, getUpdateLightMeasurementTopic()) + .setHeader(KafkaHeaders.KEY, key) + .build(); + kafkaTemplate.send(message); + } + + private String getUpdateLightMeasurementTopic() { + Map parameters = null; + return replaceParameters("event.lighting.measured", parameters); + } + private String replaceParameters(String topic, Map parameters) { + if (parameters != null) { + String compiledTopic = topic; + for (String key : parameters.keySet()) { + compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key)); + } + return compiledTopic; + } + return topic; + } +} +" +`; + +exports[`template integration tests for generated files using the generator and kafka with parameters example should generate proper config and message handler for provided kafka 1`] = ` +"package com.asyncapi.infrastructure; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.config.SaslConfigs; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import javax.annotation.processing.Generated; +import java.util.LinkedHashMap; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") +@Configuration +@EnableKafka +public class Config { + + @Value("\${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("\${spring.kafka.listener.poll-timeout}") + private long pollTimeout; + + @Value("\${spring.kafka.listener.concurrency}") + private int concurrency; + + @Bean + public RoutingKafkaTemplate kafkaTemplate() { + ProducerFactory producerFactory = producerFactory(); + + Map> map = new LinkedHashMap<>(); + map.put(Pattern.compile("event\\\\.lighting\\\\..*\\\\.measured\\\\..*"), producerFactory); + return new RoutingKafkaTemplate(map); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + props.put(JsonSerializer.TYPE_MAPPINGS, + "lightMeasuredPayload:com.asyncapi.model.LightMeasuredPayload" + ); + // See https://kafka.apache.org/documentation/#producerconfigs for more properties + return props; + } + + @Bean + KafkaListenerContainerFactory> + kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(concurrency); + factory.getContainerProperties().setPollTimeout(pollTimeout); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + props.put(JsonDeserializer.TYPE_MAPPINGS, + "lightMeasuredPayload:com.asyncapi.model.LightMeasuredPayload" + ); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.asyncapi.model"); + return props; + } + +} +" +`; + +exports[`template integration tests for generated files using the generator and kafka with parameters example should generate proper config and message handler for provided kafka 2`] = ` +"package com.asyncapi.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.asyncapi.model.LightMeasuredPayload; + +import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; + +@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") +@Service +public class MessageHandlerService { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); + + + + + + @KafkaListener(topicPattern = "event\\\\.lighting\\\\..*\\\\.measured\\\\..*", groupId = "my-group") + public void readLightMeasurement(@Payload LightMeasuredPayload payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_KEY) Integer key, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, + @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) { + LOGGER.info("Key: " + key + ", Payload: " + payload.toString() + ", Timestamp: " + timestamp + ", Partition: " + partition); + List parameters = decompileTopic("event\\\\.lighting\\\\..*\\\\.measured\\\\..*", topic); + readLightMeasurement(parameters.get(0), parameters.get(1), payload, topic, key, partition, timestamp); + } + + public void readLightMeasurement(String streetlightId, String zoneId, LightMeasuredPayload payload, + String topic, Integer key, int partition, long timestamp) { + // parametrized listener + } + + + + private List decompileTopic(String topicPattern, String topic) { + topicPattern = topicPattern.replaceAll("\\\\.\\\\*", "(.*)"); + List parameters = new ArrayList<>(); + Pattern pattern = Pattern.compile(topicPattern); + Matcher matcher = pattern.matcher(topic); + if (matcher.find()) { + for (int i = 0; i < matcher.groupCount(); i++) { + parameters.add(matcher.group(i + 1)); + } + } + return parameters; + } + +} +" +`; diff --git a/tests/__snapshots__/mqtt.test.js.snap b/tests/__snapshots__/mqtt.test.js.snap index 20323fe2e..2ffda309e 100644 --- a/tests/__snapshots__/mqtt.test.js.snap +++ b/tests/__snapshots__/mqtt.test.js.snap @@ -12,7 +12,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @@ -92,7 +91,7 @@ public class Config { @Bean public IntegrationFlow receiveLightMeasurementFlow() { - return IntegrationFlows.from(receiveLightMeasurementInbound()) + return IntegrationFlow.from(receiveLightMeasurementInbound()) .handle(messageHandlerService::handleReceiveLightMeasurement) .get(); } @@ -115,7 +114,7 @@ public class Config { } @Bean - @ServiceActivator(inputChannel = "turnOnOutboundChannel") + @ServiceActivator(outputChannel = "turnOnOutboundChannel") public MessageHandler turnOnOutbound() { MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); @@ -133,7 +132,7 @@ public class Config { } @Bean - @ServiceActivator(inputChannel = "turnOffOutboundChannel") + @ServiceActivator(outputChannel = "turnOffOutboundChannel") public MessageHandler turnOffOutbound() { MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); @@ -151,7 +150,7 @@ public class Config { } @Bean - @ServiceActivator(inputChannel = "dimLightOutboundChannel") + @ServiceActivator(outputChannel = "dimLightOutboundChannel") public MessageHandler dimLightOutbound() { MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); @@ -171,34 +170,27 @@ public class Config { exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for basic example 2`] = ` "package com.asyncapi.service; -import org.springframework.integration.annotation.Gateway; -import org.springframework.integration.annotation.MessagingGateway; +import com.asyncapi.model.TurnOnOffPayload; +import com.asyncapi.model.TurnOnOffPayload; +import com.asyncapi.model.DimLightPayload; import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") -@MessagingGateway public interface PublisherService { - - - @Gateway(requestChannel = "turnOnOutboundChannel") - void turnOn(String data); + void turnOn(TurnOnOffPayload turnOnOffPayload, String streetlightId); - - @Gateway(requestChannel = "turnOffOutboundChannel") - void turnOff(String data); + void turnOff(TurnOnOffPayload turnOnOffPayload, String streetlightId); - - @Gateway(requestChannel = "dimLightOutboundChannel") - void dimLight(String data); + void dimLight(DimLightPayload dimLightPayload, String streetlightId); } @@ -210,36 +202,73 @@ exports[`template integration tests for generated files using the generator and import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; +import org.springframework.integration.mqtt.support.MqttHeaders; + +import com.asyncapi.model.LightMeasuredPayload; + + import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Service public class MessageHandlerService { - private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - - + + + + + @Value("\${mqtt.topic.receiveLightMeasurement}") + private String receiveLightMeasurementTopic; /** - * The topic on which measured values may be produced and consumed. - */ + * The topic on which measured values may be produced and consumed. + */ public void handleReceiveLightMeasurement(Message message) { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + List parameters = decodeTopic(receiveLightMeasurementTopic, topic); + receiveLightMeasurement(parameters.get(0), (LightMeasuredPayload) message.getPayload()); + } + /** + * The topic on which measured values may be produced and consumed. + */ + public void receiveLightMeasurement(String streetlightId, LightMeasuredPayload payload) { LOGGER.info("handler smartylighting/streetlights/1/0/event/{streetlightId}/lighting/measured"); - LOGGER.info(String.valueOf(message.getPayload().toString())); + LOGGER.info(String.valueOf(payload.toString())); } - - - - - - - + + + + + private List decodeTopic(String topicPattern, String topic) { + List parameters = new ArrayList<>(); + int topicPossition = 0; + int patternPosition = 0; + while (topicPossition < topic.length()) { + while (topicPattern.charAt(patternPosition) == topic.charAt(topicPossition)) { + topicPossition++; + patternPosition++; + } + topicPossition++; + patternPosition += 2; // skip + + StringBuilder parameter = new StringBuilder(); + while (topicPattern.charAt(patternPosition) != topic.charAt(topicPossition)) { + parameter.append(topic.charAt(topicPossition)); + topicPossition++; + } + parameters.add(parameter.toString()); + } + return parameters; + } + } " `; @@ -833,7 +862,7 @@ public class TestcontainerMqttTest { receivedMessages.add(message); }); - publisherService.turnOn(payload.toString()); + publisherService.turnOn(payload, new String()); MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); @@ -850,7 +879,7 @@ public class TestcontainerMqttTest { receivedMessages.add(message); }); - publisherService.turnOff(payload.toString()); + publisherService.turnOff(payload, new String()); MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); @@ -867,7 +896,7 @@ public class TestcontainerMqttTest { receivedMessages.add(message); }); - publisherService.dimLight(payload.toString()); + publisherService.dimLight(payload, new String()); MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); @@ -916,6 +945,110 @@ exports[`template integration tests for generated files using the generator and springDependencyManager=1.1.3" `; +exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for basic example 13`] = ` +"package com.asyncapi.service; + + +import com.asyncapi.model.TurnOnOffPayload; +import com.asyncapi.model.TurnOnOffPayload; +import com.asyncapi.model.DimLightPayload; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +import javax.annotation.processing.Generated; +import java.util.HashMap; +import java.util.Map; + +@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") +@Service +public class PublisherServiceImpl implements PublisherService { + @Value("\${mqtt.topic.turnOn}") + private String turnOnTopic; + + @Autowired + private MessageHandler turnOnOutbound; + @Value("\${mqtt.topic.turnOff}") + private String turnOffTopic; + + @Autowired + private MessageHandler turnOffOutbound; + @Value("\${mqtt.topic.dimLight}") + private String dimLightTopic; + + @Autowired + private MessageHandler dimLightOutbound; + + + + + + public void turnOn(TurnOnOffPayload turnOnOffPayload, String streetlightId) { + Map headers = new HashMap<>(); + headers.put(MqttHeaders.TOPIC, getturnOnTopic(streetlightId)); + Message message = new GenericMessage<>(turnOnOffPayload, headers); + turnOnOutbound.handleMessage(message); + } + + private String getturnOnTopic(String streetlightId) { + Map parameters = new HashMap<>(); + parameters.put("streetlightId", streetlightId); + return replaceParameters(turnOnTopic, parameters); + } + + + + + public void turnOff(TurnOnOffPayload turnOnOffPayload, String streetlightId) { + Map headers = new HashMap<>(); + headers.put(MqttHeaders.TOPIC, getturnOffTopic(streetlightId)); + Message message = new GenericMessage<>(turnOnOffPayload, headers); + turnOffOutbound.handleMessage(message); + } + + private String getturnOffTopic(String streetlightId) { + Map parameters = new HashMap<>(); + parameters.put("streetlightId", streetlightId); + return replaceParameters(turnOffTopic, parameters); + } + + + + + public void dimLight(DimLightPayload dimLightPayload, String streetlightId) { + Map headers = new HashMap<>(); + headers.put(MqttHeaders.TOPIC, getdimLightTopic(streetlightId)); + Message message = new GenericMessage<>(dimLightPayload, headers); + dimLightOutbound.handleMessage(message); + } + + private String getdimLightTopic(String streetlightId) { + Map parameters = new HashMap<>(); + parameters.put("streetlightId", streetlightId); + return replaceParameters(dimLightTopic, parameters); + } + + + + private String replaceParameters(String topic, Map parameters) { + if (parameters != null) { + String compiledTopic = topic; + for (String key : parameters.keySet()) { + compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key)); + } + return compiledTopic; + } + return topic; + } +} +" +`; + exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided mqtt 1`] = ` "package com.asyncapi.infrastructure; @@ -928,7 +1061,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @@ -1021,7 +1153,7 @@ public class Config { @Bean public IntegrationFlow receiveLightMeasurementFlow() { - return IntegrationFlows.from(receiveLightMeasurementInbound()) + return IntegrationFlow.from(receiveLightMeasurementInbound()) .handle(messageHandlerService::handleReceiveLightMeasurement) .get(); } @@ -1044,7 +1176,7 @@ public class Config { } @Bean - @ServiceActivator(inputChannel = "turnOnOutboundChannel") + @ServiceActivator(outputChannel = "turnOnOutboundChannel") public MessageHandler turnOnOutbound() { MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); @@ -1064,22 +1196,18 @@ public class Config { exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided mqtt 2`] = ` "package com.asyncapi.service; -import org.springframework.integration.annotation.Gateway; -import org.springframework.integration.annotation.MessagingGateway; +import com.asyncapi.model.TurnOnOffPayload; +import com.asyncapi.model.ZoneId; import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") -@MessagingGateway public interface PublisherService { - - - @Gateway(requestChannel = "turnOnOutboundChannel") - void turnOn(String data); + void turnOn(TurnOnOffPayload turnOnOffPayload, String streetlightId, ZoneId zoneId); } @@ -1091,32 +1219,69 @@ exports[`template integration tests for generated files using the generator and import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; +import org.springframework.integration.mqtt.support.MqttHeaders; + +import com.asyncapi.model.LightMeasuredPayload; import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Service public class MessageHandlerService { - private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - - + + + + + @Value("\${mqtt.topic.receiveLightMeasurement}") + private String receiveLightMeasurementTopic; /** - * The topic on which measured values may be produced and consumed. - */ + * The topic on which measured values may be produced and consumed. + */ public void handleReceiveLightMeasurement(Message message) { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + List parameters = decodeTopic(receiveLightMeasurementTopic, topic); + receiveLightMeasurement(parameters.get(0), (LightMeasuredPayload) message.getPayload()); + } + /** + * The topic on which measured values may be produced and consumed. + */ + public void receiveLightMeasurement(String streetlightId, LightMeasuredPayload payload) { LOGGER.info("handler smartylighting/streetlights/1/0/event/{streetlightId}/lighting/measured"); - LOGGER.info(String.valueOf(message.getPayload().toString())); + LOGGER.info(String.valueOf(payload.toString())); } - - - + + + private List decodeTopic(String topicPattern, String topic) { + List parameters = new ArrayList<>(); + int topicPossition = 0; + int patternPosition = 0; + while (topicPossition < topic.length()) { + while (topicPattern.charAt(patternPosition) == topic.charAt(topicPossition)) { + topicPossition++; + patternPosition++; + } + topicPossition++; + patternPosition += 2; // skip + + StringBuilder parameter = new StringBuilder(); + while (topicPattern.charAt(patternPosition) != topic.charAt(topicPossition)) { + parameter.append(topic.charAt(topicPossition)); + topicPossition++; + } + parameters.add(parameter.toString()); + } + return parameters; + } + } " `; @@ -1365,7 +1530,7 @@ public class TestcontainerMqttTest { receivedMessages.add(message); }); - publisherService.turnOn(payload.toString()); + publisherService.turnOn(payload, new String(), new com.asyncapi.model.ZoneId()); MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); @@ -1413,3 +1578,65 @@ exports[`template integration tests for generated files using the generator and "springBootVersion=3.1.3 springDependencyManager=1.1.3" `; + +exports[`template integration tests for generated files using the generator and mqtt example should generate proper config, services and DTOs files for provided mqtt 9`] = ` +"package com.asyncapi.service; + + +import com.asyncapi.model.TurnOnOffPayload; +import com.asyncapi.model.ZoneId; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +import javax.annotation.processing.Generated; +import java.util.HashMap; +import java.util.Map; + +@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") +@Service +public class PublisherServiceImpl implements PublisherService { + @Value("\${mqtt.topic.turnOn}") + private String turnOnTopic; + + @Autowired + private MessageHandler turnOnOutbound; + + + + + + public void turnOn(TurnOnOffPayload turnOnOffPayload, String streetlightId, ZoneId zoneId) { + Map headers = new HashMap<>(); + headers.put(MqttHeaders.TOPIC, getturnOnTopic(streetlightId, zoneId)); + Message message = new GenericMessage<>(turnOnOffPayload, headers); + turnOnOutbound.handleMessage(message); + } + + private String getturnOnTopic(String streetlightId, ZoneId zoneId) { + Map parameters = new HashMap<>(); + parameters.put("streetlightId", streetlightId); + parameters.put("zoneId", zoneId.toString()); + return replaceParameters(turnOnTopic, parameters); + } + + + + private String replaceParameters(String topic, Map parameters) { + if (parameters != null) { + String compiledTopic = topic; + for (String key : parameters.keySet()) { + compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key)); + } + return compiledTopic; + } + return topic; + } +} +" +`; diff --git a/tests/__snapshots__/oneOf.test.js.snap b/tests/__snapshots__/oneOf.test.js.snap index e5fc6efde..98843e09e 100644 --- a/tests/__snapshots__/oneOf.test.js.snap +++ b/tests/__snapshots__/oneOf.test.js.snap @@ -5,30 +5,35 @@ exports[`template integration tests for generated files using the generator and import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; - import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; - +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import com.asyncapi.model.AnonymousSchema1; import com.asyncapi.model.AnonymousSchema7; - import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Service public class MessageHandlerService { - private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + + + @KafkaListener(topics = "song.released") public void release(@Payload Object payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) { @@ -36,7 +41,7 @@ public class MessageHandlerService { } - + } " `; diff --git a/tests/__snapshots__/parameters.test.js.snap b/tests/__snapshots__/parameters.test.js.snap index eb1ac5526..75c562817 100644 --- a/tests/__snapshots__/parameters.test.js.snap +++ b/tests/__snapshots__/parameters.test.js.snap @@ -139,29 +139,34 @@ exports[`integration tests for generated files under different template paramete import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; - import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; - -import com.asyncapi.model.LightMeasuredPayload; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import com.asyncapi.model.LightMeasuredPayload; import javax.annotation.processing.Generated; +import java.util.ArrayList; +import java.util.List; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Service public class MessageHandlerService { - private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + + + @KafkaListener(topics = "event.lighting.measured", groupId = "my-group") public void readLightMeasurement(@Payload LightMeasuredPayload payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) { @@ -169,7 +174,7 @@ public class MessageHandlerService { } - + } " `; diff --git a/tests/kafka.test.js b/tests/kafka.test.js index 05b9d74af..9baec1d37 100644 --- a/tests/kafka.test.js +++ b/tests/kafka.test.js @@ -9,7 +9,7 @@ const generateFolderName = () => { return path.resolve(MAIN_TEST_RESULT_PATH, Date.now().toString()); }; -describe('template integration tests for generated files using the generator and mqtt example', () => { +describe('template integration tests for generated files using the generator and kafka example', () => { jest.setTimeout(30000); @@ -29,7 +29,8 @@ describe('template integration tests for generated files using the generator and '/src/main/java/com/asyncapi/model/LightMeasured.java', '/src/test/java/com/asyncapi/TestcontainerKafkaTest.java', '/build.gradle', - '/gradle.properties' + '/gradle.properties', + '/src/main/java/com/asyncapi/service/PublisherServiceImpl.java' ]; for (const index in expectedFiles) { const file = await readFile(path.join(outputDir, expectedFiles[index]), 'utf8'); @@ -38,3 +39,27 @@ describe('template integration tests for generated files using the generator and } }); }); + +describe('template integration tests for generated files using the generator and kafka with parameters example', () => { + + jest.setTimeout(30000); + + it('should generate proper config and message handler for provided kafka', async() => { + const outputDir = generateFolderName(); + const params = {}; + const kafkaExamplePath = './mocks/kafka-with-parameters.yml'; + + const generator = new Generator(path.normalize('./'), outputDir, { forceWrite: true, templateParams: params }); + await generator.generateFromFile(path.resolve('tests', kafkaExamplePath)); + + const expectedFiles = [ + '/src/main/java/com/asyncapi/infrastructure/Config.java', + '/src/main/java/com/asyncapi/service/MessageHandlerService.java' + ]; + for (const index in expectedFiles) { + const file = await readFile(path.join(outputDir, expectedFiles[index]), 'utf8'); + const fileWithAnyDate = file.replace(/date="\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d\.\d+([+-][0-2]\d:[0-5]\d|Z)"/, 'date="AnyDate"'); + expect(fileWithAnyDate).toMatchSnapshot(); + } + }); +}); \ No newline at end of file diff --git a/tests/mocks/amqp.yml b/tests/mocks/amqp.yml new file mode 100644 index 000000000..8da617e6c --- /dev/null +++ b/tests/mocks/amqp.yml @@ -0,0 +1,120 @@ +asyncapi: '2.0.0' +info: + title: Streetlights API + version: '1.0.0' + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + +servers: + production: + url: localhost + protocol: amqp + description: RabbitMQ + variables: + port: + default: '5672' + username: + default: guest + + +defaultContentType: application/json + +channels: + smartylighting.streetlights.event.{streetlightId}.lighting.measured: + description: The topic on which measured values may be produced and consumed. + bindings: + amqp: + is: routingKey + exchange: + name: lightMeasurementExchange + durable: false + autoDelete: true + queue: + name: lightMeasurementQueue + durable: false + autoDelete: true + exclusive: true + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + publish: + summary: Inform about environmental lighting conditions of a particular streetlight. + operationId: receiveLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + + smartylighting.streetlights.1.0.action.{streetlightId}.turn.on.{zoneId}: + bindings: + amqp: + is: routingKey + exchange: + name: lightMeasurementExchange + durable: false + autoDelete: true + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + zoneId: + $ref: '#/components/parameters/zoneId' + subscribe: + operationId: turnOn + message: + $ref: '#/components/messages/turnOnOff' + +components: + messages: + lightMeasured: + name: lightMeasured + title: Light measured + summary: Inform about environmental lighting conditions of a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + turnOnOff: + name: turnOnOff + title: Turn on/off + summary: Command a particular streetlight to turn the lights on or off. + payload: + $ref: "#/components/schemas/turnOnOffPayload" + + schemas: + lightMeasuredPayload: + type: object + properties: + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + sentAt: + $ref: "#/components/schemas/sentAt" + turnOnOffPayload: + type: object + properties: + command: + type: string + enum: + - on + - off + description: Whether to turn on or off the light. + sentAt: + $ref: "#/components/schemas/sentAt" + sentAt: + type: string + format: date-time + description: Date and time when the message was sent. + + parameters: + streetlightId: + description: The ID of the streetlight. + schema: + type: string + + zoneId: + description: The ID of the streetlight. + schema: + type: object + properties: + id: + type: integer \ No newline at end of file diff --git a/tests/mocks/kafka-with-parameters.yml b/tests/mocks/kafka-with-parameters.yml new file mode 100644 index 000000000..0cd5c6a2e --- /dev/null +++ b/tests/mocks/kafka-with-parameters.yml @@ -0,0 +1,66 @@ +asyncapi: '2.0.0' +info: + title: Streetlights API + version: '1.0.0' + description: | + The Smartylighting Streetlights API allows you + to remotely manage the city lights. + license: + name: Apache 2.0 + url: 'https://www.apache.org/licenses/LICENSE-2.0' + +servers: + production: + url: kafka.bootstrap:{port} + protocol: kafka + variables: + port: + default: '9092' + enum: + - '9092' + - '9093' + +channels: + event.lighting.{streetlightId}.measured.{zoneId}: + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + zoneId: + schema: + type: integer + publish: + bindings: + kafka: + groupId: my-group + operationId: readLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + subscribe: + operationId: updateLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' +components: + parameters: + streetlightId: + description: The ID of the streetlight. + schema: + type: string + messages: + lightMeasured: + summary: Inform about environmental lighting conditions for a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + sentAt: + $ref: "#/components/schemas/sentAt" + sentAt: + type: string + format: date-time + description: Date and time when the message was sent. \ No newline at end of file diff --git a/tests/mocks/mqtt.yml b/tests/mocks/mqtt.yml index b7dfdc512..17d3fe85e 100644 --- a/tests/mocks/mqtt.yml +++ b/tests/mocks/mqtt.yml @@ -45,10 +45,12 @@ channels: message: $ref: '#/components/messages/lightMeasured' - smartylighting/streetlights/1/0/action/{streetlightId}/turn/on: + smartylighting/streetlights/1/0/action/{streetlightId}/turn/on/{zoneId}: parameters: streetlightId: $ref: '#/components/parameters/streetlightId' + zoneId: + $ref: '#/components/parameters/zoneId' subscribe: bindings: mqtt: @@ -105,4 +107,12 @@ components: streetlightId: description: The ID of the streetlight. schema: - type: string \ No newline at end of file + type: string + + zoneId: + description: The ID of the streetlight. + schema: + type: object + properties: + id: + type: integer \ No newline at end of file diff --git a/tests/mqtt.test.js b/tests/mqtt.test.js index 4f7b58003..1d017ccbf 100644 --- a/tests/mqtt.test.js +++ b/tests/mqtt.test.js @@ -35,7 +35,8 @@ describe('template integration tests for generated files using the generator and '/src/main/java/com/asyncapi/model/TurnOnOff.java', '/src/test/java/com/asyncapi/TestcontainerMqttTest.java', '/build.gradle', - '/gradle.properties' + '/gradle.properties', + '/src/main/java/com/asyncapi/service/PublisherServiceImpl.java' ]; for (const index in expectedFiles) { const file = await readFile(path.join(outputDir, expectedFiles[index]), 'utf8'); @@ -60,7 +61,8 @@ describe('template integration tests for generated files using the generator and '/src/main/java/com/asyncapi/model/LightMeasured.java', '/src/test/java/com/asyncapi/TestcontainerMqttTest.java', '/build.gradle', - '/gradle.properties' + '/gradle.properties', + '/src/main/java/com/asyncapi/service/PublisherServiceImpl.java' ]; for (const index in expectedFiles) { const file = await readFile(path.join(outputDir, expectedFiles[index]), 'utf8');