Skip to content

Commit

Permalink
metrics: add separate statesync and blocksync metrics (#9682)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters authored Nov 10, 2022
1 parent f12588a commit 99a7ac8
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 42 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

- Tooling
- [tools/tm-signer-harness] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106)
- [metrics] \#9682 move state-syncing and block-syncing metrics to their respective packages (@cmwaters)
labels have moved from block_syncing -> blocksync_syncing and state_syncing -> statesync_syncing

### FEATURES

Expand Down
30 changes: 30 additions & 0 deletions blocksync/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions blocksync/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blocksync

import (
"github.com/go-kit/kit/metrics"
)

const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "blocksync"
)

//go:generate go run ../scripts/metricsgen -struct=Metrics

// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is block syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}
7 changes: 6 additions & 1 deletion blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ type Reactor struct {

requestsCh <-chan BlockRequest
errorsCh <-chan peerError

metrics *Metrics
}

// NewReactor returns new reactor instance.
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
blockSync bool) *Reactor {
blockSync bool, metrics *Metrics) *Reactor {

if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
Expand All @@ -88,6 +90,7 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
metrics: metrics,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
Expand Down Expand Up @@ -236,6 +239,8 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
bcR.metrics.Syncing.Set(1)
defer bcR.metrics.Syncing.Set(0)

trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
defer trySyncTicker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func newReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}

bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync, NopMetrics())
bcReactor.SetLogger(logger.With("module", "blocksync"))

return ReactorPair{bcReactor, proxyApp}
Expand Down
14 changes: 0 additions & 14 deletions consensus/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ type Metrics struct {
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge `metrics_name:"latest_block_height"`
// Whether or not a node is block syncing. 1 if yes, 0 if no.
BlockSyncing metrics.Gauge
// Whether or not a node is state syncing. 1 if yes, 0 if no.
StateSyncing metrics.Gauge

// Number of block parts transmitted by each peer.
BlockParts metrics.Counter `metrics_labels:"peer_id"`
Expand Down
2 changes: 0 additions & 2 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
conR.mtx.Lock()
conR.waitSync = false
conR.mtx.Unlock()
conR.Metrics.BlockSyncing.Set(0)
conR.Metrics.StateSyncing.Set(0)

if skipWAL {
conR.conS.doWALCatchup = false
Expand Down
13 changes: 4 additions & 9 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewNode(config *cfg.Config,
return nil, err
}

csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)

// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
Expand Down Expand Up @@ -249,18 +249,12 @@ func NewNode(config *cfg.Config,
)

// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger, bsMetrics)
if err != nil {
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}

// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
csMetrics.StateSyncing.Set(1)
} else if blockSync {
csMetrics.BlockSyncing.Set(1)
}
// Make ConsensusReactor
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger,
Expand All @@ -275,6 +269,7 @@ func NewNode(config *cfg.Config,
proxyApp.Snapshot(),
proxyApp.Query(),
config.StateSync.TempDir,
ssMetrics,
)
stateSyncReactor.SetLogger(logger.With("module", "statesync"))

Expand Down
18 changes: 9 additions & 9 deletions node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
dbm "github.com/tendermint/tm-db"

abci "github.com/tendermint/tendermint/abci/types"
bc "github.com/tendermint/tendermint/blocksync"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
Expand Down Expand Up @@ -99,20 +99,22 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
}

// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics)

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID),
blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics()
}
}

Expand Down Expand Up @@ -336,10 +338,11 @@ func createBlocksyncReactor(config *cfg.Config,
blockStore *store.BlockStore,
blockSync bool,
logger log.Logger,
metrics *blocksync.Metrics,
) (bcReactor p2p.Reactor, err error) {
switch config.BlockSync.Version {
case "v0":
bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync)
bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, blockSync, metrics)
case "v1", "v2":
return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
default:
Expand Down Expand Up @@ -575,9 +578,6 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React
}

if blockSync {
// FIXME Very ugly to have these metrics bleed through here.
conR.Metrics.StateSyncing.Set(0)
conR.Metrics.BlockSyncing.Set(1)
err = bcR.SwitchToBlockSync(state)
if err != nil {
ssR.Logger.Error("Failed to switch to block sync", "err", err)
Expand Down
30 changes: 30 additions & 0 deletions statesync/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions statesync/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package statesync

import (
"github.com/go-kit/kit/metrics"
)

const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "statesync"
)

//go:generate go run ../scripts/metricsgen -struct=Metrics

// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is state syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}
5 changes: 5 additions & 0 deletions statesync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Reactor struct {
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
tempDir string
metrics *Metrics

// This will only be set when a state sync is in progress. It is used to feed received
// snapshots and chunks into the sync.
Expand All @@ -46,12 +47,14 @@ func NewReactor(
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
tempDir string,
metrics *Metrics,
) *Reactor {

r := &Reactor{
cfg: cfg,
conn: conn,
connQuery: connQuery,
metrics: metrics,
}
r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)

Expand Down Expand Up @@ -265,6 +268,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
}
r.metrics.Syncing.Set(1)
r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
r.mtx.Unlock()

Expand All @@ -284,6 +288,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)

r.mtx.Lock()
r.syncer = nil
r.metrics.Syncing.Set(0)
r.mtx.Unlock()
return state, commit, err
}
4 changes: 2 additions & 2 deletions statesync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {

// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
cfg := config.DefaultStateSyncConfig()
r := NewReactor(*cfg, conn, nil, "")
r := NewReactor(*cfg, conn, nil, "", NopMetrics())
err := r.Start()
require.NoError(t, err)
t.Cleanup(func() {
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {

// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
cfg := config.DefaultStateSyncConfig()
r := NewReactor(*cfg, conn, nil, "")
r := NewReactor(*cfg, conn, nil, "", NopMetrics())
err := r.Start()
require.NoError(t, err)
t.Cleanup(func() {
Expand Down

0 comments on commit 99a7ac8

Please sign in to comment.