diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index dfc66460ff3..77940da8a75 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -20,6 +20,8 @@ - Tooling - [tools/tm-signer-harness] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106) + - [metrics] \#9682 move state-syncing and block-syncing metrics to their respective packages (@cmwaters) + labels have moved from block_syncing -> blocksync_syncing and state_syncing -> statesync_syncing ### FEATURES diff --git a/blocksync/metrics.gen.go b/blocksync/metrics.gen.go new file mode 100644 index 00000000000..1d093fb314f --- /dev/null +++ b/blocksync/metrics.gen.go @@ -0,0 +1,30 @@ +// Code generated by metricsgen. DO NOT EDIT. + +package blocksync + +import ( + "github.com/go-kit/kit/metrics/discard" + prometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "syncing", + Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.", + }, labels).With(labelsAndValues...), + } +} + +func NopMetrics() *Metrics { + return &Metrics{ + Syncing: discard.NewGauge(), + } +} diff --git a/blocksync/metrics.go b/blocksync/metrics.go new file mode 100644 index 00000000000..78a6337b945 --- /dev/null +++ b/blocksync/metrics.go @@ -0,0 +1,19 @@ +package blocksync + +import ( + "github.com/go-kit/kit/metrics" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "blocksync" +) + +//go:generate go run ../scripts/metricsgen -struct=Metrics + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // Whether or not a node is block syncing. 1 if yes, 0 if no. + Syncing metrics.Gauge +} diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 87dae817095..eeada7da2e0 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -58,11 +58,13 @@ type Reactor struct { requestsCh <-chan BlockRequest errorsCh <-chan peerError + + metrics *Metrics } // NewReactor returns new reactor instance. func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, - blockSync bool) *Reactor { + blockSync bool, metrics *Metrics) *Reactor { if state.LastBlockHeight != store.Height() { panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, @@ -88,6 +90,7 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS blockSync: blockSync, requestsCh: requestsCh, errorsCh: errorsCh, + metrics: metrics, } bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR) return bcR @@ -236,6 +239,8 @@ func (bcR *Reactor) Receive(e p2p.Envelope) { // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! func (bcR *Reactor) poolRoutine(stateSynced bool) { + bcR.metrics.Syncing.Set(1) + defer bcR.metrics.Syncing.Set(0) trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) defer trySyncTicker.Stop() diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index a88e0591254..0ab127b5306 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -145,7 +145,7 @@ func newReactor( blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync, NopMetrics()) bcReactor.SetLogger(logger.With("module", "blocksync")) return ReactorPair{bcReactor, proxyApp} diff --git a/consensus/metrics.gen.go b/consensus/metrics.gen.go index 6f1699cdd74..94ea5d2247e 100644 --- a/consensus/metrics.gen.go +++ b/consensus/metrics.gen.go @@ -118,18 +118,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "latest_block_height", Help: "The latest block height.", }, labels).With(labelsAndValues...), - BlockSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "block_syncing", - Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.", - }, labels).With(labelsAndValues...), - StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "state_syncing", - Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.", - }, labels).With(labelsAndValues...), BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -208,8 +196,6 @@ func NopMetrics() *Metrics { BlockSizeBytes: discard.NewGauge(), TotalTxs: discard.NewGauge(), CommittedHeight: discard.NewGauge(), - BlockSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), BlockParts: discard.NewCounter(), StepDurationSeconds: discard.NewHistogram(), BlockGossipPartsReceived: discard.NewCounter(), diff --git a/consensus/metrics.go b/consensus/metrics.go index e6a8f284ac0..f8262d3916c 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -61,10 +61,6 @@ type Metrics struct { TotalTxs metrics.Gauge // The latest block height. CommittedHeight metrics.Gauge `metrics_name:"latest_block_height"` - // Whether or not a node is block syncing. 1 if yes, 0 if no. - BlockSyncing metrics.Gauge - // Whether or not a node is state syncing. 1 if yes, 0 if no. - StateSyncing metrics.Gauge // Number of block parts transmitted by each peer. BlockParts metrics.Counter `metrics_labels:"peer_id"` diff --git a/consensus/reactor.go b/consensus/reactor.go index d308da2a009..a8d672e446c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -119,8 +119,6 @@ func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) { conR.mtx.Lock() conR.waitSync = false conR.mtx.Unlock() - conR.Metrics.BlockSyncing.Set(0) - conR.Metrics.StateSyncing.Set(0) if skipWAL { conR.conS.doWALCatchup = false diff --git a/node/node.go b/node/node.go index 90a9356dba1..ddf86e0dcff 100644 --- a/node/node.go +++ b/node/node.go @@ -160,7 +160,7 @@ func NewNode(config *cfg.Config, return nil, err } - csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID) + csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID) // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics) @@ -249,18 +249,12 @@ func NewNode(config *cfg.Config, ) // Make BlocksyncReactor. Don't start block sync if we're doing a state sync first. - bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger) + bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger, bsMetrics) if err != nil { return nil, fmt.Errorf("could not create blocksync reactor: %w", err) } - // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. - // FIXME We need to update metrics here, since other reactors don't have access to them. - if stateSync { - csMetrics.StateSyncing.Set(1) - } else if blockSync { - csMetrics.BlockSyncing.Set(1) - } + // Make ConsensusReactor consensusReactor, consensusState := createConsensusReactor( config, state, blockExec, blockStore, mempool, evidencePool, privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger, @@ -275,6 +269,7 @@ func NewNode(config *cfg.Config, proxyApp.Snapshot(), proxyApp.Query(), config.StateSync.TempDir, + ssMetrics, ) stateSyncReactor.SetLogger(logger.With("module", "statesync")) diff --git a/node/setup.go b/node/setup.go index e9449558eaa..5bada8b6f2c 100644 --- a/node/setup.go +++ b/node/setup.go @@ -12,7 +12,7 @@ import ( dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" - bc "github.com/tendermint/tendermint/blocksync" + "github.com/tendermint/tendermint/blocksync" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" @@ -99,20 +99,22 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { } // MetricsProvider returns a consensus, p2p and mempool Metrics. -type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) +type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) // DefaultMetricsProvider returns Metrics build using Prometheus client library // if Prometheus is enabled. Otherwise, it returns no-op Metrics. func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { - return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) { + return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) { if config.Prometheus { return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), sm.PrometheusMetrics(config.Namespace, "chain_id", chainID), - proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID) + proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID), + blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID), + statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID) } - return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics() + return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics() } } @@ -336,10 +338,11 @@ func createBlocksyncReactor(config *cfg.Config, blockStore *store.BlockStore, blockSync bool, logger log.Logger, + metrics *blocksync.Metrics, ) (bcReactor p2p.Reactor, err error) { switch config.BlockSync.Version { case "v0": - bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync) + bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, blockSync, metrics) case "v1", "v2": return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version) default: @@ -575,9 +578,6 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React } if blockSync { - // FIXME Very ugly to have these metrics bleed through here. - conR.Metrics.StateSyncing.Set(0) - conR.Metrics.BlockSyncing.Set(1) err = bcR.SwitchToBlockSync(state) if err != nil { ssR.Logger.Error("Failed to switch to block sync", "err", err) diff --git a/statesync/metrics.gen.go b/statesync/metrics.gen.go new file mode 100644 index 00000000000..1941c9270e3 --- /dev/null +++ b/statesync/metrics.gen.go @@ -0,0 +1,30 @@ +// Code generated by metricsgen. DO NOT EDIT. + +package statesync + +import ( + "github.com/go-kit/kit/metrics/discard" + prometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "syncing", + Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.", + }, labels).With(labelsAndValues...), + } +} + +func NopMetrics() *Metrics { + return &Metrics{ + Syncing: discard.NewGauge(), + } +} diff --git a/statesync/metrics.go b/statesync/metrics.go new file mode 100644 index 00000000000..9a4d7fcefab --- /dev/null +++ b/statesync/metrics.go @@ -0,0 +1,19 @@ +package statesync + +import ( + "github.com/go-kit/kit/metrics" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "statesync" +) + +//go:generate go run ../scripts/metricsgen -struct=Metrics + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // Whether or not a node is state syncing. 1 if yes, 0 if no. + Syncing metrics.Gauge +} diff --git a/statesync/reactor.go b/statesync/reactor.go index 096fdd1b7f5..d650a6a55db 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -33,6 +33,7 @@ type Reactor struct { conn proxy.AppConnSnapshot connQuery proxy.AppConnQuery tempDir string + metrics *Metrics // This will only be set when a state sync is in progress. It is used to feed received // snapshots and chunks into the sync. @@ -46,12 +47,14 @@ func NewReactor( conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string, + metrics *Metrics, ) *Reactor { r := &Reactor{ cfg: cfg, conn: conn, connQuery: connQuery, + metrics: metrics, } r.BaseReactor = *p2p.NewBaseReactor("StateSync", r) @@ -265,6 +268,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) r.mtx.Unlock() return sm.State{}, nil, errors.New("a state sync is already in progress") } + r.metrics.Syncing.Set(1) r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir) r.mtx.Unlock() @@ -284,6 +288,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) r.mtx.Lock() r.syncer = nil + r.metrics.Syncing.Set(0) r.mtx.Unlock() return state, commit, err } diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go index 8d06c7c2dad..eed3b23617e 100644 --- a/statesync/reactor_test.go +++ b/statesync/reactor_test.go @@ -71,7 +71,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) { // Start a reactor and send a ssproto.ChunkRequest, then wait for and check response cfg := config.DefaultStateSyncConfig() - r := NewReactor(*cfg, conn, nil, "") + r := NewReactor(*cfg, conn, nil, "", NopMetrics()) err := r.Start() require.NoError(t, err) t.Cleanup(func() { @@ -161,7 +161,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) { // Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses cfg := config.DefaultStateSyncConfig() - r := NewReactor(*cfg, conn, nil, "") + r := NewReactor(*cfg, conn, nil, "", NopMetrics()) err := r.Start() require.NoError(t, err) t.Cleanup(func() {