Skip to content

Commit

Permalink
Finish user properties implementation (#877)
Browse files Browse the repository at this point in the history
Covered publish with user properties with tests and fixed publish of will messages.

Updated the creation of Will publish to include the user properties.
Also covered with tests.
Moved some test utility methods into abstract base class.
  • Loading branch information
andsel authored Nov 27, 2024
1 parent 0be6cfe commit c555c19
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 29 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877)
[feature] Topic alias: implemented handling of topic alias received by publishers. (#873)
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
[feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848)
Expand Down
23 changes: 20 additions & 3 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,33 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession

private void publishWill(ISessionsRepository.Will will) {
final Instant messageExpiryInstant = willMessageExpiry(will);
MqttPublishMessage willPublishMessage = MqttMessageBuilders.publish()
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish()
.topicName(will.topic)
.retained(will.retained)
.qos(will.qos)
.payload(Unpooled.copiedBuffer(will.payload))
.build();
.payload(Unpooled.copiedBuffer(will.payload));

if (will.properties.userProperties().isPresent()) {
Map<String, String> willUserProperties = will.properties.userProperties().get();
if (!willUserProperties.isEmpty()) {
publishBuilder.properties(copyWillUserProperties(willUserProperties));
}
}
MqttPublishMessage willPublishMessage = publishBuilder.build();

publish2Subscribers(WILL_PUBLISHER, messageExpiryInstant, willPublishMessage);
}

private static MqttProperties copyWillUserProperties(Map<String, String> willUserProperties) {
MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties();
for (Map.Entry<String, String> userProperty : willUserProperties.entrySet()) {
userProperties.add(userProperty.getKey(), userProperty.getValue());
}
final MqttProperties willProperties = new MqttProperties();
willProperties.add(userProperties);
return willProperties;
}

private static Instant willMessageExpiry(ISessionsRepository.Will will) {
Optional<Duration> messageExpiryOpt = will.properties.messageExpiry();
if (messageExpiryOpt.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
Expand All @@ -50,7 +54,10 @@
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class AbstractServerIntegrationWithoutClientFixture {

Expand All @@ -65,6 +72,11 @@ public static void beforeTests() {
Awaitility.setDefaultTimeout(Durations.ONE_SECOND);
}

static void verifyPayloadInUTF8(Mqtt5Publish msgPub, String expectedPayload) {
assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present");
assertEquals(expectedPayload, new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8));
}

@BeforeEach
public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
Expand Down Expand Up @@ -138,6 +150,25 @@ static Mqtt5BlockingClient createPublisherClient() {
return AbstractSubscriptionIntegrationTest.createClientWithStartFlagAndClientId(true, "publisher");
}

static void subscribeToAtQos1(Mqtt5BlockingClient subscriber, String topicFilter) {
Mqtt5SubAck subAck = subscribe(subscriber, topicFilter, MqttQos.AT_LEAST_ONCE);
assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1);
}

static Mqtt5SubAck subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter, MqttQos mqttQos) {
return subscriberClient.subscribeWith()
.topicFilter(topicFilter)
.qos(mqttQos)
.send();
}

static void verifyPublishSucceeded(Mqtt5PublishResult publishResult) {
assertTrue(publishResult instanceof Mqtt5PublishResult.Mqtt5Qos1Result, "QoS1 Response must be present");
Mqtt5PublishResult.Mqtt5Qos1Result qos1Result = (Mqtt5PublishResult.Mqtt5Qos1Result) publishResult;
assertEquals(Mqtt5PubAckReasonCode.SUCCESS, qos1Result.getPubAck().getReasonCode(),
"Publish can't be accepted by the broker");
}

protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer<Void> action, Duration timeout, String message) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,42 @@ public void fireWillAfterTheDelaySpecifiedInConnectPropertiesAndMessageExpiry()

TestUtils.verifyPublishedMessage(testamentSubscriber, 10,
(Mqtt5Publish message) -> {
final String payload = new String(message.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("Goodbye", payload, "Will message must be received");
verifyPayloadInUTF8(message, "Goodbye");

long expiry = message.getMessageExpiryInterval().orElse(-1L);
assertEquals(messageExpiry, expiry);
});
}

@Test
public void givenWillPropertiesWithUserPropertySetDuringConnectionWhenWillPublishIsTriggeredThenAlsoTheUserPropertiesAreSent() throws InterruptedException {
int messageExpiry = 5;
Mqtt5ConnectBuilder connectBuilder = defaultWillBuilder(1)
.messageExpiryInterval(messageExpiry)
.userProperties()
.add("content-type", "text/plain")
.applyUserProperties()
.applyWillPublish();

final Mqtt5BlockingClient clientWithWill =
createAndConnectWithBuilder("simple_client", connectBuilder);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();

// schedule a bad disconnect
scheduleDisconnectWithErrorCode(clientWithWill, Duration.ofMillis(500));

TestUtils.verifyPublishedMessage(testamentSubscriber, 10,
(Mqtt5Publish message) -> {
verifyPayloadInUTF8(message, "Goodbye");

long expiry = message.getMessageExpiryInterval().orElse(-1L);
assertEquals(messageExpiry, expiry);

UserPropertiesTest.verifyContainUserProperty(message, "content-type", "text/plain");
});
}

private static void verifyPublishedMessage(Mqtt5BlockingClient subscriber, int timeout, String message) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeout, TimeUnit.SECONDS);
Expand Down Expand Up @@ -314,15 +342,15 @@ private Mqtt5BlockingClient createAndConnectClientWithWillTestament(String clien

@NotNull
private static Mqtt5BlockingClient createAndConnectClientWithWillTestament(String clientId, int delayInSeconds) {
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = deafaultWillBuilder(delayInSeconds);
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = defaultWillBuilder(delayInSeconds);

Mqtt5ConnectBuilder connectBuilder = willPublishBuilder.applyWillPublish();

return createAndConnectWithBuilder(clientId, connectBuilder);
}

@NotNull
private static Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> deafaultWillBuilder(int delayInSeconds) {
private static Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> defaultWillBuilder(int delayInSeconds) {
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = Mqtt5Connect.builder()
.keepAlive(10)
.willPublish()
Expand All @@ -337,10 +365,10 @@ private static Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBui
}

@NotNull
private static Mqtt5BlockingClient createAndConnectClientWithWillTestamentAndMessageExpiry(String clientId,
int delayInSeconds,
int messageExpirySeconds) {
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = deafaultWillBuilder(delayInSeconds);
static Mqtt5BlockingClient createAndConnectClientWithWillTestamentAndMessageExpiry(String clientId,
int delayInSeconds,
int messageExpirySeconds) {
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = defaultWillBuilder(delayInSeconds);

willPublishBuilder.messageExpiryInterval(messageExpirySeconds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -77,10 +76,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo
.payload("OK".getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.send();
assertTrue(responseResult instanceof Mqtt5PublishResult.Mqtt5Qos1Result, "QoS1 Response must be present");
Mqtt5PublishResult.Mqtt5Qos1Result qos1Result = (Mqtt5PublishResult.Mqtt5Qos1Result) responseResult;
assertEquals(Mqtt5PubAckReasonCode.SUCCESS, qos1Result.getPubAck().getReasonCode(),
"Open door response cannot be published ");
AbstractServerIntegrationWithoutClientFixture.verifyPublishSucceeded(responseResult);
});

// wait for the SUBACK in 1 second, else if PUB is sent before the client is fully subscribed, then it's lost
Expand All @@ -97,11 +93,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo
}

private static void subscribeToResponseTopic(Mqtt5BlockingClient requester, String responseTopic) {
Mqtt5SubAck subAck = requester.subscribeWith()
.topicFilter(responseTopic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1);
subscribeToAtQos1(requester, responseTopic);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,6 @@ private static void publish(Mqtt5BlockingClient publisherClient, String topicNam
.send();
}

static void subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter, MqttQos mqttQos) {
subscriberClient.subscribeWith()
.topicFilter(topicFilter)
.qos(mqttQos)
.send();
}

@Test
public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeThenTheSharedSubscriptionRemainsValid() throws Exception {
String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
*
* Copyright (c) 2012-2024 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*
*/

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class UserPropertiesTest extends AbstractServerIntegrationTest {
@Override
public String clientName() {
return "unused";
}

@Test
public void givenSubscriberWhenPublishWithUserPropertiesMatchingTheTopicFilterArrivesThenUserPropertiesReachTheSubscriber() throws InterruptedException {
final Mqtt5BlockingClient publisher = createHiveBlockingClient("publisher");
final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber");
subscribeToAtQos1(subscriber, "some/interesting/thing");
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {

Mqtt5PublishResult publishResult = publisher.publishWith()
.topic("some/interesting/thing")
.payload("OK".getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.userProperties()
.add("content-type", "application/plain")
.applyUserProperties()
.send();
verifyPublishSucceeded(publishResult);

verifyPublishMessage(publishes, receivedPub -> {
verifyPayloadInUTF8(receivedPub, "OK");
verifyContainUserProperty(receivedPub, "content-type", "application/plain");
});
}
}

@Test
public void givenRetainedPublishWithUserPropertiesWhenClientSubscribesToMatchingTheTopicFilterThenUserPropertiesReachTheSubscriber() throws InterruptedException {
final Mqtt5BlockingClient publisher = createHiveBlockingClient("publisher");
Mqtt5PublishResult publishResult = publisher.publishWith()
.topic("some/interesting/thing")
.payload("OK".getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.retain(true)
.userProperties()
.add("content-type", "application/plain")
.applyUserProperties()
.send();
verifyPublishSucceeded(publishResult);

final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber");
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
subscribeToAtQos1(subscriber, "some/interesting/thing");

verifyPublishMessage(publishes, receivedPub -> {
verifyPayloadInUTF8(receivedPub, "OK");
verifyContainUserProperty(receivedPub, "content-type", "application/plain");
});
}
}

protected static void verifyContainUserProperty(Mqtt5Publish receivedPub, String expectedName, String expectedValue) {
Optional<? extends Mqtt5UserProperty> userProp = receivedPub.getUserProperties().asList()
.stream()
.filter(prop -> prop.getName().toString().equals(expectedName))
.findFirst();
assertTrue(userProp.isPresent(), "Expected a user property named 'content-type'");
String propertyValue = userProp
.map(Mqtt5UserProperty::getValue)
.map(Object::toString)
.orElse("<empty-string>");
assertEquals(expectedValue, propertyValue);
}
}

0 comments on commit c555c19

Please sign in to comment.