Skip to content

Commit

Permalink
use cached now to improve read performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Yiling-J committed Oct 4, 2024
1 parent 2ec0835 commit 9eaa74b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
15 changes: 15 additions & 0 deletions internal/clock/clock.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
package clock

import (
"sync/atomic"
"time"
)

type Clock struct {
Start time.Time
now atomic.Int64
}

func (c *Clock) NowNano() int64 {
return time.Since(c.Start).Nanoseconds()
}

func (c *Clock) NowNanoCached() int64 {
return c.now.Load()
}

func (c *Clock) RefreshNowCache() {
c.now.Store(c.NowNano())
}

// used in test only
func (c *Clock) SetNowCache(n int64) {
c.now.Store(n)
}

func (c *Clock) ExpireNano(ttl time.Duration) int64 {
return c.NowNano() + ttl.Nanoseconds()
}
Expand Down
26 changes: 25 additions & 1 deletion internal/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,26 @@ func (s *Store[K, V]) getFromShard(key K, hash uint64, shard *Shard[K, V]) (V, b
var value V
if ok {
expire := entry.expire.Load()
if expire != 0 && expire <= s.timerwheel.clock.NowNano() {
var expired bool
if expire != 0 {
// Cached now is refreshed every second by ticker.
// However, since tickers aren't guaranteed to be precise
// https://github.com/golang/go/issues/45632
// relax this to 30 seconds. If the entry's expiration time is
// less than 30 seconds compared to the cached now,
// refetch the accurate now and compare again.
nowCached := s.timerwheel.clock.NowNanoCached()
if expire-nowCached <= 0 {
expired = true
} else if expire-nowCached < 30*1e9 {
now := s.timerwheel.clock.NowNano()
expired = (expire-now <= 0)
} else {
expired = false
}
}

if expired {
ok = false
s.policy.misses.Add(1)
} else {
Expand Down Expand Up @@ -580,6 +599,10 @@ func (s *Store[K, V]) drainWrite() {
}

func (s *Store[K, V]) maintenance() {
s.mlock.Lock()
s.timerwheel.clock.RefreshNowCache()
s.mlock.Unlock()

go func() {
s.mlock.Lock()
s.maintenanceTicker = time.NewTicker(time.Second)
Expand All @@ -592,6 +615,7 @@ func (s *Store[K, V]) maintenance() {
return
case <-s.maintenanceTicker.C:
s.mlock.Lock()
s.timerwheel.clock.RefreshNowCache()
if s.closed {
s.mlock.Unlock()
return
Expand Down
39 changes: 39 additions & 0 deletions internal/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,42 @@ func TestStore_PolicyCounter(t *testing.T) {
require.Equal(t, uint64(1600), store.policy.hits.Value())
require.Equal(t, uint64(1600), store.policy.misses.Value())
}

func TestStore_GetExpire(t *testing.T) {
store := NewStore[int, int](1000, false, nil, nil, nil, 0, 0, nil)
defer store.Close()

_, i := store.index(123)
fakeNow := store.timerwheel.clock.NowNano() - 100*10e9
testNow := store.timerwheel.clock.NowNano()
entry := &Entry[int, int]{
key: 123,
value: 123,
}
entry.expire.Store(fakeNow)

store.shards[i].hashmap[123] = entry
store.mlock.Lock()

// already exprired
store.timerwheel.clock.SetNowCache(fakeNow + 1)
_, ok := store.Get(123)
require.False(t, ok)

// use cached now, not expire
store.timerwheel.clock.SetNowCache(fakeNow - 31*10e9)
v, ok := store.Get(123)
require.True(t, ok)
require.Equal(t, 123, v)

// less than 30 seconds and not expired, use real now
store.timerwheel.clock.SetNowCache(fakeNow - 1)
_, ok = store.Get(123)
require.False(t, ok)
store.mlock.Unlock()

// ticker refresh cached now
time.Sleep(1200 * time.Millisecond)
cachedNow := store.timerwheel.clock.NowNanoCached()
require.True(t, cachedNow > testNow)
}

0 comments on commit 9eaa74b

Please sign in to comment.