-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchannel.go
77 lines (66 loc) · 1.69 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package websocket
import (
"crypto/tls"
"net"
"sync"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/mailru/easygo/netpoll"
)
// Channel is a websocket connection that messages are read from and written to.
type Channel struct {
handler *Handler // Handle managing this connection.
conn net.Conn // Websocket connection.
readDesc *netpoll.Desc // Read descriptor for netpoll.
// onClose is called when the channel is closed.
onClose func()
onCloseMux sync.Mutex
}
// Create a new channel for the connection.
func newChannel(conn net.Conn, handler *Handler) *Channel {
fdConn := conn
tlsConn, ok := conn.(*tls.Conn)
if ok {
fdConn = tlsConn.NetConn()
}
return &Channel{
handler: handler,
conn: conn,
readDesc: netpoll.Must(netpoll.HandleReadOnce(fdConn)),
}
}
// Close the channel.
func (c *Channel) Close() {
c.handler.poller.Stop(c.readDesc)
c.conn.Close()
c.onCloseMux.Lock()
defer c.onCloseMux.Unlock()
if c.onClose != nil {
c.onClose()
}
}
// Send a message over the channel. Once write concurrency of the handler is
// reached this method will block.
func (c *Channel) Send(op OpCode, data []byte) {
c.handler.writePool.schedule(func() {
err := wsutil.WriteServerMessage(c.conn, ws.OpCode(op), data)
if err != nil {
c.Close()
}
})
}
// SetOnClose sets the callback to get called when the channel is closed.
func (c *Channel) SetOnClose(callback func()) {
c.onCloseMux.Lock()
defer c.onCloseMux.Unlock()
c.onClose = callback
}
// Read and process the message from the connection.
func (c *Channel) read() {
data, op, err := wsutil.ReadClientData(c.conn)
if err != nil {
c.Close()
return
}
c.handler.callback(c, OpCode(op), data)
}