From 64ee99b39d337893768d4646a034bdad896bd3a2 Mon Sep 17 00:00:00 2001 From: codchen Date: Mon, 13 Feb 2023 11:53:45 +0800 Subject: [PATCH 01/15] Enrich net-info endpoint --- internal/p2p/peermanager.go | 38 +++++++++++++++++++++++++++++++++++++ internal/rpc/core/env.go | 4 +++- internal/rpc/core/net.go | 15 +++++++++++---- rpc/coretypes/responses.go | 16 ++++++++++++---- 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 5be75ec84..c8f3dda5e 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -7,6 +7,7 @@ import ( "math" "math/rand" "sort" + "strings" "sync" "time" @@ -1002,6 +1003,17 @@ func (m *PeerManager) Scores() map[types.NodeID]PeerScore { return scores } +func (m *PeerManager) Score(id types.NodeID) int { + m.mtx.Lock() + defer m.mtx.Unlock() + + peer, ok := m.store.Get(id) + if ok { + return int(peer.Score()) + } + return -1 +} + // Status returns the status for a peer, primarily for testing. func (m *PeerManager) Status(id types.NodeID) PeerStatus { m.mtx.Lock() @@ -1014,6 +1026,32 @@ func (m *PeerManager) Status(id types.NodeID) PeerStatus { } } +func (m *PeerManager) State(id types.NodeID) string { + m.mtx.Lock() + defer m.mtx.Unlock() + + states := []string{} + if _, ok := m.ready[id]; ok { + states = append(states, "ready") + } + if _, ok := m.dialing[id]; ok { + states = append(states, "dialing") + } + if _, ok := m.upgrading[id]; ok { + states = append(states, "upgrading") + } + if _, ok := m.connected[id]; ok { + states = append(states, "connected") + } + if _, ok := m.evict[id]; ok { + states = append(states, "evict") + } + if _, ok := m.evicting[id]; ok { + states = append(states, "evicting") + } + return strings.Join(states, ",") +} + // findUpgradeCandidate looks for a lower-scored peer that we could evict // to make room for the given peer. Returns an empty ID if none is found. // If the peer is already being upgraded to, we return that same upgrade. diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 124525f26..f52d732e3 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -59,10 +59,12 @@ type consensusState interface { type peerManager interface { Peers() []types.NodeID + Score(types.NodeID) int + State(types.NodeID) string Addresses(types.NodeID) []p2p.NodeAddress } -//---------------------------------------------- +// ---------------------------------------------- // Environment contains objects and interfaces used by the RPC. It is expected // to be setup once during startup. type Environment struct { diff --git a/internal/rpc/core/net.go b/internal/rpc/core/net.go index b18f1e2fc..124deb757 100644 --- a/internal/rpc/core/net.go +++ b/internal/rpc/core/net.go @@ -14,6 +14,7 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, peerList := env.PeerManager.Peers() peers := make([]coretypes.Peer, 0, len(peerList)) + peerConnections := make([]coretypes.PeerConnection, 0, len(peerList)) for _, peer := range peerList { addrs := env.PeerManager.Addresses(peer) if len(addrs) == 0 { @@ -24,13 +25,19 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, ID: peer, URL: addrs[0].String(), }) + peerConnections = append(peerConnections, coretypes.PeerConnection{ + ID: peer, + State: env.PeerManager.State(peer), + Score: env.PeerManager.Score(peer), + }) } return &coretypes.ResultNetInfo{ - Listening: env.IsListening, - Listeners: env.Listeners, - NPeers: len(peers), - Peers: peers, + Listening: env.IsListening, + Listeners: env.Listeners, + NPeers: len(peers), + Peers: peers, + PeerConnections: peerConnections, }, nil } diff --git a/rpc/coretypes/responses.go b/rpc/coretypes/responses.go index ef740f89d..cde125fa2 100644 --- a/rpc/coretypes/responses.go +++ b/rpc/coretypes/responses.go @@ -174,10 +174,11 @@ func (s *ResultStatus) TxIndexEnabled() bool { // Info about peer connections type ResultNetInfo struct { - Listening bool `json:"listening"` - Listeners []string `json:"listeners"` - NPeers int `json:"n_peers,string"` - Peers []Peer `json:"peers"` + Listening bool `json:"listening"` + Listeners []string `json:"listeners"` + NPeers int `json:"n_peers,string"` + Peers []Peer `json:"peers"` + PeerConnections []PeerConnection `json:"peer_connections"` } // Log from dialing seeds @@ -196,6 +197,13 @@ type Peer struct { URL string `json:"url"` } +// A peer connection +type PeerConnection struct { + ID types.NodeID `json:"node_id"` + State string `json:"state"` + Score int `json:"score,string"` +} + // Validators for a height. type ResultValidators struct { BlockHeight int64 `json:"block_height,string"` From 868afcbe29b92e510c7da24232373363a1524a42 Mon Sep 17 00:00:00 2001 From: codchen Date: Mon, 13 Feb 2023 16:22:52 +0800 Subject: [PATCH 02/15] add metrics --- internal/p2p/metrics.gen.go | 7 +++++++ internal/p2p/metrics.go | 2 ++ internal/p2p/router.go | 5 ++++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index cbfba29d9..b0041db5e 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -68,6 +68,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_queue_msg_size", Help: "The size of messages sent over a peer's queue for a specific p2p Channel.", }, append(labels, "ch_id")).With(labelsAndValues...), + PeerChannelSend: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_channel_send", + Help: "", + }, labels).With(labelsAndValues...), } } @@ -82,5 +88,6 @@ func NopMetrics() *Metrics { RouterChannelQueueSend: discard.NewHistogram(), PeerQueueDroppedMsgs: discard.NewCounter(), PeerQueueMsgSize: discard.NewGauge(), + PeerChannelSend: discard.NewCounter(), } } diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index b45f128e5..98b1a82c0 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -59,6 +59,8 @@ type Metrics struct { // queue for a specific flow (i.e. Channel). //metrics:The size of messages sent over a peer's queue for a specific p2p Channel. PeerQueueMsgSize metrics.Gauge `metrics_labels:"ch_id" metric_name:"router_channel_queue_msg_size"` + + PeerChannelSend metrics.Counter } type metricsLabelCache struct { diff --git a/internal/p2p/router.go b/internal/p2p/router.go index cb0d32713..e22902812 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -361,6 +361,9 @@ func (r *Router) routeChannel( // check whether the peer is receiving on that channel if _, ok := peerChs[chID]; ok { queues = append(queues, q) + r.metrics.PeerSendBytesTotal.With( + "chID", fmt.Sprintf("%d", chID), + "peer_id", string(nodeID)).Add(1) } } @@ -403,7 +406,7 @@ func (r *Router) routeChannel( r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) case <-q.closed(): - r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + r.logger.Error("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) case <-ctx.Done(): return From fa6d94339a0382590fd5c2a0aa74e8bc51f519e0 Mon Sep 17 00:00:00 2001 From: codchen Date: Mon, 13 Feb 2023 17:07:17 +0800 Subject: [PATCH 03/15] fix --- internal/p2p/router.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index e22902812..bcce86ca0 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -363,7 +363,8 @@ func (r *Router) routeChannel( queues = append(queues, q) r.metrics.PeerSendBytesTotal.With( "chID", fmt.Sprintf("%d", chID), - "peer_id", string(nodeID)).Add(1) + "peer_id", string(nodeID), + "message_type", "").Add(1) } } From 3ffc908a8d536daae45f16a75e7193fe1e52852a Mon Sep 17 00:00:00 2001 From: codchen Date: Mon, 13 Feb 2023 23:55:21 +0800 Subject: [PATCH 04/15] accurate metrics --- internal/p2p/router.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index bcce86ca0..fbcc4f6d5 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -361,10 +361,6 @@ func (r *Router) routeChannel( // check whether the peer is receiving on that channel if _, ok := peerChs[chID]; ok { queues = append(queues, q) - r.metrics.PeerSendBytesTotal.With( - "chID", fmt.Sprintf("%d", chID), - "peer_id", string(nodeID), - "message_type", "").Add(1) } } @@ -944,6 +940,11 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect return err } + r.metrics.PeerSendBytesTotal.With( + "chID", fmt.Sprintf("%d", envelope.ChannelID), + "peer_id", string(peerID), + "message_type", "").Add(1) + r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message) case <-peerQueue.closed(): From 2aa2aa6416cdf09a20c6134bb11840a3bc789dbe Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 13 Feb 2023 13:51:13 -0800 Subject: [PATCH 05/15] Add more logs and num channels per peer --- internal/p2p/metrics.gen.go | 7 +++++++ internal/p2p/metrics.go | 2 ++ internal/p2p/router.go | 13 +++++++++++-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index b0041db5e..adbb70e6e 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -32,6 +32,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_send_bytes_total", Help: "Number of bytes per channel sent to a given peer.", }, append(labels, "peer_id", "chID", "message_type")).With(labelsAndValues...), + PeerNumChannels: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_num_channels", + Help: "Number of channels open for peer", + }, labels).With(labelsAndValues...), PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -82,6 +88,7 @@ func NopMetrics() *Metrics { Peers: discard.NewGauge(), PeerReceiveBytesTotal: discard.NewCounter(), PeerSendBytesTotal: discard.NewCounter(), + PeerNumChannels: discard.NewGauge(), PeerPendingSendBytes: discard.NewGauge(), RouterPeerQueueRecv: discard.NewHistogram(), RouterPeerQueueSend: discard.NewHistogram(), diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index 98b1a82c0..f9b7c7cb3 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -32,6 +32,8 @@ type Metrics struct { PeerReceiveBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"` // Number of bytes per channel sent to a given peer. PeerSendBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"` + // Number of channels open for peer + PeerNumChannels metrics.Gauge // Number of bytes pending being sent to a given peer. PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"` diff --git a/internal/p2p/router.go b/internal/p2p/router.go index fbcc4f6d5..c0f07ae7b 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -328,9 +328,11 @@ func (r *Router) routeChannel( select { case envelope, ok := <-outCh: if !ok { + r.logger.Error("[TMDEBUG] receiving envelope failed, dropping message", "channel", chID) return } if envelope.IsZero() { + r.logger.Error("[TMDEBUG] envelope is zero, dropping message", "channel", chID) continue } @@ -361,6 +363,8 @@ func (r *Router) routeChannel( // check whether the peer is receiving on that channel if _, ok := peerChs[chID]; ok { queues = append(queues, q) + } else { + r.logger.Error("[TMDEBUG] broadcast mode block dropping message because peer is not ok", "peer", envelope.To, "channel", chID) } } @@ -373,13 +377,14 @@ func (r *Router) routeChannel( if ok { peerChs := r.peerChannels[envelope.To] + r.metrics.PeerNumChannels.With("peer_id", string(envelope.To)).Set(float64(len(peerChs))) // check whether the peer is receiving on that channel _, contains = peerChs[chID] } r.peerMtx.RUnlock() if !ok { - r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because we cannot find peer", "peer", envelope.To, "channel", chID) continue } @@ -388,6 +393,7 @@ func (r *Router) routeChannel( // peer doesn't have available. This is a known issue due to // how peer subscriptions work: // https://github.com/tendermint/tendermint/issues/6598 + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because contains is false (peer is not receiving on channel)", "peer", envelope.To, "channel", chID) continue } @@ -403,15 +409,17 @@ func (r *Router) routeChannel( r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) case <-q.closed(): - r.logger.Error("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because q is closed", "peer", envelope.To, "channel", chID) case <-ctx.Done(): + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because context is done", "peer", envelope.To, "channel", chID) return } } case peerError, ok := <-errCh: if !ok { + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because peer err", "channel", chID) return } @@ -431,6 +439,7 @@ func (r *Router) routeChannel( } case <-ctx.Done(): + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because context is done 2", "channel", chID) return } } From 93e3f5c018e123c46169a23e1df627325859bee8 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 13 Feb 2023 14:22:08 -0800 Subject: [PATCH 06/15] fix metrics --- internal/p2p/metrics.gen.go | 2 +- internal/p2p/metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index adbb70e6e..23e400b08 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -37,7 +37,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Subsystem: MetricsSubsystem, Name: "peer_num_channels", Help: "Number of channels open for peer", - }, labels).With(labelsAndValues...), + }, append(labels, "peer_id")).With(labelsAndValues...), PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index f9b7c7cb3..f5eb5c5f9 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -33,7 +33,7 @@ type Metrics struct { // Number of bytes per channel sent to a given peer. PeerSendBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"` // Number of channels open for peer - PeerNumChannels metrics.Gauge + PeerNumChannels metrics.Gauge `metrics_labels:"peer_id"` // Number of bytes pending being sent to a given peer. PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"` From 8520c0d0edd70322423fe7a4635db0d2fc00ea78 Mon Sep 17 00:00:00 2001 From: codchen Date: Tue, 14 Feb 2023 11:28:38 +0800 Subject: [PATCH 07/15] add more logs --- internal/consensus/reactor.go | 2 ++ internal/p2p/router.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 83d7a536d..59685c225 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -996,6 +996,8 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda } }() + } else { + r.logger.Error("[TMDEBUG] peer status up received while PeerState is running") } case p2p.PeerStatusDown: diff --git a/internal/p2p/router.go b/internal/p2p/router.go index c0f07ae7b..ea88305e1 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -761,7 +761,9 @@ func (r *Router) handshakePeer( } nodeInfo := r.nodeInfoProducer() + r.logger.Info(fmt.Sprintf("[TMDEBUG] handshaking %s with channels %s", expectID, nodeInfo.Channels)) peerInfo, peerKey, err := conn.Handshake(ctx, *nodeInfo, r.privKey) + r.logger.Info(fmt.Sprintf("[TMDEBUG] handshaked %s with channels %s", peerInfo.NodeID, peerInfo.Channels)) if err != nil { return peerInfo, err } From 3d0fc99ca528769257ee307d543f02c8ee3fa238 Mon Sep 17 00:00:00 2001 From: codchen Date: Tue, 14 Feb 2023 13:39:41 +0800 Subject: [PATCH 08/15] more logs --- internal/consensus/reactor.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 59685c225..8b4099a52 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -510,6 +510,7 @@ func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh * OUTER_LOOP: for { if !r.IsRunning() { + r.logger.Error("[TMDEBUG] gossipDataRoutine reactor not running", "peer", ps.peerID) return } @@ -517,6 +518,7 @@ OUTER_LOOP: select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] gossipDataRoutine outer done", "peer", ps.peerID) return case <-timer.C: } @@ -530,7 +532,7 @@ OUTER_LOOP: part := rs.ProposalBlockParts.GetPart(index) partProto, err := part.ToProto() if err != nil { - logger.Error("failed to convert block part to proto", "err", err) + r.logger.Error("[TMDEBUG] gossipDataRoutine failed to convert block part to proto", "peer", ps.peerID, "err", err) return } @@ -543,6 +545,7 @@ OUTER_LOOP: Part: *partProto, }, }); err != nil { + r.logger.Error("[TMDEBUG] gossipDataRoutine send part error", "peer", ps.peerID, "err", err) return } @@ -602,6 +605,7 @@ OUTER_LOOP: Proposal: *propProto, }, }); err != nil { + r.logger.Error("[TMDEBUG] gossipDataRoutine send proposal error", "peer", ps.peerID, "err", err) return } @@ -627,6 +631,7 @@ OUTER_LOOP: ProposalPol: *pPolProto, }, }); err != nil { + r.logger.Error("[TMDEBUG] gossipDataRoutine send POL error", "peer", ps.peerID, "err", err) return } } @@ -744,11 +749,13 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh for { if !r.IsRunning() { + r.logger.Error("[TMDEBUG] gossipVotesRoutine reactor not running", "peer", ps.peerID) return } select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] gossipVotesRoutine done", "peer", ps.peerID) return default: } @@ -759,6 +766,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh // if height matches, then send LastCommit, Prevotes, and Precommits if rs.Height == prs.Height { if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil { + r.logger.Error("[TMDEBUG] gossipVotesRoutine height match error", "error", err, "peer", ps.peerID) return } else if ok { continue @@ -768,6 +776,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh // special catchup logic -- if peer is lagging by height 1, send LastCommit if prs.Height != 0 && rs.Height == prs.Height+1 { if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil { + r.logger.Error("[TMDEBUG] gossipVotesRoutine special catchup error", "error", err, "peer", ps.peerID) return } else if ok { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) @@ -790,6 +799,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh continue } if ok, err := r.pickSendVote(ctx, ps, ec, voteCh); err != nil { + r.logger.Error("[TMDEBUG] gossipVotesRoutine catchup error", "error", err, "peer", ps.peerID) return } else if ok { logger.Debug("picked Catchup commit to send", "height", prs.Height) @@ -800,6 +810,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh timer.Reset(r.state.config.PeerGossipSleepDuration) select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] gossipVotesRoutine done 2", "peer", ps.peerID) return case <-timer.C: } @@ -945,7 +956,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh // the peer. During peer removal, we remove the peer for our set of peers and // signal to all spawned goroutines to gracefully exit in a non-blocking manner. func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) { - r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) + r.logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() defer r.mtx.Unlock() @@ -976,12 +987,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda go func() { select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] canceled", "peer", peerUpdate.NodeID) return case <-r.readySignal: } // do nothing if the peer has // stopped while we've been waiting. if !ps.IsRunning() { + r.logger.Error("[TMDEBUG] not running", "peer", peerUpdate.NodeID) return } // start goroutines for this peer From 72eaf5963f13a4e4a5f8750e068b1666d905e0bf Mon Sep 17 00:00:00 2001 From: codchen Date: Tue, 14 Feb 2023 20:40:05 +0800 Subject: [PATCH 09/15] more metrics --- internal/consensus/metrics.gen.go | 76 ++++++++++++++++++------------- internal/consensus/metrics.go | 3 ++ internal/consensus/reactor.go | 40 ++++++++++++++++ node/node.go | 2 +- 4 files changed, 89 insertions(+), 32 deletions(-) diff --git a/internal/consensus/metrics.gen.go b/internal/consensus/metrics.gen.go index 37bac369b..3d5510159 100644 --- a/internal/consensus/metrics.gen.go +++ b/internal/consensus/metrics.gen.go @@ -20,6 +20,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "height", Help: "Height of the chain.", }, labels).With(labelsAndValues...), + GossipDataCount: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "gossip_data_count", + Help: "", + }, append(labels, "peer_id", "branch")).With(labelsAndValues...), + GossipVotesCount: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "gossip_votes_count", + Help: "", + }, append(labels, "peer_id", "branch")).With(labelsAndValues...), ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -162,25 +174,25 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Namespace: namespace, Subsystem: MetricsSubsystem, Name: "proposal_block_created_on_propose", - Help: "Number of proposal blocks created on propose received.", + Help: "", }, append(labels, "success")).With(labelsAndValues...), ProposalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "proposal_txs", - Help: "Number of txs in a proposal.", + Help: "", }, labels).With(labelsAndValues...), ProposalMissingTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "proposal_missing_txs", - Help: "Number of missing txs when trying to create proposal.", + Help: "", }, labels).With(labelsAndValues...), MissingTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "missing_txs", - Help: "Number of missing txs when a proposal is received.", + Help: "", }, append(labels, "proposer_address")).With(labelsAndValues...), QuorumPrevoteDelay: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, @@ -237,29 +249,35 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ - Height: discard.NewGauge(), - ValidatorLastSignedHeight: discard.NewGauge(), - Rounds: discard.NewGauge(), - RoundDuration: discard.NewHistogram(), - Validators: discard.NewGauge(), - ValidatorsPower: discard.NewGauge(), - ValidatorPower: discard.NewGauge(), - ValidatorMissedBlocks: discard.NewGauge(), - MissingValidators: discard.NewGauge(), - MissingValidatorsPower: discard.NewGauge(), - ByzantineValidators: discard.NewGauge(), - ByzantineValidatorsPower: discard.NewGauge(), - BlockIntervalSeconds: discard.NewHistogram(), - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewHistogram(), - TotalTxs: discard.NewGauge(), - CommittedHeight: discard.NewGauge(), - BlockSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), - BlockParts: discard.NewCounter(), - StepDuration: discard.NewHistogram(), - BlockGossipReceiveLatency: discard.NewHistogram(), - BlockGossipPartsReceived: discard.NewCounter(), + Height: discard.NewGauge(), + GossipDataCount: discard.NewCounter(), + GossipVotesCount: discard.NewCounter(), + ValidatorLastSignedHeight: discard.NewGauge(), + Rounds: discard.NewGauge(), + RoundDuration: discard.NewHistogram(), + Validators: discard.NewGauge(), + ValidatorsPower: discard.NewGauge(), + ValidatorPower: discard.NewGauge(), + ValidatorMissedBlocks: discard.NewGauge(), + MissingValidators: discard.NewGauge(), + MissingValidatorsPower: discard.NewGauge(), + ByzantineValidators: discard.NewGauge(), + ByzantineValidatorsPower: discard.NewGauge(), + BlockIntervalSeconds: discard.NewHistogram(), + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewHistogram(), + TotalTxs: discard.NewGauge(), + CommittedHeight: discard.NewGauge(), + BlockSyncing: discard.NewGauge(), + StateSyncing: discard.NewGauge(), + BlockParts: discard.NewCounter(), + StepDuration: discard.NewHistogram(), + BlockGossipReceiveLatency: discard.NewHistogram(), + BlockGossipPartsReceived: discard.NewCounter(), + ProposalBlockCreatedOnPropose: discard.NewCounter(), + ProposalTxs: discard.NewGauge(), + ProposalMissingTxs: discard.NewGauge(), + MissingTxs: discard.NewGauge(), QuorumPrevoteDelay: discard.NewGauge(), FullPrevoteDelay: discard.NewGauge(), ProposalTimestampDifference: discard.NewHistogram(), @@ -268,9 +286,5 @@ func NopMetrics() *Metrics { ProposalCreateCount: discard.NewCounter(), RoundVotingPowerPercent: discard.NewGauge(), LateVotes: discard.NewCounter(), - ProposalBlockCreatedOnPropose: discard.NewCounter(), - ProposalTxs: discard.NewGauge(), - ProposalMissingTxs: discard.NewGauge(), - MissingTxs: discard.NewGauge(), } } diff --git a/internal/consensus/metrics.go b/internal/consensus/metrics.go index 0b8617be8..49e22da89 100644 --- a/internal/consensus/metrics.go +++ b/internal/consensus/metrics.go @@ -24,6 +24,9 @@ type Metrics struct { // Height of the chain. Height metrics.Gauge + GossipDataCount metrics.Counter `metrics_labels:"peer_id, branch"` + GossipVotesCount metrics.Counter `metrics_labels:"peer_id, branch"` + // Last height signed by this validator if the node is a validator. ValidatorLastSignedHeight metrics.Gauge `metrics_labels:"validator_address"` diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 8b4099a52..bf6438fbb 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -506,6 +506,11 @@ func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh * timer := time.NewTimer(0) defer timer.Stop() + defer func() { + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "6").Add(1) + }() OUTER_LOOP: for { @@ -550,6 +555,9 @@ OUTER_LOOP: } ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "1").Add(1) continue OUTER_LOOP } } @@ -575,15 +583,24 @@ OUTER_LOOP: // Continue the loop since prs is a copy and not effected by this // initialization. + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "2").Add(1) continue OUTER_LOOP } r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh) + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "3").Add(1) continue OUTER_LOOP } // if height and round don't match, sleep if (rs.Height != prs.Height) || (rs.Round != prs.Round) { + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "4").Add(1) continue OUTER_LOOP } @@ -636,6 +653,9 @@ OUTER_LOOP: } } } + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "5").Add(1) } } @@ -746,6 +766,11 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh timer := time.NewTimer(0) defer timer.Stop() + defer func() { + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "6").Add(1) + }() for { if !r.IsRunning() { @@ -769,6 +794,9 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh r.logger.Error("[TMDEBUG] gossipVotesRoutine height match error", "error", err, "peer", ps.peerID) return } else if ok { + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "1").Add(1) continue } } @@ -780,6 +808,9 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh return } else if ok { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "2").Add(1) continue } } @@ -796,6 +827,9 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh ec = r.state.blockStore.LoadBlockCommit(prs.Height).WrappedExtendedCommit() } if ec == nil { + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "3").Add(1) continue } if ok, err := r.pickSendVote(ctx, ps, ec, voteCh); err != nil { @@ -803,10 +837,16 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh return } else if ok { logger.Debug("picked Catchup commit to send", "height", prs.Height) + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "4").Add(1) continue } } + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "5").Add(1) timer.Reset(r.state.config.PeerGossipSleepDuration) select { case <-ctx.Done(): diff --git a/node/node.go b/node/node.go index 7c381eea3..164b6ab7c 100644 --- a/node/node.go +++ b/node/node.go @@ -324,7 +324,7 @@ func makeNode( node.rpcEnv.ConsensusState = csState csReactor := consensus.NewReactor( - logger, + logger.With("module", "csreactor"), csState, peerManager.Subscribe, eventBus, From 7a0ea9b529e1bd71a8211f8c34b6a73432286f9f Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 15 Feb 2023 00:32:36 +0800 Subject: [PATCH 10/15] add logs --- internal/consensus/peer_state.go | 1 + internal/consensus/reactor.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/internal/consensus/peer_state.go b/internal/consensus/peer_state.go index f081eada1..6a3b2206f 100644 --- a/internal/consensus/peer_state.go +++ b/internal/consensus/peer_state.go @@ -396,6 +396,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { // ignore duplicates or decreases if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 { + ps.logger.Error(fmt.Sprintf("received duplicate new round step msg %d %d %d %d %d %d", msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step), "peer", ps.peerID) return } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index bf6438fbb..0ed420515 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -772,6 +772,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh "branch", "6").Add(1) }() + logCounter := 0 for { if !r.IsRunning() { r.logger.Error("[TMDEBUG] gossipVotesRoutine reactor not running", "peer", ps.peerID) @@ -844,6 +845,11 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh } } + if logCounter%100 == 0 { + // print every 10s + r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d", blockStoreBase, prs.Height, rs.Height)) + } + logCounter++ r.Metrics.GossipVotesCount.With( "peer_id", string(ps.peerID), "branch", "5").Add(1) From 46f4e849a1405a9dbf0c350c51987fdd7f07b99b Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 15 Feb 2023 00:46:56 +0800 Subject: [PATCH 11/15] add more logs --- internal/consensus/reactor.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 0ed420515..f70d2c184 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -789,8 +789,10 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh rs := r.getRoundState() prs := ps.GetRoundState() + enteredB1 := false // if height matches, then send LastCommit, Prevotes, and Precommits if rs.Height == prs.Height { + enteredB1 = true if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil { r.logger.Error("[TMDEBUG] gossipVotesRoutine height match error", "error", err, "peer", ps.peerID) return @@ -802,8 +804,10 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh } } + enteredB2 := false // special catchup logic -- if peer is lagging by height 1, send LastCommit if prs.Height != 0 && rs.Height == prs.Height+1 { + enteredB2 = true if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil { r.logger.Error("[TMDEBUG] gossipVotesRoutine special catchup error", "error", err, "peer", ps.peerID) return @@ -818,7 +822,9 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh // catchup logic -- if peer is lagging by more than 1, send Commit blockStoreBase := r.state.blockStore.Base() + enteredCatchup := false if blockStoreBase > 0 && prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase { + enteredCatchup = true // Load the block's extended commit for prs.Height, which contains precommit // signatures for prs.Height. var ec *types.ExtendedCommit @@ -847,7 +853,8 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh if logCounter%100 == 0 { // print every 10s - r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d", blockStoreBase, prs.Height, rs.Height)) + r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d %t %t %t", blockStoreBase, prs.Height, rs.Height, enteredB1, enteredB2, enteredCatchup)) + r.logger.Error(ps.PRS.String()) } logCounter++ r.Metrics.GossipVotesCount.With( From 3359131ae4c1bbbf2f9ee7aa65ebde83edb96b75 Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 15 Feb 2023 13:06:18 +0800 Subject: [PATCH 12/15] add peer ID --- internal/consensus/reactor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index f70d2c184..d2f6310bd 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -851,10 +851,10 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh } } - if logCounter%100 == 0 { + if logCounter%300 == 0 { // print every 10s - r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d %t %t %t", blockStoreBase, prs.Height, rs.Height, enteredB1, enteredB2, enteredCatchup)) - r.logger.Error(ps.PRS.String()) + r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d %t %t %t", blockStoreBase, prs.Height, rs.Height, enteredB1, enteredB2, enteredCatchup), "peer", ps.peerID) + r.logger.Error(ps.PRS.String(), "peer", ps.peerID) } logCounter++ r.Metrics.GossipVotesCount.With( From d8113c74aef7766e18d1eca63b09e263e4a228a2 Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 15 Feb 2023 15:22:36 +0800 Subject: [PATCH 13/15] bump --- internal/consensus/reactor.go | 4 +++- internal/p2p/metrics.gen.go | 14 ++++++++++++++ internal/p2p/metrics.go | 3 +++ internal/p2p/router.go | 12 +++++++++++- 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index d2f6310bd..449732572 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -315,10 +315,12 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) { } func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { - return stateCh.Send(ctx, p2p.Envelope{ + err := stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: makeRoundStepMessage(rs), }) + r.logger.Info(fmt.Sprintf("broadcasting %d-%d-%d: %s", rs.Height, rs.Round, rs.Step, err)) + return err } func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index 23e400b08..3057d539a 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -80,6 +80,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_channel_send", Help: "", }, labels).With(labelsAndValues...), + LastEnqueuedAt: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "last_enqueued_at", + Help: "", + }, append(labels, "ch_id", "peer_id")).With(labelsAndValues...), + LastSentAt: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "last_sent_at", + Help: "", + }, append(labels, "ch_id", "peer_id")).With(labelsAndValues...), } } @@ -96,5 +108,7 @@ func NopMetrics() *Metrics { PeerQueueDroppedMsgs: discard.NewCounter(), PeerQueueMsgSize: discard.NewGauge(), PeerChannelSend: discard.NewCounter(), + LastEnqueuedAt: discard.NewGauge(), + LastSentAt: discard.NewGauge(), } } diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index f5eb5c5f9..7f9ce0618 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -63,6 +63,9 @@ type Metrics struct { PeerQueueMsgSize metrics.Gauge `metrics_labels:"ch_id" metric_name:"router_channel_queue_msg_size"` PeerChannelSend metrics.Counter + + LastEnqueuedAt metrics.Gauge `metrics_labels:"ch_id, peer_id"` + LastSentAt metrics.Gauge `metrics_labels:"ch_id, peer_id"` } type metricsLabelCache struct { diff --git a/internal/p2p/router.go b/internal/p2p/router.go index ea88305e1..5c9fed622 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -324,6 +324,9 @@ func (r *Router) routeChannel( errCh <-chan PeerError, wrapper Wrapper, ) { + defer func() { + r.logger.Error("[TMDEBUG] exited", "channel", chID) + }() for { select { case envelope, ok := <-outCh: @@ -353,16 +356,19 @@ func (r *Router) routeChannel( // collect peer queues to pass the message via var queues []queue + var peerIDs []types.NodeID if envelope.Broadcast { r.peerMtx.RLock() queues = make([]queue, 0, len(r.peerQueues)) + peerIDs = make([]types.NodeID, 0, len(r.peerQueues)) for nodeID, q := range r.peerQueues { peerChs := r.peerChannels[nodeID] // check whether the peer is receiving on that channel if _, ok := peerChs[chID]; ok { queues = append(queues, q) + peerIDs = append(peerIDs, nodeID) } else { r.logger.Error("[TMDEBUG] broadcast mode block dropping message because peer is not ok", "peer", envelope.To, "channel", chID) } @@ -398,14 +404,16 @@ func (r *Router) routeChannel( } queues = []queue{q} + peerIDs = []types.NodeID{envelope.To} } // send message to peers - for _, q := range queues { + for idx, q := range queues { start := time.Now().UTC() select { case q.enqueue() <- envelope: + r.metrics.LastEnqueuedAt.With("peer_id", string(peerIDs[idx]), "chId", fmt.Sprintf("%d", chID)).Set(float64(time.Now().UnixMilli())) r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) case <-q.closed(): @@ -951,6 +959,8 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect return err } + r.metrics.LastSentAt.With("peer_id", string(peerID), "chId", fmt.Sprintf("%d", envelope.ChannelID)).Set(float64(time.Now().UnixMilli())) + r.metrics.PeerSendBytesTotal.With( "chID", fmt.Sprintf("%d", envelope.ChannelID), "peer_id", string(peerID), From ddc907dd2feb0d08dcf0c76000a0108971b67782 Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 15 Feb 2023 16:24:30 +0800 Subject: [PATCH 14/15] fix --- internal/consensus/state.go | 4 ++-- internal/p2p/router.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 70c2dc44c..af5b0ff41 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1595,7 +1595,7 @@ func (cs *State) enterPrevote(ctx context.Context, height int64, round int32, en cs.newStep() }() - logger.Info("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) + logger.Info("entering prevote step", "entry", entryLabel, "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) // Sign and broadcast vote as necessary cs.doPrevote(ctx, height, round) @@ -1837,7 +1837,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, return } - logger.Info("entering precommit step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) + logger.Info("entering precommit step", "entry", entryLabel, "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) defer func() { // Done enterPrecommit: diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 5c9fed622..708a0ec50 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -413,7 +413,7 @@ func (r *Router) routeChannel( select { case q.enqueue() <- envelope: - r.metrics.LastEnqueuedAt.With("peer_id", string(peerIDs[idx]), "chId", fmt.Sprintf("%d", chID)).Set(float64(time.Now().UnixMilli())) + r.metrics.LastEnqueuedAt.With("peer_id", string(peerIDs[idx]), "ch_id", fmt.Sprintf("%d", chID)).Set(float64(time.Now().UnixMilli())) r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) case <-q.closed(): @@ -959,7 +959,7 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect return err } - r.metrics.LastSentAt.With("peer_id", string(peerID), "chId", fmt.Sprintf("%d", envelope.ChannelID)).Set(float64(time.Now().UnixMilli())) + r.metrics.LastSentAt.With("peer_id", string(peerID), "ch_id", fmt.Sprintf("%d", envelope.ChannelID)).Set(float64(time.Now().UnixMilli())) r.metrics.PeerSendBytesTotal.With( "chID", fmt.Sprintf("%d", envelope.ChannelID), From 3a5ee0cdb5484d1d38139fbea4afa42d495917cf Mon Sep 17 00:00:00 2001 From: codchen Date: Wed, 15 Feb 2023 21:47:50 +0800 Subject: [PATCH 15/15] add heartbeater --- internal/consensus/state.go | 19 +++++++++++++++++++ internal/p2p/conn/connection.go | 7 +++++++ internal/p2p/conn/connection_test.go | 4 ++-- internal/p2p/metrics.gen.go | 7 +++++++ internal/p2p/metrics.go | 1 + internal/p2p/transport_mconn.go | 15 +++++++++++++-- internal/p2p/transport_mconn_test.go | 4 ++++ node/setup.go | 1 + 8 files changed, 54 insertions(+), 4 deletions(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index af5b0ff41..d7821f151 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -49,6 +49,7 @@ var ( ) var msgQueueSize = 1000 +var heartbeatIntervalInSecs = 10 // msgs from the reactor which may update the state type msgInfo struct { @@ -490,6 +491,7 @@ func (cs *State) OnStart(ctx context.Context) error { // now start the receiveRoutine go cs.receiveRoutine(ctx, 0) + go cs.heartbeater(ctx) // schedule the first round! // use GetRoundState so we don't race the receiveRoutine for access @@ -894,6 +896,23 @@ func (cs *State) newStep() { } } +func (cs *State) heartbeater(ctx context.Context) { + for { + select { + case <-time.After(time.Duration(heartbeatIntervalInSecs) * time.Second): + cs.fireHeartbeatEvent() + case <-ctx.Done(): + return + } + } +} + +func (cs *State) fireHeartbeatEvent() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.evsw.FireEvent(types.EventNewRoundStepValue, &cs.RoundState) +} + //----------------------------------------- // the main go routines diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 901597fb6..bcc97fd4f 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -119,6 +119,8 @@ type MConnection struct { created time.Time // time of creation _maxPacketMsgSize int + + setMet func(time.Time, string) } // MConnConfig is a MConnection configuration. @@ -163,6 +165,7 @@ func NewMConnection( onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig, + setMet func(time.Time, string), ) *MConnection { mconn := &MConnection{ logger: logger, @@ -178,6 +181,7 @@ func NewMConnection( config: config, created: time.Now(), cancel: func() {}, + setMet: setMet, } mconn.BaseService = *service.NewBaseService(logger, "MConnection", mconn) @@ -720,6 +724,9 @@ func (ch *channel) nextPacketMsg() tmp2p.PacketMsg { func (ch *channel) writePacketMsgTo(w io.Writer) (n int, err error) { packet := ch.nextPacketMsg() n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) + if err == nil { + ch.conn.setMet(time.Now(), fmt.Sprintf("%d", ch.desc.ID)) + } atomic.AddInt64(&ch.recentlySent, int64(n)) return } diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index 5a604cd23..602dec42b 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -43,7 +43,7 @@ func createMConnectionWithCallbacks( cfg.PingInterval = 250 * time.Millisecond cfg.PongTimeout = 500 * time.Millisecond chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := NewMConnection(logger, conn, chDescs, onReceive, onError, cfg) + c := NewMConnection(logger, conn, chDescs, onReceive, onError, cfg, nil) return c } @@ -435,7 +435,7 @@ func newClientAndServerConnsForReadErrors( } logger := log.NewNopLogger() - mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError, DefaultMConnConfig()) + mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError, DefaultMConnConfig(), nil) err := mconnClient.Start(ctx) require.NoError(t, err) diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index 3057d539a..a2c233eff 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -92,6 +92,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "last_sent_at", Help: "", }, append(labels, "ch_id", "peer_id")).With(labelsAndValues...), + LastWrittenAt: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "last_written_at", + Help: "", + }, append(labels, "ch_id")).With(labelsAndValues...), } } @@ -110,5 +116,6 @@ func NopMetrics() *Metrics { PeerChannelSend: discard.NewCounter(), LastEnqueuedAt: discard.NewGauge(), LastSentAt: discard.NewGauge(), + LastWrittenAt: discard.NewGauge(), } } diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index 7f9ce0618..9e803cb75 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -66,6 +66,7 @@ type Metrics struct { LastEnqueuedAt metrics.Gauge `metrics_labels:"ch_id, peer_id"` LastSentAt metrics.Gauge `metrics_labels:"ch_id, peer_id"` + LastWrittenAt metrics.Gauge `metrics_labels:"ch_id"` } type metricsLabelCache struct { diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index 5cc10fd8f..08105902a 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -9,6 +9,7 @@ import ( "net" "strconv" "sync" + "time" "golang.org/x/net/netutil" @@ -51,6 +52,8 @@ type MConnTransport struct { lastConnsMutex *sync.Mutex lastConns map[string]net.Conn + + Met *Metrics } // NewMConnTransport sets up a new MConnection transport. This uses the @@ -61,6 +64,7 @@ func NewMConnTransport( mConnConfig conn.MConnConfig, channelDescs []*ChannelDescriptor, options MConnTransportOptions, + met *Metrics, ) *MConnTransport { return &MConnTransport{ logger: logger, @@ -70,6 +74,7 @@ func NewMConnTransport( channelDescs: channelDescs, lastConns: make(map[string]net.Conn), lastConnsMutex: &sync.Mutex{}, + Met: met, } } @@ -178,7 +183,7 @@ func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) { } else { m.logger.Error("accepting a connection whose remote address is not TCPAddr") } - return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil + return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs, m.Met), nil } } @@ -217,7 +222,7 @@ func (m *MConnTransport) Dial(ctx context.Context, endpoint *Endpoint) (Connecti } } - return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil + return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs, m.Met), nil } // Close implements Transport. @@ -274,6 +279,7 @@ type mConnConnection struct { errorCh chan error doneCh chan struct{} closeOnce sync.Once + Met *Metrics mconn *conn.MConnection // set during Handshake() } @@ -290,6 +296,7 @@ func newMConnConnection( conn net.Conn, mConnConfig conn.MConnConfig, channelDescs []*ChannelDescriptor, + met *Metrics, ) *mConnConnection { return &mConnConnection{ logger: logger, @@ -299,6 +306,7 @@ func newMConnConnection( receiveCh: make(chan mConnMessage), errorCh: make(chan error, 1), // buffered to avoid onError leak doneCh: make(chan struct{}), + Met: met, } } @@ -414,6 +422,9 @@ func (c *mConnConnection) handshake( c.onReceive, c.onError, c.mConnConfig, + func(t time.Time, s string) { + c.Met.LastWrittenAt.With("ch_id", s).Set(float64(t.UnixMilli())) + }, ) return mconn, peerInfo, secretConn.RemotePubKey(), nil diff --git a/internal/p2p/transport_mconn_test.go b/internal/p2p/transport_mconn_test.go index c478dbe1d..28d5bfeaa 100644 --- a/internal/p2p/transport_mconn_test.go +++ b/internal/p2p/transport_mconn_test.go @@ -24,6 +24,7 @@ func init() { conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, + nil, ) err := transport.Listen(&p2p.Endpoint{ Protocol: p2p.MConnProtocol, @@ -46,6 +47,7 @@ func TestMConnTransport_AcceptBeforeListen(t *testing.T) { p2p.MConnTransportOptions{ MaxAcceptedConnections: 2, }, + nil, ) t.Cleanup(func() { _ = transport.Close() @@ -69,6 +71,7 @@ func TestMConnTransport_AcceptMaxAcceptedConnections(t *testing.T) { p2p.MConnTransportOptions{ MaxAcceptedConnections: 2, }, + nil, ) t.Cleanup(func() { _ = transport.Close() @@ -158,6 +161,7 @@ func TestMConnTransport_Listen(t *testing.T) { conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, + nil, ) // Transport should not listen on any endpoints yet. diff --git a/node/setup.go b/node/setup.go index cf65856dc..05699c4b2 100644 --- a/node/setup.go +++ b/node/setup.go @@ -299,6 +299,7 @@ func createRouter( p2p.MConnTransportOptions{ MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections), }, + p2pMetrics, ) ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))