diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go
index 757d755e8d1..b552eaae6b8 100644
--- a/cmd/bee/cmd/cmd.go
+++ b/cmd/bee/cmd/cmd.go
@@ -242,7 +242,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
 	cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
 	cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 4*1024*1024, "size of block cache of the database in bytes")
 	cmd.Flags().Uint64(optionNameDBWriteBufferSize, 4*1024*1024, "size of the database write buffer in bytes")
-	cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, true, "disables db compactions triggered by seeks")
+	cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
 	cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
 	cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
 	cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address")
diff --git a/pkg/postage/batchstore/mock/store.go b/pkg/postage/batchstore/mock/store.go
index 0ea7d3aed86..f3c10bf236d 100644
--- a/pkg/postage/batchstore/mock/store.go
+++ b/pkg/postage/batchstore/mock/store.go
@@ -135,6 +135,8 @@ func (bs *BatchStore) Get(id []byte) (*postage.Batch, error) {
 
 // Iterate mocks the Iterate method from the BatchStore
 func (bs *BatchStore) Iterate(f func(*postage.Batch) (bool, error)) error {
+	bs.mtx.Lock()
+	defer bs.mtx.Unlock()
 	if bs.batch == nil {
 		return nil
 	}
@@ -144,6 +146,8 @@ func (bs *BatchStore) Iterate(f func(*postage.Batch) (bool, error)) error {
 
 // Save mocks the Save method from the BatchStore.
 func (bs *BatchStore) Save(batch *postage.Batch) error {
+	bs.mtx.Lock()
+	defer bs.mtx.Unlock()
 	if bs.batch != nil {
 		return errors.New("batch already taken")
 	}
diff --git a/pkg/storage/metrics.go b/pkg/storage/metrics.go
index 31d95a3e47f..00c90e8b361 100644
--- a/pkg/storage/metrics.go
+++ b/pkg/storage/metrics.go
@@ -331,5 +331,5 @@ func (m txChunkStoreWithMetrics) Iterate(ctx context.Context, fn IterateChunkFn)
 
 // captureDuration returns a function that returns the duration since the given start.
 func captureDuration(start time.Time) (elapsed func() float64) {
-	return func() float64 { return float64(time.Since(start).Nanoseconds()) }
+	return func() float64 { return time.Since(start).Seconds() }
 }
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index a793678d8c9..7ae0c2b692b 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -63,11 +63,10 @@ func New(
 	}
 	rs.radius.Store(uint32(rItem.Radius))
 
-	size, err := store.Count(&batchRadiusItem{})
+	err = rs.RecountSize(store)
 	if err != nil {
 		return nil, err
 	}
-	rs.size.Store(int64(size))
 
 	return rs, nil
 }
@@ -369,6 +368,18 @@ func (r *Reserve) Size() int {
 	return int(r.size.Load())
 }
 
+// RecountSize recounts the entire reserve from the Store.
+// Not thread-safe, so it must NOT be called concurrently with other calls that update the size.
+func (r *Reserve) RecountSize(store storage.Store) error {
+	size, err := store.Count(&batchRadiusItem{})
+	if err != nil {
+		return err
+	}
+	r.size.Store(int64(size))
+
+	return nil
+}
+
 func (r *Reserve) Capacity() int {
 	return r.capacity
 }
diff --git a/pkg/storer/metrics.go b/pkg/storer/metrics.go
index 13dd0d7ce08..990186b352d 100644
--- a/pkg/storer/metrics.go
+++ b/pkg/storer/metrics.go
@@ -20,6 +20,7 @@ type metrics struct {
 	MethodCalls         prometheus.CounterVec
 	MethodCallsDuration prometheus.HistogramVec
 	ReserveSize         prometheus.Gauge
+	ReserveCleanup      prometheus.Counter
 	StorageRadius       prometheus.Gauge
 	CacheSize           prometheus.Gauge
 	EvictedChunkCount   prometheus.Counter
@@ -58,6 +59,14 @@ func newMetrics() metrics {
 				Help:      "Number of chunks in reserve.",
 			},
 		),
+		ReserveCleanup: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "reserve_cleanup",
+				Help:      "Number of cleaned-up expired chunks.",
+			},
+		),
 		StorageRadius: prometheus.NewGauge(
 			prometheus.GaugeOpts{
 				Namespace: m.Namespace,
@@ -147,5 +156,5 @@ func (m getterWithMetrics) Get(ctx context.Context, address swarm.Address) (swar
 
 // captureDuration returns a function that returns the duration since the given start.
 func captureDuration(start time.Time) func() float64 {
-	return func() float64 { return float64(time.Since(start).Nanoseconds()) }
+	return func() float64 { return time.Since(start).Seconds() }
 }
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index fe625c5e8db..4221a935916 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -23,6 +23,8 @@ const (
 	reserveOverCapacity  = "reserveOverCapacity"
 	reserveUnreserved    = "reserveUnreserved"
 	reserveUpdateLockKey = "reserveUpdateLockKey"
+
+	cleanupDur = time.Hour * 6
 )
 
 var errMaxRadius = errors.New("max radius reached")
@@ -76,7 +78,11 @@ func (db *DB) reserveWorker(ctx context.Context, warmupDur, wakeUpDur time.Durat
 	// syncing can now begin now that the reserver worker is running
 	db.syncer.Start(ctx)
 
-	wakeUpTicker := time.NewTicker(wakeUpDur)
+	radiusWakeUpTicker := time.NewTicker(wakeUpDur)
+	defer radiusWakeUpTicker.Stop()
+
+	cleanUpTicker := time.NewTicker(cleanupDur)
+	defer cleanUpTicker.Stop()
 
 	for {
 		select {
@@ -86,7 +92,7 @@ func (db *DB) reserveWorker(ctx context.Context, warmupDur, wakeUpDur time.Durat
 				db.logger.Error(err, "reserve unreserve")
 			}
 			db.metrics.OverCapTriggerCount.Inc()
-		case <-wakeUpTicker.C:
+		case <-radiusWakeUpTicker.C:
 			radius := db.reserve.Radius()
 			if db.reserve.Size() < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > 0 {
 				radius--
@@ -97,11 +103,10 @@ func (db *DB) reserveWorker(ctx context.Context, warmupDur, wakeUpDur time.Durat
 				db.logger.Info("reserve radius decrease", "radius", radius)
 			}
 			db.metrics.StorageRadius.Set(float64(radius))
-
+		case <-cleanUpTicker.C:
 			if err := db.reserveCleanup(ctx); err != nil {
 				db.logger.Error(err, "cleanup")
 			}
-
 		case <-db.quit:
 			return
 		}
@@ -276,21 +281,43 @@ func (db *DB) removeChunk(ctx context.Context, address swarm.Address, batchID []
 
 func (db *DB) reserveCleanup(ctx context.Context) error {
 	dur := captureDuration(time.Now())
+	removed := 0
 	defer func() {
 		db.metrics.MethodCallsDuration.WithLabelValues("reserve", "cleanup").Observe(dur())
-	}()
+		db.metrics.ReserveCleanup.Add(float64(removed))
+		db.logger.Info("cleanup finished", "removed", removed, "duration", dur())
 
-	return db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) {
+		if removed == 0 {
+			return
+		}
 
-		ok, err := db.batchstore.Exists(ci.BatchID)
-		if err != nil {
-			return false, err
+		db.lock.Lock(reserveUpdateLockKey)
+		defer db.lock.Unlock(reserveUpdateLockKey)
+
+		if err := db.reserve.RecountSize(db.repo.IndexStore()); err != nil {
+			db.logger.Error(err, "recount reserve size")
 		}
 
-		if !ok {
+		db.metrics.ReserveSize.Set(float64(db.ReserveSize()))
+
+	}()
+
+	ids := map[string]struct{}{}
+
+	err := db.batchstore.Iterate(func(b *postage.Batch) (bool, error) {
+		ids[string(b.ID)] = struct{}{}
+		return false, nil
+	})
+	if err != nil {
+		return err
+	}
+
+	return db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) {
+		if _, ok := ids[string(ci.BatchID)]; !ok {
+			removed++
+			db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(ci.BatchID))
 			return false, db.removeChunk(ctx, ci.ChunkAddress, ci.BatchID)
 		}
-
 		return false, nil
 	})
 }
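
For reference, a standalone sketch (not part of the diff) of the sweep strategy the new reserveCleanup uses above: snapshot the IDs of batches that still exist into a set, then iterate the reserve once and evict every chunk whose batch is missing, counting the evictions. The chunk type and cleanup function below are hypothetical stand-ins, not bee's actual types or API.

package main

import "fmt"

// chunk is a hypothetical stand-in for a reserve entry.
type chunk struct {
	address string
	batchID string
}

// cleanup removes chunks whose batch ID is not in validBatches and
// returns the surviving chunks plus the number of removed ones.
func cleanup(reserve []chunk, validBatches []string) ([]chunk, int) {
	// Snapshot the valid batch IDs once, mirroring the db.batchstore.Iterate pass.
	ids := make(map[string]struct{}, len(validBatches))
	for _, id := range validBatches {
		ids[id] = struct{}{}
	}

	kept := reserve[:0] // in-place filter over the same backing array
	removed := 0
	for _, c := range reserve {
		if _, ok := ids[c.batchID]; ok {
			kept = append(kept, c)
			continue
		}
		removed++ // batch expired: the chunk is dropped, as db.removeChunk does in the diff
	}
	return kept, removed
}

func main() {
	reserve := []chunk{
		{address: "addr-1", batchID: "batch-a"},
		{address: "addr-2", batchID: "batch-b"},
		{address: "addr-3", batchID: "batch-a"},
	}
	kept, removed := cleanup(reserve, []string{"batch-a"})
	fmt.Println("kept:", len(kept), "removed:", removed) // kept: 2 removed: 1
}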