Skip to content

Commit

Permalink
lock torrent on connection handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Jun 9, 2024
1 parent 353d096 commit 2c43dbd
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 48 deletions.
77 changes: 43 additions & 34 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,40 +1054,49 @@ func (cl *Client) runReceivedConn(c *PeerConn) {

// Client lock must be held before entering this.
func (t *Torrent) runHandshookConn(pc *PeerConn) error {
pc.setTorrent(t)
cl := t.cl
for i, b := range cl.config.MinPeerExtensions {
if pc.PeerExtensionBytes[i]&b != b {
return fmt.Errorf("peer did not meet minimum peer extensions: %x", pc.PeerExtensionBytes[:])
if err := func() error {
t.mu.Lock()
defer t.mu.Unlock()

pc.setTorrent(t)
cl := t.cl
for i, b := range cl.config.MinPeerExtensions {
if pc.PeerExtensionBytes[i]&b != b {
return fmt.Errorf("peer did not meet minimum peer extensions: %x", pc.PeerExtensionBytes[:])
}
}
}
if pc.PeerID == cl.peerID {
if pc.outgoing {
connsToSelf.Add(1)
addr := pc.RemoteAddr.String()
cl.dopplegangerAddrs[addr] = struct{}{}
} /* else {
// Because the remote address is not necessarily the same as its client's torrent listen
// address, we won't record the remote address as a doppleganger. Instead, the initiator
// can record *us* as the doppleganger.
} */
t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
if pc.PeerID == cl.peerID {
if pc.outgoing {
connsToSelf.Add(1)
addr := pc.RemoteAddr.String()
cl.dopplegangerAddrs[addr] = struct{}{}
} /* else {
// Because the remote address is not necessarily the same as its client's torrent listen
// address, we won't record the remote address as a doppleganger. Instead, the initiator
// can record *us* as the doppleganger.
} */
t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
return nil
}
pc.r = deadlineReader{pc.conn, pc.r}
completedHandshakeConnectionFlags.Add(pc.connectionFlags(), 1)
if connIsIpv6(pc.conn) {
torrent.Add("completed handshake over ipv6", 1)
}
if err := t.addPeerConn(pc, false); err != nil {
return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(pc, false)
pc.startMessageWriter(false)
pc.sendInitialMessages()
pc.initUpdateRequestsTimer()

return nil
}(); err != nil {
return err
}
pc.r = deadlineReader{pc.conn, pc.r}
completedHandshakeConnectionFlags.Add(pc.connectionFlags(), 1)
if connIsIpv6(pc.conn) {
torrent.Add("completed handshake over ipv6", 1)
}
if err := t.addPeerConn(pc); err != nil {
return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(pc, true)
pc.startMessageWriter()
pc.sendInitialMessages()
pc.initUpdateRequestsTimer()
err := pc.mainReadLoop()
if err != nil {

if err := pc.mainReadLoop(); err != nil {
return fmt.Errorf("main read loop: %w", err)
}
return nil
Expand All @@ -1109,14 +1118,14 @@ const peerUpdateRequestsTimerReason = "updateRequestsTimer"
func (c *Peer) updateRequestsTimerFunc() {
c.t.mu.Lock()
defer c.t.mu.Unlock()

c.mu.Lock()
defer c.mu.Unlock()

if c.closed.IsSet() {
return
}
if c.isLowOnRequests(false,false) {
if c.isLowOnRequests(false, false) {
// If there are no outstanding requests, then a request update should have already run.
return
}
Expand Down
15 changes: 4 additions & 11 deletions peer-conn-msg-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,10 @@ import (
pp "github.com/anacrolix/torrent/peer_protocol"
)

//var fwbCount atomic.Int64

func (pc *PeerConn) initMessageWriter() {
w := &pc.messageWriter
*w = peerConnMsgWriter{
fillWriteBuffer: func() {
//count := fwbCount.Add(1)
//fmt.Println("FWB0", count, pc.locker().locker)
//defer fwbCount.Add(-1)
if pc.closed.IsSet() {
return
}
Expand All @@ -38,15 +33,13 @@ func (pc *PeerConn) initMessageWriter() {
}
}

func (pc *PeerConn) startMessageWriter() {
func (pc *PeerConn) startMessageWriter(lockTorrent bool) {
pc.initMessageWriter()
go pc.messageWriterRunner()
go pc.messageWriterRunner(lockTorrent)
}

func (pc *PeerConn) messageWriterRunner() {
defer pc.locker().Unlock()
defer pc.close(true)
defer pc.locker().Lock()
func (pc *PeerConn) messageWriterRunner(lockTorrent bool) {
defer pc.close(lockTorrent)
pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
}

Expand Down
8 changes: 5 additions & 3 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,7 +2361,7 @@ func (t *Torrent) reconcileHandshakeStats(c *Peer) {
}

// Returns true if the connection is added.
func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
func (t *Torrent) addPeerConn(c *PeerConn, lockTorrent bool) (err error) {
defer func() {
if err == nil {
torrent.Add("added connections", 1)
Expand All @@ -2372,8 +2372,10 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
return errors.New("torrent closed")
}

t.mu.Lock()
defer t.mu.Unlock()
if lockTorrent {
t.mu.Lock()
defer t.mu.Unlock()
}

conns := t.peerConnsAsSlice(false)
defer conns.free()
Expand Down

0 comments on commit 2c43dbd

Please sign in to comment.