Skip to content

Commit

Permalink
Fixes moquette-io#583: inflightSlots counting error
Browse files Browse the repository at this point in the history
When processing an ACK, the inflightSlots should only be increased if it
is the first time the ACK is received for a given message. The ACK may
be received multiple times, the subsequent ACKs should not cause the
count to be increased.
  • Loading branch information
hylkevds committed May 24, 2021
1 parent e5d58a4 commit 038815f
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ public void processPubRec(int packetId) {
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
inflightSlots.incrementAndGet();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/;
Expand All @@ -211,10 +211,9 @@ public void processPubComp(int messageID) {
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
inflightSlots.incrementAndGet();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();

// TODO notify the interceptor
Expand Down Expand Up @@ -315,9 +314,9 @@ void pubAckReceived(int ackPacketId) {
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
inflightSlots.incrementAndGet();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}

Expand Down

0 comments on commit 038815f

Please sign in to comment.