Skip to content

Commit

Permalink
Use netip.AddrPort in PEX code and filter unusable addrs much sooner
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed May 16, 2023
1 parent 9be2500 commit 5f2b09a
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 57 deletions.
7 changes: 7 additions & 0 deletions netip-addrport.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) {
return netip.ParseAddrPort(pra.String())
}
}

func krpcNodeAddrFromAddrPort(addrPort netip.AddrPort) krpc.NodeAddr {
return krpc.NodeAddr{
IP: addrPort.Addr().AsSlice(),
Port: int(addrPort.Port()),
}
}
11 changes: 8 additions & 3 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,10 +1072,15 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr {
return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort))
}

func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) {
f := c.pexPeerFlags()
addr := c.dialAddr()
return pexEvent{t, addr, f, nil}
dialAddr := c.dialAddr()
addr, err := addrPortFromPeerRemoteAddr(dialAddr)
if err != nil || !addr.IsValid() {
err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err)
return
}
return pexEvent{t, addr, f, nil}, nil
}

func (c *PeerConn) String() string {
Expand Down
31 changes: 22 additions & 9 deletions peerconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func TestConnPexPeerFlags(t *testing.T) {
}

func TestConnPexEvent(t *testing.T) {
c := qt.New(t)
var (
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
Expand All @@ -195,27 +196,39 @@ func TestConnPexEvent(t *testing.T) {
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
&PeerConn{
Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
PeerListenPort: dialTcpAddr.Port,
},
pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
},
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexAdd, dialTcpAddr, 0, nil},
&PeerConn{
Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
PeerListenPort: dialTcpAddr.Port,
},
pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
&PeerConn{
Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
PeerListenPort: dialUdpAddr.Port,
},
pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
},
}
for i, tc := range testcases {
e := tc.c.pexEvent(tc.t)
require.EqualValues(t, tc.e, e, i)
c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
e, err := tc.c.pexEvent(tc.t)
c.Assert(err, qt.IsNil)
c.Check(e, qt.Equals, tc.e)
})
}
}

Expand Down
43 changes: 17 additions & 26 deletions pex.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package torrent

import (
"net"
"net/netip"
"sync"

"github.com/anacrolix/dht/v2/krpc"

pp "github.com/anacrolix/torrent/peer_protocol"
)

Expand All @@ -26,16 +25,16 @@ const (
// represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
type pexEvent struct {
t pexEventType
addr PeerRemoteAddr
addr netip.AddrPort
f pp.PexPeerFlags
next *pexEvent // event feed list
}

// facilitates efficient de-duplication while generating PEX messages
type pexMsgFactory struct {
msg pp.PexMsg
added map[addrKey]struct{}
dropped map[addrKey]struct{}
added map[netip.AddrPort]struct{}
dropped map[netip.AddrPort]struct{}
}

func (me *pexMsgFactory) DeltaLen() int {
Expand All @@ -44,11 +43,11 @@ func (me *pexMsgFactory) DeltaLen() int {
int64(len(me.dropped))))
}

type addrKey string
type addrKey = netip.AddrPort

This comment has been minimized.

Copy link
@yarikk

yarikk May 19, 2023

Collaborator

You may want to either loose the type or keep it universally, as this change replaces some but not all uses of addrKey type, @anacrolix

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 19, 2023

Author Owner

Thanks. I had observed that, but hadn't fully committed to it, and it looked like addrKey is a distinct concept, even if it can now be the same due to net/netip.Addr being comparable.

This comment has been minimized.

Copy link
@yarikk

yarikk May 19, 2023

Collaborator

iirc, the idea behind addrKey was to use it as keys of the map, which net.IPAddr could not be but the new net/netip.Addr can. It should be ok to convert fully, I think.

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 22, 2023

Author Owner

Thanks. I will not fully convert it now due to the extra effort but will leave it open to the next person who works on the code.


// Returns the key to use to identify a given addr in the factory.
func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey {
return addrKey(addr.String())
func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey {
return addr
}

// Returns whether the entry was added (we can check if we're cancelling out another entry and so
Expand All @@ -61,10 +60,7 @@ func (me *pexMsgFactory) add(e pexEvent) {
if me.added == nil {
me.added = make(map[addrKey]struct{}, pexMaxDelta)
}
addr, ok := nodeAddr(e.addr)
if !ok {
return
}
addr := krpcNodeAddrFromAddrPort(e.addr)
m := &me.msg
switch {
case addr.IP.To4() != nil:
Expand Down Expand Up @@ -96,10 +92,7 @@ func (me *pexMsgFactory) add(e pexEvent) {
// Returns whether the entry was added (we can check if we're cancelling out another entry and so
// won't hit the limit consuming this event).
func (me *pexMsgFactory) drop(e pexEvent) {
addr, ok := nodeAddr(e.addr)
if !ok {
return
}
addr := krpcNodeAddrFromAddrPort(e.addr)
key := me.addrKey(e.addr)
if me.dropped == nil {
me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
Expand Down Expand Up @@ -148,14 +141,6 @@ func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
return &me.msg
}

// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
// format.
func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) {
ipport, _ := tryIpPortFromNetAddr(addr)
ok := ipport.IP != nil
return krpc.NodeAddr{IP: ipport.IP, Port: ipport.Port}, ok
}

// Per-torrent PEX state
type pexState struct {
sync.RWMutex
Expand Down Expand Up @@ -184,6 +169,10 @@ func (s *pexState) append(e *pexEvent) {
}

func (s *pexState) Add(c *PeerConn) {
e, err := c.pexEvent(pexAdd)
if err != nil {
return
}
s.Lock()
defer s.Unlock()
s.nc++
Expand All @@ -194,7 +183,6 @@ func (s *pexState) Add(c *PeerConn) {
}
s.hold = s.hold[:0]
}
e := c.pexEvent(pexAdd)
c.pex.Listed = true
s.append(&e)
}
Expand All @@ -204,9 +192,12 @@ func (s *pexState) Drop(c *PeerConn) {
// skip connections which were not previously Added
return
}
e, err := c.pexEvent(pexDrop)

This comment has been minimized.

Copy link
@yarikk

yarikk May 19, 2023

Collaborator

Why move outside the lock?

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 19, 2023

Author Owner

Hm interesting. I think I did this because I observed that it can now fail here and not want to decrement nc. It looks like it works because all the state that pexEvent checks might not modified after PEX is initialized for a connection. I haven't confirmed that though, I should run it with the race detector in production. If you think it's going to fail, I'll move it back.

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 22, 2023

Author Owner

I checked this in production and no issues were detected. I don't prefer it either way.

if err != nil {
return
}
s.Lock()
defer s.Unlock()
e := c.pexEvent(pexDrop)
s.nc--
if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
s.hold = append(s.hold, e)
Expand Down
36 changes: 18 additions & 18 deletions pex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func TestPexReset(t *testing.T) {
require.EqualValues(t, targ, s)
}

func mustNodeAddr(addr net.Addr) krpc.NodeAddr {
ret, ok := nodeAddr(addr)
if !ok {
panic(addr)
func krpcNodeAddrFromNetAddr(addr net.Addr) krpc.NodeAddr {
addrPort, err := addrPortFromPeerRemoteAddr(addr)
if err != nil {
panic(err)
}
return ret
return krpcNodeAddrFromAddrPort(addrPort)
}

var testcases = []struct {
Expand Down Expand Up @@ -99,13 +99,13 @@ var testcases = []struct {
}(),
targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
krpcNodeAddrFromNetAddr(addrs[2]),
krpcNodeAddrFromNetAddr(addrs[3]),
},
AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
mustNodeAddr(addrs[1]),
krpcNodeAddrFromNetAddr(addrs[0]),
krpcNodeAddrFromNetAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn},
},
Expand All @@ -120,10 +120,10 @@ var testcases = []struct {
}(),
targ: pp.PexMsg{
Dropped: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
krpcNodeAddrFromNetAddr(addrs[2]),
},
Dropped6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
krpcNodeAddrFromNetAddr(addrs[0]),
},
},
},
Expand All @@ -144,7 +144,7 @@ var testcases = []struct {
}(),
targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
krpcNodeAddrFromNetAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{0},
},
Expand All @@ -168,12 +168,12 @@ var testcases = []struct {
}(),
targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
krpcNodeAddrFromNetAddr(addrs[2]),
},
AddedFlags: []pp.PexPeerFlags{0},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
mustNodeAddr(addrs[1]),
krpcNodeAddrFromNetAddr(addrs[0]),
krpcNodeAddrFromNetAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{0, 0},
},
Expand All @@ -193,7 +193,7 @@ var testcases = []struct {
}(),
targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
krpcNodeAddrFromNetAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{0},
},
Expand All @@ -207,7 +207,7 @@ var testcases = []struct {
}(),
targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
krpcNodeAddrFromNetAddr(addrs[0]),
},
Added6Flags: []pp.PexPeerFlags{0},
},
Expand All @@ -216,7 +216,7 @@ var testcases = []struct {
},
targ1: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
krpcNodeAddrFromNetAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{0},
},
Expand Down
2 changes: 1 addition & 1 deletion pexconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestPexConnState(t *testing.T) {
Added: krpc.CompactIPv4NodeAddrs(nil),
AddedFlags: []pp.PexPeerFlags{},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addr),
krpcNodeAddrFromNetAddr(addr),
},
Added6Flags: []pp.PexPeerFlags{0},
}
Expand Down

0 comments on commit 5f2b09a

Please sign in to comment.