Skip to content

Commit

Permalink
Merge branch 'no-ipv6' into possum
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Feb 12, 2024
2 parents fbb3115 + 71b12fc commit 98f035c
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 86 deletions.
23 changes: 23 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# On macOS, docker does not support IPv6.

FROM alpine

RUN apk add go fuse

WORKDIR /src

COPY . .

# RUN go env

ARG GOCACHE=/root/.cache/go-build
ARG GOMODCACHE=/root/go/pkg/mod

RUN --mount=type=cache,target=$GOCACHE \
--mount=type=cache,target=$GOMODCACHE \
go test -failfast ./...

RUN --mount=type=cache,target=$GOCACHE \
--mount=type=cache,target=$GOMODCACHE \
./fs/test.sh

3 changes: 2 additions & 1 deletion client-nowasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
)

func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
c := qt.New(t)
cfg := TestingConfig(t)
pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
require.NoError(t, err)
ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
defer ci.Close()
cfg.DefaultStorage = ci
cl, err := NewClient(cfg)
require.NoError(t, err)
c.Assert(err, qt.IsNil, qt.Commentf("%#v", err))
cl.Close()
// And again, https://github.com/anacrolix/torrent/issues/158
cl, err = NewClient(cfg)
Expand Down
13 changes: 12 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,21 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
}
}

sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback, cl.logger)
builtinListenNetworks := cl.listenNetworks()
sockets, err := listenAll(
builtinListenNetworks,
cl.config.ListenHost,
cl.config.ListenPort,
cl.firewallCallback,
cl.logger,
)
if err != nil {
return
}
if len(sockets) == 0 && len(builtinListenNetworks) != 0 {
err = fmt.Errorf("no sockets created for networks %v", builtinListenNetworks)
return
}

// Check for panics.
cl.LocalPort()
Expand Down
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
}

func TestDhtInheritBlocklist(t *testing.T) {
c := qt.New(t)
ipl := iplist.New(nil)
require.NotNil(t, ipl)
cfg := TestingConfig(t)
Expand All @@ -353,7 +354,7 @@ func TestDhtInheritBlocklist(t *testing.T) {
assert.Equal(t, ipl, s.(AnacrolixDhtServerWrapper).Server.IPBlocklist())
numServers++
})
assert.EqualValues(t, 2, numServers)
c.Assert(numServers, qt.Not(qt.Equals), 0)
}

// Check that stuff is merged in subsequent AddTorrentSpec for the same
Expand Down
12 changes: 9 additions & 3 deletions network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ func testListenerNetwork(
expectedNet, givenNet, addr string, validIp4 bool,
) {
l, err := listenFunc(givenNet, addr)
if isUnsupportedNetworkError(err) {
return
}
require.NoError(t, err)
defer l.Close()
assert.EqualValues(t, expectedNet, l.Addr().Network())
Expand Down Expand Up @@ -49,10 +52,13 @@ func testAcceptedConnAddr(
require.NoError(t, err)
defer c.Close()
assert.EqualValues(t, network, c.RemoteAddr().Network())
assert.Equal(t, valid4, missinggo.AddrIP(c.RemoteAddr()).To4() != nil)
assert.Equal(t, valid4, missinggo.AddrIP(c.RemoteAddr()).To4() == nil)
}

func listenClosure(rawListenFunc func(string, string) (net.Listener, error), network, addr string) func() (net.Listener, error) {
func listenClosure(
rawListenFunc func(string, string) (net.Listener, error),
network, addr string,
) func() (net.Listener, error) {
return func() (net.Listener, error) {
return rawListenFunc(network, addr)
}
Expand All @@ -76,6 +82,6 @@ func TestListenLocalhostNetwork(t *testing.T) {
"tcp",
false,
dialClosure(net.Dial, "tcp"),
listenClosure(net.Listen, "tcp6", "localhost:0"),
listenClosure(net.Listen, "tcp4", "localhost:0"),
)
}
4 changes: 2 additions & 2 deletions piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {

func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
p.t.updatePieceRequestOrder(p.index)
p.t.updatePieceRequestOrderPiece(p.index)
p.readerCond.Broadcast()
}

func (p *Piece) pendChunkIndex(i RequestIndex) {
p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
p.t.updatePieceRequestOrder(p.index)
p.t.updatePieceRequestOrderPiece(p.index)
}

func (p *Piece) numChunks() chunkIndexType {
Expand Down
58 changes: 43 additions & 15 deletions request-strategy-impls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,66 @@ package torrent

import (
g "github.com/anacrolix/generics"

"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
)

type requestStrategyInput struct {
cl *Client
capFunc storage.TorrentCapacity
type requestStrategyInputCommon struct {
maxUnverifiedBytes int64
}

func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{g.MapMustGet(r.cl.torrents, ih)}
func (r requestStrategyInputCommon) MaxUnverifiedBytes() int64 {
return r.maxUnverifiedBytes
}

func (r requestStrategyInput) Capacity() (int64, bool) {
if r.capFunc == nil {
return 0, false
}
type requestStrategyInputMultiTorrent struct {
requestStrategyInputCommon
torrents map[metainfo.Hash]*Torrent
capFunc storage.TorrentCapacity
}

func (r requestStrategyInputMultiTorrent) Torrent(ih metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{g.MapMustGet(r.torrents, ih)}
}

func (r requestStrategyInputMultiTorrent) Capacity() (int64, bool) {
return (*r.capFunc)()
}

func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
return r.cl.config.MaxUnverifiedBytes
type requestStrategyInputSingleTorrent struct {
requestStrategyInputCommon
t *Torrent
}

var _ request_strategy.Input = requestStrategyInput{}
func (r requestStrategyInputSingleTorrent) Torrent(_ metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{r.t}
}

func (r requestStrategyInputSingleTorrent) Capacity() (cap int64, capped bool) {
return 0, false
}

var _ request_strategy.Input = requestStrategyInputSingleTorrent{}

func (cl *Client) getRequestStrategyInputCommon() requestStrategyInputCommon {
return requestStrategyInputCommon{cl.config.MaxUnverifiedBytes}
}

// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
return requestStrategyInput{
cl: cl,
capFunc: primaryTorrent.storage.Capacity,
if primaryTorrent.storage.Capacity == nil {
return requestStrategyInputSingleTorrent{
requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
t: primaryTorrent,
}
} else {
return requestStrategyInputMultiTorrent{
requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
torrents: cl.torrents,
capFunc: primaryTorrent.storage.Capacity,
}
}
}

Expand Down
103 changes: 103 additions & 0 deletions request-strategy-impls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"runtime"
"testing"

"github.com/anacrolix/missinggo/v2/iter"
"github.com/davecgh/go-spew/spew"
qt "github.com/frankban/quicktest"

"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
)

func makeRequestStrategyPiece(t request_strategy.Torrent) request_strategy.Piece {
Expand All @@ -27,3 +30,103 @@ func TestRequestStrategyPieceDoesntAlloc(t *testing.T) {
// We have to use p, or it gets optimized away.
spew.Fdump(io.Discard, p)
}

type storagePiece struct {
complete bool
}

func (s storagePiece) ReadAt(p []byte, off int64) (n int, err error) {
//TODO implement me
panic("implement me")
}

func (s storagePiece) WriteAt(p []byte, off int64) (n int, err error) {
//TODO implement me
panic("implement me")
}

func (s storagePiece) MarkComplete() error {
//TODO implement me
panic("implement me")
}

func (s storagePiece) MarkNotComplete() error {
//TODO implement me
panic("implement me")
}

func (s storagePiece) Completion() storage.Completion {
return storage.Completion{Ok: true, Complete: s.complete}
}

var _ storage.PieceImpl = storagePiece{}

type storageClient struct {
completed int
}

func (s *storageClient) OpenTorrent(
info *metainfo.Info,
infoHash metainfo.Hash,
) (storage.TorrentImpl, error) {
return storage.TorrentImpl{
Piece: func(p metainfo.Piece) storage.PieceImpl {
return storagePiece{complete: p.Index() < s.completed}
},
}, nil
}

func BenchmarkRequestStrategy(b *testing.B) {
c := qt.New(b)
cl := newTestingClient(b)
storageClient := storageClient{}
tor, new := cl.AddTorrentOpt(AddTorrentOpts{
Storage: &storageClient,
})
tor.disableTriggers = true
c.Assert(new, qt.IsTrue)
const pieceLength = 1 << 8 << 10
const numPieces = 30_000
err := tor.setInfo(&metainfo.Info{
Pieces: make([]byte, numPieces*metainfo.HashSize),
PieceLength: pieceLength,
Length: pieceLength * numPieces,
})
c.Assert(err, qt.IsNil)
tor.onSetInfo()
peer := cl.newConnection(nil, newConnectionOpts{
network: "test",
})
peer.setTorrent(tor)
c.Assert(tor.storage, qt.IsNotNil)
const chunkSize = defaultChunkSize
peer.onPeerHasAllPiecesNoTriggers()
for i := 0; i < tor.numPieces(); i++ {
tor.pieces[i].priority.Raise(PiecePriorityNormal)
tor.updatePiecePriorityNoTriggers(i)
}
peer.peerChoking = false
//b.StopTimer()
b.ResetTimer()
//b.ReportAllocs()
for _ = range iter.N(b.N) {
storageClient.completed = 0
for pieceIndex := range iter.N(numPieces) {
tor.updatePieceCompletion(pieceIndex)
}
for completed := 0; completed <= numPieces; completed += 1 {
storageClient.completed = completed
if completed > 0 {
tor.updatePieceCompletion(completed - 1)
}
// Starting and stopping timers around this part causes lots of GC overhead.
rs := peer.getDesiredRequestState()
tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes)
// End of part that should be timed.
remainingChunks := (numPieces - completed) * (pieceLength / chunkSize)
c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt(
remainingChunks,
int(cl.config.MaxUnverifiedBytes/chunkSize)))
}
}
}
2 changes: 1 addition & 1 deletion request-strategy/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func GetRequestablePieces(
return true
}
if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
return true
return false
}
allTorrentsUnverifiedBytes += pieceLength
f(ih, _i.key.Index, _i.state)
Expand Down
Loading

0 comments on commit 98f035c

Please sign in to comment.