Skip to content

Commit

Permalink
feat(storer): introduce stats for LevelDB
Browse files Browse the repository at this point in the history
  • Loading branch information
mrekucci committed Jul 7, 2023
1 parent 6fb07f3 commit 34e3a05
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
10 changes: 10 additions & 0 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type metrics struct {
ExpiredChunkCount prometheus.Counter
OverCapTriggerCount prometheus.Counter
ExpiredBatchCount prometheus.Counter
LevelDBStats prometheus.HistogramVec
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -116,6 +117,15 @@ func newMetrics() metrics {
Help: "Number of batches expired, that were processed.",
},
),
LevelDBStats: *prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_stats",
Help: "LevelDB statistics.",
},
[]string{"counter"},
),
}
}

Expand Down
57 changes: 53 additions & 4 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/ethersphere/bee/pkg/log"
Expand All @@ -23,7 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/pusher"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/sharky"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/leveldbstore"
"github.com/ethersphere/bee/pkg/storage/migration"
"github.com/ethersphere/bee/pkg/storer/internal"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/ethersphere/bee/pkg/topology"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"resenje.org/multex"
)
Expand Down Expand Up @@ -273,6 +275,50 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
}

if opts.LdbStats.Load() != nil {
go func() {
ldbStats := opts.LdbStats.Load()
logger := log.NewLogger(loggerName).Register()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := new(leveldb.DBStats)
switch err := store.DB().Stats(stats); {
case errors.Is(err, leveldb.ErrClosed):
return
case err != nil:
logger.Error(err, "snapshot levelDB stats")
default:
ldbStats.WithLabelValues("write_delay_count").Observe(float64(stats.WriteDelayCount))
ldbStats.WithLabelValues("write_delay_duration").Observe(stats.WriteDelayDuration.Seconds())
ldbStats.WithLabelValues("alive_snapshots").Observe(float64(stats.AliveSnapshots))
ldbStats.WithLabelValues("alive_iterators").Observe(float64(stats.AliveIterators))
ldbStats.WithLabelValues("io_write").Observe(float64(stats.IOWrite))
ldbStats.WithLabelValues("io_read").Observe(float64(stats.IORead))
ldbStats.WithLabelValues("block_cache_size").Observe(float64(stats.BlockCacheSize))
ldbStats.WithLabelValues("opened_tables_count").Observe(float64(stats.OpenedTablesCount))
ldbStats.WithLabelValues("mem_comp").Observe(float64(stats.MemComp))
ldbStats.WithLabelValues("level_0_comp").Observe(float64(stats.Level0Comp))
ldbStats.WithLabelValues("non_level_0_comp").Observe(float64(stats.NonLevel0Comp))
ldbStats.WithLabelValues("seek_comp").Observe(float64(stats.SeekComp))
for i := 0; i < len(stats.LevelSizes); i++ {
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_size", i)).Observe(float64(stats.LevelSizes[i]))
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_tables_count", i)).Observe(float64(stats.LevelTablesCounts[i]))
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_read", i)).Observe(float64(stats.LevelRead[i]))
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_write", i)).Observe(float64(stats.LevelWrite[i]))
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_duration", i)).Observe(stats.LevelDurations[i].Seconds())
}
}
}
}
}()
}

sharkyBasePath := path.Join(basePath, sharkyPath)

if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
Expand Down Expand Up @@ -367,8 +413,8 @@ const lockKeyNewSession string = "new_session"

// Options provides a container to configure different things in the storer.
type Options struct {
// These are options related to levelDB. Currently the underlying storage used
// is levelDB.
// These are options related to levelDB. Currently, the underlying storage used is levelDB.
LdbStats atomic.Pointer[prometheus.HistogramVec]
LdbOpenFilesLimit uint64
LdbBlockCacheCapacity uint64
LdbWriteBufferSize uint64
Expand Down Expand Up @@ -447,6 +493,9 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
opts = defaultOptions()
}

metrics := newMetrics()
opts.LdbStats.CompareAndSwap(nil, &metrics.LevelDBStats)

if dirPath == "" {
repo, dbCloser, err = initInmemRepository()
if err != nil {
Expand Down Expand Up @@ -476,7 +525,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
logger := opts.Logger.WithName(loggerName).Register()

db := &DB{
metrics: newMetrics(),
metrics: metrics,
logger: logger,
baseAddr: opts.Address,
repo: repo,
Expand Down

0 comments on commit 34e3a05

Please sign in to comment.