diff --git a/pkg/storer/metrics.go b/pkg/storer/metrics.go
index a4e040c9e15..a0de4d57939 100644
--- a/pkg/storer/metrics.go
+++ b/pkg/storer/metrics.go
@@ -27,6 +27,7 @@ type metrics struct {
 	ExpiredChunkCount   prometheus.Counter
 	OverCapTriggerCount prometheus.Counter
 	ExpiredBatchCount   prometheus.Counter
+	LevelDBStats        prometheus.HistogramVec
 }
 
 // newMetrics is a convenient constructor for creating new metrics.
@@ -116,6 +117,15 @@ func newMetrics() metrics {
 				Help:      "Number of batches expired, that were processed.",
 			},
 		),
+		LevelDBStats: *prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_stats",
+				Help:      "LevelDB statistics.",
+			},
+			[]string{"counter"},
+		),
 	}
 }
 
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index 9012f39b24a..93ee36fa9a2 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/syndtr/goleveldb/leveldb"
 	"io"
 	"io/fs"
 	"math/big"
@@ -273,6 +274,49 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
 		return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
 	}
 
+	if opts.LdbStats != nil {
+		go func() {
+			logger := log.NewLogger(loggerName).Register()
+			ticker := time.NewTicker(time.Minute)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticker.C:
+					stats := new(leveldb.DBStats)
+					switch err := store.DB().Stats(stats); {
+					case errors.Is(err, leveldb.ErrClosed):
+						return
+					case err != nil:
+						logger.Error(err, "snapshot levelDB stats")
+					default:
+						opts.LdbStats.WithLabelValues("write_delay_count").Observe(float64(stats.WriteDelayCount))
+						opts.LdbStats.WithLabelValues("write_delay_duration").Observe(stats.WriteDelayDuration.Seconds())
+						opts.LdbStats.WithLabelValues("alive_snapshots").Observe(float64(stats.AliveSnapshots))
+						opts.LdbStats.WithLabelValues("alive_iterators").Observe(float64(stats.AliveIterators))
+						opts.LdbStats.WithLabelValues("io_write").Observe(float64(stats.IOWrite))
+						opts.LdbStats.WithLabelValues("io_read").Observe(float64(stats.IORead))
+						opts.LdbStats.WithLabelValues("block_cache_size").Observe(float64(stats.BlockCacheSize))
+						opts.LdbStats.WithLabelValues("opened_tables_count").Observe(float64(stats.OpenedTablesCount))
+						opts.LdbStats.WithLabelValues("mem_comp").Observe(float64(stats.MemComp))
+						opts.LdbStats.WithLabelValues("level_0_comp").Observe(float64(stats.Level0Comp))
+						opts.LdbStats.WithLabelValues("non_level_0_comp").Observe(float64(stats.NonLevel0Comp))
+						opts.LdbStats.WithLabelValues("seek_comp").Observe(float64(stats.SeekComp))
+						for i := 0; i < len(stats.LevelSizes); i++ {
+							opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_size", i)).Observe(float64(stats.LevelSizes[i]))
+							opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_tables_count", i)).Observe(float64(stats.LevelTablesCounts[i]))
+							opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_read", i)).Observe(float64(stats.LevelRead[i]))
+							opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_write", i)).Observe(float64(stats.LevelWrite[i]))
+							opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_duration", i)).Observe(stats.LevelDurations[i].Seconds())
+						}
+					}
+				}
+			}
+		}()
+	}
+
 	sharkyBasePath := path.Join(basePath, sharkyPath)
 
 	if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
@@ -369,6 +413,7 @@ const lockKeyNewSession string = "new_session"
 type Options struct {
 	// These are options related to levelDB. Currently the underlying storage used
 	// is levelDB.
+	LdbStats               *prometheus.HistogramVec
 	LdbOpenFilesLimit      uint64
 	LdbBlockCacheCapacity  uint64
 	LdbWriteBufferSize     uint64
@@ -447,6 +492,11 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 		opts = defaultOptions()
 	}
 
+	metrics := newMetrics()
+	if opts.LdbStats == nil {
+		opts.LdbStats = &metrics.LevelDBStats
+	}
+
 	if dirPath == "" {
 		repo, dbCloser, err = initInmemRepository()
 		if err != nil {
@@ -476,7 +526,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 	logger := opts.Logger.WithName(loggerName).Register()
 
 	db := &DB{
-		metrics:  newMetrics(),
+		metrics:  metrics,
 		logger:   logger,
 		baseAddr: opts.Address,
 		repo:     repo,
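
Note on usage (not part of the diff): callers may supply their own histogram vector through the new Options.LdbStats field; when it is left nil, New falls back to the internal metrics.LevelDBStats created by newMetrics, so the collection goroutine in initDiskRepository always has a sink. Below is a minimal sketch of the caller side. The module import path, the namespace/subsystem strings, and the elided remaining Options fields are assumptions for illustration; only the LdbStats field, the "counter" label, and the New signature come from the diff.

    package main

    import (
    	"context"

    	"github.com/prometheus/client_golang/prometheus"

    	// Assumed module path; the diff only shows the directory pkg/storer.
    	storer "github.com/ethersphere/bee/pkg/storer"
    )

    func main() {
    	// The single "counter" label must match what the collection goroutine
    	// passes to WithLabelValues ("write_delay_count", "io_read", and so on).
    	ldbStats := prometheus.NewHistogramVec(
    		prometheus.HistogramOpts{
    			Namespace: "bee",    // assumption: the node's metrics namespace
    			Subsystem: "storer", // assumption
    			Name:      "leveldb_stats",
    			Help:      "LevelDB statistics.",
    		},
    		[]string{"counter"},
    	)
    	// Register on the default registry so the samples are actually scraped.
    	prometheus.MustRegister(ldbStats)

    	opts := &storer.Options{
    		LdbStats: ldbStats,
    		// ... populate Logger, Address, cache capacities, etc. as usual;
    		// New does not apply defaultOptions() when opts is non-nil.
    	}
    	db, err := storer.New(context.Background(), "/path/to/data", opts)
    	if err != nil {
    		panic(err)
    	}
    	defer db.Close() // assuming DB exposes Close, as elsewhere in the package
    }

One property of this design worth recording: the goroutine calls Observe on absolute values once per minute, so cumulative counters such as io_read and io_write arrive as histogram samples of running totals rather than per-interval deltas, and dashboards need to interpret the resulting distributions accordingly.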