Skip to content

Commit

Permalink
added status locking flags
Browse files · Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Jun 10, 2024
1 parent dfeedeb commit 5de146b
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 63 deletions.
6 changes: 3 additions & 3 deletions peer-impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type peerImpl interface {
// Rebuke the peer
ban()
String() string
peerImplStatusLines() []string
peerImplStatusLines(lock bool) []string

// the peers is running low on requests and needs some more
// Note: for web peers this means low on web requests - as we
Expand All @@ -38,8 +38,8 @@ type peerImpl interface {
// guess at how many pieces are in a torrent, and assume they have all pieces based on them
// having sent haves for everything, but we don't know for sure. But if they send a have-all
// message, then it's clear that they do.
peerHasAllPieces(lockTorrent bool) (all, known bool)
peerPieces() *roaring.Bitmap
peerHasAllPieces(lock bool, lockTorrent bool) (all, known bool)
peerPieces(lock bool) *roaring.Bitmap

nominalMaxRequests(lock bool, lockTorrent bool) maxRequests
}
90 changes: 58 additions & 32 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,20 @@ func (cn *Peer) expectingChunks() bool {
return haveAllowedFastRequests
}

func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
cn.mu.RLock()
defer cn.mu.RUnlock()
// remoteChokingPiece reports whether the remote peer is effectively choking
// us for the given piece. A piece in the peer's allowed-fast set remains
// requestable even while we are choked. When lock is true, cn.mu is
// read-locked for the duration of the reads.
func (cn *Peer) remoteChokingPiece(piece pieceIndex, lock bool) bool {
	if lock {
		cn.mu.RLock()
		defer cn.mu.RUnlock()
	}
	// Not choked at all: nothing blocks this piece.
	if !cn.peerChoking {
		return false
	}
	// Choked, but allowed-fast pieces are exempt.
	return !cn.peerAllowedFast.Contains(piece)
}

func (cn *Peer) cumInterest() time.Duration {
cn.mu.RLock()
defer cn.mu.RUnlock()
func (cn *Peer) cumInterest(lock bool) time.Duration {
if lock {
cn.mu.RLock()
defer cn.mu.RUnlock()
}

ret := cn.priorInterest
if cn.requestState.Interested {
Expand All @@ -196,9 +200,12 @@ func (cn *Peer) locker() *lockWithDeferreds {
return cn.t.cl.locker()
}

func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
cn.mu.RLock()
defer cn.mu.RUnlock()
// supportsExtension reports whether the peer advertised the named ltep
// extension in its extended handshake (i.e. the name is present in
// PeerExtensionIDs). When lock is true, cn.mu is read-locked around the
// map lookup.
func (cn *PeerConn) supportsExtension(ext pp.ExtensionName, lock bool) bool {
	if lock {
		cn.mu.RLock()
		defer cn.mu.RUnlock()
	}
	_, supported := cn.PeerExtensionIDs[ext]
	return supported
}
Expand All @@ -217,10 +224,10 @@ func (cn *Peer) bestPeerNumPieces(lock bool, lockTorrent bool) pieceIndex {
return cn.peerMinPieces
}

func (cn *Peer) completedString() string {
have := pieceIndex(cn.peerPieces().GetCardinality())
best := cn.bestPeerNumPieces(true, true)
if all, _ := cn.peerHasAllPieces(true); all {
func (cn *Peer) completedString(lock bool, lockTorrent bool) string {
have := pieceIndex(cn.peerPieces(lock).GetCardinality())
best := cn.bestPeerNumPieces(lock, lockTorrent)
if all, _ := cn.peerHasAllPieces(lock, lockTorrent); all {
have = best
}
return fmt.Sprintf("%d/%d", have, best)
Expand Down Expand Up @@ -292,12 +299,23 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int))
next(None[pieceIndex]())
}

func (cn *Peer) writeStatus(w io.Writer) {
func (cn *Peer) writeStatus(w io.Writer, lock bool, lockTorrent bool) {

if lockTorrent {
cn.t.mu.RLock()
defer cn.t.mu.RUnlock()
}

if lock {
cn.mu.RLock()
defer cn.mu.RUnlock()
}

// \t isn't preserved in <pre> blocks?
if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ")
}
fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(false), "\n"))
prio, err := cn.peerPriority()
prioStr := fmt.Sprintf("%08x", prio)
if err != nil {
Expand All @@ -307,25 +325,23 @@ func (cn *Peer) writeStatus(w io.Writer) {
fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake),
eventAgeString(cn.lastHelpful(true)),
cn.cumInterest(),
eventAgeString(cn.lastHelpful(false)),
cn.cumInterest(false),
cn.totalExpectingTime(),
)

cn.mu.RLock()
lenPeerTouchedPieces := len(cn.peerTouchedPieces)
cn.mu.RUnlock()

fmt.Fprintf(w,
"%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
cn.completedString(false, false),
lenPeerTouchedPieces,
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
cn.requestState.Requests.GetCardinality(),
cn.requestState.Cancelled.GetCardinality(),
cn.peerImpl.nominalMaxRequests(true, true),
cn.peerImpl.nominalMaxRequests(false, false),
cn.PeerMaxRequests,
len(cn.peerRequests),
localClientReqq,
Expand Down Expand Up @@ -379,11 +395,11 @@ func (cn *Peer) peerHasPiece(piece pieceIndex, lock bool, lockTorrent bool) bool
defer cn.mu.RUnlock()
}

if all, known := cn.peerHasAllPieces(false); all && known {
if all, known := cn.peerHasAllPieces(false, false); all && known {
return true
}

return cn.peerPieces().ContainsInt(piece)
return cn.peerPieces(lock).ContainsInt(piece)
}

// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
Expand Down Expand Up @@ -865,22 +881,27 @@ func (cn *Peer) netGoodPiecesDirtied() int64 {
return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
}

func (c *Peer) peerHasWantedPieces(lockTorrent bool) bool {
func (c *Peer) peerHasWantedPieces(lock bool, lockTorrent bool) bool {
if lockTorrent {
c.t.mu.RLock()
defer c.t.mu.RUnlock()
}

if all, _ := c.peerHasAllPieces(false); all {
if lock {
c.mu.RLock()
defer c.mu.RUnlock()
}

if all, _ := c.peerHasAllPieces(false, false); all {
isEmpty := c.t._pendingPieces.IsEmpty()

return !c.t.haveAllPieces(false) && !isEmpty
}
if !c.t.haveInfo(false) {
return !c.peerPieces().IsEmpty()
return !c.peerPieces(false).IsEmpty()
}

return c.peerPieces().Intersects(&c.t._pendingPieces)
return c.peerPieces(false).Intersects(&c.t._pendingPieces)
}

// Returns true if an outstanding request is removed. Cancelled requests should be handled
Expand Down Expand Up @@ -1001,12 +1022,17 @@ func (l connectionTrust) Less(r connectionTrust) bool {
}

// Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims.
func (cn *Peer) newPeerPieces(lockTorrent bool) *roaring.Bitmap {
func (cn *Peer) newPeerPieces(lock bool, lockTorrent bool) (ret *roaring.Bitmap) {
// TODO: Can we use copy on write?
cn.mu.RLock()
ret := cn.peerPieces().Clone()
cn.mu.RUnlock()
if all, _ := cn.peerHasAllPieces(lockTorrent); all {
func() {
if lock {
cn.mu.RLock()
defer cn.mu.RUnlock()
}
ret = cn.peerPieces(false).Clone()
}()

if all, _ := cn.peerHasAllPieces(lock, lockTorrent); all {
if cn.t.haveInfo(true) {
ret.AddRange(0, bitmap.BitRange(cn.t.numPieces()))
} else {
Expand Down
29 changes: 19 additions & 10 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ type PeerConn struct {
outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
}

func (cn *PeerConn) pexStatus() string {
func (cn *PeerConn) pexStatus(lock bool) string {
if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) {
return "extended protocol disabled"
}
if cn.PeerExtensionIDs == nil {
return "pending extended handshake"
}
if !cn.supportsExtension(pp.ExtensionNamePex) {
if !cn.supportsExtension(pp.ExtensionNamePex, lock) {
return "unsupported"
}
if true {
Expand All @@ -101,13 +101,13 @@ func (cn *PeerConn) pexStatus() string {
}
}

func (cn *PeerConn) peerImplStatusLines() []string {
func (cn *PeerConn) peerImplStatusLines(lock bool) []string {
return []string{
cn.connString,
fmt.Sprintf("peer id: %+q", cn.PeerID),
fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
fmt.Sprintf("pex: %s", cn.pexStatus()),
fmt.Sprintf("pex: %s", cn.pexStatus(lock)),
}
}

Expand Down Expand Up @@ -147,13 +147,17 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) bool {
return ml.Less()
}

func (cn *PeerConn) peerHasAllPieces(lockTorrent bool) (all, known bool) {
// peerHasAllPieces reports whether the remote peer has every piece.
// known is false when we cannot tell: without torrent info we don't know the
// piece count, so only an explicit have-all message is conclusive.
// lock guards cn.mu for the _peerPieces cardinality read; lockTorrent is
// forwarded to Torrent.haveInfo.
func (cn *PeerConn) peerHasAllPieces(lock bool, lockTorrent bool) (all, known bool) {
	// A have-all message is authoritative regardless of piece count.
	if cn.peerSentHaveAll {
		return true, true
	}
	if !cn.t.haveInfo(lockTorrent) {
		return false, false
	}
	// NOTE(review): peerSentHaveAll above is read before cn.mu is taken even
	// when lock is true — confirm that field is only written while holding
	// the same lock (or is settled before concurrent access begins).
	if lock {
		cn.mu.RLock()
		defer cn.mu.RUnlock()
	}
	return cn._peerPieces.GetCardinality() == uint64(cn.t.numPieces()), true
}

Expand All @@ -168,7 +172,12 @@ func (cn *PeerConn) setNumPieces(num pieceIndex, lock bool) {
cn.peerPiecesChanged(lock)
}

func (cn *PeerConn) peerPieces() *roaring.Bitmap {
// peerPieces returns the bitmap of pieces the peer has explicitly claimed
// (via have/bitfield messages). It does not account for a have-all claim —
// see peerHasAllPieces for that. When lock is true, cn.mu is read-locked
// around the return.
//
// NOTE(review): the returned pointer aliases cn._peerPieces, and the RLock
// (when taken) is released as this function returns — any caller that uses
// the bitmap afterwards is unsynchronized. Confirm callers hold cn.mu
// themselves, or that the lock parameter here actually protects something.
func (cn *PeerConn) peerPieces(lock bool) *roaring.Bitmap {
	if lock {
		cn.mu.RLock()
		defer cn.mu.RUnlock()
	}

	return &cn._peerPieces
}

Expand Down Expand Up @@ -1042,7 +1051,7 @@ func (c *PeerConn) uploadAllowed() bool {
if c.t.seeding(true) {
return true
}
if !c.peerHasWantedPieces(true) {
if !c.peerHasWantedPieces(true, true) {
return false
}
// Don't upload more than 100 KiB more than we download.
Expand Down Expand Up @@ -1188,7 +1197,7 @@ func (pc *PeerConn) String() string {
// Returns the pieces the peer could have based on their claims. If we don't know how many pieces
// are in the torrent, it could be a very large range if the peer has sent HaveAll.
func (pc *PeerConn) PeerPieces() *roaring.Bitmap {
return pc.newPeerPieces(true)
return pc.newPeerPieces(true, true)
}

func (pc *PeerConn) remoteIsTransmission() bool {
Expand Down Expand Up @@ -1216,12 +1225,12 @@ func (c *PeerConn) useful(lockTorrent bool) bool {
return false
}
if !t.haveInfo(lockTorrent) {
return c.supportsExtension("ut_metadata")
return c.supportsExtension("ut_metadata", true)
}
if t.seeding(lockTorrent) && c.peerInterested {
return true
}
if c.peerHasWantedPieces(lockTorrent) {
if c.peerHasWantedPieces(true, lockTorrent) {
return true
}
return false
Expand Down
9 changes: 5 additions & 4 deletions requesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,12 @@ func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex, lock boo
// Whether we should allow sending not interested ("losing interest") to the peer. I noticed
// qBitTorrent seems to punish us for sending not interested when we're streaming and don't
// currently need anything.
func (p *Peer) allowSendNotInterested(lockTorrent bool) bool {
func (p *Peer) allowSendNotInterested(lock bool, lockTorrent bool) bool {
// Except for caching, we're not likely to lose pieces very soon.
if p.t.haveAllPieces(lockTorrent) {
return true
}
all, known := p.peerHasAllPieces(lockTorrent)
all, known := p.peerHasAllPieces(lock, lockTorrent)
if all || !known {
return false
}
Expand All @@ -333,12 +333,13 @@ func (p *Peer) allowSendNotInterested(lockTorrent bool) bool {
p.t.mu.RLock()
defer p.t.mu.RUnlock()
}
return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
return roaring.AndNot(p.peerPieces(lock), &p.t._completedPieces).IsEmpty()
}

// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState, lock bool, lockTorrent bool) {
t := p.t

if lockTorrent {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -352,7 +353,7 @@ func (p *Peer) applyRequestState(next desiredRequestState, lock bool, lockTorren
current := &p.requestState
// Make interest sticky
if !next.Interested && current.Interested {
if !p.allowSendNotInterested(false) {
if !p.allowSendNotInterested(false, false) {
next.Interested = true
}
}
Expand Down
Loading

0 comments on commit 5de146b

Please sign in to comment.