From e2197d9350821b3bc970eec236d5cbe9c45e711e Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Mon, 26 Feb 2024 16:31:07 +0200 Subject: [PATCH] fix(p2p/session): return peer to the queue in case of ErrNotFound (#159) --- p2p/peer_tracker.go | 31 ------------------------------- p2p/peer_tracker_test.go | 26 ++++++++++---------------- p2p/session.go | 6 ++++++ 3 files changed, 16 insertions(+), 47 deletions(-) diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index f4834aeb..c4dba9b7 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "sort" "sync" "time" @@ -18,8 +17,6 @@ const ( defaultScore float32 = 1 // maxPeerTrackerSize specifies the max amount of peers that can be added to the peerTracker. maxPeerTrackerSize = 100 - // minPeerTrackerSizeBeforeGC specifies the minimum amount of tracked peers before the peerTracker starts removing peers with lower peer scores. - minPeerTrackerSizeBeforeGC = 10 ) var ( @@ -244,7 +241,6 @@ func (p *peerTracker) gc() { return case <-ticker.C: p.cleanUpDisconnectedPeers() - p.cleanUpTrackedPeers() p.dumpPeers(p.ctx) } } @@ -265,33 +261,6 @@ func (p *peerTracker) cleanUpDisconnectedPeers() { p.metrics.peersDisconnected(-deletedDisconnectedNum) } -func (p *peerTracker) cleanUpTrackedPeers() { - p.peerLk.Lock() - defer p.peerLk.Unlock() - - if len(p.trackedPeers) <= minPeerTrackerSizeBeforeGC { - return - } - - var deletedTrackedNum int - orderedPeers := make([]*peerStat, 0, len(p.trackedPeers)) - for _, peer := range p.trackedPeers { - orderedPeers = append(orderedPeers, peer) - } - sort.Slice(orderedPeers, func(i, j int) bool { - return orderedPeers[i].peerScore < orderedPeers[j].peerScore - }) - - for _, peer := range orderedPeers[:len(orderedPeers)-minPeerTrackerSizeBeforeGC] { - if peer.peerScore > defaultScore { - break - } - delete(p.trackedPeers, peer.peerID) - deletedTrackedNum++ - } - p.metrics.peersTracked(-deletedTrackedNum) -} - // dumpPeers stores peers to the peerTracker's PeerIDStore if // present. func (p *peerTracker) dumpPeers(ctx context.Context) { diff --git a/p2p/peer_tracker_test.go b/p2p/peer_tracker_test.go index 3543d985..db9ffc03 100644 --- a/p2p/peer_tracker_test.go +++ b/p2p/peer_tracker_test.go @@ -33,22 +33,17 @@ func TestPeerTracker_GC(t *testing.T) { maxAwaitingTime = time.Millisecond - peerlist := generateRandomPeerlist(t, minPeerTrackerSizeBeforeGC) - for i := 0; i < minPeerTrackerSizeBeforeGC; i++ { + peerlist := generateRandomPeerlist(t, 10) + for i := 0; i < 10; i++ { p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5} } - // add peers to trackedPeers to make total number of peers > maxPeerTrackerSize peerlist = generateRandomPeerlist(t, 4) - pid1 := peerlist[0] - pid2 := peerlist[1] - pid3 := peerlist[2] - pid4 := peerlist[3] - - p.trackedPeers[pid1] = &peerStat{peerID: pid1, peerScore: 0.5} - p.trackedPeers[pid2] = &peerStat{peerID: pid2, peerScore: 10} - p.disconnectedPeers[pid3] = &peerStat{peerID: pid3, pruneDeadline: time.Now()} - p.disconnectedPeers[pid4] = &peerStat{peerID: pid4, pruneDeadline: time.Now().Add(time.Minute * 10)} + pid1 := peerlist[2] + pid2 := peerlist[3] + + p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()} + p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)} assert.True(t, len(p.trackedPeers) > 0) assert.True(t, len(p.disconnectedPeers) > 0) @@ -60,14 +55,13 @@ func TestPeerTracker_GC(t *testing.T) { err = p.stop(ctx) require.NoError(t, err) - // ensure amount of peers in trackedPeers is equal to minPeerTrackerSizeBeforeGC - require.Len(t, p.trackedPeers, minPeerTrackerSizeBeforeGC) - require.Nil(t, p.disconnectedPeers[pid3]) + require.Len(t, p.trackedPeers, 10) + require.Nil(t, p.disconnectedPeers[pid1]) // ensure good peers were dumped to store peers, err := pidstore.Load(ctx) require.NoError(t, err) - require.Equal(t, minPeerTrackerSizeBeforeGC, len(peers)) + require.Equal(t, 10, len(peers)) } func TestPeerTracker_BlockPeer(t *testing.T) { diff --git a/p2p/session.go b/p2p/session.go index 8c035bac..505759ff 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -213,6 +213,12 @@ func (s *session[H]) doRequest( "err", err, "peer", stat.peerID, ) + // ErrNotFound is not a critical error here. It means + // that peer hasn't synced the requested range yet. + // Returning peer to the queue, so it could serve another ranges. + if errors.Is(err, header.ErrNotFound) { + s.queue.push(stat) + } return }