Skip to content

Commit

Permalink
allow clients listing
Browse files Browse the repository at this point in the history
  • Loading branch information
source-c committed Apr 27, 2023
1 parent 5ceafef commit 30bab2a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
11 changes: 8 additions & 3 deletions src-clj/clj_mqtt_broker/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
(:import (io.moquette.interception InterceptHandler)
(io.moquette BrokerConstants)
(io.netty.handler.codec.mqtt MqttQoS)
(com.dkdhub.mqtt_broker IBroker)
(com.dkdhub.mqtt_broker AdvancedBroker IBroker)
(java.util Hashtable Map Properties)))

(defn ->QoS [qos]
Expand Down Expand Up @@ -41,7 +41,8 @@
(open [o ^InterceptHandler handlers])
(stop [o])
(close [o])
(send [o from to data qos retain?]))
(send [o from to data qos retain?])
(clients [o]))

(deftype Broker [^IBroker instance]
CljBroker
Expand All @@ -56,4 +57,8 @@
(if (bytes? data) data (.getBytes ^String data))
(if (keyword? qos) (->QoS qos) (MqttQoS/valueOf (int qos)))
(if (boolean? retain?) retain? false))
this))
this)
(clients [_]
(if (instance? AdvancedBroker instance)
(into [] (.clients instance))
nil)))
2 changes: 1 addition & 1 deletion src-java/com/dkdhub/mqtt_broker/AdvancedBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void send(String from, String topic, byte[] data, MqttQoS qos, Boolean re
m_server.internalPublish(message, from);
}

List<Map<String, ? extends Serializable>> clients() {
public List<Map<String, ? extends Serializable>> clients() {
return m_server.listConnectedClients().parallelStream()
.map(cl -> Map.of(
"id", cl.getClientID(),
Expand Down
14 changes: 13 additions & 1 deletion test/basic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,21 @@
(deftest check-advanced-constructs
(testing "Checking advanced constructs")

(log/info "--------- MQTT Advanced Broker empty loop ---------")
(is (let [b (Broker. (AdvancedBroker. (mqtt-config)))]
(with-open [srv (open b (BasicHandler. "3456"))]
(Thread/sleep 2000)
(send srv "FROM" "/TEMPERATURE" "TEST" 1 false)
(Thread/sleep 2000))
true)))
true))

(is (let [b (Broker. (SimpleBroker. config-name))]
(with-open [srv (open b (BasicHandler. "3456"))]
(nil? (clients srv)))))

(is (let [b (Broker. (AdvancedBroker. (mqtt-config)))]
(nil? (clients b))))

(is (let [b (Broker. (AdvancedBroker. (mqtt-config)))]
(with-open [srv (open b (BasicHandler. "3456"))]
(sequential? (clients srv))))))

0 comments on commit 30bab2a

Please sign in to comment.