-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocker.go
83 lines (72 loc) · 2.97 KB
/
locker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package redisson
import (
"context"
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidislock"
)
// Locker is the interface of a distributed redis lock.
//
// Every method takes a caller context and a lock name and returns a context,
// a cancel function and an error.
// NOTE(review): presumably the returned context is canceled when the lock is
// lost and the cancel function releases the lock — confirm against the
// rueidislock documentation.
// NOTE(review): this interface exposes no Close method, so callers holding
// only a Locker cannot shut the underlying locker down — confirm intended.
type Locker interface {
// WithContext acquires a distributed redis lock by name by waiting for it. It may return ErrLockerClosed.
WithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error)
// TryWithContext tries to acquire a distributed redis lock by name without waiting. It may return ErrNotLocked.
TryWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error)
// ForceWithContext takes over a distributed redis lock by canceling the original holder. It may return ErrNotLocked.
ForceWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error)
}
// fallbackSETPXVersion is the version threshold checked by newLocker: when the
// client's version is lower than this value, the FallbackSETPX locker option
// is switched on.
const fallbackSETPXVersion = "6.2.0"
// wrapLocker decorates a rueidislock.Locker so that lock calls made with a
// context that has no deadline are bounded by the configured write timeout.
type wrapLocker struct {
// v supplies configuration; wrap reads GetWriteTimeout from it.
v ConfInterface
// Locker is the embedded rueidislock locker the lock calls are delegated to.
rueidislock.Locker
}
// wrap invokes f with a context that is guaranteed to carry a deadline.
//
// If ctx already has a deadline, it is handed to f untouched and f's results
// are returned as-is. Otherwise a child context bounded by the configured
// write timeout (w.v.GetWriteTimeout()) is created and passed to f; the
// returned cancel function then cancels both the context returned by f (when
// f supplied a cancel function) and the timeout context.
//
// When f fails in the no-deadline path, the timeout context is canceled
// right away so its timer is not left pending until the timeout elapses in
// case the caller returns on error without invoking the cancel function. The
// cancel function returned from that path is still non-nil and safe to call.
//
// NOTE(review): if the underlying locker derives the returned lock context
// from the context it is given, a lock acquired through the no-deadline path
// is canceled once the write timeout expires — confirm this is intended.
func (w *wrapLocker) wrap(ctx context.Context, name string, f func(ctx context.Context, name string) (context.Context, context.CancelFunc, error)) (context.Context, context.CancelFunc, error) {
	if _, ok := ctx.Deadline(); ok {
		return f(ctx, name)
	}

	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, w.v.GetWriteTimeout())
	lockCtx, cancelLock, err := f(timeoutCtx, name)
	if err != nil {
		// Nothing was acquired; release the timer eagerly. Cancel functions
		// are idempotent, so a later call through the returned func is fine.
		cancelTimeout()
	}

	return lockCtx, func() {
		if cancelLock != nil {
			cancelLock()
		}
		cancelTimeout()
	}, err
}
// WithContext delegates to the embedded locker's WithContext through wrap, so
// a context without a deadline is bounded by the configured write timeout.
func (w *wrapLocker) WithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error) {
	acquire := w.Locker.WithContext
	return w.wrap(ctx, name, acquire)
}
// TryWithContext delegates to the embedded locker's TryWithContext through
// wrap, so a context without a deadline is bounded by the configured write
// timeout. It may return ErrNotLocked.
func (w *wrapLocker) TryWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error) {
	tryAcquire := w.Locker.TryWithContext
	return w.wrap(ctx, name, tryAcquire)
}
// ForceWithContext delegates to the embedded locker's ForceWithContext through
// wrap, so a context without a deadline is bounded by the configured write
// timeout. It may return ErrNotLocked.
func (w *wrapLocker) ForceWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error) {
	forceAcquire := w.Locker.ForceWithContext
	return w.wrap(ctx, name, forceAcquire)
}
// newLocker creates a Locker backed by rueidislock for the given client.
//
// When c.version is lower than fallbackSETPXVersion, the FallbackSETPX option
// is appended after the caller's options. The remaining locker settings are
// taken from the resolved options and the client configuration is derived
// from c.v. Any error from rueidislock.NewLocker is returned unchanged.
func newLocker(c *client, opts ...LockerOption) (Locker, error) {
	// Version check: versions below the threshold get the SET PX fallback.
	if c.version.LessThan(mustNewSemVersion(fallbackSETPXVersion)) {
		// Clip the capacity so append always allocates a fresh backing array
		// instead of writing into a slice the caller may still be using.
		opts = append(opts[:len(opts):len(opts)], WithLockerOptionFallbackSETPX(true))
	}

	cc := newLockerOptions(opts...)
	l, err := rueidislock.NewLocker(rueidislock.LockerOption{
		KeyPrefix:      cc.GetKeyPrefix(),
		KeyValidity:    cc.GetKeyValidity(),
		TryNextAfter:   cc.GetTryNextAfter(),
		KeyMajority:    cc.GetKeyMajority(),
		NoLoopTracking: cc.GetNoLoopTracking(),
		FallbackSETPX:  cc.GetFallbackSETPX(),
		ClientOption:   confVisitor2ClientOption(c.v),
		ClientBuilder:  rueidis.NewClient,
	})
	if err != nil {
		return nil, err
	}
	return &wrapLocker{Locker: l, v: c.v}, nil
}
// NewLocker creates a new locker for this client using the supplied options.
func (c *client) NewLocker(opts ...LockerOption) (Locker, error) {
	locker, err := newLocker(c, opts...)
	return locker, err
}