-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconnection.go
158 lines (139 loc) · 3.33 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package gotcpserver
import (
"log"
"net"
"sync"
"time"
)
// NewServerConnection builds the ServerConnection for a newly accepted
// connection c: it creates the receive and send packet queues and the
// close-signal channel, records the peer's remote address, and registers the
// result in srv.connMap under that address before returning it.
func NewServerConnection(srv *Server, c net.Conn) *ServerConnection {
	// The remote address doubles as the key in the server's connection map.
	addr := c.RemoteAddr().String()

	sconn := &ServerConnection{
		srv:               srv,
		conn:              c,
		remoteAddr:        addr,
		once:              &sync.Once{},
		connCloseChan:     make(chan struct{}),
		packetSendChan:    make(chan interface{}, SendChanSize),
		packetReceiveChan: make(chan interface{}, ReceiveChanSize),
	}

	srv.connMap.Put(addr, sconn)
	return sconn
}
// Do starts the three goroutines that service this connection — the read
// loop, the write loop and the handle loop — and returns immediately without
// waiting for any of them.
func (sc *ServerConnection) Do() {
	loops := [...]func(){sc.readLoop, sc.writeLoop, sc.handleLoop}
	for _, loop := range loops {
		go loop()
	}
}
// readLoop reads packets from the connection with mp.ReadPacket and queues
// each one on packetReceiveChan. It stops when connCloseChan is closed or a
// read fails; whatever ends the loop, the deferred function closes the
// connection.
func (sc *ServerConnection) readLoop() {
	defer func() {
		// Close closes packetReceiveChan, so a send racing with it can
		// panic; swallow that and make sure the connection is torn down.
		recover()
		sc.Close()
	}()

	for {
		// Non-blocking check so a closed connection is noticed before the
		// next (blocking) read is started.
		select {
		case <-sc.connCloseChan:
			return
		default:
		}

		p, err := mp.ReadPacket(sc.conn)
		if err != nil {
			// Stop on a failed read instead of queueing an invalid packet
			// and spinning on a connection that can no longer be read.
			log.Println("[INFO] Read Packet Failed:", err, sc.remoteAddr)
			return
		}

		// Do not stay blocked on a full receive queue once the connection
		// has been closed.
		select {
		case <-sc.connCloseChan:
			return
		case sc.packetReceiveChan <- p:
		}
		log.Println("[INFO] Read Data From Client:", p, sc.remoteAddr)
	}
}
// writeLoop takes packets off packetSendChan and writes their data to the
// connection. It stops when connCloseChan is closed, when packetSendChan is
// closed, or when a write fails; whatever ends the loop, the deferred function
// closes the connection.
func (sc *ServerConnection) writeLoop() {
	defer func() {
		recover()
		sc.Close()
	}()

	for {
		select {
		case <-sc.connCloseChan:
			return
		case p, open := <-sc.packetSendChan:
			if !open {
				// The queue was closed; there is nothing left to send.
				return
			}

			pkt, ok := p.(*MyPacket)
			if !ok || pkt == nil {
				// Only *MyPacket carries the data field written below.
				// Skip anything else rather than dereferencing a nil
				// pointer and ending the loop through a recovered panic.
				log.Println("[INFO] Drop Unsendable Packet:", sc.remoteAddr)
				continue
			}

			log.Println("[INFO] Send Data To Client:", sc.conn.RemoteAddr().String(), pkt, pkt.data)
			if _, err := sc.conn.Write(pkt.data); err != nil {
				log.Println("[INFO] Send Data Failed:", err, sc.remoteAddr)
				return
			}
		}
	}
}
// handleLoop takes packets off packetReceiveChan and queues each *MyPacket
// back to the client through AsyncWritePacket. It stops when connCloseChan or
// packetReceiveChan is closed; whatever ends the loop, the deferred function
// closes the connection.
func (sc *ServerConnection) handleLoop() {
	// A bare 2 passed as a time.Duration is two nanoseconds, which makes the
	// timed send give up almost immediately; spell the unit out.
	// NOTE(review): seconds assumed as the intended unit — confirm.
	const replyTimeout = 2 * time.Second

	defer func() {
		recover()
		sc.Close()
	}()

	for {
		select {
		case <-sc.connCloseChan:
			return
		case p, open := <-sc.packetReceiveChan:
			if !open {
				// The queue was closed; there is nothing left to handle.
				return
			}

			pkt, ok := p.(*MyPacket)
			if ok {
				log.Println("[INFO] Wait Handle*************880k:", pkt)
			}
			log.Println("[INFO] Wait Handle:", sc.remoteAddr)

			if !ok || pkt == nil {
				// Never queue a nil packet for sending: the write loop
				// would have no data to write for it.
				continue
			}
			if err := sc.AsyncWritePacket(pkt, replyTimeout); err != nil {
				log.Println("[INFO] Queue Reply Failed:", err, sc.remoteAddr)
			}
		}
	}
}
// AsyncWritePacket queues packet p on the connection's send queue without
// writing it to the network itself.
//
// With timeout == 0 the attempt is non-blocking: if the queue cannot accept
// the packet right now, ErrWriteBlock is returned. Otherwise the call waits
// up to timeout for room in the queue and returns ErrWriteBlock when the
// timeout expires or the connection is closed first. It returns nil only when
// the packet was actually queued.
func (sc *ServerConnection) AsyncWritePacket(p Packet, timeout time.Duration) (err error) {
	defer func() {
		// Close closes packetSendChan, so a send racing with it panics.
		// Report that as a failed write instead of returning nil for a
		// packet that was never queued.
		if r := recover(); r != nil {
			err = ErrWriteBlock
		}
	}()

	if timeout == 0 {
		select {
		case sc.packetSendChan <- p:
			return nil
		default:
			return ErrWriteBlock
		}
	}

	// An explicit timer can be stopped as soon as the send completes,
	// unlike the one created by time.After.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case sc.packetSendChan <- p:
		return nil
	case <-sc.connCloseChan:
		return ErrWriteBlock
	case <-timer.C:
		return ErrWriteBlock
	}
}
// Close shuts the connection down: it closes the underlying net.Conn, closes
// the close-signal channel and both packet queues, and removes the connection
// from the server's connection map. The work is guarded by sc.once, so only
// the first call has any effect.
func (sc *ServerConnection) Close() {
	sc.once.Do(func() {
		if err := sc.conn.Close(); err != nil {
			log.Println("[INFO] Close Connection Failed:", err, sc.remoteAddr)
		}

		// Signal shutdown first so loops selecting on connCloseChan can
		// exit, then close the packet queues.
		close(sc.connCloseChan)
		close(sc.packetSendChan)
		close(sc.packetReceiveChan)

		// Deregister with the address recorded when the connection was
		// created, rather than asking the already-closed conn for it again.
		sc.srv.connMap.Delete(sc.remoteAddr)
	})
}
// NewServerConnectionMap returns a ServerConnectionMap whose underlying map
// is allocated and empty, ready for Put, Get and Delete.
func NewServerConnectionMap() *ServerConnectionMap {
	m := new(ServerConnectionMap)
	m.scm = map[string]*ServerConnection{}
	return m
}
// Put stores v under key k while holding the map's lock, replacing any
// connection already stored under that key.
func (m *ServerConnectionMap) Put(k string, v *ServerConnection) {
	m.Lock()
	defer m.Unlock()

	m.scm[k] = v
}
// Delete removes the entry stored under key k while holding the map's lock.
// Deleting a key that is not present is a no-op.
func (m *ServerConnectionMap) Delete(k string) {
	m.Lock()
	defer m.Unlock()

	delete(m.scm, k)
}
// Get looks up key k while holding the map's lock. It returns the stored
// connection and true when the key is present, or nil and false otherwise.
func (m *ServerConnectionMap) Get(k string) (*ServerConnection, bool) {
	m.Lock()
	defer m.Unlock()

	conn, found := m.scm[k]
	return conn, found
}