Skip to content

Commit

Permalink
added status locking flags
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Jun 10, 2024
1 parent ae3bf3c commit dfeedeb
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 52 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
addr: addr.String(),
}
defer dialPool.startDrainer()
dialTimeout := opts.t.getDialTimeoutUnlocked()
dialTimeout := opts.t.dialTimeout(true)
{
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
Expand Down
57 changes: 50 additions & 7 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"sync"
"time"

"github.com/anacrolix/torrent/segments"
)
Expand All @@ -15,9 +16,12 @@ type Mmap interface {
}

type MMapSpan struct {
mu sync.RWMutex
mMaps []Mmap
segmentLocater segments.Index
mu sync.RWMutex
mMaps []Mmap
segmentLocater segments.Index
flushTimer *time.Timer
FlushTime time.Duration
FlushedCallback func()
}

func (ms *MMapSpan) Append(mMap Mmap) {
Expand All @@ -27,18 +31,57 @@ func (ms *MMapSpan) Append(mMap Mmap) {
func (ms *MMapSpan) Flush() (errs []error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)
if ms.flushTimer != nil {
ms.flushTimer = time.AfterFunc(ms.FlushTime,
func() {
// TODO deal with logging errors
ms.flushMaps()
})
}
return
}

func (ms *MMapSpan) flushMaps() (errs []error) {
var flushedCallback func()

errs = func() (errs []error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if ms.flushTimer != nil {
ms.flushTimer = nil
/*for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)
}
}*/

if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
}
}

return
}()

if flushedCallback != nil {
flushedCallback()
}

return
}

func (ms *MMapSpan) Close() (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()

if ms.flushTimer != nil {
ms.flushTimer.Stop()
errs = ms.flushMaps()
ms.flushTimer = nil
}

for _, mMap := range ms.mMaps {
err := mMap.Unmap()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion peerconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestHaveAllThenBitfield(t *testing.T) {
}, true), qt.IsNil)
pc.t.onSetInfo(true, true)
c.Check(tt.numPieces(), qt.Equals, 7)
c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
c.Check(tt.pieceAvailabilityRuns(true), qt.DeepEquals, []pieceAvailabilityRun{
// The last element of the bitfield is irrelevant, as the Torrent actually only has 7
// pieces.
{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
Expand Down
15 changes: 12 additions & 3 deletions storage/mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"os"
"path/filepath"
"time"

"github.com/anacrolix/missinggo/v2"
"github.com/edsrzf/mmap-go"
Expand All @@ -35,7 +36,11 @@ func NewMMapWithCompletion(baseDir string, completion PieceCompletion) *mmapClie
}

func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
span, err := mMapTorrent(info, s.baseDir)
span, err := mMapTorrent(info, s.baseDir, 1500*time.Millisecond, func() {
if commiter, ok := s.pc.(interface{ Commit() }); ok {
commiter.Commit()
}
})
t := &mmapTorrentStorage{
infoHash: infoHash,
span: span,
Expand Down Expand Up @@ -110,8 +115,12 @@ func (sp mmapStoragePiece) MarkNotComplete() error {
return nil
}

func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) {
mms = &mmap_span.MMapSpan{}
func mMapTorrent(md *metainfo.Info, location string, flushTime time.Duration, flushedCallback func()) (mms *mmap_span.MMapSpan, err error) {
mms = &mmap_span.MMapSpan{
FlushTime: flushTime,
FlushedCallback: flushedCallback,
}

defer func() {
if err != nil {
mms.Close()
Expand Down
2 changes: 1 addition & 1 deletion t.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (t *Torrent) Length() int64 {
// Returns a run-time generated metainfo for the torrent that includes the
// info bytes and announce-list as currently known to the client.
func (t *Torrent) Metainfo() metainfo.MetaInfo {
return t.newMetaInfo()
return t.newMetaInfo(true)
}

func (t *Torrent) addReader(r *reader) {
Expand Down
81 changes: 43 additions & 38 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,11 @@ func (me pieceAvailabilityRun) String() string {
return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
}

func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) pieceAvailabilityRuns(lock bool) (ret []pieceAvailabilityRun) {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
Expand All @@ -817,9 +819,11 @@ func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
return
}

func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) pieceAvailabilityFrequencies(lock bool) (freqs []int) {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

freqs = make([]int, t.numActivePeers()+1)
for i := range t.pieces {
Expand Down Expand Up @@ -924,15 +928,15 @@ func (t *Torrent) writeStatus(w io.Writer) {
// availability frequencies instead.
if false {
fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
for _, run := range t.pieceAvailabilityRuns() {
for _, run := range t.pieceAvailabilityRuns(false) {
ret = append(ret, run.String())
}
return
}(), " "))
}
fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
func() (ret []string) {
for avail, freq := range t.pieceAvailabilityFrequencies() {
for avail, freq := range t.pieceAvailabilityFrequencies(false) {
if freq == 0 {
continue
}
Expand Down Expand Up @@ -1020,9 +1024,11 @@ func (t *Torrent) haveInfo(lock bool) bool {

// Returns a run-time generated MetaInfo that includes the info bytes and
// announce-list as currently known to the client.
func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) newMetaInfo(lock bool) metainfo.MetaInfo {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

return metainfo.MetaInfo{
CreationDate: time.Now().Unix(),
Expand Down Expand Up @@ -1141,17 +1147,19 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
if t.storage != nil {
t.deletePieceRequestOrder()
}
t.assertAllPiecesRelativeAvailabilityZero()
t.assertAllPiecesRelativeAvailabilityZero(true)
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent(true)
return
}

func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) assertAllPiecesRelativeAvailabilityZero(lock bool) {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

for i := range t.pieces {
p := &t.pieces[i]
Expand Down Expand Up @@ -2266,7 +2274,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
// We're also announcing ourselves as a listener, so we don't just want peer addresses.
// TODO: We can include the announce_peer step depending on whether we can receive
// inbound connections. We should probably only announce once every 15 mins too.
if !t.wantAnyConns() {
if !t.wantAnyConns(true) {
goto wait
}
// TODO: Determine if there's a listener on the port we're announcing.
Expand Down Expand Up @@ -2454,9 +2462,11 @@ func (t *Torrent) newConnsAllowed(lock bool) bool {
return true
}

func (t *Torrent) wantAnyConns() bool {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) wantAnyConns(lock bool) bool {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

if !t.networkingEnabled.Bool() {
return false
Expand Down Expand Up @@ -2589,13 +2599,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
p.mu.RUnlock()
t.clearPieceTouchers(piece, true)

/* Temp for testing
hasDirty := p.hasDirtyChunks()
if hasDirty {
if p.hasDirtyChunks(true) {
p.Flush() // You can be synchronous here!
}
*/

err := p.Storage().MarkComplete()
if err != nil {
t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
Expand Down Expand Up @@ -2750,7 +2757,7 @@ func (t *Torrent) tryCreatePieceHasher() bool {
}()

for !t.closed.IsSet() {
pi, ok := t.getPieceToHash()
pi, ok := t.getPieceToHash(true)
if !ok {
break
}
Expand Down Expand Up @@ -2828,9 +2835,11 @@ func (t *Torrent) processHashResults() {
}
}

func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) getPieceToHash(lock bool) (ret pieceIndex, ok bool) {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
if t.piece(i, false).hashing {
Expand Down Expand Up @@ -3027,9 +3036,12 @@ func (t *Torrent) numQueuedForHash(lock bool) uint64 {
return t.piecesQueuedForHash.Len()
}

func (t *Torrent) dialTimeout() time.Duration {
t.mu.RLock()
defer t.mu.RUnlock()
func (t *Torrent) dialTimeout(lock bool) time.Duration {
if lock {
t.mu.RLock()
defer t.mu.RUnlock()
}

return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
}

Expand Down Expand Up @@ -3561,10 +3573,3 @@ func (t *Torrent) numHalfOpenAttempts(lock bool) (num int) {
}
return
}

func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
cl := t.cl
cl.rLock()
defer cl.rUnlock()
return t.dialTimeout()
}
2 changes: 1 addition & 1 deletion torrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,5 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
c.Assert(err, qt.IsNil)
var wg sync.WaitGroup
tt.close(&wg)
tt.assertAllPiecesRelativeAvailabilityZero()
tt.assertAllPiecesRelativeAvailabilityZero(true)
}

0 comments on commit dfeedeb

Please sign in to comment.