Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: various traversal and verifier fixes #338

Merged
merged 1 commit into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions pkg/internal/testutil/gen.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package testutil

import (
"fmt"
"io"
"math/rand"
"net"
"strconv"
Expand All @@ -10,7 +12,10 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/ipfs/go-unixfsnode/data"
unixfs "github.com/ipfs/go-unixfsnode/testutil"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipni/go-libipni/metadata"
crypto "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -150,9 +155,11 @@ func (ZeroReader) Read(b []byte) (n int, err error) {
return len(b), nil
}

// TODO: this should probably be an option in unixfsnode/testutil, for
// generators to strictly not return a DAG with duplicates
// TODO: these should probably be in unixfsnode/testutil, or as options to
// the respective functions there.

// GenerateNoDupes runs the unixfsnode/testutil generator function repeatedly
// until it produces a DAG with strictly no duplicate CIDs.
func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
var check func(unixfs.DirEntry) bool
var seen map[cid.Cid]struct{}
Expand All @@ -178,3 +185,37 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
}
}
}

// GenerateStrictlyNestedShardedDir is a wrapper around
// unixfsnode/testutil.GenerateDirectory that uses dark magic to repeatedly
// generate a sharded directory until it produces one that is strictly nested.
// That is, it produces a sharded directory structure with strictly at least one
// level of sharding with at least two child shards.
//
// Since it is possible to produce a sharded directory that is
// contained in a single block, this function provides a way to generate a
// sharded directory for cases where we need to test multi-level sharding.
// GenerateStrictlyNestedShardedDir wraps unixfsnode/testutil.GenerateDirectory,
// regenerating a sharded directory until the result is strictly nested: the
// root shard must contain at least two links that are pure shard prefixes,
// i.e. at least one extra level of sharding with two or more child shards.
//
// A sharded directory can fit entirely in a single block, so this helper
// exists for tests that specifically need multi-level sharding.
func GenerateStrictlyNestedShardedDir(t *testing.T, linkSys *linking.LinkSystem, randReader io.Reader, targetSize int) unixfs.DirEntry {
	for {
		dir := unixfs.GenerateDirectory(t, linkSys, randReader, targetSize, true)

		// Load the root block and decode its UnixFS envelope so we can
		// read the HAMT fanout.
		rootNode, err := linkSys.Load(linking.LinkContext{}, cidlink.Link{Cid: dir.Root}, dagpb.Type.PBNode)
		require.NoError(t, err)
		rootPB := rootNode.(dagpb.PBNode)
		ufsData, err := data.DecodeUnixFSData(rootPB.Data.Must().Bytes())
		require.NoError(t, err)

		// A pure shard-prefix link is named with exactly the hex width of
		// (fanout - 1); entry links carry the prefix plus a name, so they
		// are strictly longer.
		prefixLen := len(fmt.Sprintf("%X", ufsData.FieldFanout().Must().Int()-1))

		shardLinks := 0
		links := rootPB.Links.ListIterator()
		for !links.Done() {
			_, link, err := links.Next()
			require.NoError(t, err)
			if len(link.(dagpb.PBLink).Name.Must().String()) == prefixLen {
				// Name is just a shard prefix: a nested child shard.
				shardLinks++
			}
		}
		if shardLinks >= 2 {
			return dir
		}
	}
}
35 changes: 31 additions & 4 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
ctx,
cidlink.Link{Cid: br.request.Cid},
selector,
&traversalLinkSys,
traversalLinkSys,
preloader,
br.request.MaxBlocks,
)
Expand Down Expand Up @@ -295,15 +295,20 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi
}
}

// noopVisitor satisfies the traversal visit-callback signature without doing
// any work; the traversal is run purely for its side effect of loading blocks.
func noopVisitor(traversal.Progress, datamodel.Node, traversal.VisitReason) error {
	return nil
}

func easyTraverse(
ctx context.Context,
root datamodel.Link,
traverseSelector datamodel.Node,
lsys *linking.LinkSystem,
lsys linking.LinkSystem,
preloader preload.Loader,
maxBlocks uint64,
) error {

lsys, ecr := newErrorCapturingReader(lsys)
protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)

// retrieve first node
Expand All @@ -319,7 +324,7 @@ func easyTraverse(
progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: *lsys,
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: protoChooser,
Preloader: preloader,
},
Expand All @@ -335,5 +340,27 @@ func easyTraverse(
if err != nil {
return err
}
return progress.WalkAdv(node, compiledSelector, func(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { return nil })
if err := progress.WalkAdv(node, compiledSelector, noopVisitor); err != nil {
return err
}
return ecr.Error
}

// errorCapturingReader wraps a linking.BlockReadOpener so that read errors
// encountered during a traversal are retained for inspection afterward.
type errorCapturingReader struct {
	// sro is the wrapped StorageReadOpener that performs the actual reads.
	sro linking.BlockReadOpener
	// Error holds the most recent error returned by sro, or nil if every
	// read so far has succeeded.
	Error error
}

// newErrorCapturingReader substitutes the LinkSystem's StorageReadOpener with
// one that records read errors, returning the modified LinkSystem (a copy,
// since LinkSystem is passed by value) together with the capturer itself.
func newErrorCapturingReader(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingReader) {
	capturer := &errorCapturingReader{sro: lsys.StorageReadOpener}
	lsys.StorageReadOpener = capturer.StorageReadOpener
	return lsys, capturer
}

// StorageReadOpener delegates to the wrapped opener, recording any error it
// returns in ecr.Error before passing the result through unchanged.
func (ecr *errorCapturingReader) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
	rdr, err := ecr.sro(lc, l)
	if err == nil {
		return rdr, nil
	}
	// Remember the failure so callers can surface errors that the
	// traversal itself swallows.
	ecr.Error = err
	return rdr, err
}
2 changes: 1 addition & 1 deletion pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newTimeToFirstByteReader(r io.Reader, cb func()) *timeToFirstByteReader {
}
}

func (t *timeToFirstByteReader) Read(p []byte) (n int, err error) {
func (t *timeToFirstByteReader) Read(p []byte) (int, error) {
if !t.first {
t.first = true
defer t.cb()
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/http/ipfs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpserver

import (
"context"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -188,7 +189,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
stats, err := lassie.Fetch(req.Context(), request, servertimingsSubscriber(req))

// force all blocks to flush
if cerr := carWriter.Close(); cerr != nil {
if cerr := carWriter.Close(); cerr != nil && !errors.Is(cerr, context.Canceled) {
logger.Infof("error closing car writer: %s", cerr)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/duplicateaddercar.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (bs *blockStream) Close() {
}

func (bs *blockStream) WriteBlock(blk blocks.Block) error {
if bs.ctx.Err() != nil {
return bs.ctx.Err()
}
bs.mu.Lock()
defer bs.mu.Unlock()
if bs.done {
Expand Down
46 changes: 38 additions & 8 deletions pkg/verifiedcar/verifiedcar.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason)
//
// * https://specs.ipfs.tech/http-gateways/path-gateway/
func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) {

cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false))
if err != nil {
// TODO: post-1.19: fmt.Errorf("%w: %w", ErrMalformedCar, err)
Expand All @@ -93,7 +92,6 @@ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.Lin
}

func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) {

sel, err := selector.CompileSelector(cfg.Selector)
if err != nil {
return 0, 0, err
Expand All @@ -106,7 +104,7 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l
lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys)

lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys)
nbls, lsys := NewNextBlockLinkSystem(ctx, cfg, cr, bt, lsys)

// run traversal in this goroutine
progress := traversal.Progress{
Expand Down Expand Up @@ -136,21 +134,41 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l
return 0, 0, traversalError(err)
}

if nbls.Error != nil {
// capture any errors not bubbled up through the traversal, i.e. see
// https://github.com/ipld/go-ipld-prime/pull/524
return 0, 0, nbls.Error
}

// make sure we don't have any extraneous data beyond what the traversal needs
_, err = cbr.Next()
if !errors.Is(err, io.EOF) {
if err == nil {
return 0, 0, ErrExtraneousBlock
} else if !errors.Is(err, io.EOF) {
return 0, 0, err
}

// wait for parser to finish and provide errors or stats
return bt.blocks, bt.bytes, nil
}

func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener {
// NextBlockLinkSystem surfaces errors raised inside the block read opener
// that may not bubble up through the traversal itself (see
// https://github.com/ipld/go-ipld-prime/pull/524).
type NextBlockLinkSystem struct {
	// Error is set to the most recent read-opener failure, or nil if no
	// failure has occurred.
	Error error
}

func NewNextBlockLinkSystem(
ctx context.Context,
cfg Config,
cr *carReader,
bt *writeTracker,
lsys linking.LinkSystem,
) (*NextBlockLinkSystem, linking.LinkSystem) {
nbls := &NextBlockLinkSystem{}
seen := make(map[cid.Cid]struct{})
return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
cid := l.(cidlink.Link).Cid
storageReadOpener := lsys.StorageReadOpener

nextBlockReadOpener := func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
cid := l.(cidlink.Link).Cid
var data []byte
var err error
if _, ok := seen[cid]; ok {
Expand All @@ -165,7 +183,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w
}
} else {
// duplicate block, rely on the supplied LinkSystem to have stored this
rdr, err := lsys.StorageReadOpener(lc, l)
rdr, err := storageReadOpener(lc, l)
if !cfg.WriteDuplicatesOut {
return rdr, err
}
Expand Down Expand Up @@ -198,6 +216,18 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w
}
return io.NopCloser(rdr), nil
}

// wrap nextBlockReadOpener in one that captures errors on `nbls`
lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
rdr, err := nextBlockReadOpener(lc, l)
if err != nil {
nbls.Error = err
return nil, err
}
return rdr, nil
}

return nbls, lsys
}

type carReader struct {
Expand Down
Loading