forked from mennanov/limiters
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocks.go
145 lines (123 loc) · 4.21 KB
/
locks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package limiters
import (
"context"
"github.com/go-redsync/redsync/v4"
redsyncredis "github.com/go-redsync/redsync/v4/redis"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// DistLocker is a context aware distributed locker (interface is similar to sync.Locker).
type DistLocker interface {
// Lock locks the locker.
Lock(ctx context.Context) error
// Unlock unlocks the previously successfully locked lock.
Unlock(ctx context.Context) error
}
// LockNoop is a no-op implementation of the DistLocker interface.
// It should only be used with the in-memory backends as they are already thread-safe and don't need distributed locks.
type LockNoop struct {
}
// NewLockNoop creates a new LockNoop.
func NewLockNoop() *LockNoop {
return &LockNoop{}
}
// Lock imitates locking.
func (n LockNoop) Lock(ctx context.Context) error {
return ctx.Err()
}
// Unlock does nothing.
func (n LockNoop) Unlock(_ context.Context) error {
return nil
}
// LockEtcd implements the DistLocker interface using etcd.
//
// See https://github.com/etcd-io/etcd/blob/master/Documentation/learning/why.md#using-etcd-for-distributed-coordination
type LockEtcd struct {
cli *clientv3.Client
prefix string
logger Logger
mu *concurrency.Mutex
session *concurrency.Session
}
// NewLockEtcd creates a new instance of LockEtcd.
func NewLockEtcd(cli *clientv3.Client, prefix string, logger Logger) *LockEtcd {
return &LockEtcd{cli: cli, prefix: prefix, logger: logger}
}
// Lock creates a new session-based lock in etcd and locks it.
func (l *LockEtcd) Lock(ctx context.Context) error {
var err error
l.session, err = concurrency.NewSession(l.cli, concurrency.WithTTL(1))
if err != nil {
return errors.Wrap(err, "failed to create an etcd session")
}
l.mu = concurrency.NewMutex(l.session, l.prefix)
return errors.Wrap(l.mu.Lock(ctx), "failed to lock a mutex in etcd")
}
// Unlock unlocks the previously locked lock.
func (l *LockEtcd) Unlock(ctx context.Context) error {
defer func() {
if err := l.session.Close(); err != nil {
l.logger.Log(err)
}
}()
return errors.Wrap(l.mu.Unlock(ctx), "failed to unlock a mutex in etcd")
}
// LockConsul is a wrapper around github.com/hashicorp/consul/api.Lock that implements the DistLocker interface.
type LockConsul struct {
lock *api.Lock
}
// NewLockConsul creates a new LockConsul instance.
func NewLockConsul(lock *api.Lock) *LockConsul {
return &LockConsul{lock: lock}
}
// Lock locks the lock in Consul.
func (l *LockConsul) Lock(ctx context.Context) error {
_, err := l.lock.Lock(ctx.Done())
return errors.Wrap(err, "failed to lock a mutex in consul")
}
// Unlock unlocks the lock in Consul.
func (l *LockConsul) Unlock(_ context.Context) error {
return l.lock.Unlock()
}
// LockZookeeper is a wrapper around github.com/samuel/go-zookeeper/zk.Lock that implements the DistLocker interface.
type LockZookeeper struct {
lock *zk.Lock
}
// NewLockZookeeper creates a new instance of LockZookeeper.
func NewLockZookeeper(lock *zk.Lock) *LockZookeeper {
return &LockZookeeper{lock: lock}
}
// Lock locks the lock in Zookeeper.
// TODO: add context aware support once https://github.com/samuel/go-zookeeper/pull/168 is merged.
func (l *LockZookeeper) Lock(_ context.Context) error {
return l.lock.Lock()
}
// Unlock unlocks the lock in Zookeeper.
func (l *LockZookeeper) Unlock(_ context.Context) error {
return l.lock.Unlock()
}
// LockRedis is a wrapper around github.com/go-redsync/redsync that implements the DistLocker interface.
type LockRedis struct {
mutex *redsync.Mutex
}
// NewLockRedis creates a new instance of LockRedis.
func NewLockRedis(pool redsyncredis.Pool, mutexName string) *LockRedis {
rs := redsync.New(pool)
mutex := rs.NewMutex(mutexName)
return &LockRedis{mutex: mutex}
}
// Lock locks the lock in Redis.
func (l *LockRedis) Lock(_ context.Context) error {
err := l.mutex.Lock()
return errors.Wrap(err, "failed to lock a mutex in redis")
}
// Unlock unlocks the lock in Redis.
func (l *LockRedis) Unlock(_ context.Context) error {
if ok, err := l.mutex.Unlock(); !ok || err != nil {
return errors.Wrap(err, "failed to unlock a mutex in redis")
}
return nil
}