Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of shared subscriptions #796

Merged
merged 19 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
bbba65e
Added test to validate the normal flow of shared-subscription and pub…
andsel Nov 18, 2023
2d60b65
Switched CTrie add subscription method to accept a wrapper class Subs…
andsel Nov 25, 2023
5b16f77
Refactored subscription directory add of subscription to work on deco…
andsel Nov 26, 2023
6f6736a
Implemented full flow of adding shared subscriprion on Subscription d…
andsel Dec 3, 2023
e8ae55f
[Skip CI] Update new file with license header
andsel Dec 3, 2023
76fcc69
Added error notification on raw client used in test
andsel Dec 8, 2023
6f384d7
Implemented test to verify a shared subscription on read prohibited t…
andsel Dec 8, 2023
ad5577d
Added a more fluent implementation of IAuthorizatorPolicy to be lever…
andsel Dec 8, 2023
e6ef6f7
Changed PostOffice to verify read access also on shared subscriptions
andsel Dec 8, 2023
4604ce4
Implemented test to verify that mixed sahred and non-shared topics ar…
andsel Dec 9, 2023
7427fda
Do not send retained messages on shared subscription's clients.
andsel Dec 10, 2023
27ae45a
Refactoring, extracted method to make more readable the caller
andsel Dec 10, 2023
10b501b
Updated handling of overlapping shared subscription to send multiple …
andsel Dec 10, 2023
1ecc13a
Added test to verify the update of a shared subscription's qos
andsel Dec 11, 2023
fb596cb
[Doc] Added changelog line and reshaped test for readability
andsel Dec 11, 2023
06d6119
Test fix: augmented timeout on check to pass tests on CI (maybe)
andsel Dec 15, 2023
d21cd29
Reworked the publish verification to execute check on publisching cal…
andsel Dec 15, 2023
98abd7b
[Skip CI] minor touches
andsel Dec 15, 2023
2e58add
Fixed failing test, has to check that the message is published at QoS…
andsel Dec 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Version 0.18-SNAPSHOT:
[feature] shared subscriptions:
- Initial implementation of shared subscription subscribe and publish part. (#796)
[fix] Implements requirements on reserved topics (starts with $). Implements the matching rules and avoid to proceed with processing on client's publishes on those topics (#793)
[feature] Handle will delay interval and MQTT5's Will optional properties (#770)
[fix] Handle empty collector batches in PostOffice (#777)
Expand Down
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
https://github.com/netty/netty/blob/netty-4.1.100.Final/pom.xml#L650 -->
<netty.tcnative.version>2.0.61.Final</netty.tcnative.version>
<paho.version>1.2.5</paho.version>
<hivemqclient.version>1.3.0</hivemqclient.version>
<hivemqclient.version>1.3.3</hivemqclient.version>
<h2.version>2.1.212</h2.version>
</properties>

Expand Down
61 changes: 42 additions & 19 deletions broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import static io.moquette.broker.Utils.messageId;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
Expand All @@ -39,6 +40,18 @@ final class Authorizator {
this.policy = policy;
}


List<MqttTopicSubscription> verifyAlsoSharedTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
return verifyTopicsReadAccessWithTopicExtractor(clientID, username, msg, Authorizator::extractShareTopic);
}

private static Topic extractShareTopic(String s) {
if (SharedSubscriptionUtils.isSharedSubscription(s)) {
return Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(s));
}
return Topic.asTopic(s);
}

/**
* @param clientID
* the clientID
Expand All @@ -49,33 +62,43 @@ final class Authorizator {
* @return the list of verified topics for the given subscribe message.
*/
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
return verifyTopicsReadAccessWithTopicExtractor(clientID, username, msg, Topic::asTopic);
}

private List<MqttTopicSubscription> verifyTopicsReadAccessWithTopicExtractor(String clientID, String username,
MqttSubscribeMessage msg, Function<String, Topic> topicExtractor) {
List<MqttTopicSubscription> ackTopics = new ArrayList<>();

final int messageId = messageId(msg);
for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
Topic topic = new Topic(req.topicName());
if (!policy.canRead(topic, username, clientID)) {
// send SUBACK with 0x80, the user hasn't credentials to read the topic
LOG.warn("Client does not have read permissions on the topic username: {}, messageId: {}, " +
"topic: {}", username, messageId, topic);
ackTopics.add(new MqttTopicSubscription(topic.toString(), FAILURE));
} else {
MqttQoS qos;
if (topic.isValid()) {
LOG.debug("Client will be subscribed to the topic username: {}, messageId: {}, topic: {}",
username, messageId, topic);
qos = req.qualityOfService();
} else {
LOG.warn("Topic filter is not valid username: {}, messageId: {}, topic: {}",
username, messageId, topic);
qos = FAILURE;
}
ackTopics.add(new MqttTopicSubscription(topic.toString(), qos));
}
Topic topic = topicExtractor.apply(req.topicName());
final MqttQoS qos = getQoSCheckingAlsoPermissionsOnTopic(clientID, username, messageId, topic,
req.qualityOfService());
ackTopics.add(new MqttTopicSubscription(req.topicName(), qos));
}
return ackTopics;
}

private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String username, int messageId,
Topic topic, MqttQoS requestedQoS) {
if (policy.canRead(topic, username, clientID)) {
if (topic.isValid()) {
LOG.debug("Client will be subscribed to the topic username: {}, messageId: {}, topic: {}",
username, messageId, topic);
return requestedQoS;
}

LOG.warn("Topic filter is not valid username: {}, messageId: {}, topic: {}",
username, messageId, topic);
return FAILURE;
}

// send SUBACK with 0x80, the user hasn't credentials to read the topic
LOG.warn("Client does not have read permissions on the topic username: {}, messageId: {}, " +
"topic: {}", username, messageId, topic);
return FAILURE;
}

/**
* Ask the authorization policy if the topic can be used in a publish.
*
Expand Down
10 changes: 6 additions & 4 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,10 @@ void sendPublish(MqttPublishMessage publishMsg) {
final String topicName = publishMsg.variableHeader().topicName();
MqttQoS qos = publishMsg.fixedHeader().qosLevel();
if (LOG.isTraceEnabled()) {
LOG.trace("Sending PUBLISH({}) message. MessageId={}, topic={}, payload={}", qos, packetId, topicName,
DebugUtils.payload2Str(publishMsg.payload()));
LOG.trace("Sending PUBLISH({}) message. MessageId={}, topic={}, payload={} to {}", qos, packetId, topicName,
DebugUtils.payload2Str(publishMsg.payload()), getClientId());
} else {
LOG.debug("Sending PUBLISH({}) message. MessageId={}, topic={}", qos, packetId, topicName);
LOG.debug("Sending PUBLISH({}) message. MessageId={}, topic={} to {}", qos, packetId, topicName, getClientId());
}
sendIfWritableElseDrop(publishMsg);
}
Expand All @@ -703,7 +703,7 @@ void sendIfWritableElseDrop(MqttMessage msg) {
LOG.debug("OUT {}", msg.fixedHeader().messageType());
}
if (channel.isWritable()) {

LOG.debug("Sending message {} on the wire to {}", msg.fixedHeader().messageType(), getClientId());
// Sending to external, retain a duplicate. Just retain is not
// enough, since the receiver must have full control.
Object retainedDup = msg;
Expand All @@ -718,6 +718,8 @@ void sendIfWritableElseDrop(MqttMessage msg) {
channelFuture = channel.write(retainedDup);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
} else {
LOG.debug("Dropping message {} from the wire, msg: {}", msg.fixedHeader().messageType(), msg);
}
}

Expand Down
103 changes: 58 additions & 45 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.moquette.broker;

import io.moquette.broker.scheduler.ScheduledExpirationService;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
Expand All @@ -32,7 +33,6 @@
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import org.apache.commons.codec.binary.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -273,38 +274,77 @@ public void wipeExistingScheduledWill(String clientId) {
sessionRepository.deleteWill(clientId);
}

// Used for internal purposes of subscribeClientToTopics method
private static final class SharedSubscriptionData {
final ShareName name;
final Topic topicFilter;
final MqttQoS requestedQoS;

private SharedSubscriptionData(ShareName name, Topic topicFilter, MqttQoS requestedQoS) {
Objects.requireNonNull(name);
Objects.requireNonNull(topicFilter);
Objects.requireNonNull(requestedQoS);
this.name = name;
this.topicFilter = topicFilter;
this.requestedQoS = requestedQoS;
}

static SharedSubscriptionData fromMqttSubscription(MqttTopicSubscription sub) {
return new SharedSubscriptionData(new ShareName(SharedSubscriptionUtils.extractShareName(sub.topicName())),
Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(sub.topicName())), sub.qualityOfService());
}
}

public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, String username,
MQTTConnection mqttConnection) {
// verify which topics of the subscribe ongoing has read access permission
int messageID = messageId(msg);
final Session session = sessionRegistry.retrieve(clientID);

final List<SharedSubscriptionData> sharedSubscriptions;

if (mqttConnection.isProtocolVersion5()) {
for (MqttTopicSubscription topicFilter : msg.payload().topicSubscriptions()) {
if (isSharedSubscription(topicFilter.topicName())) {
final String shareName = extractShareName(topicFilter.topicName());
if (!validateShareName(shareName)) {
// this is a malformed packet, MQTT-4.13.1-1, disconnect it
LOG.info("{} used an invalid shared subscription name {}, disconnecting", clientID, shareName);
session.disconnectFromBroker();
return;
}
}
sharedSubscriptions = msg.payload().topicSubscriptions().stream()
.filter(sub -> SharedSubscriptionUtils.isSharedSubscription(sub.topicName()))
.map(SharedSubscriptionData::fromMqttSubscription)
.collect(Collectors.toList());

Optional<SharedSubscriptionData> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.name.toString()))
.findFirst();
if (invalidSharedSubscription.isPresent()) {
// this is a malformed packet, MQTT-4.13.1-1, disconnect it
LOG.info("{} used an invalid shared subscription name {}, disconnecting", clientID, invalidSharedSubscription.get().name);
session.disconnectFromBroker();
return;
}
} else {
sharedSubscriptions = Collections.emptyList();
}

List<MqttTopicSubscription> ackTopics;
if (mqttConnection.isProtocolVersion5()) {
ackTopics = authorizator.verifyAlsoSharedTopicsReadAccess(clientID, username, msg);
} else {
ackTopics = authorizator.verifyTopicsReadAccess(clientID, username, msg);
}
List<MqttTopicSubscription> ackTopics = authorizator.verifyTopicsReadAccess(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);

// store topics subscriptions in session
// store topics of non-shared subscriptions in session
List<Subscription> newSubscriptions = ackTopics.stream()
.filter(req -> req.qualityOfService() != FAILURE)
.map(req -> {
final Topic topic = new Topic(req.topicName());
return new Subscription(clientID, topic, req.qualityOfService());
.filter(sub -> sub.qualityOfService() != FAILURE)
.filter(sub -> !SharedSubscriptionUtils.isSharedSubscription(sub.topicName()))
.map(sub -> {
final Topic topic = new Topic(sub.topicName());
return new Subscription(clientID, topic, sub.qualityOfService());
}).collect(Collectors.toList());

for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription);
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos());
}

for (SharedSubscriptionData sharedSubData : sharedSubscriptions) {
subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.requestedQoS);
}

// add the subscriptions to Session
Expand All @@ -320,33 +360,6 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}

/**
* @return the share name in the topic filter of format $share/{shareName}/{topicFilter}
* */
// VisibleForTesting
protected static String extractShareName(String sharedTopicFilter) {
int afterShare = "$share/".length();
int endOfShareName = sharedTopicFilter.indexOf('/', afterShare);
return sharedTopicFilter.substring(afterShare, endOfShareName);
}

/**
* @return true if shareName is well formed, is at least one characted and doesn't contain wildcard matchers
* */
private boolean validateShareName(String shareName) {
// MQTT-4.8.2-1 MQTT-4.8.2-2, must be longer than 1 char and do not contain + or #
Objects.requireNonNull(shareName);
return shareName.length() > 0 && !shareName.contains("+") && !shareName.contains("#");
}

/**
* @return true if topic filter is shared format
* */
private static boolean isSharedSubscription(String topicFilter) {
Objects.requireNonNull(topicFilter, "topicFilter can't be null");
return topicFilter.startsWith("$share/");
}

private void publishRetainedMessagesForSubscriptions(String clientID, List<Subscription> newSubscriptions) {
Session targetSession = this.sessionRegistry.retrieve(clientID);
for (Subscription subscription : newSubscriptions) {
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void startServer(IConfig config) throws IOException {
* @throws IOException in case of any IO Error.
*/
public void startServer(IConfig config, List<? extends InterceptHandler> handlers) throws IOException {
LOG.debug("Starting moquette integration using IConfig instance and intercept handlers");
LOG.debug("Starting Moquette integration using IConfig instance and intercept handlers");
startServer(config, handlers, null, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2012-2023 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.moquette.broker;

import java.util.Objects;

/**
* Utility class that collects common utils methods for shared subscription topic parsing
* */
class SharedSubscriptionUtils {

/**
* @return the share name in the topic filter of format $share/{shareName}/{topicFilter}
* */
// VisibleForTesting
protected static String extractShareName(String sharedTopicFilter) {
int afterShare = "$share/".length();
int endOfShareName = sharedTopicFilter.indexOf('/', afterShare);
return sharedTopicFilter.substring(afterShare, endOfShareName);
}

/**
* @return the filter part from full topic filter of format $share/{shareName}/{topicFilter}
* */
// VisibleForTesting
protected static String extractFilterFromShared(String fullSharedTopicFilter) {
int afterShare = "$share/".length();
int endOfShareName = fullSharedTopicFilter.indexOf('/', afterShare);
return fullSharedTopicFilter.substring(endOfShareName + 1);
}

/**
* @return true if topic filter is shared format
* */
protected static boolean isSharedSubscription(String topicFilter) {
Objects.requireNonNull(topicFilter, "topicFilter can't be null");
return topicFilter.startsWith("$share/");
}

/**
* @return true if shareName is well-formed, is at least one characted and doesn't contain wildcard matchers
* */
protected static boolean validateShareName(String shareName) {
// MQTT-4.8.2-1 MQTT-4.8.2-2, must be longer than 1 char and do not contain + or #
Objects.requireNonNull(shareName);
return shareName.length() > 0 && !shareName.contains("+") && !shareName.contains("#");
}
}
Loading
Loading