forked from jeffallen/mqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.go
171 lines (150 loc) · 4.37 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Package mqtt implements MQTT clients and servers.
package mqtt
import (
crand "crypto/rand"
"fmt"
"io"
"log"
"math/rand"
"net"
"runtime"
"sync/atomic"
"time"
proto "github.com/huin/mqtt"
)
// A random number generator ready to make client-id's, if
// they do not provide them to us.
var cliRand *rand.Rand
func init() {
var seed int64
var sb [4]byte
crand.Read(sb[:])
seed = int64(time.Now().Nanosecond())<<32 |
int64(sb[0])<<24 | int64(sb[1])<<16 |
int64(sb[2])<<8 | int64(sb[3])
cliRand = rand.New(rand.NewSource(seed))
}
type stats struct {
recv int64
sent int64
clients int64
clientsMax int64
lastmsgs int64
}
func (s *stats) messageRecv() { atomic.AddInt64(&s.recv, 1) }
func (s *stats) messageSend() { atomic.AddInt64(&s.sent, 1) }
func (s *stats) clientConnect() { atomic.AddInt64(&s.clients, 1) }
func (s *stats) clientDisconnect() { atomic.AddInt64(&s.clients, -1) }
func statsMessage(topic string, stat int64) *proto.Publish {
return &proto.Publish{
Header: header(dupFalse, proto.QosAtMostOnce, retainTrue),
TopicName: topic,
Payload: newIntPayload(stat),
}
}
func (s *stats) publish(sub *subscriptions, interval time.Duration) {
clients := atomic.LoadInt64(&s.clients)
clientsMax := atomic.LoadInt64(&s.clientsMax)
if clients > clientsMax {
clientsMax = clients
atomic.StoreInt64(&s.clientsMax, clientsMax)
}
sub.submit(nil, statsMessage("$SYS/broker/clients/active", clients))
sub.submit(nil, statsMessage("$SYS/broker/clients/maximum", clientsMax))
sub.submit(nil, statsMessage("$SYS/broker/messages/received",
atomic.LoadInt64(&s.recv)))
sub.submit(nil, statsMessage("$SYS/broker/messages/sent",
atomic.LoadInt64(&s.sent)))
msgs := atomic.LoadInt64(&s.recv) + atomic.LoadInt64(&s.sent)
msgpersec := (msgs - s.lastmsgs) / int64(interval/time.Second)
// no need for atomic because we are the only reader/writer of it
s.lastmsgs = msgs
sub.submit(nil, statsMessage("$SYS/broker/messages/per-sec", msgpersec))
}
// An intPayload implements proto.Payload, and is an int64 that
// formats itself and then prints itself into the payload.
type intPayload string
func newIntPayload(i int64) intPayload {
return intPayload(fmt.Sprint(i))
}
func (ip intPayload) ReadPayload(r io.Reader) error {
// not implemented
return nil
}
func (ip intPayload) WritePayload(w io.Writer) error {
_, err := w.Write([]byte(string(ip)))
return err
}
func (ip intPayload) Size() int {
return len(ip)
}
// A Server holds all the state associated with an MQTT server.
type Server struct {
l net.Listener
subs *subscriptions
stats *stats
Done chan struct{}
StatsInterval time.Duration // Defaults to 10 seconds. Must be set using sync/atomic.StoreInt64().
Dump bool // When true, dump the messages in and out.
rand *rand.Rand
}
// NewServer creates a new MQTT server, which accepts connections from
// the given listener. When the server is stopped (for instance by
// another goroutine closing the net.Listener), channel Done will become
// readable.
func NewServer(l net.Listener) *Server {
svr := &Server{
l: l,
stats: &stats{},
Done: make(chan struct{}),
StatsInterval: time.Second * 10,
subs: newSubscriptions(runtime.GOMAXPROCS(0)),
}
// start the stats reporting goroutine
go func() {
for {
svr.stats.publish(svr.subs, svr.StatsInterval)
select {
case <-svr.Done:
return
default:
// keep going
}
time.Sleep(svr.StatsInterval)
}
}()
return svr
}
// Start makes the Server start accepting and handling connections.
func (s *Server) Start() {
go func() {
for {
conn, err := s.l.Accept()
if err != nil {
log.Print("Accept: ", err)
break
}
cli := s.newIncomingConn(conn)
s.stats.clientConnect()
cli.start()
}
close(s.Done)
}()
}
// header is used to initialize a proto.Header when the zero value
// is not correct. The zero value of proto.Header is
// the equivalent of header(dupFalse, proto.QosAtMostOnce, retainFalse)
// and is correct for most messages.
func header(d dupFlag, q proto.QosLevel, r retainFlag) proto.Header {
return proto.Header{
DupFlag: bool(d), QosLevel: q, Retain: bool(r),
}
}
type retainFlag bool
type dupFlag bool
const (
retainFalse retainFlag = false
retainTrue = true
dupFalse dupFlag = false
dupTrue = true
)