diff --git a/pkg/node/node.go b/pkg/node/node.go index 9c603bbc3c4..be70fc1037c 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -777,6 +777,7 @@ func NewBee( lo.ReserveCapacity = ReserveCapacity lo.ReserveWakeUpDuration = reserveWakeUpDuration lo.RadiusSetter = kad + lo.ReserveInitialCleanup = true } localStore, err := storer.New(ctx, path, lo) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 7ae0c2b692b..a793678d8c9 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -63,10 +63,11 @@ func New( } rs.radius.Store(uint32(rItem.Radius)) - err = rs.RecountSize(store) + size, err := store.Count(&batchRadiusItem{}) if err != nil { return nil, err } + rs.size.Store(int64(size)) return rs, nil } @@ -368,18 +369,6 @@ func (r *Reserve) Size() int { return int(r.size.Load()) } -// RecountSize recounts the entire reserve from the Store. -// Not thread-safe so it must NOT be called along all other calls to update the size. -func (r *Reserve) RecountSize(store storage.Store) error { - size, err := store.Count(&batchRadiusItem{}) - if err != nil { - return err - } - r.size.Store(int64(size)) - - return nil -} - func (r *Reserve) Capacity() int { return r.capacity } diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 4221a935916..da130e814ea 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -46,10 +46,6 @@ func (db *DB) reserveWorker(ctx context.Context, warmupDur, wakeUpDur time.Durat cancel() }() - if err := db.reserveCleanup(ctx); err != nil { - db.logger.Error(err, "cleanup") - } - overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity) defer overCapUnsub() @@ -283,23 +279,11 @@ func (db *DB) reserveCleanup(ctx context.Context) error { dur := captureDuration(time.Now()) removed := 0 defer func() { + db.reserve.AddSize(-removed) + db.logger.Info("cleanup finished", "removed", removed, "duration", dur()) db.metrics.MethodCallsDuration.WithLabelValues("reserve", "cleanup").Observe(dur()) db.metrics.ReserveCleanup.Add(float64(removed)) - db.logger.Info("cleanup finished", "removed", removed, "duration", dur()) - - if removed == 0 { - return - } - - db.lock.Lock(reserveUpdateLockKey) - defer db.lock.Unlock(reserveUpdateLockKey) - - if err := db.reserve.RecountSize(db.repo.IndexStore()); err != nil { - db.logger.Error(err, "recount reserve size") - } - - db.metrics.ReserveSize.Set(float64(db.ReserveSize())) - + db.metrics.ReserveSize.Set(float64(db.reserve.Size())) }() ids := map[string]struct{}{} diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index a7316c5ef86..598f5a7698d 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -382,6 +382,7 @@ type Options struct { RadiusSetter topology.SetStorageRadiuser StateStore storage.StateStorer + ReserveInitialCleanup bool ReserveCapacity int ReserveWakeUpDuration time.Duration } @@ -506,6 +507,14 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { return nil, err } db.reserve = rs + + if opts.ReserveInitialCleanup { + err = db.reserveCleanup(ctx) + if err != nil { + return nil, err + } + } + db.metrics.StorageRadius.Set(float64(rs.Radius())) db.metrics.ReserveSize.Set(float64(rs.Size())) }