diff --git a/fastcache.go b/fastcache.go
index 20a3c02..40ebdb2 100644
--- a/fastcache.go
+++ b/fastcache.go
@@ -211,6 +211,11 @@ func (c *Cache) UpdateStats(s *Stats) {
 	s.InvalidValueHashErrors += atomic.LoadUint64(&c.bigStats.InvalidValueHashErrors)
 }
 
+// Iterator returns a fastcache iterator to iterate over the entire cache.
+func (c *Cache) Iterator() *Iterator {
+	return newIterator(c)
+}
+
 type bucket struct {
 	mu sync.RWMutex
 
@@ -413,3 +418,44 @@ func (b *bucket) Del(h uint64) {
 	delete(b.m, h)
 	b.mu.Unlock()
 }
+
+// copyKeys returns copies of the keys of the entries in b that pass the
+// generation check below, together with the number of keys returned.
+//
+// The keys are copied rather than aliased so that callers do not hold
+// references into chunk memory after the bucket lock is released.
+func (b *bucket) copyKeys() ([][]byte, int) {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	keys := make([][]byte, 0, len(b.m))
+	for _, v := range b.m {
+		idx := v & ((1 << bucketSizeBits) - 1)
+		gen := v >> bucketSizeBits
+		if !(gen == b.gen && idx < b.idx || gen+1 == b.gen && idx >= b.idx) {
+			continue
+		}
+
+		chunkIdx := idx / chunkSize
+		if chunkIdx >= uint64(len(b.chunks)) {
+			continue
+		}
+		chunk := b.chunks[chunkIdx]
+
+		// idx is an offset into the whole bucket; reduce it to an offset
+		// inside the selected chunk before slicing.
+		idx %= chunkSize
+		if idx+4 > uint64(cap(chunk)) {
+			continue
+		}
+		kvLenBuf := chunk[idx : idx+4]
+		keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1])
+
+		idx += 4
+		if idx+keyLen > uint64(cap(chunk)) {
+			continue
+		}
+		keys = append(keys, append([]byte(nil), chunk[idx:idx+keyLen]...))
+	}
+	return keys, len(keys)
+}
diff --git a/fastcache_test.go b/fastcache_test.go
index 7e579dd..8818f9e 100644
--- a/fastcache_test.go
+++ b/fastcache_test.go
@@ -1,6 +1,7 @@
 package fastcache
 
 import (
+	"bytes"
 	"fmt"
 	"runtime"
 	"sync"
@@ -282,3 +283,85 @@ func TestCacheResetUpdateStatsSetConcurrent(t *testing.T) {
 	statsWG.Wait()
 	resettersWG.Wait()
 }
+
+func TestCacheCopyKeys(t *testing.T) {
+	c := New(1024)
+	defer c.Reset()
+
+	numEntries := 100
+	for i := 0; i < numEntries; i++ {
+		k := []byte(fmt.Sprintf("key %d", i))
+		v := []byte(fmt.Sprintf("value %d", i))
+		c.Set(k, v)
+		vv := c.Get(nil, k)
+		if string(vv) != string(v) {
+			t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v)
+		}
+	}
+
+	keys := make(map[string]struct{})
+	keyCount := 0
+	// Index the buckets instead of ranging over them by value: the bucket
+	// struct holds a mutex, so avoid copying it if buckets are stored by value.
+	for i := range c.buckets {
+		bucketKeys, bucketKeyCount := c.buckets[i].copyKeys()
+		keyCount += bucketKeyCount
+		for _, k := range bucketKeys {
+			keys[string(k)] = struct{}{}
+		}
+	}
+
+	if keyCount != numEntries {
+		t.Fatal("returned number of currBucketKeys is not matched to the expected one",
+			"expected", numEntries, "actual", keyCount)
+	}
+
+	for i := 0; i < numEntries; i++ {
+		k := []byte(fmt.Sprintf("key %d", i))
+		if _, exist := keys[string(k)]; !exist {
+			t.Fatal("failed to find the key from returned currBucketKeys", "key", string(k))
+		}
+	}
+}
+
+func TestCacheIterator(t *testing.T) {
+	c := New(1024)
+	defer c.Reset()
+
+	numEntries := 100
+	keyValMap := make(map[string][]byte)
+	for i := 0; i < numEntries; i++ {
+		k := []byte(fmt.Sprintf("key %d", i))
+		v := []byte(fmt.Sprintf("value %d", i))
+		c.Set(k, v)
+		keyValMap[string(k)] = v
+		vv := c.Get(nil, k)
+		if string(vv) != string(v) {
+			t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v)
+		}
+	}
+
+	visited := 0
+	itr := c.Iterator()
+	for itr.SetNext() {
+		entry, err := itr.Value()
+		if err != nil {
+			t.Fatal("unexpected error from itr.Value()", "err", err)
+		}
+
+		val, exist := keyValMap[string(entry.key)]
+		if !exist {
+			t.Fatal("failed to retrieve an entry from cache which should exist")
+		}
+		if !bytes.Equal(val, entry.value) {
+			t.Fatalf("value from iterator is not the same as the expected one for key %q; got %q; want %q",
+				entry.key, entry.value, val)
+		}
+		visited++
+	}
+
+	// The iterator must yield every stored entry, not only correct ones.
+	if visited != numEntries {
+		t.Fatalf("unexpected number of entries visited by the iterator; got %d; want %d", visited, numEntries)
+	}
+}
diff --git a/go.mod b/go.mod
index 1b53092..395143f 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,7 @@
 module github.com/VictoriaMetrics/fastcache
 
+go 1.15
+
 require (
 	github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156
 	github.com/cespare/xxhash/v2 v2.1.1
diff --git a/iterator.go b/iterator.go
new file mode 100644
index 0000000..a268c9a
--- /dev/null
+++ b/iterator.go
@@ -0,0 +1,129 @@
+package fastcache
+
+import (
+	"sync"
+)
+
+type iteratorError string
+
+func (e iteratorError) Error() string {
+	return string(e)
+}
+
+// ErrIterationFinished is returned by Value() when the iterator is not
+// positioned on an entry, e.g. after SetNext() has returned false.
+const ErrIterationFinished = iteratorError("iterator reached the last element, Value() should not be called")
+
+var emptyEntry = Entry{}
+
+// Entry represents a key-value pair in fastcache.
+type Entry struct {
+	key   []byte
+	value []byte
+}
+
+// Key returns the entry's key.
+func (e Entry) Key() []byte {
+	return e.key
+}
+
+// Value returns the entry's value.
+func (e Entry) Value() []byte {
+	return e.value
+}
+
+func newIterator(c *Cache) *Iterator {
+	elements, count := c.buckets[0].copyKeys()
+
+	return &Iterator{
+		cache:          c,
+		currBucketIdx:  0,
+		currKeyIdx:     -1,
+		currBucketKeys: elements,
+		currBucketSize: count,
+	}
+}
+
+// Iterator allows iterating over the entries in the cache.
+type Iterator struct {
+	mu               sync.Mutex
+	cache            *Cache
+	currBucketSize   int
+	currBucketIdx    int
+	currBucketKeys   [][]byte
+	currKeyIdx       int
+	currentEntryInfo Entry
+
+	valid bool
+}
+
+// SetNext moves the iterator to the next entry and reports whether such an
+// entry exists. Keys for which the cache lookup fails are skipped.
+func (it *Iterator) SetNext() bool {
+	it.mu.Lock()
+	defer it.mu.Unlock()
+
+	it.valid = false
+	it.currKeyIdx++
+
+	// Loop instead of recursing so that a long run of stale keys cannot
+	// grow the call stack.
+	for {
+		// Consume the keys remaining in the current bucket.
+		for ; it.currKeyIdx < it.currBucketSize; it.currKeyIdx++ {
+			if it.setCurrentEntry() {
+				it.valid = true
+				return true
+			}
+		}
+
+		// The current bucket is exhausted; move on to the next non-empty one.
+		if !it.nextBucket() {
+			return false
+		}
+	}
+}
+
+// nextBucket loads the keys of the first non-empty bucket after the current
+// one and positions the iterator on its first key. It returns false if all
+// the remaining buckets are empty.
+func (it *Iterator) nextBucket() bool {
+	for i := it.currBucketIdx + 1; i < len(it.cache.buckets); i++ {
+		it.currBucketKeys, it.currBucketSize = it.cache.buckets[i].copyKeys()
+		if it.currBucketSize > 0 {
+			it.currBucketIdx = i
+			it.currKeyIdx = 0
+			return true
+		}
+	}
+	return false
+}
+
+// setCurrentEntry looks up the key at the current position and stores the
+// resulting entry. It returns false if HasGet does not find the key.
+func (it *Iterator) setCurrentEntry() bool {
+	key := it.currBucketKeys[it.currKeyIdx]
+	val, found := it.cache.HasGet(nil, key)
+	if !found {
+		it.currentEntryInfo = emptyEntry
+		return false
+	}
+
+	it.currentEntryInfo = Entry{
+		key:   key,
+		value: val,
+	}
+	return true
+}
+
+// Value returns the entry the iterator is currently positioned on.
+// It returns ErrIterationFinished if there is no such entry.
+func (it *Iterator) Value() (Entry, error) {
+	it.mu.Lock()
+	defer it.mu.Unlock()
+
+	if !it.valid {
+		return emptyEntry, ErrIterationFinished
+	}
+	return it.currentEntryInfo, nil
+}