From 8736462e155c11bedaac306844894fe47f5ab8f7 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Thu, 25 Feb 2021 15:30:09 +0100 Subject: [PATCH] Fixes #455, #587: QOS2 CCE and Deadlock - #455: resendInflightNotAcked() assumed all messages in the inflightWindow are PublishedMessage, but they can also be PubRelMarker. This caused a ClassCastException. - #587: When sending PubRelMessages, never put them on the queue, since this deadlocks the system. --- .../main/java/io/moquette/broker/Session.java | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index e6ad29774..f318a4d74 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -18,6 +18,7 @@ import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; import io.moquette.broker.SessionRegistry.EnqueuedMessage; +import io.moquette.broker.SessionRegistry.PubRelMarker; import io.moquette.broker.SessionRegistry.PublishedMessage; import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.Topic; @@ -196,18 +197,12 @@ public void processPubRec(int pubRecPacketId) { return; } - inflightSlots.incrementAndGet(); - if (canSkipQueue()) { - inflightSlots.decrementAndGet(); - inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker()); - inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS)); - MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); - mqttConnection.sendIfWritableElseDrop(pubRel); + inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker()); + inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS)); + MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); + mqttConnection.sendIfWritableElseDrop(pubRel); - drainQueueToConnection(); - } else { - sessionQueue.add(new SessionRegistry.PubRelMarker()); - } + drainQueueToConnection(); } public void processPubComp(int messageID) { @@ -348,13 +343,22 @@ public void resendInflightNotAcked() { debugLogPacketIds(expired); for (InFlightPacket notAckPacketId : expired) { - if (inflightWindow.containsKey(notAckPacketId.packetId)) { - final SessionRegistry.PublishedMessage msg = - (SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId); - final Topic topic = msg.topic; - final MqttQoS qos = msg.publishingQos; - final ByteBuf payload = msg.payload; + EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId); + if (msg == null) { + // Already acked... + continue; + } + if (msg instanceof PubRelMarker) { + MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId); + inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); + mqttConnection.sendIfWritableElseDrop(pubRel); + } else { + final PublishedMessage pubMsg = (PublishedMessage) msg; + final Topic topic = pubMsg.topic; + final MqttQoS qos = pubMsg.publishingQos; + final ByteBuf payload = pubMsg.payload; MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload); + inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); mqttConnection.sendPublish(publishMsg); } } @@ -398,16 +402,12 @@ private void drainQueueToConnection() { inflightSlots.incrementAndGet(); } inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS)); - if (msg instanceof SessionRegistry.PubRelMarker) { - MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId); - mqttConnection.sendIfWritableElseDrop(pubRel); - } else { - final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg; - MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(), - msgPub.publishingQos, - msgPub.payload, sendPacketId); - mqttConnection.sendPublish(publishMsg); - } + final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg; + MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId( + msgPub.topic.toString(), + msgPub.publishingQos, + msgPub.payload, sendPacketId); + mqttConnection.sendPublish(publishMsg); // we fetched msg from a map, but the release is cancelled out by the above retain } }