
Merge pull request #27 from erigontech/increase_webseed_parallelization
Increase webseed parallelization
mh0lt authored Aug 8, 2024
2 parents ee8ee63 + f458456 commit a1b13ba
Showing 6 changed files with 158 additions and 75 deletions.
34 changes: 26 additions & 8 deletions client.go
@@ -440,19 +440,35 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) {
func (cl *Client) Close() (errs []error) {
var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
cl.lock()

if cl.closed.IsSet() {
cl.unlock()
return
}

var mu sync.Mutex
for _, t := range cl.torrentsAsSlice() {
err := t.close(&closeGroup)
if err != nil {
errs = append(errs, err)
}
closeGroup.Add(1)

go func(t *Torrent) {
defer closeGroup.Done()

err := t.close()
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(t)
}
cl.closed.Set()
cl.unlock()
closeGroup.Wait()
// don't close resources until torrent closes are complete
for i := range cl.onClose {
cl.onClose[len(cl.onClose)-1-i]()
}
cl.closed.Set()
cl.unlock()
cl.event.Broadcast()
closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
return
}
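The reworked Close fans each torrent's close out to its own goroutine, guards the shared error slice with a mutex, and waits on the WaitGroup before running the onClose callbacks in reverse registration order. A minimal standalone sketch of that fan-out-and-collect pattern, using illustrative names rather than the package's own types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// closeAll runs every close function concurrently, collects errors behind
// a mutex, and blocks until all of them have finished.
func closeAll(closers []func() error) (errs []error) {
	var wg sync.WaitGroup
	var mu sync.Mutex
	for _, c := range closers {
		wg.Add(1)
		go func(c func() error) {
			defer wg.Done()
			if err := c(); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(c)
	}
	wg.Wait()
	return errs
}

func main() {
	errs := closeAll([]func() error{
		func() error { return nil },
		func() error { return errors.New("close failed") },
	})
	fmt.Println(errs) // one error collected, order not guaranteed
}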

@@ -1522,7 +1538,9 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err e
err = fmt.Errorf("no such torrent")
return
}
err = t.close(wg)
wg.Add(1)
defer wg.Done()
err = t.close()
delete(cl.torrents, infoHash)
return
}
34 changes: 19 additions & 15 deletions mmap_span/mmap_span.go
@@ -39,11 +39,18 @@ func (ms *MMapSpan) Append(mMap Mmap) {
func (ms *MMapSpan) Flush(onFlush func(size int64)) (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.flushTimer == nil {
if len(ms.mMaps) > 0 && ms.flushTimer == nil {
ms.flushTimer = time.AfterFunc(ms.FlushTime,
func() {
// TODO deal with logging errors
ms.flushMaps(onFlush, true)
ms.mu.Lock()
flushTimer := ms.flushTimer
ms.flushTimer = nil
ms.mu.Unlock()

if flushTimer != nil {
// TODO deal with logging errors
ms.flushMaps(onFlush, true)
}
})
}
return
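Flush now arms the timer only when there are mapped regions to flush, and the timer callback re-reads ms.flushTimer under the lock before doing any work, so a flush that already ran and cleared the field turns the callback into a no-op. A rough sketch of that re-check-under-lock guard, with placeholder names:

package main

import (
	"fmt"
	"sync"
	"time"
)

type flusher struct {
	mu    sync.Mutex
	timer *time.Timer
	dirty bool
}

// scheduleFlush arms a one-shot timer the first time dirty data appears.
// The callback clears the timer field under the lock and only flushes if
// it was still the active timer; an explicit flush elsewhere can cancel it
// simply by nilling the field.
func (f *flusher) scheduleFlush(after time.Duration) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.dirty && f.timer == nil {
		f.timer = time.AfterFunc(after, func() {
			f.mu.Lock()
			t := f.timer
			f.timer = nil
			f.mu.Unlock()
			if t != nil {
				fmt.Println("flushing dirty data")
			}
		})
	}
}

func main() {
	f := &flusher{dirty: true}
	f.scheduleFlush(10 * time.Millisecond)
	time.Sleep(50 * time.Millisecond) // let the timer fire
}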
@@ -63,21 +70,18 @@ func (ms *MMapSpan) flushMaps(onFlush func(size int64), lock bool) (errs []error
dirtyPieces = ms.dirtyPieces.Clone()
dirtySize = ms.dirtySize

if ms.flushTimer != nil {
ms.flushTimer = nil
for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)
for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)

}
}
}

if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
ms.dirtyPieces = roaring.Bitmap{}
ms.dirtySize = 0
}
if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
ms.dirtyPieces = roaring.Bitmap{}
ms.dirtySize = 0
}

return
8 changes: 7 additions & 1 deletion torrent-piece-request-order.go
@@ -33,10 +33,16 @@ func (t *Torrent) clientPieceRequestOrderKey() interface{} {
return t.storage.Capacity
}

func (t *Torrent) deletePieceRequestOrder() {
func (t *Torrent) deletePieceRequestOrder(lockClient bool) {
if t.storage == nil {
return
}

if lockClient {
t.cl.lock()
defer t.cl.unlock()
}

cpro := t.cl.pieceRequestOrder
key := t.clientPieceRequestOrderKey()
pro := cpro[key]
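deletePieceRequestOrder now takes a lockClient flag so the close path, which already holds the client lock, can call it without deadlocking, while other callers ask it to take the lock itself. A generic sketch of this optional-locking idiom, with types and names that are only illustrative:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items map[string]int
}

// remove deletes a key. Callers that already hold r.mu pass lock=false;
// everyone else passes lock=true and the method locks on their behalf.
func (r *registry) remove(key string, lock bool) {
	if lock {
		r.mu.Lock()
		defer r.mu.Unlock()
	}
	delete(r.items, key)
}

func main() {
	r := &registry{items: map[string]int{"a": 1, "b": 2}}
	r.remove("a", true)

	r.mu.Lock()
	r.remove("b", false) // caller already holds the lock
	r.mu.Unlock()

	fmt.Println(r.items)
}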
99 changes: 60 additions & 39 deletions torrent.go
@@ -1065,52 +1065,53 @@ func (t *Torrent) numPiecesCompleted(lock bool) (num pieceIndex) {
return pieceIndex(t._completedPieces.GetCardinality())
}

func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
func (t *Torrent) close() (err error) {
if !t.closed.Set() {
err = errors.New("already closed")
return
}

for _, f := range t.onClose {
f()
}

func() {
t.mu.Lock()
defer t.mu.Unlock()

t.iterPeers(func(p *Peer) {
p.close(false)
}, false)
}()

if t.storage != nil {
closed := make(chan struct{})
defer func() { closed <- struct{}{} }()

wg.Add(1)
go func() {
defer wg.Done()
<-closed
t.storageLock.Lock()
defer t.storageLock.Unlock()
if f := t.storage.Close; f != nil {
err1 := f()
if err1 != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
}
}
}()
t.deletePieceRequestOrder(true)
}

t.mu.Lock()
defer t.mu.Unlock()
func() {
t.mu.Lock()
defer t.mu.Unlock()
t.assertAllPiecesRelativeAvailabilityZero(false)
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent(false)
if t.hashResults != nil {
close(t.hashResults)
t.hashResults = nil
}
}()

t.iterPeers(func(p *Peer) {
p.close(false)
}, false)
if t.storage != nil {
t.deletePieceRequestOrder()
}
t.assertAllPiecesRelativeAvailabilityZero(false)
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent(false)
if t.hashResults != nil {
close(t.hashResults)
t.hashResults = nil
t.storageLock.Lock()
defer t.storageLock.Unlock()
if f := t.storage.Close; f != nil {
if err := f(); err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err)
}
}
}

return
}
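Torrent.close no longer needs the caller's WaitGroup: peer teardown and the event/PEX cleanup each run inside a short anonymous func so t.mu is held only for that block, and storage is closed synchronously under storageLock at the end. A compact sketch of that scoped-locking, synchronous-close shape, using placeholder fields:

package main

import (
	"fmt"
	"sync"
)

type resource struct {
	mu          sync.Mutex
	storageLock sync.Mutex
	peers       []string
}

// close holds mu only while detaching peers and resetting state, then
// takes storageLock to close backing storage synchronously; no goroutine
// or caller-supplied WaitGroup is required.
func (r *resource) close() error {
	func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.peers = nil // first critical section: drop peers
	}()

	func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		fmt.Println("second critical section: reset state, notify waiters")
	}()

	r.storageLock.Lock()
	defer r.storageLock.Unlock()
	fmt.Println("closing storage")
	return nil
}

func main() {
	r := &resource{peers: []string{"p1", "p2"}}
	_ = r.close()
}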

@@ -2643,7 +2644,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
}

err := p.Storage().MarkComplete(!hasDirtyChunks)

if err == nil {
t.allStats(func(cs *ConnStats) {
cs.pieceCompleted(int64(p.length(true)))
@@ -2773,7 +2774,7 @@ func (t *Torrent) tryCreateMorePieceHashers(lock bool) {
}
if t.hashResults == nil {
t.hashResults = make(chan hashResult, t.cl.config.PieceHashersPerTorrent*16)
go t.processHashResults()
go t.processHashResults(t.hashResults)
}
}()
}
@@ -2856,7 +2857,7 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
cs.pieceHashed(length)
})

t.hashResults <- hashResult{p.index, correct, failedPeers, copyErr}
t.mu.RLock()
hashResults := t.hashResults
t.mu.RUnlock()

select {
case hashResults <- hashResult{p.index, correct, failedPeers, copyErr}:
case <-t.closed.Done():
}
}
}()
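Rather than sending the hash result unconditionally, which could block forever during teardown, the hasher now reads the channel under the read lock and selects between the send and the torrent's closed signal. A minimal sketch of that cancellable-send pattern, with a plain done channel standing in for t.closed.Done():

package main

import "fmt"

// trySend delivers v on results unless done has been closed, so a producer
// never blocks on a consumer that has gone away.
func trySend(results chan<- int, done <-chan struct{}, v int) bool {
	select {
	case results <- v:
		return true
	case <-done:
		return false
	}
}

func main() {
	results := make(chan int, 1)
	done := make(chan struct{})

	fmt.Println(trySend(results, done, 42)) // true: buffered send succeeds

	close(done)
	fmt.Println(trySend(results, done, 43)) // false: buffer full, done closed
}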

@@ -2870,7 +2878,7 @@ type hashResult struct {
copyErr error
}

func (t *Torrent) processHashResults() {
func (t *Torrent) processHashResults(hashResults chan hashResult) {

g, ctx := errgroup.WithContext(context.Background())
_, cancel := context.WithCancel(ctx)
@@ -2883,12 +2891,25 @@ func (t *Torrent) processHashResults() {
defer cancel()

for !t.closed.IsSet() {
results := []hashResult{<-t.hashResults}
results := []hashResult{}

select {
case result, ok := <-hashResults:
if ok {
results = append(results, result)
}
case <-t.closed.Done():
return
}

for done := false; !done; {
select {
case result := <-t.hashResults:
results = append(results, result)
case result, ok := <-hashResults:
if ok {
results = append(results, result)
}
case <-t.closed.Done():
return
default:
done = true
}
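processHashResults now receives on the channel it was handed, returns as soon as the torrent's closed signal fires, and then drains whatever further results are immediately available so they can be handled as one batch. A standalone sketch of that blocking-then-greedy drain loop, with placeholder names:

package main

import "fmt"

// drainBatch blocks for one value, then greedily collects anything else
// already queued. It returns nil if done closes first, and whatever it has
// gathered so far if the input channel is closed.
func drainBatch(in <-chan int, done <-chan struct{}) []int {
	var batch []int
	select {
	case v, ok := <-in:
		if !ok {
			return batch
		}
		batch = append(batch, v)
	case <-done:
		return nil
	}
	for {
		select {
		case v, ok := <-in:
			if !ok {
				return batch
			}
			batch = append(batch, v)
		case <-done:
			return nil
		default:
			return batch // nothing more queued right now
		}
	}
}

func main() {
	in := make(chan int, 4)
	done := make(chan struct{})
	in <- 1
	in <- 2
	in <- 3
	fmt.Println(drainBatch(in, done)) // [1 2 3]
}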
4 changes: 1 addition & 3 deletions torrent_test.go
@@ -6,7 +6,6 @@ import (
"net"
"os"
"path/filepath"
"sync"
"testing"

g "github.com/anacrolix/generics"
@@ -247,7 +246,6 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
tt.onSetInfo(true, true)
err = pc.peerSentHaveNone(true)
c.Assert(err, qt.IsNil)
var wg sync.WaitGroup
tt.close(&wg)
tt.close()
tt.assertAllPiecesRelativeAvailabilityZero(true)
}
54 changes: 45 additions & 9 deletions tracker_scraper.go
@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/url"
"sync"
@@ -72,18 +73,52 @@ type trackerAnnounceResult struct {
Completed time.Time
}

type ilu struct {
err error
ips []net.IP
lookupTime time.Time
}

var ipc = map[string]*ilu{}
var ipcmu sync.RWMutex

func (me *trackerScraper) getIp() (ip net.IP, err error) {
var ips []net.IP
if me.lookupTrackerIp != nil {
ips, err = me.lookupTrackerIp(&me.u)
} else {
// Do a regular dns lookup
ips, err = net.LookupIP(me.u.Hostname())
// Cache the IP lookup; this avoids spamming DNS on OSes that
// don't cache DNS lookups. Successful lookups are kept for a
// jittered TTL of 1-6 hours; failed lookups are retried after
// 15 minutes.

ipcmu.RLock()
lu := ipc[me.u.String()]
ipcmu.RUnlock()

if lu == nil ||
time.Since(lu.lookupTime) > time.Hour+time.Duration(rand.Int63n(int64(5*time.Hour))) ||
lu.err != nil && time.Since(lu.lookupTime) > 15*time.Minute {
var ips []net.IP

if me.lookupTrackerIp != nil {
ips, err = me.lookupTrackerIp(&me.u)
} else {
// Do a regular dns lookup
ips, err = net.LookupIP(me.u.Hostname())
}

ipcmu.Lock()
lu = &ilu{
err: err,
ips: ips,
lookupTime: time.Now(),
}
ipc[me.u.String()] = lu
ipcmu.Unlock()

}
if err != nil {

if lu.err != nil {
return
}
if len(ips) == 0 {

if len(lu.ips) == 0 {
err = errors.New("no ips")
return
}
@@ -93,7 +128,7 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) {
err = errors.New("client is closed")
return
}
for _, ip = range ips {
for _, ip = range lu.ips {
if me.t.cl.ipIsBlocked(ip) {
continue
}
Expand All @@ -109,6 +144,7 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) {
}
return
}

err = errors.New("no acceptable ips")
return
}
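getIp now keeps a process-wide map of lookup results guarded by an RWMutex, refreshing an entry only when it is older than a jittered TTL of one to six hours, or fifteen minutes after a failed lookup. A small self-contained sketch of that cache shape, with illustrative names and a fake resolver in place of net.LookupIP:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type lookupEntry struct {
	ips  []string
	err  error
	when time.Time
}

var (
	cacheMu sync.RWMutex
	cache   = map[string]*lookupEntry{}
)

// cachedLookup returns a cached result unless it has expired: successful
// lookups live for 1-6 hours (jittered to spread refreshes), failures are
// retried after 15 minutes.
func cachedLookup(host string, resolve func(string) ([]string, error)) ([]string, error) {
	cacheMu.RLock()
	e := cache[host]
	cacheMu.RUnlock()

	ttl := time.Hour + time.Duration(rand.Int63n(int64(5*time.Hour)))
	if e == nil || time.Since(e.when) > ttl ||
		(e.err != nil && time.Since(e.when) > 15*time.Minute) {
		ips, err := resolve(host)
		e = &lookupEntry{ips: ips, err: err, when: time.Now()}
		cacheMu.Lock()
		cache[host] = e
		cacheMu.Unlock()
	}
	return e.ips, e.err
}

func main() {
	resolve := func(string) ([]string, error) { return []string{"203.0.113.1"}, nil }
	ips, err := cachedLookup("tracker.example.org", resolve)
	fmt.Println(ips, err)
}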
