Skip to content

Commit

Permalink
PEX: add periodic deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
yarikk authored and anacrolix committed Apr 15, 2020
1 parent 93430aa commit 2b2c480
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 56 deletions.
16 changes: 0 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,22 +934,6 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
}
}

func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
if !ok {
return
}
m, seq := t.pex.Genmsg(0)
conn.pexSeq = seq
if m.Len() == 0 {
cl.logger.Printf("no initial PEX this time")
// FIXME see how can we schedule another initial for later
return
}
conn.logger.Printf("sending initial PEX message: %v", m)
conn.post(m.Message(xid))
}

func (cl *Client) dhtPort() (ret uint16) {
cl.eachDhtServer(func(s DhtServer) {
ret = uint16(missinggo.AddrPort(s.Addr()))
Expand Down
13 changes: 11 additions & 2 deletions peer_protocol/pex.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,24 @@ func (m *PexMsg) DeltaLen() int {
return lenDropped
}

func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
payload := bencode.MustMarshal(pexMsg)
func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
payload := bencode.MustMarshal(m)
return Message{
Type: Extended,
ExtendedID: pexExtendedId,
ExtendedPayload: payload,
}
}

func LoadPexMsg(b []byte) (*PexMsg, error) {
m := new(PexMsg)
if err := bencode.Unmarshal(b, m); err != nil {
return nil, err
}
return m, nil
}


type PexPeerFlags byte

func (me PexPeerFlags) Get(f PexPeerFlags) bool {
Expand Down
57 changes: 32 additions & 25 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type PeerConn struct {
// response.
metadataRequests []bool
sentHaves bitmap.Bitmap
pexSeq int
pex pexConnState

// Stuff controlled by the remote peer.
PeerID PeerID
Expand Down Expand Up @@ -321,6 +321,9 @@ func (cn *PeerConn) close() {
if !cn.closed.Set() {
return
}
if cn.pex.IsEnabled() {
cn.pex.Close()
}
cn.tickleWriter()
cn.discardPieceInclination()
cn._pieceRequestOrder.Clear()
Expand Down Expand Up @@ -559,7 +562,9 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
}
cn.requestsLowWater = len(cn.requests) / 2
}

if cn.pex.IsEnabled() {
cn.pex.Share(msg) // gated internally
}
cn.upload(msg)
}

Expand Down Expand Up @@ -1132,9 +1137,9 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
}
}
c.requestPendingMetadata()
if !cl.config.DisablePEX {
cl.sendInitialPEX(c, t)
// BUG no sending PEX updates yet
if !t.cl.config.DisablePEX {
t.pex.Add(c) // we learnt enough now
c.pex.Init(c)
}
return nil
case metadataExtendedId:
Expand All @@ -1144,25 +1149,10 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
}
return nil
case pexExtendedId:
if cl.config.DisablePEX {
// TODO: Maybe close the connection. Check that we're not
// advertising that we support PEX if it's disabled.
return nil
}
c.logger.Printf("incoming PEX message")
var pexMsg pp.PexMsg
err := bencode.Unmarshal(payload, &pexMsg)
if err != nil {
return fmt.Errorf("error unmarshalling PEX message: %s", err)
if !c.pex.IsEnabled() {
return nil // or hang-up maybe?
}
npeers := len(pexMsg.Added6) + len(pexMsg.Added)
c.logger.Printf("adding %d peers from PEX", npeers)
torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
var peers Peers
peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
t.addPeers(peers)
return nil
return c.pex.Recv(payload)
default:
return fmt.Errorf("unexpected extended message ID: %v", id)
}
Expand Down Expand Up @@ -1488,15 +1478,32 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
if c.outgoing {
f |= pp.PexOutgoingConn
}
if c.utp() {
if c.remoteAddr != nil && strings.Contains(c.remoteAddr.Network(), "udp") {
f |= pp.PexSupportsUtp
}
return f
}

func (c *PeerConn) dialAddr() net.Addr {
if !c.outgoing && c.PeerListenPort != 0 {
switch addr := c.remoteAddr.(type) {
case *net.TCPAddr:
dialAddr := *addr
dialAddr.Port = c.PeerListenPort
return &dialAddr
case *net.UDPAddr:
dialAddr := *addr
dialAddr.Port = c.PeerListenPort
return &dialAddr
}
}
return c.remoteAddr
}

func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
f := c.pexPeerFlags()
return pexEvent{t, c.remoteAddr, f}
addr := c.dialAddr()
return pexEvent{t, addr, f}
}

func (c *PeerConn) String() string {
Expand Down
53 changes: 48 additions & 5 deletions peerconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
}

func TestPexPeerFlags(t *testing.T) {
func TestConnPexPeerFlags(t *testing.T) {
var (
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
)
var testcases = []struct {
conn *PeerConn
f pp.PexPeerFlags
Expand All @@ -153,13 +157,52 @@ func TestPexPeerFlags(t *testing.T) {
{&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
{&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
{&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
{&PeerConn{network: "udp4"}, pp.PexSupportsUtp},
{&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp},
{&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn},
{&PeerConn{network: "tcp6"}, 0},
{&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp},
{&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp},
{&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn},
{&PeerConn{remoteAddr: tcpAddr}, 0},
}
for i, tc := range testcases {
f := tc.conn.pexPeerFlags()
require.EqualValues(t, tc.f, f, i)
}
}

func TestConnPexEvent(t *testing.T) {
var (
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
)
var testcases = []struct {
t pexEventType
c *PeerConn
e pexEvent
}{
{
pexAdd,
&PeerConn{remoteAddr: udpAddr},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
},
{
pexDrop,
&PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
},
{
pexAdd,
&PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexAdd, dialTcpAddr, 0},
},
{
pexDrop,
&PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
},
}
for i, tc := range testcases {
e := tc.c.pexEvent(tc.t)
require.EqualValues(t, tc.e, e, i)
}
}
5 changes: 5 additions & 0 deletions pex.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ func (s *pexState) Add(c *PeerConn) {
}
e := c.pexEvent(pexAdd)
s.ev = append(s.ev, e)
c.pex.Listed = true
}

func (s *pexState) Drop(c *PeerConn) {
if !c.pex.Listed {
// skip connections which were not previously Added
return
}
e := c.pexEvent(pexDrop)
s.nc--
if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
Expand Down
10 changes: 8 additions & 2 deletions pex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestPexAdded(t *testing.T) {
func TestPexDropped(t *testing.T) {
t.Run("belowTarg", func(t *testing.T) {
s := &pexState{nc: 1}
s.Drop(&PeerConn{remoteAddr: addrs[0]})
s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
targ := &pexState{
hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: 0,
Expand All @@ -84,13 +84,19 @@ func TestPexDropped(t *testing.T) {
})
t.Run("aboveTarg", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
s.Drop(&PeerConn{remoteAddr: addrs[0]})
s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
targ := &pexState{
ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: pexTargAdded,
}
require.EqualValues(t, targ, s)
})
t.Run("aboveTargNotListed", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}})
targ := &pexState{nc: pexTargAdded + 1}
require.EqualValues(t, targ, s)
})
}

func TestPexReset(t *testing.T) {
Expand Down
116 changes: 116 additions & 0 deletions pexconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package torrent

import (
"fmt"
"time"

"github.com/anacrolix/log"

pp "github.com/anacrolix/torrent/peer_protocol"
)

const (
pexRetryDelay = 10 * time.Second
pexInterval = 1 * time.Minute
)

// per-connection PEX state
type pexConnState struct {
enabled bool
xid pp.ExtensionNumber
seq int
timer *time.Timer
gate chan struct{}
readyfn func()
torrent *Torrent
Listed bool
info log.Logger
dbg log.Logger
}

func (s *pexConnState) IsEnabled() bool {
return s.enabled
}

// Init is called from the reader goroutine upon the extended handshake completion
func (s *pexConnState) Init(c *PeerConn) {
xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
if !ok || xid == 0 || c.t.cl.config.DisablePEX {
return
}
s.xid = xid
s.seq = 0
s.torrent = c.t
s.info = c.t.cl.logger
s.dbg = c.logger
s.readyfn = c.tickleWriter
s.gate = make(chan struct{}, 1)
s.timer = time.AfterFunc(0, func() {
s.gate <- struct{}{}
s.readyfn() // wake up the writer
})
s.enabled = true
}

// schedule next PEX message
func (s *pexConnState) sched(delay time.Duration) {
s.timer.Reset(delay)
}

// generate next PEX message for the peer; returns nil if nothing yet to send
func (s *pexConnState) genmsg() *pp.PexMsg {
tx, seq := s.torrent.pex.Genmsg(s.seq)
if tx.Len() == 0 {
return nil
}
s.seq = seq
return tx
}

// Share is called from the writer goroutine if when it is woken up with the write buffers empty
func (s *pexConnState) Share(postfn messageWriter) {
select {
case <-s.gate:
if tx := s.genmsg(); tx != nil {
s.dbg.Print("sending PEX message: ", tx)
postfn(tx.Message(s.xid))
s.sched(pexInterval)
} else {
// no PEX to send this time - try again shortly
s.sched(pexRetryDelay)
}
default:
return
}
}

// Recv is called from the reader goroutine
func (s *pexConnState) Recv(payload []byte) error {
rx, err := pp.LoadPexMsg(payload)
if err != nil {
return fmt.Errorf("error unmarshalling PEX message: %s", err)
}
s.dbg.Print("incoming PEX message: ", rx)
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))

var peers Peers
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
peers.AppendFromPex(rx.Added, rx.AddedFlags)
s.dbg.Printf("adding %d peers from PEX", len(peers))
s.torrent.addPeers(peers)
// s.dbg.Print("known swarm now:", s.torrent.KnownSwarm())

// one day we may also want to:
// - check if the peer is not flooding us with PEX updates
// - handle drops somehow
// - detect malicious peers

return nil
}

func (s *pexConnState) Close() {
if s.timer != nil {
s.timer.Stop()
}
}
Loading

0 comments on commit 2b2c480

Please sign in to comment.