Skip to content

Commit

Permalink
Optimize request strategy for torrent storage with unlimited capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Feb 12, 2024
1 parent 1416648 commit 2328bb4
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 64 deletions.
4 changes: 2 additions & 2 deletions piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {

func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
p.t.updatePieceRequestOrder(p.index)
p.t.updatePieceRequestOrderPiece(p.index)
p.readerCond.Broadcast()
}

func (p *Piece) pendChunkIndex(i RequestIndex) {
p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
p.t.updatePieceRequestOrder(p.index)
p.t.updatePieceRequestOrderPiece(p.index)
}

func (p *Piece) numChunks() chunkIndexType {
Expand Down
58 changes: 43 additions & 15 deletions request-strategy-impls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,66 @@ package torrent

import (
g "github.com/anacrolix/generics"

"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
)

type requestStrategyInput struct {
cl *Client
capFunc storage.TorrentCapacity
type requestStrategyInputCommon struct {
maxUnverifiedBytes int64
}

func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{g.MapMustGet(r.cl.torrents, ih)}
func (r requestStrategyInputCommon) MaxUnverifiedBytes() int64 {
return r.maxUnverifiedBytes
}

func (r requestStrategyInput) Capacity() (int64, bool) {
if r.capFunc == nil {
return 0, false
}
type requestStrategyInputMultiTorrent struct {
requestStrategyInputCommon
torrents map[metainfo.Hash]*Torrent
capFunc storage.TorrentCapacity
}

func (r requestStrategyInputMultiTorrent) Torrent(ih metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{g.MapMustGet(r.torrents, ih)}
}

func (r requestStrategyInputMultiTorrent) Capacity() (int64, bool) {
return (*r.capFunc)()
}

func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
return r.cl.config.MaxUnverifiedBytes
type requestStrategyInputSingleTorrent struct {
requestStrategyInputCommon
t *Torrent
}

var _ request_strategy.Input = requestStrategyInput{}
func (r requestStrategyInputSingleTorrent) Torrent(_ metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{r.t}
}

func (r requestStrategyInputSingleTorrent) Capacity() (cap int64, capped bool) {
return 0, false
}

var _ request_strategy.Input = requestStrategyInputSingleTorrent{}

func (cl *Client) getRequestStrategyInputCommon() requestStrategyInputCommon {
return requestStrategyInputCommon{cl.config.MaxUnverifiedBytes}
}

// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
return requestStrategyInput{
cl: cl,
capFunc: primaryTorrent.storage.Capacity,
if primaryTorrent.storage.Capacity == nil {
return requestStrategyInputSingleTorrent{
requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
t: primaryTorrent,
}
} else {
return requestStrategyInputMultiTorrent{
requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
torrents: cl.torrents,
capFunc: primaryTorrent.storage.Capacity,
}
}
}

Expand Down
103 changes: 103 additions & 0 deletions request-strategy-impls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"runtime"
"testing"

"github.com/anacrolix/missinggo/v2/iter"
"github.com/davecgh/go-spew/spew"
qt "github.com/frankban/quicktest"

"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
)

func makeRequestStrategyPiece(t request_strategy.Torrent) request_strategy.Piece {
Expand All @@ -27,3 +30,103 @@ func TestRequestStrategyPieceDoesntAlloc(t *testing.T) {
// We have to use p, or it gets optimized away.
spew.Fdump(io.Discard, p)
}

type storagePiece struct {
complete bool
}

func (s storagePiece) ReadAt(p []byte, off int64) (n int, err error) {
//TODO implement me
panic("implement me")
}

func (s storagePiece) WriteAt(p []byte, off int64) (n int, err error) {
//TODO implement me
panic("implement me")
}

func (s storagePiece) MarkComplete() error {
//TODO implement me
panic("implement me")
}

func (s storagePiece) MarkNotComplete() error {
//TODO implement me
panic("implement me")
}

func (s storagePiece) Completion() storage.Completion {
return storage.Completion{Ok: true, Complete: s.complete}
}

var _ storage.PieceImpl = storagePiece{}

type storageClient struct {
completed int
}

func (s *storageClient) OpenTorrent(
info *metainfo.Info,
infoHash metainfo.Hash,
) (storage.TorrentImpl, error) {
return storage.TorrentImpl{
Piece: func(p metainfo.Piece) storage.PieceImpl {
return storagePiece{complete: p.Index() < s.completed}
},
}, nil
}

func BenchmarkRequestStrategy(b *testing.B) {
c := qt.New(b)
cl := newTestingClient(b)
storageClient := storageClient{}
tor, new := cl.AddTorrentOpt(AddTorrentOpts{
Storage: &storageClient,
})
tor.disableTriggers = true
c.Assert(new, qt.IsTrue)
const pieceLength = 1 << 8 << 10
const numPieces = 30_000
err := tor.setInfo(&metainfo.Info{
Pieces: make([]byte, numPieces*metainfo.HashSize),
PieceLength: pieceLength,
Length: pieceLength * numPieces,
})
c.Assert(err, qt.IsNil)
tor.onSetInfo()
peer := cl.newConnection(nil, newConnectionOpts{
network: "test",
})
peer.setTorrent(tor)
c.Assert(tor.storage, qt.IsNotNil)
const chunkSize = defaultChunkSize
peer.onPeerHasAllPiecesNoTriggers()
for i := 0; i < tor.numPieces(); i++ {
tor.pieces[i].priority.Raise(PiecePriorityNormal)
tor.updatePiecePriorityNoTriggers(i)
}
peer.peerChoking = false
//b.StopTimer()
b.ResetTimer()
//b.ReportAllocs()
for _ = range iter.N(b.N) {
storageClient.completed = 0
for pieceIndex := range iter.N(numPieces) {
tor.updatePieceCompletion(pieceIndex)
}
for completed := 0; completed <= numPieces; completed += 1 {
storageClient.completed = completed
if completed > 0 {
tor.updatePieceCompletion(completed - 1)
}
// Starting and stopping timers around this part causes lots of GC overhead.
rs := peer.getDesiredRequestState()
tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes)
// End of part that should be timed.
remainingChunks := (numPieces - completed) * (pieceLength / chunkSize)
c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt(
remainingChunks,
int(cl.config.MaxUnverifiedBytes/chunkSize)))
}
}
}
2 changes: 1 addition & 1 deletion request-strategy/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func GetRequestablePieces(
return true
}
if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
return true
return false
}
allTorrentsUnverifiedBytes += pieceLength
f(ih, _i.key.Index, _i.state)
Expand Down
41 changes: 25 additions & 16 deletions request-strategy/piece-request-order.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package requestStrategy

import "github.com/anacrolix/torrent/metainfo"
import (
g "github.com/anacrolix/generics"
"github.com/anacrolix/torrent/metainfo"
)

type Btree interface {
Delete(pieceRequestOrderItem)
Expand All @@ -21,14 +24,14 @@ type PieceRequestOrder struct {
}

type PieceRequestOrderKey struct {
InfoHash metainfo.Hash
Index int
InfoHash metainfo.Hash
}

type PieceRequestOrderState struct {
Availability int
Priority piecePriority
Partial bool
Availability int
}

type pieceRequestOrderItem struct {
Expand All @@ -40,28 +43,29 @@ func (me *pieceRequestOrderItem) Less(otherConcrete *pieceRequestOrderItem) bool
return pieceOrderLess(me, otherConcrete).Less()
}

func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
if _, ok := me.keys[key]; ok {
panic(key)
// Returns the old state if the key was already present.
func (me *PieceRequestOrder) Add(
key PieceRequestOrderKey,
state PieceRequestOrderState,
) (old g.Option[PieceRequestOrderState]) {
if old.Value, old.Ok = me.keys[key]; old.Ok {
if state == old.Value {
return
}
me.tree.Delete(pieceRequestOrderItem{key, old.Value})
}
me.tree.Add(pieceRequestOrderItem{key, state})
me.keys[key] = state
return
}

func (me *PieceRequestOrder) Update(
key PieceRequestOrderKey,
state PieceRequestOrderState,
) {
oldState, ok := me.keys[key]
if !ok {
if !me.Add(key, state).Ok {
panic("key should have been added already")
}
if state == oldState {
return
}
me.tree.Delete(pieceRequestOrderItem{key, oldState})
me.tree.Add(pieceRequestOrderItem{key, state})
me.keys[key] = state
}

func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
Expand All @@ -71,9 +75,14 @@ func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceR
}
}

func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
me.tree.Delete(pieceRequestOrderItem{key, me.keys[key]})
func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) bool {
state, ok := me.keys[key]
if !ok {
return false
}
me.tree.Delete(pieceRequestOrderItem{key, state})
delete(me.keys, key)
return true
}

func (me *PieceRequestOrder) Len() int {
Expand Down
18 changes: 11 additions & 7 deletions request-strategy/piece-request-order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func benchmarkPieceRequestOrder[B Btree](
for range iter.N(b.N) {
pro := NewPieceOrder(newBtree(), numPieces)
state := PieceRequestOrderState{}
doPieces := func(m func(PieceRequestOrderKey)) {
doPieces := func(m func(PieceRequestOrderKey) bool) {
for i := range iter.N(numPieces) {
key := PieceRequestOrderKey{
Index: i,
Expand All @@ -28,34 +28,38 @@ func benchmarkPieceRequestOrder[B Btree](
m(key)
}
}
doPieces(func(key PieceRequestOrderKey) {
pro.Add(key, state)
doPieces(func(key PieceRequestOrderKey) bool {
return !pro.Add(key, state).Ok
})
state.Availability++
doPieces(func(key PieceRequestOrderKey) {
doPieces(func(key PieceRequestOrderKey) bool {
pro.Update(key, state)
return true
})
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
return true
})
doPieces(func(key PieceRequestOrderKey) {
doPieces(func(key PieceRequestOrderKey) bool {
state.Priority = piecePriority(key.Index / 4)
pro.Update(key, state)
return true
})
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
return item.key.Index < 1000
})
state.Priority = 0
state.Availability++
doPieces(func(key PieceRequestOrderKey) {
doPieces(func(key PieceRequestOrderKey) bool {
pro.Update(key, state)
return true
})
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
return item.key.Index < 1000
})
state.Availability--
doPieces(func(key PieceRequestOrderKey) {
doPieces(func(key PieceRequestOrderKey) bool {
pro.Update(key, state)
return true
})
doPieces(pro.Delete)
if pro.Len() != 0 {
Expand Down
Loading

0 comments on commit 2328bb4

Please sign in to comment.