Enrich net-info endpoint #67

Open · wants to merge 15 commits into main
76 changes: 45 additions & 31 deletions internal/consensus/metrics.gen.go

(Generated file; diff not rendered by default.)

3 changes: 3 additions & 0 deletions internal/consensus/metrics.go
@@ -24,6 +24,9 @@ type Metrics struct {
// Height of the chain.
Height metrics.Gauge

GossipDataCount metrics.Counter `metrics_labels:"peer_id, branch"`
GossipVotesCount metrics.Counter `metrics_labels:"peer_id, branch"`

// Last height signed by this validator if the node is a validator.
ValidatorLastSignedHeight metrics.Gauge `metrics_labels:"validator_address"`

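For readers unfamiliar with this metrics setup: the metrics_labels struct tag tells the generator behind metrics.gen.go which label names to attach to each counter. As a rough, hand-written sketch of what the generated constructor plausibly looks like (go-kit over Prometheus, which this codebase uses for its metrics; the namespace and function name here are illustrative assumptions, not the generated code):

import (
	"github.com/go-kit/kit/metrics/prometheus"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)

// newGossipDataCounter is a hypothetical stand-in for the generated
// constructor of GossipDataCount; the label names must match the
// metrics_labels tag exactly.
func newGossipDataCounter(namespace string) *prometheus.Counter {
	return prometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Namespace: namespace,
		Subsystem: "consensus",
		Name:      "gossip_data_count",
		Help:      "Iterations of the data-gossip loop, labeled by peer and branch.",
	}, []string{"peer_id", "branch"})
}

Call sites then record samples the way the reactor diff below does, e.g. Metrics.GossipDataCount.With("peer_id", string(peerID), "branch", "1").Add(1).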
1 change: 1 addition & 0 deletions internal/consensus/peer_state.go
@@ -396,6 +396,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {

// ignore duplicates or decreases
if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
ps.logger.Error(fmt.Sprintf("received duplicate new round step msg %d %d %d %d %d %d", msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step), "peer", ps.peerID)
return
}

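For context on the guard above: CompareHRS orders (height, round, step) tuples lexicographically and returns a negative, zero, or positive value, so a result <= 0 marks the message as a duplicate or a step backwards. A minimal sketch of that comparison, assuming plain tuple ordering and simplified types (see the package's CompareHRS for the real signature):

// compareHRS is a simplified sketch of lexicographic comparison over
// (height, round, step); it returns -1, 0, or 1.
func compareHRS(h1 int64, r1 int32, s1 uint8, h2 int64, r2 int32, s2 uint8) int {
	switch {
	case h1 < h2, h1 == h2 && r1 < r2, h1 == h2 && r1 == r2 && s1 < s2:
		return -1
	case h1 == h2 && r1 == r2 && s1 == s2:
		return 0
	}
	return 1
}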
76 changes: 73 additions & 3 deletions internal/consensus/reactor.go
@@ -315,10 +315,12 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
}

func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
err := stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: makeRoundStepMessage(rs),
})
r.logger.Info(fmt.Sprintf("broadcasting %d-%d-%d: %s", rs.Height, rs.Round, rs.Step, err))
return err
}

func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
@@ -506,17 +508,24 @@ func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {

timer := time.NewTimer(0)
defer timer.Stop()
defer func() {
r.Metrics.GossipDataCount.With(
"peer_id", string(ps.peerID),
"branch", "6").Add(1)
}()

OUTER_LOOP:
for {
if !r.IsRunning() {
r.logger.Error("[TMDEBUG] gossipDataRoutine reactor not running", "peer", ps.peerID)
return
}

timer.Reset(r.state.config.PeerGossipSleepDuration)

select {
case <-ctx.Done():
r.logger.Error("[TMDEBUG] gossipDataRoutine outer done", "peer", ps.peerID)
return
case <-timer.C:
}
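Note the split in the instrumentation above: branches "1" through "5" count paths through the loop body, while the deferred increment with branch "6" fires exactly once when the routine returns, so that series doubles as a goroutine-exit counter. The pattern in isolation (a sketch with illustrative names; counter is any go-kit metrics.Counter):

import (
	"context"
	"time"

	"github.com/go-kit/kit/metrics"
)

// instrumentedLoop sketches the pattern used here: per-branch increments
// inside the loop, plus one deferred increment recording the exit.
func instrumentedLoop(ctx context.Context, peerID string, counter metrics.Counter) {
	defer counter.With("peer_id", peerID, "branch", "6").Add(1) // fires once, on any return
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(100 * time.Millisecond): // stand-in for PeerGossipSleepDuration
		}
		counter.With("peer_id", peerID, "branch", "5").Add(1) // idle iteration
	}
}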
@@ -530,7 +539,7 @@ OUTER_LOOP:
part := rs.ProposalBlockParts.GetPart(index)
partProto, err := part.ToProto()
if err != nil {
logger.Error("failed to convert block part to proto", "err", err)
r.logger.Error("[TMDEBUG] gossipDataRoutine failed to convert block part to proto", "peer", ps.peerID, "err", err)
return
}

@@ -543,10 +552,14 @@ OUTER_LOOP:
Part: *partProto,
},
}); err != nil {
r.logger.Error("[TMDEBUG] gossipDataRoutine send part error", "peer", ps.peerID, "err", err)
return
}

ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
r.Metrics.GossipDataCount.With(
"peer_id", string(ps.peerID),
"branch", "1").Add(1)
continue OUTER_LOOP
}
}
@@ -572,15 +585,24 @@ OUTER_LOOP:

// Continue the loop since prs is a copy and not affected by this
// initialization.
r.Metrics.GossipDataCount.With(
"peer_id", string(ps.peerID),
"branch", "2").Add(1)
continue OUTER_LOOP
}

r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh)
r.Metrics.GossipDataCount.With(
"peer_id", string(ps.peerID),
"branch", "3").Add(1)
continue OUTER_LOOP
}

// if height and round don't match, sleep
if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
r.Metrics.GossipDataCount.With(
"peer_id", string(ps.peerID),
"branch", "4").Add(1)
continue OUTER_LOOP
}

@@ -602,6 +624,7 @@ OUTER_LOOP:
Proposal: *propProto,
},
}); err != nil {
r.logger.Error("[TMDEBUG] gossipDataRoutine send proposal error", "peer", ps.peerID, "err", err)
return
}

@@ -627,10 +650,14 @@ OUTER_LOOP:
ProposalPol: *pPolProto,
},
}); err != nil {
r.logger.Error("[TMDEBUG] gossipDataRoutine send POL error", "peer", ps.peerID, "err", err)
return
}
}
}
r.Metrics.GossipDataCount.With(
"peer_id", string(ps.peerID),
"branch", "5").Add(1)
}
}
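Taken together, the branch labels in gossipDataRoutine appear to map to the following paths; this legend is inferred from the diff itself, not from documentation in the PR:

// Inferred meaning of GossipDataCount's "branch" label values:
var gossipDataBranches = map[string]string{
	"1": "sent a proposal block part the peer was missing",
	"2": "initialized the peer's ProposalBlockParts header for catchup",
	"3": "took the gossipDataForCatchup path (peer behind on height)",
	"4": "height/round mismatch with the peer; slept and retried",
	"5": "fell through after (possibly) sending Proposal/ProposalPOL",
	"6": "routine exited (deferred increment)",
}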

@@ -741,43 +768,65 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {

timer := time.NewTimer(0)
defer timer.Stop()
defer func() {
r.Metrics.GossipVotesCount.With(
"peer_id", string(ps.peerID),
"branch", "6").Add(1)
}()

logCounter := 0
for {
if !r.IsRunning() {
r.logger.Error("[TMDEBUG] gossipVotesRoutine reactor not running", "peer", ps.peerID)
return
}

select {
case <-ctx.Done():
r.logger.Error("[TMDEBUG] gossipVotesRoutine done", "peer", ps.peerID)
return
default:
}

rs := r.getRoundState()
prs := ps.GetRoundState()

enteredB1 := false
// if height matches, then send LastCommit, Prevotes, and Precommits
if rs.Height == prs.Height {
enteredB1 = true
if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil {
r.logger.Error("[TMDEBUG] gossipVotesRoutine height match error", "error", err, "peer", ps.peerID)
return
} else if ok {
r.Metrics.GossipVotesCount.With(
"peer_id", string(ps.peerID),
"branch", "1").Add(1)
continue
}
}

enteredB2 := false
// special catchup logic -- if peer is lagging by height 1, send LastCommit
if prs.Height != 0 && rs.Height == prs.Height+1 {
enteredB2 = true
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
r.logger.Error("[TMDEBUG] gossipVotesRoutine special catchup error", "error", err, "peer", ps.peerID)
return
} else if ok {
logger.Debug("picked rs.LastCommit to send", "height", prs.Height)
r.Metrics.GossipVotesCount.With(
"peer_id", string(ps.peerID),
"branch", "2").Add(1)
continue
}
}

// catchup logic -- if peer is lagging by more than 1, send Commit
blockStoreBase := r.state.blockStore.Base()
enteredCatchup := false
if blockStoreBase > 0 && prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase {
enteredCatchup = true
// Load the block's extended commit for prs.Height, which contains precommit
// signatures for prs.Height.
var ec *types.ExtendedCommit
@@ -787,19 +836,36 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
ec = r.state.blockStore.LoadBlockCommit(prs.Height).WrappedExtendedCommit()
}
if ec == nil {
r.Metrics.GossipVotesCount.With(
"peer_id", string(ps.peerID),
"branch", "3").Add(1)
continue
}
if ok, err := r.pickSendVote(ctx, ps, ec, voteCh); err != nil {
r.logger.Error("[TMDEBUG] gossipVotesRoutine catchup error", "error", err, "peer", ps.peerID)
return
} else if ok {
logger.Debug("picked Catchup commit to send", "height", prs.Height)
r.Metrics.GossipVotesCount.With(
"peer_id", string(ps.peerID),
"branch", "4").Add(1)
continue
}
}

if logCounter%300 == 0 {
// throttle: log every 300th idle iteration (~10s at the configured PeerGossipSleepDuration)
r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d %t %t %t", blockStoreBase, prs.Height, rs.Height, enteredB1, enteredB2, enteredCatchup), "peer", ps.peerID)
r.logger.Error(ps.PRS.String(), "peer", ps.peerID)
}
logCounter++
r.Metrics.GossipVotesCount.With(
"peer_id", string(ps.peerID),
"branch", "5").Add(1)
timer.Reset(r.state.config.PeerGossipSleepDuration)
select {
case <-ctx.Done():
r.logger.Error("[TMDEBUG] gossipVotesRoutine done 2", "peer", ps.peerID)
return
case <-timer.C:
}
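gossipVotesRoutine uses the same labeling scheme; again inferred from the diff:

// Inferred meaning of GossipVotesCount's "branch" label values:
var gossipVotesBranches = map[string]string{
	"1": "sent a vote at the matching height (gossipVotesForHeight)",
	"2": "sent rs.LastCommit to a peer lagging by one height",
	"3": "no commit available to send for the peer's height; retried",
	"4": "sent a catch-up commit vote to a peer two or more heights behind",
	"5": "idle iteration: nothing to send, slept",
	"6": "routine exited (deferred increment)",
}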
@@ -945,7 +1011,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
// the peer. During peer removal, we remove the peer from our set of peers and
// signal to all spawned goroutines to gracefully exit in a non-blocking manner.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

r.mtx.Lock()
defer r.mtx.Unlock()
@@ -976,12 +1042,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) {
go func() {
select {
case <-ctx.Done():
r.logger.Error("[TMDEBUG] canceled", "peer", peerUpdate.NodeID)
return
case <-r.readySignal:
}
// do nothing if the peer has
// stopped while we've been waiting.
if !ps.IsRunning() {
r.logger.Error("[TMDEBUG] not running", "peer", peerUpdate.NodeID)
return
}
// start goroutines for this peer
Expand All @@ -996,6 +1064,8 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
}

}()
} else {
r.logger.Error("[TMDEBUG] peer status up received while PeerState is running")
}

case p2p.PeerStatusDown:
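One last note on the peer-startup path above: the spawned goroutine waits on r.readySignal before starting the per-peer routines and then re-checks ps.IsRunning(), since the peer can be removed while the reactor is still starting up. The gate in isolation (a sketch; every name here is an illustrative stand-in):

import "context"

// startWhenReady blocks until ready closes, re-checks liveness, and only
// then starts the per-peer routines, mirroring processPeerUpdate's gating.
func startWhenReady(ctx context.Context, ready <-chan struct{}, stillRunning func() bool, routines ...func(context.Context)) {
	go func() {
		select {
		case <-ctx.Done():
			return
		case <-ready:
		}
		if !stillRunning() { // peer may have stopped while we waited
			return
		}
		for _, r := range routines {
			go r(ctx)
		}
	}()
}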