Skip to content

Commit

Permalink
Performance improvements to PEX
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Oct 22, 2020
1 parent 8088002 commit c1d189e
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 254 deletions.
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7U=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand All @@ -499,6 +500,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.2-0.20200818115829-54d05a4e1844 h1:LTTabKLNrhI/+rSSEG19b4pRD2ipmZx+DF5upbVZmrk=
github.com/stretchr/testify v1.6.2-0.20200818115829-54d05a4e1844/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syncthing/syncthing v0.14.48-rc.4/go.mod h1:nw3siZwHPA6M8iSfjDCWQ402eqvEIasMQOE8nFOxy7M=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8=
Expand Down Expand Up @@ -705,6 +710,8 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
93 changes: 4 additions & 89 deletions peer_protocol/pex.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package peer_protocol

import (
"net"

"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/torrent/bencode"
)
Expand All @@ -16,87 +14,8 @@ type PexMsg struct {
Dropped6 krpc.CompactIPv6NodeAddrs `bencode:"dropped6"`
}

func addrEqual(a, b *krpc.NodeAddr) bool {
return a.IP.Equal(b.IP) && a.Port == b.Port
}

func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
for i := range v {
if addrEqual(&v[i], a) {
return i
}
}
return -1
}

func (m *PexMsg) Add(addr krpc.NodeAddr, f PexPeerFlags) {
if addr.IP.To4() != nil {
if addrIndex(m.Added.NodeAddrs(), &addr) >= 0 {
// already added
return
}
if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
// on the dropped list - cancel out
m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
return
}
m.Added = append(m.Added, addr)
m.AddedFlags = append(m.AddedFlags, f)
} else if len(addr.IP) == net.IPv6len {
if addrIndex(m.Added6.NodeAddrs(), &addr) >= 0 {
// already added
return
}
if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
// on the dropped list - cancel out
m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
return
}
m.Added6 = append(m.Added6, addr)
m.Added6Flags = append(m.Added6Flags, f)
}
}

func (m *PexMsg) Drop(addr krpc.NodeAddr) {
if addr.IP.To4() != nil {
if addrIndex(m.Dropped.NodeAddrs(), &addr) >= 0 {
// already dropped
return
}
if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
// on the added list - cancel out
m.Added = append(m.Added[:i], m.Added[i+1:]...)
m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
return
}
m.Dropped = append(m.Dropped, addr)
} else if len(addr.IP) == net.IPv6len {
if addrIndex(m.Dropped6.NodeAddrs(), &addr) >= 0 {
// already dropped
return
}
if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
// on the added list - cancel out
m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
return
}
m.Dropped6 = append(m.Dropped6, addr)
}
}

func (m *PexMsg) Len() int {
return len(m.Added)+len(m.Added6)+len(m.Dropped)+len(m.Dropped6)
}

// DeltaLen returns max of {added+added6, dropped+dropped6}
func (m *PexMsg) DeltaLen() int {
lenAdded := len(m.Added)+len(m.Added6)
lenDropped := len(m.Dropped)+len(m.Dropped6)
if lenAdded > lenDropped {
return lenAdded
}
return lenDropped
return len(m.Added) + len(m.Added6) + len(m.Dropped) + len(m.Dropped6)
}

func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
Expand All @@ -108,15 +27,11 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
}
}

func LoadPexMsg(b []byte) (*PexMsg, error) {
m := new(PexMsg)
if err := bencode.Unmarshal(b, m); err != nil {
return nil, err
}
return m, nil
func LoadPexMsg(b []byte) (ret PexMsg, err error) {
err = bencode.Unmarshal(b, &ret)
return
}


type PexPeerFlags byte

func (me PexPeerFlags) Get(f PexPeerFlags) bool {
Expand Down
126 changes: 0 additions & 126 deletions peer_protocol/pex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,132 +29,6 @@ func TestEmptyPexMsg(t *testing.T) {
require.NoError(t, bencode.Unmarshal(b, &pm))
}

func TestPexAdd(t *testing.T) {
addrs4 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
}
addrs6 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
}
f := PexPrefersEncryption | PexOutgoingConn

t.Run("ipv4", func(t *testing.T) {
addrs := addrs4
m := new(PexMsg)
m.Drop(addrs[0])
m.Add(addrs[1], f)
for _, addr := range addrs {
m.Add(addr, f)
}
targ := &PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
},
AddedFlags: []PexPeerFlags{f, f, f},
Dropped: krpc.CompactIPv4NodeAddrs{},
}
require.EqualValues(t, targ, m)
})
t.Run("ipv6", func(t *testing.T) {
addrs := addrs6
m := new(PexMsg)
m.Drop(addrs[0])
m.Add(addrs[1], f)
for _, addr := range addrs {
m.Add(addr, f)
}
targ := &PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
},
Added6Flags: []PexPeerFlags{f, f, f},
Dropped6: krpc.CompactIPv6NodeAddrs{},
}
require.EqualValues(t, targ, m)
})
t.Run("empty", func(t *testing.T) {
addr := krpc.NodeAddr{}
xm := new(PexMsg)
xm.Add(addr, f)
require.EqualValues(t, 0, len(xm.Added))
require.EqualValues(t, 0, len(xm.AddedFlags))
require.EqualValues(t, 0, len(xm.Added6))
require.EqualValues(t, 0, len(xm.Added6Flags))
})
}

func TestPexDrop(t *testing.T) {
addrs4 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
}
addrs6 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
}
f := PexPrefersEncryption | PexOutgoingConn

t.Run("ipv4", func(t *testing.T) {
addrs := addrs4
m := new(PexMsg)
m.Add(addrs[0], f)
m.Drop(addrs[1])
for _, addr := range addrs {
m.Drop(addr)
}
targ := &PexMsg{
AddedFlags: []PexPeerFlags{},
Added: krpc.CompactIPv4NodeAddrs{},
Dropped: krpc.CompactIPv4NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
},
}
require.EqualValues(t, targ, m)
})
t.Run("ipv6", func(t *testing.T) {
addrs := addrs6
m := new(PexMsg)
m.Add(addrs[0], f)
m.Drop(addrs[1])
for _, addr := range addrs {
m.Drop(addr)
}
targ := &PexMsg{
Added6Flags: []PexPeerFlags{},
Added6: krpc.CompactIPv6NodeAddrs{},
Dropped6: krpc.CompactIPv6NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
},
}
require.EqualValues(t, targ, m)
})
t.Run("empty", func(t *testing.T) {
addr := krpc.NodeAddr{}
xm := new(PexMsg)
xm.Drop(addr)
require.EqualValues(t, 0, len(xm.Dropped))
require.EqualValues(t, 0, len(xm.Dropped6))
})
}

func TestMarshalPexMessage(t *testing.T) {
addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa}
f := PexPrefersEncryption | PexOutgoingConn
Expand Down
2 changes: 2 additions & 0 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,8 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
return f
}

// This returns the address to use if we want to dial the peer again. It incorporates the peer's
// advertised listen port.
func (c *PeerConn) dialAddr() net.Addr {
if !c.outgoing && c.PeerListenPort != 0 {
switch addr := c.RemoteAddr.(type) {
Expand Down
Loading

2 comments on commit c1d189e

@anacrolix
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yarikk It's my impression that this implementation does the "more memory efficient approach" in:

Implementation Note: A simple approach is to queue up not-yet-sent connect/disconnect events for each connected peer and performing elision when generating the message. An slightly more complex but more memory-efficient approach is to keep a per-torrent timeline of connect/disconnect events and simply store a pointer for the point in time up to which events have already been sent for each peer. When creating a PEX message they only need to advance the pointer, take care to eliminate duplicates and to elide transient connections.

from BEP 11.

However there may be a memory leak, in that the event series is never trimmed. I also noted that the "naive" approach would probably actually be faster prior to the changes in this commit.

@yarikk
Copy link
Collaborator

@yarikk yarikk commented on c1d189e Nov 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yarikk It's my impression that this implementation does the "more memory efficient approach"

Indeed, after few iterations I came up with this log-based approach which turned out to match what’s suggested in the BEP.

However there may be a memory leak, in that the event series is never trimmed.

That’s right, but it’s tricky to trim properly, as, say, having a drop without first having a matching add would constitute a protocol botch.

It’s not the best. But, on the brighter side, it allows to get away without extra locking, and is very simple in principle. I’m not saying it doesn’t worth searching for improvements.

I also noted that the "naive" approach would probably actually be faster prior to the changes in this commit.

Admittedly, the de-duplication code turned out to be suboptimal, and haven’t received sufficient attention on my part. It haven’t been moved beyond the “dumbest thing that works” up until this change of yours. Good catch!

Please sign in to comment.