Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reporting of processed commands in ITs #3677

Merged
merged 1 commit into from
Feb 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2019, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -75,7 +75,6 @@
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.proton.ProtonHelper;


/**
* Integration tests for sending commands to device connected to the MQTT adapter.
*
Expand Down Expand Up @@ -164,7 +163,7 @@ public void testSendOneWayCommandSucceeds(
final String commandTargetDeviceId = endpointConfig.isSubscribeAsGateway()
? helper.setupGatewayDeviceBlocking(tenantId, deviceId, 5)
: deviceId;
final Checkpoint commandsReceived = ctx.checkpoint(COMMANDS_TO_SEND);
final var remainingCommandsToBeProcessed = new CountDownLatch(COMMANDS_TO_SEND);

final AtomicInteger counter = new AtomicInteger();
testSendCommandSucceeds(ctx, commandTargetDeviceId, msg -> {
Expand All @@ -173,7 +172,7 @@ public void testSendOneWayCommandSucceeds(
ctx.verify(() -> {
endpointConfig.assertCommandPublishTopicStructure(topic, commandTargetDeviceId, true, "setValue");
});
commandsReceived.flag();
remainingCommandsToBeProcessed.countDown();
}, payload -> {
counter.incrementAndGet();
return helper.sendOneWayCommand(
Expand All @@ -183,7 +182,7 @@ public void testSendOneWayCommandSucceeds(
"text/plain",
payload,
helper.getSendCommandTimeout(counter.get() == 1));
}, endpointConfig, COMMANDS_TO_SEND, MqttQoS.AT_MOST_ONCE);
}, endpointConfig, remainingCommandsToBeProcessed, MqttQoS.AT_MOST_ONCE);
}

/**
Expand All @@ -196,6 +195,7 @@ public void testSendOneWayCommandSucceeds(
*/
@ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
@MethodSource("allCombinations")
@Timeout(timeUnit = TimeUnit.SECONDS, value = 10)
public void testSendCommandSucceedsWithQos0(
final MqttCommandEndpointConfiguration endpointConfig,
final VertxTestContext ctx) throws InterruptedException {
Expand All @@ -213,6 +213,7 @@ public void testSendCommandSucceedsWithQos0(
*/
@ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
@MethodSource("allCombinations")
@Timeout(timeUnit = TimeUnit.SECONDS, value = 10)
public void testSendCommandSucceedsWithQos1(
final MqttCommandEndpointConfiguration endpointConfig,
final VertxTestContext ctx) throws InterruptedException {
Expand All @@ -230,6 +231,7 @@ private void testSendCommandSucceeds(
: deviceId;

final AtomicInteger counter = new AtomicInteger();
final var remainingCommandsToBeProcessed = new CountDownLatch(COMMANDS_TO_SEND);
testSendCommandSucceeds(ctx, commandTargetDeviceId, msg -> {
LOGGER.trace("received command [{}]", msg.topicName());
final ResourceIdentifier topic = ResourceIdentifier.fromString(msg.topicName());
Expand Down Expand Up @@ -267,9 +269,10 @@ private void testSendCommandSucceeds(
DownstreamMessageAssertions.assertCommandAndControlApiProperties(
response, tenantId, commandTargetDeviceId);
});
remainingCommandsToBeProcessed.countDown();
return (Void) null;
});
}, endpointConfig, COMMANDS_TO_SEND, qos);
}, endpointConfig, remainingCommandsToBeProcessed, qos);
}

private void testSendCommandSucceeds(
Expand All @@ -278,27 +281,29 @@ private void testSendCommandSucceeds(
final Handler<MqttPublishMessage> commandConsumer,
final Function<Buffer, Future<Void>> commandSender,
final MqttCommandEndpointConfiguration endpointConfig,
final int totalNoOfCommandsToSend,
final CountDownLatch remainingCommandsToBeProcessed,
final MqttQoS subscribeQos) throws InterruptedException {

final VertxTestContext setup = new VertxTestContext();
final Checkpoint ready = setup.checkpoint(2);

helper.registry
.addDeviceToTenant(tenantId, deviceId, password)
.compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
.compose(ok -> createConsumer(tenantId, msg -> {
// expect empty notification with TTD -1
setup.verify(() -> assertThat(msg.getContentType()).isEqualTo(EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION));
final TimeUntilDisconnectNotification notification = msg.getTimeUntilDisconnectNotification().orElse(null);
LOGGER.info("received notification [{}]", notification);
setup.verify(() -> assertThat(notification).isNotNull());
if (notification.getTtd() == -1) {
ready.flag();
}
}))
.compose(conAck -> subscribeToCommands(commandTargetDeviceId, commandConsumer, endpointConfig, subscribeQos))
.onComplete(setup.succeeding(ok -> ready.flag()));
.addDeviceToTenant(tenantId, deviceId, password)
.compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
.compose(ok -> createConsumer(tenantId, msg -> {
// expect empty notification with TTD -1
setup.verify(() -> assertThat(msg.getContentType())
.isEqualTo(EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION));
final TimeUntilDisconnectNotification notification = msg.getTimeUntilDisconnectNotification()
.orElse(null);
LOGGER.info("received notification [{}]", notification);
setup.verify(() -> assertThat(notification).isNotNull());
if (notification.getTtd() == -1) {
ready.flag();
}
}))
.compose(conAck -> subscribeToCommands(commandTargetDeviceId, commandConsumer, endpointConfig, subscribeQos))
.onComplete(setup.succeeding(ok -> ready.flag()));

assertWithMessage("setup of adapter finished within %s seconds", IntegrationTestSupport.getTestSetupTimeout())
.that(setup.awaitCompletion(IntegrationTestSupport.getTestSetupTimeout(), TimeUnit.SECONDS))
Expand All @@ -307,30 +312,23 @@ private void testSendCommandSucceeds(
ctx.failNow(setup.causeOfFailure());
return;
}

final Checkpoint sendCommandsSucceeded = ctx.checkpoint();
final CountDownLatch commandsSucceeded = new CountDownLatch(totalNoOfCommandsToSend);
final var totalNoOfCommandsToSend = remainingCommandsToBeProcessed.getCount();
final AtomicInteger commandsSent = new AtomicInteger(0);
final AtomicLong lastReceivedTimestamp = new AtomicLong(0);
final AtomicLong lastSentTimestamp = new AtomicLong(0);
final long start = System.currentTimeMillis();

while (commandsSent.get() < totalNoOfCommandsToSend) {
final CountDownLatch commandSent = new CountDownLatch(1);
context.runOnContext(go -> {
commandsSent.getAndIncrement();
commandsSent.incrementAndGet();
final Buffer msg = commandsSent.get() % 2 == 0
? Buffer.buffer("value: " + commandsSent.get())
: null; // use 'null' payload for half the commands, ensuring such commands also get forwarded
commandSender.apply(msg).onComplete(sendAttempt -> {
if (sendAttempt.failed()) {
LOGGER.info("error sending command {}", commandsSent.get(), sendAttempt.cause());
} else {
lastReceivedTimestamp.set(System.currentTimeMillis());
commandsSucceeded.countDown();
if (commandsSucceeded.getCount() % 20 == 0) {
LOGGER.info("commands succeeded: {}", totalNoOfCommandsToSend - commandsSucceeded.getCount());
}
}
lastSentTimestamp.set(System.currentTimeMillis());
if (commandsSent.get() % 20 == 0) {
LOGGER.info("commands sent: " + commandsSent.get());
}
Expand All @@ -342,20 +340,20 @@ private void testSendCommandSucceeds(
}

final long timeToWait = totalNoOfCommandsToSend * 200;
if (!commandsSucceeded.await(timeToWait, TimeUnit.MILLISECONDS)) {
LOGGER.info("Timeout of {} milliseconds reached, stop waiting for commands to succeed", timeToWait);
if (!remainingCommandsToBeProcessed.await(timeToWait, TimeUnit.MILLISECONDS)) {
LOGGER.info("Timeout of {} milliseconds reached, stop waiting for commands to be processed", timeToWait);
}
if (lastReceivedTimestamp.get() == 0L) {
if (lastSentTimestamp.get() == 0L) {
// no message has been received at all
lastReceivedTimestamp.set(System.currentTimeMillis());
lastSentTimestamp.set(System.currentTimeMillis());
}
final long commandsCompleted = totalNoOfCommandsToSend - commandsSucceeded.getCount();
LOGGER.info("commands sent: {}, commands succeeded: {} after {} milliseconds",
commandsSent.get(), commandsCompleted, lastReceivedTimestamp.get() - start);
if (commandsCompleted == commandsSent.get()) {
sendCommandsSucceeded.flag();
final long commandsProcessed = totalNoOfCommandsToSend - remainingCommandsToBeProcessed.getCount();
LOGGER.info("commands sent: {}, commands processed: {} after {} milliseconds",
commandsSent.get(), commandsProcessed, lastSentTimestamp.get() - start);
if (commandsProcessed == commandsSent.get()) {
ctx.completeNow();
} else {
ctx.failNow(new IllegalStateException("did not complete all commands sent"));
ctx.failNow(new IllegalStateException("device did not process all commands that had been sent"));
}
}

Expand Down Expand Up @@ -394,18 +392,18 @@ public void testSendCommandViaAmqpFailsForMalformedMessage(
ready.flag();
}
})
.compose(consumer -> helper.registry.addDeviceToTenant(tenantId, deviceId, password))
.compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
.compose(conAck -> subscribeToCommands(commandTargetDeviceId, msg -> {
// all commands should get rejected because they fail to pass the validity check
ctx.failNow(new IllegalStateException("should not have received command"));
}, endpointConfig, MqttQoS.AT_MOST_ONCE))
.compose(ok -> helper.createGenericAmqpMessageSender(endpointConfig.getNorthboundEndpoint(), tenantId))
.onComplete(setup.succeeding(genericSender -> {
LOGGER.debug("created generic sender for sending commands [target address: {}]", linkTargetAddress);
amqpCmdSenderRef.set(genericSender);
ready.flag();
}));
.compose(consumer -> helper.registry.addDeviceToTenant(tenantId, deviceId, password))
.compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
.compose(conAck -> subscribeToCommands(commandTargetDeviceId, msg -> {
// all commands should get rejected because they fail to pass the validity check
ctx.failNow(new IllegalStateException("should not have received command"));
}, endpointConfig, MqttQoS.AT_MOST_ONCE))
.compose(ok -> helper.createGenericAmqpMessageSender(endpointConfig.getNorthboundEndpoint(), tenantId))
.onComplete(setup.succeeding(genericSender -> {
LOGGER.debug("created generic sender for sending commands [target address: {}]", linkTargetAddress);
amqpCmdSenderRef.set(genericSender);
ready.flag();
}));

assertWithMessage("setup of adapter finished within %s seconds", IntegrationTestSupport.getTestSetupTimeout())
.that(setup.awaitCompletion(IntegrationTestSupport.getTestSetupTimeout(), TimeUnit.SECONDS))
Expand All @@ -423,10 +421,11 @@ public void testSendCommandViaAmqpFailsForMalformedMessage(
messageWithoutSubject.setAddress(messageAddress);
messageWithoutSubject.setMessageId("message-id");
messageWithoutSubject.setReplyTo("reply/to/address");
amqpCmdSenderRef.get().sendAndWaitForOutcome(messageWithoutSubject, NoopSpan.INSTANCE).onComplete(ctx.failing(t -> {
ctx.verify(() -> assertThat(t).isInstanceOf(ClientErrorException.class));
failedAttempts.flag();
}));
amqpCmdSenderRef.get().sendAndWaitForOutcome(messageWithoutSubject, NoopSpan.INSTANCE)
.onComplete(ctx.failing(t -> {
ctx.verify(() -> assertThat(t).isInstanceOf(ClientErrorException.class));
failedAttempts.flag();
}));

LOGGER.debug("sending command message lacking message ID and correlation ID");
final Message messageWithoutId = ProtonHelper.message("input data");
Expand Down Expand Up @@ -465,12 +464,13 @@ public void testSendCommandViaKafkaFailsForMalformedMessage(
final VertxTestContext setup = new VertxTestContext();
final Checkpoint ready = setup.checkpoint(2);

final Future<MessageConsumer> kafkaAsyncErrorResponseConsumer = helper.createDeliveryFailureCommandResponseConsumer(
ctx,
tenantId,
HttpURLConnection.HTTP_BAD_REQUEST,
response -> expectedCommandResponses.countDown(),
null);
final Future<MessageConsumer> kafkaAsyncErrorResponseConsumer = helper
.createDeliveryFailureCommandResponseConsumer(
ctx,
tenantId,
HttpURLConnection.HTTP_BAD_REQUEST,
response -> expectedCommandResponses.countDown(),
null);

createConsumer(tenantId, msg -> {
// expect empty notification with TTD -1
Expand Down Expand Up @@ -505,8 +505,7 @@ public void testSendCommandViaKafkaFailsForMalformedMessage(
final Map<String, Object> properties1 = Map.of(
MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId,
MessageHelper.SYS_PROPERTY_CONTENT_TYPE, MessageHelper.CONTENT_TYPE_OCTET_STREAM,
KafkaRecordHelper.HEADER_RESPONSE_REQUIRED, true
);
KafkaRecordHelper.HEADER_RESPONSE_REQUIRED, true);
kafkaSenderRef.get().sendAndWaitForOutcome(commandTopic, tenantId, deviceId, Buffer.buffer(), properties1)
.onComplete(ctx.succeeding(ok -> {}));

Expand All @@ -516,8 +515,7 @@ public void testSendCommandViaKafkaFailsForMalformedMessage(
MessageHelper.SYS_PROPERTY_CORRELATION_ID, correlationId,
MessageHelper.APP_PROPERTY_DEVICE_ID, deviceId,
MessageHelper.SYS_PROPERTY_CONTENT_TYPE, MessageHelper.CONTENT_TYPE_OCTET_STREAM,
KafkaRecordHelper.HEADER_RESPONSE_REQUIRED, true
);
KafkaRecordHelper.HEADER_RESPONSE_REQUIRED, true);
kafkaSenderRef.get().sendAndWaitForOutcome(commandTopic, tenantId, deviceId, Buffer.buffer(), properties2)
.onComplete(ctx.succeeding(ok -> {}));

Expand Down Expand Up @@ -576,7 +574,7 @@ public void testSendCommandFailsForCommandNotAcknowledgedByDevice(
counter.incrementAndGet();
return helper.sendCommand(tenantId, commandTargetDeviceId, "setValue", "text/plain", payload,
helper.getSendCommandTimeout(counter.get() == 1))
.mapEmpty();
.mapEmpty();
};

helper.registry
Expand Down Expand Up @@ -703,16 +701,16 @@ private Future<Void> injectMqttClientPubAckBlocker(final AtomicBoolean outboundP
final NetSocketInternal connection = (NetSocketInternal) connectionMethod.invoke(mqttClient);
connection.channelHandlerContext().pipeline().addBefore("handler", "OutboundPubAckBlocker",
new ChannelOutboundHandlerAdapter() {
@Override
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
throws Exception {
if (outboundPubAckBlocked.get() && msg instanceof io.netty.handler.codec.mqtt.MqttPubAckMessage) {
LOGGER.debug("suppressing PubAck, message id: {}", ((MqttPubAckMessage) msg).variableHeader().messageId());
} else {
super.write(ctx, msg, promise);
}
}
});
} else {
super.write(ctx, msg, promise);
}
}
});
return Future.succeededFuture();
} catch (final Exception e) {
LOGGER.error("failed to inject PubAck blocking handler");
Expand Down