Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed download halts due to timing issues #8

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/anacrolix/sync"
"github.com/pion/datachannel"
"golang.org/x/exp/maps"
"golang.org/x/time/rate"

"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/common"
Expand Down Expand Up @@ -2587,17 +2588,30 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
// downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
// "https://webtorrent.io/torrents/".
const maxRequests = 16

// This should affect how often we have to recompute requests for this peer. Note that
// because we can request more than 1 thing at a time over HTTP, we will hit the low
// requests mark more often, so recomputation is probably sooner than with regular peer
// conns. ~4x maxRequests would be about right.
peerMaxRequests := 128

// unless there is more availible bandwith - in which case max requests becomes
// the max available pieces that will consune that bandwidth

if bandwidth := t.cl.config.DownloadRateLimiter.Limit(); bandwidth > 0 && t.info != nil {
if maxPieceRequests := int(bandwidth / rate.Limit(t.info.PieceLength)); maxPieceRequests > peerMaxRequests {
peerMaxRequests = maxPieceRequests
}
}

ws := webseedPeer{
peer: Peer{
t: t,
outgoing: true,
Network: "http",
reconciledHandshakeStats: true,
// This should affect how often we have to recompute requests for this peer. Note that
// because we can request more than 1 thing at a time over HTTP, we will hit the low
// requests mark more often, so recomputation is probably sooner than with regular peer
// conns. ~4x maxRequests would be about right.
PeerMaxRequests: 128,

PeerMaxRequests: peerMaxRequests,
// TODO: Set ban prefix?
RemoteAddr: remoteAddrFromUrl(url),
callbacks: t.callbacks(),
Expand Down
122 changes: 60 additions & 62 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type webseedPeer struct {
maxActiveRequests int // the max number of active requests for this peer
processedRequests int // the total number of requests this peer has processed
maxRequesters int // the number of requester to run for this peer
waiting int // the number of requesters currently waiting for a signal
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
Expand Down Expand Up @@ -118,9 +119,6 @@ func (ws *webseedPeer) doRequest(r Request) error {
}

func (ws *webseedPeer) requester(i int) {

ws.peer.Torrent()._pendingPieces.GetCardinality()

ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

Expand All @@ -134,28 +132,6 @@ func (ws *webseedPeer) requester(i int) {
return true
}

// if there are more than one requests in the queue and we don't
// have all of the responders activated yet we need to
// kick the other requestors into life - otherwise the max parallel
// requests will stay below the max - unless some external action happens
if pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()); pendingRequests > 1 {
pendingRequests--
activeCount := len(ws.activeRequests) + 1

if activeCount < pendingRequests {
signals := pendingRequests - activeCount

if signals > ws.maxRequesters {
// max responders excluding this
signals = ws.maxRequesters
}

for s := 0; s < signals; s++ {
ws.requesterCond.Signal()
}
}
}

// note doRequest unlocks ws.requesterCond.L which free the
// condition to allow other requestors to receive in parallel it
// will lock again before it returns so the remainder of the code
Expand Down Expand Up @@ -190,9 +166,9 @@ func (ws *webseedPeer) requester(i int) {
desiredRequests := len(ws.peer.getDesiredRequestState().Requests.requestIndexes)
pendingRequests := int(ws.peer.requestState.Requests.GetCardinality())

ws.peer.logger.Levelf(log.Debug, "%d: requests %d (a=%d,d=%d) active(c=%d,m=%d) complete(%d/%d) restart(%v)",
i, ws.processedRequests, pendingRequests, desiredRequests,
len(ws.activeRequests), ws.maxActiveRequests, ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), restart)
ws.peer.logger.Levelf(log.Debug, "%d: requests %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) complete(%d/%d) restart(%v)",
i, ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(),
len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), restart)

if pendingRequests > ws.maxRequesters {
if pendingRequests > ws.peer.PeerMaxRequests {
Expand All @@ -209,6 +185,7 @@ func (ws *webseedPeer) requester(i int) {
}

ws.maxRequesters = pendingRequests
ws.requesterCond.Broadcast()
}

}
Expand All @@ -220,64 +197,85 @@ func (ws *webseedPeer) requester(i int) {
}
}

ws.waiting++
ws.requesterCond.Wait()
ws.waiting--

if ws.updateRequestor != nil {
ws.updateRequestor.Stop()
ws.updateRequestor = nil
}
} else {
// if there are more than one requests in the queue and we don't
// have all of the responders activated yet we need to
// kick the other requestors into life - otherwise the max parallel
// requests will stay below the max - unless some external action happens
if pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()); pendingRequests > 1 {
activeCount := len(ws.activeRequests)

if activeCount < pendingRequests {
ws.requesterCond.Broadcast()
}
}
}
}
}

var webpeerUnchokeTimerDuration = 15 * time.Second

func requestUpdate(ws *webseedPeer) {
if ws != nil && !ws.peer.closed.IsSet() {
numPieces := uint64(ws.peer.t.NumPieces())
numCompleted := ws.peer.t._completedPieces.GetCardinality()

if numCompleted < numPieces {
if ws.peer.isLowOnRequests() && time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration {
// if the number of incomplete pieces is less than five adjust this peers
// lastUsefulChunkReceived to ensure that it can steal from non web peers
// this is to help ensure completion - we may want to add a head call
// before doing this to ensure the peer has the file
if numPieces-numCompleted < 16 {
lastExistingUseful := ws.peer.lastUsefulChunkReceived

for piece := pieceIndex(0); piece < pieceIndex(numPieces); piece++ {
if ws.peer.t._completedPieces.Contains(uint32(piece)) {
continue
}

if existing := ws.peer.t.requestingPeer(RequestIndex(piece)); existing != nil {
if existing.connectionFlags() == "WS" {
if ws != nil {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

ws.updateRequestor = nil

if !ws.peer.closed.IsSet() {
numPieces := uint64(ws.peer.t.NumPieces())
numCompleted := ws.peer.t._completedPieces.GetCardinality()

if numCompleted < numPieces {
if ws.peer.isLowOnRequests() && time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration {
// if the number of incomplete pieces is less than five adjust this peers
// lastUsefulChunkReceived to ensure that it can steal from non web peers
// this is to help ensure completion - we may want to add a head call
// before doing this to ensure the peer has the file
if numPieces-numCompleted < 16 {
lastExistingUseful := ws.peer.lastUsefulChunkReceived

for piece := pieceIndex(0); piece < pieceIndex(numPieces); piece++ {
if ws.peer.t._completedPieces.Contains(uint32(piece)) {
continue
}

// if the existing client looks like its not producing timely chunks then
// adjust our lastUsefulChunkReceived value to make sure we can steal the
// piece from it
if time.Since(existing.lastUsefulChunkReceived) > webpeerUnchokeTimerDuration {
if !lastExistingUseful.After(existing.lastUsefulChunkReceived) {
lastExistingUseful = existing.lastUsefulChunkReceived.Add(time.Minute)
if existing := ws.peer.t.requestingPeer(RequestIndex(piece)); existing != nil {
if existing.connectionFlags() == "WS" {
continue
}

// if the existing client looks like its not producing timely chunks then
// adjust our lastUsefulChunkReceived value to make sure we can steal the
// piece from it
if time.Since(existing.lastUsefulChunkReceived) > webpeerUnchokeTimerDuration {
if !lastExistingUseful.After(existing.lastUsefulChunkReceived) {
lastExistingUseful = existing.lastUsefulChunkReceived.Add(time.Minute)
}
}
}
}
}

ws.peer.lastUsefulChunkReceived = lastExistingUseful
}
ws.peer.lastUsefulChunkReceived = lastExistingUseful
}

ws.peer.logger.Levelf(log.Debug, "unchoke %d/%d (%s)", ws.processedRequests, ws.peer.t.NumPieces(), ws.peer.lastUsefulChunkReceived)
ws.peer.logger.Levelf(log.Debug, "unchoke %d/%d maxRequesters=%d, waiting=%d, (%s)", ws.processedRequests, ws.peer.t.NumPieces(), ws.maxRequesters, ws.waiting, ws.peer.lastUsefulChunkReceived)

ws.peer.updateRequests("unchoked")
return
ws.peer.updateRequests("unchoked")
return
}
}
}

ws.requesterCond.Signal()
ws.requesterCond.Signal()
}
}
}

Expand Down
Loading