Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect invalid and inactive peers #431

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions gossip/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package gossip

import "time"

func setProgressThreshold(threshold time.Duration) {
noProgressTime = threshold
}

func setApplicationThreshold(threshold time.Duration) {
noAppMessageTime = threshold
}
97 changes: 81 additions & 16 deletions gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ const (
// txChanSize is the size of channel listening to NewTxsNotify.
// The number is referenced from the size of tx pool.
txChanSize = 4096

// percentage of useless peer nodes to allow
uselessPeerPercentage = 0.2 // 20%

// Number of application errors that can be tolerated before banning the node and disconnecting
toleranceOfApplicationErrors = 3
)

var (
ErrorProgressTimeout = errors.New("progress timeout")
ErrorApplicationTimeout = errors.New("application timeout")
)

func errResp(code errCode, format string, v ...interface{}) error {
Expand Down Expand Up @@ -782,23 +793,30 @@ func (h *handler) handle(p *peer) error {
p.Log().Error("Snapshot extension barrier failed", "err", err)
return err
}
useless := discfilter.Banned(p.Node().ID(), p.Node().Record())
if !useless && (!eligibleForSnap(p.Peer) || !strings.Contains(strings.ToLower(p.Name()), "opera")) {
useless = true

// Some clients have compatible caps and thus pass discovery checks and seep in to
// protocol handler. We should ban these clients immediately.
// ex: go-corex, Efireal, Geth all with caps=[opera/62]
if !strings.Contains(strings.ToLower(p.Name()), "opera") {
discfilter.Ban(p.ID())
return p2p.DiscProtocolError
}
if !p.Peer.Info().Network.Trusted && useless && h.peers.UselessNum() >= h.maxPeers/10 {
// don't allow more than 10% of useless peers

// A useless peer is the one which does not support protocols opera/63 & fsnap/1.
useless := !eligibleForSnap(p.Peer)
if !p.Peer.Info().Network.Trusted && useless && h.peers.UselessNum() >= int((float32(h.maxPeers)*uselessPeerPercentage)) {
// don't allow more than 20% of useless peers
return p2p.DiscTooManyPeers
}
if !p.Peer.Info().Network.Trusted && useless {
if h.peers.UselessNum() >= h.maxPeers/10 {
// don't allow more than 10% of useless peers
return p2p.DiscTooManyPeers
}
p.SetUseless()
}

// Disconnect if maxPeers is reached
if h.peers.Len() >= h.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}

h.peerWG.Add(1)
defer h.peerWG.Done()

Expand All @@ -809,9 +827,7 @@ func (h *handler) handle(p *peer) error {
)
if err := p.Handshake(h.NetworkID, myProgress, common.Hash(genesis)); err != nil {
p.Log().Debug("Handshake failed", "err", err)
if !useless {
discfilter.Ban(p.ID())
}
discfilter.Ban(p.ID())
return err
}

Expand Down Expand Up @@ -856,11 +872,51 @@ func (h *handler) handle(p *peer) error {
// after this will be sent via broadcasts.
h.syncTransactions(p, h.txpool.SampleHashes(h.config.Protocol.MaxInitialTxHashesSend))

// Handle incoming messages until the connection is torn down
// Handle incoming messages until the connection is torn down or the inactivity
// timer times out.
var noOfApplicationErrors = 0
// progress and application
progressWatchDogTimer := time.NewTimer(noProgressTime)
applicationWatchDogTimer := time.NewTimer(noAppMessageTime)
for {
if err := h.handleMsg(p); err != nil {
p.Log().Debug("Message handling failed", "err", err)
return err
select {
case <-progressWatchDogTimer.C:
// If self syncing, don't check peer progress
if !h.syncStatus.AcceptTxs() {
progressWatchDogTimer.Reset(noProgressTime)
break
}
if p.IsPeerProgressing() {
progressWatchDogTimer.Reset(noProgressTime)
} else {
p.Log().Warn("progress timer timeout: ", "name", p.Name(), "node", p.Node().String())
discfilter.Ban(p.ID())
return ErrorProgressTimeout
}
case <-applicationWatchDogTimer.C:
if p.IsApplicationProgressing() {
applicationWatchDogTimer.Reset(noAppMessageTime)
} else {
p.Log().Warn("application timer timeout: ", "name", p.Name(), "node", p.Node().String())
discfilter.Ban(p.ID())
return ErrorApplicationTimeout
}
default:
err := h.handleMsg(p)
if err != nil {
p.Log().Debug("Message handling failed", "err", err)
if strings.Contains(err.Error(), errorToString[ErrPeerNotProgressing]) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use errors.Is here instead of comparing strings?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use errors.Is() only to compare errors. But in this place, the error is defined as a string.
If we want to change it, we should define all the errors as errors.New().

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agreed. If there are more such string based errors instead of errors.New() based ones (which I believe would be better) - then this should go into a separate PR to address. So up to you if you want to do anything in this PR.

discfilter.Ban(p.ID())
return err
}
// Ban peer and disconnect if the number of errors in the handling of application message
// crosses a threshold.
noOfApplicationErrors++
if noOfApplicationErrors > toleranceOfApplicationErrors {
discfilter.Ban(p.ID())
return err
}
}
}
}
}
Expand Down Expand Up @@ -1014,6 +1070,10 @@ func (h *handler) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
p.SetProgress(progress)
// If peer has not progressed for noProgressTime minutes, then disconnect the peer.
if !p.IsPeerProgressing() {
return errResp(ErrPeerNotProgressing, "%v: %v", "epoch is not progressing for ", noProgressTime)
}

case msg.Code == EvmTxsMsg:
// Transactions arrived, make sure we have a valid and fresh graph to handle them
Expand Down Expand Up @@ -1316,6 +1376,11 @@ func (h *handler) handleMsg(p *peer) error {
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}

if msg.Code != ProgressMsg {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not yet familiar with all message codes, but is ProgressMsg the only message which signals that there is progress?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

// Since a valid application message is received, set the peer as progressing.
p.SetApplicationProgress()
}
return nil
}

Expand Down
42 changes: 41 additions & 1 deletion gossip/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
handshakeTimeout = 5 * time.Second
)

var (
noProgressTime = 3 * time.Minute // Max allowed minutes to elapse without Epoch progress
noAppMessageTime = 15 * time.Minute // Max allowed minutes to elapse without any Application message
)

// PeerInfo represents a short summary of the sub-protocol metadata known
// about a connected peer.
type PeerInfo struct {
Expand Down Expand Up @@ -62,7 +67,9 @@ type peer struct {
queuedDataSemaphore *datasemaphore.DataSemaphore
term chan struct{} // Termination channel to stop the broadcaster

progress PeerProgress
progress PeerProgress
progressTime time.Time // The last progress message (with progressed Epoch)
appMessageTime time.Time // The last valid application message received

snapExt *snapPeer // Satellite `snap` connection
syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
Expand All @@ -85,7 +92,29 @@ func (p *peer) SetProgress(x PeerProgress) {
p.Lock()
defer p.Unlock()

// Check if the peer is progressing
if x.More(p.progress) {
p.setPeerAsProgressing(x)
}
}

func (p *peer) setPeerAsProgressing(x PeerProgress) {
p.progress = x
p.progressTime = time.Now()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason why p.appMessageTime is locked, but p.progressTime isn't?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's locked in SetProgress() where setPeerAsProgressing() is called.

}

func (p *peer) IsPeerProgressing() bool {
return time.Since(p.progressTime) < noProgressTime
}

func (p *peer) SetApplicationProgress() {
p.Lock()
defer p.Unlock()
p.appMessageTime = time.Now()
}

func (p *peer) IsApplicationProgressing() bool {
return time.Since(p.appMessageTime) < noAppMessageTime
}

func (p *peer) InterestedIn(h hash.Event) bool {
Expand All @@ -100,6 +129,15 @@ func (p *peer) InterestedIn(h hash.Event) bool {
!p.knownEvents.Contains(h)
}

func (a *PeerProgress) More(b PeerProgress) bool {
if a.Epoch > b.Epoch {
return true
} else if a.Epoch == b.Epoch && a.LastBlockIdx > b.LastBlockIdx {
return true
}
return false
}

func (a *PeerProgress) Less(b PeerProgress) bool {
if a.Epoch != b.Epoch {
return a.Epoch < b.Epoch
Expand All @@ -119,6 +157,8 @@ func newPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, cfg PeerCacheConfi
queue: make(chan broadcastItem, cfg.MaxQueuedItems),
queuedDataSemaphore: datasemaphore.New(dag.Metric{cfg.MaxQueuedItems, cfg.MaxQueuedSize}, getSemaphoreWarningFn("Peers queue")),
term: make(chan struct{}),
progressTime: time.Now(),
appMessageTime: time.Now(),
}

go peer.broadcast(peer.queue)
Expand Down
67 changes: 67 additions & 0 deletions gossip/peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package gossip

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPeerProgressWithEpoch(t *testing.T) {
// Increment epoch and see if the peer is progressing
setProgressThreshold(10 * time.Millisecond)
newPeer := getPeer()
ep1 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep1)
time.Sleep(20 * time.Millisecond) //set the threshold to 2 second
ep2 := PeerProgress{Epoch: 2}
newPeer.SetProgress(ep2)
require.True(t, newPeer.IsPeerProgressing(), "Peer is not progressing")
}

func TestPeerNotProgressWithEpoch(t *testing.T) {
// Don't Increment epoch and check if the peer is not progressing
setProgressThreshold(10 * time.Millisecond)
newPeer := getPeer()
ep1 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep1)
time.Sleep(20 * time.Millisecond) //set the threshold to 2 second so that the threshold is expired
ep2 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep2)
require.False(t, newPeer.IsPeerProgressing(), "Peer is progressing")
}

func TestPeerNotProgressTimeout(t *testing.T) {
// Don't Increment epoch and check if the peer is not progressing
setProgressThreshold(10 * time.Millisecond)
newPeer := getPeer()
ep1 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep1)
time.Sleep(20 * time.Millisecond) //set the threshold to 2 second so that the timer expires
require.False(t, newPeer.IsPeerProgressing(), "Peer is progressing")
}

func TestApplicationProgressMessage(t *testing.T) {
// send a valid application message and check if the peer is progressing
setApplicationThreshold(20 * time.Millisecond)
newPeer := getPeer()
newPeer.SetApplicationProgress() // simulate receiving of a valid application message
time.Sleep(10 * time.Millisecond) //set the threshold to 1 second
require.True(t, newPeer.IsApplicationProgressing(), "Application is not progressing")
}

func TestApplicationNotProgressingMessage(t *testing.T) {
// send a valid application message and check if the peer is progressing
setApplicationThreshold(10 * time.Millisecond)
newPeer := getPeer()
newPeer.SetApplicationProgress() // simulate receiving of a valid application message
time.Sleep(20 * time.Millisecond) //set the threshold to 2 second so that the threshold timer expires
require.False(t, newPeer.IsApplicationProgressing(), "Application is progressing")
}

func getPeer() *peer {
peer := &peer{
progressTime: time.Now(),
}
return peer
}
2 changes: 2 additions & 0 deletions gossip/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
ErrNoStatusMsg
ErrExtraStatusMsg
ErrSuspendedPeer
ErrPeerNotProgressing
ErrEmptyMessage = 0xf00
)

Expand All @@ -97,6 +98,7 @@ var errorToString = map[int]string{
ErrNoStatusMsg: "No status message",
ErrExtraStatusMsg: "Extra status message",
ErrSuspendedPeer: "Suspended peer",
ErrPeerNotProgressing: "Peer not progressing",
ErrEmptyMessage: "Empty message",
}

Expand Down