-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconn.go
225 lines (196 loc) · 5.4 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package kpx
import (
"crypto/tls"
"math"
"net"
"time"
)
func ConfigureConn(conn net.Conn) {
// Reducing TIME_WAIT connections by disableing Nagle's algorythm
if c, ok := conn.(*net.TCPConn); ok {
_ = c.SetNoDelay(true)
return
}
if c, ok := conn.(*tls.Conn); ok {
ConfigureConn(c.NetConn())
return
}
}
/*
TimedConn is a wrapper around net.Conn which provides automatic read/write timeouts:
- if timeout > 0, set an absolute timeout on first read
- if timeout = 0, do not set timeout
- if timeout < 0, set a sliding timeout, which automatically increases each min( 30s , timeout/2 ).
*/
type TimedConn struct {
conn net.Conn
timeout int
last time.Time
self *TimedConn
ti *traceInfo
closed bool
}
func NewTimedConn(conn net.Conn, ti *traceInfo) *TimedConn {
c := TimedConn{conn: conn, ti: ti}
c.self = &c
return &c
}
func (tc *TimedConn) deadlines(reset bool) {
//logInfo("%s - %s / %d", tc.Id, tc.last, tc.timeout)
switch {
case tc.timeout > 0:
_ = tc.conn.SetReadDeadline(time.Now().Add(time.Duration(tc.timeout) * time.Second))
tc.timeout = 0
case tc.timeout < 0 && reset:
_ = tc.conn.SetReadDeadline(time.Now().Add(time.Duration(-tc.timeout) * time.Second))
tc.last = time.Now()
case tc.timeout < 0 && time.Since(tc.last).Seconds() > math.Min(30, float64(-tc.timeout/2)):
_ = tc.conn.SetReadDeadline(time.Now().Add(time.Duration(-tc.timeout) * time.Second))
tc.last = time.Now()
case tc.timeout == 0 && reset:
_ = tc.conn.SetReadDeadline(time.Time{})
}
//logInfo("%s + %s / %d", tc.Id, tc.last, tc.timeout)
}
func (tc *TimedConn) Read(b []byte) (n int, err error) {
tc.self.deadlines(false)
n, err = tc.conn.Read(b)
return n, err
}
func (tc *TimedConn) Write(b []byte) (n int, err error) {
tc.self.deadlines(false)
return tc.conn.Write(b)
}
func (tc *TimedConn) Close() error {
if !tc.closed && trace {
logTrace(tc.ti, "close connection")
}
return tc.conn.Close()
}
func (tc *TimedConn) LocalAddr() net.Addr {
return tc.conn.LocalAddr()
}
func (tc *TimedConn) RemoteAddr() net.Addr {
return tc.conn.RemoteAddr()
}
func (tc *TimedConn) SetDeadline(_ time.Time) error {
return nil
}
func (tc *TimedConn) SetReadDeadline(_ time.Time) error {
return nil
}
func (tc *TimedConn) SetWriteDeadline(_ time.Time) error {
return nil
}
// set read/write timeout: absolute timeout if > 0, sliding timeout if < 0, no timeout if 0.
//
// sliding timeout reinitialize the timeout each 1/2 timeout or 30 seconds to keep the connection open.
func (tc *TimedConn) setTimeout(timeout int) {
if trace {
logTrace(tc.ti, "set conn timeout %d", timeout)
}
if timeout < 0 {
// double sliding timeout because it is expanded only 1/2 timeout
timeout = timeout * 2
}
tc.timeout = timeout
tc.last = time.Time{}
tc.deadlines(true)
}
/*
CloseAwareConn is a connection that can detect if underlying connection is closed, but only on first Write() after Reset() has been called.
This way, we can choose when the closed connection can be replaced by a new one, ensuring connection closed is only handled when expected.
This allows to detect a restart of a remote proxy, for example.
On linux and Windows (and MacOS?), a double .Write() allows to detect a closed connection, but this trick does not work all the time.
*/
type CloseAwareConn struct {
reset bool
dialer *net.Dialer
network string
proxy string
conn net.Conn
reqId int32
currId int32
}
func NewCloseAwareConn(dialer *net.Dialer, network string, proxy string, reqId int32) (*CloseAwareConn, error) {
cc := &CloseAwareConn{
reset: false,
dialer: dialer,
network: network,
proxy: proxy,
conn: nil,
reqId: reqId,
currId: reqId,
}
err := cc.ReOpen()
if err != nil {
return nil, err
}
return cc, nil
}
func (cc *CloseAwareConn) Reset(reqId int32) {
cc.reset = true
cc.currId = reqId
}
func (cc *CloseAwareConn) ReOpen() error {
c, err := cc.dialer.Dial(cc.network, cc.proxy)
if err != nil {
return err
}
ConfigureConn(c)
cc.conn = c
return nil
}
func (cc *CloseAwareConn) Read(b []byte) (n int, err error) {
return cc.conn.Read(b)
}
func (cc *CloseAwareConn) Write(b []byte) (n int, err error) {
if cc.reset && len(b) > 0 {
cc.reset = false
// we can eventually recreate a new connection if writing first byte fails
n, err = cc.conn.Write(b[0:1])
if err != nil {
// try to recreate the connection
if cc.ReOpen() != nil {
return 0, err
}
if trace {
logInfo("(%d) connection %d replaced by a new one", cc.currId, cc.reqId)
}
return cc.conn.Write(b)
}
// we can eventually recreate a new connection if writing next byte fails
n, err = cc.conn.Write(b[1:])
if err != nil {
// try to recreate the connection
if cc.ReOpen() != nil {
return 0, err
}
if trace {
logInfo("(%d) connection %d replaced by a new one", cc.currId, cc.reqId)
}
return cc.conn.Write(b)
}
// and rewrite buffer
return n + 1, nil
}
return cc.conn.Write(b)
}
func (cc *CloseAwareConn) Close() error {
return cc.conn.Close()
}
func (cc *CloseAwareConn) LocalAddr() net.Addr {
return cc.conn.LocalAddr()
}
func (cc *CloseAwareConn) RemoteAddr() net.Addr {
return cc.conn.RemoteAddr()
}
func (cc *CloseAwareConn) SetDeadline(t time.Time) error {
return cc.conn.SetDeadline(t)
}
func (cc *CloseAwareConn) SetReadDeadline(t time.Time) error {
return cc.conn.SetReadDeadline(t)
}
func (cc *CloseAwareConn) SetWriteDeadline(t time.Time) error {
return cc.conn.SetWriteDeadline(t)
}