Skip to content

Commit

Permalink
[Test] Release unpooled buffer after usage to avoid leakage (#862)
Browse files Browse the repository at this point in the history
Fix unpooled buffer leakage in tests.
  • Loading branch information
andsel authored Nov 27, 2024
1 parent 4209906 commit a91fac5
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 11 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/maven_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ name: Java CI with Maven

on:
push:
branches: [ main, mqtt5_development ]
branches: [ main ]
pull_request:
branches: [ main, mqtt5_development ]
branches: [ main ]

jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-20.04, ubuntu-latest]
java: [11, 17]
os: [ubuntu-22.04, ubuntu-latest]
java: [17, 21]
arch: [x64] # when ARM will be present add aarch64
fail-fast: false
max-parallel: 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ public void close() {
// Update all not clean session with the proper expiry date
updateNotCleanSessionsWithProperExpire();
queueRepository.close();
pool.values().forEach(Session::cleanUp);
}

private void updateNotCleanSessionsWithProperExpire() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String di
break;
case PUBLISH:
MqttPublishMessage publish = (MqttPublishMessage) msg;
LOG.debug("{} PUBLISH <{}> to topics <{}>", direction, clientID, publish.variableHeader().topicName());
LOG.debug("{} PUBLISH <{}> to topics <{}> qos {} packetId <{}>", direction, clientID,
publish.variableHeader().topicName(), publish.fixedHeader().qosLevel(), publish.variableHeader().packetId()
);
break;
case PUBREC:
case PUBCOMP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moquette.interception.BrokerInterceptor;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.ReferenceCountUtil;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -49,6 +50,7 @@ static void assertConnectAccepted(EmbeddedChannel channel) {
static void verifyReceivePublish(EmbeddedChannel embeddedChannel, String expectedTopic, String expectedContent) {
MqttPublishMessage receivedPublish = embeddedChannel.flushOutbound().readOutbound();
assertPublishIsCorrect(expectedTopic, expectedContent, receivedPublish);
ReferenceCountUtil.release(receivedPublish);
}

private static void assertPublishIsCorrect(String expectedTopic, String expectedContent,
Expand Down
13 changes: 10 additions & 3 deletions broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.util.ReferenceCountUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -65,16 +66,18 @@ public void testPubAckDrainMessagesRemainingInQueue() {
testChannel.close();
}

private void sendQoS1To(Session client, Topic destinationTopic, String message) {
private ByteBuf sendQoS1To(Session client, Topic destinationTopic, String message) {
final ByteBuf payload = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, message);
final SessionRegistry.PublishedMessage publishedMessage = new SessionRegistry.PublishedMessage(destinationTopic, MqttQoS.AT_LEAST_ONCE, payload, false, Instant.MAX);
client.sendPublishOnSessionAtQos(publishedMessage);
ReferenceCountUtil.release(payload);
return payload;
}

@Test
public void testFirstResendOfANotAckedMessage() throws InterruptedException {
final Topic destinationTopic = new Topic("/a/b");
sendQoS1To(client, destinationTopic, "Message not ACK-ed at first send!");
ByteBuf payloadCreated = sendQoS1To(client, destinationTopic, "Message not ACK-ed at first send!");
// verify the first time the message is sent
ConnectionTestUtils.verifyReceivePublish(testChannel, destinationTopic.toString(), "Message not ACK-ed at first send!");

Expand All @@ -86,12 +89,14 @@ public void testFirstResendOfANotAckedMessage() throws InterruptedException {

// verify the first time the message is sent
ConnectionTestUtils.verifyReceivePublish(testChannel, destinationTopic.toString(), "Message not ACK-ed at first send!");

ReferenceCountUtil.release(payloadCreated);
}

@Test
public void testSecondResendOfANotAckedMessage() throws InterruptedException {
final Topic destinationTopic = new Topic("/a/b");
sendQoS1To(client, destinationTopic, "Message not ACK-ed at first send!");
ByteBuf payloadCreated = sendQoS1To(client, destinationTopic, "Message not ACK-ed at first send!");
// verify the first time the message is sent
ConnectionTestUtils.verifyReceivePublish(testChannel, destinationTopic.toString(), "Message not ACK-ed at first send!");

Expand All @@ -112,6 +117,8 @@ public void testSecondResendOfANotAckedMessage() throws InterruptedException {

// verify the first time the message is sent
ConnectionTestUtils.verifyReceivePublish(testChannel, destinationTopic.toString(), "Message not ACK-ed at first send!");

ReferenceCountUtil.release(payloadCreated);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@ protected void consumePendingPublishAndAcknowledge(String expectedPayload) throw
acknowledge(packetId);
}

private void acknowledge(int packetId) {
protected void acknowledge(int packetId) {
acknowledge(packetId, lowLevelClient);
}

protected void acknowledge(int packetId, Client client) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
false, 0);
MqttPubAckMessage pubAck = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
lowLevelClient.sendMessage(pubAck);
client.sendMessage(pubAck);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void receiveInflightPublishesAfterAReconnect() throws InterruptedExceptio
publishPayload = "Fake Payload";
}
assertEquals("Hello", publishPayload, "The inflight payload from previous subscription MUST be received");
acknowledge(opt.get().variableHeader().packetId(), reconnectingSubscriber);

reconnectingSubscriber.disconnect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimeIsNotExpiredAndSubsc
.getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value());
assertNotNull(messageExpiryProperty, "message expiry property must be present");
assertTrue(messageExpiryProperty.value() < messageExpiryInterval, "Forwarded message expiry should be lowered");

// send PUBACK to release
acknowledge(publish.variableHeader().packetId());
assertTrue(publish.release(), "Last reference of publish should be released");
}

Expand Down
6 changes: 5 additions & 1 deletion broker/src/test/java/io/moquette/testclient/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.util.ReferenceCountUtil;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -188,7 +189,6 @@ public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInfligh
return doConnect(connectMessage);
}


private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) throws InterruptedException {
this.sendMessage(connectMessage);

Expand Down Expand Up @@ -288,6 +288,10 @@ public MqttMessage subscribeWithError(String topic, MqttQoS qos) {
public void disconnect() {
final MqttMessage disconnectMessage = MqttMessageBuilders.disconnect().build();
sendMessage(disconnectMessage);
// release all queued publishes
for (MqttMessage msg : receivedMessages) {
ReferenceCountUtil.release(msg);
}
}

public void shutdownConnection() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public String getID() {
public void onPublish(InterceptPublishMessage msg) {
final String decodedPayload = msg.getPayload().toString(UTF_8);
System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);

// super handle the message buffer count
super.onPublish(msg);
}

@Override
Expand Down

0 comments on commit a91fac5

Please sign in to comment.