Skip to content

Commit

Permalink
Fixes moquette-io#455, moquette-io#587: QOS2 CCE and Deadlock
Browse files Browse the repository at this point in the history
moquette-io#455: resendInflightNotAcked() assumed all messages in the inflightWindow are
PublishedMessage, but they can also be PubRelMarker. This caused a
ClassCastException.

moquette-io#587: When sending PubRelMessages, never put them on the queue, since this
deadlocks the system. Since the queue can not contain PubRelMessages,
drainQueueToConnection can be simplified.
  • Loading branch information
hylkevds committed Jul 4, 2021
1 parent 06c64bf commit 0a81d90
Showing 1 changed file with 27 additions and 29 deletions.
56 changes: 27 additions & 29 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,12 @@ public void processPubRec(int pubRecPacketId) {
return;
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);

drainQueueToConnection();
} else {
sessionQueue.add(new SessionRegistry.PubRelMarker());
}
drainQueueToConnection();
}

public void processPubComp(int messageID) {
Expand Down Expand Up @@ -348,18 +342,25 @@ public void resendInflightNotAcked() {
debugLogPacketIds(expired);

for (InFlightPacket notAckPacketId : expired) {
final SessionRegistry.PublishedMessage msg =
(SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId);
final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId);
if (msg == null) {
// Already acked...
continue;
}
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
final Topic topic = pubMsg.topic;
final MqttQoS qos = pubMsg.publishingQos;
final ByteBuf payload = pubMsg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
}
}
}

Expand Down Expand Up @@ -400,16 +401,13 @@ private void drainQueueToConnection() {
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
}
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(
msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);

// we fetched msg from a map, but the release is cancelled out by the above retain
}
}
Expand Down

0 comments on commit 0a81d90

Please sign in to comment.