Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Iterator for fastcache #44

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions fastcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func (c *Cache) UpdateStats(s *Stats) {
s.InvalidValueHashErrors += atomic.LoadUint64(&c.bigStats.InvalidValueHashErrors)
}

// Iterator returns an fastcache iterator to iterate over the entire cache.
func (c *Cache) Iterator() *Iterator {
return newIterator(c)
}

type bucket struct {
mu sync.RWMutex

Expand Down Expand Up @@ -413,3 +418,30 @@ func (b *bucket) Del(h uint64) {
delete(b.m, h)
b.mu.Unlock()
}

func (b *bucket) copyKeys() ([][]byte, int) {
b.mu.RLock()

keys := make([][]byte, len(b.m))
numKeys := 0

for _, v := range b.m {
idx := v & ((1 << bucketSizeBits) - 1)
gen := v >> bucketSizeBits

if gen == b.gen && idx < b.idx || gen+1 == b.gen && idx >= b.idx {
chunkIdx := idx / chunkSize
chunk := b.chunks[chunkIdx]

kvLenBuf := chunk[idx : idx+4]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may cause the array to go out of bounds and generate Panic

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this line of code may solve the problem, can you help me take a look? idx = idx % chunkSize

keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1])

idx += 4
key := chunk[idx : idx+keyLen : idx+keyLen]
keys[numKeys] = key
numKeys++
}
}
b.mu.RUnlock()
return keys, numKeys
}
74 changes: 74 additions & 0 deletions fastcache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fastcache

import (
"bytes"
"fmt"
"runtime"
"sync"
Expand Down Expand Up @@ -282,3 +283,76 @@ func TestCacheResetUpdateStatsSetConcurrent(t *testing.T) {
statsWG.Wait()
resettersWG.Wait()
}

func TestCacheCopyKeys(t *testing.T) {
c := New(1024)
defer c.Reset()

numEntries := 100
for i := 0; i < numEntries; i++ {
k := []byte(fmt.Sprintf("key %d", i))
v := []byte(fmt.Sprintf("value %d", i))
c.Set(k, v)
vv := c.Get(nil, k)
if string(vv) != string(v) {
t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v)
}
}

keys := make(map[string]struct{})
keyCount := 0
for _, b := range c.buckets {
bucketKeys, bucketKeyCount := b.copyKeys()
keyCount += bucketKeyCount
for _, k := range bucketKeys {
keys[string(k)] = struct{}{}
}
}

if keyCount != numEntries {
t.Fatal("returned number of currBucketKeys is not matched to the expected one",
"expected", numEntries, "actual", keyCount)
}

for i := 0; i < numEntries; i++ {
k := []byte(fmt.Sprintf("key %d", i))
if _, exist := keys[string(k)]; !exist {
t.Fatal("failed to find the key from returned currBucketKeys", "key", string(k))
}
}
}

func TestCacheIterator(t *testing.T) {
c := New(1024)
defer c.Reset()

numEntries := 100
keyValMap := make(map[string][]byte)
for i := 0; i < numEntries; i++ {
k := []byte(fmt.Sprintf("key %d", i))
v := []byte(fmt.Sprintf("value %d", i))
c.Set(k, v)
keyValMap[string(k)] = v
vv := c.Get(nil, k)
if string(vv) != string(v) {
t.Fatalf("unexpected value for key %q; got %q; want %q", k, vv, v)
}
}

itr := c.Iterator()
for itr.SetNext() {
entry, err := itr.Value()
if err != nil {
t.Fatal("unexpected error from itr.Value()", "err", err)
}

val, exist := keyValMap[string(entry.key)]
if !exist {
t.Fatal("failed to retrieve an entry from cache which should exist")
}
if !bytes.Equal(val, entry.value) {
t.Fatalf("value from iterator is not the same as the expected one for key %q; got %q; want %q",
entry.key, entry.value, val)
}
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/VictoriaMetrics/fastcache

go 1.15

require (
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156
github.com/cespare/xxhash/v2 v2.1.1
Expand Down
125 changes: 125 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package fastcache

import (
"sync"
)

type iteratorError string

func (e iteratorError) Error() string {
return string(e)
}

// ErrIterationFinished is reported when Value() is called after reached to the end of the iterator
const ErrIterationFinished = iteratorError("iterator reached the last element, Value() should not be called")

var emptyEntry = Entry{}

// Entry represents a key-value pair in fastcache
type Entry struct {
key []byte
value []byte
}

// Key returns entry's key
func (e Entry) Key() []byte {
return e.key
}

// Value returns entry's value
func (e Entry) Value() []byte {
return e.value
}

func newIterator(c *Cache) *Iterator {
elements, count := c.buckets[0].copyKeys()

return &Iterator{
cache: c,
currBucketIdx: 0,
currKeyIdx: -1,
currBucketKeys: elements,
currBucketSize: count,
}
}

// Iterator allows to iterate over entries in the cache
type Iterator struct {
mu sync.Mutex
cache *Cache
currBucketSize int
currBucketIdx int
currBucketKeys [][]byte
currKeyIdx int
currentEntryInfo Entry

valid bool
}

// SetNext moves to the next element and returns true if the value exists.
func (it *Iterator) SetNext() bool {
it.mu.Lock()

it.valid = false
it.currKeyIdx++

// In case there are remaining currBucketKeys in the current bucket.
if it.currBucketSize > it.currKeyIdx {
it.valid = true
found := it.setCurrentEntry()
it.mu.Unlock()

// if not found, check the next entry
if !found {
return it.SetNext()
}
return true
}

// If we reached the end of a bucket, check the next one for further iteration.
for i := it.currBucketIdx + 1; i < len(it.cache.buckets); i++ {
it.currBucketKeys, it.currBucketSize = it.cache.buckets[i].copyKeys()

// bucket is not an empty one, use it for iteration
if it.currBucketSize > 0 {
it.currKeyIdx = 0
it.currBucketIdx = i
it.valid = true
found := it.setCurrentEntry()
it.mu.Unlock()

// if not found, check the next entry
if !found {
return it.SetNext()
}
return true
}
}
it.mu.Unlock()
return false
}

func (it *Iterator) setCurrentEntry() bool {
key := it.currBucketKeys[it.currKeyIdx]
val, found := it.cache.HasGet(nil, key)

if found {
it.currentEntryInfo = Entry{
key: key,
value: val,
}
} else {
it.currentEntryInfo = emptyEntry
}

return found
}

// Value returns the current entry of an iterator.
func (it *Iterator) Value() (Entry, error) {
if !it.valid {
return emptyEntry, ErrIterationFinished
}

return it.currentEntryInfo, nil
}