diff --git a/client.go b/client.go index b6919f631a..2884db0d17 100644 --- a/client.go +++ b/client.go @@ -1106,12 +1106,12 @@ func (p *Peer) initUpdateRequestsTimer() { const peerUpdateRequestsTimerReason = "updateRequestsTimer" func (c *Peer) updateRequestsTimerFunc() { - c.locker().Lock() - defer c.locker().Unlock() + c.mu.Lock() + defer c.mu.Unlock() if c.closed.IsSet() { return } - if c.isLowOnRequests() { + if c.isLowOnRequests(false) { // If there are no outstanding requests, then a request update should have already run. return } @@ -1121,7 +1121,7 @@ func (c *Peer) updateRequestsTimerFunc() { torrent.Add("spurious timer requests updates", 1) return } - c.updateRequests(peerUpdateRequestsTimerReason) + c.updateRequests(peerUpdateRequestsTimerReason, false) } // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this diff --git a/deferrwl.go b/deferrwl.go index f52445d628..a5863ab90f 100644 --- a/deferrwl.go +++ b/deferrwl.go @@ -25,7 +25,7 @@ func stack(skip int) string { func (me *lockWithDeferreds) Lock() { me.internal.Lock() me.lc.Add(1) - // me.locker = stack(2) + //me.locker = stack(2) } func (me *lockWithDeferreds) Unlock() { diff --git a/peer-impl.go b/peer-impl.go index f7505ae165..13d9c79555 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -35,5 +35,5 @@ type peerImpl interface { peerHasAllPieces() (all, known bool) peerPieces() *roaring.Bitmap - nominalMaxRequests() maxRequests + nominalMaxRequests(lock bool) maxRequests } diff --git a/peer.go b/peer.go index e726ab71e4..65e1334cac 100644 --- a/peer.go +++ b/peer.go @@ -28,7 +28,7 @@ type ( Peer struct { // First to ensure 64-bit alignment for atomics. See #262. _stats ConnStats - mu sync.RWMutex + mu mu //sync.RWMutex t *Torrent @@ -133,6 +133,9 @@ func (p *Peer) Torrent() *Torrent { } func (p *Peer) initRequestState() { + p.mu.Lock() + defer p.mu.Unlock() + p.requestState.Requests = &peerRequests{} } @@ -172,10 +175,16 @@ func (cn *Peer) expectingChunks() bool { } func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool { + cn.mu.RLock() + defer cn.mu.RUnlock() + return cn.peerChoking && !cn.peerAllowedFast.Contains(piece) } func (cn *Peer) cumInterest() time.Duration { + cn.mu.RLock() + defer cn.mu.RUnlock() + ret := cn.priorInterest if cn.requestState.Interested { ret += time.Since(cn.lastBecameInterested) @@ -188,12 +197,19 @@ func (cn *Peer) locker() *lockWithDeferreds { } func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool { + cn.mu.RLock() + defer cn.mu.RUnlock() _, ok := cn.PeerExtensionIDs[ext] return ok } // The best guess at number of pieces in the torrent for this peer. -func (cn *Peer) bestPeerNumPieces() pieceIndex { +func (cn *Peer) bestPeerNumPieces(lock bool) pieceIndex { + if lock { + cn.mu.RLock() + defer cn.mu.RUnlock() + } + if cn.t.haveInfo() { return cn.t.numPieces() } @@ -202,10 +218,11 @@ func (cn *Peer) bestPeerNumPieces() pieceIndex { func (cn *Peer) completedString() string { have := pieceIndex(cn.peerPieces().GetCardinality()) + best := cn.bestPeerNumPieces(true) if all, _ := cn.peerHasAllPieces(); all { - have = cn.bestPeerNumPieces() + have = best } - return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces()) + return fmt.Sprintf("%d/%d", have, best) } func eventAgeString(t time.Time) string { @@ -294,9 +311,9 @@ func (cn *Peer) writeStatus(w io.Writer) { cn.totalExpectingTime(), ) - cn.t.mu.RLock() + cn.mu.RLock() lenPeerTouchedPieces := len(cn.peerTouchedPieces) - cn.t.mu.RUnlock() + cn.mu.RUnlock() fmt.Fprintf(w, "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", @@ -307,7 +324,7 @@ func (cn *Peer) writeStatus(w io.Writer) { &cn._stats.ChunksWritten, cn.requestState.Requests.GetCardinality(), cn.requestState.Cancelled.GetCardinality(), - cn.peerImpl.nominalMaxRequests(), + cn.peerImpl.nominalMaxRequests(true), cn.PeerMaxRequests, len(cn.peerRequests), localClientReqq, @@ -343,6 +360,9 @@ func (p *Peer) close() { // Peer definitely has a piece, for purposes of requesting. So it's not sufficient that we think // they do (known=true). func (cn *Peer) peerHasPiece(piece pieceIndex) bool { + cn.mu.RLock() + defer cn.mu.RUnlock() + if all, known := cn.peerHasAllPieces(); all && known { return true } @@ -365,7 +385,7 @@ var ( ) // The actual value to use as the maximum outbound requests. -func (cn *Peer) nominalMaxRequests() maxRequests { +func (cn *Peer) nominalMaxRequests(lock bool) maxRequests { return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)) } @@ -378,9 +398,6 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) { } func (cn *Peer) setInterested(interested bool) bool { - cn.mu.Lock() - defer cn.mu.Unlock() - if cn.requestState.Interested == interested { return true } @@ -407,6 +424,10 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { return err } pi := cn.t.pieceIndexOfRequestIndex(r) + + cn.mu.RLock() + defer cn.mu.RUnlock() + if cn.requestState.Cancelled.Contains(r) { return errors.New("request is cancelled and waiting acknowledgement") } @@ -435,25 +456,27 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { return nil } -func (cn *Peer) mustRequest(r RequestIndex) bool { - more, err := cn.request(r) +func (cn *Peer) mustRequest(r RequestIndex, lock bool) bool { + more, err := cn.request(r, lock) if err != nil { panic(err) } return more } -func (cn *Peer) request(r RequestIndex) (more bool, err error) { +func (cn *Peer) request(r RequestIndex, lock bool) (more bool, err error) { + if lock { + cn.mu.Lock() + defer cn.mu.Unlock() + } + //if err := cn.shouldRequest(r); err != nil { // panic(err) //} - cn.mu.Lock() - defer cn.mu.Unlock() - if cn.requestState.Requests.Contains(r) { return true, nil } - if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() { + if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests(false) { return true, errors.New("too many outstanding requests") } @@ -475,8 +498,8 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { return cn.peerImpl._request(ppReq), nil } -func (me *Peer) cancel(r RequestIndex) { - if !me.deleteRequest(r) { +func (me *Peer) cancel(r RequestIndex, lock bool) { + if !me.deleteRequest(r, lock) { panic("request not existing should have been guarded") } if me._cancel(r) { @@ -486,13 +509,18 @@ func (me *Peer) cancel(r RequestIndex) { } } me.decPeakRequests() - if me.isLowOnRequests() { - me.updateRequests("Peer.cancel") + if me.isLowOnRequests(lock) { + me.updateRequests("Peer.cancel", lock) } } // Sets a reason to update requests, and if there wasn't already one, handle it. -func (cn *Peer) updateRequests(reason string) { +func (cn *Peer) updateRequests(reason string, lock bool) { + if lock { + cn.mu.Lock() + defer cn.mu.Unlock() + } + if cn.needRequestUpdate != "" { return } @@ -575,19 +603,28 @@ func runSafeExtraneous(f func()) { // Returns true if it was valid to reject the request. func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { - if c.deleteRequest(r) { + if c.deleteRequest(r, true) { c.decPeakRequests() - } else if !c.requestState.Cancelled.CheckedRemove(r) { - return false + } else { + c.mu.RLock() + removed := c.requestState.Cancelled.CheckedRemove(r) + c.mu.RUnlock() + + if !removed { + return false + } } - if c.isLowOnRequests() { - c.updateRequests("Peer.remoteRejectedRequest") + if c.isLowOnRequests(true) { + c.updateRequests("Peer.remoteRejectedRequest", true) } c.decExpectedChunkReceive(r) return true } func (c *Peer) decExpectedChunkReceive(r RequestIndex) { + c.mu.Lock() + defer c.mu.Unlock() + count := c.validReceiveChunks[r] if count == 1 { delete(c.validReceiveChunks, r) @@ -624,18 +661,25 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // hashing if they're out of bounds. defer recordBlockForSmartBan() - if c.peerChoking { + c.mu.RLock() + peerChoking := c.peerChoking + validReceiveChunks := c.validReceiveChunks[req] + allowedFast := c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) + receivedRequest := c.requestState.Requests.Contains(req) + c.mu.RUnlock() + + if peerChoking { chunksReceived.Add("while choked", 1) } - // ok to here - if c.validReceiveChunks[req] <= 0 { + + if validReceiveChunks <= 0 { chunksReceived.Add("unexpected", 1) return errors.New("received unexpected chunk") } c.decExpectedChunkReceive(req) - if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) { + if peerChoking && allowedFast { chunksReceived.Add("due to allowed fast", 1) } @@ -644,20 +688,30 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // out. intended := false { - if c.requestState.Requests.Contains(req) { + if receivedRequest { for _, f := range c.callbacks.ReceivedRequested { f(PeerMessageEvent{c, msg}) } } + + checkRemove := func() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.requestState.Cancelled.CheckedRemove(req) + } + // Request has been satisfied. - if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) { + if c.deleteRequest(req, true) || checkRemove() { intended = true if !c.peerChoking { c._chunksReceivedWhileExpecting++ } - if c.isLowOnRequests() { - c.updateRequests("Peer.receiveChunk deleted request") + + c.mu.Lock() + if c.isLowOnRequests(false) { + c.updateRequests("Peer.receiveChunk deleted request", false) } + c.mu.Unlock() } else { chunksReceived.Add("unintended", 1) } @@ -672,7 +726,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } - + // ok to here piece := &t.pieces[ppReq.Index] c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) @@ -698,7 +752,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { if p == c { panic("should not be pending request from conn that just received it") } - p.cancel(req) + p.cancel(req, true) } err = func() error { @@ -724,7 +778,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a // request update runs while we're writing the chunk that just failed. Then we never do a // fresh update after pending the failed request. - c.updateRequests("Peer.receiveChunk error writing chunk") + c.updateRequests("Peer.receiveChunk error writing chunk", true) c.t.cl.unlock() // this is temp - moving t to not use cl locked externally t.onWriteChunkErr(err) c.t.cl.lock() @@ -750,12 +804,12 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } func (c *Peer) onDirtiedPiece(piece pieceIndex) { - c.t.mu.Lock() + c.mu.Lock() if c.peerTouchedPieces == nil { c.peerTouchedPieces = make(map[pieceIndex]struct{}) } c.peerTouchedPieces[piece] = struct{}{} - c.t.mu.Unlock() + c.mu.Unlock() p := &c.t.pieces[piece] p.mu.Lock() @@ -789,13 +843,16 @@ func (c *Peer) peerHasWantedPieces() bool { // Returns true if an outstanding request is removed. Cancelled requests should be handled // separately. -func (c *Peer) deleteRequest(r RequestIndex) bool { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Peer) deleteRequest(r RequestIndex, lock bool) bool { + if lock { + c.mu.Lock() + defer c.mu.Unlock() + } if !c.requestState.Requests.CheckedRemove(r) { return false } + for _, f := range c.callbacks.DeletedRequest { f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } @@ -813,22 +870,24 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { } func (c *Peer) deleteAllRequests(reason string) { + c.mu.Lock() + defer c.mu.Unlock() + if c.requestState.Requests.IsEmpty() { return } c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool { - if !c.deleteRequest(x) { + if !c.deleteRequest(x, false) { panic("request should exist") } return true }) c.assertNoRequests() c.t.iterPeers(func(p *Peer) { - if p.isLowOnRequests() { - p.updateRequests(reason) + if p.isLowOnRequests(false) { + p.updateRequests(reason, false) } - }) - return + }, false) } func (c *Peer) assertNoRequests() { @@ -838,12 +897,14 @@ func (c *Peer) assertNoRequests() { } func (c *Peer) cancelAllRequests() { + c.mu.Lock() + defer c.mu.Unlock() + c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool { - c.cancel(x) + c.cancel(x, false) return true }) c.assertNoRequests() - return } func (c *Peer) peerPriority() (peerPriority, error) { @@ -897,12 +958,18 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { } func (p *Peer) uncancelledRequests() uint64 { + p.mu.RLock() + defer p.mu.RUnlock() return p.requestState.Requests.GetCardinality() } type peerLocalPublicAddr = IpPort -func (p *Peer) isLowOnRequests() bool { +func (p *Peer) isLowOnRequests(lock bool) bool { + if lock { + p.mu.RLock() + defer p.mu.RUnlock() + } return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() } diff --git a/peerconn.go b/peerconn.go index 71b94c2fc3..4889bb7d82 100644 --- a/peerconn.go +++ b/peerconn.go @@ -387,7 +387,7 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error { } cn._peerPieces.Add(uint32(piece)) if cn.t.wantPieceIndex(piece) { - cn.updateRequests("have") + cn.updateRequests("have", true) } cn.peerPiecesChanged() return nil @@ -443,7 +443,7 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error { // as or. cn._peerPieces.Xor(&bm) if shouldUpdateRequests { - cn.updateRequests("bitfield") + cn.updateRequests("bitfield", true) } // We didn't guard this before, I see no reason to do it now. cn.peerPiecesChanged() @@ -474,7 +474,7 @@ func (cn *PeerConn) peerHasAllPiecesTriggers() { cn.t.mu.RUnlock() if !isEmpty { - cn.updateRequests("Peer.onPeerHasAllPieces") + cn.updateRequests("Peer.onPeerHasAllPieces", true) } cn.peerPiecesChanged() } @@ -763,7 +763,6 @@ func (c *PeerConn) mainReadLoop() (err error) { break } - c.mu.Lock() if !c.fastEnabled() { c.deleteAllRequests("choked by non-fast PeerConn") } else { @@ -774,9 +773,12 @@ func (c *PeerConn) mainReadLoop() (err error) { // could let us request a lot of stuff, then choke us and never reject, but they're // only a single peer, our chunk balancing should smooth over this abuse. } + + c.mu.Lock() c.peerChoking = true c.updateExpectingChunks() c.mu.Unlock() + case pp.Unchoke: if !peerChoking { // Some clients do this for some reason. Transmission doesn't error on this, so we @@ -785,7 +787,7 @@ func (c *PeerConn) mainReadLoop() (err error) { break } - c.mu.RLock() + c.mu.Lock() preservedCount := 0 c.requestState.Requests.Iterate(func(x RequestIndex) bool { if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) { @@ -803,19 +805,23 @@ func (c *PeerConn) mainReadLoop() (err error) { torrent.Add("requestsPreservedThroughChoking", int64(preservedCount)) } - c.mu.RUnlock() + c.peerChoking = false + c.mu.Unlock() - c.t.mu.RLock() + c.t.mu.Lock() isEmpty := c.t._pendingPieces.IsEmpty() - c.t.mu.RUnlock() + c.t.mu.Unlock() - c.mu.Lock() - c.peerChoking = false - if !isEmpty { - c.updateRequests("unchoked") - } - c.updateExpectingChunks() - c.mu.Unlock() + func() { + c.mu.Lock() + defer c.mu.Unlock() + + if !isEmpty { + c.updateRequests("unchoked", false) + } + + c.updateExpectingChunks() + }() case pp.Interested: c.peerInterested = true @@ -865,7 +871,7 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Suggest: torrent.Add("suggests received", 1) log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).LogLevel(log.Debug, c.t.logger) - c.updateRequests("suggested") + c.updateRequests("suggested", true) case pp.HaveAll: err = c.onPeerSentHaveAll() case pp.HaveNone: @@ -879,7 +885,7 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger) - c.updateRequests("PeerConn.mainReadLoop allowed fast") + c.updateRequests("PeerConn.mainReadLoop allowed fast", true) case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) default: diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go index ad15166259..60cd2e556d 100644 --- a/request-strategy-impls_test.go +++ b/request-strategy-impls_test.go @@ -120,7 +120,7 @@ func BenchmarkRequestStrategy(b *testing.B) { tor.updatePieceCompletion(completed - 1) } // Starting and stopping timers around this part causes lots of GC overhead. - rs := peer.getDesiredRequestState() + rs := peer.getDesiredRequestState(false) tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes) // End of part that should be timed. remainingChunks := (numPieces - completed) * (pieceLength / chunkSize) diff --git a/requesting.go b/requesting.go index eb214199ef..a055db4862 100644 --- a/requesting.go +++ b/requesting.go @@ -166,7 +166,7 @@ type desiredRequestState struct { Interested bool } -func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { +func (p *Peer) getDesiredRequestState(debug bool) (desired desiredRequestState) { t := p.t if !t.haveInfo() { return @@ -190,78 +190,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { var dirtyChunks = t.dirtyChunks.Clone() t.mu.RUnlock() - requestStrategy.GetRequestablePieces( - input, - t.getPieceRequestOrder(), - func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { - if ih != t.infoHash { - return - } - if !p.peerHasPiece(pieceIndex) { - return - } - requestHeap.pieceStates[pieceIndex] = pieceExtra - allowedFast := p.peerAllowedFast.Contains(pieceIndex) - - it.Initialize(&dirtyChunks) - - t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) { - if !allowedFast { - // We must signal interest to request this. TODO: We could set interested if the - // peers pieces (minus the allowed fast set) overlap with our missing pieces if - // there are any readers, or any pending pieces. - desired.Interested = true - // We can make or will allow sustaining a request here if we're not choked, or - // have made the request previously (presumably while unchoked), and haven't had - // the peer respond yet (and the request was retained because we are using the - // fast extension). - if p.peerChoking && !p.requestState.Requests.Contains(r) { - // We can't request this right now. - return - } - } - cancelled := &p.requestState.Cancelled - if !cancelled.IsEmpty() && cancelled.Contains(r) { - // Can't re-request while awaiting acknowledgement. - return - } - requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) - }) - }, - ) - t.assertPendingRequests() - desired.Requests = requestHeap - return -} - -func (p *Peer) getDesiredRequestStateDebug() (desired desiredRequestState) { - t := p.t - if !t.haveInfo() { - return - } - if t.closed.IsSet() { - return - } - if t.dataDownloadDisallowed.Bool() { - return - } - input := t.getRequestStrategyInput() - requestHeap := desiredPeerRequests{ - peer: p, - pieceStates: t.requestPieceStates, - requestIndexes: t.requestIndexes, - } - callCount := 0 iterCount := 0 - // Caller-provided allocation for roaring bitmap iteration. - var it typedRoaring.Iterator[RequestIndex] - - t.mu.RLock() - var dirtyChunks = t.dirtyChunks.Clone() - t.mu.RUnlock() - requestStrategy.GetRequestablePieces( input, t.getPieceRequestOrder(), @@ -289,13 +220,24 @@ func (p *Peer) getDesiredRequestStateDebug() (desired desiredRequestState) { // have made the request previously (presumably while unchoked), and haven't had // the peer respond yet (and the request was retained because we are using the // fast extension). - if p.peerChoking && !p.requestState.Requests.Contains(r) { + + p.mu.RLock() + peerChoking := p.peerChoking + hasRequest := p.requestState.Requests.Contains(r) + p.mu.RUnlock() + + if peerChoking && !hasRequest { // We can't request this right now. return } } + + p.mu.RLock() cancelled := &p.requestState.Cancelled - if !cancelled.IsEmpty() && cancelled.Contains(r) { + awaitingCancel := !cancelled.IsEmpty() && cancelled.Contains(r) + p.mu.RUnlock() + + if awaitingCancel { // Can't re-request while awaiting acknowledgement. return } @@ -303,16 +245,17 @@ func (p *Peer) getDesiredRequestStateDebug() (desired desiredRequestState) { }) }, ) - t.assertPendingRequests() desired.Requests = requestHeap - cap, ok := input.Capacity() - maxuv := input.MaxUnverifiedBytes() - rolen := t.getPieceRequestOrder().Len() + if debug { + cap, ok := input.Capacity() + maxuv := input.MaxUnverifiedBytes() + rolen := t.getPieceRequestOrder().Len() - p.logger.Levelf(log.Debug, "desired req=%d cap=%d ok=%v maxuv=%d rolen=%d indexes=%d states=%d calls=%d iter=%d", len(requestHeap.requestIndexes), - cap, ok, maxuv, rolen, len(t.requestIndexes), len(t.requestPieceStates), callCount, iterCount) + p.logger.Levelf(log.Debug, "desired req=%d cap=%d ok=%v maxuv=%d rolen=%d indexes=%d states=%d calls=%d iter=%d", len(requestHeap.requestIndexes), + cap, ok, maxuv, rolen, len(t.requestIndexes), len(t.requestPieceStates), callCount, iterCount) + } return } @@ -334,7 +277,7 @@ func (p *Peer) maybeUpdateActualRequestState() { context.Background(), pprof.Labels("update request", p.needRequestUpdate), func(_ context.Context) { - next := p.getDesiredRequestState() + next := p.getDesiredRequestState(false) p.applyRequestState(next) p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes) }, @@ -369,9 +312,12 @@ func (p *Peer) allowSendNotInterested() bool { // Transmit/action the request state to the peer. func (p *Peer) applyRequestState(next desiredRequestState) { + p.mu.Lock() + defer p.mu.Unlock() + current := &p.requestState // Make interest sticky - if !next.Interested && p.requestState.Interested { + if !next.Interested && current.Interested { if !p.allowSendNotInterested() { next.Interested = true } @@ -397,7 +343,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) { break } numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality()) - if numPending >= p.peerImpl.nominalMaxRequests() { + if numPending >= p.peerImpl.nominalMaxRequests(false) { break } req := heap.Pop(requestHeap) @@ -408,7 +354,6 @@ func (p *Peer) applyRequestState(next desiredRequestState) { if p.needRequestUpdate == "Peer.remoteRejectedRequest" { continue } - existing := t.requestingPeer(req) if existing != nil && existing != p { if p.needRequestUpdate == "Peer.cancel" { @@ -423,15 +368,14 @@ func (p *Peer) applyRequestState(next desiredRequestState) { if diff > 1 || (diff == 1 && !p.lastUsefulChunkReceived.After(existing.lastUsefulChunkReceived)) { continue } - - t.cancelRequest(req) + t.cancelRequest(req, false) } - - more = p.mustRequest(req) + more = p.mustRequest(req, false) if !more { break } } + if !more { // This might fail if we incorrectly determine that we can fit up to the maximum allowed // requests into the available write buffer space. We don't want that to happen because it @@ -440,6 +384,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) { "couldn't fill apply entire request state [newRequests=%v]", current.Requests.GetCardinality()-originalRequestCount)) } + newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount) //log.Printf( // "%s: requests %v->%v (peak %v->%v) reason %q (peer %v)\n", diff --git a/t.go b/t.go index 4c9bc329cc..18d499cf6b 100644 --- a/t.go +++ b/t.go @@ -181,8 +181,8 @@ func (t *Torrent) deleteReader(r *reader) { // has been obtained, see Torrent.Info and Torrent.GotInfo. func (t *Torrent) DownloadPieces(begin, end pieceIndex) { t.cl.lock() + defer t.cl.unlock() t.downloadPiecesLocked(begin, end) - t.cl.unlock() } func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) { diff --git a/torrent.go b/torrent.go index ab98c02bf8..8742dd654c 100644 --- a/torrent.go +++ b/torrent.go @@ -61,9 +61,9 @@ type mu struct { func (m *mu) RLock() { m.RWMutex.RLock() - m.rlmu.Lock() + //m.rlmu.Lock() //m.rlocker[m.rlc.Load()] = string(stack(2)) - m.rlmu.Unlock() + //m.rlmu.Unlock() m.rlc.Add(1) //fmt.Println("R", m.rlc, string(dbg.Stack())[:40]) @@ -375,7 +375,7 @@ func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn { } func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn { - conns := t.peerConnsAsSlice() + conns := t.peerConnsAsSlice(true) defer conns.free() for _, c := range conns { if f(c) { @@ -575,8 +575,8 @@ func (t *Torrent) onSetInfo() { t.tryCreateMorePieceHashers() t.iterPeers(func(p *Peer) { p.onGotInfo(t.info) - p.updateRequests("onSetInfo") - }) + p.updateRequests("onSetInfo", true) + }, true) } // Called when metadata for a torrent becomes available. @@ -631,7 +631,7 @@ func (t *Torrent) setMetadataSize(size int) (err error) { t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14)) t.metadataChanged.Broadcast() - conns := t.peerConnsAsSlice() + conns := t.peerConnsAsSlice(true) defer conns.free() for _, c := range conns { c.requestPendingMetadata() @@ -856,7 +856,7 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "webseeds:\n") t.writePeerStatuses(w, t.webSeedsAsSlice()) - peerConns := t.peerConnsAsSlice() + peerConns := t.peerConnsAsSlice(true) defer peerConns.free() // Peers without priorities first, then those with. I'm undecided about how to order peers @@ -1008,7 +1008,7 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { } t.iterPeers(func(p *Peer) { p.close() - }) + }, true) if t.storage != nil { t.deletePieceRequestOrder() } @@ -1210,6 +1210,24 @@ var peerConnSlices sync.Pool type conns []*PeerConn +func (c conns) numOutgoingConns() (ret int) { + for _, con := range c { + if con.outgoing { + ret++ + } + } + return +} + +func (c conns) numReceivedConns() (ret int) { + for _, con := range c { + if con.Discovery == PeerSourceIncoming { + ret++ + } + } + return +} + func (c conns) free() { peerConnSlices.Put(c) } @@ -1225,9 +1243,11 @@ func (t *Torrent) webSeedsAsSlice() (ret []*Peer) { return ret } -func (t *Torrent) peerConnsAsSlice() conns { - t.mu.RLock() - defer t.mu.RUnlock() +func (t *Torrent) peerConnsAsSlice(lock bool) conns { + if lock { + t.mu.RLock() + defer t.mu.RUnlock() + } conns := getPeerConnSlice(len(t.conns)) for k := range t.conns { @@ -1381,17 +1401,23 @@ func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string) { // if c.requestState.Interested { // return // } - if !c.isLowOnRequests() { + if !c.isLowOnRequests(true) { return } if !c.peerHasPiece(piece) { return } - if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { + + c.mu.RLock() + ignore := c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) + c.mu.RUnlock() + + if ignore { return } - c.updateRequests(reason) - }) + + c.updateRequests(reason, true) + }, true) } t.maybeNewConns() t.publishPieceStateChange(piece, true) @@ -1494,40 +1520,15 @@ func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) { t.updatePiecePriority(piece, reason) } -func (t *Torrent) numReceivedConns() (ret int) { - t.mu.RLock() - defer t.mu.RUnlock() - - for c := range t.conns { - if c.Discovery == PeerSourceIncoming { - ret++ - } - } - return -} - -func (t *Torrent) numOutgoingConns() (ret int) { - t.mu.RLock() - defer t.mu.RUnlock() - - for c := range t.conns { - if c.outgoing { - ret++ - } - } - return -} - func (t *Torrent) maxHalfOpen() int { // Note that if we somehow exceed the maximum established conns, we want // the negative value to have an effect. - t.mu.RLock() - lenConns := len(t.conns) - t.mu.RUnlock() + conns := t.peerConnsAsSlice(true) + defer conns.free() - establishedHeadroom := int64(t.maxEstablishedConns - lenConns) - extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2) + establishedHeadroom := int64(t.maxEstablishedConns - len(conns)) + extraIncoming := int64(conns.numReceivedConns() - t.maxEstablishedConns/2) // We want to allow some experimentation with new peers, and to try to // upset an oversupply of received connections. return int(min( @@ -2161,18 +2162,24 @@ func (t *Torrent) reconcileHandshakeStats(c *Peer) { // Returns true if the connection is added. func (t *Torrent) addPeerConn(c *PeerConn) (err error) { - t.mu.Lock() - defer t.mu.Unlock() - defer func() { if err == nil { torrent.Add("added connections", 1) } }() - if t.closed.IsSet() { + + t.mu.RLock() + closed := t.closed.IsSet() + t.mu.RUnlock() + + if closed { return errors.New("torrent closed") } - for c0 := range t.conns { + + conns := t.peerConnsAsSlice(true) + defer conns.free() + + for _, c0 := range conns { if c.PeerID != c0.PeerID { continue } @@ -2186,9 +2193,10 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { return errors.New("existing connection preferred") } } - if len(t.conns) >= t.maxEstablishedConns { - numOutgoing := t.numOutgoingConns() - numIncoming := len(t.conns) - numOutgoing + + if len(conns) >= t.maxEstablishedConns { + numOutgoing := conns.numOutgoingConns() + numIncoming := len(conns) - numOutgoing c := t.worstBadConn(worseConnLensOpts{ // We've already established that we have too many connections at this point, so we just // need to match what kind we have too many of vs. what we're trying to add now. @@ -2201,15 +2209,22 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { c.close() t.deletePeerConn(c) } + + t.mu.Lock() + defer t.mu.Unlock() + if len(t.conns) >= t.maxEstablishedConns { - panic(len(t.conns)) + panic(len(conns)) } + t.conns[c] = struct{}{} + t.cl.event.Broadcast() // We'll never receive the "p" extended handshake parameter. if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { t.pex.Add(c) } + return nil } @@ -2248,17 +2263,14 @@ func (t *Torrent) wantOutgoingConns() bool { return false } - t.mu.RLock() - lenConns := len(t.conns) - t.mu.RUnlock() + conns := t.peerConnsAsSlice(true) + defer conns.free() - if lenConns < t.maxEstablishedConns { - return true - } - numIncomingConns := lenConns - t.numOutgoingConns() + numOutgoingConns := conns.numOutgoingConns() + numIncomingConns := len(conns) - numOutgoingConns return t.worstBadConn(worseConnLensOpts{ - incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1, + incomingIsBad: numIncomingConns-numOutgoingConns > 1, outgoingIsBad: false, }) != nil } @@ -2268,18 +2280,20 @@ func (t *Torrent) wantIncomingConns() bool { return false } - t.mu.RLock() - lenConns := len(t.conns) - t.mu.RUnlock() + conns := t.peerConnsAsSlice(true) + defer conns.free() - if lenConns < t.maxEstablishedConns { + if len(conns) < t.maxEstablishedConns { return true } - numIncomingConns := lenConns - t.numOutgoingConns() + + numOutgoingConns := conns.numOutgoingConns() + + numIncomingConns := len(conns) - numOutgoingConns return t.worstBadConn(worseConnLensOpts{ incomingIsBad: false, - outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1, + outgoingIsBad: numOutgoingConns-numIncomingConns > 1, }) != nil } @@ -2438,7 +2452,7 @@ func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) { start := t.pieceRequestIndexOffset(piece) end := start + t.pieceNumChunks(piece) for ri := start; ri < end; ri++ { - t.cancelRequest(ri) + t.cancelRequest(ri, true) } } @@ -2446,7 +2460,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.pendAllChunkSpecs(piece) t.cancelRequestsForPiece(piece) t.piece(piece).readerCond.Broadcast() - conns := t.peerConnsAsSlice() + conns := t.peerConnsAsSlice(true) defer conns.free() for _, conn := range conns { conn.have(piece) @@ -2476,7 +2490,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { // } for _, conn := range t.peersAsSlice() { if conn.peerHasPiece(piece) { - conn.updateRequests("piece incomplete") + conn.updateRequests("piece incomplete", true) } } } @@ -2646,7 +2660,7 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) { func (t *Torrent) peersAsSlice() (ret []*Peer) { t.iterPeers(func(p *Peer) { ret = append(ret, p) - }) + }, true) return } @@ -2792,10 +2806,9 @@ func (t *Torrent) onWriteChunkErr(err error) { } func (t *Torrent) DisallowDataDownload() { - conns := t.peerConnsAsSlice() - defer conns.free() - t.mu.Lock() + conns := t.peerConnsAsSlice(false) + defer conns.free() t.dataDownloadDisallowed.Set() t.mu.Unlock() @@ -2804,17 +2817,16 @@ func (t *Torrent) DisallowDataDownload() { t.cl.lock() defer t.cl.unlock() // Could check if peer request state is empty/not interested? - c.updateRequests("disallow data download") + c.updateRequests("disallow data download", true) c.cancelAllRequests() }() } } func (t *Torrent) AllowDataDownload() { - conns := t.peerConnsAsSlice() - defer conns.free() - t.mu.Lock() + conns := t.peerConnsAsSlice(false) + defer conns.free() t.dataDownloadDisallowed.Clear() t.mu.Unlock() @@ -2822,17 +2834,16 @@ func (t *Torrent) AllowDataDownload() { func() { t.cl.lock() defer t.cl.unlock() - c.updateRequests("allow data download") + c.updateRequests("allow data download", true) }() } } // Enables uploading data, if it was disabled. func (t *Torrent) AllowDataUpload() { - conns := t.peerConnsAsSlice() - defer conns.free() - t.mu.Lock() + conns := t.peerConnsAsSlice(false) + defer conns.free() t.dataUploadDisallowed = false t.mu.Unlock() @@ -2840,17 +2851,16 @@ func (t *Torrent) AllowDataUpload() { func() { t.cl.lock() defer t.cl.unlock() - c.updateRequests("allow data upload") + c.updateRequests("allow data upload", true) }() } } // Disables uploading data, if it was enabled. func (t *Torrent) DisallowDataUpload() { - conns := t.peerConnsAsSlice() - defer conns.free() - t.mu.Lock() + conns := t.peerConnsAsSlice(false) + defer conns.free() t.dataUploadDisallowed = true t.mu.Unlock() @@ -2859,7 +2869,7 @@ func (t *Torrent) DisallowDataUpload() { t.cl.lock() defer t.cl.unlock() // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead? - c.updateRequests("disallow data upload") + c.updateRequests("disallow data upload", true) }() } } @@ -2872,8 +2882,8 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) { t.userOnWriteChunkErr = f } -func (t *Torrent) iterPeers(f func(p *Peer)) { - conns := t.peerConnsAsSlice() +func (t *Torrent) iterPeers(f func(p *Peer), lock bool) { + conns := t.peerConnsAsSlice(lock) defer conns.free() webSeeds := t.webSeedsAsSlice() @@ -2994,7 +3004,7 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) { if p1 == p { active = true } - }) + }, true) return } @@ -3018,10 +3028,10 @@ func (t *Torrent) updateComplete() { t.Complete.SetBool(t.haveAllPieces()) } -func (t *Torrent) cancelRequest(r RequestIndex) *Peer { +func (t *Torrent) cancelRequest(r RequestIndex, lockPeer bool) *Peer { p := t.requestingPeer(r) if p != nil { - p.cancel(r) + p.cancel(r, lockPeer) } // TODO: This is a check that an old invariant holds. It can be removed after some testing. //delete(t.pendingRequests, r) @@ -3252,7 +3262,7 @@ func addrPortProtocolStr(addrPort netip.AddrPort) string { func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error { rzsSent := 0 - conns := t.peerConnsAsSlice() + conns := t.peerConnsAsSlice(true) defer conns.free() for _, pc := range conns { diff --git a/webseed-peer.go b/webseed-peer.go index 8cfbc03a25..4d28b170c0 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -84,7 +84,7 @@ func (ws *webseedPeer) _request(r Request) bool { return true } -func (cn *webseedPeer) nominalMaxRequests() maxRequests { +func (cn *webseedPeer) nominalMaxRequests(lock bool) maxRequests { interestedPeers := 1 cn.peer.t.iterPeers(func(peer *Peer) { @@ -101,9 +101,14 @@ func (cn *webseedPeer) nominalMaxRequests() maxRequests { interestedPeers++ } - }) + }, lock) + + if lock { + cn.peer.mu.RLock() + defer cn.peer.mu.RUnlock() + } - activeRequestsPerPeer := cn.peer.bestPeerNumPieces() / maxInt(1, interestedPeers) + activeRequestsPerPeer := cn.peer.bestPeerNumPieces(false) / maxInt(1, interestedPeers) return maxInt(1, minInt(cn.peer.PeerMaxRequests, maxInt(maxInt(8, activeRequestsPerPeer), cn.peer.peakRequests*2))) } @@ -167,11 +172,11 @@ func (ws *webseedPeer) requester(i int) { }) if !(ws.peer.t.dataDownloadDisallowed.Bool() || ws.peer.t.info == nil) { - desiredRequests := len(ws.peer.getDesiredRequestState().Requests.requestIndexes) + desiredRequests := len(ws.peer.getDesiredRequestState(false).Requests.requestIndexes) pendingRequests := int(ws.peer.requestState.Requests.GetCardinality()) receiving := ws.receiving.Load() ws.peer.logger.Levelf(log.Debug, "%d: requests %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) restart(%v)", - i, ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(), + i, ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(true), len(ws.activeRequests)-int(receiving), receiving, ws.maxActiveRequests, ws.waiting, ws.peer.t.numQueuedForHash(), ws.peer.t.activePieceHashes.Load(), ws.peer.t.hashing.Load(), len(ws.peer.t.hashResults), ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), restart) @@ -252,8 +257,8 @@ func requestUpdate(ws *webseedPeer) { ws.updateRequestor = nil ws.peer.logger.Levelf(log.Debug, "requestUpdate %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) %s", - ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState().Requests.requestIndexes), - ws.nominalMaxRequests(), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, + ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState(false).Requests.requestIndexes), + ws.nominalMaxRequests(true), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, ws.peer.t.numQueuedForHash(), ws.peer.t.activePieceHashes.Load(), ws.peer.t.hashing.Load(), len(ws.peer.t.hashResults), ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), time.Since(ws.peer.lastRequestUpdate)) @@ -263,7 +268,7 @@ func requestUpdate(ws *webseedPeer) { if numCompleted < numPieces { // Don't wait for small files - if ws.peer.isLowOnRequests() && (numPieces == 1 || time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration) { + if ws.peer.isLowOnRequests(true) && (numPieces == 1 || time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration) { // if the number of incomplete pieces is less than five adjust this peers // lastUsefulChunkReceived to ensure that it can steal from non web peers // this is to help ensure completion - we may want to add a head call @@ -302,7 +307,7 @@ func requestUpdate(ws *webseedPeer) { ws.peer.t.iterPeers(func(p *Peer) { rate := p.downloadRate() pieces := int(p.requestState.Requests.GetCardinality()) - desired := len(ws.peer.getDesiredRequestState().Requests.requestIndexes) + desired := len(ws.peer.getDesiredRequestState(false).Requests.requestIndexes) this := "" if p == &ws.peer { @@ -310,15 +315,15 @@ func requestUpdate(ws *webseedPeer) { } flags := p.connectionFlags() peerInfo = append(peerInfo, fmt.Sprintf("%s%s:p=%d,d=%d: %f", this, flags, pieces, desired, rate)) - }) + }, true) ws.peer.logger.Levelf(log.Debug, "unchoke processed=%d, complete(%d/%d) maxRequesters=%d, waiting=%d, (%s): peers(%d): %v", ws.processedRequests, numCompleted, numPieces, ws.maxRequesters, ws.waiting, ws.peer.lastUsefulChunkReceived, len(peerInfo), peerInfo) - ws.peer.updateRequests("unchoked") + ws.peer.updateRequests("unchoked", true) ws.peer.logger.Levelf(log.Debug, "unchoked %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) hashing(q=%d,a=%d) complete(%d/%d) %s", - ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestStateDebug().Requests.requestIndexes), - ws.nominalMaxRequests(), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, + ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState(true).Requests.requestIndexes), + ws.nominalMaxRequests(true), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting, ws.peer.t.numQueuedForHash(), ws.peer.t.activePieceHashes.Load(), ws.peer.t.numPiecesCompleted(), ws.peer.t.NumPieces(), time.Since(ws.peer.lastRequestUpdate)) @@ -365,10 +370,10 @@ func (ws *webseedPeer) onClose() { // Just deleting them means we would have to manually cancel active requests. ws.peer.cancelAllRequests() ws.peer.t.iterPeers(func(p *Peer) { - if p.isLowOnRequests() { - p.updateRequests("webseedPeer.onClose") + if p.isLowOnRequests(true) { + p.updateRequests("webseedPeer.onClose", true) } - }) + }, true) ws.requesterCond.Broadcast() }