diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go
index b552eaae6b8..f3436b2a685 100644
--- a/cmd/bee/cmd/cmd.go
+++ b/cmd/bee/cmd/cmd.go
@@ -240,9 +240,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
 	cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
 	cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
 	cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
-	cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 4*1024*1024, "size of block cache of the database in bytes")
-	cmd.Flags().Uint64(optionNameDBWriteBufferSize, 4*1024*1024, "size of the database write buffer in bytes")
-	cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
+	cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 16*1024*1024, "size of block cache of the database in bytes")
+	cmd.Flags().Uint64(optionNameDBWriteBufferSize, 16*1024*1024, "size of the database write buffer in bytes")
+	cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, true, "disables db compactions triggered by seeks")
 	cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
 	cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
 	cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address")
diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go
index de25a8ea59f..eeb6583e8c1 100644
--- a/pkg/node/statestore.go
+++ b/pkg/node/statestore.go
@@ -7,6 +7,7 @@ package node
 import (
 	"errors"
 	"fmt"
+	"github.com/syndtr/goleveldb/leveldb/opt"
 	"path/filepath"
 
 	"github.com/ethersphere/bee/pkg/log"
@@ -28,7 +29,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
 	} else {
 		dataDir = filepath.Join(dataDir, "statestore")
 	}
-	ldb, err := leveldbstore.New(dataDir, nil)
+	ldb, err := leveldbstore.New(dataDir, &opt.Options{Compression: opt.NoCompression})
 	if err != nil {
 		return nil, nil, err
 	}
@@ -48,7 +49,7 @@ func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.Stat
 	} else {
 		dataDir = filepath.Join(dataDir, "stamperstore")
 	}
-	stamperStore, err := leveldbstore.New(dataDir, nil)
+	stamperStore, err := leveldbstore.New(dataDir, &opt.Options{Compression: opt.NoCompression})
 	if err != nil {
 		return nil, err
 	}
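The flag changes above quadruple the LevelDB block cache and write buffer (4 MiB to 16 MiB) and disable seek-triggered compactions by default, while the statestore and stamperstore now open LevelDB with compression disabled. A minimal sketch of how these goleveldb options compose, assuming a standalone open; the path and exact values are illustrative, not the node's actual wiring:

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	// Options mirroring the new defaults: larger block cache and write
	// buffer, seek compaction off, and no snappy compression.
	o := &opt.Options{
		BlockCacheCapacity:     16 * opt.MiB,
		WriteBuffer:            16 * opt.MiB,
		OpenFilesCacheCapacity: 200,
		DisableSeeksCompaction: true,
		Compression:            opt.NoCompression,
	}

	db, err := leveldb.OpenFile("/tmp/example-ldb", o) // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```

Disabling compression is a plausible win for these stores because Swarm chunk and stamp payloads are largely high-entropy, so snappy spends CPU for little space saving.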
diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index c6fce400352..b933f47e734 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"strconv"
 	"sync/atomic"
 	"time"
 
@@ -26,6 +27,7 @@ import (
 	"github.com/ethersphere/bee/pkg/storage"
 	storer "github.com/ethersphere/bee/pkg/storer"
 	"github.com/ethersphere/bee/pkg/swarm"
+	"resenje.org/multex"
 	"resenje.org/singleflight"
 )
 
@@ -82,6 +84,7 @@ type Syncer struct {
 	validStamp     postage.ValidStampFn
 	intervalsSF    singleflight.Group
 	syncInProgress atomic.Int32
+	binLock        *multex.Multex
 
 	maxPage uint64
 
@@ -107,6 +110,7 @@ func New(
 		logger:  logger.WithName(loggerName).Register(),
 		quit:    make(chan struct{}),
 		maxPage: maxPage,
+		binLock: multex.New(),
 	}
 }
 
@@ -262,6 +266,13 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
 	s.metrics.Delivered.Add(float64(len(chunksToPut)))
 	s.metrics.LastReceived.WithLabelValues(fmt.Sprintf("%d", bin)).Add(float64(len(chunksToPut)))
 
+	// if we have parallel sync workers for the same bin, we need to serialize them
+	// in order to not overload the storage with unnecessary requests, as there is
+	// a chance that the same chunk is being synced by multiple workers.
+	key := strconv.Itoa(int(bin))
+	s.binLock.Lock(key)
+	defer s.binLock.Unlock(key)
+
 	for _, c := range chunksToPut {
 		if err := s.store.ReservePutter().Put(ctx, c); err != nil {
 			// in case of these errors, no new items are added to the storage, so it
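The new binLock serializes sync workers operating on the same bin: each worker locks a key derived from the bin number before writing, so concurrent workers cannot hit the reserve with duplicate puts for the same chunks. A rough illustration of the per-key locking pattern that resenje.org/multex provides (this sketch shows the idea only and is not the library's implementation; it also never frees idle mutexes):

```go
package main

import "sync"

// keyedLock hands out one mutex per string key, mimicking the
// Lock(key)/Unlock(key) surface used with multex.Multex above.
type keyedLock struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedLock() *keyedLock {
	return &keyedLock{locks: make(map[string]*sync.Mutex)}
}

func (k *keyedLock) Lock(key string) {
	k.mu.Lock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m
	}
	k.mu.Unlock()
	m.Lock()
}

func (k *keyedLock) Unlock(key string) {
	k.mu.Lock()
	m := k.locks[key]
	k.mu.Unlock()
	m.Unlock()
}

func main() {
	l := newKeyedLock()
	l.Lock("4") // bin 4: a second worker on bin 4 blocks here
	defer l.Unlock("4")
	// ... store the bin's chunks ...
}
```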
diff --git a/pkg/statestore/storeadapter/migration.go b/pkg/statestore/storeadapter/migration.go
index 3674b9a4c8e..20c2cb57470 100644
--- a/pkg/statestore/storeadapter/migration.go
+++ b/pkg/statestore/storeadapter/migration.go
@@ -14,8 +14,6 @@ import (
 
 func allSteps() migration.Steps {
 	return map[uint64]migration.StepFn{
 		1: epochMigration,
-		2: clearBlocklist,
-		3: clearBlocklist,
 	}
 }
@@ -52,10 +50,3 @@ func epochMigration(s storage.Store) error {
 		return false, nil
 	})
 }
-
-func clearBlocklist(s storage.Store) error {
-	st := &StateStorerAdapter{storage: s}
-	return st.Iterate("blocklist-", func(key, _ []byte) (stop bool, err error) {
-		return false, st.Delete(string(key))
-	})
-}
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index 7774700707b..b6bfa3cca77 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -247,6 +247,7 @@ type ChunkItem struct {
 	ChunkAddress swarm.Address
 	BatchID      []byte
 	Type         swarm.ChunkType
+	BinID        uint64
 }
 
 func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb func(ChunkItem) (bool, error)) error {
@@ -261,6 +262,7 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb
 			ChunkAddress: item.Address,
 			BatchID:      item.BatchID,
 			Type:         item.ChunkType,
+			BinID:        item.BinID,
 		}
 
 		stop, err := cb(chItem)
@@ -305,14 +307,25 @@ func (r *Reserve) DeleteChunk(
 	}
 	err := store.IndexStore().Get(item)
 	if err != nil {
-		if errors.Is(err, storage.ErrNotFound) {
-			return nil
-		}
 		return err
 	}
 	return removeChunk(ctx, store, item)
 }
 
+// CleanupBinIndex removes the bin index entry for the chunk. This is called mainly
+// to clean up the bin index if other indexes are missing during reserve cleanup.
+func (r *Reserve) CleanupBinIndex(
+	ctx context.Context,
+	store internal.Storage,
+	chunkAddress swarm.Address,
+	binID uint64,
+) {
+	_ = store.IndexStore().Delete(&chunkBinItem{
+		Bin:   swarm.Proximity(r.baseAddr.Bytes(), chunkAddress.Bytes()),
+		BinID: binID,
+	})
+}
+
 func removeChunk(ctx context.Context, store internal.Storage, item *batchRadiusItem) error {
 	indexStore := store.IndexStore()
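CleanupBinIndex above rebuilds the chunkBinItem key from the chunk address alone: the bin is the proximity order between the node's base address and the chunk address, and the BinID now surfaced on ChunkItem completes the key. A small self-contained sketch of that proximity calculation, assuming it counts leading shared bits capped at a maximum proximity order (mirroring the semantics of swarm.Proximity; maxPO here is an assumed cap):

```go
package main

import "fmt"

const maxPO = 31 // assumed cap, mirroring swarm.MaxPO

// proximity returns the number of leading bits two addresses share,
// i.e. the reserve bin a chunk falls into relative to the base address.
func proximity(one, other []byte) uint8 {
	b := maxPO/8 + 1
	if len(one) < b {
		b = len(one)
	}
	if len(other) < b {
		b = len(other)
	}
	for i := 0; i < b; i++ {
		oxo := one[i] ^ other[i]
		for j := 0; j < 8; j++ {
			if (oxo>>uint(7-j))&0x01 != 0 {
				return uint8(i*8 + j)
			}
		}
	}
	return maxPO
}

func main() {
	base := []byte{0b1010_0000}
	addr := []byte{0b1011_0000}
	fmt.Println(proximity(base, addr)) // 3: addresses diverge at bit 3
}
```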
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index 9f3ee490074..d940230926b 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -29,6 +29,10 @@ const (
 	cleanupDur = time.Hour * 6
 )
 
+func reserveUpdateBatchLockKey(batchID []byte) string {
+	return fmt.Sprintf("%s%s", reserveUpdateLockKey, hex.EncodeToString(batchID))
+}
+
 var errMaxRadius = errors.New("max radius reached")
 
 type Syncer interface {
@@ -138,6 +142,13 @@ func (db *DB) evictionWorker(ctx context.Context) {
 		}
 	}
 
+	time.AfterFunc(30*time.Minute, func() {
+		db.logger.Info("initial reserve cleanup started")
+		if err := db.reserveCleanup(ctx); err != nil {
+			db.logger.Error(err, "cleanup")
+		}
+	})
+
 	cleanUpTicker := time.NewTicker(cleanupDur)
 	defer cleanUpTicker.Stop()
 
@@ -220,12 +231,11 @@ func (db *DB) ReservePutter() storage.Putter {
 	return putterWithMetrics{
 		storage.PutterFunc(
 			func(ctx context.Context, chunk swarm.Chunk) (err error) {
-				db.lock.Lock(reserveUpdateLockKey)
-				defer db.lock.Unlock(reserveUpdateLockKey)
 
 				var (
 					newIndex bool
 				)
+				db.lock.Lock(reserveUpdateBatchLockKey(chunk.Stamp().BatchID()))
 				err = db.Execute(ctx, func(tx internal.Storage) error {
 					newIndex, err = db.reserve.Put(ctx, tx, chunk)
 					if err != nil {
@@ -233,6 +243,7 @@ func (db *DB) ReservePutter() storage.Putter {
 					}
 					return nil
 				})
+				db.lock.Unlock(reserveUpdateBatchLockKey(chunk.Stamp().BatchID()))
 				if err != nil {
 					return err
 				}
@@ -289,32 +300,55 @@ func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (re
 
 	var evicted int
 
-	type evictItems struct {
-		address swarm.Address
-		batchID []byte
-	}
-
-	var itemsToEvict []evictItems
+	var itemsToEvict []swarm.Address
 
 	err := db.reserve.IterateBatchBin(ctx, db.repo, b, batchID, func(address swarm.Address) (bool, error) {
-		itemsToEvict = append(itemsToEvict, evictItems{
-			address: address,
-			batchID: batchID,
-		})
+		itemsToEvict = append(itemsToEvict, address)
 		return false, nil
 	})
 	if err != nil {
 		return fmt.Errorf("reserve: iterate batch bin: %w", err)
 	}
 
-	for _, item := range itemsToEvict {
-		err = db.removeChunk(ctx, item.address, item.batchID)
-		if err != nil {
-			retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
-			continue
+	func() {
+		// do not lock the reserve updates if we are expiring batches. This is because the
+		// reserve updates will not be related to these entries as the batch
+		// is no longer valid.
+		if upToBin != swarm.MaxBins {
+			db.lock.Lock(reserveUpdateBatchLockKey(batchID))
+			defer db.lock.Unlock(reserveUpdateBatchLockKey(batchID))
 		}
-		evicted++
-	}
+
+		batchCnt := 1000
+
+		for i := 0; i < len(itemsToEvict); i += batchCnt {
+			end := i + batchCnt
+			if end > len(itemsToEvict) {
+				end = len(itemsToEvict)
+			}
+
+			retErr = db.Execute(ctx, func(tx internal.Storage) error {
+				var tErr error
+				for _, item := range itemsToEvict[i:end] {
+					chunk, err := db.ChunkStore().Get(ctx, item)
+					if err == nil {
+						err := db.Cache().Put(ctx, chunk)
+						if err != nil {
+							db.logger.Warning("reserve eviction cache put", "err", err)
+						}
+					}
+
+					err = db.reserve.DeleteChunk(ctx, tx, item, batchID)
+					if err != nil {
+						tErr = errors.Join(tErr, fmt.Errorf("reserve: delete chunk: %w", err))
+						continue
+					}
+					evicted++
+				}
+				return tErr
+			})
+		}
+	}()
 
 	// if there was an error, we still need to update the chunks that have already
 	// been evicted from the reserve
@@ -335,27 +369,6 @@ func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (re
 	return nil
 }
 
-func (db *DB) removeChunk(ctx context.Context, address swarm.Address, batchID []byte) error {
-	return db.Execute(ctx, func(tx internal.Storage) error {
-		chunk, err := db.ChunkStore().Get(ctx, address)
-		if err == nil {
-			err := db.Cache().Put(ctx, chunk)
-			if err != nil {
-				db.logger.Warning("reserve eviction cache put", "err", err)
-			}
-		}
-
-		db.lock.Lock(reserveUpdateLockKey)
-		defer db.lock.Unlock(reserveUpdateLockKey)
-
-		err = db.reserve.DeleteChunk(ctx, tx, address, batchID)
-		if err != nil {
-			return fmt.Errorf("reserve: delete chunk: %w", err)
-		}
-		return nil
-	})
-}
-
 func (db *DB) reserveCleanup(ctx context.Context) error {
 	dur := captureDuration(time.Now())
 	removed := 0
@@ -382,17 +395,42 @@ func (db *DB) reserveCleanup(ctx context.Context) error {
 	expiredBatches := make(map[string]struct{})
 	var retErr error
 
-	for _, item := range itemsToEvict {
-		err = db.removeChunk(ctx, item.ChunkAddress, item.BatchID)
-		if err != nil {
-			retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
-			continue
-		}
-		removed++
-		if _, ok := expiredBatches[string(item.BatchID)]; !ok {
-			expiredBatches[string(item.BatchID)] = struct{}{}
-			db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(item.BatchID))
+	batchCnt := 1000
+
+	for i := 0; i < len(itemsToEvict); i += batchCnt {
+		end := i + batchCnt
+		if end > len(itemsToEvict) {
+			end = len(itemsToEvict)
 		}
+
+		retErr = db.Execute(ctx, func(tx internal.Storage) error {
+			var tErr error
+			for _, item := range itemsToEvict[i:end] {
+				chunk, err := db.ChunkStore().Get(ctx, item.ChunkAddress)
+				if err == nil {
+					err := db.Cache().Put(ctx, chunk)
+					if err != nil {
+						db.logger.Warning("reserve eviction cache put", "err", err)
+					}
+				}
+
+				err = db.reserve.DeleteChunk(ctx, tx, item.ChunkAddress, item.BatchID)
+				if err != nil {
+					if errors.Is(err, storage.ErrNotFound) {
+						db.reserve.CleanupBinIndex(ctx, tx, item.ChunkAddress, item.BinID)
+						continue
+					}
+					tErr = errors.Join(tErr, fmt.Errorf("reserve: delete chunk: %w", err))
+					continue
+				}
+				removed++
+				if _, ok := expiredBatches[string(item.BatchID)]; !ok {
+					expiredBatches[string(item.BatchID)] = struct{}{}
+					db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(item.BatchID))
+				}
+			}
+			return tErr
+		})
 	}
 
 	return retErr
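Both evictBatch and reserveCleanup now commit deletions in fixed-size transactions of batchCnt = 1000 items rather than calling Execute once per chunk, which bounds the size of any single transaction while amortizing commit overhead. The slice-windowing pattern in isolation (a generic sketch; the item type and batch size are placeholders):

```go
package main

import "fmt"

// processInBatches applies fn to items in fixed-size windows, the same
// i/end slicing used by evictBatch and reserveCleanup above.
func processInBatches(items []string, batchCnt int, fn func([]string) error) error {
	for i := 0; i < len(items); i += batchCnt {
		end := i + batchCnt
		if end > len(items) {
			end = len(items)
		}
		if err := fn(items[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = processInBatches([]string{"a", "b", "c", "d", "e"}, 2, func(batch []string) error {
		fmt.Println(batch) // [a b], then [c d], then [e]
		return nil
	})
}
```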
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index dd8dd6b329b..9012f39b24a 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -258,6 +258,7 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
 		BlockCacheCapacity:     int(opts.LdbBlockCacheCapacity),
 		WriteBuffer:            int(opts.LdbWriteBufferSize),
 		DisableSeeksCompaction: opts.LdbDisableSeeksCompaction,
+		Compression:            opt.NoCompression,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed creating levelDB index store: %w", err)
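Finally, note how the locking granularity changed across this diff: reserve writes moved from the single global reserveUpdateLockKey to one key per postage batch, so puts and evictions touching different batches proceed in parallel. A sketch of the key derivation (the prefix string is a stand-in, since the value of the reserveUpdateLockKey constant is not shown in this diff):

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// batchLockKey derives a distinct lock key per postage batch, so only
// writers of the same batch serialize against each other.
func batchLockKey(batchID []byte) string {
	const prefix = "reserveUpdate" // stand-in for the real constant's value
	return prefix + hex.EncodeToString(batchID)
}

func main() {
	fmt.Println(batchLockKey([]byte{0xde, 0xad, 0xbe, 0xef}))
	// Output: reserveUpdatedeadbeef
}
```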