Skip to content

Commit

Permalink
perf: lock optimizations for put speed (#4207)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Jul 6, 2023
1 parent e01947f commit 6fb07f3
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 67 deletions.
6 changes: 3 additions & 3 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 4*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 4*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 16*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 16*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, true, "disables db compactions triggered by seeks")
cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address")
Expand Down
5 changes: 3 additions & 2 deletions pkg/node/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package node
import (
"errors"
"fmt"
"github.com/syndtr/goleveldb/leveldb/opt"
"path/filepath"

"github.com/ethersphere/bee/pkg/log"
Expand All @@ -28,7 +29,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
} else {
dataDir = filepath.Join(dataDir, "statestore")
}
ldb, err := leveldbstore.New(dataDir, nil)
ldb, err := leveldbstore.New(dataDir, &opt.Options{Compression: opt.NoCompression})
if err != nil {
return nil, nil, err
}
Expand All @@ -48,7 +49,7 @@ func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.Stat
} else {
dataDir = filepath.Join(dataDir, "stamperstore")
}
stamperStore, err := leveldbstore.New(dataDir, nil)
stamperStore, err := leveldbstore.New(dataDir, &opt.Options{Compression: opt.NoCompression})
if err != nil {
return nil, err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"io"
"math"
"strconv"
"sync/atomic"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/multex"
"resenje.org/singleflight"
)

Expand Down Expand Up @@ -82,6 +84,7 @@ type Syncer struct {
validStamp postage.ValidStampFn
intervalsSF singleflight.Group
syncInProgress atomic.Int32
binLock *multex.Multex

maxPage uint64

Expand All @@ -107,6 +110,7 @@ func New(
logger: logger.WithName(loggerName).Register(),
quit: make(chan struct{}),
maxPage: maxPage,
binLock: multex.New(),
}
}

Expand Down Expand Up @@ -262,6 +266,13 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
s.metrics.Delivered.Add(float64(len(chunksToPut)))
s.metrics.LastReceived.WithLabelValues(fmt.Sprintf("%d", bin)).Add(float64(len(chunksToPut)))

// if we have parallel sync workers for the same bin, we need to rate limit them
// in order to not overload the storage with unnecessary requests as there is
// a chance that the same chunk is being synced by multiple workers.
key := strconv.Itoa(int(bin))
s.binLock.Lock(key)
defer s.binLock.Unlock(key)

for _, c := range chunksToPut {
if err := s.store.ReservePutter().Put(ctx, c); err != nil {
// in case of these errors, no new items are added to the storage, so it
Expand Down
9 changes: 0 additions & 9 deletions pkg/statestore/storeadapter/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
// allSteps builds the table of state store migration steps, keyed by the
// uint64 number under which each step is registered.
func allSteps() migration.Steps {
	steps := map[uint64]migration.StepFn{
		1: epochMigration,
		2: clearBlocklist,
		3: clearBlocklist,
	}
	return steps
}

Expand Down Expand Up @@ -52,10 +50,3 @@ func epochMigration(s storage.Store) error {
return false, nil
})
}

// clearBlocklist wraps s in a StateStorerAdapter and deletes every entry
// visited while iterating the "blocklist-" prefix. The error returned by
// Iterate is passed back to the caller unchanged.
func clearBlocklist(s storage.Store) error {
	adapter := &StateStorerAdapter{storage: s}

	// remove deletes the visited key. It never asks the iteration to stop on
	// its own; a failed delete is reported through the returned error.
	remove := func(key, _ []byte) (bool, error) {
		return false, adapter.Delete(string(key))
	}

	return adapter.Iterate("blocklist-", remove)
}
19 changes: 16 additions & 3 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ type ChunkItem struct {
ChunkAddress swarm.Address
BatchID []byte
Type swarm.ChunkType
BinID uint64
}

func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb func(ChunkItem) (bool, error)) error {
Expand All @@ -261,6 +262,7 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb
ChunkAddress: item.Address,
BatchID: item.BatchID,
Type: item.ChunkType,
BinID: item.BinID,
}

stop, err := cb(chItem)
Expand Down Expand Up @@ -305,14 +307,25 @@ func (r *Reserve) DeleteChunk(
}
err := store.IndexStore().Get(item)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil
}
return err
}
return removeChunk(ctx, store, item)
}

// CleanupBinIndex deletes the bin index entry addressed by binID in the bin
// given by the proximity of chunkAddress to the reserve's base address. It is
// mainly used during reserve cleanup, when the other indexes for a chunk are
// already missing and only the bin index entry is left behind.
//
// The removal is best-effort: the error from the index store is intentionally
// discarded and nothing is reported to the caller.
func (r *Reserve) CleanupBinIndex(
	ctx context.Context,
	store internal.Storage,
	chunkAddress swarm.Address,
	binID uint64,
) {
	indexStore := store.IndexStore()

	entry := &chunkBinItem{
		Bin:   swarm.Proximity(r.baseAddr.Bytes(), chunkAddress.Bytes()),
		BinID: binID,
	}

	_ = indexStore.Delete(entry)
}

func removeChunk(ctx context.Context, store internal.Storage, item *batchRadiusItem) error {

indexStore := store.IndexStore()
Expand Down
138 changes: 88 additions & 50 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const (
cleanupDur = time.Hour * 6
)

func reserveUpdateBatchLockKey(batchID []byte) string {
return fmt.Sprintf("%s%s", reserveUpdateLockKey, hex.EncodeToString(batchID))
}

var errMaxRadius = errors.New("max radius reached")

type Syncer interface {
Expand Down Expand Up @@ -138,6 +142,13 @@ func (db *DB) evictionWorker(ctx context.Context) {
}
}

time.AfterFunc(30*time.Minute, func() {
db.logger.Info("initial reserve cleanup started")
if err := db.reserveCleanup(ctx); err != nil {
db.logger.Error(err, "cleanup")
}
})

cleanUpTicker := time.NewTicker(cleanupDur)
defer cleanUpTicker.Stop()

Expand Down Expand Up @@ -220,19 +231,19 @@ func (db *DB) ReservePutter() storage.Putter {
return putterWithMetrics{
storage.PutterFunc(
func(ctx context.Context, chunk swarm.Chunk) (err error) {
db.lock.Lock(reserveUpdateLockKey)
defer db.lock.Unlock(reserveUpdateLockKey)

var (
newIndex bool
)
db.lock.Lock(reserveUpdateBatchLockKey(chunk.Stamp().BatchID()))
err = db.Execute(ctx, func(tx internal.Storage) error {
newIndex, err = db.reserve.Put(ctx, tx, chunk)
if err != nil {
return fmt.Errorf("reserve: putter.Put: %w", err)
}
return nil
})
db.lock.Unlock(reserveUpdateBatchLockKey(chunk.Stamp().BatchID()))
if err != nil {
return err
}
Expand Down Expand Up @@ -289,32 +300,55 @@ func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (re

var evicted int

type evictItems struct {
address swarm.Address
batchID []byte
}

var itemsToEvict []evictItems
var itemsToEvict []swarm.Address

err := db.reserve.IterateBatchBin(ctx, db.repo, b, batchID, func(address swarm.Address) (bool, error) {
itemsToEvict = append(itemsToEvict, evictItems{
address: address,
batchID: batchID,
})
itemsToEvict = append(itemsToEvict, address)
return false, nil
})
if err != nil {
return fmt.Errorf("reserve: iterate batch bin: %w", err)
}

for _, item := range itemsToEvict {
err = db.removeChunk(ctx, item.address, item.batchID)
if err != nil {
retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
continue
func() {
// this is a performance optimization where we do not need to lock the
// reserve updates if we are expiring batches. This is because the
// reserve updates will not be related to these entries as the batch
// is no longer valid.
if upToBin != swarm.MaxBins {
db.lock.Lock(reserveUpdateBatchLockKey(batchID))
defer db.lock.Unlock(reserveUpdateBatchLockKey(batchID))
}
evicted++
}

batchCnt := 1000

for i := 0; i < len(itemsToEvict); i += batchCnt {
end := i + batchCnt
if end > len(itemsToEvict) {
end = len(itemsToEvict)
}

retErr = db.Execute(ctx, func(tx internal.Storage) error {
var tErr error
for _, item := range itemsToEvict[i:end] {
chunk, err := db.ChunkStore().Get(ctx, item)
if err == nil {
err := db.Cache().Put(ctx, chunk)
if err != nil {
db.logger.Warning("reserve eviction cache put", "err", err)
}
}

err = db.reserve.DeleteChunk(ctx, tx, item, batchID)
if err != nil {
tErr = errors.Join(tErr, fmt.Errorf("reserve: delete chunk: %w", err))
continue
}
evicted++
}
return tErr
})
}
}()

// if there was an error, we still need to update the chunks that have already
// been evicted from the reserve
Expand All @@ -335,27 +369,6 @@ func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (re
return nil
}

// removeChunk deletes the reserve entry for the chunk at address under
// batchID inside a single db.Execute transaction.
//
// Before the delete, the chunk is looked up in the chunk store and, when
// found, copied into the cache. That copy is best-effort: a failed lookup is
// ignored and a failed cache put is only logged as a warning. The reserve
// delete itself runs while holding reserveUpdateLockKey, and its error is
// returned wrapped with "reserve: delete chunk".
func (db *DB) removeChunk(ctx context.Context, address swarm.Address, batchID []byte) error {
	evict := func(tx internal.Storage) error {
		if chunk, getErr := db.ChunkStore().Get(ctx, address); getErr == nil {
			if putErr := db.Cache().Put(ctx, chunk); putErr != nil {
				db.logger.Warning("reserve eviction cache put", "err", putErr)
			}
		}

		// The lock is taken only after the cache copy, and held until the
		// transaction callback returns.
		db.lock.Lock(reserveUpdateLockKey)
		defer db.lock.Unlock(reserveUpdateLockKey)

		if delErr := db.reserve.DeleteChunk(ctx, tx, address, batchID); delErr != nil {
			return fmt.Errorf("reserve: delete chunk: %w", delErr)
		}
		return nil
	}

	return db.Execute(ctx, evict)
}

func (db *DB) reserveCleanup(ctx context.Context) error {
dur := captureDuration(time.Now())
removed := 0
Expand All @@ -382,17 +395,42 @@ func (db *DB) reserveCleanup(ctx context.Context) error {
expiredBatches := make(map[string]struct{})
var retErr error

for _, item := range itemsToEvict {
err = db.removeChunk(ctx, item.ChunkAddress, item.BatchID)
if err != nil {
retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
continue
}
removed++
if _, ok := expiredBatches[string(item.BatchID)]; !ok {
expiredBatches[string(item.BatchID)] = struct{}{}
db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(item.BatchID))
batchCnt := 1000

for i := 0; i < len(itemsToEvict); i += batchCnt {
end := i + batchCnt
if end > len(itemsToEvict) {
end = len(itemsToEvict)
}

retErr = db.Execute(ctx, func(tx internal.Storage) error {
var tErr error
for _, item := range itemsToEvict[i:end] {
chunk, err := db.ChunkStore().Get(ctx, item.ChunkAddress)
if err == nil {
err := db.Cache().Put(ctx, chunk)
if err != nil {
db.logger.Warning("reserve eviction cache put", "err", err)
}
}

err = db.reserve.DeleteChunk(ctx, tx, item.ChunkAddress, item.BatchID)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
db.reserve.CleanupBinIndex(ctx, tx, item.ChunkAddress, item.BinID)
continue
}
tErr = errors.Join(tErr, fmt.Errorf("reserve: delete chunk: %w", err))
continue
}
removed++
if _, ok := expiredBatches[string(item.BatchID)]; !ok {
expiredBatches[string(item.BatchID)] = struct{}{}
db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(item.BatchID))
}
}
return tErr
})
}

return retErr
Expand Down
1 change: 1 addition & 0 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
BlockCacheCapacity: int(opts.LdbBlockCacheCapacity),
WriteBuffer: int(opts.LdbWriteBufferSize),
DisableSeeksCompaction: opts.LdbDisableSeeksCompaction,
Compression: opt.NoCompression,
})
if err != nil {
return nil, fmt.Errorf("failed creating levelDB index store: %w", err)
Expand Down

0 comments on commit 6fb07f3

Please sign in to comment.