Skip to content

Commit

Permalink
Merge pull request #7 from erigontech/increase_webseed_parallelization
Browse files Browse the repository at this point in the history
Increase max requests handled and add dynamic requester resizing
  • Loading branch information
mh0lt authored May 8, 2024
2 parents 089918d + faae968 commit 59efe2a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 11 deletions.
2 changes: 2 additions & 0 deletions peer-impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ type peerImpl interface {
// message, then it's clear that they do.
peerHasAllPieces() (all, known bool)
peerPieces() *roaring.Bitmap

nominalMaxRequests() maxRequests
}
4 changes: 2 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (cn *Peer) writeStatus(w io.Writer) {
&cn._stats.ChunksWritten,
cn.requestState.Requests.GetCardinality(),
cn.requestState.Cancelled.GetCardinality(),
cn.nominalMaxRequests(),
cn.peerImpl.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
localClientReqq,
Expand Down Expand Up @@ -441,7 +441,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
if cn.requestState.Requests.Contains(r) {
return true, nil
}
if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
cn.requestState.Requests.Add(r)
Expand Down
2 changes: 1 addition & 1 deletion requesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
break
}
numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality())
if numPending >= p.nominalMaxRequests() {
if numPending >= p.peerImpl.nominalMaxRequests() {
break
}
req := heap.Pop(requestHeap)
Expand Down
3 changes: 2 additions & 1 deletion torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2612,6 +2612,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
}
},
},
maxRequesters: maxRequests,
activeRequests: make(map[Request]webseed.Request, maxRequests),
}
ws.peer.initRequestState()
Expand All @@ -2620,7 +2621,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
}
ws.peer.initUpdateRequestsTimer()
ws.requesterCond.L = t.cl.locker()
for i := 0; i < maxRequests; i += 1 {
for i := 0; i < ws.maxRequesters; i += 1 {
go ws.requester(i)
}
for _, f := range t.callbacks().NewPeer {
Expand Down
59 changes: 52 additions & 7 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type webseedPeer struct {
peer Peer
client webseed.Client
activeRequests map[Request]webseed.Request
maxActiveRequests int
processedRequests int
maxActiveRequests int // the max number of active requests for this peer
processedRequests int // the total number of requests this peer has processed
maxRequesters int // the number of requester to run for this peer
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
Expand Down Expand Up @@ -78,6 +79,29 @@ func (ws *webseedPeer) _request(r Request) bool {
return true
}

func (cn *webseedPeer) nominalMaxRequests() maxRequests {
interestedPeers := 1

cn.peer.t.iterPeers(func(peer *Peer) {
if peer == &cn.peer {
return
}

if !peer.closed.IsSet() {
if peer.connectionFlags() != "WS" {
if !peer.peerInterested || peer.lastHelpful().IsZero() {
return
}
}

interestedPeers++
}
})

activeRequestsPerPeer := cn.peer.bestPeerNumPieces() / maxInt(1, interestedPeers)
return maxInt(1, minInt(cn.peer.PeerMaxRequests, maxInt(maxInt(8, activeRequestsPerPeer), cn.peer.peakRequests*2)))
}

func (ws *webseedPeer) doRequest(r Request) error {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest
Expand All @@ -100,7 +124,7 @@ func (ws *webseedPeer) requester(i int) {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

for !ws.peer.closed.IsSet() {
for !ws.peer.closed.IsSet() && i < ws.maxRequesters {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false

Expand All @@ -121,9 +145,9 @@ func (ws *webseedPeer) requester(i int) {
if activeCount < pendingRequests {
signals := pendingRequests - activeCount

if signals > 15 {
if signals > ws.maxRequesters {
// max responders excluding this
signals = 15
signals = ws.maxRequesters
}

for s := 0; s < signals; s++ {
Expand Down Expand Up @@ -163,9 +187,30 @@ func (ws *webseedPeer) requester(i int) {
})

if !(ws.peer.t.dataDownloadDisallowed.Bool() || ws.peer.t.info == nil) {
desiredRequests := len(ws.peer.getDesiredRequestState().Requests.requestIndexes)
pendingRequests := int(ws.peer.requestState.Requests.GetCardinality())

ws.peer.logger.Levelf(log.Debug, "%d: requests %d (a=%d,d=%d) active(c=%d,m=%d) complete(%d/%d) restart(%v)",
i, ws.processedRequests, ws.peer.requestState.Requests.GetCardinality(), len(ws.peer.getDesiredRequestState().Requests.requestIndexes),
len(ws.activeRequests), ws.maxActiveRequests, ws.peer.t._completedPieces.GetCardinality(), ws.peer.t.NumPieces(), restart)
i, ws.processedRequests, pendingRequests, desiredRequests,
len(ws.activeRequests), ws.maxActiveRequests, ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), restart)

if pendingRequests > ws.maxRequesters {
if pendingRequests > ws.peer.PeerMaxRequests {
pendingRequests = ws.peer.PeerMaxRequests
}

for i := ws.maxRequesters; i < pendingRequests; i++ {
go ws.requester(i)
ws.maxRequesters++
}
} else {
if pendingRequests < 16 {
pendingRequests = 16
}

ws.maxRequesters = pendingRequests
}

}

if !restart {
Expand Down

0 comments on commit 59efe2a

Please sign in to comment.