From c555c19dc9a506995f851bad1d8c2d9df20b5cdc Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 27 Nov 2024 17:00:13 +0100 Subject: [PATCH] Finish user properties implementation (#877) Covered publish with user properties with tests and fixed publish of will messages. Updated the creation of Will publish to include the user properties. Also covered with tests. Moved some test utility methods into abstract base class. --- ChangeLog.txt | 1 + .../java/io/moquette/broker/PostOffice.java | 23 +++- .../mqtt5/AbstractServerIntegrationTest.java | 1 + ...ServerIntegrationWithoutClientFixture.java | 33 +++++- .../integration/mqtt5/ConnectTest.java | 44 ++++++-- .../mqtt5/RequestResponseTest.java | 12 +-- .../mqtt5/SharedSubscriptionTest.java | 7 -- .../integration/mqtt5/UserPropertiesTest.java | 102 ++++++++++++++++++ 8 files changed, 194 insertions(+), 29 deletions(-) create mode 100644 broker/src/test/java/io/moquette/integration/mqtt5/UserPropertiesTest.java diff --git a/ChangeLog.txt b/ChangeLog.txt index e9b985295..6c7741633 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877) [feature] Topic alias: implemented handling of topic alias received by publishers. (#873) [feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858) [feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848) diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index ee764a154..485fcf5dc 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -311,16 +311,33 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession private void publishWill(ISessionsRepository.Will will) { final Instant messageExpiryInstant = willMessageExpiry(will); - MqttPublishMessage willPublishMessage = MqttMessageBuilders.publish() + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish() .topicName(will.topic) .retained(will.retained) .qos(will.qos) - .payload(Unpooled.copiedBuffer(will.payload)) - .build(); + .payload(Unpooled.copiedBuffer(will.payload)); + + if (will.properties.userProperties().isPresent()) { + Map willUserProperties = will.properties.userProperties().get(); + if (!willUserProperties.isEmpty()) { + publishBuilder.properties(copyWillUserProperties(willUserProperties)); + } + } + MqttPublishMessage willPublishMessage = publishBuilder.build(); publish2Subscribers(WILL_PUBLISHER, messageExpiryInstant, willPublishMessage); } + private static MqttProperties copyWillUserProperties(Map willUserProperties) { + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + for (Map.Entry userProperty : willUserProperties.entrySet()) { + userProperties.add(userProperty.getKey(), userProperty.getValue()); + } + final MqttProperties willProperties = new MqttProperties(); + willProperties.add(userProperties); + return willProperties; + } + private static Instant willMessageExpiry(ISessionsRepository.Will will) { Optional messageExpiryOpt = will.properties.messageExpiry(); if (messageExpiryOpt.isPresent()) { diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index 06f1919ce..9e7da1a4a 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -13,6 +13,7 @@ import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java index 0d48821fa..da0b0e3f1 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java @@ -26,6 +26,10 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; import io.moquette.broker.Server; import io.moquette.broker.config.IConfig; import io.moquette.broker.config.MemoryConfig; @@ -50,7 +54,10 @@ import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class AbstractServerIntegrationWithoutClientFixture { @@ -65,6 +72,11 @@ public static void beforeTests() { Awaitility.setDefaultTimeout(Durations.ONE_SECOND); } + static void verifyPayloadInUTF8(Mqtt5Publish msgPub, String expectedPayload) { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + assertEquals(expectedPayload, new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8)); + } + @BeforeEach public void setUp() throws Exception { dbPath = IntegrationUtils.tempH2Path(tempFolder); @@ -138,6 +150,25 @@ static Mqtt5BlockingClient createPublisherClient() { return AbstractSubscriptionIntegrationTest.createClientWithStartFlagAndClientId(true, "publisher"); } + static void subscribeToAtQos1(Mqtt5BlockingClient subscriber, String topicFilter) { + Mqtt5SubAck subAck = subscribe(subscriber, topicFilter, MqttQos.AT_LEAST_ONCE); + assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1); + } + + static Mqtt5SubAck subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter, MqttQos mqttQos) { + return subscriberClient.subscribeWith() + .topicFilter(topicFilter) + .qos(mqttQos) + .send(); + } + + static void verifyPublishSucceeded(Mqtt5PublishResult publishResult) { + assertTrue(publishResult instanceof Mqtt5PublishResult.Mqtt5Qos1Result, "QoS1 Response must be present"); + Mqtt5PublishResult.Mqtt5Qos1Result qos1Result = (Mqtt5PublishResult.Mqtt5Qos1Result) publishResult; + assertEquals(Mqtt5PubAckReasonCode.SUCCESS, qos1Result.getPubAck().getReasonCode(), + "Publish can't be accepted by the broker"); + } + protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer action, Duration timeout, String message) throws InterruptedException { try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { action.accept(null); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index a24efe2ee..a977e7693 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -157,14 +157,42 @@ public void fireWillAfterTheDelaySpecifiedInConnectPropertiesAndMessageExpiry() TestUtils.verifyPublishedMessage(testamentSubscriber, 10, (Mqtt5Publish message) -> { - final String payload = new String(message.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("Goodbye", payload, "Will message must be received"); + verifyPayloadInUTF8(message, "Goodbye"); long expiry = message.getMessageExpiryInterval().orElse(-1L); assertEquals(messageExpiry, expiry); }); } + @Test + public void givenWillPropertiesWithUserPropertySetDuringConnectionWhenWillPublishIsTriggeredThenAlsoTheUserPropertiesAreSent() throws InterruptedException { + int messageExpiry = 5; + Mqtt5ConnectBuilder connectBuilder = defaultWillBuilder(1) + .messageExpiryInterval(messageExpiry) + .userProperties() + .add("content-type", "text/plain") + .applyUserProperties() + .applyWillPublish(); + + final Mqtt5BlockingClient clientWithWill = + createAndConnectWithBuilder("simple_client", connectBuilder); + + final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); + + // schedule a bad disconnect + scheduleDisconnectWithErrorCode(clientWithWill, Duration.ofMillis(500)); + + TestUtils.verifyPublishedMessage(testamentSubscriber, 10, + (Mqtt5Publish message) -> { + verifyPayloadInUTF8(message, "Goodbye"); + + long expiry = message.getMessageExpiryInterval().orElse(-1L); + assertEquals(messageExpiry, expiry); + + UserPropertiesTest.verifyContainUserProperty(message, "content-type", "text/plain"); + }); + } + private static void verifyPublishedMessage(Mqtt5BlockingClient subscriber, int timeout, String message) throws InterruptedException { try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { Optional publishMessage = publishes.receive(timeout, TimeUnit.SECONDS); @@ -314,7 +342,7 @@ private Mqtt5BlockingClient createAndConnectClientWithWillTestament(String clien @NotNull private static Mqtt5BlockingClient createAndConnectClientWithWillTestament(String clientId, int delayInSeconds) { - Mqtt5WillPublishBuilder.Nested.Complete willPublishBuilder = deafaultWillBuilder(delayInSeconds); + Mqtt5WillPublishBuilder.Nested.Complete willPublishBuilder = defaultWillBuilder(delayInSeconds); Mqtt5ConnectBuilder connectBuilder = willPublishBuilder.applyWillPublish(); @@ -322,7 +350,7 @@ private static Mqtt5BlockingClient createAndConnectClientWithWillTestament(Strin } @NotNull - private static Mqtt5WillPublishBuilder.Nested.Complete deafaultWillBuilder(int delayInSeconds) { + private static Mqtt5WillPublishBuilder.Nested.Complete defaultWillBuilder(int delayInSeconds) { Mqtt5WillPublishBuilder.Nested.Complete willPublishBuilder = Mqtt5Connect.builder() .keepAlive(10) .willPublish() @@ -337,10 +365,10 @@ private static Mqtt5WillPublishBuilder.Nested.Complete willPublishBuilder = deafaultWillBuilder(delayInSeconds); + static Mqtt5BlockingClient createAndConnectClientWithWillTestamentAndMessageExpiry(String clientId, + int delayInSeconds, + int messageExpirySeconds) { + Mqtt5WillPublishBuilder.Nested.Complete willPublishBuilder = defaultWillBuilder(delayInSeconds); willPublishBuilder.messageExpiryInterval(messageExpirySeconds); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index f56a7296d..469fd6704 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -77,10 +76,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo .payload("OK".getBytes(StandardCharsets.UTF_8)) .qos(MqttQos.AT_LEAST_ONCE) .send(); - assertTrue(responseResult instanceof Mqtt5PublishResult.Mqtt5Qos1Result, "QoS1 Response must be present"); - Mqtt5PublishResult.Mqtt5Qos1Result qos1Result = (Mqtt5PublishResult.Mqtt5Qos1Result) responseResult; - assertEquals(Mqtt5PubAckReasonCode.SUCCESS, qos1Result.getPubAck().getReasonCode(), - "Open door response cannot be published "); + AbstractServerIntegrationWithoutClientFixture.verifyPublishSucceeded(responseResult); }); // wait for the SUBACK in 1 second, else if PUB is sent before the client is fully subscribed, then it's lost @@ -97,11 +93,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo } private static void subscribeToResponseTopic(Mqtt5BlockingClient requester, String responseTopic) { - Mqtt5SubAck subAck = requester.subscribeWith() - .topicFilter(responseTopic) - .qos(MqttQos.AT_LEAST_ONCE) - .send(); - assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1); + subscribeToAtQos1(requester, responseTopic); } @Test diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java index 47016d496..48bedfc4b 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java @@ -239,13 +239,6 @@ private static void publish(Mqtt5BlockingClient publisherClient, String topicNam .send(); } - static void subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter, MqttQos mqttQos) { - subscriberClient.subscribeWith() - .topicFilter(topicFilter) - .qos(mqttQos) - .send(); - } - @Test public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeThenTheSharedSubscriptionRemainsValid() throws Exception { String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living"; diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/UserPropertiesTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/UserPropertiesTest.java new file mode 100644 index 000000000..33924c759 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/UserPropertiesTest.java @@ -0,0 +1,102 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.integration.mqtt5; + +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UserPropertiesTest extends AbstractServerIntegrationTest { + @Override + public String clientName() { + return "unused"; + } + + @Test + public void givenSubscriberWhenPublishWithUserPropertiesMatchingTheTopicFilterArrivesThenUserPropertiesReachTheSubscriber() throws InterruptedException { + final Mqtt5BlockingClient publisher = createHiveBlockingClient("publisher"); + final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber"); + subscribeToAtQos1(subscriber, "some/interesting/thing"); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + + Mqtt5PublishResult publishResult = publisher.publishWith() + .topic("some/interesting/thing") + .payload("OK".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .userProperties() + .add("content-type", "application/plain") + .applyUserProperties() + .send(); + verifyPublishSucceeded(publishResult); + + verifyPublishMessage(publishes, receivedPub -> { + verifyPayloadInUTF8(receivedPub, "OK"); + verifyContainUserProperty(receivedPub, "content-type", "application/plain"); + }); + } + } + + @Test + public void givenRetainedPublishWithUserPropertiesWhenClientSubscribesToMatchingTheTopicFilterThenUserPropertiesReachTheSubscriber() throws InterruptedException { + final Mqtt5BlockingClient publisher = createHiveBlockingClient("publisher"); + Mqtt5PublishResult publishResult = publisher.publishWith() + .topic("some/interesting/thing") + .payload("OK".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .retain(true) + .userProperties() + .add("content-type", "application/plain") + .applyUserProperties() + .send(); + verifyPublishSucceeded(publishResult); + + final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber"); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + subscribeToAtQos1(subscriber, "some/interesting/thing"); + + verifyPublishMessage(publishes, receivedPub -> { + verifyPayloadInUTF8(receivedPub, "OK"); + verifyContainUserProperty(receivedPub, "content-type", "application/plain"); + }); + } + } + + protected static void verifyContainUserProperty(Mqtt5Publish receivedPub, String expectedName, String expectedValue) { + Optional userProp = receivedPub.getUserProperties().asList() + .stream() + .filter(prop -> prop.getName().toString().equals(expectedName)) + .findFirst(); + assertTrue(userProp.isPresent(), "Expected a user property named 'content-type'"); + String propertyValue = userProp + .map(Mqtt5UserProperty::getValue) + .map(Object::toString) + .orElse(""); + assertEquals(expectedValue, propertyValue); + } +}