Skip to content

Commit

Permalink
Revert "do not cache large blob history event (#4621)"
Browse files Browse the repository at this point in the history
This reverts commit f97b681.
  • Loading branch information
dnr committed Jul 28, 2023
1 parent 5dbbc45 commit 6c48ed2
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 242 deletions.
3 changes: 1 addition & 2 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ type Cache interface {
// Iterator returns the iterator of the cache
Iterator() Iterator

// Size returns current size of the Cache, the size definition is implementation of SizeGetter interface
// for the entry size, if the entry does not implement SizeGetter interface, the size is 1
// Size returns the number of entries currently stored in the Cache
Size() int
}

Expand Down
47 changes: 13 additions & 34 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (

var (
// ErrCacheFull is returned if Put fails due to cache being filled with pinned elements
ErrCacheFull = errors.New("cache capacity is fully occupied with pinned elements")
// ErrCacheItemTooLarge is returned if Put fails due to item size being larger than max cache capacity
ErrCacheItemTooLarge = errors.New("cache item size is larger than max cache capacity")
ErrCacheFull = errors.New("Cache capacity is fully occupied with pinned elements")
)

// lru is a concurrent fixed size cache that evicts elements in lru order
Expand All @@ -45,7 +43,6 @@ type (
byAccess *list.List
byKey map[interface{}]*list.Element
maxSize int
currSize int
ttl time.Duration
pin bool
}
Expand All @@ -61,7 +58,6 @@ type (
createTime time.Time
value interface{}
refCount int
size int
}
)

Expand All @@ -87,7 +83,6 @@ func (it *iteratorImpl) Next() Entry {
entry = &entryImpl{
key: entry.key,
value: entry.value,
size: entry.size,
createTime: entry.createTime,
}
it.prepareNext()
Expand Down Expand Up @@ -129,10 +124,6 @@ func (entry *entryImpl) Value() interface{} {
return entry.value
}

// Size returns the size recorded for this entry when it was stored,
// used to account the entry against the cache's byte capacity.
func (entry *entryImpl) Size() int {
	return entry.size
}

// CreateTime returns the creation timestamp recorded for this entry
// (zero value if no timestamp was set when the entry was stored).
func (entry *entryImpl) CreateTime() time.Time {
	return entry.createTime
}
Expand All @@ -148,7 +139,6 @@ func New(maxSize int, opts *Options) Cache {
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
maxSize: maxSize,
currSize: 0,
pin: opts.Pin,
}
}
Expand Down Expand Up @@ -249,15 +239,12 @@ func (c *lru) Release(key interface{}) {
entry.refCount--
}

// Size returns the current size of the lru, useful if cache is not full. This size is calculated by summing
// the size of all entries in the cache. And the entry size is calculated by the size of the value.
// The size of the value is calculated implementing the Sizeable interface. If the value does not implement
// the Sizeable interface, the size is 1.
// Size returns the number of entries currently in the lru, useful if cache is not full
func (c *lru) Size() int {
c.mut.Lock()
defer c.mut.Unlock()

return c.currSize
return len(c.byKey)
}

// Put puts a new value associated with a given key, returning the existing value (if present)
Expand All @@ -266,22 +253,9 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
if c.maxSize == 0 {
return nil, nil
}
entrySize := getSize(value)
if entrySize > c.maxSize {
return nil, ErrCacheItemTooLarge
}

c.mut.Lock()
defer c.mut.Unlock()

c.currSize += entrySize
c.tryEvictUntilEnoughSpace()
// If there is still not enough space, remove the new entry size from the current size and return an error
if c.currSize > c.maxSize {
c.currSize -= entrySize
return nil, ErrCacheFull
}

elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
Expand All @@ -308,7 +282,6 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
entry := &entryImpl{
key: key,
value: value,
size: entrySize,
}

if c.pin {
Expand All @@ -319,24 +292,30 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
entry.createTime = time.Now().UTC()
}

if len(c.byKey) >= c.maxSize {
c.evictOnceInternal()
}
if len(c.byKey) >= c.maxSize {
return nil, ErrCacheFull
}

element := c.byAccess.PushFront(entry)
c.byKey[key] = element
return nil, nil
}

// deleteInternal removes the element from both the access-order list and
// the key index, and subtracts the entry's size from the cache's running
// total. NOTE(review): appears to assume the caller already holds c.mut
// (it takes no lock itself) — confirm against call sites.
func (c *lru) deleteInternal(element *list.Element) {
	entry := c.byAccess.Remove(element).(*entryImpl)
	c.currSize -= entry.Size()
	delete(c.byKey, entry.key)
}

// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpace() {
func (c *lru) evictOnceInternal() {
element := c.byAccess.Back()
for c.currSize > c.maxSize && element != nil {
for element != nil {
entry := element.Value.(*entryImpl)
if entry.refCount == 0 {
c.deleteInternal(element)
return
}

// entry.refCount > 0
Expand Down
65 changes: 3 additions & 62 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,16 @@
package cache

import (
"math/rand"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)

type (
keyType struct {
dummyString string
dummyInt int
}

testEntryWithCacheSize struct {
cacheSize int
}
)

func (c *testEntryWithCacheSize) CacheSize() int {
return c.cacheSize
type keyType struct {
dummyString string
dummyInt int
}

func TestLRU(t *testing.T) {
Expand Down Expand Up @@ -314,50 +302,3 @@ func TestZeroSizeCache(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 0, cache.Size())
}

// TestCache_ItemSizeTooLarge verifies that an entry whose declared size
// exceeds the cache's total byte capacity is rejected with
// ErrCacheItemTooLarge, while an entry exactly at capacity is accepted.
func TestCache_ItemSizeTooLarge(t *testing.T) {
	t.Parallel()

	maxTotalBytes := 10
	cache := NewLRU(maxTotalBytes)

	// An entry exactly at capacity fits; Put returns no previous value.
	res := cache.Put(uuid.New(), &testEntryWithCacheSize{maxTotalBytes})
	assert.Nil(t, res)

	// One unit over capacity can never fit and must be rejected outright.
	// (Original used assert.Equal with actual/expected reversed; testify's
	// signature is Equal(t, expected, actual). Nil/ErrorIs are the
	// intent-revealing forms.)
	res, err := cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{maxTotalBytes + 1})
	assert.ErrorIs(t, err, ErrCacheItemTooLarge)
	assert.Nil(t, res)
}

// TestCache_ItemHasCacheSizeDefined concurrently puts a random number of
// randomly-sized entries and verifies the cache's reported size stays
// below its byte capacity.
func TestCache_ItemHasCacheSizeDefined(t *testing.T) {
	t.Parallel()

	maxTotalBytes := 10
	cache := NewLRU(maxTotalBytes)

	// +1 so the test is never vacuous: rand.Intn(1024) alone can be 0,
	// in which case nothing is put and nothing is checked.
	numPuts := rand.Intn(1024) + 1

	startWG := sync.WaitGroup{}
	endWG := sync.WaitGroup{}

	startWG.Add(numPuts)
	endWG.Add(numPuts)

	for i := 0; i < numPuts; i++ {
		go func() {
			defer endWG.Done()

			// Hold all writers until every goroutine is launched so the
			// puts genuinely race with each other.
			startWG.Wait()
			cache.Put(uuid.New(), &testEntryWithCacheSize{rand.Int()})
		}()
		startWG.Done()
	}

	endWG.Wait()

	// Check after all puts complete. The original asserted from an
	// unsynchronized goroutine that was never waited on: it could run
	// after the test finished (illegal use of t) or not at all.
	assert.True(t, cache.Size() < maxTotalBytes)
}
42 changes: 0 additions & 42 deletions common/cache/size_getter.go

This file was deleted.

72 changes: 0 additions & 72 deletions common/cache/size_getter_mock.go

This file was deleted.

7 changes: 3 additions & 4 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,9 @@ func NewConfig(
HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour),
HistoryCacheNonUserContextLockTimeout: dc.GetDurationProperty(dynamicconfig.HistoryCacheNonUserContextLockTimeout, 500*time.Millisecond),

EventsCacheInitialSize: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSize, 128*1024), // 128KB
EventsCacheMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 512*1024), // 512KB
EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour),

EventsCacheInitialSize: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSize, 128),
EventsCacheMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 512),
EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour),
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 10),
Expand Down
Loading

0 comments on commit 6c48ed2

Please sign in to comment.