-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpool.go
158 lines (135 loc) · 3.13 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package redisgo
import (
"context"
"sync/atomic"
"time"
)
// PoolConn represents a pooled conn
type PoolConn struct {
*Conn
p *Pool
freedAt time.Time
createdAt time.Time
}
// CreatedAt returns the create time of the conn
func (c *PoolConn) CreatedAt() time.Time {
return c.createdAt
}
// Close puts PoolConn back to the pool
func (c *PoolConn) Close() {
if c.Conn.pd != 0 {
c.Conn.Close()
}
p := c.p
c.p = nil
p.put(c)
}
// Pool represents a Conn pool
type Pool struct {
dial DialFunc
maxIdle int
maxActive int64
maxIdleTime time.Duration
maxConnTime time.Duration
active int64
ch chan *PoolConn
nowfunc func() time.Time
}
// PoolOption represents a pool option
type PoolOption func(p *Pool)
// WithMaxIdle limits pool max idle conns to n, default: 5.
func WithMaxIdle(n int) PoolOption {
return func(p *Pool) {
p.maxIdle = int(n)
}
}
// WithMaxActive limits pool outgoing conns to n.
// the Active counter is incr with a success Pool.Get() call,
// and decr after Poo.Put() call.
// ErrMaxActive is returned by Get() if the counter exceeded n
func WithMaxActive(n int) PoolOption {
return func(p *Pool) {
p.maxActive = int64(n)
}
}
// WithMaxIdleTime sets max idle time of a connection in pool
func WithMaxIdleTime(t time.Duration) PoolOption {
return func(p *Pool) {
p.maxIdleTime = t
}
}
// WithMaxConnTime sets max conn time of a connection in pool from dial opertion
func WithMaxConnTime(t time.Duration) PoolOption {
return func(p *Pool) {
p.maxConnTime = t
}
}
type DialFunc func(ctx context.Context) (*Conn, error)
// NewPool creates a instance of Pool with dialfunc
func NewPool(dialfunc DialFunc, ops ...PoolOption) *Pool {
p := &Pool{dial: dialfunc}
p.maxIdle = 5
for _, op := range ops {
op(p)
}
p.ch = make(chan *PoolConn, p.maxIdle)
p.nowfunc = time.Now
return p
}
// Idle returns idle conn number
func (p *Pool) Idle() int {
return len(p.ch)
}
// Active returns active conn number
func (p *Pool) Active() int {
return int(atomic.LoadInt64(&p.active))
}
// Get returns PoolConn from pool. ctx is passed to DialFunc.
func (p *Pool) Get(ctx context.Context) (*PoolConn, error) {
select {
case conn := <-p.ch:
now := p.nowfunc()
if (p.maxIdleTime > 0 && now.Sub(conn.freedAt) > p.maxIdleTime) ||
(p.maxConnTime > 0 && now.Sub(conn.CreatedAt()) > p.maxConnTime) {
p.closeconn(conn)
return p.Get(ctx)
}
conn.p = p
return conn, nil
case <-ctx.Done():
return nil, ctx.Err()
default:
}
active := atomic.AddInt64(&p.active, 1)
if p.maxActive > 0 && active > p.maxActive {
atomic.AddInt64(&p.active, -1)
return nil, ErrMaxActive
}
c, err := p.dial(ctx)
if err != nil {
atomic.AddInt64(&p.active, -1)
return nil, err
}
return &PoolConn{Conn: c, p: p, createdAt: p.nowfunc()}, nil
}
func (p *Pool) put(conn *PoolConn) {
if conn.Err() != nil {
p.closeconn(conn)
return
}
if p.maxConnTime > 0 && p.nowfunc().Sub(conn.createdAt) > p.maxConnTime {
p.closeconn(conn)
return
}
conn.freedAt = p.nowfunc()
select {
case p.ch <- conn:
return
default:
p.closeconn(conn)
}
}
func (p *Pool) closeconn(conn *PoolConn) {
atomic.AddInt64(&p.active, -1)
conn.Conn.Close()
}