Skip to content

Commit

Permalink
Fixes moquette-io#455, moquette-io#587: QOS2 CCE and Deadlock
Browse files Browse the repository at this point in the history
- moquette-io#455: resendInflightNotAcked() assumed all messages in the inflightWindow are
  PublishedMessage, but they can also be PubRelMarker. This caused a
  ClassCastException.

- moquette-io#587: When sending PubRelMessages, never put them on the queue, since this
  deadlocks the system.
  • Loading branch information
hylkevds committed May 24, 2021
1 parent ca4762d commit c695368
Showing 1 changed file with 29 additions and 30 deletions.
59 changes: 29 additions & 30 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import io.moquette.broker.SessionRegistry.PubRelMarker;
import io.moquette.broker.SessionRegistry.PublishedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
Expand Down Expand Up @@ -192,18 +193,14 @@ public void processPubRec(int packetId) {
inflightSlots.incrementAndGet();
}

if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/;
inflightWindow.put(pubRelPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRelPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRelPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
inflightSlots.decrementAndGet();
int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/;
inflightWindow.put(pubRelPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRelPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRelPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);

drainQueueToConnection();
} else {
sessionQueue.add(new SessionRegistry.PubRelMarker());
}
drainQueueToConnection();
}

public void processPubComp(int messageID) {
Expand Down Expand Up @@ -331,17 +328,23 @@ public void resendInflightNotAcked() {
debugLogPacketIds(expired);

for (InFlightPacket notAckPacketId : expired) {
if (inflightWindow.containsKey(notAckPacketId.packetId)) {
final SessionRegistry.PublishedMessage msg =
(SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId);
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId);
if (msg == null) {
// Already acked...
continue;
}
if (msg instanceof PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final PublishedMessage pubMsg = (PublishedMessage) msg;
final Topic topic = pubMsg.topic;
final MqttQoS qos = pubMsg.publishingQos;
final ByteBuf payload = pubMsg.payload;
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);

// It was removed from a store, release.
msg.release();
}
}
}
Expand Down Expand Up @@ -384,16 +387,12 @@ private void drainQueueToConnection() {
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
}
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(
msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
// we fetched msg from a map, but the release is cancelled out by the above retain
}
}
Expand Down

0 comments on commit c695368

Please sign in to comment.