From 6f9b9360703d4da19caff95f8760838c344bc995 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 5 May 2022 21:31:17 +0800 Subject: [PATCH] add option WaitReplicas --- client.go | 67 +++++++++++++++++++++++++++++++------------- client_cover_test.go | 9 ++++++ utils.go | 9 ++++-- 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/client.go b/client.go index d40a89e..71d8910 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,12 @@ type Options struct { LockExpire time.Duration // LockSleep is the sleep interval time if try lock failed. default is 1000ms LockSleep time.Duration + // WaitReplicas is the number of replicas to wait for. default is 0 + // if WaitReplicas is > 0, it will use redis WAIT command to wait for DelayDelete synchronized. + WaitReplicas int + // WaitReplicasTimeout is the number of replicas to wait for. default is 3000ms + // if WaitReplicas is > 0, WaitReplicasTimeout is the timeout for WAIT command. + WaitReplicasTimeout time.Duration // RandomExpireAdjustment is the random adjustment for the expire time. default 0.1 // if the expire time is set to 600s, and this value is set to 0.1, then the actual expire time will be 540s - 600s // solve the problem of cache avalanche. @@ -42,11 +48,12 @@ type Options struct { // NewDefaultOptions return default options func NewDefaultOptions() Options { return Options{ - Delay: 10, - EmptyExpire: 60, - LockExpire: 3, - LockSleep: 1000, + Delay: 10 * time.Second, + EmptyExpire: 60 * time.Second, + LockExpire: 3 * time.Second, + LockSleep: 1000 * time.Millisecond, RandomExpireAdjustment: 0.1, + WaitReplicasTimeout: 3000 * time.Millisecond, } } @@ -77,16 +84,36 @@ func (c *Client) DelayDelete(key string) error { return nil } debugf("deleting: key=%s", key) - _, err := callLua(c.rdb, ` -- delete -local v = redis.call('HGET', KEYS[1], 'value') -if v == false then - return -end -redis.call('HSET', KEYS[1], 'lockUtil', 0) -redis.call('HDEL', KEYS[1], 'lockOwner') -redis.call('EXPIRE', KEYS[1], ARGV[1]) - `, []string{key}, []interface{}{c.Options.Delay}) - return err + luaFn := func(con redisConn) error { + _, err := callLua(con, c.rdb.Context(), ` -- delete + local v = redis.call('HGET', KEYS[1], 'value') + if v == false then + return + end + redis.call('HSET', KEYS[1], 'lockUtil', 0) + redis.call('HDEL', KEYS[1], 'lockOwner') + redis.call('EXPIRE', KEYS[1], ARGV[1]) + `, []string{key}, []interface{}{c.Options.Delay}) + return err + } + if c.Options.WaitReplicas > 0 { + rconn := c.rdb.Conn(c.rdb.Context()) + defer rconn.Close() + err := luaFn(rconn) + cmd := redis.NewCmd(c.rdb.Context(), "WAIT", c.Options.WaitReplicas, c.Options.WaitReplicasTimeout) + if err == nil && c.Options.WaitReplicas > 0 { + err = rconn.Process(c.rdb.Context(), cmd) + } + var replicas int + if err == nil { + replicas, err = cmd.Int() + } + if err == nil && replicas < c.Options.WaitReplicas { + err = fmt.Errorf("wait replicas %d failed. result replicas: %d", c.Options.WaitReplicas, replicas) + } + return err + } + return luaFn(c.rdb) } // Fetch returns the value store in cache indexed by the key. @@ -105,7 +132,7 @@ func (c *Client) Fetch(key string, expire time.Duration, fn func() (string, erro } func (c *Client) luaGet(key string, owner string) ([]interface{}, error) { - res, err := callLua(c.rdb, ` -- luaGet + res, err := callLua(c.rdb, c.rdb.Context(), ` -- luaGet local v = redis.call('HGET', KEYS[1], 'value') local lu = redis.call('HGET', KEYS[1], 'lockUtil') if lu ~= false and tonumber(lu) < tonumber(ARGV[1]) or lu == false and v == false then @@ -123,7 +150,7 @@ func (c *Client) luaGet(key string, owner string) ([]interface{}, error) { } func (c *Client) luaSet(key string, value string, expire int, owner string) error { - _, err := callLua(c.rdb, `-- luaSet + _, err := callLua(c.rdb, c.rdb.Context(), `-- luaSet local o = redis.call('HGET', KEYS[1], 'lockOwner') if o ~= ARGV[2] then return @@ -158,7 +185,7 @@ func (c *Client) weakFetch(key string, expire time.Duration, fn func() (string, r, err := c.luaGet(key, owner) for err == nil && r[0] == nil && r[1].(string) != locked { debugf("empty result for %s locked by other, so sleep %d ms", key, c.Options.LockSleep) - time.Sleep(time.Duration(c.Options.LockSleep) * time.Millisecond) + time.Sleep(c.Options.LockSleep) r, err = c.luaGet(key, owner) } if err != nil { @@ -182,7 +209,7 @@ func (c *Client) strongFetch(key string, expire time.Duration, fn func() (string r, err := c.luaGet(key, owner) for err == nil && r[1] != nil && r[1] != locked { // locked by other debugf("locked by other, so sleep %d ms", c.Options.LockSleep) - time.Sleep(time.Duration(c.Options.LockSleep) * time.Millisecond) + time.Sleep(c.Options.LockSleep) r, err = c.luaGet(key, owner) } if err != nil { @@ -211,7 +238,7 @@ func (c *Client) RawSet(key string, value string, expire time.Duration) error { // LockForUpdate locks the key, used in very strict strong consistency mode func (c *Client) LockForUpdate(key string, owner string) error { lockUtil := math.Pow10(10) - res, err := callLua(c.rdb, ` -- luaLock + res, err := callLua(c.rdb, c.rdb.Context(), ` -- luaLock local lu = redis.call('HGET', KEYS[1], 'lockUtil') local lo = redis.call('HGET', KEYS[1], 'lockOwner') if lu == false or tonumber(lu) < tonumber(ARGV[2]) or lo == ARGV[1] then @@ -229,7 +256,7 @@ func (c *Client) LockForUpdate(key string, owner string) error { // UnlockForUpdate unlocks the key, used in very strict strong consistency mode func (c *Client) UnlockForUpdate(key string, owner string) error { - _, err := callLua(c.rdb, ` -- luaUnlock + _, err := callLua(c.rdb, c.rdb.Context(), ` -- luaUnlock local lo = redis.call('HGET', KEYS[1], 'lockOwner') if lo == ARGV[1] then redis.call('DEL', KEYS[1]) diff --git a/client_cover_test.go b/client_cover_test.go index 831ef65..a1e8a2e 100644 --- a/client_cover_test.go +++ b/client_cover_test.go @@ -67,3 +67,12 @@ func TestPanicFetch(t *testing.T) { assert.Nil(t, err) time.Sleep(20 * time.Millisecond) } + +func TestDelayDeleteWait(t *testing.T) { + clearCache() + rc := NewClient(rdb, NewDefaultOptions()) + rc.Options.WaitReplicas = 1 + rc.Options.WaitReplicasTimeout = 10 + err := rc.DelayDelete("key1") + assert.Error(t, err, fmt.Errorf("wait replicas 1 failed. result replicas: 0")) +} diff --git a/utils.go b/utils.go index 4b1fcc1..463a96c 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,7 @@ package rockscache import ( + "context" "log" "runtime/debug" "time" @@ -24,9 +25,13 @@ func now() int64 { return time.Now().Unix() } -func callLua(rdb *redis.Client, script string, keys []string, args []interface{}) (interface{}, error) { +type redisConn interface { + Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd +} + +func callLua(rdb redisConn, ctx context.Context, script string, keys []string, args []interface{}) (interface{}, error) { debugf("callLua: script=%s, keys=%v, args=%v", script, keys, args) - v, err := rdb.Eval(rdb.Context(), script, keys, args).Result() + v, err := rdb.Eval(ctx, script, keys, args).Result() if err == redis.Nil { err = nil }