diff --git a/client.go b/client.go index 9956149218..cca6b0bc65 100644 --- a/client.go +++ b/client.go @@ -1107,12 +1107,16 @@ func (p *Peer) initUpdateRequestsTimer() { const peerUpdateRequestsTimerReason = "updateRequestsTimer" func (c *Peer) updateRequestsTimerFunc() { + c.t.mu.Lock() + defer c.t.mu.Unlock() + c.mu.Lock() defer c.mu.Unlock() + if c.closed.IsSet() { return } - if c.isLowOnRequests(false) { + if c.isLowOnRequests(false,false) { // If there are no outstanding requests, then a request update should have already run. return } @@ -1122,7 +1126,7 @@ func (c *Peer) updateRequestsTimerFunc() { torrent.Add("spurious timer requests updates", 1) return } - c.updateRequests(peerUpdateRequestsTimerReason, false, true) + c.updateRequests(peerUpdateRequestsTimerReason, false, false) } // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this diff --git a/peer-impl.go b/peer-impl.go index bcd447e977..aa202ec723 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -28,6 +28,12 @@ type peerImpl interface { String() string peerImplStatusLines() []string + // the peers is running low on requests and needs some more + // Note: for web peers this means low on web requests - as we + // want to drive the http request api while we are processing + // its responses + isLowOnRequests(lock bool, lockTorrent bool) bool + // All if the peer should have everything, known if we know that for a fact. For example, we can // guess at how many pieces are in a torrent, and assume they have all pieces based on them // having sent haves for everything, but we don't know for sure. But if they send a have-all diff --git a/peer.go b/peer.go index 963807a49f..1448cab550 100644 --- a/peer.go +++ b/peer.go @@ -265,8 +265,8 @@ func (cn *Peer) downloadRate() float64 { } func (p *Peer) DownloadRate() float64 { - p.locker().RLock() - defer p.locker().RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.downloadRate() } @@ -536,7 +536,7 @@ func (me *Peer) cancel(r RequestIndex, updateRequests bool, lock bool, lockTorre } } me.decPeakRequests() - if updateRequests && me.isLowOnRequests(lock) { + if updateRequests && me.isLowOnRequests(lock, lockTorrent) { me.updateRequests("Peer.cancel", lock, lockTorrent) } } @@ -646,7 +646,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { return false } } - if c.isLowOnRequests(true) { + if c.isLowOnRequests(true, true) { c.updateRequests("Peer.remoteRejectedRequest", true, true) } c.decExpectedChunkReceive(r) @@ -836,7 +836,7 @@ func (c *Peer) receiveChunk(msg *pp.Message, lockTorrent bool) error { // this is moved after all processing to avoid request rehere because as we no longer have a if intended { - if c.isLowOnRequests(true) { + if c.isLowOnRequests(true, false) { c.updateRequests("Peer.receiveChunk deleted request", true, false) } } @@ -945,7 +945,7 @@ func (c *Peer) deleteAllRequests(reason string, lock bool, lockTorrent bool) { }) c.assertNoRequests() c.t.iterPeers(func(p *Peer) { - if p.isLowOnRequests(false) { + if p.isLowOnRequests(false, false) { p.updateRequests(reason, false, false) } }, false) @@ -1003,7 +1003,9 @@ func (l connectionTrust) Less(r connectionTrust) bool { // Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims. func (cn *Peer) newPeerPieces(lockTorrent bool) *roaring.Bitmap { // TODO: Can we use copy on write? + cn.mu.RLock() ret := cn.peerPieces().Clone() + cn.mu.RUnlock() if all, _ := cn.peerHasAllPieces(lockTorrent); all { if cn.t.haveInfo(true) { ret.AddRange(0, bitmap.BitRange(cn.t.numPieces())) @@ -1031,14 +1033,6 @@ func (p *Peer) uncancelledRequests() uint64 { type peerLocalPublicAddr = IpPort -func (p *Peer) isLowOnRequests(lock bool) bool { - if lock { - p.mu.RLock() - defer p.mu.RUnlock() - } - return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() -} - func (p *Peer) decPeakRequests() { // // This can occur when peak requests are altered by the update request timer to be lower than // // the actual number of outstanding requests. Let's let it go negative and see what happens. I diff --git a/peerconn.go b/peerconn.go index 639bbdd366..04d936342a 100644 --- a/peerconn.go +++ b/peerconn.go @@ -111,6 +111,14 @@ func (cn *PeerConn) peerImplStatusLines() []string { } } +func (p *PeerConn) isLowOnRequests(lock bool, lockTorrent bool) bool { + if lock { + p.mu.RLock() + defer p.mu.RUnlock() + } + return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() +} + // Returns true if the connection is over IPv6. func (cn *PeerConn) ipv6() bool { ip := cn.remoteIp() @@ -1159,8 +1167,6 @@ func (pc *PeerConn) String() string { // Returns the pieces the peer could have based on their claims. If we don't know how many pieces // are in the torrent, it could be a very large range if the peer has sent HaveAll. func (pc *PeerConn) PeerPieces() *roaring.Bitmap { - pc.locker().RLock() - defer pc.locker().RUnlock() return pc.newPeerPieces(true) } diff --git a/torrent.go b/torrent.go index b49063d4d2..9780a1d0d7 100644 --- a/torrent.go +++ b/torrent.go @@ -958,7 +958,7 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "webseeds:\n") t.writePeerStatuses(w, t.webSeedsAsSlice(false)) - peerConns := t.peerConnsAsSlice(true) + peerConns := t.peerConnsAsSlice(false) defer peerConns.free() // Peers without priorities first, then those with. I'm undecided about how to order peers @@ -1545,7 +1545,7 @@ func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string, lock b // if c.requestState.Interested { // return // } - if !c.isLowOnRequests(false) { + if !c.isLowOnRequests(false, false) { return } if !c.peerHasPiece(piece, true, false) { diff --git a/webseed-peer.go b/webseed-peer.go index 28fa375b42..2189b92057 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -33,6 +33,7 @@ type webseedPeer struct { maxRequesters int // the number of requester to run for this peer waiting int // the number of requesters currently waiting for a signal receiving atomic.Int64 + persisting atomic.Int64 requesterCond sync.Cond updateRequestor *time.Timer lastUnhandledErr time.Time @@ -119,7 +120,7 @@ func (cn *webseedPeer) nominalMaxRequests(lock bool, lockTorrent bool) maxReques } func (ws *webseedPeer) doRequest(r Request) error { - webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) + webseedRequest := ws.client.NewRequest(ws.intoSpec(r), &ws.receiving) ws.activeRequests[r] = webseedRequest if activeLen := len(ws.activeRequests); activeLen > ws.maxActiveRequests { ws.maxActiveRequests = activeLen @@ -165,7 +166,7 @@ func (ws *webseedPeer) requester(i int) { ws.requesterCond.L.Unlock() defer ws.requesterCond.L.Lock() - + if errors.Is(err, webseed.ErrTooFast) { time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) } @@ -236,7 +237,7 @@ func (ws *webseedPeer) requester(i int) { if time.Since(ws.lastLog) > 5*time.Second { ws.lastLog = time.Now() - go logProgress(ws) + go logProgress(ws, "requests", true) } ws.waiting++ @@ -259,11 +260,13 @@ func (ws *webseedPeer) requester(i int) { var webpeerUnchokeTimerDuration = 15 * time.Second -func logProgress(ws *webseedPeer) { +func logProgress(ws *webseedPeer, label string, lockTorrent bool) { t := ws.peer.t - t.mu.RLock() - defer t.mu.RUnlock() + if lockTorrent { + t.mu.RLock() + defer t.mu.RUnlock() + } if !t.haveInfo(false) { return @@ -275,12 +278,13 @@ func logProgress(ws *webseedPeer) { desiredRequests := len(ws.peer.getDesiredRequestState(false, false, false).Requests.requestIndexes) pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()) receiving := ws.receiving.Load() + persisting := ws.persisting.Load() - ws.peer.logger.Levelf(log.Debug, "requests %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d)", - ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(false, false), - len(ws.activeRequests)-int(receiving), receiving, ws.maxActiveRequests, ws.waiting, + ws.peer.logger.Levelf(log.Debug, "%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s", + label, ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(false, false), + len(ws.activeRequests)-int(receiving)-int(persisting), receiving, persisting, ws.maxActiveRequests, ws.waiting, t.numQueuedForHash(false), t.activePieceHashes.Load(), t.hashing.Load(), len(t.hashResults), - t.numPiecesCompleted(false), t.NumPieces()) + t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate)) } func requestUpdate(ws *webseedPeer) { @@ -292,12 +296,7 @@ func requestUpdate(ws *webseedPeer) { ws.updateRequestor = nil - ws.peer.logger.Levelf(log.Debug, "requestUpdate %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) %s", - ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState(false, true, false).Requests.requestIndexes), - ws.nominalMaxRequests(true, false), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, - ws.peer.t.numQueuedForHash(false), ws.peer.t.activePieceHashes.Load(), ws.peer.t.hashing.Load(), len(ws.peer.t.hashResults), - ws.peer.t.numPiecesCompleted(false), ws.peer.t.NumPieces(), - time.Since(ws.peer.lastRequestUpdate)) + logProgress(ws, "requestUpdate", false) if !ws.peer.closed.IsSet() { numPieces := ws.peer.t.NumPieces() @@ -305,7 +304,7 @@ func requestUpdate(ws *webseedPeer) { if numCompleted < numPieces { // Don't wait for small files - if ws.peer.isLowOnRequests(false) && (numPieces == 1 || time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration) { + if ws.peer.isLowOnRequests(true, false) && (numPieces == 1 || time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration) { // if the number of incomplete pieces is less than five adjust this peers // lastUsefulChunkReceived to ensure that it can steal from non web peers // this is to help ensure completion - we may want to add a head call @@ -340,11 +339,13 @@ func requestUpdate(ws *webseedPeer) { } peerInfo := []string{} - ws.peer.t.iterPeers(func(p *Peer) { + p.mu.RLock() + defer p.mu.RUnlock() + rate := p.downloadRate() pieces := int(p.requestState.Requests.GetCardinality()) - desired := len(p.getDesiredRequestState(false, true, false).Requests.requestIndexes) + desired := len(p.getDesiredRequestState(false, false, false).Requests.requestIndexes) this := "" if p == &ws.peer { @@ -352,17 +353,22 @@ func requestUpdate(ws *webseedPeer) { } flags := p.connectionFlags() peerInfo = append(peerInfo, fmt.Sprintf("%s%s:p=%d,d=%d: %f", this, flags, pieces, desired, rate)) + }, false) - ws.peer.logger.Levelf(log.Debug, "unchoke processed=%d, complete(%d/%d) maxRequesters=%d, waiting=%d, (%s): peers(%d): %v", ws.processedRequests, numCompleted, numPieces, ws.maxRequesters, ws.waiting, ws.peer.lastUsefulChunkReceived, len(peerInfo), peerInfo) + ws.peer.logger.Levelf(log.Debug, "unchoke processed=%d, complete(%d/%d) maxRequesters=%d, waiting=%d, (%s): peers(%d): %v", + ws.processedRequests, numCompleted, numPieces, ws.maxRequesters, + ws.waiting, time.Since(ws.peer.lastUsefulChunkReceived), len(peerInfo), peerInfo) - ws.peer.updateRequests("unchoked", true, false) + func() { + // update requests may require a write lock on the + // torrent so free our read lock first + ws.peer.t.mu.RUnlock() + defer ws.peer.t.mu.RLock() + ws.peer.updateRequests("unchoked", true, true) + }() - ws.peer.logger.Levelf(log.Debug, "unchoked %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) hashing(q=%d,a=%d) complete(%d/%d) %s", - ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState(true, true, false).Requests.requestIndexes), - ws.nominalMaxRequests(true, false), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, - ws.peer.t.numQueuedForHash(false), ws.peer.t.activePieceHashes.Load(), ws.peer.t.numPiecesCompleted(false), ws.peer.t.NumPieces(), - time.Since(ws.peer.lastRequestUpdate)) + logProgress(ws, "unchoked", false) // if the initial unchoke didn't yield a request (for small files) - don't immediately // retry it means its being handled by a prioritized peer @@ -391,6 +397,21 @@ func (cn *webseedPeer) ban() { cn.peer.drop(true) } +func (cn *webseedPeer) isLowOnRequests(lock bool, lockTorrent bool) bool { + if lockTorrent { + cn.peer.t.mu.RLock() + defer cn.peer.t.mu.RUnlock() + } + + if lock { + cn.peer.mu.RLock() + defer cn.peer.mu.RUnlock() + } + + return cn.peer.requestState.Requests.IsEmpty() && cn.peer.requestState.Cancelled.IsEmpty() || + len(cn.activeRequests) <= cn.nominalMaxRequests(false, false)/4 +} + func (ws *webseedPeer) handleUpdateRequests(lock bool, lockTorrent bool) { // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized // pieces. @@ -403,7 +424,7 @@ func (ws *webseedPeer) onClose(lockTorrent bool) { // Just deleting them means we would have to manually cancel active requests. ws.peer.cancelAllRequests(lockTorrent) ws.peer.t.iterPeers(func(p *Peer) { - if p.isLowOnRequests(true) { + if p.isLowOnRequests(true, lockTorrent) { p.updateRequests("webseedPeer.onClose", true, lockTorrent) } }, true) @@ -414,8 +435,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re result := <-webseedRequest.Result close(webseedRequest.Result) // one-shot - ws.receiving.Add(1) - defer ws.receiving.Add(-1) + ws.persisting.Add(1) + defer ws.persisting.Add(-1) // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not // sure if we can divine which errors indicate cancellation on our end without hitting the diff --git a/webseed/client.go b/webseed/client.go index da7d6d2850..9f419b577b 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -9,6 +9,7 @@ import ( "log" "net/http" "strings" + "sync/atomic" "github.com/RoaringBitmap/roaring" @@ -68,7 +69,7 @@ type RequestResult struct { Err error } -func (ws *Client) NewRequest(r RequestSpec) Request { +func (ws *Client) NewRequest(r RequestSpec, receivingCounter *atomic.Int64) Request { ctx, cancel := context.WithCancel(context.Background()) var requestParts []requestPart if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool { @@ -98,7 +99,7 @@ func (ws *Client) NewRequest(r RequestSpec) Request { Result: make(chan RequestResult, 1), } go func() { - b, err := readRequestPartResponses(ctx, requestParts) + b, err := readRequestPartResponses(ctx, requestParts, receivingCounter) req.Result <- RequestResult{ Bytes: b, Err: err, @@ -174,14 +175,18 @@ func recvPartResult(ctx context.Context, buf *bytes.Buffer, part requestPart, re var ErrTooFast = errors.New("making requests too fast") -func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) { +func readRequestPartResponses(ctx context.Context, parts []requestPart, receivingCounter *atomic.Int64) ([]byte, error) { var buf bytes.Buffer for _, part := range parts { if result, err := part.do(); err != nil { return nil, err } else { - if err = recvPartResult(ctx, &buf, part, result); err != nil { + if err = func() error { + receivingCounter.Add(1) + defer receivingCounter.Add(-1) + return recvPartResult(ctx, &buf, part, result) + }(); err != nil { return nil, fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err) } }