-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessagepump.go
205 lines (172 loc) · 4.77 KB
/
messagepump.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package rose
import (
"net"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
)
const (
// writeTimeout is how far in the future writeRaw sets the write deadline
// before each websocket write.
writeTimeout = 10 * time.Second
// readTimeout is how far in the future readPump sets the read deadline; the
// deadline is pushed out again every time a pong is received.
readTimeout = 10 * time.Second
// pingPeriod is the interval between ping messages sent by writePump. It is
// 9/10 of readTimeout, i.e. slightly shorter than the read deadline.
pingPeriod = (readTimeout * 9) / 10
// maxMessageSize is the read limit, in bytes, that readPump sets on the websocket.
maxMessageSize = 4096
// maxMessages is the capacity of a pump's outgoing channel: the number of
// messages that can be queued for sending before the queue is full.
maxMessages = 8
)
// MessageType is the numeric id of a message. On the wire it is the varint
// prefix in front of each payload (decoded in readPump, encoded in SendMessage).
type MessageType uint64
// MessagePump reads from and writes to a single websocket connection and
// forwards connection events and packets to its User.
type MessagePump struct {
// server is the Server passed to NewMessagePump; its log is used to report errors.
server *Server
// user is the value returned by the UserConstructor; it receives the
// OnConnect, HandlePacket and OnDisconnect calls.
user User
// Connected is set to true by NewMessagePump and set to false, under lock,
// when either pump loop shuts down.
Connected bool
// Address is the websocket's remote address, captured by NewMessagePump.
Address net.Addr
// The websocket connection.
ws *websocket.Conn
// Buffered channel of outbound messages. A nil message or a closed channel
// makes writePump send a close message and stop.
outgoing chan []byte
// lock guards Connected and the closing of outgoing.
lock *sync.RWMutex
}
// NewMessagePump builds a MessagePump around ws and attaches a freshly
// constructed user to it.
//
// The constructor is invoked with the pump before the pump is flagged as
// connected and before its Address is filled in. Once both are set, the
// user's OnConnect is called and the pump is returned.
func NewMessagePump(constructor UserConstructor, ws *websocket.Conn, server *Server) (pump *MessagePump) {
	pump = &MessagePump{
		lock:     new(sync.RWMutex),
		outgoing: make(chan []byte, maxMessages),
		ws:       ws,
		server:   server,
	}

	// Let the caller-supplied constructor create the user for this pump.
	pump.user = constructor(pump)

	// Only now does the pump count as connected.
	pump.Connected = true
	pump.Address = ws.RemoteAddr()

	pump.user.OnConnect()
	return pump
}
// readPump pumps messages from the websocket connection to the user.
//
// It applies the read limit and read deadline, renews the deadline on every
// pong, and then reads messages until a read fails or a malformed message
// arrives. Each message must start with a varint message type; the remaining
// bytes are passed to the user's HandlePacket. When the loop ends the outgoing
// channel is closed (unless the pump was already shut down), the pump is
// marked as disconnected and the user's OnDisconnect is called.
//
// NOTE(review): if the initial SetReadDeadline fails the function returns
// without running the shutdown steps or OnDisconnect — confirm this is intended.
func (pump *MessagePump) readPump() {
	// Setup read pump
	pump.ws.SetReadLimit(maxMessageSize)
	if err := pump.ws.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
		pump.server.log.Println(err)
		return
	}
	pump.ws.SetPongHandler(func(string) error {
		return pump.ws.SetReadDeadline(time.Now().Add(readTimeout))
	})

	// Loop to read messages until we need to exit
	for {
		// Read a message off the websocket, blocking
		_, data, err := pump.ws.ReadMessage()
		if err != nil {
			break
		}

		// Read message type. DecodeVarint reports a parse error (empty frame,
		// truncated or oversized varint) as a length of 0; such a frame has no
		// valid type prefix, so treat it as a protocol violation and shut down
		// instead of delivering it as message type 0.
		messageType, n := proto.DecodeVarint(data)
		if n == 0 {
			break
		}

		// Send packet to user code
		pump.user.HandlePacket(MessageType(messageType), data[n:])
	}

	// The lock is so we're sure nothing writes to the pump while we close things
	pump.lock.Lock()
	// Close the writing channel so the rest stops; Connected doubles as the
	// "channel not yet closed" flag, so it is never closed twice.
	if pump.Connected {
		close(pump.outgoing)
	}
	// Set user as disconnected
	pump.Connected = false
	pump.lock.Unlock()

	pump.user.OnDisconnect(nil)
}
// writeRaw pushes the write deadline out by writeTimeout and then writes a
// single websocket message of type mt carrying payload. The error from
// setting the deadline, or otherwise from the write itself, is returned.
func (pump *MessagePump) writeRaw(mt int, payload []byte) error {
	if err := pump.ws.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
		return err
	}
	return pump.ws.WriteMessage(mt, payload)
}
// writePump drains the outgoing channel onto the websocket and sends a ping
// every pingPeriod.
//
// The loop stops when the channel is closed or yields a nil message (a close
// message is written first, best effort), or when any write fails. Afterwards
// the outgoing channel is closed if nobody closed it yet, the pump is marked
// as disconnected, the ticker is stopped and the websocket is closed.
func (pump *MessagePump) writePump() {
	ticker := time.NewTicker(pingPeriod)

	for running := true; running; {
		select {
		case payload, open := <-pump.outgoing:
			if open && payload != nil {
				// Regular message: keep going only while the write succeeds.
				running = pump.writeRaw(websocket.BinaryMessage, payload) == nil
			} else {
				// Closed channel or nil sentinel: say goodbye and stop. The
				// result of the close write is deliberately not inspected.
				_ = pump.writeRaw(websocket.CloseMessage, []byte{})
				running = false
			}
		case <-ticker.C:
			// Keep-alive ping; a failed ping ends the loop.
			running = pump.writeRaw(websocket.PingMessage, []byte{}) == nil
		}
	}

	// Shut down under the lock so no sender touches the channel meanwhile.
	pump.lock.Lock()
	if pump.Connected {
		// Still flagged as connected means the channel has not been closed yet.
		close(pump.outgoing)
		pump.Connected = false
	}
	pump.lock.Unlock()

	// Release the ticker and the connection.
	ticker.Stop()
	if err := pump.ws.Close(); err != nil {
		pump.server.log.Println(err)
	}
}
// SendMessage serializes pb, prefixes it with messageType encoded as a varint
// and queues the result for writePump to send.
//
// It returns an error only when encoding fails. If the pump is no longer
// connected the message is dropped and nil is returned. While the outgoing
// queue is full the call waits, retrying with a short pause between attempts;
// the lock is released during each pause so the pump can still shut down.
func (pump *MessagePump) SendMessage(messageType MessageType, pb proto.Message) error {
	// Pause between enqueue attempts while the queue is full, so a slow
	// connection does not turn the caller into a busy loop.
	const retryDelay = time.Millisecond

	// Create buffer to write to with the calculated size required
	messageID := uint64(messageType)
	// TODO Stop constantly making buffers
	typebuf := make([]byte, 0, proto.SizeVarint(messageID)+proto.Size(pb))
	response := proto.NewBuffer(typebuf)

	// Serialize
	if err := response.EncodeVarint(messageID); err != nil {
		return err
	}
	if err := response.Marshal(pb); err != nil {
		return err
	}
	packet := response.Bytes()

	// Send type prefixed message
	for {
		pump.lock.Lock()
		if !pump.Connected {
			// Pump is shut down (channel may be closed): drop the message.
			pump.lock.Unlock()
			return nil
		}
		select {
		case pump.outgoing <- packet:
			pump.lock.Unlock()
			return nil
		default:
			// Queue full; never block on the channel while holding the lock.
		}
		pump.lock.Unlock()

		time.Sleep(retryDelay)
	}
}
// Disconnect schedules disconnection of the pump.
//
// It closes the outgoing channel and marks the pump as disconnected. writePump
// still sends whatever is already queued, then writes a close message and
// closes the websocket. Calling Disconnect on a pump that is already
// disconnected does nothing.
//
// The channel is closed rather than sent a nil sentinel because a send can
// block when the queue is full; blocking here while holding the lock would
// deadlock against a pump loop that is waiting for the same lock to shut down.
func (pump *MessagePump) Disconnect() {
	pump.lock.Lock()
	defer pump.lock.Unlock()

	if !pump.Connected {
		return
	}
	// Connected doubles as the "channel not yet closed" flag, so clearing it
	// here keeps the pump loops from closing the channel a second time.
	close(pump.outgoing)
	pump.Connected = false
}