-
Notifications
You must be signed in to change notification settings - Fork 822
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial implementation of shared subscriptions (#796)
Contains a series of updates to the broker internal structures to sustain shared subscription registering and dispatching of publish messages. In CTrie's CNode add a separate shared subscriptions data structure to contains the shared subscriptions list related to share name for a given topic filter. It's implemented as a map keyed by share name which maps to a list of shared subscriptions. Updated the CTrie's insertion method to use a data class SubscriptionRequest to capture the shared and non shared subscription actions. Implemented test cases to cover the various MQTT spec requirements as integrations tests, and adapted utility classes (like the low level Client to permit the verification).
- Loading branch information
Showing
30 changed files
with
1,211 additions
and
295 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
broker/src/main/java/io/moquette/broker/SharedSubscriptionUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright (c) 2012-2023 The original author or authors | ||
* ------------------------------------------------------ | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v1.0 | ||
* and Apache License v2.0 which accompanies this distribution. | ||
* | ||
* The Eclipse Public License is available at | ||
* http://www.eclipse.org/legal/epl-v10.html | ||
* | ||
* The Apache License v2.0 is available at | ||
* http://www.opensource.org/licenses/apache2.0.php | ||
* | ||
* You may elect to redistribute this code under either of these licenses. | ||
*/ | ||
package io.moquette.broker; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* Utility class that collects common utils methods for shared subscription topic parsing | ||
* */ | ||
class SharedSubscriptionUtils { | ||
|
||
/** | ||
* @return the share name in the topic filter of format $share/{shareName}/{topicFilter} | ||
* */ | ||
// VisibleForTesting | ||
protected static String extractShareName(String sharedTopicFilter) { | ||
int afterShare = "$share/".length(); | ||
int endOfShareName = sharedTopicFilter.indexOf('/', afterShare); | ||
return sharedTopicFilter.substring(afterShare, endOfShareName); | ||
} | ||
|
||
/** | ||
* @return the filter part from full topic filter of format $share/{shareName}/{topicFilter} | ||
* */ | ||
// VisibleForTesting | ||
protected static String extractFilterFromShared(String fullSharedTopicFilter) { | ||
int afterShare = "$share/".length(); | ||
int endOfShareName = fullSharedTopicFilter.indexOf('/', afterShare); | ||
return fullSharedTopicFilter.substring(endOfShareName + 1); | ||
} | ||
|
||
/** | ||
* @return true if topic filter is shared format | ||
* */ | ||
protected static boolean isSharedSubscription(String topicFilter) { | ||
Objects.requireNonNull(topicFilter, "topicFilter can't be null"); | ||
return topicFilter.startsWith("$share/"); | ||
} | ||
|
||
/** | ||
* @return true if shareName is well-formed, is at least one characted and doesn't contain wildcard matchers | ||
* */ | ||
protected static boolean validateShareName(String shareName) { | ||
// MQTT-4.8.2-1 MQTT-4.8.2-2, must be longer than 1 char and do not contain + or # | ||
Objects.requireNonNull(shareName); | ||
return shareName.length() > 0 && !shareName.contains("+") && !shareName.contains("#"); | ||
} | ||
} |
Oops, something went wrong.