Skip to content

Commit

Permalink
feat(storer): introduce stats for LevelDB
Browse files Browse the repository at this point in the history
  • Loading branch information
mrekucci committed Jul 6, 2023
1 parent e01947f commit 9753ea7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type metrics struct {
ExpiredChunkCount prometheus.Counter
OverCapTriggerCount prometheus.Counter
ExpiredBatchCount prometheus.Counter
LevelDBStats prometheus.HistogramVec
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -116,6 +117,15 @@ func newMetrics() metrics {
Help: "Number of batches expired, that were processed.",
},
),
LevelDBStats: *prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_stats",
Help: "LevelDB statistics.",
},
[]string{"counter"},
),
}
}

Expand Down
52 changes: 51 additions & 1 deletion pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"github.com/syndtr/goleveldb/leveldb"
"io"
"io/fs"
"math/big"
Expand Down Expand Up @@ -272,6 +273,49 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
}

if opts.LdbStats != nil {
go func() {
logger := log.NewLogger(loggerName).Register()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := new(leveldb.DBStats)
switch err := store.DB().Stats(stats); {
case errors.Is(err, leveldb.ErrClosed):
return
case err != nil:
logger.Error(err, "snapshot levelDB stats")
default:
opts.LdbStats.WithLabelValues("write_delay_count").Observe(float64(stats.WriteDelayCount))
opts.LdbStats.WithLabelValues("write_delay_duration").Observe(float64(stats.WriteDelayDuration))
opts.LdbStats.WithLabelValues("alive_snapshots").Observe(float64(stats.AliveSnapshots))
opts.LdbStats.WithLabelValues("alive_iterators").Observe(float64(stats.AliveIterators))
opts.LdbStats.WithLabelValues("io_write").Observe(float64(stats.IOWrite))
opts.LdbStats.WithLabelValues("io_read").Observe(float64(stats.IORead))
opts.LdbStats.WithLabelValues("block_cache_size").Observe(float64(stats.BlockCacheSize))
opts.LdbStats.WithLabelValues("opened_tables_count").Observe(float64(stats.OpenedTablesCount))
opts.LdbStats.WithLabelValues("mem_comp").Observe(float64(stats.MemComp))
opts.LdbStats.WithLabelValues("level_0_comp").Observe(float64(stats.Level0Comp))
opts.LdbStats.WithLabelValues("non_level_0_comp").Observe(float64(stats.NonLevel0Comp))
opts.LdbStats.WithLabelValues("seek_comp").Observe(float64(stats.SeekComp))
for i := 0; i < len(stats.LevelSizes); i++ {
opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_size", i)).Observe(float64(stats.LevelSizes[i]))
opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_tables_count", i)).Observe(float64(stats.LevelTablesCounts[i]))
opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_read", i)).Observe(float64(stats.LevelRead[i]))
opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_write", i)).Observe(float64(stats.LevelWrite[i]))
opts.LdbStats.WithLabelValues(fmt.Sprintf("level_%d_duration", i)).Observe(float64(stats.LevelDurations[i]))
}
}
}
}
}()
}

sharkyBasePath := path.Join(basePath, sharkyPath)

if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
Expand Down Expand Up @@ -368,6 +412,7 @@ const lockKeyNewSession string = "new_session"
type Options struct {
// These are options related to levelDB. Currently the underlying storage used
// is levelDB.
LdbStats *prometheus.HistogramVec
LdbOpenFilesLimit uint64
LdbBlockCacheCapacity uint64
LdbWriteBufferSize uint64
Expand Down Expand Up @@ -446,6 +491,11 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
opts = defaultOptions()
}

metrics := newMetrics()
if opts.LdbStats == nil {
opts.LdbStats = &metrics.LevelDBStats
}

if dirPath == "" {
repo, dbCloser, err = initInmemRepository()
if err != nil {
Expand Down Expand Up @@ -475,7 +525,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
logger := opts.Logger.WithName(loggerName).Register()

db := &DB{
metrics: newMetrics(),
metrics: metrics,
logger: logger,
baseAddr: opts.Address,
repo: repo,
Expand Down

0 comments on commit 9753ea7

Please sign in to comment.