Skip to content

Commit

Permalink
Fix missing inflightTimeouts.add(), release of old message
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed May 24, 2021
1 parent 038815f commit ca4762d
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,13 @@ private void drainQueueToConnection() {
int sendPacketId = mqttConnection.nextPacketId();

// Putting it in a map, but the retain is cancelled out by the below release.
inflightWindow.put(sendPacketId, msg);
EnqueuedMessage old = inflightWindow.put(sendPacketId, msg);
if (old != null) {
// Something was already there. Release, and free up its slot.
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
Expand Down

0 comments on commit ca4762d

Please sign in to comment.