Skip to content

Commit

Permalink
lock adjustments and debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Jun 4, 2024
1 parent d0c268c commit d412968
Show file tree
Hide file tree
Showing 17 changed files with 408 additions and 298 deletions.
42 changes: 19 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
connString: regularNetConnPeerConnConnString(nc),
})
defer func() {
cl.lock()
defer cl.unlock()
c.close()
c.close(true)
}()
c.Discovery = PeerSourceIncoming
cl.runReceivedConn(c)
Expand Down Expand Up @@ -864,13 +862,16 @@ func (cl *Client) outgoingConnection(
opts outgoingConnOpts,
attemptKey outgoingConnAttemptKey,
) {
//fmt.Println("OC0", cl._mu.locker)
//defer fmt.Println("OC", "DONE")
c, err := cl.dialAndCompleteHandshake(opts)
if err == nil {
c.conn.SetWriteDeadline(time.Time{})
}
cl.lock()
defer cl.unlock()
// Don't release lock between here and addPeerConn, unless it's for failure.
//fmt.Println("OC1")
cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
if err != nil {
if cl.config.Debug {
Expand All @@ -883,7 +884,7 @@ func (cl *Client) outgoingConnection(
}
return
}
defer c.close()
defer c.close(true)
c.Discovery = opts.peerInfo.Source
c.trusted = opts.peerInfo.Trusted
opts.t.runHandshookConnLoggingErr(c)
Expand Down Expand Up @@ -1081,7 +1082,7 @@ func (t *Torrent) runHandshookConn(pc *PeerConn) error {
if err := t.addPeerConn(pc); err != nil {
return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(pc)
defer t.dropConnection(pc, true)
pc.startMessageWriter()
pc.sendInitialMessages()
pc.initUpdateRequestsTimer()
Expand Down Expand Up @@ -1121,7 +1122,7 @@ func (c *Peer) updateRequestsTimerFunc() {
torrent.Add("spurious timer requests updates", 1)
return
}
c.updateRequests(peerUpdateRequestsTimerReason, false)
c.updateRequests(peerUpdateRequestsTimerReason, false, true)
}

// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
Expand Down Expand Up @@ -1340,9 +1341,6 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
// If the torrent already exists then this Storage is ignored and the
// existing torrent returned with `new` set to `false`
func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
fmt.Println("ATIHWS")
defer fmt.Println("ATIHWS", "DONE")

cl.torrentsMu.Lock()
defer cl.torrentsMu.Unlock()

Expand Down Expand Up @@ -1372,24 +1370,19 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
// Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
// then this Storage is ignored and the existing torrent returned with `new` set to `false`.
func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
fmt.Println("ATO")
defer fmt.Println("ATO", "DONE")

infoHash := opts.InfoHash
cl.lock()
defer cl.unlock()
fmt.Println("ATO1")
cl.torrentsMu.Lock()
defer cl.torrentsMu.Unlock()
fmt.Println("ATO1A")

t, ok := cl.torrents[infoHash]
if ok {
return
}
new = true

t = cl.newTorrentOpt(opts)
fmt.Println("ATO2")
cl.eachDhtServer(func(s DhtServer) {
if cl.config.PeriodicallyAnnounceTorrentsToDht {
go t.dhtAnnouncer(s)
Expand All @@ -1399,12 +1392,10 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
func() {
t.mu.Lock()
defer t.mu.Unlock()
fmt.Println("ATO3")
t.setInfoBytesLocked(opts.InfoBytes)
cl.clearAcceptLimits()
t.updateWantPeersEvent(false)
}()
fmt.Println("ATO4")
// Tickle Client.waitAccept, new torrent may want conns.
cl.event.Broadcast()
return
Expand Down Expand Up @@ -1600,13 +1591,18 @@ func (cl *Client) banPeerIP(ip net.IP) {
cl.badPeerIPsMu.Unlock()

for _, t := range cl.torrentsAsSlice() {
for _, p := range t.peersAsSlice(true) {
if p.remoteIp().Equal(ip) {
t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
// Should this be a close?
p.drop()
func() {
t.mu.Lock()
defer t.mu.Unlock()

for _, p := range t.peersAsSlice(false) {
if p.remoteIp().Equal(ip) {
t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
// Should this be a close?
p.drop(false)
}
}
}
}()
}
}

Expand Down
22 changes: 20 additions & 2 deletions deferrwl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package torrent

import (
"sync/atomic"
"time"

"github.com/anacrolix/sync"
stack2 "github.com/go-stack/stack"
Expand All @@ -14,8 +15,12 @@ type lockWithDeferreds struct {
internal sync.RWMutex
unlockActions []func()

lc atomic.Int32
locker string
lc atomic.Int32
locker string
locktime time.Time
rlc atomic.Int32
rlmu sync.Mutex
rlocker [20]string
}

func stack(skip int) string {
Expand All @@ -26,6 +31,7 @@ func (me *lockWithDeferreds) Lock() {
me.internal.Lock()
me.lc.Add(1)
me.locker = stack(2)
me.locktime = time.Now()
}

func (me *lockWithDeferreds) Unlock() {
Expand All @@ -34,6 +40,7 @@ func (me *lockWithDeferreds) Unlock() {
panic("lock underflow")
}
me.locker = ""
me.locktime = time.Time{}
unlockActions := me.unlockActions
for i := 0; i < len(unlockActions); i += 1 {
unlockActions[i]()
Expand All @@ -44,9 +51,20 @@ func (me *lockWithDeferreds) Unlock() {

func (me *lockWithDeferreds) RLock() {
me.internal.RLock()
me.rlmu.Lock()
me.rlocker[me.rlc.Load()] = string(stack(2))
me.rlc.Add(1)
me.rlmu.Unlock()
}

func (me *lockWithDeferreds) RUnlock() {
me.rlmu.Lock()
me.rlc.Add(-1)
if me.rlc.Load() < 0 {
panic("lock underflow")
}
me.rlocker[me.rlc.Load()] = ""
me.rlmu.Unlock()
me.internal.RUnlock()
}

Expand Down
6 changes: 3 additions & 3 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ type FilePieceState struct {

// Returns the state of pieces in this file.
func (f *File) State() (ret []FilePieceState) {
f.t.cl.rLock()
defer f.t.cl.rUnlock()
f.t.mu.RLock()
defer f.t.mu.RUnlock()
pieceSize := int64(f.t.usualPieceSize())
off := f.offset % pieceSize
remaining := f.length
Expand All @@ -145,7 +145,7 @@ func (f *File) State() (ret []FilePieceState) {
if len1 > remaining {
len1 = remaining
}
ps := f.t.pieceState(i)
ps := f.t.pieceState(i, false)
ret = append(ret, FilePieceState{len1, ps})
off = 0
remaining -= len1
Expand Down
7 changes: 6 additions & 1 deletion peer-conn-msg-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import (
pp "github.com/anacrolix/torrent/peer_protocol"
)

//var fwbCount atomic.Int64

func (pc *PeerConn) initMessageWriter() {
w := &pc.messageWriter
*w = peerConnMsgWriter{
fillWriteBuffer: func() {
//count := fwbCount.Add(1)
//fmt.Println("FWB0", count, pc.locker().locker)
//defer fwbCount.Add(-1)
pc.locker().Lock()
defer pc.locker().Unlock()
if pc.closed.IsSet() {
Expand All @@ -42,7 +47,7 @@ func (pc *PeerConn) startMessageWriter() {

func (pc *PeerConn) messageWriterRunner() {
defer pc.locker().Unlock()
defer pc.close()
defer pc.close(true)
defer pc.locker().Lock()
pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
}
Expand Down
6 changes: 3 additions & 3 deletions peer-impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ import (
// legacy PeerConn methods.
type peerImpl interface {
// Trigger the actual request state to get updated
handleUpdateRequests()
handleUpdateRequests(lockTorrent bool)
writeInterested(interested bool) bool

// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
// handled by a follow-up event.
_cancel(RequestIndex) (acked bool)
_request(Request) bool
connectionFlags() string
onClose()
onClose(lockTorrent bool)
onGotInfo(info *metainfo.Info, lock bool)
// Drop connection. This may be a no-op if there is no connection.
drop()
drop(lockTorrent bool)
// Rebuke the peer
ban()
String() string
Expand Down
Loading

0 comments on commit d412968

Please sign in to comment.