-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
145 lines (131 loc) · 4.38 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package goredis
import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"github.com/go-redis/redis/v7"
)
// Client is a redis client for either a single node or a cluster. Commands
// are provided by the embedded Cmdable.
type Client struct {
	cfg *Config // configuration the client was created from
	Cmdable
}

// Cmdable is redis.Cmdable extended with the pub/sub and transaction entry
// points that NewClient's underlying clients also provide.
type Cmdable interface {
	redis.Cmdable

	// Subscribe returns a PubSub for the given channels.
	Subscribe(channels ...string) *redis.PubSub

	// Watch is the transaction (WATCH) entry point: fn receives the Tx.
	Watch(fn func(*redis.Tx) error, keys ...string) error
}

// Aliases re-exporting go-redis types so callers need not import go-redis.
type (
	// Pub/sub.
	PubSub  = redis.PubSub
	Message = redis.Message

	// Geo and sorted-set argument/value types.
	GeoLocation    = redis.GeoLocation
	GeoRadiusQuery = redis.GeoRadiusQuery
	ZRangeBy       = redis.ZRangeBy
	Z              = redis.Z

	// Pipelining and generic command types.
	Pipeliner    = redis.Pipeliner
	RedisCmdable = redis.Cmdable
	Cmder        = redis.Cmder

	// Typed command results.
	SliceCmd           = redis.SliceCmd
	StatusCmd          = redis.StatusCmd
	IntCmd             = redis.IntCmd
	DurationCmd        = redis.DurationCmd
	BoolCmd            = redis.BoolCmd
	StringCmd          = redis.StringCmd
	FloatCmd           = redis.FloatCmd
	StringSliceCmd     = redis.StringSliceCmd
	BoolSliceCmd       = redis.BoolSliceCmd
	StringStringMapCmd = redis.StringStringMapCmd
	StringIntMapCmd    = redis.StringIntMapCmd
	ZSliceCmd          = redis.ZSliceCmd
	ScanCmd            = redis.ScanCmd
	ClusterSlotsCmd    = redis.ClusterSlotsCmd
)
// NewClient creates a redis (single node or cluster) client from cfg and
// verifies connectivity with a PING.
//
// cfg.DeployType selects the client kind: TypeSingle builds a redis.Client
// from cfg.ForSingle, TypeCluster builds a redis.ClusterClient from
// cfg.ForCluster. All timeout and interval fields of cfg are interpreted as a
// number of seconds.
//
// It returns an error when cfg is nil, when DeployType is neither TypeSingle
// nor TypeCluster, or when the PING fails. In the last case the underlying
// client is closed before returning so its connection pool is not leaked.
func NewClient(cfg *Config) (*Client, error) {
	if cfg == nil {
		return nil, fmt.Errorf("redis.Config: config is nil")
	}
	var c = &Client{
		cfg: cfg,
	}
	switch cfg.DeployType {
	case TypeSingle:
		c.Cmdable = redis.NewClient(&redis.Options{
			Addr:               cfg.ForSingle.Addr,
			Password:           cfg.Password,
			MaxRetries:         cfg.MaxRetries,
			MaxRetryBackoff:    time.Duration(cfg.ForSingle.MaxRetryBackoff) * time.Second,
			DialTimeout:        time.Duration(cfg.DialTimeout) * time.Second,
			ReadTimeout:        time.Duration(cfg.ReadTimeout) * time.Second,
			WriteTimeout:       time.Duration(cfg.WriteTimeout) * time.Second,
			PoolSize:           cfg.PoolSizePerNode,
			PoolTimeout:        time.Duration(cfg.PoolTimeout) * time.Second,
			IdleTimeout:        time.Duration(cfg.IdleTimeout) * time.Second,
			IdleCheckFrequency: time.Duration(cfg.IdleCheckFrequency) * time.Second,
		})
	case TypeCluster:
		c.Cmdable = redis.NewClusterClient(&redis.ClusterOptions{
			Addrs:              cfg.ForCluster.Addrs,
			MaxRedirects:       cfg.ForCluster.MaxRedirects,
			ReadOnly:           cfg.ReadOnly,
			RouteByLatency:     cfg.ForCluster.RouteByLatency,
			Password:           cfg.Password,
			MaxRetries:         cfg.MaxRetries,
			DialTimeout:        time.Duration(cfg.DialTimeout) * time.Second,
			ReadTimeout:        time.Duration(cfg.ReadTimeout) * time.Second,
			WriteTimeout:       time.Duration(cfg.WriteTimeout) * time.Second,
			PoolSize:           cfg.PoolSizePerNode,
			PoolTimeout:        time.Duration(cfg.PoolTimeout) * time.Second,
			IdleTimeout:        time.Duration(cfg.IdleTimeout) * time.Second,
			IdleCheckFrequency: time.Duration(cfg.IdleCheckFrequency) * time.Second,
		})
	default:
		return nil, fmt.Errorf("redis.Config.DeployType: optional enumeration list: %s, %s", TypeSingle, TypeCluster)
	}
	if _, err := c.Ping().Result(); err != nil {
		// The client is discarded on failure, so release it here; Cmdable
		// itself has no Close, hence the assertion to the concrete client's
		// Close method.
		if closer, ok := c.Cmdable.(interface{ Close() error }); ok {
			_ = closer.Close() // best effort: the ping error is the one to report
		}
		return nil, err
	}
	return c, nil
}
// Config exposes the configuration this client was created with.
func (c *Client) Config() *Config {
	cfg := c.cfg
	return cfg
}
// IsCluster reports whether the client was configured with the cluster
// deploy type.
func (c *Client) IsCluster() bool {
	switch c.cfg.DeployType {
	case TypeCluster:
		return true
	default:
		return false
	}
}
// ToSingle returns the underlying *redis.Client and true when the client
// wraps a single-node client; otherwise it returns nil and false.
func (c *Client) ToSingle() (*redis.Client, bool) {
	if single, ok := c.Cmdable.(*redis.Client); ok {
		return single, true
	}
	return nil, false
}
// ToCluster returns the underlying *redis.ClusterClient and true when the
// client wraps a cluster client; otherwise it returns nil and false.
func (c *Client) ToCluster() (*redis.ClusterClient, bool) {
	switch cluster := c.Cmdable.(type) {
	case *redis.ClusterClient:
		return cluster, true
	default:
		return nil, false
	}
}
// LockCallback runs callback while holding a distributed lock stored at
// lockKey.
//
// Acquisition is attempted once every 10 milliseconds and blocks until the
// lock is obtained or redis reports an error, which is then returned. Once
// acquired, the lock key expires after maxLock[0] when given, otherwise after
// one minute.
//
// The key is set to a random per-call token and is released with a
// compare-and-delete script, so if the expiry elapses while callback is still
// running and another caller takes the lock, this call does not delete the
// other caller's lock on return. The release is deferred, so it also runs when
// callback panics; a failure to release is not reported.
func (c *Client) LockCallback(lockKey string, callback func(), maxLock ...time.Duration) error {
	const retryInterval = 10 * time.Millisecond
	// Delete the key only while it still holds our token.
	const unlockScript = `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) end return 0`

	var d = time.Minute
	if len(maxLock) > 0 {
		d = maxLock[0]
	}

	// Random token identifying this particular holder of the lock.
	raw := make([]byte, 16)
	if _, err := rand.Read(raw); err != nil {
		return err
	}
	token := hex.EncodeToString(raw)

	// lock
	for {
		locked, err := c.SetNX(lockKey, token, d).Result()
		if locked {
			break
		}
		if err != nil && !IsRedisNil(err) {
			return err
		}
		time.Sleep(retryInterval)
	}

	// unlock
	defer c.Eval(unlockScript, []string{lockKey}, token)

	// do
	callback()
	return nil
}
// Nil is the redis nil reply, e.g. returned when a key does not exist.
const Nil = redis.Nil
// IsRedisNil reports whether err is the redis nil reply (redis.Nil), e.g. the
// result of reading a key that does not exist. Wrapped errors are unwrapped,
// so an error created with fmt.Errorf("...: %w", redis.Nil) also matches.
func IsRedisNil(err error) bool {
	return errors.Is(err, redis.Nil)
}