From 9eaa74b26dea8e68cd648d5edc570e3257451da0 Mon Sep 17 00:00:00 2001 From: Yiling-J Date: Fri, 4 Oct 2024 21:58:38 +0800 Subject: [PATCH] use cached now to improve read performance --- internal/clock/clock.go | 15 +++++++++++++++ internal/store.go | 26 +++++++++++++++++++++++++- internal/store_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/internal/clock/clock.go b/internal/clock/clock.go index d480c50..5f890a5 100644 --- a/internal/clock/clock.go +++ b/internal/clock/clock.go @@ -1,17 +1,32 @@ package clock import ( + "sync/atomic" "time" ) type Clock struct { Start time.Time + now atomic.Int64 } func (c *Clock) NowNano() int64 { return time.Since(c.Start).Nanoseconds() } +func (c *Clock) NowNanoCached() int64 { + return c.now.Load() +} + +func (c *Clock) RefreshNowCache() { + c.now.Store(c.NowNano()) +} + +// used in test only +func (c *Clock) SetNowCache(n int64) { + c.now.Store(n) +} + func (c *Clock) ExpireNano(ttl time.Duration) int64 { return c.NowNano() + ttl.Nanoseconds() } diff --git a/internal/store.go b/internal/store.go index b46eab2..b48cc19 100644 --- a/internal/store.go +++ b/internal/store.go @@ -220,7 +220,26 @@ func (s *Store[K, V]) getFromShard(key K, hash uint64, shard *Shard[K, V]) (V, b var value V if ok { expire := entry.expire.Load() - if expire != 0 && expire <= s.timerwheel.clock.NowNano() { + var expired bool + if expire != 0 { + // Cached now is refreshed every second by ticker. + // However, since tickers aren't guaranteed to be precise + // https://github.com/golang/go/issues/45632 + // relax this to 30 seconds. If the entry's expiration time is + // less than 30 seconds compared to the cached now, + // refetch the accurate now and compare again. + nowCached := s.timerwheel.clock.NowNanoCached() + if expire-nowCached <= 0 { + expired = true + } else if expire-nowCached < 30*1e9 { + now := s.timerwheel.clock.NowNano() + expired = (expire-now <= 0) + } else { + expired = false + } + } + + if expired { ok = false s.policy.misses.Add(1) } else { @@ -580,6 +599,10 @@ func (s *Store[K, V]) drainWrite() { } func (s *Store[K, V]) maintenance() { + s.mlock.Lock() + s.timerwheel.clock.RefreshNowCache() + s.mlock.Unlock() + go func() { s.mlock.Lock() s.maintenanceTicker = time.NewTicker(time.Second) @@ -592,6 +615,7 @@ func (s *Store[K, V]) maintenance() { return case <-s.maintenanceTicker.C: s.mlock.Lock() + s.timerwheel.clock.RefreshNowCache() if s.closed { s.mlock.Unlock() return diff --git a/internal/store_test.go b/internal/store_test.go index 1c47830..b5b3511 100644 --- a/internal/store_test.go +++ b/internal/store_test.go @@ -163,3 +163,42 @@ func TestStore_PolicyCounter(t *testing.T) { require.Equal(t, uint64(1600), store.policy.hits.Value()) require.Equal(t, uint64(1600), store.policy.misses.Value()) } + +func TestStore_GetExpire(t *testing.T) { + store := NewStore[int, int](1000, false, nil, nil, nil, 0, 0, nil) + defer store.Close() + + _, i := store.index(123) + fakeNow := store.timerwheel.clock.NowNano() - 100*10e9 + testNow := store.timerwheel.clock.NowNano() + entry := &Entry[int, int]{ + key: 123, + value: 123, + } + entry.expire.Store(fakeNow) + + store.shards[i].hashmap[123] = entry + store.mlock.Lock() + + // already exprired + store.timerwheel.clock.SetNowCache(fakeNow + 1) + _, ok := store.Get(123) + require.False(t, ok) + + // use cached now, not expire + store.timerwheel.clock.SetNowCache(fakeNow - 31*10e9) + v, ok := store.Get(123) + require.True(t, ok) + require.Equal(t, 123, v) + + // less than 30 seconds and not expired, use real now + store.timerwheel.clock.SetNowCache(fakeNow - 1) + _, ok = store.Get(123) + require.False(t, ok) + store.mlock.Unlock() + + // ticker refresh cached now + time.Sleep(1200 * time.Millisecond) + cachedNow := store.timerwheel.clock.NowNanoCached() + require.True(t, cachedNow > testNow) +}