Skip to content

Commit

Permalink
Fixed tests and core to avoid buffer leakage
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 17, 2024
1 parent 24dc2c3 commit fda7eaf
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ public void close() {
// Update all not clean session with the proper expiry date
updateNotCleanSessionsWithProperExpire();
queueRepository.close();
pool.values().forEach(Session::cleanUp);
}

private void updateNotCleanSessionsWithProperExpire() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String di
break;
case PUBLISH:
MqttPublishMessage publish = (MqttPublishMessage) msg;
LOG.debug("{} PUBLISH <{}> to topics <{}>", direction, clientID, publish.variableHeader().topicName());
LOG.debug("{} PUBLISH <{}> to topics <{}> qos {} packetId <{}>", direction, clientID,
publish.variableHeader().topicName(), publish.fixedHeader().qosLevel(), publish.variableHeader().packetId()
);
break;
case PUBREC:
case PUBCOMP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimeIsNotExpiredAndSubsc
.getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value());
assertNotNull(messageExpiryProperty, "message expiry property must be present");
assertTrue(messageExpiryProperty.value() < messageExpiryInterval, "Forwarded message expiry should be lowered");

// send PUBACK to release
acknowledge(publish.variableHeader().packetId());
assertTrue(publish.release(), "Last reference of publish should be released");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public String getID() {
public void onPublish(InterceptPublishMessage msg) {
final String decodedPayload = msg.getPayload().toString(UTF_8);
System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);

// super handle the message buffer count
super.onPublish(msg);
}

@Override
Expand Down

0 comments on commit fda7eaf

Please sign in to comment.