Skip to content

Commit

Permalink
fix: clean up runs when reserve is constructed and reserve size recou…
Browse files Browse the repository at this point in the history
…nt is removed (#4201)
  • Loading branch information
istae authored Jul 2, 2023
1 parent 10f55d3 commit 2ace005
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 32 deletions.
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ func NewBee(
lo.ReserveCapacity = ReserveCapacity
lo.ReserveWakeUpDuration = reserveWakeUpDuration
lo.RadiusSetter = kad
lo.ReserveInitialCleanup = true
}

localStore, err := storer.New(ctx, path, lo)
Expand Down
15 changes: 2 additions & 13 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func New(
}
rs.radius.Store(uint32(rItem.Radius))

err = rs.RecountSize(store)
size, err := store.Count(&batchRadiusItem{})
if err != nil {
return nil, err
}
rs.size.Store(int64(size))

return rs, nil
}
Expand Down Expand Up @@ -368,18 +369,6 @@ func (r *Reserve) Size() int {
return int(r.size.Load())
}

// RecountSize recounts the entire reserve from the Store.
// Not thread-safe so it must NOT be called along all other calls to update the size.
func (r *Reserve) RecountSize(store storage.Store) error {
size, err := store.Count(&batchRadiusItem{})
if err != nil {
return err
}
r.size.Store(int64(size))

return nil
}

func (r *Reserve) Capacity() int {
return r.capacity
}
Expand Down
22 changes: 3 additions & 19 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func (db *DB) reserveWorker(ctx context.Context, warmupDur, wakeUpDur time.Durat
cancel()
}()

if err := db.reserveCleanup(ctx); err != nil {
db.logger.Error(err, "cleanup")
}

overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
defer overCapUnsub()

Expand Down Expand Up @@ -283,23 +279,11 @@ func (db *DB) reserveCleanup(ctx context.Context) error {
dur := captureDuration(time.Now())
removed := 0
defer func() {
db.reserve.AddSize(-removed)
db.logger.Info("cleanup finished", "removed", removed, "duration", dur())
db.metrics.MethodCallsDuration.WithLabelValues("reserve", "cleanup").Observe(dur())
db.metrics.ReserveCleanup.Add(float64(removed))
db.logger.Info("cleanup finished", "removed", removed, "duration", dur())

if removed == 0 {
return
}

db.lock.Lock(reserveUpdateLockKey)
defer db.lock.Unlock(reserveUpdateLockKey)

if err := db.reserve.RecountSize(db.repo.IndexStore()); err != nil {
db.logger.Error(err, "recount reserve size")
}

db.metrics.ReserveSize.Set(float64(db.ReserveSize()))

db.metrics.ReserveSize.Set(float64(db.reserve.Size()))
}()

ids := map[string]struct{}{}
Expand Down
9 changes: 9 additions & 0 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ type Options struct {
RadiusSetter topology.SetStorageRadiuser
StateStore storage.StateStorer

ReserveInitialCleanup bool
ReserveCapacity int
ReserveWakeUpDuration time.Duration
}
Expand Down Expand Up @@ -506,6 +507,14 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
db.reserve = rs

if opts.ReserveInitialCleanup {
err = db.reserveCleanup(ctx)
if err != nil {
return nil, err
}
}

db.metrics.StorageRadius.Set(float64(rs.Radius()))
db.metrics.ReserveSize.Set(float64(rs.Size()))
}
Expand Down

0 comments on commit 2ace005

Please sign in to comment.