forked from vert-x3/vertx-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathServer.kt
109 lines (79 loc) · 3.65 KB
/
Server.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package io.vertx.example.mqtt.app
import io.netty.handler.codec.mqtt.MqttQoS
import io.vertx.core.buffer.Buffer
import io.vertx.mqtt.MqttServer
/**
 * Example verticle that runs an MQTT server and logs what connected clients do.
 *
 * For every client endpoint it prints the connection details, accepts the
 * connection, acknowledges subscribe/unsubscribe requests, answers incoming
 * publishes according to their QoS level, and logs ping/disconnect/close events.
 */
class Server : io.vertx.core.AbstractVerticle() {

    /**
     * Creates the MQTT server, installs the per-client endpoint handler and
     * starts listening on `0.0.0.0:1883`. The outcome of the listen call is
     * reported on stdout (success) or stderr (failure).
     */
    override fun start() {
        val mqttServer = MqttServer.create(vertx)

        mqttServer.endpointHandler { endpoint ->
            // shows main connect info
            println("MQTT client [${endpoint.clientIdentifier()}] request to connect, clean session = ${endpoint.isCleanSession()}")

            // NOTE(review): this prints the client's password in clear text. Acceptable
            // for a demo only; mask or drop it before using this code anywhere real.
            endpoint.auth()?.let { auth ->
                println("[username = ${auth.userName()}, password = ${auth.password()}]")
            }

            endpoint.will()?.let { will ->
                println("[will flag = ${will.isWillFlag()} topic = ${will.willTopic()} msg = ${will.willMessage()} QoS = ${will.willQos()} isRetain = ${will.isWillRetain()}]")
            }

            println("[keep alive timeout = ${endpoint.keepAliveTimeSeconds()}]")

            // accept connection from the remote client (false = no session present)
            endpoint.accept(false)

            // handlers for the QoS 1 and 2 acknowledgements of messages published by
            // this server; registered once per endpoint rather than on every SUBSCRIBE
            endpoint.publishAcknowledgeHandler { messageId ->
                println("Received ack for message = ${messageId}")
            }.publishReceivedHandler { messageId ->
                endpoint.publishRelease(messageId)
            }.publishCompletionHandler { messageId ->
                println("Received ack for message = ${messageId}")
            }

            // handling requests for subscriptions
            endpoint.subscribeHandler { subscribe ->
                val subscriptions = subscribe.topicSubscriptions()
                for (subscription in subscriptions) {
                    println("Subscription for ${subscription.topicName()} with QoS ${subscription.qualityOfService()}")
                }

                // grant every subscription exactly the QoS level it requested
                val grantedQosLevels = subscriptions.map { it.qualityOfService() }

                // ack the subscriptions request
                endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels)

                // just as example, publish a message on the first topic with requested QoS;
                // skipped when the request carries no topic so a malformed SUBSCRIBE cannot
                // raise an IndexOutOfBoundsException inside the handler
                subscriptions.firstOrNull()?.let { first ->
                    endpoint.publish(
                        first.topicName(),
                        Buffer.buffer("Hello from the Vert.x MQTT server"),
                        first.qualityOfService(),
                        false, // isDup
                        false  // isRetain
                    )
                }
            }

            // handling requests for unsubscriptions
            endpoint.unsubscribeHandler { unsubscribe ->
                for (topic in unsubscribe.topics()) {
                    println("Unsubscription for ${topic}")
                }
                // ack the unsubscription request
                endpoint.unsubscribeAcknowledge(unsubscribe.messageId())
            }

            // handling ping from client
            endpoint.pingHandler {
                println("Ping received from client")
            }

            // handling disconnect message
            endpoint.disconnectHandler {
                println("Received disconnect from client")
            }

            // handling closing connection
            endpoint.closeHandler {
                println("Connection closed")
            }

            // handling incoming published messages
            endpoint.publishHandler { message ->
                println("Just received message on [${message.topicName()}] payload [${message.payload()}] with QoS [${message.qosLevel()}]")
                when (message.qosLevel()) {
                    // QoS 1: a single PUBACK completes the exchange
                    MqttQoS.AT_LEAST_ONCE -> endpoint.publishAcknowledge(message.messageId())
                    // QoS 2: answer with PUBREC; PUBCOMP follows in the release handler below
                    MqttQoS.EXACTLY_ONCE -> endpoint.publishReceived(message.messageId())
                    // any other level: nothing is sent back
                    else -> {}
                }
            }.publishReleaseHandler { messageId ->
                endpoint.publishComplete(messageId)
            }
        }.listen(1883, "0.0.0.0") { ar ->
            if (ar.succeeded()) {
                println("MQTT server is listening on port ${mqttServer.actualPort()}")
            } else {
                System.err.println("Error on starting the server: ${ar.cause().message}")
            }
        }
    }
}