diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index aaa55475d..3aa91dd5b 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -638,4 +638,15 @@ public void removeInterceptHandler(InterceptHandler interceptHandler) { public Collection listConnectedClients() { return sessions.listConnectedClients(); } + + + + /** + * Attempts to disconnect a client given its client id. + * Iterates over the session pool. returning client descriptors of any matched (now disconnected) + * clients. + * */ + public List kickClientId(String clientId) { + return sessions.kickClient(clientId); + } } diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index aa077ccd7..c48fe9796 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -30,6 +30,8 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -311,6 +313,22 @@ Collection listConnectedClients() { .collect(Collectors.toList()); } + List kickClient(String clientId) { + return pool.values().stream() + .filter(Session::connected) + .filter(session -> Objects.equals(clientId, session.getClientID())) + .map(this::disconnectSession) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + private Optional disconnectSession(Session s) { + Optional descriptor = createClientDescriptor(s); + s.closeImmediately(); + return descriptor; + } + private Optional createClientDescriptor(Session s) { final String clientID = s.getClientID(); final Optional remoteAddressOpt = s.remoteAddress();