-
-
Notifications
You must be signed in to change notification settings - Fork 51
/
MqttConfig.java
142 lines (120 loc) · 6.52 KB
/
MqttConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
{% macro mqttConfig(asyncapi, params) %}
import {{params['userJavaPackage']}}.service.MessageHandlerService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
import javax.annotation.processing.Generated;
@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}")
@Configuration
public class Config {
@Value("${mqtt.broker.address}")
private String address;
@Value("${mqtt.broker.timeout.connection}")
private int connectionTimeout;
@Value("${mqtt.broker.timeout.disconnection}")
private long disconnectionTimeout;
@Value("${mqtt.broker.timeout.completion}")
private long completionTimeout;
@Value("${mqtt.broker.clientId}")
private String clientId;
@Value("${mqtt.broker.username}")
private String username;
@Value("${mqtt.broker.password}")
private String password;
{% for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'mqtt' and server.binding('mqtt') %}
{% if server.binding('mqtt').cleanSession | isDefined %}
@Value("${mqtt.broker.cleanSession}")
private boolean cleanSession;
{% endif %}{% if server.binding('mqtt').keepAlive | isDefined %}
@Value("${mqtt.broker.timeout.keepAlive}")
private int keepAliveInterval;
{% endif %}{% if server.binding('mqtt').lastWill %}
@Value("${mqtt.broker.lastWill.topic}")
private String lastWillTopic;
@Value("${mqtt.broker.lastWill.message}")
private String lastWillMessage;
@Value("${mqtt.broker.lastWill.qos}")
private int lastWillQos;
@Value("${mqtt.broker.lastWill.retain}")
private boolean lastWillRetain;
{% endif %}{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${mqtt.topic.{{-channel.publish().id() | camelCase-}}}")
private String {{channel.publish().id() | camelCase-}}Topic;
{% elif channel.hasSubscribe() %}
@Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}")
private String {{channel.subscribe().id() | camelCase-}}Topic;
{% endif %}{% endfor %}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
{% for serverName, server in asyncapi.servers() %}
{% if server.protocol() == 'mqtt' and server.binding('mqtt').lastWill %}options.setWill(lastWillTopic, lastWillMessage.getBytes(), lastWillQos, lastWillRetain);{% endif %}
{% if server.protocol() == 'mqtt' and server.binding('mqtt').cleanSession | isDefined %}options.setCleanSession(cleanSession);{% endif %}
{% if server.protocol() == 'mqtt' and server.binding('mqtt').keepAlive | isDefined %}options.setKeepAliveInterval(keepAliveInterval);{% endif %}{% endfor %}
options.setServerURIs(new String[] { address });
if (!StringUtils.isEmpty(username)) {
options.setUserName(username);
}
if (!StringUtils.isEmpty(password)) {
options.setPassword(password.toCharArray());
}
options.setConnectionTimeout(connectionTimeout);
factory.setConnectionOptions(options);
return factory;
}
@Autowired
MessageHandlerService messageHandlerService;
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Bean
public IntegrationFlow {{channel.publish().id() | camelCase}}Flow() {
return IntegrationFlows.from({{channel.publish().id() | camelCase}}Inbound())
.handle(messageHandlerService::handle{{channel.publish().id() | camelCase | upperFirst}})
.get();
}
@Bean
public MessageProducerSupport {{channel.publish().id() | camelCase}}Inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
mqttClientFactory(), {{channel.publish().id() | camelCase}}Topic);
adapter.setCompletionTimeout(connectionTimeout);
adapter.setDisconnectCompletionTimeout(disconnectionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
return adapter;
}
{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
@Bean
public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel")
public MessageHandler {{channel.subscribe().id() | camelCase}}Outbound() {
MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
pahoMessageHandler.setAsync(true);
pahoMessageHandler.setCompletionTimeout(completionTimeout);
pahoMessageHandler.setDisconnectCompletionTimeout(disconnectionTimeout);
pahoMessageHandler.setDefaultTopic({{channel.subscribe().id() | camelCase}}Topic);
{% if channel.subscribe().binding('mqtt') and channel.subscribe().binding('mqtt').retain | isDefined %}pahoMessageHandler.setDefaultRetained({{channel.subscribe().binding('mqtt').retain}});{% endif %}
{% if channel.subscribe().binding('mqtt') and channel.subscribe().binding('mqtt').qos | isDefined %}pahoMessageHandler.setDefaultQos({{channel.subscribe().binding('mqtt').qos}});{% endif %}
return pahoMessageHandler;
}
{% endif %}{% endfor %}
}
{% endmacro %}