perf: lock optimizations for put speed #4207

Merged · 5 commits · Jul 6, 2023
Changes from 1 commit
4 changes: 2 additions & 2 deletions cmd/bee/cmd/cmd.go
@@ -241,8 +241,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 4*1024*1024, "size of block cache of the database in bytes")
- cmd.Flags().Uint64(optionNameDBWriteBufferSize, 4*1024*1024, "size of the database write buffer in bytes")
- cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
+ cmd.Flags().Uint64(optionNameDBWriteBufferSize, 1*1024*1024, "size of the database write buffer in bytes")
+ cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, true, "disables db compactions triggered by seeks")
cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address")
5 changes: 3 additions & 2 deletions pkg/node/statestore.go
@@ -7,6 +7,7 @@ package node
import (
"errors"
"fmt"
"github.com/syndtr/goleveldb/leveldb/opt"
"path/filepath"

"github.com/ethersphere/bee/pkg/log"
@@ -28,7 +29,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
} else {
dataDir = filepath.Join(dataDir, "statestore")
}
- ldb, err := leveldbstore.New(dataDir, nil)
+ ldb, err := leveldbstore.New(dataDir, &opt.Options{Compression: opt.NoCompression})
if err != nil {
return nil, nil, err
}
@@ -48,7 +49,7 @@ func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.Stat
} else {
dataDir = filepath.Join(dataDir, "stamperstore")
}
- stamperStore, err := leveldbstore.New(dataDir, nil)
+ stamperStore, err := leveldbstore.New(dataDir, &opt.Options{Compression: opt.NoCompression})
if err != nil {
return nil, err
}
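For readers unfamiliar with the option being passed here: this change turns off goleveldb's block compression (Snappy by default) for the statestore and stamperstore. A minimal standalone sketch of the same option in use, assuming nothing beyond the public goleveldb API; the path and keys are illustrative, not from this PR:

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	// Open a LevelDB store with block compression disabled, mirroring the
	// options this PR passes to leveldbstore.New for the state and stamper stores.
	db, err := leveldb.OpenFile("/tmp/example-statestore", &opt.Options{
		Compression: opt.NoCompression,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Simple round trip to show the store behaves as usual without compression.
	if err := db.Put([]byte("swarm/key"), []byte("value"), nil); err != nil {
		log.Fatal(err)
	}
	val, err := db.Get([]byte("swarm/key"), nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read back: %s", val)
}
```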
9 changes: 9 additions & 0 deletions pkg/pullsync/pullsync.go
@@ -26,6 +26,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/multex"
"resenje.org/singleflight"
)

@@ -82,6 +83,7 @@ type Syncer struct {
validStamp postage.ValidStampFn
intervalsSF singleflight.Group
syncInProgress atomic.Int32
+ binLock *multex.Multex

maxPage uint64

@@ -107,6 +109,7 @@ func New(
logger: logger.WithName(loggerName).Register(),
quit: make(chan struct{}),
maxPage: maxPage,
+ binLock: multex.New(),
}
}

@@ -262,6 +265,12 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
s.metrics.Delivered.Add(float64(len(chunksToPut)))
s.metrics.LastReceived.WithLabelValues(fmt.Sprintf("%d", bin)).Add(float64(len(chunksToPut)))

+ // if we have parallel sync workers for the same bin, we need to rate limit them
+ // in order to not overload the storage with unnecessary requests as there is
+ // a chance that the same chunk is being synced by multiple workers.
+ s.binLock.Lock(fmt.Sprint(bin))
+ defer s.binLock.Unlock(fmt.Sprint(bin))

Contributor review comment on these lines (suggesting the lock key be built once, via strconv):

key := strconv.Itoa(int(bin))
s.binLock.Lock(key)
defer s.binLock.Unlock(key)

for _, c := range chunksToPut {
if err := s.store.ReservePutter().Put(ctx, c); err != nil {
// in case of these errors, no new items are added to the storage, so it
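As a rough illustration of the per-bin serialization added above, here is a hedged, standalone sketch using a keyed mutex. It assumes the resenje.org/multex API only as far as this diff uses it (multex.New, Lock(key), Unlock(key)); the worker names and sleep are made up stand-ins:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"resenje.org/multex"
)

func main() {
	binLock := multex.New()
	var wg sync.WaitGroup

	workers := []struct {
		name string
		bin  uint8
	}{
		{"worker-a", 3}, // same bin as worker-b: these two serialize on the shared key
		{"worker-b", 3},
		{"worker-c", 5}, // different bin: proceeds independently
	}

	for _, w := range workers {
		wg.Add(1)
		go func(name string, bin uint8) {
			defer wg.Done()
			key := fmt.Sprint(bin)
			binLock.Lock(key)
			defer binLock.Unlock(key)
			fmt.Printf("%s holds the lock for bin %d\n", name, bin)
			time.Sleep(10 * time.Millisecond) // stand-in for the ReservePutter().Put loop
		}(w.name, w.bin)
	}
	wg.Wait()
}
```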
44 changes: 22 additions & 22 deletions pkg/storer/reserve.go
@@ -220,19 +220,19 @@ func (db *DB) ReservePutter() storage.Putter {
return putterWithMetrics{
storage.PutterFunc(
func(ctx context.Context, chunk swarm.Chunk) (err error) {
- db.lock.Lock(reserveUpdateLockKey)
- defer db.lock.Unlock(reserveUpdateLockKey)

var (
newIndex bool
)
+ db.lock.Lock(reserveUpdateLockKey)
err = db.Execute(ctx, func(tx internal.Storage) error {
newIndex, err = db.reserve.Put(ctx, tx, chunk)
if err != nil {
return fmt.Errorf("reserve: putter.Put: %w", err)
}
return nil
})
+ db.lock.Unlock(reserveUpdateLockKey)
if err != nil {
return err
}
@@ -289,32 +289,35 @@ func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (re

var evicted int

- type evictItems struct {
- address swarm.Address
- batchID []byte
- }
-
- var itemsToEvict []evictItems
+ var itemsToEvict []swarm.Address

err := db.reserve.IterateBatchBin(ctx, db.repo, b, batchID, func(address swarm.Address) (bool, error) {
- itemsToEvict = append(itemsToEvict, evictItems{
- address: address,
- batchID: batchID,
- })
+ itemsToEvict = append(itemsToEvict, address)
return false, nil
})
if err != nil {
return fmt.Errorf("reserve: iterate batch bin: %w", err)
}

- for _, item := range itemsToEvict {
- err = db.removeChunk(ctx, item.address, item.batchID)
- if err != nil {
- retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
- continue
- }
- evicted++
- }
+ func() {
+ // this is a performance optimization where we dont need to lock the
+ // reserve updates if we are expiring batches. This is because the
+ // reserve updates will not be related to these entries as the batch
+ // is no longer valid.
+ if upToBin != swarm.MaxBins {
+ db.lock.Lock(reserveUpdateLockKey)
+ defer db.lock.Unlock(reserveUpdateLockKey)
+ }
+
+ for _, item := range itemsToEvict {
+ err = db.removeChunk(ctx, item, batchID)
+ if err != nil {
+ retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
+ continue
+ }
+ evicted++
+ }
+ }()

// if there was an error, we still need to update the chunks that have already
// been evicted from the reserve
@@ -345,9 +348,6 @@ func (db *DB) removeChunk(ctx context.Context, address swarm.Address, batchID []
}
}

- db.lock.Lock(reserveUpdateLockKey)
- defer db.lock.Unlock(reserveUpdateLockKey)

err = db.reserve.DeleteChunk(ctx, tx, address, batchID)
if err != nil {
return fmt.Errorf("reserve: delete chunk: %w", err)
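To make the eviction change easier to follow, here is a simplified sketch (not the storer API; all names such as evictCollected, batchExpired, and remove are illustrative) of the pattern used above: collect the addresses first, then run the removal loop inside a closure that takes the shared reserve-update lock once, and skips the lock entirely when the whole batch is expiring:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// evictCollected mirrors the shape of the loop above: the shared lock is taken
// once around the whole removal pass, and only when concurrent reserve updates
// could still touch these entries (the upToBin != swarm.MaxBins check in the PR).
func evictCollected(mu *sync.Mutex, addrs []string, batchExpired bool, remove func(string) error) (evicted int, retErr error) {
	func() {
		if !batchExpired {
			mu.Lock()
			defer mu.Unlock()
		}
		for _, a := range addrs {
			if err := remove(a); err != nil {
				retErr = errors.Join(retErr, fmt.Errorf("remove chunk %s: %w", a, err))
				continue
			}
			evicted++
		}
	}()
	return evicted, retErr
}

func main() {
	var mu sync.Mutex
	addrs := []string{"chunk-1", "chunk-2", "chunk-3"}
	n, err := evictCollected(&mu, addrs, true, func(a string) error {
		fmt.Println("removing", a)
		return nil
	})
	fmt.Println("evicted:", n, "err:", err)
}
```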
1 change: 1 addition & 0 deletions pkg/storer/storer.go
@@ -258,6 +258,7 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
BlockCacheCapacity: int(opts.LdbBlockCacheCapacity),
WriteBuffer: int(opts.LdbWriteBufferSize),
DisableSeeksCompaction: opts.LdbDisableSeeksCompaction,
+ Compression: opt.NoCompression,
})
if err != nil {
return nil, fmt.Errorf("failed creating levelDB index store: %w", err)