
adjusted choke processing
mh0lt committed Jun 8, 2024
1 parent a4a31e1 commit 1a18fa7
Showing 7 changed files with 89 additions and 53 deletions.
8 changes: 6 additions & 2 deletions client.go
@@ -1107,12 +1107,16 @@ func (p *Peer) initUpdateRequestsTimer() {
const peerUpdateRequestsTimerReason = "updateRequestsTimer"

func (c *Peer) updateRequestsTimerFunc() {
c.t.mu.Lock()
defer c.t.mu.Unlock()

c.mu.Lock()
defer c.mu.Unlock()
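// Both locks are now held for the rest of this function, so the
// calls below pass false for their lock and lockTorrent flags.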

if c.closed.IsSet() {
return
}
if c.isLowOnRequests(false) {
if c.isLowOnRequests(false, false) {
// If there are no outstanding requests, then a request update should have already run.
return
}
@@ -1122,7 +1126,7 @@ func (c *Peer) updateRequestsTimerFunc() {
torrent.Add("spurious timer requests updates", 1)
return
}
c.updateRequests(peerUpdateRequestsTimerReason, false, true)
c.updateRequests(peerUpdateRequestsTimerReason, false, false)
}

// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
6 changes: 6 additions & 0 deletions peer-impl.go
@@ -28,6 +28,12 @@ type peerImpl interface {
String() string
peerImplStatusLines() []string

// the peer is running low on requests and needs some more
// Note: for web peers this means low on web requests, as we
// want to keep driving the HTTP request API while we are processing
// its responses
isLowOnRequests(lock bool, lockTorrent bool) bool

// All if the peer should have everything, known if we know that for a fact. For example, we can
// guess at how many pieces are in a torrent, and assume they have all pieces based on them
// having sent haves for everything, but we don't know for sure. But if they send a have-all
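The two boolean parameters added here follow the lock-flag convention this commit threads through the call graph: a caller that already holds the peer or torrent mutex passes false so the callee doesn't try to re-acquire a non-reentrant lock. A minimal sketch of the convention, using simplified stand-in types rather than the real Peer and Torrent:

package main

import "sync"

// Simplified stand-ins for the real Torrent and Peer types; this is a
// sketch of the lock-flag convention, not the library's actual API.
type torrent struct{ mu sync.RWMutex }

type peer struct {
	mu          sync.RWMutex
	t           *torrent
	outstanding int
}

// isLowOnRequests only takes the locks the caller doesn't already hold,
// avoiding self-deadlock on the non-reentrant sync.RWMutex.
func (p *peer) isLowOnRequests(lock, lockTorrent bool) bool {
	if lockTorrent {
		p.t.mu.RLock()
		defer p.t.mu.RUnlock()
	}
	if lock {
		p.mu.RLock()
		defer p.mu.RUnlock()
	}
	return p.outstanding == 0
}

func main() {
	p := &peer{t: &torrent{}}
	p.t.mu.RLock()
	_ = p.isLowOnRequests(true, false) // torrent lock already held by us
	p.t.mu.RUnlock()
}

The trade-off is that correctness now rests on every call site passing flags that match the locks it actually holds, which is what the remaining hunks in this commit adjust.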
22 changes: 8 additions & 14 deletions peer.go
@@ -265,8 +265,8 @@ func (cn *Peer) downloadRate() float64 {
}

func (p *Peer) DownloadRate() float64 {
p.locker().RLock()
defer p.locker().RUnlock()
p.mu.RLock()
defer p.mu.RUnlock()

return p.downloadRate()
}
@@ -536,7 +536,7 @@ func (me *Peer) cancel(r RequestIndex, updateRequests bool, lock bool, lockTorre
}
}
me.decPeakRequests()
if updateRequests && me.isLowOnRequests(lock) {
if updateRequests && me.isLowOnRequests(lock, lockTorrent) {
me.updateRequests("Peer.cancel", lock, lockTorrent)
}
}
@@ -646,7 +646,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
return false
}
}
if c.isLowOnRequests(true) {
if c.isLowOnRequests(true, true) {
c.updateRequests("Peer.remoteRejectedRequest", true, true)
}
c.decExpectedChunkReceive(r)
@@ -836,7 +836,7 @@ func (c *Peer) receiveChunk(msg *pp.Message, lockTorrent bool) error {

// this is done after all other processing to avoid re-requesting here,
// as we no longer have an outstanding request for the chunk
if intended {
if c.isLowOnRequests(true) {
if c.isLowOnRequests(true, false) {
c.updateRequests("Peer.receiveChunk deleted request", true, false)
}
}
@@ -945,7 +945,7 @@ func (c *Peer) deleteAllRequests(reason string, lock bool, lockTorrent bool) {
})
c.assertNoRequests()
c.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests(false) {
if p.isLowOnRequests(false, false) {
p.updateRequests(reason, false, false)
}
}, false)
@@ -1003,7 +1003,9 @@ func (l connectionTrust) Less(r connectionTrust) bool {
// Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims.
func (cn *Peer) newPeerPieces(lockTorrent bool) *roaring.Bitmap {
// TODO: Can we use copy on write?
cn.mu.RLock()
ret := cn.peerPieces().Clone()
cn.mu.RUnlock()
if all, _ := cn.peerHasAllPieces(lockTorrent); all {
if cn.t.haveInfo(true) {
ret.AddRange(0, bitmap.BitRange(cn.t.numPieces()))
@@ -1031,14 +1033,6 @@ func (p *Peer) uncancelledRequests() uint64 {

type peerLocalPublicAddr = IpPort

func (p *Peer) isLowOnRequests(lock bool) bool {
if lock {
p.mu.RLock()
defer p.mu.RUnlock()
}
return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
}

func (p *Peer) decPeakRequests() {
// // This can occur when peak requests are altered by the update request timer to be lower than
// // the actual number of outstanding requests. Let's let it go negative and see what happens. I
10 changes: 8 additions & 2 deletions peerconn.go
@@ -111,6 +111,14 @@ func (cn *PeerConn) peerImplStatusLines() []string {
}
}

func (p *PeerConn) isLowOnRequests(lock bool, lockTorrent bool) bool {
if lock {
p.mu.RLock()
defer p.mu.RUnlock()
}
return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
}

// Returns true if the connection is over IPv6.
func (cn *PeerConn) ipv6() bool {
ip := cn.remoteIp()
@@ -1159,8 +1167,6 @@ func (pc *PeerConn) String() string {
// Returns the pieces the peer could have based on their claims. If we don't know how many pieces
// are in the torrent, it could be a very large range if the peer has sent HaveAll.
func (pc *PeerConn) PeerPieces() *roaring.Bitmap {
pc.locker().RLock()
defer pc.locker().RUnlock()
return pc.newPeerPieces(true)
}

4 changes: 2 additions & 2 deletions torrent.go
@@ -958,7 +958,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
fmt.Fprintf(w, "webseeds:\n")
t.writePeerStatuses(w, t.webSeedsAsSlice(false))

peerConns := t.peerConnsAsSlice(true)
peerConns := t.peerConnsAsSlice(false)
defer peerConns.free()

// Peers without priorities first, then those with. I'm undecided about how to order peers
@@ -1545,7 +1545,7 @@ func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string, lock b
// if c.requestState.Interested {
// return
// }
if !c.isLowOnRequests(false) {
if !c.isLowOnRequests(false, false) {
return
}
if !c.peerHasPiece(piece, true, false) {
79 changes: 50 additions & 29 deletions webseed-peer.go
@@ -33,6 +33,7 @@ type webseedPeer struct {
maxRequesters int // the number of requesters to run for this peer
waiting int // the number of requesters currently waiting for a signal
receiving atomic.Int64
persisting atomic.Int64
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
@@ -119,7 +120,7 @@ func (cn *webseedPeer) nominalMaxRequests(lock bool, lockTorrent bool) maxReques
}

func (ws *webseedPeer) doRequest(r Request) error {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
webseedRequest := ws.client.NewRequest(ws.intoSpec(r), &ws.receiving)
ws.activeRequests[r] = webseedRequest
if activeLen := len(ws.activeRequests); activeLen > ws.maxActiveRequests {
ws.maxActiveRequests = activeLen
@@ -165,7 +166,7 @@ func (ws *webseedPeer) requester(i int) {

ws.requesterCond.L.Unlock()
defer ws.requesterCond.L.Lock()

if errors.Is(err, webseed.ErrTooFast) {
time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
}
@@ -236,7 +237,7 @@ func (ws *webseedPeer) requester(i int) {

if time.Since(ws.lastLog) > 5*time.Second {
ws.lastLog = time.Now()
go logProgress(ws)
go logProgress(ws, "requests", true)
}

ws.waiting++
@@ -259,11 +260,13 @@ func (ws *webseedPeer) requester(i int) {

var webpeerUnchokeTimerDuration = 15 * time.Second

func logProgress(ws *webseedPeer) {
func logProgress(ws *webseedPeer, label string, lockTorrent bool) {
t := ws.peer.t

t.mu.RLock()
defer t.mu.RUnlock()
if lockTorrent {
t.mu.RLock()
defer t.mu.RUnlock()
}

if !t.haveInfo(false) {
return
@@ -275,12 +278,13 @@
desiredRequests := len(ws.peer.getDesiredRequestState(false, false, false).Requests.requestIndexes)
pendingRequests := int(ws.peer.requestState.Requests.GetCardinality())
receiving := ws.receiving.Load()
persisting := ws.persisting.Load()

ws.peer.logger.Levelf(log.Debug, "requests %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d)",
ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(false, false),
len(ws.activeRequests)-int(receiving), receiving, ws.maxActiveRequests, ws.waiting,
ws.peer.logger.Levelf(log.Debug, "%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s",
label, ws.processedRequests, pendingRequests, desiredRequests, ws.nominalMaxRequests(false, false),
len(ws.activeRequests)-int(receiving)-int(persisting), receiving, persisting, ws.maxActiveRequests, ws.waiting,
t.numQueuedForHash(false), t.activePieceHashes.Load(), t.hashing.Load(), len(t.hashResults),
t.numPiecesCompleted(false), t.NumPieces())
t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate))
}

func requestUpdate(ws *webseedPeer) {
@@ -292,20 +296,15 @@

ws.updateRequestor = nil

ws.peer.logger.Levelf(log.Debug, "requestUpdate %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) %s",
ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState(false, true, false).Requests.requestIndexes),
ws.nominalMaxRequests(true, false), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting,
ws.peer.t.numQueuedForHash(false), ws.peer.t.activePieceHashes.Load(), ws.peer.t.hashing.Load(), len(ws.peer.t.hashResults),
ws.peer.t.numPiecesCompleted(false), ws.peer.t.NumPieces(),
time.Since(ws.peer.lastRequestUpdate))
logProgress(ws, "requestUpdate", false)

if !ws.peer.closed.IsSet() {
numPieces := ws.peer.t.NumPieces()
numCompleted := ws.peer.t.numPiecesCompleted(false)

if numCompleted < numPieces {
// Don't wait for small files
if ws.peer.isLowOnRequests(false) && (numPieces == 1 || time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration) {
if ws.peer.isLowOnRequests(true, false) && (numPieces == 1 || time.Since(ws.peer.lastRequestUpdate) > webpeerUnchokeTimerDuration) {
// if the number of incomplete pieces is less than five, adjust this peer's
// lastUsefulChunkReceived to ensure that it can steal from non-web peers.
// This is to help ensure completion - we may want to add a HEAD call
@@ -340,29 +339,36 @@ }
}

peerInfo := []string{}

ws.peer.t.iterPeers(func(p *Peer) {
p.mu.RLock()
defer p.mu.RUnlock()

rate := p.downloadRate()
pieces := int(p.requestState.Requests.GetCardinality())
desired := len(p.getDesiredRequestState(false, true, false).Requests.requestIndexes)
desired := len(p.getDesiredRequestState(false, false, false).Requests.requestIndexes)

this := ""
if p == &ws.peer {
this = "*"
}
flags := p.connectionFlags()
peerInfo = append(peerInfo, fmt.Sprintf("%s%s:p=%d,d=%d: %f", this, flags, pieces, desired, rate))

}, false)

ws.peer.logger.Levelf(log.Debug, "unchoke processed=%d, complete(%d/%d) maxRequesters=%d, waiting=%d, (%s): peers(%d): %v", ws.processedRequests, numCompleted, numPieces, ws.maxRequesters, ws.waiting, ws.peer.lastUsefulChunkReceived, len(peerInfo), peerInfo)
ws.peer.logger.Levelf(log.Debug, "unchoke processed=%d, complete(%d/%d) maxRequesters=%d, waiting=%d, (%s): peers(%d): %v",
ws.processedRequests, numCompleted, numPieces, ws.maxRequesters,
ws.waiting, time.Since(ws.peer.lastUsefulChunkReceived), len(peerInfo), peerInfo)

ws.peer.updateRequests("unchoked", true, false)
func() {
// updateRequests may require a write lock on the
// torrent, so release our read lock first
ws.peer.t.mu.RUnlock()
defer ws.peer.t.mu.RLock()
ws.peer.updateRequests("unchoked", true, true)
}()

ws.peer.logger.Levelf(log.Debug, "unchoked %d (p=%d,d=%d,n=%d) active(c=%d,m=%d,w=%d) hashing(q=%d,a=%d) complete(%d/%d) %s",
ws.processedRequests, int(ws.peer.requestState.Requests.GetCardinality()), len(ws.peer.getDesiredRequestState(true, true, false).Requests.requestIndexes),
ws.nominalMaxRequests(true, false), len(ws.activeRequests), ws.maxActiveRequests, ws.waiting,
ws.peer.t.numQueuedForHash(false), ws.peer.t.activePieceHashes.Load(), ws.peer.t.numPiecesCompleted(false), ws.peer.t.NumPieces(),
time.Since(ws.peer.lastRequestUpdate))
logProgress(ws, "unchoked", false)

// if the initial unchoke didn't yield a request (for small files) don't immediately
// retry; it means it's being handled by a prioritized peer
Expand Down Expand Up @@ -391,6 +397,21 @@ func (cn *webseedPeer) ban() {
cn.peer.drop(true)
}

func (cn *webseedPeer) isLowOnRequests(lock bool, lockTorrent bool) bool {
if lockTorrent {
cn.peer.t.mu.RLock()
defer cn.peer.t.mu.RUnlock()
}

if lock {
cn.peer.mu.RLock()
defer cn.peer.mu.RUnlock()
}

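// In Go, && binds tighter than ||: the peer is low either when it has no
// outstanding or cancelled requests at all, or when its active web requests
// have dropped below a quarter of the nominal maximum.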
return cn.peer.requestState.Requests.IsEmpty() && cn.peer.requestState.Cancelled.IsEmpty() ||
len(cn.activeRequests) <= cn.nominalMaxRequests(false, false)/4
}

func (ws *webseedPeer) handleUpdateRequests(lock bool, lockTorrent bool) {
// Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
// pieces.
@@ -403,7 +424,7 @@ func (ws *webseedPeer) onClose(lockTorrent bool) {
// Just deleting them means we would have to manually cancel active requests.
ws.peer.cancelAllRequests(lockTorrent)
ws.peer.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests(true) {
if p.isLowOnRequests(true, lockTorrent) {
p.updateRequests("webseedPeer.onClose", true, lockTorrent)
}
}, true)
@@ -414,8 +435,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
result := <-webseedRequest.Result
close(webseedRequest.Result) // one-shot

ws.receiving.Add(1)
defer ws.receiving.Add(-1)
ws.persisting.Add(1)
defer ws.persisting.Add(-1)

// We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
// sure if we can divine which errors indicate cancellation on our end without hitting the
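One notable pattern in this file: requestUpdate runs under the torrent's read lock, but updateRequests may need the write lock, so the new code releases the read lock around the call and re-acquires it afterwards. A condensed sketch of that release-and-reacquire pattern with hypothetical names; since sync.RWMutex cannot be upgraded in place, any state observed before the gap must be re-validated after it:

package main

import "sync"

// Hypothetical stand-in for the torrent and its RWMutex; a sketch of
// releasing a held read lock so a callee can take the write lock.
type torrent struct {
	mu     sync.RWMutex
	pieces int
}

func underReadLock(t *torrent, needsWriteLock func()) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	before := t.pieces // observation made under the read lock

	func() {
		t.mu.RUnlock()
		defer t.mu.RLock()
		needsWriteLock() // free to take t.mu.Lock() itself now
	}()

	// Another goroutine may have changed state while the lock was
	// released, so earlier observations must be re-checked.
	if t.pieces != before {
		// ... re-validate and bail out or recompute ...
	}
}

func main() {
	t := &torrent{}
	underReadLock(t, func() {
		t.mu.Lock()
		t.pieces++
		t.mu.Unlock()
	})
}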
13 changes: 9 additions & 4 deletions webseed/client.go
@@ -9,6 +9,7 @@ import (
"log"
"net/http"
"strings"
"sync/atomic"

"github.com/RoaringBitmap/roaring"

@@ -68,7 +69,7 @@ type RequestResult struct {
Err error
}

func (ws *Client) NewRequest(r RequestSpec) Request {
func (ws *Client) NewRequest(r RequestSpec, receivingCounter *atomic.Int64) Request {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
@@ -98,7 +99,7 @@ func (ws *Client) NewRequest(r RequestSpec) Request {
Result: make(chan RequestResult, 1),
}
go func() {
b, err := readRequestPartResponses(ctx, requestParts)
b, err := readRequestPartResponses(ctx, requestParts, receivingCounter)
req.Result <- RequestResult{
Bytes: b,
Err: err,
@@ -174,14 +175,18 @@ func recvPartResult(ctx context.Context, buf *bytes.Buffer, part requestPart, re

var ErrTooFast = errors.New("making requests too fast")

func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) {
func readRequestPartResponses(ctx context.Context, parts []requestPart, receivingCounter *atomic.Int64) ([]byte, error) {
var buf bytes.Buffer

for _, part := range parts {
if result, err := part.do(); err != nil {
return nil, err
} else {
if err = recvPartResult(ctx, &buf, part, result); err != nil {
if err = func() error {
receivingCounter.Add(1)
defer receivingCounter.Add(-1)
return recvPartResult(ctx, &buf, part, result)
}(); err != nil {
return nil, fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
}
}
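The receiving counter handed to NewRequest is incremented only around recvPartResult, via an immediately invoked closure with a deferred decrement, while the separate persisting counter in webseed-peer.go covers the phase after the body has been read. A small self-contained sketch of this gauge-style use of an atomic counter, with hypothetical names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// withGauge keeps gauge incremented for the duration of fn, so another
// goroutine can sample how many calls are inside this phase right now.
func withGauge(gauge *atomic.Int64, fn func() error) error {
	gauge.Add(1)
	defer gauge.Add(-1)
	return fn()
}

func main() {
	var receiving atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = withGauge(&receiving, func() error {
				time.Sleep(50 * time.Millisecond) // simulate reading a body
				return nil
			})
		}()
	}
	time.Sleep(10 * time.Millisecond)
	fmt.Println("receiving now:", receiving.Load()) // typically 3
	wg.Wait()
}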
