From 0a81d903f3cedbfe757a89ee6e76527ec4b3e2ac Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Thu, 25 Feb 2021 15:30:09 +0100 Subject: [PATCH] Fixes #455, #587: QOS2 CCE and Deadlock #455: resendInflightNotAcked() assumed all messages in the inflightWindow are PublishedMessage, but they can also be PubRelMarker. This caused a ClassCastException. #587: When sending PubRelMessages, never put them on the queue, since this deadlocks the system. Since the queue can not contain PubRelMessages, drainQueueToConnection can be simplified. --- .../main/java/io/moquette/broker/Session.java | 56 +++++++++---------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index e134f3e07..6d52b16af 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -196,18 +196,12 @@ public void processPubRec(int pubRecPacketId) { return; } - inflightSlots.incrementAndGet(); - if (canSkipQueue()) { - inflightSlots.decrementAndGet(); - inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker()); - inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS)); - MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); - mqttConnection.sendIfWritableElseDrop(pubRel); + inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker()); + inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS)); + MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); + mqttConnection.sendIfWritableElseDrop(pubRel); - drainQueueToConnection(); - } else { - sessionQueue.add(new SessionRegistry.PubRelMarker()); - } + drainQueueToConnection(); } public void processPubComp(int messageID) { @@ -348,18 +342,25 @@ public void resendInflightNotAcked() { debugLogPacketIds(expired); for (InFlightPacket notAckPacketId : expired) { - final SessionRegistry.PublishedMessage msg = - (SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId); + final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId); if (msg == null) { // Already acked... continue; } - final Topic topic = msg.topic; - final MqttQoS qos = msg.publishingQos; - final ByteBuf payload = msg.payload; - MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload); - inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); - mqttConnection.sendPublish(publishMsg); + if (msg instanceof SessionRegistry.PubRelMarker) { + MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId); + inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); + mqttConnection.sendIfWritableElseDrop(pubRel); + } else { + final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg; + final Topic topic = pubMsg.topic; + final MqttQoS qos = pubMsg.publishingQos; + final ByteBuf payload = pubMsg.payload; + final ByteBuf copiedPayload = payload.retainedDuplicate(); + MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload); + inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); + mqttConnection.sendPublish(publishMsg); + } } } @@ -400,16 +401,13 @@ private void drainQueueToConnection() { inflightSlots.incrementAndGet(); } inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS)); - if (msg instanceof SessionRegistry.PubRelMarker) { - MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId); - mqttConnection.sendIfWritableElseDrop(pubRel); - } else { - final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg; - MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(), - msgPub.publishingQos, - msgPub.payload, sendPacketId); - mqttConnection.sendPublish(publishMsg); - } + final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg; + MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId( + msgPub.topic.toString(), + msgPub.publishingQos, + msgPub.payload, sendPacketId); + mqttConnection.sendPublish(publishMsg); + // we fetched msg from a map, but the release is cancelled out by the above retain } }