diff --git a/pkg/storer/metrics.go b/pkg/storer/metrics.go
index a4e040c9e15..a0de4d57939 100644
--- a/pkg/storer/metrics.go
+++ b/pkg/storer/metrics.go
@@ -27,6 +27,7 @@ type metrics struct {
 	ExpiredChunkCount   prometheus.Counter
 	OverCapTriggerCount prometheus.Counter
 	ExpiredBatchCount   prometheus.Counter
+	LevelDBStats        prometheus.HistogramVec
 }
 
 // newMetrics is a convenient constructor for creating new metrics.
@@ -116,6 +117,15 @@ func newMetrics() metrics {
 				Help:      "Number of batches expired, that were processed.",
 			},
 		),
+		LevelDBStats: *prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_stats",
+				Help:      "LevelDB statistics.",
+			},
+			[]string{"counter"},
+		),
 	}
 }
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index 9012f39b24a..34a529cc8ba 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -15,6 +15,7 @@ import (
 	"path"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethersphere/bee/pkg/log"
@@ -23,7 +24,7 @@ import (
 	"github.com/ethersphere/bee/pkg/pusher"
 	"github.com/ethersphere/bee/pkg/retrieval"
 	"github.com/ethersphere/bee/pkg/sharky"
-	storage "github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/storage/leveldbstore"
 	"github.com/ethersphere/bee/pkg/storage/migration"
 	"github.com/ethersphere/bee/pkg/storer/internal"
@@ -38,6 +39,7 @@ import (
 	"github.com/ethersphere/bee/pkg/topology"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
+	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 	"resenje.org/multex"
 )
@@ -273,6 +275,50 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
 		return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
 	}
 
+	if opts.LdbStats.Load() != nil {
+		go func() {
+			ldbStats := opts.LdbStats.Load()
+			logger := log.NewLogger(loggerName).Register()
+			ticker := time.NewTicker(15 * time.Second)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticker.C:
+					stats := new(leveldb.DBStats)
+					switch err := store.DB().Stats(stats); {
+					case errors.Is(err, leveldb.ErrClosed):
+						return
+					case err != nil:
+						logger.Error(err, "snapshot levelDB stats")
+					default:
+						ldbStats.WithLabelValues("write_delay_count").Observe(float64(stats.WriteDelayCount))
+						ldbStats.WithLabelValues("write_delay_duration").Observe(stats.WriteDelayDuration.Seconds())
+						ldbStats.WithLabelValues("alive_snapshots").Observe(float64(stats.AliveSnapshots))
+						ldbStats.WithLabelValues("alive_iterators").Observe(float64(stats.AliveIterators))
+						ldbStats.WithLabelValues("io_write").Observe(float64(stats.IOWrite))
+						ldbStats.WithLabelValues("io_read").Observe(float64(stats.IORead))
+						ldbStats.WithLabelValues("block_cache_size").Observe(float64(stats.BlockCacheSize))
+						ldbStats.WithLabelValues("opened_tables_count").Observe(float64(stats.OpenedTablesCount))
+						ldbStats.WithLabelValues("mem_comp").Observe(float64(stats.MemComp))
+						ldbStats.WithLabelValues("level_0_comp").Observe(float64(stats.Level0Comp))
+						ldbStats.WithLabelValues("non_level_0_comp").Observe(float64(stats.NonLevel0Comp))
+						ldbStats.WithLabelValues("seek_comp").Observe(float64(stats.SeekComp))
+						for i := 0; i < len(stats.LevelSizes); i++ {
+							ldbStats.WithLabelValues(fmt.Sprintf("level_%d_size", i)).Observe(float64(stats.LevelSizes[i]))
+							ldbStats.WithLabelValues(fmt.Sprintf("level_%d_tables_count", i)).Observe(float64(stats.LevelTablesCounts[i]))
+							ldbStats.WithLabelValues(fmt.Sprintf("level_%d_read", i)).Observe(float64(stats.LevelRead[i]))
+							ldbStats.WithLabelValues(fmt.Sprintf("level_%d_write", i)).Observe(float64(stats.LevelWrite[i]))
+							ldbStats.WithLabelValues(fmt.Sprintf("level_%d_duration", i)).Observe(stats.LevelDurations[i].Seconds())
+						}
+					}
+				}
+			}
+		}()
+	}
+
 	sharkyBasePath := path.Join(basePath, sharkyPath)
 
 	if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
@@ -367,8 +413,8 @@ const lockKeyNewSession string = "new_session"
 
 // Options provides a container to configure different things in the storer.
 type Options struct {
-	// These are options related to levelDB. Currently the underlying storage used
-	// is levelDB.
+	// These are options related to levelDB. Currently, the underlying storage used is levelDB.
+	LdbStats               atomic.Pointer[prometheus.HistogramVec]
 	LdbOpenFilesLimit      uint64
 	LdbBlockCacheCapacity  uint64
 	LdbWriteBufferSize     uint64
@@ -447,6 +493,9 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 		opts = defaultOptions()
 	}
 
+	metrics := newMetrics()
+	opts.LdbStats.CompareAndSwap(nil, &metrics.LevelDBStats)
+
 	if dirPath == "" {
 		repo, dbCloser, err = initInmemRepository()
 		if err != nil {
@@ -476,7 +525,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 	logger := opts.Logger.WithName(loggerName).Register()
 
 	db := &DB{
-		metrics:  newMetrics(),
+		metrics:  metrics,
 		logger:   logger,
 		baseAddr: opts.Address,
 		repo:     repo,
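Reviewer note: the wiring of the new `Options.LdbStats` field is easy to miss in the diff, so below is a minimal, self-contained sketch of the pattern (not code from this PR; the `options` and `newStorer` names are illustrative). A caller may store its own `*prometheus.HistogramVec` in the atomic pointer before calling `New`; the `CompareAndSwap(nil, ...)` added in `New` installs the storer's default `LevelDBStats` histogram only when nothing was supplied, and the stats goroutine in `initDiskRepository` observes into whichever vector ends up in the pointer.

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// options mirrors the new storer.Options.LdbStats field: an atomic pointer
// that a caller may pre-populate with its own HistogramVec.
type options struct {
	LdbStats atomic.Pointer[prometheus.HistogramVec]
}

// newStorer mimics the CompareAndSwap in New: install a default HistogramVec
// only if the caller did not supply one.
func newStorer(opts *options) *prometheus.HistogramVec {
	defaultVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "leveldb_stats",
		Help: "LevelDB statistics.",
	}, []string{"counter"})
	opts.LdbStats.CompareAndSwap(nil, defaultVec)
	// Whatever ends up in LdbStats is what the stats goroutine observes into.
	return opts.LdbStats.Load()
}

func main() {
	// Caller-supplied vec wins: CompareAndSwap sees a non-nil value and leaves it alone.
	custom := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "my_leveldb_stats",
		Help: "Caller-owned LevelDB statistics.",
	}, []string{"counter"})

	opts := &options{}
	opts.LdbStats.Store(custom)
	fmt.Println(newStorer(opts) == custom) // true

	// No caller value: the default vec is installed.
	fmt.Println(newStorer(&options{}) != nil) // true
}
```

Using `atomic.Pointer` keeps the hand-off race-free even though the collector goroutine reads the pointer on its own schedule rather than at construction time.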