Skip to content

Commit

Permalink
feat: add support for channel parameters (#352)
Browse files Browse the repository at this point in the history
* add pattern support for kafka

* add pattern support for mqtt message handling

* use mqtt specific wildcard

* mqtt publish implementation

* adopt interface for other protocols

* update tests

* general amqp routing key support

* fix current amqp methods

* provide arbitrary methods for amqp

* provide arbitrary methods for mqtt

* update tests

* provide arbitrary methods for kafka in message handler

* provide arbitrary methods for kafka in publisher

* update tests

* update readme, resolve missing features, add attention bar
  • Loading branch information
Tenischev committed Dec 13, 2023
1 parent 7ce1c90 commit b1e3eb2
Show file tree
Hide file tree
Showing 29 changed files with 1,538 additions and 257 deletions.
14 changes: 9 additions & 5 deletions README.md
Expand Up @@ -24,6 +24,12 @@ Java Spring template for the [AsyncAPI Generator](https://github.com/asyncapi/ge

<!-- tocstop -->

----
## Attention, AsyncAPI v3 is not currently supported by this template


----

## Usage

Install AsyncAPI CLI, for details follow the [guide](https://www.asyncapi.com/tools/cli).
Expand Down Expand Up @@ -140,15 +146,13 @@ Preferably generated tests should pass.
See the list of features that are still missing in the component:

- [ ] support of Kafka is done based on clear "spring-kafka" library without integration like for mqtt or amqp
- [ ] generated code for protocol `amqp` could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java)
- [x] generated code for protocol `amqp` could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java)
- [ ] tests for protocol `amqp` are not provided
- [x] add annotation to the [model generation](template/src/main/java/com/asyncapi/model). Consider "@Valid", "@JsonProperty", "@Size", "@NotNull" e.t.c.
- [ ] [`parameters`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#parametersObject) for topics are not supported
- [x] [`parameters`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#parametersObject) for topics are not supported
- [ ] [`server variables`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#serverVariableObject) are not entirely supported
- [ ] [`security schemas`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#securitySchemeObject) are not supported
- [ ] [`traits`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#operationTraitObject) are not supported
- [x] [`traits`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#operationTraitObject) are not supported
- [ ] Json serializer/deserializer is used always, without taking into account real [`content type`](https://github.com/asyncapi/spec/blob/2.0.0/versions/2.0.0/asyncapi.md#default-content-type)
- [x] client side generation mode (in general just flip subscribe and publish channels)
- [ ] template generation of docker-compose depending on protocol of server, now the rabbitmq is hardcoded

If you want to help us develop them, feel free to contribute.
Expand Down
41 changes: 40 additions & 1 deletion filters/all.js
Expand Up @@ -190,4 +190,43 @@ function addBackSlashToPattern(val) {
}
filter.addBackSlashToPattern = addBackSlashToPattern;

filter.currentTime = () => (new Date(Date.now())).toISOString();
filter.currentTime = () => (new Date(Date.now())).toISOString();

function replaceAll(originalStr, replacePattern, replaceString) {
return originalStr.replaceAll(replacePattern, replaceString)
}
filter.replaceAll = replaceAll;

function toTopicString(channelName, hasParameters, parameters, convertDots, replaceParameterValue, replaceDots = "\\\\.") {
if (hasParameters) {
let topicName = channelName
if (convertDots) {
topicName = replaceAll(topicName, ".", replaceDots)
}
Object.keys(parameters).forEach(value => topicName = topicName.replace("{" + value + "}", replaceParameterValue))
return topicName
} else {
return channelName
}
}

function toKafkaTopicString(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, true, ".*")
}
filter.toKafkaTopicString = toKafkaTopicString

function toMqttTopicString(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, false, "+")
}

filter.toMqttTopicString = toMqttTopicString

function toAmqpNeutral(channelName, hasParameters, parameters) {
return toTopicString(_.camelCase(channelName), hasParameters, parameters, true, "", "")
}
filter.toAmqpNeutral = toAmqpNeutral

function toAmqpKey(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, true, "*")
}
filter.toAmqpKey = toAmqpKey
60 changes: 33 additions & 27 deletions partials/AmqpConfig.java
Expand Up @@ -32,20 +32,44 @@ public class Config {


{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;
{% set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
{% if channel.binding('amqp') and channel.binding('amqp').exchange %}
@Value("${amqp.{{- varName -}}.exchange}")
private String {{varName}}Exchange;
{% endif %}

@Value("${amqp.{{- varName -}}.routingKey}")
private String {{varName}}RoutingKey;

@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% if channel.binding('amqp') and channel.binding('amqp').queue %}
@Value("${amqp.{{- varName -}}.queue}")
private String {{varName}}Queue;
{% endif %}

{% if channel.hasPublish() %}
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;
{% set name = varName | camelCase %}
{% if channel.binding('amqp') and channel.binding('amqp').exchange %}
{% if channel.binding('amqp').exchange.type and channel.binding('amqp').exchange.type !== 'default' %}{% set type = channel.binding('amqp').exchange.type | camelCase %}{% else %}{% set type = 'Topic' %}{% endif %}
{% set type = type + 'Exchange' %}
@Bean
public {{type}} {{name}}Exchange() {
return new {{type}}({{varName}}Exchange, {% if channel.binding('amqp').exchange.durable %}{{channel.binding('amqp').exchange.durable}}{% else %}true{% endif%}, {% if channel.binding('amqp').exchange.exclusive %}{{channel.binding('amqp').exchange.exclusive}}{% else %}false{% endif%});
}

{% if channel.binding('amqp') and channel.binding('amqp').queue %}
@Bean
public Binding binding{{name | upperFirst}}({{type}} {{name}}Exchange, Queue {{name}}Queue) {
return BindingBuilder.bind({{name}}Queue).to({{name}}Exchange){% if channel.binding('amqp').exchange.type !== 'fanout' %}.with({{varName}}RoutingKey){% endif %};
}
{% endif %}{% endif %}

{% if channel.binding('amqp') and channel.binding('amqp').queue %}
@Bean
public Queue {{name}}Queue() {
return new Queue({{varName}}Queue, {% if channel.binding('amqp').queue.durable %}{{channel.binding('amqp').queue.durable}}{% else %}true{% endif%}, {% if channel.binding('amqp').queue.exclusive %}{{channel.binding('amqp').queue.exclusive}}{% else %}false{% endif%}, {% if channel.binding('amqp').queue.autoDelete %}{{channel.binding('amqp').queue.autoDelete}}{% else %}false{% endif%});
}
{% endif %}
{%- endfor %}

{% endfor %}

@Bean
public ConnectionFactory connectionFactory() {
Expand All @@ -56,24 +80,6 @@ public ConnectionFactory connectionFactory() {
return connectionFactory;
}

@Bean
public Declarables exchanges() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

@Bean
public Declarables queues() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);


@Bean
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
Expand Down
38 changes: 15 additions & 23 deletions partials/AmqpPublisher.java
@@ -1,40 +1,32 @@
{% macro amqpPublisher(asyncapi, params) %}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}
import javax.annotation.processing.Generated;


@Service
public class PublisherService {
@Autowired
private RabbitTemplate template;
@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}")
public interface PublisherService {

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{%- if channel.subscribe().hasMultipleMessages() %}
{%- set varName = "object" %}
{%- else %}
{%- set varName = channel.subscribe().message().payload().uid() | camelCase %}
{%- endif %}
{% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} payload);
{%- if channel.hasParameters() %}
void {{channel.subscribe().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{varName | upperFirst}} payload);
{% endif %}
{% endfor %}

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
public void {{channel.subscribe().id() | camelCase}}(){
{{schemaName}} {{channelName}}Payload = new {{schemaName}}();
template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload);
}

{% endif %}
{% endfor %}

Expand Down
81 changes: 81 additions & 0 deletions partials/AmqpPublisherImpl.java
@@ -0,0 +1,81 @@
{% macro amqpPublisherImpl(asyncapi, params) %}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}

import javax.annotation.processing.Generated;

@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}")
@Service
public class PublisherServiceImpl implements PublisherService {
@Autowired
private RabbitTemplate template;

{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
@Value("${amqp.{{- varName -}}.exchange}")
private String {{varName}}Exchange;
@Value("${amqp.{{- varName -}}.routingKey}")
private String {{varName}}RoutingKey;
{%- endif %}
{% endfor %}

{%- set anyChannelHasParameter = false %}
{% for channelName, channel in asyncapi.channels() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or channel.hasParameters() %}
{%- if channel.hasSubscribe() %}
{%- if channel.subscribe().hasMultipleMessages() %}
{%- set varName = "object" %}
{%- else %}
{%- set varName = channel.subscribe().message().payload().uid() | camelCase %}
{%- endif %}
{%- set channelVariable = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
{% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
public void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} payload){
template.convertAndSend({{channelVariable}}Exchange, {{channelVariable}}RoutingKey, payload);
}

{%- if channel.hasParameters() %}
public void {{channel.subscribe().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{varName | upperFirst}} payload) {
String compiledRoutingKey = compileRoutingKey({{channelVariable}}RoutingKey, {% for parameterName, parameter in channel.parameters() %}{{parameterName}}{% if not loop.last %}, {% endif %}{%- endfor %});
template.convertAndSend({{channelVariable}}Exchange, compiledRoutingKey, payload);
}
{% endif %}

{% endif %}
{% endfor %}

{%- if anyChannelHasParameter %}
private String compileRoutingKey(String routingKeyTemplate, String... parameters) {
StringBuilder result = new StringBuilder();
int routeKeyPossition = 0;
int parametersIndex = 0;
while (routeKeyPossition < routingKeyTemplate.length()) {
while (routingKeyTemplate.charAt(routeKeyPossition) != '*') {
routeKeyPossition++;
result.append(routingKeyTemplate.charAt(routeKeyPossition));
}
routeKeyPossition++;
String parameter = parameters[parametersIndex++];
result.append(parameter != null ? parameter : "*");
}
return result.toString();
}
{%- endif %}

}

{% endmacro %}
32 changes: 23 additions & 9 deletions partials/CommonPublisher.java
@@ -1,22 +1,36 @@
{% macro commonPublisher(asyncapi) %}

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
{% macro commonPublisher(asyncapi, params) %}

{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{%- if channel.hasParameters() %}
{%- for parameterName, parameter in channel.parameters() %}
{%- if parameter.schema().type() === 'object' %}
import {{params['userJavaPackage']}}.model.{{parameterName | camelCase | upperFirst}};
{%- endif %}
{%- endfor -%}
{% endif -%}
{% endfor %}
import javax.annotation.processing.Generated;

@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}")
@MessagingGateway
public interface PublisherService {

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
{%- if channel.hasSubscribe() %}
{%- if channel.subscribe().hasMultipleMessages() %}
{%- set varName = "object" %}
{%- else %}
{%- set varName = channel.subscribe().message().payload().uid() | camelCase %}
{%- endif %}
{% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
@Gateway(requestChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel")
void {{channel.subscribe().id() | camelCase}}(String data);
*/{% endif %}{% set hasParameters = channel.hasParameters() %}
void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %});
{% endif %}
{% endfor %}
}
Expand Down

0 comments on commit b1e3eb2

Please sign in to comment.