Skip to content

Commit

Permalink
Use webrtc local addr for webrtc conn peer priority
Browse files Browse the repository at this point in the history
WebRTC conns are providing the correct IP for peer priority calculations, so use that instead of trying to guess (which doesn't work if there are no regular conn listeners attached to the Client.

(cherry picked from commit e86bb5fee3958dc90a3d012469b1352005d6c9ad)
  • Loading branch information
anacrolix committed Jul 12, 2022
1 parent aedf258 commit 214fe6b
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 31 deletions.
4 changes: 4 additions & 0 deletions bep40_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func TestBep40Priority(t *testing.T) {
IpPort{IP: net.ParseIP("123.213.32.10"), Port: 0},
IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0},
))
assert.Equal(t, peerPriority(0x2b41d456), bep40PriorityIgnoreError(
IpPort{IP: net.ParseIP("206.248.98.111"), Port: 0},
IpPort{IP: net.ParseIP("142.147.89.224"), Port: 0},
))
assert.EqualValues(t, "\x00\x00\x00\x00", func() []byte {
b, _ := bep40PriorityBytes(
IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0},
Expand Down
60 changes: 42 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,16 @@ func (cl *Client) incomingConnection(nc net.Conn) {
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
regularNetConnPeerConnConnString(nc))
remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
c := cl.newConnection(
nc,
newConnectionOpts{
outgoing: false,
remoteAddr: nc.RemoteAddr(),
localPublicAddr: cl.publicAddr(remoteAddr.IP),
network: nc.RemoteAddr().Network(),
connString: regularNetConnPeerConnConnString(nc),
})
defer func() {
cl.lock()
defer cl.unlock()
Expand Down Expand Up @@ -687,13 +695,12 @@ func (cl *Client) initiateProtocolHandshakes(
ctx context.Context,
nc net.Conn,
t *Torrent,
outgoing, encryptHeader bool,
remoteAddr PeerRemoteAddr,
network, connString string,
encryptHeader bool,
newConnOpts newConnectionOpts,
) (
c *PeerConn, err error,
) {
c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
c = cl.newConnection(nc, newConnOpts)
c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel()
Expand Down Expand Up @@ -725,7 +732,15 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
}
return nil, errors.New("dial failed")
}
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{
outgoing: true,
remoteAddr: addr,
// It would be possible to retrieve a public IP from the dialer used here?
localPublicAddr: cl.publicAddr(addrIpPort.IP),
network: dr.Dialer.DialerNetwork(),
connString: regularNetConnPeerConnConnString(nc),
})
if err != nil {
nc.Close()
}
Expand Down Expand Up @@ -1468,28 +1483,37 @@ func (cl *Client) banPeerIP(ip net.IP) {
}
}

func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
if network == "" {
panic(remoteAddr)
type newConnectionOpts struct {
outgoing bool
remoteAddr PeerRemoteAddr
localPublicAddr peerLocalPublicAddr
network string
connString string
}

func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
if opts.network == "" {
panic(opts.remoteAddr)
}
c = &PeerConn{
Peer: Peer{
outgoing: outgoing,
outgoing: opts.outgoing,
choking: true,
peerChoking: true,
PeerMaxRequests: 250,

RemoteAddr: remoteAddr,
Network: network,
callbacks: &cl.config.Callbacks,
RemoteAddr: opts.remoteAddr,
localPublicAddr: opts.localPublicAddr,
Network: opts.network,
callbacks: &cl.config.Callbacks,
},
connString: connString,
connString: opts.connString,
conn: nc,
}
c.initRequestState()
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
if remoteAddr != nil {
netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String())
if opts.remoteAddr != nil {
netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
if err == nil {
c.bannableAddr = Some(netipAddrPort.Addr())
}
Expand All @@ -1501,7 +1525,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
l: cl.config.DownloadRateLimiter,
r: c.r,
}
c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing)
for _, f := range cl.config.Callbacks.NewPeer {
f(&c.Peer)
}
Expand Down
2 changes: 2 additions & 0 deletions peer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package torrent

type peerLocalPublicAddr = IpPort

func (p *Peer) isLowOnRequests() bool {
return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
}
Expand Down
13 changes: 8 additions & 5 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@ type Peer struct {
peerImpl
callbacks *Callbacks

outgoing bool
Network string
RemoteAddr PeerRemoteAddr
bannableAddr Option[bannableAddr]
outgoing bool
Network string
RemoteAddr PeerRemoteAddr
// The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the
// config.
localPublicAddr peerLocalPublicAddr
bannableAddr Option[bannableAddr]
// True if the connection is operating over MSE obfuscation.
headerEncrypted bool
cryptoMethod mse.CryptoMethod
Expand Down Expand Up @@ -1729,7 +1732,7 @@ func (c *PeerConn) setTorrent(t *Torrent) {
}

func (c *Peer) peerPriority() (peerPriority, error) {
return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
}

func (c *Peer) remoteIp() net.IP {
Expand Down
11 changes: 8 additions & 3 deletions peerconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
var cl Client
cl.init(TestingConfig(t))
cl.initLogger()
c := cl.newConnection(nil, false, nil, "io.Pipe", "")
c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
t.Log(err)
Expand Down Expand Up @@ -107,7 +107,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
t.onSetInfo()
t._pendingPieces.Add(0)
r, w := net.Pipe()
cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
cn := cl.newConnection(r, newConnectionOpts{
outgoing: true,
remoteAddr: r.RemoteAddr(),
network: r.RemoteAddr().Network(),
connString: regularNetConnPeerConnConnString(r),
})
cn.setTorrent(t)
mrlErrChan := make(chan error)
msg := pp.Message{
Expand Down Expand Up @@ -287,7 +292,7 @@ func TestPreferredNetworkDirection(t *testing.T) {
func TestReceiveLargeRequest(t *testing.T) {
c := qt.New(t)
cl := newTestingClient(t)
pc := cl.newConnection(nil, false, nil, "test", "")
pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
tor := cl.newTorrentForTesting()
tor.info = &metainfo.Info{PieceLength: 3 << 20}
pc.setTorrent(tor)
Expand Down
5 changes: 4 additions & 1 deletion pexconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func TestPexConnState(t *testing.T) {
cl.initLogger()
torrent := cl.newTorrent(metainfo.Hash{}, nil)
addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
c := cl.newConnection(nil, false, addr, addr.Network(), "")
c := cl.newConnection(nil, newConnectionOpts{
remoteAddr: addr,
network: addr.Network(),
})
c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
c.messageWriter.mu.Lock()
Expand Down
13 changes: 9 additions & 4 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,18 +1575,23 @@ func (t *Torrent) onWebRtcConn(
DataChannelContext: dcc,
}
peerRemoteAddr := netConn.RemoteAddr()
//t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
if t.cl.badPeerAddr(peerRemoteAddr) {
return
}
localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
pc, err := t.cl.initiateProtocolHandshakes(
context.Background(),
netConn,
t,
dcc.LocalOffered,
false,
netConn.RemoteAddr(),
webrtcNetwork,
fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
newConnectionOpts{
outgoing: dcc.LocalOffered,
remoteAddr: peerRemoteAddr,
localPublicAddr: localAddrIpPort,
network: webrtcNetwork,
connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
},
)
if err != nil {
t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
Expand Down

0 comments on commit 214fe6b

Please sign in to comment.