Skip to content

Commit

Permalink
Reduced memory use on busy Sessions by removing handled items from in…
Browse files Browse the repository at this point in the history
…flightTimeouts (#834)

Items in the inflightTimeouts DelayQueue were only ever removed when they
timed out. But in normal operation the related messages would have been
handled long before that. There should only ever be a number equal to the
number of inflightSlots in the queue, but the queue would grow to the
maximum number of messages ever handled in a 5 second interval. This made
each session take much more memory than needed.
  • Loading branch information
hylkevds committed May 1, 2024
1 parent 432b3f0 commit d784d67
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions broker/src/main/java/io/moquette/broker/Session.java
Expand Up @@ -192,6 +192,7 @@ boolean isClean() {

public void processPubRec(int pubRecPacketId) {
// Message discarded, make sure any buffers in it are released
cleanFromInflight(pubRecPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId);
if (removed == null) {
LOG.warn("Received a PUBREC with not matching packetId");
Expand All @@ -218,6 +219,7 @@ public void processPubRec(int pubRecPacketId) {

public void processPubComp(int messageID) {
// Message discarded, make sure any buffers in it are released
cleanFromInflight(messageID);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed == null) {
LOG.warn("Received a PUBCOMP with not matching packetId");
Expand Down Expand Up @@ -343,6 +345,7 @@ private boolean inflightHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
cleanFromInflight(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed == null) {
LOG.warn("Received a PUBACK with not matching packetId");
Expand All @@ -355,6 +358,10 @@ void pubAckReceived(int ackPacketId) {
drainQueueToConnection();
}

private void cleanFromInflight(int ackPacketId) {
inflightTimeouts.removeIf(d -> d.packetId == ackPacketId);
}

public void flushAllQueuedMessages() {
drainQueueToConnection();
}
Expand Down Expand Up @@ -495,6 +502,7 @@ public void cleanUp() {
// in case of in memory session queues all contained messages
// has to be released.
sessionQueue.closeAndPurge();
inflightTimeouts.clear();
for (EnqueuedMessage msg : inflightWindow.values()) {
msg.release();
}
Expand Down

0 comments on commit d784d67

Please sign in to comment.