
Some issues with the new segmented store #710

Open

hylkevds opened this issue Jan 28, 2023 · 4 comments

@hylkevds (Collaborator)

Testing the latest master with my stress test app (many clients, connects, disconnects, etc.), I ran into some exceptions.

As soon as the store opens a new page file this happens:

java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(23) + length(23) exceeds writerIndex(23): PooledSlicedByteBuf(ridx: 0, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 56, widx: 56, cap: 64))
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:49)
	at io.moquette.broker.SessionEventLoop.run(SessionEventLoop.java:34)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(23) + length(23) exceeds writerIndex(23): PooledSlicedByteBuf(ridx: 0, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 56, widx: 56, cap: 64))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
	at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:895)
	at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:903)
	at io.netty.buffer.WrappedByteBuf.readBytes(WrappedByteBuf.java:657)
	at io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:498)
	at io.moquette.persistence.SegmentPersistentQueue$SerDes.writePayload(SegmentPersistentQueue.java:53)
	at io.moquette.persistence.SegmentPersistentQueue$SerDes.write(SegmentPersistentQueue.java:41)
	at io.moquette.persistence.SegmentPersistentQueue$SerDes.toBytes(SegmentPersistentQueue.java:26)
	at io.moquette.persistence.SegmentPersistentQueue.enqueue(SegmentPersistentQueue.java:125)
	at io.moquette.persistence.SegmentPersistentQueue.enqueue(SegmentPersistentQueue.java:16)
	at io.moquette.broker.Session.sendPublishQos2(Session.java:328)
	at io.moquette.broker.Session.sendPublishOnSessionAtQos(Session.java:257)
	at io.moquette.broker.Session.sendNotRetainedPublishOnSessionAtQos(Session.java:243)
	at io.moquette.broker.PostOffice.publishToSession(PostOffice.java:519)
	at io.moquette.broker.PostOffice.publishToSession(PostOffice.java:508)
	at io.moquette.broker.PostOffice.lambda$publish2Subscribers$3(PostOffice.java:482)
	at io.moquette.broker.PostOffice$BatchingPublishesCollector.lambda$routeBatchedPublishes$0(PostOffice.java:439)
	at io.moquette.broker.SessionCommand.execute(SessionCommand.java:23)
	at io.moquette.broker.PostOffice.lambda$routeCommand$5(PostOffice.java:635)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:46)

After that there are all sorts of other exceptions, but those are probably caused by the inconsistent state of the store:

java.util.concurrent.ExecutionException: java.nio.BufferUnderflowException
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:49)
	at io.moquette.broker.SessionEventLoop.run(SessionEventLoop.java:34)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.BufferUnderflowException: null
	at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:643)
	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165)
	at io.moquette.persistence.SegmentPersistentQueue$SerDes.fromBytes(SegmentPersistentQueue.java:86)
	at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:148)
	at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:16)
	at io.moquette.broker.Session.drainQueueToConnection(Session.java:414)
	at io.moquette.broker.Session.processPubComp(Session.java:230)
	at io.moquette.broker.MQTTConnection.lambda$processPubComp$0(MQTTConnection.java:112)
	at io.moquette.broker.SessionCommand.execute(SessionCommand.java:23)
	at io.moquette.broker.PostOffice.lambda$routeCommand$5(PostOffice.java:635)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:46)
	... 2 common frames omitted

I'll try to track down the details.

@hylkevds (Collaborator, Author)

I'm getting closer to identifying the problem. It must be a threading issue. I just got:

java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(23) exceeds writerIndex(23): PooledSlicedByteBuf(ridx: 0, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 56, widx: 56, cap: 64))

Since 0 + 23 definitely does not exceed 23, the numbers must have been changed by another thread in between the check and the creation of the exception. That should never happen, it would explain the weird symptoms, and it would also explain why the problem doesn't show up in the unit tests.
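A minimal sketch of that failure mode, using a plain java.nio.ByteBuffer instead of Netty's ByteBuf (class and method names here are hypothetical): readers that share one buffer also share its position, so even without a true race the second reader underflows because the first one already advanced the shared index.

```java
import java.nio.ByteBuffer;

// Two readers sharing one ByteBuffer also share its position. The second
// read fails because the first one consumed the shared index -- the same
// failure mode as two session threads draining one queue buffer.
public class SharedIndexDemo {

    // Reads `len` bytes from the buffer, advancing its shared position.
    static byte[] readPayload(ByteBuffer shared, int len) {
        byte[] out = new byte[len];
        shared.get(out); // moves shared.position() forward by len
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap(new byte[23]); // 23-byte payload, as in the trace
        readPayload(shared, 23);     // first reader consumes everything
        try {
            readPayload(shared, 23); // second reader: position is already 23
        } catch (java.nio.BufferUnderflowException e) {
            System.out.println("BufferUnderflowException: shared position = " + shared.position());
        }
    }
}
```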

@hylkevds (Collaborator, Author) commented Feb 5, 2023

Ok, I found the cause of this exception. The different threads all work on the same buffer, sharing the same indexes. Ideally we would make one retainedDuplicate for each thread; this PR just makes a duplicate when reading the buffer: #716. Not quite as nice, but it has the advantage that we don't need to reset the reader index.
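For illustration, the duplicate-per-reader approach can be sketched with java.nio.ByteBuffer (Netty's ByteBuf.duplicate() behaves analogously: shared underlying bytes, but independent reader/writer indexes; the names below are hypothetical):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// duplicate() shares the underlying bytes but gives each reader its own
// position and limit, so one reader cannot corrupt another's index and
// the original buffer's position never moves.
public class DuplicateDemo {

    // Reads everything remaining in this view, advancing only the view's position.
    static String readAll(ByteBuffer view) {
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));

        // Each reader gets its own duplicate with an independent position.
        String a = readAll(shared.duplicate());
        String b = readAll(shared.duplicate());

        System.out.println(a + " / " + b + " / original position = " + shared.position());
        // -> payload / payload / original position = 0
    }
}
```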

However, there are more issues:

19:52:46.403 [Session Executor 11] INFO     i.m.broker.SessionEventLoop - SessionEventLoop Session Executor 11 reached exception in processing command
java.util.concurrent.ExecutionException: java.nio.BufferUnderflowException
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:49)
	at io.moquette.broker.SessionEventLoop.run(SessionEventLoop.java:34)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.BufferUnderflowException: null
	at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:643)
	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165)
	at io.moquette.persistence.SegmentPersistentQueue$SerDes.fromBytes(SegmentPersistentQueue.java:87)
	at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:149)
	at io.moquette.persistence.SegmentPersistentQueue.dequeue(SegmentPersistentQueue.java:17)
	at io.moquette.broker.Session.drainQueueToConnection(Session.java:414)
	at io.moquette.broker.Session.processPubComp(Session.java:230)
	at io.moquette.broker.MQTTConnection.lambda$processPubComp$0(MQTTConnection.java:116)
	at io.moquette.broker.SessionCommand.execute(SessionCommand.java:23)
	at io.moquette.broker.PostOffice.lambda$routeCommand$5(PostOffice.java:648)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at io.moquette.broker.SessionEventLoop.executeTask(SessionEventLoop.java:46)
	... 2 common frames omitted

@hylkevds (Collaborator, Author) commented Feb 5, 2023

Making a duplicate per Command Thread was easier than expected. I've updated #716 :)

@hylkevds (Collaborator, Author) commented Feb 13, 2023

After chopping up #716 there is one open question left:

Currently each Segment directly operates on a (shared) MemoryMappedBuffer of the entire Page.

I'm wondering if it isn't better to give each Segment a slice() of the mapped (page) buffer. That slice will operate directly on the page buffer, but will not interfere with the slices of other Segments. For reading and writing, instead of calculating the position relative to the page, it would be calculated relative to the slice.
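The per-Segment slice idea can be sketched with java.nio (a MappedByteBuffer is a ByteBuffer, so the same slicing applies; class names and segment size here are hypothetical):

```java
import java.nio.ByteBuffer;

// Each segment gets a slice() of the shared page buffer. A slice's
// position 0 is the segment's start, so all offsets are segment-relative,
// and neighbouring segments cannot see past each other's limits.
public class SegmentSliceDemo {

    static final int SEGMENT_SIZE = 16; // hypothetical segment size

    // Carves the n-th segment out of the page buffer as an independent view.
    static ByteBuffer segmentView(ByteBuffer page, int n) {
        ByteBuffer view = page.duplicate(); // don't disturb the page's own indexes
        view.position(n * SEGMENT_SIZE).limit((n + 1) * SEGMENT_SIZE);
        return view.slice(); // capacity == SEGMENT_SIZE, position == 0
    }

    public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(4 * SEGMENT_SIZE); // one "page", four segments

        ByteBuffer seg1 = segmentView(page, 1);
        seg1.put(0, (byte) 42); // segment-relative offset 0

        // The write landed at page offset 16; segment 0 is untouched.
        System.out.println(page.get(16) + " " + segmentView(page, 0).get(0));
        // -> 42 0
    }
}
```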

I'm also wondering about the efficiency of the QueuePool.openNextTailSegment(String name) method. It seems to re-open the entire page file buffer for each segment in that page that is opened. Do you know if the back-end implementation is smart enough to not reserve memory for the entire file, for each segment that is opened in it? If we do change the Segment implementation to work on a slice, we could map only the segment's part of the file into the Segment. That may be more memory efficient.
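Mapping only a segment's region is supported directly by the standard API: FileChannel.map takes an offset and a size, so each Segment could hold a MappedByteBuffer covering just its own range of the page file. A sketch under hypothetical names and sizes:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Maps only one segment's byte range of a page file, instead of the whole
// file, by passing an offset and size to FileChannel.map. The mapping
// stays valid after the channel is closed.
public class SegmentMapDemo {

    static MappedByteBuffer mapSegment(Path pageFile, long segmentStart, long segmentSize)
            throws IOException {
        try (FileChannel ch = FileChannel.open(pageFile,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Maps only [segmentStart, segmentStart + segmentSize) of the file.
            return ch.map(FileChannel.MapMode.READ_WRITE, segmentStart, segmentSize);
        }
    }

    public static void main(String[] args) throws IOException {
        Path page = Files.createTempFile("page", ".bin");
        Files.write(page, new byte[64]); // a tiny 64-byte "page" file

        MappedByteBuffer seg = mapSegment(page, 16, 16); // second 16-byte segment
        seg.put(0, (byte) 42); // segment-relative offset 0 == file offset 16
        seg.force();           // flush the mapped region back to disk

        System.out.println("mapped " + seg.capacity() + " bytes");
        Files.deleteIfExists(page); // cleanup (may fail on Windows while mapped)
    }
}
```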

WDYT?
