Apply next request state asynchronously
anacrolix committed Jun 7, 2021
1 parent d37dea1 commit 36f52d7
Showing 8 changed files with 102 additions and 67 deletions.
4 changes: 2 additions & 2 deletions client.go
@@ -33,12 +33,12 @@ import (
"golang.org/x/xerrors"

"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/chansync"
"github.com/anacrolix/torrent/internal/limiter"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent"
@@ -81,7 +81,7 @@ type Client struct {

activeAnnounceLimiter limiter.Instance

- pieceRequestOrder request_strategy.ClientPieceOrder
+ updateRequests chansync.BroadcastCond
}

type ipStr string
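
The pieceRequestOrder field is gone from Client; in its place, updateRequests is a condition that whatever computes next request state can wait on. As a rough sketch of how such a condition might be consumed (the runRequester name, the closed field, and the lock helpers are illustrative assumptions, not code from this commit):

// Hypothetical consumer of Client.updateRequests; names assumed.
func (cl *Client) runRequester() {
	for {
		cl.lock()
		// ... recompute next request state for each peer here ...
		// Per the BroadcastCond contract, take the signal channel
		// before releasing the lock so no Broadcast is missed.
		signaled := cl.updateRequests.Signaled()
		cl.unlock()
		select {
		case <-signaled:
			// Inputs changed; loop and recompute.
		case <-cl.closed.Done():
			return
		}
	}
}

The tickleRequester call that appears in peerconn.go below presumably does a Broadcast on this condition.
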
3 changes: 2 additions & 1 deletion internal/chansync/broadcast-cond.go
@@ -22,7 +22,8 @@ func (me *BroadcastCond) Broadcast() {
}

// Should be called before releasing locks on resources that might trigger subsequent Broadcasts.
- func (me *BroadcastCond) WaitChan() <-chan struct{} {
+ // The channel is closed when the condition changes.
+ func (me *BroadcastCond) Signaled() Signaled {
me.mu.Lock()
defer me.mu.Unlock()
if me.ch == nil {
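
WaitChan is renamed to Signaled and now returns the package's named Signaled channel type. The comment above carries the contract: take the channel while still holding the lock guarding the state you watch, or a concurrent Broadcast can be missed. A self-contained sketch of both sides of the pattern (illustrative only; chansync is an internal package, so the import is notional):

package main

import (
	"sync"

	"github.com/anacrolix/torrent/internal/chansync"
)

var (
	mu    sync.Mutex
	cond  chansync.BroadcastCond
	ready bool
)

// Producer: mutate state under the lock, then broadcast.
func setReady() {
	mu.Lock()
	ready = true
	mu.Unlock()
	cond.Broadcast()
}

// Consumer: re-check the condition on every wakeup, taking the
// channel before unlocking so no Broadcast slips through.
func waitReady() {
	for {
		mu.Lock()
		if ready {
			mu.Unlock()
			return
		}
		signaled := cond.Signaled()
		mu.Unlock()
		<-signaled
	}
}

func main() {
	go setReady()
	waitReady()
}
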
3 changes: 2 additions & 1 deletion internal/chansync/set-once.go
@@ -9,7 +9,8 @@ type SetOnce struct {
closeOnce sync.Once
}

- func (me *SetOnce) Chan() <-chan struct{} {
+ // Returns a channel that is closed when the event is flagged.
+ func (me *SetOnce) Done() Done {
me.init()
return me.ch
}
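
Chan is renamed Done, again returning a named type, and the new doc line pins the semantics: the channel closes exactly once, when the event is flagged. A small illustrative use as a shutdown flag (the worker type and the Set call are assumptions about the surrounding API):

// Hypothetical worker shut down via a chansync.SetOnce.
type worker struct {
	closed chansync.SetOnce
}

func (w *worker) run(jobs <-chan func()) {
	for {
		select {
		case job := <-jobs:
			job()
		case <-w.closed.Done():
			// Closed once the event is flagged; every waiter
			// is released at the same time.
			return
		}
	}
}

func (w *worker) stop() {
	w.closed.Set() // Set is assumed; the sync.Once makes repeats safe.
}
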
5 changes: 3 additions & 2 deletions peer-impl.go
@@ -8,13 +8,14 @@ import (
// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
// legacy PeerConn methods.
type peerImpl interface {
+ onNextRequestStateChanged()
updateRequests()
writeInterested(interested bool) bool

// Neither of these return buffer room anymore, because they're currently both posted. There's
// also PeerConn.writeBufferFull for when/where it matters.
- _cancel(Request)
- _request(Request)
+ _cancel(Request) bool
+ _request(Request) bool

connectionFlags() string
onClose()
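
_request and _cancel now return a bool: in the PeerConn implementations below it is the result of PeerConn.write, i.e. whether the caller may keep posting messages before the write buffer passes its high-water mark. A hedged sketch of a caller honoring that signal (the function and variable names are illustrative):

// Illustrative: issue planned requests until back-pressure or error.
func fillRequests(peer *Peer, planned []Request) {
	for _, r := range planned {
		more, err := peer.request(r)
		if err != nil {
			break // e.g. too many outstanding requests
		}
		if !more {
			break // write buffer is full; resume once it drains
		}
	}
}
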
94 changes: 47 additions & 47 deletions peerconn.go
@@ -24,6 +24,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
)

type PeerSource string
@@ -48,7 +49,10 @@ type PeerRemoteAddr interface {

// Since we have to store all the requests in memory, we can't reasonably exceed what would be
// indexable with the memory space available.
- type maxRequests = int
+ type (
+ maxRequests = int
+ requestState = request_strategy.PeerNextRequestState
+ )

type Peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
@@ -78,7 +82,8 @@ type Peer struct {
lastChunkSent time.Time

// Stuff controlled by the local peer.
- interested bool
+ nextRequestState requestState
+ actualRequestState requestState
lastBecameInterested time.Time
priorInterest time.Duration

@@ -87,7 +92,6 @@
_chunksReceivedWhileExpecting int64

choking bool
- requests map[Request]struct{}
piecesReceivedSinceLastRequestUpdate maxRequests
maxPiecesReceivedBetweenRequestUpdates maxRequests
// Chunks that we might reasonably expect to receive from the peer. Due to
@@ -171,13 +175,13 @@ func (cn *Peer) updateExpectingChunks() {
}

func (cn *Peer) expectingChunks() bool {
- if len(cn.requests) == 0 {
+ if len(cn.actualRequestState.Requests) == 0 {
return false
}
- if !cn.interested {
+ if !cn.actualRequestState.Interested {
return false
}
- for r := range cn.requests {
+ for r := range cn.actualRequestState.Requests {
if !cn.remoteChokingPiece(r.Index.Int()) {
return true
}
@@ -217,7 +221,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {

func (cn *Peer) cumInterest() time.Duration {
ret := cn.priorInterest
- if cn.interested {
+ if cn.actualRequestState.Interested {
ret += time.Since(cn.lastBecameInterested)
}
return ret
@@ -301,7 +305,7 @@ func (cn *Peer) statusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
- if cn.interested {
+ if cn.actualRequestState.Interested {
c('i')
}
if cn.choking {
@@ -329,7 +333,7 @@ func (cn *Peer) downloadRate() float64 {

func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
- for r := range cn.requests {
+ for r := range cn.actualRequestState.Requests {
ret[pieceIndex(r.Index)]++
}
return
@@ -437,18 +441,6 @@ func (cn *PeerConn) write(msg pp.Message) bool {
return notFull
}

- func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
- cn.mu.Lock()
- defer cn.mu.Unlock()
- cn.writeBuffer.Write(msg.MustMarshalBinary())
- cn.writeCond.Broadcast()
- return !cn.writeBufferFull()
- }
-
- func (cn *peerConnMsgWriter) writeBufferFull() bool {
- return cn.writeBuffer.Len() >= writeBufferHighWaterLen
- }

func (cn *PeerConn) requestMetadataPiece(index int) {
eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
if eID == 0 {
@@ -538,10 +530,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
}

func (cn *Peer) setInterested(interested bool) bool {
- if cn.interested == interested {
+ if cn.actualRequestState.Interested == interested {
return true
}
- cn.interested = interested
+ cn.actualRequestState.Interested = interested
if interested {
cn.lastBecameInterested = time.Now()
} else if !cn.lastBecameInterested.IsZero() {
@@ -587,20 +579,20 @@ func (cn *Peer) shouldRequest(r Request) error {
return nil
}

- func (cn *Peer) request(r Request) error {
+ func (cn *Peer) request(r Request) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
- if _, ok := cn.requests[r]; ok {
- return nil
+ if _, ok := cn.actualRequestState.Requests[r]; ok {
+ return true, nil
}
if cn.numLocalRequests() >= cn.nominalMaxRequests() {
- return errors.New("too many outstanding requests")
+ return true, errors.New("too many outstanding requests")
}
- if cn.requests == nil {
- cn.requests = make(map[Request]struct{})
+ if cn.actualRequestState.Requests == nil {
+ cn.actualRequestState.Requests = make(map[Request]struct{})
}
- cn.requests[r] = struct{}{}
+ cn.actualRequestState.Requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[Request]int)
}
@@ -610,30 +602,33 @@ func (cn *Peer) request(r Request) error {
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r})
}
- cn.peerImpl._request(r)
- return nil
+ return cn.peerImpl._request(r), nil
}

- func (me *PeerConn) _request(r Request) {
- me.write(pp.Message{
+ func (me *PeerConn) _request(r Request) bool {
+ return me.write(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
})
}

- func (me *Peer) cancel(r Request) {
+ func (me *Peer) cancel(r Request) bool {
if me.deleteRequest(r) {
- me.peerImpl._cancel(r)
+ return me.peerImpl._cancel(r)
}
+ return true
}

- func (me *PeerConn) _cancel(r Request) {
- me.write(makeCancelMessage(r))
+ func (me *PeerConn) _cancel(r Request) bool {
+ return me.write(makeCancelMessage(r))
}

func (cn *PeerConn) fillWriteBuffer() {
+ if !cn.applyNextRequestState() {
+ return
+ }
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
return
@@ -668,8 +663,7 @@ func (cn *PeerConn) postBitfield() {
}

func (cn *PeerConn) updateRequests() {
- // log.Print("update requests")
- cn.tickleWriter()
+ cn.t.cl.tickleRequester()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -1286,7 +1280,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// out.
deletedRequest := false
{
- if _, ok := c.requests[req]; ok {
+ if _, ok := c.actualRequestState.Requests[req]; ok {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
@@ -1468,14 +1462,15 @@ func (c *Peer) peerHasWantedPieces() bool {
}

func (c *Peer) numLocalRequests() int {
- return len(c.requests)
+ return len(c.actualRequestState.Requests)
}

func (c *Peer) deleteRequest(r Request) bool {
- if _, ok := c.requests[r]; !ok {
+ delete(c.nextRequestState.Requests, r)
+ if _, ok := c.actualRequestState.Requests[r]; !ok {
return false
}
- delete(c.requests, r)
+ delete(c.actualRequestState.Requests, r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, r})
}
@@ -1493,12 +1488,13 @@ func (c *Peer) deleteRequest(r Request) bool {
}

func (c *Peer) deleteAllRequests() {
- for r := range c.requests {
+ for r := range c.actualRequestState.Requests {
c.deleteRequest(r)
}
- if len(c.requests) != 0 {
- panic(len(c.requests))
+ if l := len(c.actualRequestState.Requests); l != 0 {
+ panic(l)
}
+ c.nextRequestState.Requests = nil
// for c := range c.t.conns {
// c.tickleWriter()
// }
@@ -1635,3 +1631,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
pc, ok := p.peerImpl.(*PeerConn)
return pc, ok
}

+ func (p *PeerConn) onNextRequestStateChanged() {
+ p.tickleWriter()
+ }
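
fillWriteBuffer now refuses to proceed until applyNextRequestState succeeds, and the new onNextRequestStateChanged hook tickles the writer, so state computed asynchronously elsewhere gets applied on the connection's own write path. applyNextRequestState itself is in one of the files not shown here; the following is only a guess at its shape, consistent with the API changes above (every detail is an assumption, not the commit's actual implementation):

// Speculative sketch: reconcile nextRequestState into actualRequestState,
// stopping as soon as the write buffer reports no more room.
func (cn *PeerConn) applyNextRequestState() bool {
	next := cn.nextRequestState
	// Cancel requests that are no longer wanted.
	for r := range cn.actualRequestState.Requests {
		if _, ok := next.Requests[r]; !ok {
			if !cn.cancel(r) {
				return false
			}
		}
	}
	// Align interest with the computed state.
	if !cn.setInterested(next.Interested) {
		return false
	}
	// Issue newly wanted requests; request skips duplicates itself.
	for r := range next.Requests {
		more, err := cn.request(r)
		if err != nil {
			break // e.g. at the outstanding-request cap
		}
		if !more {
			return false
		}
	}
	return true
}
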
2 changes: 1 addition & 1 deletion pexconn_test.go
@@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) {
out = m
return true
}
- <-c.messageWriter.writeCond.WaitChan()
+ <-c.messageWriter.writeCond.Signaled()
c.pex.Share(testWriter)
require.True(t, writerCalled)
require.EqualValues(t, pp.Extended, out.Type)
(The remaining 2 changed files are not shown.)
