[Feature #4793] Support MQTT protocol #4794

Open
wants to merge 3 commits into
base: master

Conversation

karsonto
Contributor

Fixes #4793

Motivation

Explain the content here.
Explain why you want to make the changes and what problem you're trying to solve.

Modifications

Describe the modifications you've done.

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

codecov bot commented Mar 19, 2024

Codecov Report

Attention: Patch coverage is 0.23256%, with 429 lines in your changes missing coverage. Please review.

Project coverage is 16.11%. Comparing base (0250a89) to head (44e61e9).
Report is 7 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...ntime/core/protocol/mqtt/client/ClientManager.java | 0.00% | 125 Missing ⚠️ |
| ...he/eventmesh/runtime/boot/EventMeshMQTTServer.java | 0.00% | 86 Missing ⚠️ |
| ...re/protocol/mqtt/processor/SubscrubeProcessor.java | 0.00% | 48 Missing ⚠️ |
| ...me/core/protocol/mqtt/exception/MqttException.java | 0.00% | 31 Missing ⚠️ |
| ...rotocol/mqtt/processor/ClientConnectProcessor.java | 0.00% | 30 Missing ⚠️ |
| ...core/protocol/mqtt/processor/PublishProcessor.java | 0.00% | 28 Missing ⚠️ |
| ...eventmesh/runtime/boot/EventMeshMqttBootstrap.java | 0.00% | 19 Missing ⚠️ |
| .../protocol/mqtt/processor/UnSubscrubeProcessor.java | 0.00% | 16 Missing ⚠️ |
| ...protocol/mqtt/processor/AbstractMqttProcessor.java | 0.00% | 11 Missing ⚠️ |
| .../protocol/mqtt/processor/HealthCheckProcessor.java | 0.00% | 11 Missing ⚠️ |

... and 6 more
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #4794       +/-   ##
=============================================
+ Coverage          0   16.11%   +16.11%     
- Complexity        0     1734     +1734     
=============================================
  Files             0      870      +870     
  Lines             0    31682    +31682     
  Branches          0     2739     +2739     
=============================================
+ Hits              0     5106     +5106     
- Misses            0    26097    +26097     
- Partials          0      479      +479     



protected final transient Map<MqttMessageType, MqttProcessor> processorTable =
    new ConcurrentHashMap<>(64);
Contributor

The class is not serializable, so is the transient keyword redundant here?

Contributor Author

Noted, please review again.
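For context on the question above: `transient` only affects Java serialization, so on a class that is never serialized it has no effect. A minimal sketch, assuming a hypothetical `Session` class that is not part of this PR, shows what the keyword does when serialization is actually involved:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class for illustration only; not code from this PR.
final class TransientDemo {

    static final class Session implements Serializable {
        private static final long serialVersionUID = 1L;
        String clientId = "client-1";
        // Skipped by Java serialization; comes back as null after a round trip.
        transient String token = "secret";
    }

    // Serializes the session to a byte array and reads it back.
    static Session roundTrip(Session in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream input =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Session) input.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Session copy = roundTrip(new Session());
        // clientId survives the round trip; the transient token does not.
        System.out.println(copy.clientId + " " + copy.token);
    }
}
```

Without `implements Serializable`, `writeObject` would throw `NotSerializableException` before `transient` ever mattered, which is why the keyword is redundant on a non-serializable class.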

} catch (Exception ex) {
    log.error("EventMeshMQTTServer RemotingServer shutdown Err!", ex);
}
System.exit(-1);
Contributor

Is it appropriate to exit the process when the MQTT server fails to start?

Contributor Author

This follows what the other EM protocols do. If the discussion concludes that it should change, I will make the server ignore this startup failure.

Contributor

@xwm1992

Currently, the TCP and HTTP protocol servers must start successfully, while the gRPC server is not required to. Should the newly added MQTT protocol server also be required to start successfully? I would like the community to give advice, since I cannot decide this myself.

Contributor

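As I understand it, whether the MQTT server must start successfully is actually controlled by whether the MQTT protocol is enabled in the configuration, right? I think exiting here is fine. If the process exits because the MQTT protocol fails to load, the MQTT protocol can simply be removed from the configuration so that TCP, HTTP, and the other protocols start normally.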
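The behavior described in this thread can be sketched roughly as follows. This is only an illustration under assumptions: `ProtocolServer`, `startEnabled`, and the protocol names are hypothetical and do not correspond to EventMesh's actual bootstrap or configuration classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names are illustrative, not EventMesh APIs.
final class ProtocolBootstrapSketch {

    interface ProtocolServer {
        void start() throws Exception;
    }

    // Starts only the protocols listed in `enabled` and returns the names of
    // the enabled protocols whose servers failed to start.
    static List<String> startEnabled(Map<String, ProtocolServer> servers, Set<String> enabled) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, ProtocolServer> entry : servers.entrySet()) {
            if (!enabled.contains(entry.getKey())) {
                continue; // protocol removed from the configuration: never started
            }
            try {
                entry.getValue().start();
            } catch (Exception ex) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, ProtocolServer> servers = new LinkedHashMap<>();
        servers.put("tcp", () -> { });
        servers.put("mqtt", () -> { throw new IllegalStateException("port already in use"); });

        // MQTT enabled: its failure is reported, and a caller may choose to exit.
        System.out.println(startEnabled(servers, new HashSet<>(Arrays.asList("tcp", "mqtt"))));
        // MQTT removed from the configuration: the remaining protocols start normally.
        System.out.println(startEnabled(servers, Collections.singleton("tcp")));
    }
}
```

A caller that wants the strict behavior discussed above could call `System.exit(-1)` when the returned list is non-empty; returning the list instead keeps the sketch testable.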

this.cleanThread = new Thread(() -> {
    while (true && !Thread.currentThread().isInterrupted()) {
        try {
            ClientInfo clientInfo = delayQueue.take();
Contributor

What is the role of delayQueue?

Contributor Author

The delayQueue is used to clean up timed-out connections.

Contributor

Is the lifetime of each client's connection decided and set by the client itself when it sends a message?

Contributor Author

The client can carry a keep alive time parameter, which is used as the session lifetime.
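The idea in this thread, a delay queue whose entries become available once a client's keep-alive interval has elapsed, can be sketched with `java.util.concurrent.DelayQueue`. This is a minimal illustration under assumptions: `ClientLease` and `nextExpired` are hypothetical names, not the PR's `ClientInfo` or `ClientManager` code, and a real broker would also need to refresh the lease whenever the client sends a packet.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the PR's ClientManager implementation.
final class KeepAliveSketch {

    // A queue entry that becomes available once the keep-alive interval has elapsed.
    static final class ClientLease implements Delayed {
        final String clientId;
        final long expiresAtNanos;

        ClientLease(String clientId, long keepAliveMillis) {
            this.clientId = clientId;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(keepAliveMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Blocks until the earliest lease expires, then returns the id of the client to clean up.
    static String nextExpired(DelayQueue<ClientLease> queue) throws InterruptedException {
        return queue.take().clientId;
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ClientLease> queue = new DelayQueue<>();
        queue.put(new ClientLease("slow", 200)); // keep-alive of 200 ms
        queue.put(new ClientLease("fast", 50));  // keep-alive of 50 ms
        System.out.println(nextExpired(queue));  // the shorter keep-alive expires first
        System.out.println(nextExpired(queue));
    }
}
```

Because `take()` blocks until the head entry's delay reaches zero, a single cleaner thread can wait on the queue instead of periodically scanning every connection.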


@Override
public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException {
    //
Contributor

Remove this.

Contributor Author

Noted, please review again.

protected final Map<MqttMessageType, MqttProcessor> processorTable =
    new ConcurrentHashMap<>(64);

private final transient AtomicBoolean started = new AtomicBoolean(false);
Contributor

Same problem.
I'm not familiar with MQTT, so the community will need to complete the rest of the review.

Contributor

Same problem.

Waiting for your response.

Contributor Author

What's the problem here?

Contributor

Contributor Author

Noted, please review again.

Contributor

xwm1992 commented Mar 26, 2024

@karsonto
I don't understand the implementation of the publish processor, and I don't see where MQTT messages are converted and sent to EventMesh storage.

Contributor Author

karsonto commented Mar 26, 2024

@xwm1992
This PR covers the framework design for the MQTT protocol. I will continue to work on persisting messages in another PR.

Member

Pil0tXia left a comment

Is eventmesh-protocol-plugin implementation the only work left for this protocol in Runtime?

Contributor Author

karsonto commented Apr 15, 2024

> Is eventmesh-protocol-plugin implementation the only work left for this protocol in Runtime?

I don't think it is necessary to implement this module at this time. If it needs to be implemented in the future, I will submit a PR to add it.

Development

Successfully merging this pull request may close these issues.

[Feature] Support MQTT protocol
4 participants