
[Question] Potential memory leak #734

Open
basimons opened this issue Mar 3, 2023 · 21 comments

@basimons

basimons commented Mar 3, 2023

Hello,

I don't know if this is the right place to ask; if not, please let me know and I'll move this over to Stack Overflow.

We are using Moquette for our IoT data; after applying a stateless transformation to the payload we put it into Kafka. This all works as expected and is very performant, thank you for that!

However, we've noticed what looks like a memory leak in this service. I'm posting it here since we also have a similar service that does basically the same thing but listens for IoT data over HTTP.

Our memory usage can be seen here:
[graph: JVM memory usage over time]
Where the yellow line is the DirectBuffer part and the red line is the tenured gen.

The memory according to Kubernetes (kubernetes.memory.working_set) is also steadily increasing, at roughly the same rate (indicating that it is probably the byte buffers).
[graph: kubernetes.memory.working_set over time]

Initially, after exploring the heap dump, we didn't find anything strange. We also used jemalloc to track off-heap memory usage, but that didn't point to anything in particular either.

After doing some more reading, I ran this OQL query in MAT on the heap dump: SELECT x AS ByteBuffer, x.capacity AS Capacity, x.limit AS Limit, x.mark AS Mark, x.position AS Position FROM java.nio.DirectByteBuffer x WHERE ((x.capacity > (1024 * 1024)) and (x.cleaner != null)). The output seems to point to a potential issue, as we get roughly 400 java.nio.DirectByteBuffer instances with a capacity of 4194304 bytes each. Am I correct in assuming there are 400 different ByteBufs of roughly 4 MB, or am I wrong and these are just a bunch of ByteBufs pointing to the same byte[] underneath, so there is no issue at all?

See output here:
[screenshot: MAT OQL result table]
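
As a cross-check on those numbers, direct-buffer usage can also be read at runtime from the JVM's buffer-pool MXBeans; a minimal sketch (plain java.lang.management, nothing Moquette- or Netty-specific; note that Netty may allocate some direct memory outside this pool depending on its settings):

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectBufferStats {
    public static void main(String[] args) {
        // The "direct" pool tracks java.nio.DirectByteBuffer allocations made via ByteBuffer.allocateDirect.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}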

Does anybody know whether we might be using the Moquette library in a way that could cause such a thing? We are not using any direct byte buffers ourselves; we only use a ByteBuf once, when we create a MqttPublishMessage, and that gets released by the Server#internalPublish method. If not, what would our next best step be to figure out what is causing this?
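
For reference, our publish path looks roughly like this (a minimal sketch, assuming Netty's MqttMessageBuilders and Moquette's Server#internalPublish(MqttPublishMessage, String); the broker variable, topic, and payload names are placeholders):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;

// mqttBroker is the already-started io.moquette.broker.Server instance (placeholder name)
ByteBuf payload = Unpooled.wrappedBuffer(transformedBytes);   // transformedBytes: the transformed IoT payload
MqttPublishMessage message = MqttMessageBuilders.publish()
        .topicName("iot/ingest")                              // placeholder topic
        .qos(MqttQoS.AT_LEAST_ONCE)
        .retained(false)
        .payload(payload)
        .build();
mqttBroker.internalPublish(message, "INTERNAL_PUBLISHER");    // no manual release afterwards;
                                                              // internalPublish releases the message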

We'd greatly appreciate some insight/tips/help, as we've been looking at it for a while, but can't seem to figure out what the cause is.

Thanks in advance!

@hylkevds
Collaborator

hylkevds commented Mar 3, 2023

Are you, by any chance, connecting with cleanSession=false?
Currently Moquette never cleans up sessions that have cleanSession=false. We have a work item for that: #732
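
(For reference, on the client side the flag is set on the connect options; a minimal sketch, assuming the Eclipse Paho v3 client, exception handling omitted:)

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

MqttClient client = new MqttClient("tcp://broker-host:1883", "my-client-id");  // placeholder URI and id
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);   // false keeps a persistent session on the broker
client.connect(options);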

@basimons
Author

basimons commented Mar 6, 2023

Hmm, I checked, but that does not seem to be the case. I did some extra digging and I see that a lot of these byte buffers point to Kafka threads, so I guess I'm looking in the wrong place.

Thanks for your help.

@basimons basimons closed this as completed Mar 6, 2023
@basimons
Author

basimons commented Mar 16, 2023

I think I might have to reopen this issue. It looks like MAT pointed me in the wrong direction: when I analyzed the same heap dump with jxray, I got this:
[jxray screenshot: off-heap memory retained by io.netty buffers]
This points to io.netty, which Kafka does not use AFAIK.

Could this be caused by the Moquette library? I don't know whether these sessions get saved into a local map or whether they end up in a DirectByteBuffer.

I'll try to reproduce the behaviour locally. Thanks for your suggestion; I wouldn't be surprised if it has to do with the clean_session=false parameter.

@basimons basimons reopened this Mar 16, 2023
@andsel
Collaborator

andsel commented Mar 20, 2023

Am I correct in assuming there are 400 different ByteBufs of roughly 4mB

Yes.

The 4 MB size made me suspicious about UnsafeQueues; it is the exact size of the UnsafeQueues Segment.
Which version of the library are you using?

@basimons
Author

We are currently running version 0.15.

We tried upgrading to 0.16 once, but we ran into this issue. So unfortunately I cannot tell you whether upgrading to 0.16 would fix this.

@andsel
Collaborator

andsel commented Mar 21, 2023

OK, so in version 0.15 there weren't any UnsafeQueues; that is something shipping with 0.17.
Given that the project moved forward a lot with 0.16, and that the issue you reported is fixed in 0.17, would you try the current main snapshot?

@zhengyuan-cn

If you extend AbstractInterceptHandler, you should release the message via super.onPublish(msg), not by calling ReferenceCountUtil.release(msg).
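
In code, the pattern described above would look roughly like this (a sketch, assuming Moquette's AbstractInterceptHandler / InterceptPublishMessage API; the class name is made up):

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;

public class KafkaForwardingHandler extends AbstractInterceptHandler {

    @Override
    public String getID() {
        return "kafka-forwarder";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {
        try {
            // read and forward the payload here
        } finally {
            super.onPublish(msg);   // the base class takes care of releasing the message, as described above
            // do NOT also call ReferenceCountUtil.release(msg) yourself
        }
    }
}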

@daigangHZ

daigangHZ commented Apr 25, 2024

I also hit this issue (there is no memory leak with EMQX):
2024-04-25 18:12:03.202 [nioEventLoopGroup-6-1] ERROR io.netty.util.ResourceLeakDetector[319] - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
io.netty.handler.codec.ReplayingDecoderByteBuf.readRetainedSlice(ReplayingDecoderByteBuf.java:588)
io.netty.handler.codec.mqtt.MqttDecoder.decodePublishPayload(MqttDecoder.java:654)
io.netty.handler.codec.mqtt.MqttDecoder.decodePayload(MqttDecoder.java:522)
io.netty.handler.codec.mqtt.MqttDecoder.decode(MqttDecoder.java:114)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:366)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.moquette.broker.metrics.BytesMetricsHandler.channelRead(BytesMetricsHandler.java:51)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
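
(Side note: when chasing records like the one above, Netty's leak detector can be made more verbose; a minimal sketch using the standard Netty property / API:)

// JVM flag:
//   -Dio.netty.leakDetection.level=PARANOID
// or programmatically, before any buffers are allocated:
import io.netty.util.ResourceLeakDetector;

ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);  // tracks every buffer instead of sampling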

@andsel
Collaborator

andsel commented Apr 26, 2024

@daigangHZ thanks for reporting. EMQX is a different tool built with different technologies (Erlang and the BEAM VM), so it uses other patterns for memory management that are hard to correlate with Moquette and the JVM.
Which version of Moquette are you using?
Is it reproducible in some way?

@daigangHZ

@daigangHZ thanks for reporting. EMQX is a different tool built with different technologies (Erlang and the BEAM VM), so it uses other patterns for memory management that are hard to correlate with Moquette and the JVM. Which version of Moquette are you using? Is it reproducible in some way?

This issue can be reproduced. I'm using version 0.18-Snapshot, with the Git commit ending at 23c7b39. I deployed the Moquette broker on a server (2 cores, 4 GB RAM) and then started 100 MQTT clients. These 100 clients send messages to the broker at a rate of 5 Hz. Another application listens to the messages sent by these 100 clients through the broker. After running this setup for about 12 hours, the broker process gets killed by the OOM killer (hence I switched to EMQX, to eliminate certain factors).
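
A minimal sketch of the publishing side of that setup (assuming the Eclipse Paho v3 client; broker URI, topic, and payload are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class LoadPublisher {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(8);
        for (int i = 0; i < 100; i++) {                        // 100 publishing clients
            MqttClient client = new MqttClient("tcp://broker-host:1883", "pub-" + i);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setAutomaticReconnect(true);
            client.connect(options);
            final String topic = "load/test/" + i;
            scheduler.scheduleAtFixedRate(() -> {              // 200 ms period = 5 Hz per client
                try {
                    MqttMessage msg = new MqttMessage("sample-payload".getBytes(StandardCharsets.UTF_8));
                    msg.setQos(1);
                    client.publish(topic, msg);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 0, 200, TimeUnit.MILLISECONDS);
        }
    }
}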

@hylkevds
Collaborator

hylkevds commented Apr 26, 2024

Trying the Git HEAD version with my Moquette Load Test project results in clients getting empty messages, so something is not correct!

Hmm, can't reproduce it any more... Maybe a case of not cleaning the project and old class files lying around?

Wait, I can reproduce it. It happens with clients using QoS 1 or 2, starting from commit 4973627

@daigangHZ

Trying the Git HEAD version with my Moquette Load Test project results in clients getting empty messages, so something is not correct!

Hmm, can't reproduce it any more... Maybe a case of not cleaning the project and old class files lying around?

Wait, I can reproduce it. It happens with clients using QoS 1 or 2, starting from commit 4973627

I also used QoS1 messages, so I'll roll back to a previous version and give it a try.

@hylkevds
Collaborator

Turning on trace on the MQTTConnection class shows corruption:

16:08:10.656 [  Session Executor 5] INFO       i.m.broker.MQTTConnection - Sending PUBLISH(AT_LEAST_ONCE) message. MessageId=1, topic=Datastreams(0)/Observations, payload=Last: 0test to c-21a18438-defc-4ed4-8d09-f3fb9bd6e8b8
16:08:10.657 [  Session Executor 1] INFO       i.m.broker.MQTTConnection - Sending PUBLISH(AT_LEAST_ONCE) message. MessageId=1, topic=Datastreams(0)/Observations, payload=eams(0)/Observations<U+0001>Last: 0test to c-f2d4e452-da45-42fb-95ad-beeba8e01c64

The first message is correct, the second one is corrupt. I think that due to the <U+0001> control character in the corrupted message the client ends up with an empty message.
The corrupted part varies, but in this case it looks like the last part of the topic, so there is probably an index not being set correctly.

hylkevds referenced this issue Apr 27, 2024
- Updates the serialization/deserialization of PublishedMessages in H2 and in segmented persistent queues to store and load MQTT properties, so that a PUBLISH message can be recreated from a stored message with properties in the queue.
- Updates the createPublishMessage factory method in MQTTConnection to accept an optional list of MQTT properties to attach to the PUBLISH message.
- Updates PostOffice to send retained and non-retained PUBLISH messages with the subscriptionIdentifier MQTT property stored in the Subscription.
- Moved the sendPublishQos0 method from MQTTConnection to Session, where sendPublishQos1 and sendPublishQos2 already reside.
- Added an integration test to prove publishing with a subscription identifier in both the retained and non-retained cases.
@hylkevds
Collaborator

The corruption is possibly caused by SegmentedPersistentQueueSerDes.writePayload changing the readerIndex on a buffer without setting it back. I'm currently testing to confirm that.
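
(For illustration, this is the general pattern for copying a payload without disturbing a buffer's reader index, using plain Netty ByteBuf calls; these helper names are not Moquette's:)

import io.netty.buffer.ByteBuf;

// Option 1: absolute getBytes() never moves readerIndex.
static byte[] copyPayload(ByteBuf payload) {
    byte[] out = new byte[payload.readableBytes()];
    payload.getBytes(payload.readerIndex(), out);
    return out;
}

// Option 2: mark/reset around a relative read.
static byte[] copyPayloadWithReset(ByteBuf payload) {
    payload.markReaderIndex();
    byte[] out = new byte[payload.readableBytes()];
    payload.readBytes(out);
    payload.resetReaderIndex();   // restore the index so later readers still see the full payload
    return out;
}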

But that doesn't explain the memory leak.
So you have 100 writing clients and 1 subscribing client?
Is your subscriber using cleanSession or not?
Is your subscriber receiving messages fast enough, or are they piling up on the server? (Moquette currently doesn't have a maximum queue size.)

@hylkevds
Collaborator

Hmm, running tests in the profiler, it seems the H2 MVStore is the largest memory user that keeps growing.

There is also an inefficiency in Session.inflightTimeouts. Messages are not removed from this queue when they are successfully delivered; they are only removed after the timeout ends. This means there are many more messages in it than there need to be: inflightTimeouts should only ever hold as many messages as there are inflightSlots, but on a busy session the timeout queue can grow to many thousands of entries.
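
(Schematically, and with made-up names rather than Moquette's actual types, removing the entry as soon as the ack arrives would keep the queue bounded by the number of inflight slots:)

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Moquette's real classes.
class InFlightEntry implements Delayed {
    final int packetId;
    private final long deadlineNanos;

    InFlightEntry(int packetId, long timeoutMillis) {
        this.packetId = packetId;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class InflightTracker {
    private final DelayQueue<InFlightEntry> inflightTimeouts = new DelayQueue<>();

    void onSend(int packetId, long timeoutMillis) {
        inflightTimeouts.add(new InFlightEntry(packetId, timeoutMillis));
    }

    void onPubAck(int ackedPacketId) {
        // drop the entry immediately instead of letting it sit until its delay expires
        inflightTimeouts.removeIf(e -> e.packetId == ackedPacketId);
    }

    InFlightEntry nextExpired() {
        return inflightTimeouts.poll();   // only returns entries whose delay has expired
    }
}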

@daigangHZ

The corruption is possibly caused by SegmentedPersistentQueueSerDes.writePayload changing the readerIndex on a buffer without setting it back. I'm currently testing to confirm that.

But that doesn't explain the memory leak. So you have 100 writing clients and 1 subscribing client? Is your subscriber using cleanSession or not? Is your subscriber receiving messages fast enough, or are they piling up on the server? (Moquette currently doesn't have a maximum queue size.)

Yes, I have 100 clients sending messages, with one client receiving messages. I've utilized the cleanSession option and configured the automatic reconnection option. The client receiving messages processes them quickly, without any backlog.

@daigangHZ

Hmm, running tests in the profiler, it seems the H2 MVStore is the largest memory user that keeps growing.

There is also an inefficiency in Session.inflightTimeouts. Messages are not removed from this queue when they are successfully delivered; they are only removed after the timeout ends. This means there are many more messages in it than there need to be: inflightTimeouts should only ever hold as many messages as there are inflightSlots, but on a busy session the timeout queue can grow to many thousands of entries.

So, do we need to optimize this logic? It seems that this issue may be causing the memory growth.

@hylkevds
Collaborator

So, do we need to optimize this logic? It seems that this issue may be causing the memory growth.

We definitely need to look at H2, since that seems to keep growing. The inflightTimeouts queue is bigger than it needs to be, but it doesn't grow.

@hylkevds
Collaborator

I fixed some issues in the segmented queues that caused a memory leak, and improved the memory use of the Sessions.

@daigangHZ

I fixed some issues in the segmented queues that caused a memory leak, and improved the memory use of the Sessions.

Awesome! I'll give it another try.

@hylkevds
Collaborator

After testing for a night I found another small leak: #836.
The H2 memory use is not actually an issue, since it is capped.
