From 36f52d7a145142e682b79563c84b229159fdcb58 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 20 May 2021 20:23:45 +1000 Subject: [PATCH] Apply next request state asynchronously --- client.go | 4 +- internal/chansync/broadcast-cond.go.go | 3 +- internal/chansync/set-once.go | 3 +- peer-impl.go | 5 +- peerconn.go | 94 +++++++++++++------------- pexconn_test.go | 2 +- request-strategy.go | 46 ++++++++++--- webseed-peer.go | 12 +++- 8 files changed, 102 insertions(+), 67 deletions(-) diff --git a/client.go b/client.go index 7ac6b264d6..1858b84816 100644 --- a/client.go +++ b/client.go @@ -33,12 +33,12 @@ import ( "golang.org/x/xerrors" "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/internal/chansync" "github.com/anacrolix/torrent/internal/limiter" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" - request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/webtorrent" @@ -81,7 +81,7 @@ type Client struct { activeAnnounceLimiter limiter.Instance - pieceRequestOrder request_strategy.ClientPieceOrder + updateRequests chansync.BroadcastCond } type ipStr string diff --git a/internal/chansync/broadcast-cond.go.go b/internal/chansync/broadcast-cond.go.go index 9b8906920d..6d96d3c4e6 100644 --- a/internal/chansync/broadcast-cond.go.go +++ b/internal/chansync/broadcast-cond.go.go @@ -22,7 +22,8 @@ func (me *BroadcastCond) Broadcast() { } // Should be called before releasing locks on resources that might trigger subsequent Broadcasts. -func (me *BroadcastCond) WaitChan() <-chan struct{} { +// The channel is closed when the condition changes. +func (me *BroadcastCond) Signaled() Signaled { me.mu.Lock() defer me.mu.Unlock() if me.ch == nil { diff --git a/internal/chansync/set-once.go b/internal/chansync/set-once.go index 523e5eaf5b..db0e6e89c0 100644 --- a/internal/chansync/set-once.go +++ b/internal/chansync/set-once.go @@ -9,7 +9,8 @@ type SetOnce struct { closeOnce sync.Once } -func (me *SetOnce) Chan() <-chan struct{} { +// Returns a channel that is closed when the event is flagged. +func (me *SetOnce) Done() Done { me.init() return me.ch } diff --git a/peer-impl.go b/peer-impl.go index 880b8f3543..23c0fbb922 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -8,13 +8,14 @@ import ( // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with // legacy PeerConn methods. type peerImpl interface { + onNextRequestStateChanged() updateRequests() writeInterested(interested bool) bool // Neither of these return buffer room anymore, because they're currently both posted. There's // also PeerConn.writeBufferFull for when/where it matters. - _cancel(Request) - _request(Request) + _cancel(Request) bool + _request(Request) bool connectionFlags() string onClose() diff --git a/peerconn.go b/peerconn.go index b2753d6a88..40848e37e2 100644 --- a/peerconn.go +++ b/peerconn.go @@ -24,6 +24,7 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" + request_strategy "github.com/anacrolix/torrent/request-strategy" ) type PeerSource string @@ -48,7 +49,10 @@ type PeerRemoteAddr interface { // Since we have to store all the requests in memory, we can't reasonably exceed what would be // indexable with the memory space available. -type maxRequests = int +type ( + maxRequests = int + requestState = request_strategy.PeerNextRequestState +) type Peer struct { // First to ensure 64-bit alignment for atomics. See #262. @@ -78,7 +82,8 @@ type Peer struct { lastChunkSent time.Time // Stuff controlled by the local peer. - interested bool + nextRequestState requestState + actualRequestState requestState lastBecameInterested time.Time priorInterest time.Duration @@ -87,7 +92,6 @@ type Peer struct { _chunksReceivedWhileExpecting int64 choking bool - requests map[Request]struct{} piecesReceivedSinceLastRequestUpdate maxRequests maxPiecesReceivedBetweenRequestUpdates maxRequests // Chunks that we might reasonably expect to receive from the peer. Due to @@ -171,13 +175,13 @@ func (cn *Peer) updateExpectingChunks() { } func (cn *Peer) expectingChunks() bool { - if len(cn.requests) == 0 { + if len(cn.actualRequestState.Requests) == 0 { return false } - if !cn.interested { + if !cn.actualRequestState.Interested { return false } - for r := range cn.requests { + for r := range cn.actualRequestState.Requests { if !cn.remoteChokingPiece(r.Index.Int()) { return true } @@ -217,7 +221,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) { func (cn *Peer) cumInterest() time.Duration { ret := cn.priorInterest - if cn.interested { + if cn.actualRequestState.Interested { ret += time.Since(cn.lastBecameInterested) } return ret @@ -301,7 +305,7 @@ func (cn *Peer) statusFlags() (ret string) { c := func(b byte) { ret += string([]byte{b}) } - if cn.interested { + if cn.actualRequestState.Interested { c('i') } if cn.choking { @@ -329,7 +333,7 @@ func (cn *Peer) downloadRate() float64 { func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { ret = make(map[pieceIndex]int) - for r := range cn.requests { + for r := range cn.actualRequestState.Requests { ret[pieceIndex(r.Index)]++ } return @@ -437,18 +441,6 @@ func (cn *PeerConn) write(msg pp.Message) bool { return notFull } -func (cn *peerConnMsgWriter) write(msg pp.Message) bool { - cn.mu.Lock() - defer cn.mu.Unlock() - cn.writeBuffer.Write(msg.MustMarshalBinary()) - cn.writeCond.Broadcast() - return !cn.writeBufferFull() -} - -func (cn *peerConnMsgWriter) writeBufferFull() bool { - return cn.writeBuffer.Len() >= writeBufferHighWaterLen -} - func (cn *PeerConn) requestMetadataPiece(index int) { eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata] if eID == 0 { @@ -538,10 +530,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool { } func (cn *Peer) setInterested(interested bool) bool { - if cn.interested == interested { + if cn.actualRequestState.Interested == interested { return true } - cn.interested = interested + cn.actualRequestState.Interested = interested if interested { cn.lastBecameInterested = time.Now() } else if !cn.lastBecameInterested.IsZero() { @@ -587,20 +579,20 @@ func (cn *Peer) shouldRequest(r Request) error { return nil } -func (cn *Peer) request(r Request) error { +func (cn *Peer) request(r Request) (more bool, err error) { if err := cn.shouldRequest(r); err != nil { panic(err) } - if _, ok := cn.requests[r]; ok { - return nil + if _, ok := cn.actualRequestState.Requests[r]; ok { + return true, nil } if cn.numLocalRequests() >= cn.nominalMaxRequests() { - return errors.New("too many outstanding requests") + return true, errors.New("too many outstanding requests") } - if cn.requests == nil { - cn.requests = make(map[Request]struct{}) + if cn.actualRequestState.Requests == nil { + cn.actualRequestState.Requests = make(map[Request]struct{}) } - cn.requests[r] = struct{}{} + cn.actualRequestState.Requests[r] = struct{}{} if cn.validReceiveChunks == nil { cn.validReceiveChunks = make(map[Request]int) } @@ -610,12 +602,11 @@ func (cn *Peer) request(r Request) error { for _, f := range cn.callbacks.SentRequest { f(PeerRequestEvent{cn, r}) } - cn.peerImpl._request(r) - return nil + return cn.peerImpl._request(r), nil } -func (me *PeerConn) _request(r Request) { - me.write(pp.Message{ +func (me *PeerConn) _request(r Request) bool { + return me.write(pp.Message{ Type: pp.Request, Index: r.Index, Begin: r.Begin, @@ -623,17 +614,21 @@ func (me *PeerConn) _request(r Request) { }) } -func (me *Peer) cancel(r Request) { +func (me *Peer) cancel(r Request) bool { if me.deleteRequest(r) { - me.peerImpl._cancel(r) + return me.peerImpl._cancel(r) } + return true } -func (me *PeerConn) _cancel(r Request) { - me.write(makeCancelMessage(r)) +func (me *PeerConn) _cancel(r Request) bool { + return me.write(makeCancelMessage(r)) } func (cn *PeerConn) fillWriteBuffer() { + if !cn.applyNextRequestState() { + return + } if cn.pex.IsEnabled() { if flow := cn.pex.Share(cn.write); !flow { return @@ -668,8 +663,7 @@ func (cn *PeerConn) postBitfield() { } func (cn *PeerConn) updateRequests() { - // log.Print("update requests") - cn.tickleWriter() + cn.t.cl.tickleRequester() } // Emits the indices in the Bitmaps bms in order, never repeating any index. @@ -1286,7 +1280,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // out. deletedRequest := false { - if _, ok := c.requests[req]; ok { + if _, ok := c.actualRequestState.Requests[req]; ok { for _, f := range c.callbacks.ReceivedRequested { f(PeerMessageEvent{c, msg}) } @@ -1468,14 +1462,15 @@ func (c *Peer) peerHasWantedPieces() bool { } func (c *Peer) numLocalRequests() int { - return len(c.requests) + return len(c.actualRequestState.Requests) } func (c *Peer) deleteRequest(r Request) bool { - if _, ok := c.requests[r]; !ok { + delete(c.nextRequestState.Requests, r) + if _, ok := c.actualRequestState.Requests[r]; !ok { return false } - delete(c.requests, r) + delete(c.actualRequestState.Requests, r) for _, f := range c.callbacks.DeletedRequest { f(PeerRequestEvent{c, r}) } @@ -1493,12 +1488,13 @@ func (c *Peer) deleteRequest(r Request) bool { } func (c *Peer) deleteAllRequests() { - for r := range c.requests { + for r := range c.actualRequestState.Requests { c.deleteRequest(r) } - if len(c.requests) != 0 { - panic(len(c.requests)) + if l := len(c.actualRequestState.Requests); l != 0 { + panic(l) } + c.nextRequestState.Requests = nil // for c := range c.t.conns { // c.tickleWriter() // } @@ -1635,3 +1631,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { pc, ok := p.peerImpl.(*PeerConn) return pc, ok } + +func (p *PeerConn) onNextRequestStateChanged() { + p.tickleWriter() +} diff --git a/pexconn_test.go b/pexconn_test.go index df9aa44ed6..7bb61ecdaa 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) { out = m return true } - <-c.messageWriter.writeCond.WaitChan() + <-c.messageWriter.writeCond.Signaled() c.pex.Share(testWriter) require.True(t, writerCalled) require.EqualValues(t, pp.Extended, out.Type) diff --git a/request-strategy.go b/request-strategy.go index 72f7f3d4ae..7313c84a3f 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -5,25 +5,36 @@ import ( "unsafe" "github.com/anacrolix/missinggo/v2/bitmap" + + "github.com/anacrolix/torrent/internal/chansync" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/types" ) func (cl *Client) requester() { for { - func() { + update := func() chansync.Signaled { cl.lock() defer cl.unlock() cl.doRequests() + return cl.updateRequests.Signaled() }() + // We can probably tune how often to heed this signal. TODO: Currently disabled to retain + // existing behaviour, while the signalling is worked out. + update = nil select { case <-cl.closed.LockedChan(cl.locker()): return + case <-update: case <-time.After(100 * time.Millisecond): } } } +func (cl *Client) tickleRequester() { + cl.updateRequests.Broadcast() +} + func (cl *Client) doRequests() { ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { @@ -62,7 +73,7 @@ func (cl *Client) doRequests() { HasPiece: p.peerHasPiece, MaxRequests: p.nominalMaxRequests(), HasExistingRequest: func(r request_strategy.Request) bool { - _, ok := p.requests[r] + _, ok := p.actualRequestState.Requests[r] return ok }, Choking: p.peerChoking, @@ -81,7 +92,7 @@ func (cl *Client) doRequests() { MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes, }) for p, state := range nextPeerStates { - applyPeerNextRequestState(p, state) + setPeerNextRequestState(p, state) } } @@ -91,20 +102,35 @@ func (p *peerId) Uintptr() uintptr { return uintptr(unsafe.Pointer(p)) } -func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { +func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { p := (*Peer)(_p.(*peerId)) - p.setInterested(rp.Interested) - for req := range p.requests { - if _, ok := rp.Requests[req]; !ok { - p.cancel(req) + p.nextRequestState = rp + p.onNextRequestStateChanged() +} + +func (p *Peer) applyNextRequestState() bool { + next := p.nextRequestState + current := p.actualRequestState + if !p.setInterested(next.Interested) { + return false + } + for req := range current.Requests { + if _, ok := next.Requests[req]; !ok { + if !p.cancel(req) { + return false + } } } - for req := range rp.Requests { - err := p.request(req) + for req := range next.Requests { + more, err := p.request(req) if err != nil { panic(err) } else { //log.Print(req) } + if !more { + return false + } } + return true } diff --git a/webseed-peer.go b/webseed-peer.go index 5f2980c3e7..7af892f0ff 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -45,19 +45,21 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) _cancel(r Request) { +func (ws *webseedPeer) _cancel(r Request) bool { active, ok := ws.activeRequests[r] if ok { active.Cancel() } + return true } func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} } -func (ws *webseedPeer) _request(r Request) { +func (ws *webseedPeer) _request(r Request) bool { ws.requesterCond.Signal() + return true } func (ws *webseedPeer) doRequest(r Request) { @@ -76,7 +78,7 @@ func (ws *webseedPeer) requester() { defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { - for r := range ws.peer.requests { + for r := range ws.peer.actualRequestState.Requests { if _, ok := ws.activeRequests[r]; ok { continue } @@ -142,3 +144,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re } } } + +func (me *webseedPeer) onNextRequestStateChanged() { + me.peer.applyNextRequestState() +}