Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create configurable eviction channel #695

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Return error and reject requests
AlessandroPatti committed Sep 7, 2023
commit 6b59d310cd60e8ff607774c3a3c8dc3665a32eaa
6 changes: 2 additions & 4 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
@@ -393,10 +393,8 @@ func (c *diskCache) commit(key string, legacy bool, tempfile string, reservedSiz
random: random,
}

if !c.lru.Add(key, newItem) {
err = fmt.Errorf("INTERNAL ERROR: failed to add: %s, size %d (on disk: %d)",
key, logicalSize, sizeOnDisk)
log.Println(err.Error())
if err := c.lru.Add(key, newItem); err != nil {
log.Println(err)
return unreserve, removeTempfile, err
}

4 changes: 2 additions & 2 deletions cache/disk/disk_test.go
Original file line number Diff line number Diff line change
@@ -455,9 +455,9 @@ func TestCacheExistingFiles(t *testing.T) {

evicted := []Key{}
origOnEvict := testCache.lru.onEvict
testCache.lru.onEvict = func(key Key, value lruItem) {
testCache.lru.onEvict = func(key Key, value lruItem) error {
evicted = append(evicted, key.(string))
origOnEvict(key, value)
return origOnEvict(key, value)
}

if testCache.lru.Len() != 4 {
15 changes: 7 additions & 8 deletions cache/disk/load.go
Original file line number Diff line number Diff line change
@@ -599,14 +599,15 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
// The eviction callback deletes the file from disk.
// This function is only called while the lock is held
// by the current goroutine.
onEvict := func(key Key, value lruItem) {
onEvict := func(key Key, value lruItem) error {
select {
case evictionQueue <- EvictionTask{Key: key, lruItem: value}:
c.evictionGauge.Inc()
c.evictionBytesGauge.Add(float64(value.sizeOnDisk))
return nil
default:
c.evictionCounter.WithLabelValues("full").Inc()
log.Printf("Too many enqueued evictions, could not evict %s", key)
return fmt.Errorf("Too many enqueued evictions, could not evict %s", key)
}
}

@@ -615,12 +616,10 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
c.lru = NewSizedLRU(maxSizeBytes, onEvict, len(result.item))

for i := 0; i < len(result.item); i++ {
ok := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if !ok {
err = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey))
if err != nil {
return err
}
err := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if err != nil {
_ = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey))
return err
}
}

43 changes: 30 additions & 13 deletions cache/disk/lru.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ import (
type Key interface{}

// EvictCallback is the type of callbacks that are invoked when items are evicted.
type EvictCallback func(key Key, value lruItem)
type EvictCallback func(key Key, value lruItem) error

// SizedLRU is an LRU cache that will keep its total size below maxSize by evicting
// items.
@@ -101,25 +101,27 @@ func (c *SizedLRU) RegisterMetrics() {
// Note that this function rounds file sizes up to the nearest
// BlockSize (4096) bytes, as an estimate of actual disk usage since
// most linux filesystems default to 4kb blocks.
func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
func (c *SizedLRU) Add(key Key, value lruItem) error {

roundedUpSizeOnDisk := roundUp4k(value.sizeOnDisk)

if roundedUpSizeOnDisk > c.maxSize {
return false
return fmt.Errorf("Unable to reserve space for blob (size: %d) larger than cache size %d", roundedUpSizeOnDisk, c.maxSize)
}

var sizeDelta, uncompressedSizeDelta int64
if ee, ok := c.cache[key]; ok {
sizeDelta = roundedUpSizeOnDisk - roundUp4k(ee.Value.(*entry).value.sizeOnDisk)
if c.reservedSize+sizeDelta > c.maxSize {
return false
return fmt.Errorf("INTERNAL ERROR: not enough space for blob with size %d (undersized cache?)", value.size)
}
uncompressedSizeDelta = roundUp4k(value.size) - roundUp4k(ee.Value.(*entry).value.size)

prevValue := ee.Value.(*entry).value
if c.onEvict != nil {
c.onEvict(key, prevValue)
if err := c.onEvict(key, prevValue); err != nil {
return err
}
}

c.ll.MoveToFront(ee)
@@ -128,7 +130,7 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
} else {
sizeDelta = roundedUpSizeOnDisk
if c.reservedSize+sizeDelta > c.maxSize {
return false
return fmt.Errorf("INTERNAL ERROR: unable to reclaim enough space for blob with size %d (undersized cache?)", value.size)
}
uncompressedSizeDelta = roundUp4k(value.size)
ele := c.ll.PushFront(&entry{key, value})
@@ -140,7 +142,11 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
for c.currentSize+sizeDelta > c.maxSize {
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
err := c.removeElement(ele)
if err != nil {
delete(c.cache, key)
return err
}
}
}

@@ -150,7 +156,7 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))

return true
return nil
}

// Get looks up a key in the cache
@@ -164,12 +170,16 @@ func (c *SizedLRU) Get(key Key) (value lruItem, ok bool) {
}

// Remove removes a (key, value) from the cache
func (c *SizedLRU) Remove(key Key) {
func (c *SizedLRU) Remove(key Key) error {
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
err := c.removeElement(ele)
if err != nil {
return err
}
c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))
}
return nil
}

// Len returns the number of items in the cache
@@ -234,7 +244,10 @@ func (c *SizedLRU) Reserve(size int64) (bool, error) {
for sumLargerThan(size, c.currentSize, c.maxSize) {
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
err := c.removeElement(ele)
if err != nil {
return false, err
}
} else {
return false, errReservation // This should have been caught at the start.
}
@@ -267,7 +280,7 @@ func (c *SizedLRU) Unreserve(size int64) error {
return nil
}

func (c *SizedLRU) removeElement(e *list.Element) {
func (c *SizedLRU) removeElement(e *list.Element) error {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
@@ -276,8 +289,12 @@ func (c *SizedLRU) removeElement(e *list.Element) {
c.counterEvictedBytes.Add(float64(kv.value.sizeOnDisk))

if c.onEvict != nil {
c.onEvict(kv.key, kv.value)
err := c.onEvict(kv.key, kv.value)
if err != nil {
return err
}
}
return nil
}

// Round n up to the nearest multiple of BlockSize (4096).
39 changes: 23 additions & 16 deletions cache/disk/lru_test.go
Original file line number Diff line number Diff line change
@@ -37,9 +37,9 @@ func TestBasics(t *testing.T) {
// Add an item
aKey := "akey"
anItem := lruItem{size: 5, sizeOnDisk: 5}
ok = lru.Add(aKey, anItem)
if !ok {
t.Fatalf("Add: failed inserting item")
err := lru.Add(aKey, anItem)
if err != nil {
t.Fatalf("Add: failed inserting item: %s", err)
}

getItem, getOk := lru.Get(aKey)
@@ -53,15 +53,19 @@ func TestBasics(t *testing.T) {
checkSizeAndNumItems(t, lru, BlockSize, 1)

// Remove the item
lru.Remove(aKey)
err = lru.Remove(aKey)
if err != nil {
t.Fatal(err)
}
checkSizeAndNumItems(t, lru, 0, 0)
}

func TestEviction(t *testing.T) {
// Keep track of evictions using the callback
var evictions []int
onEvict := func(key Key, value lruItem) {
onEvict := func(key Key, value lruItem) error {
evictions = append(evictions, key.(int))
return nil
}

lru := NewSizedLRU(10*BlockSize, onEvict, 0)
@@ -85,9 +89,9 @@ func TestEviction(t *testing.T) {

for i, thisExpected := range expectedSizesNumItems {
item := lruItem{size: int64(i) * BlockSize, sizeOnDisk: int64(i) * BlockSize}
ok := lru.Add(i, item)
if !ok {
t.Fatalf("Add: failed adding %d", i)
err := lru.Add(i, item)
if err != nil {
t.Fatalf("Add: failed adding %d: %s", i, err)
}

checkSizeAndNumItems(t, lru, thisExpected.expBlocks*BlockSize, thisExpected.expNumItems)
@@ -103,8 +107,8 @@ func TestRejectBigItem(t *testing.T) {
// Bounded caches should reject big items
lru := NewSizedLRU(10, nil, 0)

ok := lru.Add("hello", lruItem{size: 11, sizeOnDisk: 11})
if ok {
err := lru.Add("hello", lruItem{size: 11, sizeOnDisk: 11})
if err == nil {
t.Fatalf("Add succeeded, expected it to fail")
}

@@ -115,7 +119,10 @@ func TestReserveZeroAlwaysPossible(t *testing.T) {
largeItem := lruItem{size: math.MaxInt64, sizeOnDisk: math.MaxInt64}

lru := NewSizedLRU(math.MaxInt64, nil, 0)
lru.Add("foo", largeItem)
err := lru.Add("foo", largeItem)
if err != nil {
t.Fatal(err)
}
ok, err := lru.Reserve(0)
if err != nil {
t.Fatal(err)
@@ -246,8 +253,8 @@ func TestAddWithSpaceReserved(t *testing.T) {
t.Fatalf("Expected to be able to reserve 1")
}

ok = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if ok {
err = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if err == nil {
t.Fatal("Expected to not be able to add item with size 2")
}

@@ -256,8 +263,8 @@ func TestAddWithSpaceReserved(t *testing.T) {
t.Fatal("Expected to be able to unreserve 1:", err)
}

ok = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if !ok {
t.Fatal("Expected to be able to add item with size 2")
err = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if err != nil {
t.Fatalf("Expected to be able to add item with size 2: %s", err)
}
}