
MQTT Disconnections happen a lot #1692

Open
wernermorgenstern opened this issue Sep 13, 2023 · 9 comments

Comments

@wernermorgenstern

We have the following use case:
We have devices, which send a notification every 10 seconds to the MQTT Broker (VerneMQ).
We have a client service, which connects to the MQTT Broker and subscribes to the Topic, and then handles the incoming messages. The QoS for the subscription is 2.

The client service runs on AWS, in Kubernetes (EKS)

When we have a few thousand of these devices (around 2000 right now), processing is fine for a few hours. After about 4-5 hours the connection breaks and no more messages are processed, even though mqttClient.connected is still true.

On the ACK, I assume, I see an ECONNRESET error message in the logs on a TCP write stream.
After that happens, the client never reconnects and never starts getting messages again.

I already added a cron job that runs every minute to check when the last message was received in the handler; if it is over a minute ago, it unsubscribes and resubscribes. But then it either never processes more messages, or it gets a few (maybe 20) and then hits another ECONNRESET error.

The only way to get it working again is to restart the service.
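A watchdog like the cron check described above can be sketched as pure logic with an injectable clock; all names here (`makeWatchdog`, `onStale`, the threshold) are hypothetical, and the restart callback would be whatever tears down and recreates the client:

```javascript
// Minimal watchdog sketch: if no message has arrived within `staleMs`,
// trigger a full client restart instead of an unsubscribe/resubscribe.
// All names are hypothetical; `now` is injectable for testing.
function makeWatchdog({ staleMs, now = Date.now }) {
  let lastMessageAt = now();
  return {
    // call this from the client's 'message' handler
    messageReceived() { lastMessageAt = now(); },
    // call this from a periodic timer (e.g. every minute)
    check(onStale) {
      if (now() - lastMessageAt > staleMs) onStale();
    },
  };
}
```

The key difference from the unsubscribe/resubscribe approach is that `onStale` would replace the whole client, not just the subscription.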

However, when we only have around 100 devices sending the 10-second messages, it works better.

I am wondering if maybe the MQTT Client runs out of memory (too many messages)?

This has been frustrating me for a few weeks now. What makes it hard to troubleshoot is that the issue only appears after 4 or 5 hours, so after changing some code or adding more checks, I have to wait 4-5 hours before I can evaluate any change.

@robertsLando
Member

robertsLando commented Sep 14, 2023

If I got it correctly:

2000 MQTT devices publishing 1 message every 10 seconds means around 200 QoS 2 msg/sec
1 client service that is subscribed to receive all those messages

The client stops processing messages after around 4-5 h. This means it stops after receiving around 3M messages from the devices.
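The arithmetic behind those two figures, as a quick sanity check:

```javascript
// Back-of-envelope check of the numbers above.
const devices = 2000;
const intervalSec = 10;
const msgPerSec = devices / intervalSec;    // 200 msg/s
const hours = 4.5;
const totalMsgs = msgPerSec * 3600 * hours; // 3,240,000 -- "around 3M"
console.log(msgPerSec, totalMsgs);
```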

You could try to reproduce this easily with a script IMO: create a publisher script that inits N clients publishing those messages (much faster than 200 msg/s, so the issue should show up in less time) and a subscriber script that subscribes to them. Once the bug is reproduced, you could use https://clinicjs.org/heapprofiler/ or simply take a heap snapshot of the application and analyze it in Chrome dev tools in order to detect where the bug could be.

Based on this:

But then it never processes more messages, or it gets a few (maybe 20), and then it will have another ECONNRESET Error.

What I think is that at a certain point the client gets stuck because it cannot process the messages in the stream anymore (there are too many messages in the queue); at that point the socket should be closed and opened again, so instead of simply unsubscribing you should restart the client.

@wernermorgenstern
Author

I made a change to the VerneMQ instance in AWS ECS, and it has been at 1804 (100%) processing for 6 hours. I had to change some tcp_listen_options, especially setting the linger delay to a non-zero value. I found this suggestion on the VerneMQ GitHub page, under issues. So I will let it run for now and see how long it stays connected.

I do like your idea though about the Script.

So regarding your idea to restart the client: I don't necessarily want to restart the whole service. What is the best way to restart the MQTT.js client?
Should I do the following:

  1. Remove the listener on the message event
  2. Then call .end(false)
  3. Then set the client variable to undefined
  4. And then connect, subscribe, and add the listener

@robertsLando
Member

@wernermorgenstern You can use reconnect()

@wernermorgenstern
Author

When we have a few thousand of these devices (around 6884 right now), we can't keep up on the client side.
For 6884 devices, at one message every 10 s per device, we are getting 41,304 messages per minute.

I currently have 10 instances of the client service running, but we are still behind by 2 minutes or more.
Each message gets processed, and with timing log statements in place, each message takes about 10 milliseconds.
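A quick capacity check on those numbers, assuming single-threaded handling at the measured ~10 ms per message and an even spread across instances (the even-spread assumption is mine, not stated above):

```javascript
// Capacity check for the figures above.
const devices = 6884;
const perDeviceIntervalSec = 10;
const msgPerMin = devices * (60 / perDeviceIntervalSec); // 41,304 -- matches
const msgPerSec = msgPerMin / 60;                        // ~688 msg/s
const perMsgMs = 10;                                     // measured handling time
const maxPerInstancePerSec = 1000 / perMsgMs;            // 100 msg/s single-threaded
const instancesNeeded = Math.ceil(msgPerSec / maxPerInstancePerSec); // 7
console.log(msgPerMin, instancesNeeded);
```

On paper ~7 evenly loaded instances would keep up, so 10 falling behind might suggest the load is not spreading evenly or the bottleneck is elsewhere (broker, socket read path) rather than raw processing time.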

I have seen that MQTT.js gets multiple messages in each connection (so it seems to be batching them).

What other ways can I implement this in Node.js? (We use TypeScript, compile to JavaScript, and run the JavaScript code.)

I was thinking Worker Threads, but before I go and put the effort in, I want to actually make sure that would help in solving the issue.

I found two libraries, which implement Worker Threads with easier/simpler ways.
https://www.npmjs.com/package/piscina
https://github.com/poolifier/poolifier

I don't necessarily want to increase instances. We want to put up the Devices to 10_000, and even 50_000, and even higher.
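Whichever pool library is used (piscina, poolifier, or raw worker_threads), the dispatch side has to preserve per-device message ordering while fanning out across workers. One common approach is hashing the device ID to a fixed worker index, so each device's messages always land on the same worker. A sketch; the hash function and names are illustrative, not from either library:

```javascript
// Route each device's messages to a fixed worker so per-device ordering is
// preserved while the pool processes different devices in parallel.
function workerIndexFor(deviceId, poolSize) {
  let h = 0;
  for (const ch of deviceId) h = (h * 31 + ch.charCodeAt(0)) >>> 0;
  return h % poolSize;
}
```

In the `message` handler this would look something like `pool[workerIndexFor(deviceId, pool.length)].postMessage(payload)`, with `pool` being the array of workers.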

@robertsLando
Member

robertsLando commented Sep 18, 2023

Have you tried monitoring CPU/RAM on the AWS instance? Maybe you just don't have enough "power".

Anyway, a possible solution would be to put the client behind a load balancer and spread the load across multiple instances.

Even easier is to use shared subscriptions and spawn multiple clients; load balancing is then handled on the broker side.
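With shared subscriptions, each client subscribes with a `$share/<group>/<topic>` filter (the syntax VerneMQ supports); every client using the same group name gets a share of the stream. A small sketch, with example group and topic names:

```javascript
// Build a shared-subscription filter: all clients subscribing with the same
// group name split the matching message stream between them.
function sharedFilter(group, topic) {
  return `$share/${group}/${topic}`;
}
// e.g. client.subscribe(sharedFilter('ingest', 'devices/+/telemetry'), { qos: 1 })
```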

Given that this could be considered "enterprise usage" of this library, consider sponsoring the project.

@wernermorgenstern
Author

@robertsLando ,

  • We are using shared subscriptions
  • I have 10 instances running, using the shared subscriptions, and all of them are picking up messages. I am also now using worker threads, so with 10 instances and 8 CPUs, I basically have 80 clients
  • Regarding AWS usage (via a Grafana dashboard), our CPU usage is 0.04 per instance

The MQTT broker is a self-hosted VerneMQ broker. I will try to update its version; we are 4 minor versions behind.

@robertsLando
Member

robertsLando commented Sep 18, 2023

It depends on how long it takes to process those messages; it could be that the messages are coming in too fast. Using worker threads could be a possible solution, but you need to identify the root cause of the problem first.

@wernermorgenstern
Author

It takes on average between 5 ms and 10 ms to process each message. It does two Redis lookups, decrypts the payload, does a few calculations, and then saves a record back to Redis.

I will look more into the router. It is hard to determine where the backup occurs:
is it on the router side or the client side? That is what I need to determine somehow.

On the publisher's side, we add a timestamp to the topic, so when the client gets the message, I can extract the publish timestamp from the topic. And right now, that is 8 minutes behind.
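That lag measurement can be computed directly on receipt. A sketch, assuming the timestamp is the last topic segment in epoch milliseconds (the actual topic layout is not stated above, so this is an assumption):

```javascript
// Compute end-to-end lag from a publish timestamp carried in the topic.
// Assumed layout: devices/<id>/telemetry/<epochMillis> (last segment = timestamp).
function lagMs(topic, now = Date.now()) {
  const ts = Number(topic.split('/').pop());
  if (!Number.isFinite(ts)) throw new Error(`no timestamp in topic: ${topic}`);
  return now - ts;
}
```

Logging this per message (or as a rolling percentile) would show whether the lag grows steadily (a throughput deficit) or jumps suddenly (a stall like the ECONNRESET case).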

@robertsLando
Member

And right now, that is 8 minutes behind.

That's a lot

Is it on the Router Side, or the Client SIde? That is what I will need to determine somehow.

I don't understand what you mean by router/client side. Are you sure VerneMQ is not the problem here? Maybe it is the broker that is forwarding messages late.
