Skip to content

Commit

Permalink
Break out peerConnWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Jun 7, 2021
1 parent 925f5d1 commit 24ceed6
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 37 deletions.
34 changes: 29 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(c)
go c.writer(time.Minute)
c.startWriter()
cl.sendInitialMessages(c, t)
err := c.mainReadLoop()
if err != nil {
Expand All @@ -969,6 +969,32 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
return nil
}

func (pc *PeerConn) startWriter() {
w := &pc.messageWriter
*w = peerConnWriter{
fillWriteBuffer: func() {
pc.locker().Lock()
defer pc.locker().Unlock()
pc.fillWriteBuffer()
},
closed: &pc.closed,
logger: pc.logger,
w: pc.w,
keepAlive: func() bool {
pc.locker().Lock()
defer pc.locker().Unlock()
return pc.useful()
},
writeBuffer: new(bytes.Buffer),
}
go func() {
defer pc.locker().Unlock()
defer pc.close()
defer pc.locker().Lock()
pc.messageWriter.run(time.Minute)
}()
}

// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
Expand Down Expand Up @@ -1409,13 +1435,11 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
Network: network,
callbacks: &cl.config.Callbacks,
},
connString: connString,
conn: nc,
writeBuffer: new(bytes.Buffer),
connString: connString,
conn: nc,
}
c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
c.writerCond.L = cl.locker()
c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.5.1-0.20210520011502-b3d95d6b1d02
github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff
github.com/anacrolix/sync v0.2.0
github.com/anacrolix/sync v0.3.0
github.com/anacrolix/tagflag v1.3.0
github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
github.com/anacrolix/utp v0.1.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ github.com/anacrolix/sync v0.0.0-20180611022320-3c4cb11f5a01/go.mod h1:+u91KiUuf
github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk=
github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY=
github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f h1:7KqmZoEOIXa0UbR2WQ/YPF4H+MPV6rhWk4E4tcv5eDg=
github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/sync v0.3.0 h1:ZPjTrkqQWEfnYVGTQHh5qNjokWaXnjsyXTJSMsKY0TA=
github.com/anacrolix/sync v0.3.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v0.0.0-20180605133421-f477c8c2f14c/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v0.0.0-20180803105420-3a8ff5428f76/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
Expand Down Expand Up @@ -428,6 +432,7 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
Expand Down
32 changes: 32 additions & 0 deletions internal/chansync/broadcast-cond.go.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package chansync

import (
"github.com/anacrolix/sync"
)

// Can be used as zero-value. Due to the caller needing to bring their own synchronization, an
// eqiuvalent to "sync".Cond.Signal is not provided. BroadcastCond is intended to be selected on
// with other channels.
type BroadcastCond struct {
mu sync.Mutex
ch chan struct{}
}

func (me *BroadcastCond) Broadcast() {
me.mu.Lock()
defer me.mu.Unlock()
if me.ch != nil {
close(me.ch)
me.ch = nil
}
}

// Should be called before releasing locks on resources that might trigger subsequent Broadcasts.
func (me *BroadcastCond) WaitChan() <-chan struct{} {
me.mu.Lock()
defer me.mu.Unlock()
if me.ch == nil {
me.ch = make(chan struct{})
}
return me.ch
}
41 changes: 41 additions & 0 deletions internal/chansync/set-once.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package chansync

import "sync"

// SetOnce is a boolean value that can only be flipped from false to true.
type SetOnce struct {
ch chan struct{}
initOnce sync.Once
closeOnce sync.Once
}

func (me *SetOnce) Chan() <-chan struct{} {
me.init()
return me.ch
}

func (me *SetOnce) init() {
me.initOnce.Do(func() {
me.ch = make(chan struct{})
})
}

// Set only returns true the first time it is called.
func (me *SetOnce) Set() (first bool) {
me.closeOnce.Do(func() {
me.init()
first = true
close(me.ch)
})
return
}

func (me *SetOnce) IsSet() bool {
me.init()
select {
case <-me.ch:
return true
default:
return false
}
}
83 changes: 56 additions & 27 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/sync"

"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/chansync"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
)
Expand Down Expand Up @@ -68,7 +68,7 @@ type Peer struct {
cryptoMethod mse.CryptoMethod
Discovery PeerSource
trusted bool
closed missinggo.Event
closed chansync.SetOnce
// Set true after we've added our ConnStats generated during handshake to
// other ConnStat instances as determined when the *Torrent became known.
reconciledHandshakeStats bool
Expand Down Expand Up @@ -148,11 +148,10 @@ type PeerConn struct {
w io.Writer
r io.Reader

writeBuffer *bytes.Buffer
uploadTimer *time.Timer
writerCond sync.Cond
messageWriter peerConnWriter

pex pexConnState
uploadTimer *time.Timer
pex pexConnState
}

func (cn *PeerConn) connStatusString() string {
Expand Down Expand Up @@ -429,17 +428,25 @@ const writeBufferHighWaterLen = 1 << 15
// done asynchronously, so it may be that we're not able to honour backpressure from this method.
func (cn *PeerConn) write(msg pp.Message) bool {
torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
// We don't need to track bytes here because a connection.w Writer wrapper takes care of that
// (although there's some delay between us recording the message, and the connection writer
// We don't need to track bytes here because the connection's Writer has that behaviour injected
// (although there's some delay between us buffering the message, and the connection writer
// flushing it out.).
cn.writeBuffer.Write(msg.MustMarshalBinary())
notFull := cn.messageWriter.write(msg)
// Last I checked only Piece messages affect stats, and we don't write those.
cn.wroteMsg(&msg)
cn.tickleWriter()
return notFull
}

func (cn *peerConnWriter) write(msg pp.Message) bool {
cn.mu.Lock()
defer cn.mu.Unlock()
cn.writeBuffer.Write(msg.MustMarshalBinary())
cn.writeCond.Broadcast()
return !cn.writeBufferFull()
}

func (cn *PeerConn) writeBufferFull() bool {
func (cn *peerConnWriter) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}

Expand Down Expand Up @@ -636,48 +643,70 @@ func (cn *PeerConn) fillWriteBuffer() {
cn.upload(cn.write)
}

type peerConnWriter struct {
// Must not be called with the local mutex held, as it will call back into the write method.
fillWriteBuffer func()
closed *chansync.SetOnce
logger log.Logger
w io.Writer
keepAlive func() bool

mu sync.Mutex
writeCond chansync.BroadcastCond
// Pointer so we can swap with the "front buffer".
writeBuffer *bytes.Buffer
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *PeerConn) writer(keepAliveTimeout time.Duration) {
func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) {
var (
lastWrite time.Time = time.Now()
keepAliveTimer *time.Timer
)
keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
cn.locker().Lock()
defer cn.locker().Unlock()
cn.mu.Lock()
defer cn.mu.Unlock()
if time.Since(lastWrite) >= keepAliveTimeout {
cn.tickleWriter()
cn.writeCond.Broadcast()
}
keepAliveTimer.Reset(keepAliveTimeout)
})
cn.locker().Lock()
defer cn.locker().Unlock()
defer cn.close()
cn.mu.Lock()
defer cn.mu.Unlock()
defer keepAliveTimer.Stop()
frontBuf := new(bytes.Buffer)
for {
if cn.closed.IsSet() {
return
}
if cn.writeBuffer.Len() == 0 {
cn.fillWriteBuffer()
func() {
cn.mu.Unlock()
defer cn.mu.Lock()
cn.fillWriteBuffer()
}()
}
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.useful() {
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
torrent.Add("written keepalives", 1)
}
if cn.writeBuffer.Len() == 0 {
// TODO: Minimize wakeups....
cn.writerCond.Wait()
writeCond := cn.writeCond.WaitChan()
cn.mu.Unlock()
select {
case <-cn.closed.Chan():
case <-writeCond:
}
cn.mu.Lock()
continue
}
// Flip the buffers.
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
cn.locker().Unlock()
cn.mu.Unlock()
n, err := cn.w.Write(frontBuf.Bytes())
cn.locker().Lock()
cn.mu.Lock()
if n != 0 {
lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
Expand Down Expand Up @@ -1463,7 +1492,7 @@ func (c *PeerConn) uploadAllowed() bool {

func (c *PeerConn) setRetryUploadTimer(delay time.Duration) {
if c.uploadTimer == nil {
c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
c.uploadTimer = time.AfterFunc(delay, c.tickleWriter)
} else {
c.uploadTimer.Reset(delay)
}
Expand Down Expand Up @@ -1558,7 +1587,7 @@ func (c *Peer) deleteAllRequests() {
// This is called when something has changed that should wake the writer, such as putting stuff into
// the writeBuffer, or changing some state that the writer can act on.
func (c *PeerConn) tickleWriter() {
c.writerCond.Broadcast()
c.messageWriter.writeCond.Broadcast()
}

func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
Expand Down
3 changes: 1 addition & 2 deletions peerconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
Expand All @@ -32,7 +31,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
r, w := io.Pipe()
//c.r = r
c.w = w
go c.writer(time.Minute)
c.startWriter()
c.locker().Lock()
c.t._completedPieces.Add(1)
c.postBitfield( /*[]bool{false, true, false}*/ )
Expand Down
4 changes: 2 additions & 2 deletions pexconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestPexConnState(t *testing.T) {
c := cl.newConnection(nil, false, addr, addr.Network(), "")
c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
c.writerCond.L.Lock()
c.messageWriter.mu.Lock()
c.setTorrent(torrent)
torrent.addPeerConn(c)

Expand All @@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) {
out = m
return true
}
c.writerCond.Wait()
<-c.messageWriter.writeCond.WaitChan()
c.pex.Share(testWriter)
require.True(t, writerCalled)
require.EqualValues(t, pp.Extended, out.Type)
Expand Down

0 comments on commit 24ceed6

Please sign in to comment.