diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 658929378..c227ef8d6 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -377,7 +377,13 @@ private void drainQueueToConnection() { int sendPacketId = mqttConnection.nextPacketId(); // Putting it in a map, but the retain is cancelled out by the below release. - inflightWindow.put(sendPacketId, msg); + EnqueuedMessage old = inflightWindow.put(sendPacketId, msg); + if (old != null) { + // Something was already there. Release, and free up its slot. + old.release(); + inflightSlots.incrementAndGet(); + } + inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS)); if (msg instanceof SessionRegistry.PubRelMarker) { MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId); mqttConnection.sendIfWritableElseDrop(pubRel);