diff --git a/go.mod b/go.mod
index 3d51753f8..852d296af 100644
--- a/go.mod
+++ b/go.mod
@@ -377,3 +377,5 @@ require (
 	gonum.org/v1/gonum v0.13.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 )
+
+replace github.com/filecoin-project/go-data-segment v0.0.1 => github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69
diff --git a/go.sum b/go.sum
index e4c84e270..0d405c36a 100644
--- a/go.sum
+++ b/go.sum
@@ -347,8 +347,6 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082
 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
 github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
 github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
-github.com/filecoin-project/go-data-segment v0.0.1 h1:1wmDxOG4ubWQm3ZC1XI5nCon5qgSq7Ra3Rb6Dbu10Gs=
-github.com/filecoin-project/go-data-segment v0.0.1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4=
 github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0=
 github.com/filecoin-project/go-data-transfer v1.15.4-boost/go.mod h1:S5Es9uoD+3TveYyGjxZInAF6mSQtRjNzezV7Y7Sh8X0=
 github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc=
@@ -954,6 +952,8 @@ github.com/ipni/storetheindex v0.8.1 h1:3uHclkcQWlIXQx+We4tbGF/XzoZYERz3so34xQbU
 github.com/ipni/storetheindex v0.8.1/go.mod h1:K4AR2bRll46YCWeGvob5foN/Z/kuovPdlUeJKOHVQHo=
 github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
 github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
+github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69 h1:prNO3cadvXtRXItvhQoaJ0qfF3a1sPkZ8B6epyuCLPo=
+github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69/go.mod h1:4ZWx04e7pKuozznBnZarZFJQ+PeUEKPp/Lv7M6K7bog=
 github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
 github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
 github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go
new file mode 100644
index 000000000..7a98b5a64
--- /dev/null
+++ b/itests/dummydeal_podsi_test.go
@@ -0,0 +1,239 @@
+package itests
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"math/bits"
+	"os"
+	"path"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+	"github.com/filecoin-project/boost/itests/framework"
+	"github.com/filecoin-project/boost/testutil"
+	"github.com/filecoin-project/go-data-segment/datasegment"
+	commcid "github.com/filecoin-project/go-fil-commcid"
+	commp "github.com/filecoin-project/go-fil-commp-hashhash"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/lotus/itests/kit"
+	"github.com/google/uuid"
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-libipfs/blocks"
+	"github.com/ipfs/go-unixfsnode/data/builder"
+	"github.com/ipld/go-car/v2"
+	"github.com/ipld/go-car/v2/blockstore"
+	dagpb "github.com/ipld/go-codec-dagpb"
+	"github.com/ipld/go-ipld-prime"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+	"github.com/multiformats/go-multicodec"
+	multihash "github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDummyPodsiDealOnline(t *testing.T) {
+	randomFileSize := int(4e6)
+
+	ctx := context.Background()
+	log := framework.Log
+
+	kit.QuietMiningLogs()
+	framework.SetLogLevel()
+	var opts []framework.FrameworkOpts
+	opts = append(opts, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10e9), framework.SetProvisionalWalletBalances(9e18))
+	f := framework.NewTestFramework(ctx, t, opts...)
+	err := f.Start()
+	require.NoError(t, err)
+	defer f.Stop()
+
+	err = f.AddClientProviderBalance(abi.NewTokenAmount(5e18))
+	require.NoError(t, err)
+
+	tempdir := t.TempDir()
+	log.Debugw("using tempdir", "dir", tempdir)
+
+	// create a random file
+	randomFilepath, err := testutil.CreateRandomFile(tempdir, 5, randomFileSize)
+	require.NoError(t, err)
+
+	carFile := filepath.Join(tempdir, "test.car")
+	dataSegmentFile := filepath.Join(tempdir, "datasegment.dat")
+
+	// pack it into the car
+	rootCid, err := createCar(t, carFile, []string{randomFilepath})
+	require.NoError(t, err)
+
+	// pack the car into a data segment piece twice so that we have two segments
+	makeDataSegmentPiece(t, dataSegmentFile, []string{carFile, carFile})
+
+	// Start a web server to serve the car files
+	log.Debug("starting webserver")
+	server, err := testutil.HttpTestFileServer(t, tempdir)
+	require.NoError(t, err)
+	defer server.Close()
+
+	// Create a new dummy deal
+	log.Debug("creating dummy deal")
+	dealUuid := uuid.New()
+
+	// Make a deal
+	res, err := f.MakeDummyDeal(dealUuid, dataSegmentFile, rootCid, server.URL+"/"+filepath.Base(dataSegmentFile), false)
+	require.NoError(t, err)
+	require.True(t, res.Result.Accepted)
+	log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res))
+
+	time.Sleep(2 * time.Second)
+
+	// Wait for the deal to be added to a sector
+	err = f.WaitForDealAddedToSector(dealUuid)
+	require.NoError(t, err)
+	time.Sleep(100 * time.Millisecond)
+}
+
+func makeDataSegmentPiece(t *testing.T, dataSegmentFile string, subPieces []string) {
+	readers := make([]io.Reader, 0)
+	deals := make([]abi.PieceInfo, 0)
+	for _, sp := range subPieces {
+		arg, err := os.Open(sp)
+		require.NoError(t, err)
+
+		readers = append(readers, arg)
+		cp := new(commp.Calc)
+		_, err = io.Copy(cp, arg)
+		require.NoError(t, err)
+		rawCommP, size, err := cp.Digest()
+		require.NoError(t, err)
+
+		// rewind so the stored reader can be consumed again by the aggregator
+		_, err = arg.Seek(0, io.SeekStart)
+		require.NoError(t, err)
+		c, err := commcid.DataCommitmentV1ToCID(rawCommP)
+		require.NoError(t, err)
+		subdeal := abi.PieceInfo{
+			Size:     abi.PaddedPieceSize(size),
+			PieceCID: c,
+		}
+		deals = append(deals, subdeal)
+	}
+	require.NotEqual(t, 0, len(deals))
+
+	_, size, err := datasegment.ComputeDealPlacement(deals)
+	require.NoError(t, err)
+
+	overallSize := abi.PaddedPieceSize(size)
+	// we need to make this the 'next' power of 2 in order to have space for the index
+	next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256)))
+
+	a, err := datasegment.NewAggregate(abi.PaddedPieceSize(next), deals)
+	require.NoError(t, err)
+	out, err := a.AggregateObjectReader(readers)
+	require.NoError(t, err)
+
+	// open output file
+	fo, err := os.Create(dataSegmentFile)
+	require.NoError(t, err)
+	defer fo.Close()
+
+	written, err := io.Copy(fo, out)
+	require.NoError(t, err)
+	require.NotZero(t, written)
+}
+
+func createCar(t *testing.T, carFile string, files []string) (cid.Cid, error) {
+	// make a cid with the right length that we eventually will patch with the root.
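+	// blockstore.OpenReadWrite needs the roots up front because they go into
+	// the CAR header, but the real root is only known once the UnixFS DAG has
+	// been built. So we seed the header with a placeholder of the same shape
+	// (CIDv1, dag-pb, sha2-256) and swap in the real root at the end via
+	// car.ReplaceRootsInFile, which only works when the replacement encodes to
+	// the same number of bytes.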
+	hasher, err := multihash.GetHasher(multihash.SHA2_256)
+	if err != nil {
+		return cid.Undef, err
+	}
+	digest := hasher.Sum([]byte{})
+	hash, err := multihash.Encode(digest, multihash.SHA2_256)
+	if err != nil {
+		return cid.Undef, err
+	}
+	proxyRoot := cid.NewCidV1(uint64(multicodec.DagPb), hash)
+
+	options := []car.Option{}
+
+	cdest, err := blockstore.OpenReadWrite(carFile, []cid.Cid{proxyRoot}, options...)
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	// Write the unixfs blocks into the store.
+	root, err := writeFiles(context.Background(), false, cdest, files...)
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	if err := cdest.Finalize(); err != nil {
+		return cid.Undef, err
+	}
+	// rewrite the header, swapping the proxy root for the real one.
+	return root, car.ReplaceRootsInFile(carFile, []cid.Cid{root})
+}
+
+func writeFiles(ctx context.Context, noWrap bool, bs *blockstore.ReadWrite, paths ...string) (cid.Cid, error) {
+	ls := cidlink.DefaultLinkSystem()
+	ls.TrustedStorage = true
+	ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) {
+		cl, ok := l.(cidlink.Link)
+		if !ok {
+			return nil, fmt.Errorf("not a cidlink")
+		}
+		blk, err := bs.Get(ctx, cl.Cid)
+		if err != nil {
+			return nil, err
+		}
+		return bytes.NewBuffer(blk.RawData()), nil
+	}
+	ls.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
+		buf := bytes.NewBuffer(nil)
+		return buf, func(l ipld.Link) error {
+			cl, ok := l.(cidlink.Link)
+			if !ok {
+				return fmt.Errorf("not a cidlink")
+			}
+			blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid)
+			if err != nil {
+				return err
+			}
+			return bs.Put(ctx, blk)
+		}, nil
+	}
+
+	topLevel := make([]dagpb.PBLink, 0, len(paths))
+	for _, p := range paths {
+		l, size, err := builder.BuildUnixFSRecursive(p, &ls)
+		if err != nil {
+			return cid.Undef, err
+		}
+		if noWrap {
+			rcl, ok := l.(cidlink.Link)
+			if !ok {
+				return cid.Undef, fmt.Errorf("could not interpret %s", l)
+			}
+			return rcl.Cid, nil
+		}
+		name := path.Base(p)
+		entry, err := builder.BuildUnixFSDirectoryEntry(name, int64(size), l)
+		if err != nil {
+			return cid.Undef, err
+		}
+		topLevel = append(topLevel, entry)
+	}
+
+	// make a directory for the file(s).
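+	// Wrapping the entries in a single UnixFS directory gives us one root CID
+	// that references every input file. (With noWrap set, the loop above has
+	// already returned the first file's own root instead.)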
+
+	root, _, err := builder.BuildUnixFSDirectory(topLevel, &ls)
+	if err != nil {
+		return cid.Undef, err
+	}
+	rcl, ok := root.(cidlink.Link)
+	if !ok {
+		return cid.Undef, fmt.Errorf("could not interpret %s", root)
+	}
+
+	return rcl.Cid, nil
+}
diff --git a/itests/framework/framework.go b/itests/framework/framework.go
index 70acf1b61..f7bb624ff 100644
--- a/itests/framework/framework.go
+++ b/itests/framework/framework.go
@@ -77,9 +77,10 @@ import (
 var Log = logging.Logger("boosttest")
 
 type TestFrameworkConfig struct {
-	Ensemble        *kit.Ensemble
-	EnableLegacy    bool
-	MaxStagingBytes int64
+	Ensemble                  *kit.Ensemble
+	EnableLegacy              bool
+	MaxStagingBytes           int64
+	ProvisionalWalletBalances int64
 }
 
 type TestFramework struct {
@@ -117,8 +118,17 @@ func WithEnsemble(e *kit.Ensemble) FrameworkOpts {
 	}
 }
 
+func SetProvisionalWalletBalances(balance int64) FrameworkOpts {
+	return func(tmc *TestFrameworkConfig) {
+		tmc.ProvisionalWalletBalances = balance
+	}
+}
+
 func NewTestFramework(ctx context.Context, t *testing.T, opts ...FrameworkOpts) *TestFramework {
-	fmc := &TestFrameworkConfig{}
+	fmc := &TestFrameworkConfig{
+		// default provisional balance
+		ProvisionalWalletBalances: 1e18,
+	}
 	for _, opt := range opts {
 		opt(fmc)
 	}
@@ -224,7 +234,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error {
 		clientAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS)
 
-		amt := abi.NewTokenAmount(1e18)
+		amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances)
 		_ = sendFunds(f.ctx, fullnodeApi, clientAddr, amt)
 		Log.Infof("Created client wallet %s with %d attoFil", clientAddr, amt)
 		wg.Done()
@@ -239,7 +249,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error {
 		Log.Info("Creating publish storage deals wallet")
 		psdWalletAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS)
 
-		amt := abi.NewTokenAmount(1e18)
+		amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances)
 		_ = sendFunds(f.ctx, fullnodeApi, psdWalletAddr, amt)
 		Log.Infof("Created publish storage deals wallet %s with %d attoFil", psdWalletAddr, amt)
 		wg.Done()
@@ -248,7 +258,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error {
 		Log.Info("Creating deal collateral wallet")
 		dealCollatAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS)
 
-		amt := abi.NewTokenAmount(1e18)
+		amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances)
 		_ = sendFunds(f.ctx, fullnodeApi, dealCollatAddr, amt)
 		Log.Infof("Created deal collateral wallet %s with %d attoFil", dealCollatAddr, amt)
 		wg.Done()
diff --git a/node/config/def.go b/node/config/def.go
index 989315a98..f8f0c1f37 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -225,6 +225,9 @@ func DefaultBoost() *Boost {
 			NChunks:         5,
 			AllowPrivateIPs: false,
 		},
+		ExperimentalConfig: ExperimentalConfig{
+			PodsiDataSegmentReaderBufferSizeBytes: 4e6, // 4 MB
+		},
 	}
 	return cfg
 }
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index f5fc43cd2..3c6909b4c 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -115,6 +115,12 @@ your node if metadata log is disabled`,
 
 			Comment: ``,
 		},
+		{
+			Name: "ExperimentalConfig",
+			Type: "ExperimentalConfig",
+
+			Comment: `Experimental config`,
+		},
 	},
 	"Common": []DocField{
 		{
@@ -452,6 +458,15 @@ If this parameter is not set, boost will serve data from the endpoint
 configured in SectorIndexApiInfo.`,
 		},
 	},
+	"ExperimentalConfig": []DocField{
+		{
+			Name: "PodsiDataSegmentReaderBufferSizeBytes",
+			Type: "int64",
+
+			Comment: `PodsiDataSegmentReaderBufferSizeBytes sets the size of the read buffer to use for podsi deal index parsing.
+Default is 4 MB.`,
+		},
+	},
 	"FeeConfig": []DocField{
 		{
 			Name: "MaxPublishDealsFee",
diff --git a/node/config/types.go b/node/config/types.go
index 8585d1105..5cd0adafe 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -54,6 +54,9 @@ type Boost struct {
 	LotusFees     FeeConfig
 	DAGStore      lotus_config.DAGStoreConfig
 	IndexProvider IndexProviderConfig
+
+	// Experimental config
+	ExperimentalConfig ExperimentalConfig
 }
 
 func (b *Boost) GetDealmakingConfig() lotus_config.DealmakingConfig {
@@ -445,3 +448,9 @@ type HttpDownloadConfig struct {
 	// The default is false.
 	AllowPrivateIPs bool
 }
+
+type ExperimentalConfig struct {
+	// PodsiDataSegmentReaderBufferSizeBytes sets the size of the read buffer to use for podsi deal index parsing.
+	// Default is 4 MB (4e6 bytes).
+	PodsiDataSegmentReaderBufferSizeBytes int64
+}
diff --git a/node/modules/piecedirectory.go b/node/modules/piecedirectory.go
index d99088e40..4cf2e71f2 100644
--- a/node/modules/piecedirectory.go
+++ b/node/modules/piecedirectory.go
@@ -138,7 +138,8 @@ func NewPieceDirectory(cfg *config.Boost) func(lc fx.Lifecycle, maddr dtypes.Min
 			pdctx, cancel := context.WithCancel(context.Background())
 			pd := piecedirectory.NewPieceDirectory(store, sa, cfg.LocalIndexDirectory.ParallelAddIndexLimit,
-				piecedirectory.WithAddIndexConcurrency(cfg.LocalIndexDirectory.AddIndexConcurrency))
+				piecedirectory.WithAddIndexConcurrency(cfg.LocalIndexDirectory.AddIndexConcurrency),
+				piecedirectory.WithDataSegmentReaderBufferSize(cfg.ExperimentalConfig.PodsiDataSegmentReaderBufferSizeBytes))
 			lc.Append(fx.Hook{
 				OnStart: func(ctx context.Context) error {
 					err := sa.Start(ctx, log)
diff --git a/piecedirectory/piece_directory_test.go b/piecedirectory/piece_directory_test.go
index fb8584d04..2dac2c5e3 100644
--- a/piecedirectory/piece_directory_test.go
+++ b/piecedirectory/piece_directory_test.go
@@ -24,7 +24,7 @@ func TestSegmentParsing(t *testing.T) {
 	rd, err := os.Open("testdata/segment.car")
 	require.NoError(t, err)
 
-	recs, err := parsePieceWithDataSegmentIndex(pieceCid, carSize, rd)
+	recs, err := parsePieceWithDataSegmentIndex(pieceCid, carSize, rd, 4e6) // 4 MB buffer
 	require.NoError(t, err)
 
 	t.Log(recs)
diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go
index c10249f1a..c4d170348 100644
--- a/piecedirectory/piecedirectory.go
+++ b/piecedirectory/piecedirectory.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	carutil "github.com/filecoin-project/boost/car"
@@ -33,10 +34,10 @@ import (
 	"github.com/ipld/go-car/v2/blockstore"
 	carindex "github.com/ipld/go-car/v2/index"
 	"github.com/jellydator/ttlcache/v2"
-	"github.com/multiformats/go-multihash"
 	mh "github.com/multiformats/go-multihash"
 	"go.opentelemetry.io/otel/attribute"
 	"golang.org/x/sync/errgroup"
+	"golang.org/x/xerrors"
 )
 
 var log = logging.Logger("piecedirectory")
@@ -46,7 +47,8 @@ const (
 )
 
 type settings struct {
-	addIndexConcurrency int
+	addIndexConcurrency         int
+	dataSegmentReaderBufferSize int64
}
 
 type Option func(*settings)
@@ -57,6 +59,12 @@ func WithAddIndexConcurrency(c int) Option {
 	}
 }
 
+func WithDataSegmentReaderBufferSize(size int64) Option {
+	return func(s *settings) {
+		s.dataSegmentReaderBufferSize = size
+	}
+}
+
 type PieceDirectory struct {
 	settings *settings
 	store    *bdclient.Store
@@ -306,7 +314,7 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
 
 	// Try to parse data as containing a data segment index
 	log.Debugw("add index: read index", "pieceCid", pieceCid)
-	recs, err := parsePieceWithDataSegmentIndex(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader)
+	recs, err := parsePieceWithDataSegmentIndex(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader, ps.settings.dataSegmentReaderBufferSize)
 	if err != nil {
 		log.Infow("add index: data segment check failed. falling back to car", "pieceCid", pieceCid, "err", err)
 		// Iterate over all the blocks in the piece to extract the index records
@@ -384,34 +392,78 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) {
 	return recs, nil
 }
 
-func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) {
+func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader, dataSegmentReaderBufferSize int64) ([]model.Record, error) {
+	var readCount int32
 	ps := abi.UnpaddedPieceSize(unpaddedSize).Padded()
+
+	now := time.Now()
 	dsis := datasegment.DataSegmentIndexStartOffset(ps)
 	if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
 		return nil, fmt.Errorf("could not seek to data segment index: %w", err)
 	}
-	dataSegments, err := datasegment.ParseDataSegmentIndex(r)
-	if err != nil {
-		return nil, fmt.Errorf("could not parse data segment index: %w", err)
+	log.Infof("podsi: took %s to seek to the start offset", time.Since(now).String())
+
+	now = time.Now()
+
+	results := make(chan *datasegment.SegmentDesc)
+	var parseIndexErr error
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		cr := countingReader{Reader: r, cnt: &readCount}
+		br := bufio.NewReaderSize(&cr, int(dataSegmentReaderBufferSize))
+		parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, br, results)
+		close(results)
+	}()
+
+	var segments []datasegment.SegmentDesc
+	cnt := -1
+	for res := range results {
+		cnt++
+		if err := res.Validate(); err != nil {
+			if errors.Is(err, datasegment.ErrValidation) {
+				continue
+			} else {
+				cancel()
+				return nil, xerrors.Errorf("could not calculate valid entries: got unknown error for entry %d: %w", cnt, err)
+			}
+		}
+		segments = append(segments, *res)
 	}
-	segments, err := dataSegments.ValidEntries()
-	if err != nil {
-		return nil, fmt.Errorf("could not calculate valid entries: %w", err)
+
+	cancel()
+
+	if parseIndexErr != nil {
+		return nil, fmt.Errorf("could not parse data segment index: %w", parseIndexErr)
 	}
+
 	if len(segments) == 0 {
 		return nil, fmt.Errorf("no data segments found")
 	}
 
+	log.Infow("podsi: parsing and validating data segment index", "buffer_mb", dataSegmentReaderBufferSize/1e6, "segments", len(segments), "time", time.Since(now).String(), "reads", readCount)
+
+	readCount = 0
+
+	now = time.Now()
+
 	recs := make([]model.Record, 0)
-	for _, s := range segments {
+
+	// this can't be parallelised because the reader needs to be used at different offsets
+	for i, s := range segments {
 		segOffset := s.UnpaddedOffest()
 		segSize := s.UnpaddedLength()
-		lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))
+		_, err := r.Seek(int64(segOffset), io.SeekStart)
+		if err != nil {
+			return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", i, segOffset, err)
+		}
+		cr := countingReader{Reader: r, cnt: &readCount}
+		br := bufio.NewReaderSize(&cr, int(dataSegmentReaderBufferSize))
+		lr := io.LimitReader(br, int64(segSize))
 		subRecs, err := parseRecordsFromCar(lr)
 		if err != nil {
 			// revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments.
-			return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err)
+			return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", i, segOffset, err)
 		}
 		for i := range subRecs {
 			subRecs[i].Offset += segOffset
@@ -419,9 +471,22 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type
 		recs = append(recs, subRecs...)
 	}
 
+	log.Infow("podsi: parsing data segments", "buffer_mb", dataSegmentReaderBufferSize/1e6, "records", len(recs), "time", time.Since(now).String(), "reads", readCount)
+
 	return recs, nil
 }
 
+// countingReader wraps an io.Reader and counts how many Read calls pass
+// through it, so the logs above can report how often the underlying (slow,
+// possibly remote) reader was hit for a given buffer size.
+type countingReader struct {
+	io.Reader
+
+	cnt *int32
+}
+
+func (cr *countingReader) Read(p []byte) (n int, err error) {
+	atomic.AddInt32(cr.cnt, 1)
+	return cr.Reader.Read(p)
+}
+
 // BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal
 // corresponding to an unsealed sector for this method to work. It will try to build index
 // using all available deals and will exit as soon as it succeeds for one of the deals
@@ -849,7 +914,7 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) (
 		bsR = carutil.NewMultiReaderAt(
 			bytes.NewReader(headerBuf.Bytes()),        // payload (CARv1) header
 			bytes.NewReader(make([]byte, dataOffset)), // padding to account for the CARv2 wrapper
-			sectionReader, // payload (CARv1) data
+			sectionReader,                             // payload (CARv1) data
 		)
 	} else {
 		bsR = reader
@@ -893,11 +958,11 @@ func (s *SectorAccessorAsPieceReader) GetReader(ctx context.Context, minerAddr a
 }
 
 func isIdentity(c cid.Cid) (digest []byte, ok bool, err error) {
-	dmh, err := multihash.Decode(c.Hash())
+	dmh, err := mh.Decode(c.Hash())
 	if err != nil {
 		return nil, false, err
 	}
-	ok = dmh.Code == multihash.IDENTITY
+	ok = dmh.Code == mh.IDENTITY
 	digest = dmh.Digest
 	return digest, ok, nil
 }
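+
+// Usage sketch for the new option: a caller that wants a larger read buffer
+// for podsi index parsing would construct the PieceDirectory as below. The
+// store/sa values and the concurrency and buffer numbers are illustrative,
+// not defaults.
+//
+//	pd := NewPieceDirectory(store, sa, 4,
+//		WithAddIndexConcurrency(8),
+//		WithDataSegmentReaderBufferSize(8e6)) // 8 MB read buffer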