diff --git a/client.go b/client.go
index 83828bb070..e9f3e909ff 100644
--- a/client.go
+++ b/client.go
@@ -440,19 +440,35 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) {
 func (cl *Client) Close() (errs []error) {
     var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
     cl.lock()
+
+    if cl.closed.IsSet() {
+        cl.unlock()
+        return
+    }
+
+    var mu sync.Mutex
     for _, t := range cl.torrentsAsSlice() {
-        err := t.close(&closeGroup)
-        if err != nil {
-            errs = append(errs, err)
-        }
+        closeGroup.Add(1)
+
+        go func(t *Torrent) {
+            defer closeGroup.Done()
+
+            err := t.close()
+            if err != nil {
+                mu.Lock()
+                errs = append(errs, err)
+                mu.Unlock()
+            }
+        }(t)
     }
+    cl.closed.Set()
+    cl.unlock()
+    closeGroup.Wait()
+
+    // don't close resources until torrent closes are complete
     for i := range cl.onClose {
         cl.onClose[len(cl.onClose)-1-i]()
     }
-    cl.closed.Set()
-    cl.unlock()
     cl.event.Broadcast()
-    closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
     return
 }
@@ -1522,7 +1538,9 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err e
         err = fmt.Errorf("no such torrent")
         return
     }
-    err = t.close(wg)
+    wg.Add(1)
+    defer wg.Done()
+    err = t.close()
     delete(cl.torrents, infoHash)
     return
 }
diff --git a/mmap_span/mmap_span.go b/mmap_span/mmap_span.go
index a8fb518dfa..e5b28418fc 100644
--- a/mmap_span/mmap_span.go
+++ b/mmap_span/mmap_span.go
@@ -39,11 +39,18 @@ func (ms *MMapSpan) Append(mMap Mmap) {
 func (ms *MMapSpan) Flush(onFlush func(size int64)) (errs []error) {
     ms.mu.Lock()
     defer ms.mu.Unlock()
-    if ms.flushTimer == nil {
+    if len(ms.mMaps) > 0 && ms.flushTimer == nil {
         ms.flushTimer = time.AfterFunc(ms.FlushTime, func() {
-            // TODO deal with logging errors
-            ms.flushMaps(onFlush, true)
+            ms.mu.Lock()
+            flushTimer := ms.flushTimer
+            ms.flushTimer = nil
+            ms.mu.Unlock()
+
+            if flushTimer != nil {
+                // TODO deal with logging errors
+                ms.flushMaps(onFlush, true)
+            }
         })
     }
     return
 }
@@ -63,21 +70,18 @@ func (ms *MMapSpan) flushMaps(onFlush func(size int64), lock bool) (errs []error
     dirtyPieces = ms.dirtyPieces.Clone()
     dirtySize = ms.dirtySize
 
-    if ms.flushTimer != nil {
-        ms.flushTimer = nil
-        for _, mMap := range ms.mMaps {
-            err := mMap.Flush()
-            if err != nil {
-                errs = append(errs, err)
+    for _, mMap := range ms.mMaps {
+        err := mMap.Flush()
+        if err != nil {
+            errs = append(errs, err)
 
-            }
         }
+    }
 
-        if len(errs) == 0 {
-            flushedCallback = ms.FlushedCallback
-            ms.dirtyPieces = roaring.Bitmap{}
-            ms.dirtySize = 0
-        }
+    if len(errs) == 0 {
+        flushedCallback = ms.FlushedCallback
+        ms.dirtyPieces = roaring.Bitmap{}
+        ms.dirtySize = 0
     }
 
     return
diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go
index 8d5aea011d..79ed2cbce4 100644
--- a/torrent-piece-request-order.go
+++ b/torrent-piece-request-order.go
@@ -33,10 +33,16 @@ func (t *Torrent) clientPieceRequestOrderKey() interface{} {
     return t.storage.Capacity
 }
 
-func (t *Torrent) deletePieceRequestOrder() {
+func (t *Torrent) deletePieceRequestOrder(lockClient bool) {
     if t.storage == nil {
         return
     }
+
+    if lockClient {
+        t.cl.lock()
+        defer t.cl.unlock()
+    }
+
     cpro := t.cl.pieceRequestOrder
     key := t.clientPieceRequestOrderKey()
     pro := cpro[key]
diff --git a/torrent.go b/torrent.go
index 995219f194..5516ae28ea 100644
--- a/torrent.go
+++ b/torrent.go
@@ -1065,52 +1065,53 @@ func (t *Torrent) numPiecesCompleted(lock bool) (num pieceIndex) {
     return pieceIndex(t._completedPieces.GetCardinality())
 }
 
-func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
+func (t *Torrent) close() (err error) {
     if !t.closed.Set() {
         err = errors.New("already closed")
         return
     }
+
     for _, f := range t.onClose {
         f()
     }
+    func() {
+        t.mu.Lock()
+        defer t.mu.Unlock()
+
+        t.iterPeers(func(p *Peer) {
+            p.close(false)
+        }, false)
+    }()
+
     if t.storage != nil {
-        closed := make(chan struct{})
-        defer func() { closed <- struct{}{} }()
-
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            <-closed
-            t.storageLock.Lock()
-            defer t.storageLock.Unlock()
-            if f := t.storage.Close; f != nil {
-                err1 := f()
-                if err1 != nil {
-                    t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
-                }
-            }
-        }()
+        t.deletePieceRequestOrder(true)
     }
-    t.mu.Lock()
-    defer t.mu.Unlock()
+    func() {
+        t.mu.Lock()
+        defer t.mu.Unlock()
+        t.assertAllPiecesRelativeAvailabilityZero(false)
+        t.pex.Reset()
+        t.cl.event.Broadcast()
+        t.pieceStateChanges.Close()
+        t.updateWantPeersEvent(false)
+        if t.hashResults != nil {
+            close(t.hashResults)
+            t.hashResults = nil
+        }
+    }()
 
-    t.iterPeers(func(p *Peer) {
-        p.close(false)
-    }, false)
     if t.storage != nil {
-        t.deletePieceRequestOrder()
-    }
-    t.assertAllPiecesRelativeAvailabilityZero(false)
-    t.pex.Reset()
-    t.cl.event.Broadcast()
-    t.pieceStateChanges.Close()
-    t.updateWantPeersEvent(false)
-    if t.hashResults != nil {
-        close(t.hashResults)
-        t.hashResults = nil
+        t.storageLock.Lock()
+        defer t.storageLock.Unlock()
+        if f := t.storage.Close; f != nil {
+            if err := f(); err != nil {
+                t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err)
+            }
+        }
     }
+
     return
 }
@@ -2643,7 +2644,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
     }
 
     err := p.Storage().MarkComplete(!hasDirtyChunks)
-
+
     if err == nil {
         t.allStats(func(cs *ConnStats) {
             cs.pieceCompleted(int64(p.length(true)))
@@ -2773,7 +2774,7 @@ func (t *Torrent) tryCreateMorePieceHashers(lock bool) {
         }
         if t.hashResults == nil {
             t.hashResults = make(chan hashResult, t.cl.config.PieceHashersPerTorrent*16)
-            go t.processHashResults()
+            go t.processHashResults(t.hashResults)
         }
     }()
 }
@@ -2856,7 +2857,14 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
                 cs.pieceHashed(length)
             })
 
-            t.hashResults <- hashResult{p.index, correct, failedPeers, copyErr}
+            t.mu.RLock()
+            hashResults := t.hashResults
+            t.mu.RUnlock()
+
+            select {
+            case hashResults <- hashResult{p.index, correct, failedPeers, copyErr}:
+            case <-t.closed.Done():
+            }
         }
     }()
@@ -2870,7 +2878,7 @@ type hashResult struct {
     copyErr error
 }
 
-func (t *Torrent) processHashResults() {
+func (t *Torrent) processHashResults(hashResults chan hashResult) {
     g, ctx := errgroup.WithContext(context.Background())
     _, cancel := context.WithCancel(ctx)
 
@@ -2883,12 +2891,25 @@
     defer cancel()
 
     for !t.closed.IsSet() {
-        results := []hashResult{<-t.hashResults}
+        results := []hashResult{}
+
+        select {
+        case result, ok := <-hashResults:
+            if ok {
+                results = append(results, result)
+            }
+        case <-t.closed.Done():
+            return
+        }
 
         for done := false; !done; {
             select {
-            case result := <-t.hashResults:
-                results = append(results, result)
+            case result, ok := <-hashResults:
+                if ok {
+                    results = append(results, result)
+                }
+            case <-t.closed.Done():
+                return
             default:
                 done = true
             }
diff --git a/torrent_test.go b/torrent_test.go
index 561f793d46..6c04951697 100644
--- a/torrent_test.go
+++ b/torrent_test.go
@@ -6,7 +6,6 @@ import (
     "net"
     "os"
     "path/filepath"
-    "sync"
     "testing"
 
     g "github.com/anacrolix/generics"
@@ -247,7 +246,6 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
     tt.onSetInfo(true, true)
     err = pc.peerSentHaveNone(true)
     c.Assert(err, qt.IsNil)
-    var wg sync.WaitGroup
-    tt.close(&wg)
+    tt.close()
     tt.assertAllPiecesRelativeAvailabilityZero(true)
 }
diff --git a/tracker_scraper.go b/tracker_scraper.go
index f3f4f54d24..4d32a7cbac 100644
--- a/tracker_scraper.go
+++ b/tracker_scraper.go
@@ -5,6 +5,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "math/rand"
     "net"
     "net/url"
     "sync"
@@ -72,18 +73,52 @@ type trackerAnnounceResult struct {
     Completed time.Time
 }
 
+type ilu struct {
+    err        error
+    ips        []net.IP
+    lookupTime time.Time
+}
+
+var ipc = map[string]*ilu{}
+var ipcmu sync.RWMutex
+
 func (me *trackerScraper) getIp() (ip net.IP, err error) {
-    var ips []net.IP
-    if me.lookupTrackerIp != nil {
-        ips, err = me.lookupTrackerIp(&me.u)
-    } else {
-        // Do a regular dns lookup
-        ips, err = net.LookupIP(me.u.Hostname())
+    // Cache the IP lookup so we don't spam DNS on OSes that don't cache
+    // lookups themselves: successful lookups are reused for a random TTL
+    // between 1 and 6 hours, failed lookups are retried after 15 minutes.
+
+    ipcmu.RLock()
+    lu := ipc[me.u.String()]
+    ipcmu.RUnlock()
+
+    if lu == nil ||
+        time.Since(lu.lookupTime) > time.Hour+time.Duration(rand.Int63n(int64(5*time.Hour))) ||
+        lu.err != nil && time.Since(lu.lookupTime) > 15*time.Minute {
+        var ips []net.IP
+
+        if me.lookupTrackerIp != nil {
+            ips, err = me.lookupTrackerIp(&me.u)
+        } else {
+            // Do a regular dns lookup
+            ips, err = net.LookupIP(me.u.Hostname())
+        }
+
+        ipcmu.Lock()
+        lu = &ilu{
+            err:        err,
+            ips:        ips,
+            lookupTime: time.Now(),
+        }
+        ipc[me.u.String()] = lu
+        ipcmu.Unlock()
+
     }
-    if err != nil {
+
+    if lu.err != nil {
         return
     }
-    if len(ips) == 0 {
+
+    if len(lu.ips) == 0 {
         err = errors.New("no ips")
         return
     }
@@ -93,7 +128,7 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) {
         err = errors.New("client is closed")
         return
     }
-    for _, ip = range ips {
+    for _, ip = range lu.ips {
         if me.t.cl.ipIsBlocked(ip) {
             continue
         }
@@ -109,6 +144,7 @@
         }
         return
     }
+
     err = errors.New("no acceptable ips")
     return
 }