Skip to content

Commit

Permalink
fix: flatten cache concurrency and ll
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Feb 12, 2025
1 parent 14b1b58 commit 4fe09da
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 53 deletions.
11 changes: 3 additions & 8 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rueidis
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/redis/rueidis/internal/cache"
Expand Down Expand Up @@ -202,13 +201,9 @@ func NewFlattenCache(limit int) CacheStore {
type flatten struct {
flights *cache.DoubleMap[*adapterEntry]
cache *cache.LRUDoubleMap[[]byte]
close int32
}

func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
if atomic.LoadInt32(&f.close) == 1 {
return RedisMessage{}, nil
}
ts := now.UnixMilli()
if e, ok := f.cache.Find(key, cmd, ts); ok {
var ret RedisMessage
Expand Down Expand Up @@ -248,7 +243,7 @@ func (f *flatten) Cancel(key, cmd string, err error) {

func (f *flatten) Delete(keys []RedisMessage) {
if keys == nil {
f.cache.DeleteAll()
f.cache.Reset()
} else {
for _, k := range keys {
f.cache.Delete(k.string)
Expand All @@ -257,8 +252,8 @@ func (f *flatten) Delete(keys []RedisMessage) {
}

func (f *flatten) Close(err error) {
atomic.StoreInt32(&f.close, 1)
f.flights.Iterate(func(entry *adapterEntry) {
f.cache.DeleteAll()
f.flights.Close(func(entry *adapterEntry) {
entry.setErr(err)
})
}
16 changes: 11 additions & 5 deletions internal/cache/double.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *DoubleMap[V]) FindOrInsert(key1, key2 string, fn func() V) (val V, ok b
m.mu.RUnlock()
return
}
if m.ma == nil {
m.mu.RUnlock()
return
}
m.mu.RUnlock()
m.mu.Lock()
h := m.ma[key1]
Expand All @@ -49,6 +53,9 @@ func (m *DoubleMap[V]) FindOrInsert(key1, key2 string, fn func() V) (val V, ok b
m.mu.Unlock()
return
}
} else if m.ma == nil {
m.mu.Unlock()
return
} else {
h = &head[V]{}
m.ma[key1] = h
Expand Down Expand Up @@ -93,19 +100,18 @@ func (m *DoubleMap[V]) delete(keys []string) {
m.mu.Unlock()
}

func (m *DoubleMap[V]) Iterate(cb func(V)) {
m.mu.RLock()
func (m *DoubleMap[V]) Close(cb func(V)) {
m.mu.Lock()
for _, h := range m.ma {
h.mu.RLock()
if h.node.key != "" {
cb(h.node.val)
}
for curr := h.node.next; curr != nil; curr = curr.next {
cb(curr.val)
}
h.mu.RUnlock()
}
m.mu.RUnlock()
m.ma = nil
m.mu.Unlock()
}

type empties struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/double_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDoubleMap(t *testing.T) {
t.Fatalf("should insert 1 but not found")
}
c := 0
m.Iterate(func(i int) {
m.Close(func(i int) {
if i != 2 {
t.Fatalf("should iterate 2")
}
Expand Down
98 changes: 60 additions & 38 deletions internal/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,43 +65,72 @@ func (m *LRUDoubleMap[V]) Find(key1, key2 string, ts int64) (val V, ok bool) {
return
}

func (m *LRUDoubleMap[V]) remove(h *linked[V]) {
h.mark -= 1
next := h.next
prev := h.prev
h.next = nil
h.prev = nil
if next != nil {
(*linked[V])(next).prev = prev
}
if prev != nil {
(*linked[V])(prev).next = next
}
if m.head == unsafe.Pointer(h) {
m.head = next
}
if m.tail == unsafe.Pointer(h) {
m.tail = prev
}
m.total -= h.size
delete(m.ma, h.key)
}

func (m *LRUDoubleMap[V]) move(h *linked[V]) {
prev := h.prev
next := h.next
if prev != nil {
(*linked[V])(prev).next = next
}
if next != nil {
(*linked[V])(next).prev = prev
}
h.next = nil
if m.tail != nil && m.tail != unsafe.Pointer(h) {
h.prev = m.tail
(*linked[V])(m.tail).next = unsafe.Pointer(h)
}
m.tail = unsafe.Pointer(h)
if m.head == unsafe.Pointer(h) && next != nil {
m.head = next
}
}

func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) {
// TODO: a RLock fast path?
m.mu.Lock()
if m.ma == nil {
m.mu.Unlock()
return
}
m.total += size
for m.head != nil {
h := (*linked[V])(m.head)
if m.total <= m.limit && h.ts != 0 { // TODO: clear expired entries?
break
}
m.total -= h.size
delete(m.ma, h.key)
h.mark -= 1
m.head = h.next
if m.head != nil {
(*linked[V])(m.head).prev = nil
} else {
m.tail = nil
break
}
m.remove(h)
}

h := m.ma[key1]
if h == nil {
h = &linked[V]{key: key1, ts: ts, mark: m.mark}
m.ma[key1] = h
} else if h.ts <= ts {
m.total -= h.size
h.size = 0
}
h.ts = ts
h.size += size
h.next = nil
if m.tail != nil && m.tail != unsafe.Pointer(h) {
h.prev = m.tail
(*linked[V])(m.tail).next = unsafe.Pointer(h)
}
m.tail = unsafe.Pointer(h)
m.move(h)
if m.head == nil {
m.head = unsafe.Pointer(h)
}
Expand All @@ -118,6 +147,16 @@ func (m *LRUDoubleMap[V]) Delete(key1 string) {
}

func (m *LRUDoubleMap[V]) DeleteAll() {
m.mu.Lock()
m.ma = nil
m.head = nil
m.tail = nil
m.total = 0
m.mark++
m.mu.Unlock()
}

func (m *LRUDoubleMap[V]) Reset() {
m.mu.Lock()
m.ma = make(map[string]*linked[V], len(m.ma))
m.head = nil
Expand All @@ -131,25 +170,8 @@ func (m *LRUDoubleMap[V]) moveToTail(b map[*linked[V]]struct{}) {
m.mu.Lock()
defer m.mu.Unlock()
for h := range b {
if h.mark != m.mark {
continue
}
prev := h.prev
next := h.next
if prev != nil {
(*linked[V])(prev).next = next
}
if next != nil {
(*linked[V])(next).prev = prev
}
h.next = nil
if m.tail != nil && m.tail != unsafe.Pointer(h) {
h.prev = m.tail
(*linked[V])(m.tail).next = unsafe.Pointer(h)
}
m.tail = unsafe.Pointer(h)
if m.head == unsafe.Pointer(h) && next != nil {
m.head = next
if h.mark == m.mark {
m.move(h)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestLRUCache_LRU_GC_2(t *testing.T) {
if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 {
t.Fatal("not find")
}
m.DeleteAll()
m.Reset()
runtime.GC()
runtime.GC()
m.Insert("a", "a", bpsize-1, 2, 0)
Expand Down

0 comments on commit 4fe09da

Please sign in to comment.