From 65060ce0351f3d935d5f6a26804e4307afe715ed Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Wed, 5 Jun 2024 14:57:21 +0200 Subject: [PATCH] Drop faulty neighbors. --- pkg/network/p2p/neighbor.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/network/p2p/neighbor.go b/pkg/network/p2p/neighbor.go index 56511270d..a265481c6 100644 --- a/pkg/network/p2p/neighbor.go +++ b/pkg/network/p2p/neighbor.go @@ -3,6 +3,7 @@ package p2p import ( "context" "sync" + "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/protocol" @@ -15,7 +16,8 @@ import ( ) const ( - NeighborsSendQueueSize = 20_000 + NeighborsSendQueueSize = 20_000 + DroppedPacketDisconnectThreshold = 100 ) type queuedPacket struct { @@ -31,7 +33,8 @@ type ( // neighbor describes the established p2p connection to another peer. type neighbor struct { - peer *network.Peer + peer *network.Peer + droppedPacketCounter atomic.Uint32 logger log.Logger @@ -84,7 +87,12 @@ func (n *neighbor) Peer() *network.Peer { func (n *neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) { select { case n.sendQueue <- &queuedPacket{protocolID: protocolID, packet: packet}: + n.droppedPacketCounter.Store(0) default: + // Drop a neighbor that does not read from the full queue. + if n.droppedPacketCounter.Add(1) >= DroppedPacketDisconnectThreshold { + n.Close() + } n.logger.LogWarn("Dropped packet due to SendQueue being full") } }