perf: add cache to indexstore
mrekucci committed Jul 10, 2023
1 parent b4270fb commit cb67986
Showing 8 changed files with 120 additions and 29 deletions.
2 changes: 2 additions & 0 deletions cmd/bee/cmd/cmd.go
@@ -84,6 +84,7 @@ const (
optionNameUsePostageSnapshot = "use-postage-snapshot"
optionNameStorageIncentivesEnable = "storage-incentives-enable"
optionNameStateStoreCacheCapacity = "statestore-cache-capacity"
optionNameIndexStoreCacheCapacity = "indexstore-cache-capacity"
)

// nolint:gochecknoinits
@@ -298,6 +299,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Bool(optionNameUsePostageSnapshot, false, "bootstrap node using postage snapshot from the network")
cmd.Flags().Bool(optionNameStorageIncentivesEnable, true, "enable storage incentives feature")
cmd.Flags().Uint64(optionNameStateStoreCacheCapacity, 100_000, "lru memory caching capacity in number of statestore entries")
cmd.Flags().Uint64(optionNameIndexStoreCacheCapacity, 100_000, "lru memory caching capacity in number of indexstore entries")
}

func newLogger(cmd *cobra.Command, verbosity string) (log.Logger, error) {
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
@@ -344,6 +344,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
UsePostageSnapshot: c.config.GetBool(optionNameUsePostageSnapshot),
EnableStorageIncentives: c.config.GetBool(optionNameStorageIncentivesEnable),
StatestoreCacheCapacity: c.config.GetUint64(optionNameStateStoreCacheCapacity),
IndexstoreCacheCapacity: c.config.GetUint64(optionNameIndexStoreCacheCapacity),
})

return b, err
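Taken together, the two command-package changes above make the new index-store cache size a regular node setting alongside the existing statestore one. As a usage note, a hypothetical invocation might look like the following (the value is illustrative and the option may equally be set through the matching config key; 100_000 remains the default when it is omitted):

    bee start --indexstore-cache-capacity=250000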
10 changes: 6 additions & 4 deletions pkg/node/node.go
@@ -173,6 +173,7 @@ type Options struct {
UsePostageSnapshot bool
EnableStorageIncentives bool
StatestoreCacheCapacity uint64
IndexstoreCacheCapacity uint64
}

const (
@@ -759,16 +760,17 @@ func NewBee(
}

lo := &storer.Options{
Address: swarmAddress,
CacheCapacity: o.CacheCapacity,
LdbOpenFilesLimit: o.DBOpenFilesLimit,
LdbBlockCacheCapacity: o.DBBlockCacheCapacity,
LdbWriteBufferSize: o.DBWriteBufferSize,
LdbDisableSeeksCompaction: o.DBDisableSeeksCompaction,
ItemCacheCapacity: o.IndexstoreCacheCapacity,
CacheCapacity: o.CacheCapacity,
Address: swarmAddress,
WarmupDuration: o.WarmupTime,
RadiusSetter: kad,
Batchstore: batchStore,
StateStore: stateStore,
RadiusSetter: kad,
WarmupDuration: o.WarmupTime,
Logger: logger,
}

85 changes: 71 additions & 14 deletions pkg/storage/cache/cache.go
@@ -5,26 +5,29 @@
package cache

import (
"context"
"errors"
"sync"

"github.com/ethersphere/bee/pkg/storage"
"github.com/hashicorp/golang-lru/v2"
)

var _ storage.Store = (*Cache)(nil)
var _ storage.BatchedStore = (*Cache)(nil)

// Cache is a wrapper around a storage.Store that adds a layer
// of in-memory caching for the Get and Has operations.
// Cache is a wrapper around a storage.BatchedStore that adds
// a layer of in-memory caching for the Get and Has operations.
type Cache struct {
storage.Store
storage.BatchedStore

lru *lru.Cache[string, []byte]
metrics metrics
}

// Wrap adds a layer of in-memory caching to basic Store operations.
// Wrap adds a layer of in-memory caching to basic BatchedStore operations.
// This call will panic if the capacity is less than or equal to zero.
// It will also panic if the given store implements storage.Tx.
func Wrap(store storage.Store, capacity int) *Cache {
func Wrap(store storage.BatchedStore, capacity int) *Cache {
if _, ok := store.(storage.Tx); ok {
panic(errors.New("cache should not be used with transactions"))
}
@@ -46,7 +49,7 @@ func (c *Cache) add(i storage.Item) {
c.lru.Add(i.ID(), b)
}

// Get implements storage.Store interface.
// Get implements storage.BatchedStore interface.
// On a call it tries to first retrieve the item from cache.
// If the item does not exist in cache, it tries to retrieve
// it from the underlying store.
@@ -56,7 +59,7 @@ func (c *Cache) Get(i storage.Item) error {
return i.Unmarshal(val)
}

if err := c.Store.Get(i); err != nil {
if err := c.BatchedStore.Get(i); err != nil {
return err
}

@@ -66,7 +69,7 @@
return nil
}

// Has implements storage.Store interface.
// Has implements storage.BatchedStore interface.
// On a call it tries to first retrieve the item from cache.
// If the item does not exist in cache, it tries to retrieve
// it from the underlying store.
@@ -77,20 +80,74 @@ func (c *Cache) Has(k storage.Key) (bool, error) {
}

c.metrics.CacheMiss.Inc()
return c.Store.Has(k)
return c.BatchedStore.Has(k)
}

// Put implements storage.Store interface.
// Put implements storage.BatchedStore interface.
// On a call it also inserts the item into the cache so that the next
// call to Get and Has will be able to retrieve the item from cache.
func (c *Cache) Put(i storage.Item) error {
c.add(i)
return c.Store.Put(i)
return c.BatchedStore.Put(i)
}

// Delete implements storage.Store interface.
// Delete implements storage.BatchedStore interface.
// On a call it also removes the item from the cache.
func (c *Cache) Delete(i storage.Item) error {
_ = c.lru.Remove(i.ID())
return c.Store.Delete(i)
return c.BatchedStore.Delete(i)
}

// Batch implements storage.BatchedStore interface.
func (c *Cache) Batch(ctx context.Context) (storage.Batch, error) {
batch, err := c.BatchedStore.Batch(ctx)
if err != nil {
return nil, err
}
return &batchOpsTracker{
batch,
new(sync.Map),
func(key string) { c.lru.Remove(key) },
}, nil
}

// batchOpsTracker is a wrapper around storage.Batch that also
// keeps track of the keys that are added or deleted.
// When the batchOpsTracker is committed, we need to invalidate
// the cache for those keys added or deleted.
type batchOpsTracker struct {
storage.Batch

keys *sync.Map
invalidate func(key string)
}

// Put implements storage.Batch interface.
// On a call it also keeps track of the key that is added.
func (b *batchOpsTracker) Put(i storage.Item) error {
b.keys.Store(i.ID(), struct{}{})
return b.Batch.Put(i)
}

// Delete implements storage.Batch interface.
// On a call it also keeps track of the key that is deleted.
func (b *batchOpsTracker) Delete(i storage.Item) error {
b.keys.Store(i.ID(), struct{}{})
return b.Batch.Delete(i)
}

// Commit implements storage.Batch interface.
// On a call it also invalidates the cache
// for the keys that were added or deleted.
func (b *batchOpsTracker) Commit() error {
if err := b.Batch.Commit(); err != nil {
return err
}

b.keys.Range(func(key, _ interface{}) bool {
b.invalidate(key.(string))
return true
})
b.keys = new(sync.Map)
return nil
}
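
For readers skimming the diff above: the new behaviour boils down to a write-through LRU sitting in front of the store, plus invalidation of every key a batch touched once that batch commits, so cached entries never outlive a batched write. Below is a minimal, self-contained sketch of that pattern using only the golang-lru/v2 package; the type and method names are illustrative and are not bee's storage API.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// kv stands in for the underlying persistent store.
type kv map[string][]byte

// cachedKV mirrors the Cache wrapper: reads and writes go through an LRU.
type cachedKV struct {
	backing kv
	lru     *lru.Cache[string, []byte]
}

// Put is write-through: the next Get for the same key is served from memory.
func (c *cachedKV) Put(k string, v []byte) {
	c.lru.Add(k, v)
	c.backing[k] = v
}

// Get tries the LRU first and falls back to the backing store on a miss,
// populating the cache on the way out.
func (c *cachedKV) Get(k string) ([]byte, bool) {
	if v, ok := c.lru.Get(k); ok {
		return v, true
	}
	v, ok := c.backing[k]
	if ok {
		c.lru.Add(k, v)
	}
	return v, ok
}

// commitBatch applies a set of writes to the backing store and then drops
// the touched keys from the LRU, mirroring what batchOpsTracker.Commit does.
func (c *cachedKV) commitBatch(writes map[string][]byte) {
	for k, v := range writes {
		c.backing[k] = v
	}
	for k := range writes {
		c.lru.Remove(k)
	}
}

func main() {
	l, _ := lru.New[string, []byte](128)
	c := &cachedKV{backing: kv{}, lru: l}

	c.Put("a", []byte("1"))
	v, _ := c.Get("a")
	fmt.Println(string(v)) // "1", served from the LRU

	c.commitBatch(map[string][]byte{"a": []byte("2")})
	v, _ = c.Get("a")
	fmt.Println(string(v)) // "2", re-read from the backing store after invalidation
}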
2 changes: 1 addition & 1 deletion pkg/storage/cache/cache_test.go
@@ -20,5 +20,5 @@ func TestCache(t *testing.T) {
t.Fatalf("create store failed: %v", err)
}

storagetest.TestStore(t, cache.Wrap(store, 100_000))
storagetest.TestBatchedStore(t, cache.Wrap(store, 100_000))
}
12 changes: 11 additions & 1 deletion pkg/storage/leveldbstore/recovery.go
@@ -8,6 +8,7 @@ import (
"fmt"

"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/cache"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
@@ -51,7 +52,16 @@ func (s *TxStore) Recover() error {
return fmt.Errorf("leveldbstore: recovery: iteration failed: %w", err)
}

if err := s.BatchedStore.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
// TODO: this is a quick and dirty hack to get at the underlying leveldb.DB; get rid of it by leveraging the store's DB() method.
var db *leveldb.DB
switch s := s.BatchedStore.(type) {
case *Store:
db = s.db
case *cache.Cache:
db = s.BatchedStore.(*Store).db
}

if err := db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
return fmt.Errorf("leveldbstore: recovery: unable to write batch: %w", err)
}
return nil
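
The TODO above (duplicated in transaction.go below) points at the obvious follow-up: instead of type-switching on *Store and *cache.Cache, the raw LevelDB handle could be obtained through a small accessor interface. The following is only a sketch of that assumed refactor, not code from this commit; it relies on the DB() method that *Store already exposes and would additionally require the cache wrapper to forward it.

package leveldbstore // sketch only, not part of this commit

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/storage"
	"github.com/syndtr/goleveldb/leveldb"
)

// dbProvider is satisfied by any store (or store wrapper) that can hand out
// the underlying *leveldb.DB.
type dbProvider interface {
	DB() *leveldb.DB
}

// rawDB would replace the type switches in Recover and NewTx with a single
// interface assertion.
func rawDB(s storage.BatchedStore) (*leveldb.DB, error) {
	p, ok := s.(dbProvider)
	if !ok {
		return nil, fmt.Errorf("leveldbstore: %T does not expose the underlying leveldb.DB", s)
	}
	return p.DB(), nil
}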
12 changes: 11 additions & 1 deletion pkg/storage/leveldbstore/transaction.go
@@ -11,6 +11,7 @@ import (
"sync"

"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/cache"
"github.com/ethersphere/bee/pkg/storage/storageutil"
"github.com/google/uuid"
"github.com/syndtr/goleveldb/leveldb"
@@ -318,6 +319,15 @@ func (s *TxStore) NewTx(state *storage.TxState) storage.TxStore {
panic(errors.New("leveldbstore: nil store"))
}

// TODO: this is a quick and dirty hack to get at the underlying leveldb.DB; get rid of it by leveraging the store's DB() method.
var db *leveldb.DB
switch s := s.BatchedStore.(type) {
case *Store:
db = s.db
case *cache.Cache:
db = s.BatchedStore.(*Store).db
}

batch := new(leveldb.Batch)
return &TxStore{
TxStoreBase: &storage.TxStoreBase{
@@ -326,7 +336,7 @@ func (s *TxStore) NewTx(state *storage.TxState) storage.TxStore {
},
revOps: &txRevertOpStore{
id: id(uuid.NewString()),
db: s.BatchedStore.(*Store).db,
db: db,
batch: batch,
revOpsFn: map[storage.TxOpCode]storage.TxRevertFn[[]byte, []byte]{
storage.PutCreateOp: func(k, _ []byte) error {
25 changes: 17 additions & 8 deletions pkg/storer/storer.go
@@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/storage"
scache "github.com/ethersphere/bee/pkg/storage/cache"
"github.com/ethersphere/bee/pkg/storage/leveldbstore"
"github.com/ethersphere/bee/pkg/storage/migration"
"github.com/ethersphere/bee/pkg/storer/internal"
@@ -270,15 +271,15 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
}

func initDiskRepository(ctx context.Context, basePath string, opts *Options) (storage.Repository, io.Closer, error) {
store, err := initStore(basePath, opts)
ldb, err := initStore(basePath, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
}

if opts.LdbStats.Load() != nil {
go func() {
ldbStats := opts.LdbStats.Load()
logger := log.NewLogger(loggerName).Register()
logger := opts.Logger.WithName("leveldb-stats").Register()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

@@ -288,7 +289,7 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
return
case <-ticker.C:
stats := new(leveldb.DBStats)
switch err := store.DB().Stats(stats); {
switch err := ldb.DB().Stats(stats); {
case errors.Is(err, leveldb.ErrClosed):
return
case err != nil:
@@ -328,7 +329,7 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
}
}

recoveryCloser, err := sharkyRecovery(ctx, sharkyBasePath, store, opts)
recoveryCloser, err := sharkyRecovery(ctx, sharkyBasePath, ldb, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
}
@@ -342,6 +343,11 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
return nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
}

var store storage.BatchedStore = ldb
if opts.ItemCacheCapacity > 0 {
store = scache.Wrap(ldb, int(opts.ItemCacheCapacity))
}

txStore := leveldbstore.NewTxStore(store)
if err := txStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover index store: %w", err)
@@ -352,7 +358,7 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
}

return storage.NewRepository(txStore, txChunkStore), closer(store, sharky, recoveryCloser), nil
return storage.NewRepository(txStore, txChunkStore), closer(ldb, sharky, recoveryCloser), nil
}

func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
@@ -419,18 +425,21 @@ type Options struct {
LdbBlockCacheCapacity uint64
LdbWriteBufferSize uint64
LdbDisableSeeksCompaction bool
CacheCapacity uint64
Logger log.Logger

ItemCacheCapacity uint64
CacheCapacity uint64

Address swarm.Address
WarmupDuration time.Duration
Batchstore postage.Storer
ValidStamp postage.ValidStampFn
RadiusSetter topology.SetStorageRadiuser
Batchstore postage.Storer
StateStore storage.StateStorer

ReserveCapacity int
ReserveWakeUpDuration time.Duration

Logger log.Logger
}

func defaultOptions() *Options {
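
Note that the wiring in initDiskRepository above is opt-in: the LevelDB store is only wrapped when ItemCacheCapacity is non-zero, which also keeps clear of cache.Wrap's panic on a capacity of zero or less. A tiny sketch of that guard in isolation (the helper name is made up; the interface and Wrap call mirror the diff):

package storer // sketch only; mirrors the guard added in initDiskRepository

import (
	"github.com/ethersphere/bee/pkg/storage"
	scache "github.com/ethersphere/bee/pkg/storage/cache"
)

// wrapIfCached returns the store unchanged when the configured capacity is
// zero, otherwise layers the in-memory LRU cache on top of it.
func wrapIfCached(s storage.BatchedStore, capacity uint64) storage.BatchedStore {
	if capacity == 0 {
		return s
	}
	return scache.Wrap(s, int(capacity))
}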
