Skip to content

Commit

Permalink
add option WaitReplicas
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf2 committed May 5, 2022
1 parent ff2792f commit 6f9b936
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 22 deletions.
67 changes: 47 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ type Options struct {
LockExpire time.Duration
// LockSleep is the sleep interval time if try lock failed. default is 1000ms
LockSleep time.Duration
// WaitReplicas is the number of replicas to wait for. default is 0
// if WaitReplicas is > 0, it will use redis WAIT command to wait for DelayDelete synchronized.
WaitReplicas int
// WaitReplicasTimeout is the number of replicas to wait for. default is 3000ms
// if WaitReplicas is > 0, WaitReplicasTimeout is the timeout for WAIT command.
WaitReplicasTimeout time.Duration
// RandomExpireAdjustment is the random adjustment for the expire time. default 0.1
// if the expire time is set to 600s, and this value is set to 0.1, then the actual expire time will be 540s - 600s
// solve the problem of cache avalanche.
Expand All @@ -42,11 +48,12 @@ type Options struct {
// NewDefaultOptions return default options
func NewDefaultOptions() Options {
return Options{
Delay: 10,
EmptyExpire: 60,
LockExpire: 3,
LockSleep: 1000,
Delay: 10 * time.Second,
EmptyExpire: 60 * time.Second,
LockExpire: 3 * time.Second,
LockSleep: 1000 * time.Millisecond,
RandomExpireAdjustment: 0.1,
WaitReplicasTimeout: 3000 * time.Millisecond,
}
}

Expand Down Expand Up @@ -77,16 +84,36 @@ func (c *Client) DelayDelete(key string) error {
return nil
}
debugf("deleting: key=%s", key)
_, err := callLua(c.rdb, ` -- delete
local v = redis.call('HGET', KEYS[1], 'value')
if v == false then
return
end
redis.call('HSET', KEYS[1], 'lockUtil', 0)
redis.call('HDEL', KEYS[1], 'lockOwner')
redis.call('EXPIRE', KEYS[1], ARGV[1])
`, []string{key}, []interface{}{c.Options.Delay})
return err
luaFn := func(con redisConn) error {
_, err := callLua(con, c.rdb.Context(), ` -- delete
local v = redis.call('HGET', KEYS[1], 'value')
if v == false then
return
end
redis.call('HSET', KEYS[1], 'lockUtil', 0)
redis.call('HDEL', KEYS[1], 'lockOwner')
redis.call('EXPIRE', KEYS[1], ARGV[1])
`, []string{key}, []interface{}{c.Options.Delay})
return err
}
if c.Options.WaitReplicas > 0 {
rconn := c.rdb.Conn(c.rdb.Context())
defer rconn.Close()
err := luaFn(rconn)
cmd := redis.NewCmd(c.rdb.Context(), "WAIT", c.Options.WaitReplicas, c.Options.WaitReplicasTimeout)
if err == nil && c.Options.WaitReplicas > 0 {
err = rconn.Process(c.rdb.Context(), cmd)
}
var replicas int
if err == nil {
replicas, err = cmd.Int()
}
if err == nil && replicas < c.Options.WaitReplicas {
err = fmt.Errorf("wait replicas %d failed. result replicas: %d", c.Options.WaitReplicas, replicas)
}
return err
}
return luaFn(c.rdb)
}

// Fetch returns the value store in cache indexed by the key.
Expand All @@ -105,7 +132,7 @@ func (c *Client) Fetch(key string, expire time.Duration, fn func() (string, erro
}

func (c *Client) luaGet(key string, owner string) ([]interface{}, error) {
res, err := callLua(c.rdb, ` -- luaGet
res, err := callLua(c.rdb, c.rdb.Context(), ` -- luaGet
local v = redis.call('HGET', KEYS[1], 'value')
local lu = redis.call('HGET', KEYS[1], 'lockUtil')
if lu ~= false and tonumber(lu) < tonumber(ARGV[1]) or lu == false and v == false then
Expand All @@ -123,7 +150,7 @@ func (c *Client) luaGet(key string, owner string) ([]interface{}, error) {
}

func (c *Client) luaSet(key string, value string, expire int, owner string) error {
_, err := callLua(c.rdb, `-- luaSet
_, err := callLua(c.rdb, c.rdb.Context(), `-- luaSet
local o = redis.call('HGET', KEYS[1], 'lockOwner')
if o ~= ARGV[2] then
return
Expand Down Expand Up @@ -158,7 +185,7 @@ func (c *Client) weakFetch(key string, expire time.Duration, fn func() (string,
r, err := c.luaGet(key, owner)
for err == nil && r[0] == nil && r[1].(string) != locked {
debugf("empty result for %s locked by other, so sleep %d ms", key, c.Options.LockSleep)
time.Sleep(time.Duration(c.Options.LockSleep) * time.Millisecond)
time.Sleep(c.Options.LockSleep)
r, err = c.luaGet(key, owner)
}
if err != nil {
Expand All @@ -182,7 +209,7 @@ func (c *Client) strongFetch(key string, expire time.Duration, fn func() (string
r, err := c.luaGet(key, owner)
for err == nil && r[1] != nil && r[1] != locked { // locked by other
debugf("locked by other, so sleep %d ms", c.Options.LockSleep)
time.Sleep(time.Duration(c.Options.LockSleep) * time.Millisecond)
time.Sleep(c.Options.LockSleep)
r, err = c.luaGet(key, owner)
}
if err != nil {
Expand Down Expand Up @@ -211,7 +238,7 @@ func (c *Client) RawSet(key string, value string, expire time.Duration) error {
// LockForUpdate locks the key, used in very strict strong consistency mode
func (c *Client) LockForUpdate(key string, owner string) error {
lockUtil := math.Pow10(10)
res, err := callLua(c.rdb, ` -- luaLock
res, err := callLua(c.rdb, c.rdb.Context(), ` -- luaLock
local lu = redis.call('HGET', KEYS[1], 'lockUtil')
local lo = redis.call('HGET', KEYS[1], 'lockOwner')
if lu == false or tonumber(lu) < tonumber(ARGV[2]) or lo == ARGV[1] then
Expand All @@ -229,7 +256,7 @@ func (c *Client) LockForUpdate(key string, owner string) error {

// UnlockForUpdate unlocks the key, used in very strict strong consistency mode
func (c *Client) UnlockForUpdate(key string, owner string) error {
_, err := callLua(c.rdb, ` -- luaUnlock
_, err := callLua(c.rdb, c.rdb.Context(), ` -- luaUnlock
local lo = redis.call('HGET', KEYS[1], 'lockOwner')
if lo == ARGV[1] then
redis.call('DEL', KEYS[1])
Expand Down
9 changes: 9 additions & 0 deletions client_cover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,12 @@ func TestPanicFetch(t *testing.T) {
assert.Nil(t, err)
time.Sleep(20 * time.Millisecond)
}

func TestDelayDeleteWait(t *testing.T) {
clearCache()
rc := NewClient(rdb, NewDefaultOptions())
rc.Options.WaitReplicas = 1
rc.Options.WaitReplicasTimeout = 10
err := rc.DelayDelete("key1")
assert.Error(t, err, fmt.Errorf("wait replicas 1 failed. result replicas: 0"))
}
9 changes: 7 additions & 2 deletions utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rockscache

import (
"context"
"log"
"runtime/debug"
"time"
Expand All @@ -24,9 +25,13 @@ func now() int64 {
return time.Now().Unix()
}

func callLua(rdb *redis.Client, script string, keys []string, args []interface{}) (interface{}, error) {
type redisConn interface {
Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
}

func callLua(rdb redisConn, ctx context.Context, script string, keys []string, args []interface{}) (interface{}, error) {
debugf("callLua: script=%s, keys=%v, args=%v", script, keys, args)
v, err := rdb.Eval(rdb.Context(), script, keys, args).Result()
v, err := rdb.Eval(ctx, script, keys, args).Result()
if err == redis.Nil {
err = nil
}
Expand Down

0 comments on commit 6f9b936

Please sign in to comment.