Skip to content

Commit

Permalink
[DEBUG] Enable message log for client to investigate problem in CI
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 21, 2024
1 parent b26f940 commit 9e44a74
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeout
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("metrics", new MessageMetricsHandler(metricsCollector));
pipeline.addLast("messageLogger", new MQTTMessageLogger());
// pipeline.addLast("messageLogger", new MQTTMessageLogger());
if (metrics.isPresent()) {
pipeline.addLast("wizardMetrics", metrics.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,8 +43,11 @@

public class FlowControlTest extends AbstractServerIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(FlowControlTest.class);

@Test
public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnected() throws IOException, InterruptedException {
LOG.info("givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnected");
final int serverSendQuota = 5;

// stop existing broker to restart with receiveMaximum configured
Expand Down Expand Up @@ -105,6 +110,7 @@ public String clientName() {

@Test
public void givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient() throws InterruptedException {
LOG.info("givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient");
connectLowLevel();

// subscribe with an identifier
Expand Down Expand Up @@ -152,13 +158,16 @@ private static void fillInFlightWindow(int numPublishToSend, Mqtt5BlockingClient

@Test
public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose() throws InterruptedException {
LOG.info("givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose");
// connect subscriber and published
// publisher send 20 events, 10 should be in the inflight, 10 remains on the queue
connectLowLevel();

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);

System.out.println("\n\n\n\n");
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
8 changes: 7 additions & 1 deletion broker/src/test/java/io/moquette/testclient/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.moquette.testclient;

import io.moquette.BrokerConstants;
import io.moquette.broker.metrics.MQTTMessageLogger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -58,7 +59,7 @@ public interface ICallback {
EventLoopGroup workerGroup;
Channel m_channel;
private boolean m_connectionLost;
private ICallback callback;
private volatile ICallback callback;
private String clientId;
private AtomicReference<MqttMessage> receivedMsg = new AtomicReference<>();
private final BlockingQueue<MqttMessage> receivedMessages = new LinkedBlockingQueue<>();
Expand All @@ -83,6 +84,7 @@ public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("rawcli_decoder", new MqttDecoder());
pipeline.addLast("rawcli_encoder", MqttEncoder.INSTANCE);
pipeline.addLast("messageLogger", new MQTTMessageLogger());
pipeline.addLast("rawcli_handler", handler);
}
});
Expand Down Expand Up @@ -178,6 +180,7 @@ private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) {
latch.countDown();

// clear the callback
LOG.info("Connect callback set null by CONNECT");
setCallback(null);
ReferenceCountUtil.release(msg);
});
Expand Down Expand Up @@ -263,6 +266,7 @@ private void doSubscribe(MqttSubscribeMessage subscribeMessage, long timeout, Ti
subscribeAckLatch.countDown();

// clear the callback
LOG.info("Connect callback set null by SUBSCRIBE");
setCallback(null);
ReferenceCountUtil.release(msg);
});
Expand Down Expand Up @@ -291,6 +295,7 @@ public void publish(MqttPublishMessage publishMessage, int timeout, TimeUnit tim
publishResponseLatch.countDown();

// clear the callback
LOG.info("Connect callback set null by PUBLISH");
setCallback(null);
ReferenceCountUtil.release(msg);
});
Expand Down Expand Up @@ -348,6 +353,7 @@ public MqttMessage lastReceivedMessage() {

void messageReceived(MqttMessage msg) {
LOG.info("Received message {}", msg);
LOG.debug("Callback is {}", callback);
if (this.callback != null) {
this.callback.call(msg);
} else {
Expand Down
8 changes: 5 additions & 3 deletions broker/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ log4j.logger.io.moquette=WARN
#log4j.logger.io.moquette.broker=WARN
log4j.logger.io.moquette.broker.subscriptions.CTrieSpeedTest=INFO
log4j.logger.io.moquette.integration.ServerIntegrationRestartTest=INFO
log4j.logger.io.moquette.integration.mqtt5.FlowControlTest=INFO
log4j.logger.io.moquette.testclient.Client=DEBUG
log4j.logger.BufferManagement=TRACE

# stdout appender is set to be consoleAppender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
# Enable TRACE to see the output of ByteBuf tracking
#log4j.appender.stdout.Threshold=TRACE
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Threshold=DEBUG

# for debug trace
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
Expand All @@ -37,8 +39,8 @@ log4j.appender.stdout.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%
# Message Logger Configuration #
#####################################
log4j.appender.messagelog=org.apache.log4j.ConsoleAppender
log4j.appender.messagelog.Threshold=WARN
log4j.appender.messagelog.Threshold=DEBUG
log4j.appender.messagelog.layout=org.apache.log4j.PatternLayout
log4j.appender.messagelog.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%t] %-5p %c{1} %L %x - %m%n

log4j.category.io.moquette.broker.metrics.MQTTMessageLogger=WARN, messagelog
log4j.category.io.moquette.broker.metrics.MQTTMessageLogger=DEBUG, messagelog

0 comments on commit 9e44a74

Please sign in to comment.