From 2328bb420895f2173c6fd3dc84b35d83c13362ee Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 9 Feb 2024 00:15:18 +1100 Subject: [PATCH] Optimize request strategy for torrent storage with unlimited capacity --- piece.go | 4 +- request-strategy-impls.go | 58 ++++++++--- request-strategy-impls_test.go | 103 +++++++++++++++++++ request-strategy/order.go | 2 +- request-strategy/piece-request-order.go | 41 +++++--- request-strategy/piece-request-order_test.go | 18 ++-- requesting.go | 22 +++- torrent-piece-request-order.go | 33 ++++-- torrent.go | 21 ++-- undirtied-chunks-iter.go | 8 +- 10 files changed, 246 insertions(+), 64 deletions(-) diff --git a/piece.go b/piece.go index e08b260969..4fd2d309b9 100644 --- a/piece.go +++ b/piece.go @@ -82,13 +82,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType { func (p *Piece) unpendChunkIndex(i chunkIndexType) { p.t.dirtyChunks.Add(p.requestIndexOffset() + i) - p.t.updatePieceRequestOrder(p.index) + p.t.updatePieceRequestOrderPiece(p.index) p.readerCond.Broadcast() } func (p *Piece) pendChunkIndex(i RequestIndex) { p.t.dirtyChunks.Remove(p.requestIndexOffset() + i) - p.t.updatePieceRequestOrder(p.index) + p.t.updatePieceRequestOrderPiece(p.index) } func (p *Piece) numChunks() chunkIndexType { diff --git a/request-strategy-impls.go b/request-strategy-impls.go index ae5054507d..9d779ded01 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -2,38 +2,66 @@ package torrent import ( g "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/metainfo" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" ) -type requestStrategyInput struct { - cl *Client - capFunc storage.TorrentCapacity +type requestStrategyInputCommon struct { + maxUnverifiedBytes int64 } -func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent { - return requestStrategyTorrent{g.MapMustGet(r.cl.torrents, ih)} +func (r requestStrategyInputCommon) MaxUnverifiedBytes() int64 { + return r.maxUnverifiedBytes } -func (r requestStrategyInput) Capacity() (int64, bool) { - if r.capFunc == nil { - return 0, false - } +type requestStrategyInputMultiTorrent struct { + requestStrategyInputCommon + torrents map[metainfo.Hash]*Torrent + capFunc storage.TorrentCapacity +} + +func (r requestStrategyInputMultiTorrent) Torrent(ih metainfo.Hash) request_strategy.Torrent { + return requestStrategyTorrent{g.MapMustGet(r.torrents, ih)} +} + +func (r requestStrategyInputMultiTorrent) Capacity() (int64, bool) { return (*r.capFunc)() } -func (r requestStrategyInput) MaxUnverifiedBytes() int64 { - return r.cl.config.MaxUnverifiedBytes +type requestStrategyInputSingleTorrent struct { + requestStrategyInputCommon + t *Torrent } -var _ request_strategy.Input = requestStrategyInput{} +func (r requestStrategyInputSingleTorrent) Torrent(_ metainfo.Hash) request_strategy.Torrent { + return requestStrategyTorrent{r.t} +} + +func (r requestStrategyInputSingleTorrent) Capacity() (cap int64, capped bool) { + return 0, false +} + +var _ request_strategy.Input = requestStrategyInputSingleTorrent{} + +func (cl *Client) getRequestStrategyInputCommon() requestStrategyInputCommon { + return requestStrategyInputCommon{cl.config.MaxUnverifiedBytes} +} // Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent. func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) { - return requestStrategyInput{ - cl: cl, - capFunc: primaryTorrent.storage.Capacity, + if primaryTorrent.storage.Capacity == nil { + return requestStrategyInputSingleTorrent{ + requestStrategyInputCommon: cl.getRequestStrategyInputCommon(), + t: primaryTorrent, + } + } else { + return requestStrategyInputMultiTorrent{ + requestStrategyInputCommon: cl.getRequestStrategyInputCommon(), + torrents: cl.torrents, + capFunc: primaryTorrent.storage.Capacity, + } } } diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go index f0d3fa183f..ad15166259 100644 --- a/request-strategy-impls_test.go +++ b/request-strategy-impls_test.go @@ -5,10 +5,13 @@ import ( "runtime" "testing" + "github.com/anacrolix/missinggo/v2/iter" "github.com/davecgh/go-spew/spew" qt "github.com/frankban/quicktest" + "github.com/anacrolix/torrent/metainfo" request_strategy "github.com/anacrolix/torrent/request-strategy" + "github.com/anacrolix/torrent/storage" ) func makeRequestStrategyPiece(t request_strategy.Torrent) request_strategy.Piece { @@ -27,3 +30,103 @@ func TestRequestStrategyPieceDoesntAlloc(t *testing.T) { // We have to use p, or it gets optimized away. spew.Fdump(io.Discard, p) } + +type storagePiece struct { + complete bool +} + +func (s storagePiece) ReadAt(p []byte, off int64) (n int, err error) { + //TODO implement me + panic("implement me") +} + +func (s storagePiece) WriteAt(p []byte, off int64) (n int, err error) { + //TODO implement me + panic("implement me") +} + +func (s storagePiece) MarkComplete() error { + //TODO implement me + panic("implement me") +} + +func (s storagePiece) MarkNotComplete() error { + //TODO implement me + panic("implement me") +} + +func (s storagePiece) Completion() storage.Completion { + return storage.Completion{Ok: true, Complete: s.complete} +} + +var _ storage.PieceImpl = storagePiece{} + +type storageClient struct { + completed int +} + +func (s *storageClient) OpenTorrent( + info *metainfo.Info, + infoHash metainfo.Hash, +) (storage.TorrentImpl, error) { + return storage.TorrentImpl{ + Piece: func(p metainfo.Piece) storage.PieceImpl { + return storagePiece{complete: p.Index() < s.completed} + }, + }, nil +} + +func BenchmarkRequestStrategy(b *testing.B) { + c := qt.New(b) + cl := newTestingClient(b) + storageClient := storageClient{} + tor, new := cl.AddTorrentOpt(AddTorrentOpts{ + Storage: &storageClient, + }) + tor.disableTriggers = true + c.Assert(new, qt.IsTrue) + const pieceLength = 1 << 8 << 10 + const numPieces = 30_000 + err := tor.setInfo(&metainfo.Info{ + Pieces: make([]byte, numPieces*metainfo.HashSize), + PieceLength: pieceLength, + Length: pieceLength * numPieces, + }) + c.Assert(err, qt.IsNil) + tor.onSetInfo() + peer := cl.newConnection(nil, newConnectionOpts{ + network: "test", + }) + peer.setTorrent(tor) + c.Assert(tor.storage, qt.IsNotNil) + const chunkSize = defaultChunkSize + peer.onPeerHasAllPiecesNoTriggers() + for i := 0; i < tor.numPieces(); i++ { + tor.pieces[i].priority.Raise(PiecePriorityNormal) + tor.updatePiecePriorityNoTriggers(i) + } + peer.peerChoking = false + //b.StopTimer() + b.ResetTimer() + //b.ReportAllocs() + for _ = range iter.N(b.N) { + storageClient.completed = 0 + for pieceIndex := range iter.N(numPieces) { + tor.updatePieceCompletion(pieceIndex) + } + for completed := 0; completed <= numPieces; completed += 1 { + storageClient.completed = completed + if completed > 0 { + tor.updatePieceCompletion(completed - 1) + } + // Starting and stopping timers around this part causes lots of GC overhead. + rs := peer.getDesiredRequestState() + tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes) + // End of part that should be timed. + remainingChunks := (numPieces - completed) * (pieceLength / chunkSize) + c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt( + remainingChunks, + int(cl.config.MaxUnverifiedBytes/chunkSize))) + } + } +} diff --git a/request-strategy/order.go b/request-strategy/order.go index eb4d1b7f72..67adc8587b 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -68,7 +68,7 @@ func GetRequestablePieces( return true } if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() { - return true + return false } allTorrentsUnverifiedBytes += pieceLength f(ih, _i.key.Index, _i.state) diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go index 3056741db3..54b5a6ee99 100644 --- a/request-strategy/piece-request-order.go +++ b/request-strategy/piece-request-order.go @@ -1,6 +1,9 @@ package requestStrategy -import "github.com/anacrolix/torrent/metainfo" +import ( + g "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/metainfo" +) type Btree interface { Delete(pieceRequestOrderItem) @@ -21,14 +24,14 @@ type PieceRequestOrder struct { } type PieceRequestOrderKey struct { - InfoHash metainfo.Hash Index int + InfoHash metainfo.Hash } type PieceRequestOrderState struct { + Availability int Priority piecePriority Partial bool - Availability int } type pieceRequestOrderItem struct { @@ -40,28 +43,29 @@ func (me *pieceRequestOrderItem) Less(otherConcrete *pieceRequestOrderItem) bool return pieceOrderLess(me, otherConcrete).Less() } -func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) { - if _, ok := me.keys[key]; ok { - panic(key) +// Returns the old state if the key was already present. +func (me *PieceRequestOrder) Add( + key PieceRequestOrderKey, + state PieceRequestOrderState, +) (old g.Option[PieceRequestOrderState]) { + if old.Value, old.Ok = me.keys[key]; old.Ok { + if state == old.Value { + return + } + me.tree.Delete(pieceRequestOrderItem{key, old.Value}) } me.tree.Add(pieceRequestOrderItem{key, state}) me.keys[key] = state + return } func (me *PieceRequestOrder) Update( key PieceRequestOrderKey, state PieceRequestOrderState, ) { - oldState, ok := me.keys[key] - if !ok { + if !me.Add(key, state).Ok { panic("key should have been added already") } - if state == oldState { - return - } - me.tree.Delete(pieceRequestOrderItem{key, oldState}) - me.tree.Add(pieceRequestOrderItem{key, state}) - me.keys[key] = state } func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem { @@ -71,9 +75,14 @@ func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceR } } -func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) { - me.tree.Delete(pieceRequestOrderItem{key, me.keys[key]}) +func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) bool { + state, ok := me.keys[key] + if !ok { + return false + } + me.tree.Delete(pieceRequestOrderItem{key, state}) delete(me.keys, key) + return true } func (me *PieceRequestOrder) Len() int { diff --git a/request-strategy/piece-request-order_test.go b/request-strategy/piece-request-order_test.go index ee5fb39ae5..818b241407 100644 --- a/request-strategy/piece-request-order_test.go +++ b/request-strategy/piece-request-order_test.go @@ -19,7 +19,7 @@ func benchmarkPieceRequestOrder[B Btree]( for range iter.N(b.N) { pro := NewPieceOrder(newBtree(), numPieces) state := PieceRequestOrderState{} - doPieces := func(m func(PieceRequestOrderKey)) { + doPieces := func(m func(PieceRequestOrderKey) bool) { for i := range iter.N(numPieces) { key := PieceRequestOrderKey{ Index: i, @@ -28,34 +28,38 @@ func benchmarkPieceRequestOrder[B Btree]( m(key) } } - doPieces(func(key PieceRequestOrderKey) { - pro.Add(key, state) + doPieces(func(key PieceRequestOrderKey) bool { + return !pro.Add(key, state).Ok }) state.Availability++ - doPieces(func(key PieceRequestOrderKey) { + doPieces(func(key PieceRequestOrderKey) bool { pro.Update(key, state) + return true }) pro.tree.Scan(func(item pieceRequestOrderItem) bool { return true }) - doPieces(func(key PieceRequestOrderKey) { + doPieces(func(key PieceRequestOrderKey) bool { state.Priority = piecePriority(key.Index / 4) pro.Update(key, state) + return true }) pro.tree.Scan(func(item pieceRequestOrderItem) bool { return item.key.Index < 1000 }) state.Priority = 0 state.Availability++ - doPieces(func(key PieceRequestOrderKey) { + doPieces(func(key PieceRequestOrderKey) bool { pro.Update(key, state) + return true }) pro.tree.Scan(func(item pieceRequestOrderItem) bool { return item.key.Index < 1000 }) state.Availability-- - doPieces(func(key PieceRequestOrderKey) { + doPieces(func(key PieceRequestOrderKey) bool { pro.Update(key, state) + return true }) doPieces(pro.Delete) if pro.Len() != 0 { diff --git a/requesting.go b/requesting.go index fdb0ad7910..1440207d52 100644 --- a/requesting.go +++ b/requesting.go @@ -212,7 +212,8 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { return } } - if p.requestState.Cancelled.Contains(r) { + cancelled := &p.requestState.Cancelled + if !cancelled.IsEmpty() && cancelled.Contains(r) { // Can't re-request while awaiting acknowledgement. return } @@ -244,11 +245,19 @@ func (p *Peer) maybeUpdateActualRequestState() { func(_ context.Context) { next := p.getDesiredRequestState() p.applyRequestState(next) - p.t.requestIndexes = next.Requests.requestIndexes[:0] + p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes) }, ) } +func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) { + // The incoming slice can be smaller when getDesiredRequestState short circuits on some + // conditions. + if cap(slice) > cap(t.requestIndexes) { + t.requestIndexes = slice[:0] + } +} + // Whether we should allow sending not interested ("losing interest") to the peer. I noticed // qBitTorrent seems to punish us for sending not interested when we're streaming and don't // currently need anything. @@ -278,7 +287,11 @@ func (p *Peer) applyRequestState(next desiredRequestState) { return } more := true - requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue) + orig := next.Requests.requestIndexes + requestHeap := heap.InterfaceForSlice( + &next.Requests.requestIndexes, + next.Requests.lessByValue, + ) heap.Init(requestHeap) t := p.t @@ -292,6 +305,9 @@ func (p *Peer) applyRequestState(next desiredRequestState) { break } req := heap.Pop(requestHeap) + if cap(next.Requests.requestIndexes) != cap(orig) { + panic("changed") + } existing := t.requestingPeer(req) if existing != nil && existing != p { // Don't steal from the poor. diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go index 10623da0b5..03943ed04a 100644 --- a/torrent-piece-request-order.go +++ b/torrent-piece-request-order.go @@ -1,17 +1,28 @@ package torrent import ( + g "github.com/anacrolix/generics" request_strategy "github.com/anacrolix/torrent/request-strategy" ) -func (t *Torrent) updatePieceRequestOrder(pieceIndex int) { +func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) { if t.storage == nil { return } - if ro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]; ok { - ro.Update( - t.pieceRequestOrderKey(pieceIndex), - t.requestStrategyPieceOrderState(pieceIndex)) + pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()] + if !ok { + return + } + key := t.pieceRequestOrderKey(pieceIndex) + if t.hasStorageCap() { + pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex)) + return + } + pending := !t.ignorePieceForRequests(pieceIndex) + if pending { + pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex)) + } else { + pro.Delete(key) } } @@ -41,9 +52,7 @@ func (t *Torrent) initPieceRequestOrder() { if t.storage == nil { return } - if t.cl.pieceRequestOrder == nil { - t.cl.pieceRequestOrder = make(map[interface{}]*request_strategy.PieceRequestOrder) - } + g.MakeMapIfNil(&t.cl.pieceRequestOrder) key := t.clientPieceRequestOrderKey() cpro := t.cl.pieceRequestOrder if cpro[key] == nil { @@ -55,9 +64,11 @@ func (t *Torrent) addRequestOrderPiece(i int) { if t.storage == nil { return } - t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Add( - t.pieceRequestOrderKey(i), - t.requestStrategyPieceOrderState(i)) + pro := t.getPieceRequestOrder() + key := t.pieceRequestOrderKey(i) + if t.hasStorageCap() || !t.ignorePieceForRequests(i) { + pro.Add(key, t.requestStrategyPieceOrderState(i)) + } } func (t *Torrent) getPieceRequestOrder() *request_strategy.PieceRequestOrder { diff --git a/torrent.go b/torrent.go index 8ea2acde04..2643dd6a69 100644 --- a/torrent.go +++ b/torrent.go @@ -170,6 +170,8 @@ type Torrent struct { // Large allocations reused between request state updates. requestPieceStates []request_strategy.PieceRequestOrderState requestIndexes []RequestIndex + + disableTriggers bool } type outgoingConnAttemptKey = *PeerInfo @@ -200,7 +202,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) { panic(p.relativeAvailability) } p.relativeAvailability-- - t.updatePieceRequestOrder(i) + t.updatePieceRequestOrderPiece(i) } func (t *Torrent) incPieceAvailability(i pieceIndex) { @@ -208,7 +210,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) { if t.haveInfo() { p := t.piece(i) p.relativeAvailability++ - t.updatePieceRequestOrder(i) + t.updatePieceRequestOrderPiece(i) } } @@ -1099,7 +1101,7 @@ func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType } func (t *Torrent) wantPieceIndex(index pieceIndex) bool { - return t._pendingPieces.Contains(uint32(index)) + return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index)) } // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent @@ -1256,7 +1258,7 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange if !t.closed.IsSet() { // It would be possible to filter on pure-priority changes here to avoid churning the piece // request order. - t.updatePieceRequestOrder(piece) + t.updatePieceRequestOrderPiece(piece) } p := &t.pieces[piece] newPrio := p.uncachedPriority() @@ -1269,9 +1271,10 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange } func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { - if t.updatePiecePriorityNoTriggers(piece) { + if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers { t.onPiecePendingTriggers(piece, reason) } + t.updatePieceRequestOrderPiece(piece) } func (t *Torrent) updateAllPiecePriorities(reason string) { @@ -1415,13 +1418,17 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { } else { t._completedPieces.Remove(x) } - p.t.updatePieceRequestOrder(piece) + p.t.updatePieceRequestOrderPiece(piece) t.updateComplete() if complete && len(p.dirtiers) != 0 { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) } if changed { - t.logger.Levelf(log.Debug, "piece %d completion changed: %+v -> %+v", piece, cached, uncached) + //slog.Debug( + // "piece completion changed", + // slog.Int("piece", piece), + // slog.Any("from", cached), + // slog.Any("to", uncached)) t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion") } return changed diff --git a/undirtied-chunks-iter.go b/undirtied-chunks-iter.go index de0cce0a5e..8d480c2e9a 100644 --- a/undirtied-chunks-iter.go +++ b/undirtied-chunks-iter.go @@ -1,10 +1,14 @@ package torrent import ( - "github.com/anacrolix/torrent/typed-roaring" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" ) -func iterBitmapUnsetInRange[T typedRoaring.BitConstraint](it *typedRoaring.Iterator[T], start, end T, f func(T)) { +func iterBitmapUnsetInRange[T typedRoaring.BitConstraint]( + it *typedRoaring.Iterator[T], + start, end T, + f func(T), +) { it.AdvanceIfNeeded(start) lastDirty := start - 1 for it.HasNext() {