From 84dfbbfda1ba89e881fe0964458a463eaf8fba13 Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Mon, 6 Nov 2023 16:52:26 +0000 Subject: [PATCH 01/10] WIP --- go.mod | 4 + piecedirectory/piecedirectory.go | 128 +++++++++++++++++++++++++++++-- 2 files changed, 124 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 3d51753f8..79080abe6 100644 --- a/go.mod +++ b/go.mod @@ -377,3 +377,7 @@ require ( gonum.org/v1/gonum v0.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect ) + + +// replace github.com/filecoin-project/go-data-segment => /Users/ivan/Dev/go-data-segment + diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index c10249f1a..b85418c7d 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -37,6 +37,7 @@ import ( mh "github.com/multiformats/go-multihash" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" ) var log = logging.Logger("piecedirectory") @@ -390,18 +391,39 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { return nil, fmt.Errorf("could not seek to data segment index: %w", err) } - dataSegments, err := datasegment.ParseDataSegmentIndex(r) - if err != nil { - return nil, fmt.Errorf("could not parse data segment index: %w", err) + results := make(chan *datasegment.SegmentDesc) + var parseIndexErr error + ctx, cancel := context.WithCancel(context.Background()) + go func() { + parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, r, results) + close(results) + }() + + var segments []datasegment.SegmentDesc + cnt := -1 + for res := range results { + cnt++ + if err := res.Validate(); err != nil { + if errors.Is(err, datasegment.ErrValidation) { + continue + } else { + cancel() + return nil, xerrors.Errorf("could not calculate valid entries: got unknown error for entry %d: %w", cnt, err) + } + } + segments = append(segments, *res) } - segments, err := dataSegments.ValidEntries() - if err != nil { - return nil, fmt.Errorf("could not calculate valid entries: %w", err) + + cancel() + + if parseIndexErr != nil { + return nil, fmt.Errorf("could not parse data segment index: %w", parseIndexErr) } + if len(segments) == 0 { return nil, fmt.Errorf("no data segments found") } - + recs := make([]model.Record, 0) for _, s := range segments { segOffset := s.UnpaddedOffest() @@ -422,6 +444,96 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type return recs, nil } +// func parsePieceWithDataSegmentIndex2(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { +// ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() +// dsis := datasegment.DataSegmentIndexStartOffset(ps) +// if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { +// return nil, fmt.Errorf("could not seek to data segment index: %w", err) +// } + +// dataSegments, err := datasegment.ParseDataSegmentIndex(r) +// if err != nil { +// return nil, fmt.Errorf("could not parse data segment index: %w", err) +// } +// now := time.Now() +// segments, err := dataSegments.ValidEntries() +// if err != nil { +// return nil, fmt.Errorf("could not calculate valid entries: %w", err) +// } + +// fmt.Printf("Total validation time: %.2f\n", time.Since(now).Seconds()) +// if len(segments) == 0 { +// return nil, fmt.Errorf("no data segments found") +// } + +// recs := make([]model.Record, 0) +// for _, s := range segments { +// segOffset := s.UnpaddedOffest() +// segSize := 
s.UnpaddedLength() + +// lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) +// subRecs, err := parseRecordsFromCar(lr) +// if err != nil { +// // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. +// return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) +// } +// for i := range subRecs { +// subRecs[i].Offset += segOffset +// } +// recs = append(recs, subRecs...) +// } + +// return recs, nil +// } + +// func parsePieceWithDataSegmentIndex3(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { +// ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() +// dsis := datasegment.DataSegmentIndexStartOffset(ps) +// if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { +// return nil, fmt.Errorf("could not seek to data segment index: %w", err) +// } +// results := make(chan *datasegment.SegmentDesc) +// var parseIndexErr error +// go func() { +// parseIndexErr = datasegment.ParseDataSegmentIndexAsync(context.Background(), r, results) +// close(results) +// }() + +// var segments []*datasegment.SegmentDesc +// for res := range results { +// segments = append(segments, res) +// } + +// validSegments := make([]datasegment.SegmentDesc, 0, len(segments)) + +// if parseIndexErr != nil { +// return nil, fmt.Errorf("could not parse data segment index: %w", parseIndexErr) +// } + +// if len(segments) == 0 { +// return nil, fmt.Errorf("no data segments found") +// } + +// recs := make([]model.Record, 0) +// for _, s := range segments { +// segOffset := s.UnpaddedOffest() +// segSize := s.UnpaddedLength() + +// lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) +// subRecs, err := parseRecordsFromCar(lr) +// if err != nil { +// // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. +// return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) +// } +// for i := range subRecs { +// subRecs[i].Offset += segOffset +// } +// recs = append(recs, subRecs...) +// } + +// return recs, nil +// } + // BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal // corresponding to an unsealed sector for this method to work. 
It will try to build index // using all available deals and will exit as soon as it succeeds for one of the deals @@ -849,7 +961,7 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) ( bsR = carutil.NewMultiReaderAt( bytes.NewReader(headerBuf.Bytes()), // payload (CARv1) header bytes.NewReader(make([]byte, dataOffset)), // padding to account for the CARv2 wrapper - sectionReader, // payload (CARv1) data + sectionReader, // payload (CARv1) data ) } else { bsR = reader From 952315616d9e931c16f95486fb7754aca79a1ac0 Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Tue, 7 Nov 2023 12:13:27 +0000 Subject: [PATCH 02/10] Add cached index reader --- go.mod | 4 +--- go.sum | 4 ++-- piecedirectory/piecedirectory.go | 21 ++++++++++++++++----- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 79080abe6..852d296af 100644 --- a/go.mod +++ b/go.mod @@ -378,6 +378,4 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect ) - -// replace github.com/filecoin-project/go-data-segment => /Users/ivan/Dev/go-data-segment - +replace github.com/filecoin-project/go-data-segment v0.0.1 => github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69 diff --git a/go.sum b/go.sum index e4c84e270..0d405c36a 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,6 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o= github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-data-segment v0.0.1 h1:1wmDxOG4ubWQm3ZC1XI5nCon5qgSq7Ra3Rb6Dbu10Gs= -github.com/filecoin-project/go-data-segment v0.0.1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4= github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0= github.com/filecoin-project/go-data-transfer v1.15.4-boost/go.mod h1:S5Es9uoD+3TveYyGjxZInAF6mSQtRjNzezV7Y7Sh8X0= github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc= @@ -954,6 +952,8 @@ github.com/ipni/storetheindex v0.8.1 h1:3uHclkcQWlIXQx+We4tbGF/XzoZYERz3so34xQbU github.com/ipni/storetheindex v0.8.1/go.mod h1:K4AR2bRll46YCWeGvob5foN/Z/kuovPdlUeJKOHVQHo= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= +github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69 h1:prNO3cadvXtRXItvhQoaJ0qfF3a1sPkZ8B6epyuCLPo= +github.com/ischasny/go-data-segment v0.0.0-20231107120541-53b3ec9a7c69/go.mod h1:4ZWx04e7pKuozznBnZarZFJQ+PeUEKPp/Lv7M6K7bog= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index b85418c7d..4dc812dee 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -33,7 +33,6 @@ import ( "github.com/ipld/go-car/v2/blockstore" carindex "github.com/ipld/go-car/v2/index" "github.com/jellydator/ttlcache/v2" - 
"github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" @@ -43,7 +42,8 @@ import ( var log = logging.Logger("piecedirectory") const ( - MaxCachedReaders = 128 + MaxCachedReaders = 128 + DataSegmentReaderBufferSize = 4 << 20 // 4MiB ) type settings struct { @@ -387,15 +387,20 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() + dsis := datasegment.DataSegmentIndexStartOffset(ps) if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { return nil, fmt.Errorf("could not seek to data segment index: %w", err) } + + now := time.Now() + results := make(chan *datasegment.SegmentDesc) var parseIndexErr error ctx, cancel := context.WithCancel(context.Background()) go func() { - parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, r, results) + br := bufio.NewReaderSize(r, DataSegmentReaderBufferSize) + parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, br, results) close(results) }() @@ -424,6 +429,10 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type return nil, fmt.Errorf("no data segments found") } + log.Debugf("parsing and validating data segment index of %d segments took %.2f seconds", len(segments), time.Since(now).Seconds()) + + now = time.Now() + recs := make([]model.Record, 0) for _, s := range segments { segOffset := s.UnpaddedOffest() @@ -441,6 +450,8 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type recs = append(recs, subRecs...) } + log.Debugf("parsing data segments of %d records took %.2f seconds", len(recs), time.Since(now).Seconds()) + return recs, nil } @@ -1005,11 +1016,11 @@ func (s *SectorAccessorAsPieceReader) GetReader(ctx context.Context, minerAddr a } func isIdentity(c cid.Cid) (digest []byte, ok bool, err error) { - dmh, err := multihash.Decode(c.Hash()) + dmh, err := mh.Decode(c.Hash()) if err != nil { return nil, false, err } - ok = dmh.Code == multihash.IDENTITY + ok = dmh.Code == mh.IDENTITY digest = dmh.Digest return digest, ok, nil } From ec9d6a340d18a7f170c9eaa9555b1527fb2c638b Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Tue, 7 Nov 2023 14:49:40 +0000 Subject: [PATCH 03/10] further improvement --- piecedirectory/piecedirectory.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index 4dc812dee..89475264a 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -43,7 +43,7 @@ var log = logging.Logger("piecedirectory") const ( MaxCachedReaders = 128 - DataSegmentReaderBufferSize = 4 << 20 // 4MiB + DataSegmentReaderBufferSize = 100 << 20 // 4MiB ) type settings struct { @@ -388,12 +388,14 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() + now := time.Now() dsis := datasegment.DataSegmentIndexStartOffset(ps) if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { return nil, fmt.Errorf("could not seek to data segment index: %w", err) } + log.Debugf("podsi: took %s to seek to the start offset", time.Since(now).String()) - now := time.Now() + now = time.Now() results := 
make(chan *datasegment.SegmentDesc)
 	var parseIndexErr error
@@ -431,7 +431,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type
 		return nil, fmt.Errorf("no data segments found")
 	}
 
-	log.Debugf("parsing and validating data segment index of %d segments took %.2f seconds", len(segments), time.Since(now).Seconds())
+	log.Debugf("podsi: parsing and validating data segment index of %d segments took %s", len(segments), time.Since(now).String())
 
 	now = time.Now()
 
@@ -439,7 +441,8 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type
 		segSize := s.UnpaddedLength()
 
 		lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))
-		subRecs, err := parseRecordsFromCar(lr)
+		br := bufio.NewReaderSize(lr, DataSegmentReaderBufferSize)
+		subRecs, err := parseRecordsFromCar(br)
 		if err != nil {
 			// revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments.
 			return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err)
@@ -450,7 +453,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type
 		recs = append(recs, subRecs...)
 	}
 
-	log.Debugf("parsing data segments of %d records took %.2f seconds", len(recs), time.Since(now).Seconds())
+	log.Debugf("podsi: parsing data segments of %d records took %s", len(recs), time.Since(now).String())
 
 	return recs, nil
 }

From 19707c9fd7d6553043384e4d4bc63879c064fdb5 Mon Sep 17 00:00:00 2001
From: Ivan Schasny
Date: Tue, 7 Nov 2023 15:02:47 +0000
Subject: [PATCH 04/10] further improvement

---
 piecedirectory/piecedirectory.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go
index 89475264a..7fe1b28e9 100644
--- a/piecedirectory/piecedirectory.go
+++ b/piecedirectory/piecedirectory.go
@@ -43,7 +43,7 @@ var log = logging.Logger("piecedirectory")
 
 const (
 	MaxCachedReaders            = 128
-	DataSegmentReaderBufferSize = 100 << 20 // 100MiB
+	DataSegmentReaderBufferSize = 10 << 20 // 10MiB
 )

From 779b5d3b3eebc99c0a44321d5e11bb3d74ac10bc Mon Sep 17 00:00:00 2001
From: Ivan Schasny
Date: Thu, 9 Nov 2023 18:58:54 +0000
Subject: [PATCH 05/10] add itest

---
 itests/framework/framework.go    |  24 ++++--
 piecedirectory/piecedirectory.go | 143 ++++++++++--------------------
 2 files changed, 60 insertions(+), 107 deletions(-)

diff --git a/itests/framework/framework.go b/itests/framework/framework.go
index 70acf1b61..f7bb624ff 100644
--- a/itests/framework/framework.go
+++ b/itests/framework/framework.go
@@ -77,9 +77,10 @@ var Log = logging.Logger("boosttest")
 
 type TestFrameworkConfig struct {
-	Ensemble        *kit.Ensemble
-	EnableLegacy    bool
-	MaxStagingBytes int64
+	Ensemble                  *kit.Ensemble
+	EnableLegacy              bool
+	MaxStagingBytes           int64
+	ProvisionalWalletBalances int64
 }
 
 type TestFramework struct {
@@ -117,8 +118,17 @@ func WithEnsemble(e *kit.Ensemble) FrameworkOpts {
 	}
 }
 
+func SetProvisionalWalletBalances(balance int64) FrameworkOpts {
+	return func(tmc *TestFrameworkConfig) {
+		tmc.ProvisionalWalletBalances = balance
+	}
+}
+
 func NewTestFramework(ctx context.Context, t *testing.T, opts ...FrameworkOpts) *TestFramework {
-	fmc := &TestFrameworkConfig{}
+	fmc := &TestFrameworkConfig{
+		// default provisional balance
+		ProvisionalWalletBalances: 1e18,
+	}
 	for _, opt := range opts {
 		opt(fmc)
 	}
@@ -224,7 +234,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error {
 		clientAddr, _ = fullnodeApi.WalletNew(f.ctx, 
chaintypes.KTBLS)
 
-		amt := abi.NewTokenAmount(1e18)
+		amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances)
 		_ = sendFunds(f.ctx, fullnodeApi, clientAddr, amt)
 		Log.Infof("Created client wallet %s with %d attoFil", clientAddr, amt)
 		wg.Done()
@@ -239,7 +249,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error {
 		Log.Info("Creating publish storage deals wallet")
 		psdWalletAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS)
 
-		amt := abi.NewTokenAmount(1e18)
+		amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances)
 		_ = sendFunds(f.ctx, fullnodeApi, psdWalletAddr, amt)
 		Log.Infof("Created publish storage deals wallet %s with %d attoFil", psdWalletAddr, amt)
 		wg.Done()
@@ -248,7 +258,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error {
 		Log.Info("Creating deal collateral wallet")
 		dealCollatAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS)
 
-		amt := abi.NewTokenAmount(1e18)
+		amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances)
 		_ = sendFunds(f.ctx, fullnodeApi, dealCollatAddr, amt)
 		Log.Infof("Created deal collateral wallet %s with %d attoFil", dealCollatAddr, amt)
 		wg.Done()
diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go
index 7fe1b28e9..6cdfa1db4 100644
--- a/piecedirectory/piecedirectory.go
+++ b/piecedirectory/piecedirectory.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	carutil "github.com/filecoin-project/boost/car"
@@ -43,11 +44,12 @@ var log = logging.Logger("piecedirectory")
 
 const (
 	MaxCachedReaders            = 128
-	DataSegmentReaderBufferSize = 10 << 20 // 10MiB
+	DataSegmentReaderBufferSize = 4e6 // 4MiB
 )
 
 type settings struct {
-	addIndexConcurrency int
+	addIndexConcurrency         int
+	dataSegmentReaderBufferSize int
 }
 
 type Option func(*settings)
@@ -58,6 +60,12 @@ func WithAddIndexConcurrency(c int) Option {
 	}
 }
 
+func WithDataSegmentReaderBufferSize(size int) Option {
+	return func(s *settings) {
+		s.dataSegmentReaderBufferSize = size
+	}
+}
+
 type PieceDirectory struct {
 	settings *settings
 	store    *bdclient.Store
@@ -85,7 +93,8 @@ func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThro
 		addIdxThrottleSize: addIndexThrottleSize,
 		addIdxThrottle:     make(chan struct{}, addIndexThrottleSize),
 		settings: &settings{
-			addIndexConcurrency: config.DefaultAddIndexConcurrency,
+			addIndexConcurrency:         config.DefaultAddIndexConcurrency,
+			dataSegmentReaderBufferSize: DataSegmentReaderBufferSize,
 		},
 	}
 
@@ -307,7 +316,7 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
 
 	// Try to parse data as containing a data segment index
 	log.Debugw("add index: read index", "pieceCid", pieceCid)
-	recs, err := parsePieceWithDataSegmentIndex(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader)
+	recs, err := parsePieceWithDataSegmentIndexCustom(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader, ps.settings.dataSegmentReaderBufferSize)
 	if err != nil {
 		log.Infow("add index: data segment check failed. 
falling back to car", "pieceCid", pieceCid, "err", err) // Iterate over all the blocks in the piece to extract the index records @@ -386,6 +395,11 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { } func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { + return parsePieceWithDataSegmentIndexCustom(pieceCid, unpaddedSize, r, DataSegmentReaderBufferSize) +} + +func parsePieceWithDataSegmentIndexCustom(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader, dataSegmentReaderBufferSize int) ([]model.Record, error) { + var readCount int32 ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() now := time.Now() @@ -401,7 +415,8 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type var parseIndexErr error ctx, cancel := context.WithCancel(context.Background()) go func() { - br := bufio.NewReaderSize(r, DataSegmentReaderBufferSize) + cr := countingReader{Reader: r, cnt: &readCount} + br := bufio.NewReaderSize(&cr, dataSegmentReaderBufferSize) parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, br, results) close(results) }() @@ -436,16 +451,23 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type now = time.Now() recs := make([]model.Record, 0) - for _, s := range segments { + + // this can't be parallelised because reader needs to be used at different offsets + for i, s := range segments { segOffset := s.UnpaddedOffest() segSize := s.UnpaddedLength() - lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) - br := bufio.NewReaderSize(lr, DataSegmentReaderBufferSize) - subRecs, err := parseRecordsFromCar(br) + _, err := r.Seek(int64(segOffset), io.SeekStart) + if err != nil { + return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", i, segOffset, err) + } + cr := countingReader{Reader: r, cnt: &readCount} + br := bufio.NewReaderSize(&cr, dataSegmentReaderBufferSize) + lr := io.LimitReader(br, int64(segSize)) + subRecs, err := parseRecordsFromCar(lr) if err != nil { // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. - return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) + return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", i, segOffset, err) } for i := range subRecs { subRecs[i].Offset += segOffset @@ -453,100 +475,21 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type recs = append(recs, subRecs...) 
}
 
-	log.Debugf("podsi: parsing data segments of %d records took %s", len(recs), time.Since(now).String())
+	log.Debugf("podsi: parsing data segments of %d records took %s and resulted in %d read operations", len(recs), time.Since(now).String(), readCount)
 
 	return recs, nil
 }
 
-// func parsePieceWithDataSegmentIndex2(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) {
-// 	ps := abi.UnpaddedPieceSize(unpaddedSize).Padded()
-// 	dsis := datasegment.DataSegmentIndexStartOffset(ps)
-// 	if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
-// 		return nil, fmt.Errorf("could not seek to data segment index: %w", err)
-// 	}
-
-// 	dataSegments, err := datasegment.ParseDataSegmentIndex(r)
-// 	if err != nil {
-// 		return nil, fmt.Errorf("could not parse data segment index: %w", err)
-// 	}
-// 	now := time.Now()
-// 	segments, err := dataSegments.ValidEntries()
-// 	if err != nil {
-// 		return nil, fmt.Errorf("could not calculate valid entries: %w", err)
-// 	}
-
-// 	fmt.Printf("Total validation time: %.2f\n", time.Since(now).Seconds())
-// 	if len(segments) == 0 {
-// 		return nil, fmt.Errorf("no data segments found")
-// 	}
-
-// 	recs := make([]model.Record, 0)
-// 	for _, s := range segments {
-// 		segOffset := s.UnpaddedOffest()
-// 		segSize := s.UnpaddedLength()
-
-// 		lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))
-// 		subRecs, err := parseRecordsFromCar(lr)
-// 		if err != nil {
-// 			// revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments.
-// 			return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err)
-// 		}
-// 		for i := range subRecs {
-// 			subRecs[i].Offset += segOffset
-// 		}
-// 		recs = append(recs, subRecs...)
-// 	}
-
-// 	return recs, nil
-// }
-
-// func parsePieceWithDataSegmentIndex3(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) {
-// 	ps := abi.UnpaddedPieceSize(unpaddedSize).Padded()
-// 	dsis := datasegment.DataSegmentIndexStartOffset(ps)
-// 	if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
-// 		return nil, fmt.Errorf("could not seek to data segment index: %w", err)
-// 	}
-// 	results := make(chan *datasegment.SegmentDesc)
-// 	var parseIndexErr error
-// 	go func() {
-// 		parseIndexErr = datasegment.ParseDataSegmentIndexAsync(context.Background(), r, results)
-// 		close(results)
-// 	}()
-
-// 	var segments []*datasegment.SegmentDesc
-// 	for res := range results {
-// 		segments = append(segments, res)
-// 	}
-
-// 	validSegments := make([]datasegment.SegmentDesc, 0, len(segments))
-
-// 	if parseIndexErr != nil {
-// 		return nil, fmt.Errorf("could not parse data segment index: %w", parseIndexErr)
-// 	}
-
-// 	if len(segments) == 0 {
-// 		return nil, fmt.Errorf("no data segments found")
-// 	}
-
-// 	recs := make([]model.Record, 0)
-// 	for _, s := range segments {
-// 		segOffset := s.UnpaddedOffest()
-// 		segSize := s.UnpaddedLength()
-
-// 		lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))
-// 		subRecs, err := parseRecordsFromCar(lr)
-// 		if err != nil {
-// 			// revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments.
-// 			return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err)
-// 		}
-// 		for i := range subRecs {
-// 			subRecs[i].Offset += segOffset
-// 		}
-// 		recs = append(recs, subRecs...)
-// } - -// return recs, nil -// } +type countingReader struct { + io.Reader + + cnt *int32 +} + +func (cr *countingReader) Read(p []byte) (n int, err error) { + atomic.AddInt32(cr.cnt, 1) + return cr.Reader.Read(p) +} // BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal // corresponding to an unsealed sector for this method to work. It will try to build index From b116410a4ee50477091a536d00197dd457ba41cf Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Thu, 9 Nov 2023 19:04:07 +0000 Subject: [PATCH 06/10] add itest --- itests/dummydeal_podsi_test.go | 239 +++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 itests/dummydeal_podsi_test.go diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go new file mode 100644 index 000000000..f74bc000e --- /dev/null +++ b/itests/dummydeal_podsi_test.go @@ -0,0 +1,239 @@ +package itests + +import ( + "bytes" + "context" + "fmt" + "io" + "math/bits" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-data-segment/datasegment" + commcid "github.com/filecoin-project/go-fil-commcid" + commp "github.com/filecoin-project/go-fil-commp-hashhash" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipfs/go-unixfsnode/data/builder" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multicodec" + multihash "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestDummyPodsiDealOnline(t *testing.T) { + randomFileSize := int(1e6) + + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + var opts []framework.FrameworkOpts + opts = append(opts, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10e9), framework.SetProvisionalWalletBalances(9e18)) + f := framework.NewTestFramework(ctx, t, opts...) 
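+	// The options above are assumptions this test makes about its environment:
+	// legacy deals enabled, 10 GB of staging space for the aggregated piece, and
+	// 9 FIL per provisional wallet so the larger-than-usual padded deal can be
+	// funded end to end.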
+	err := f.Start()
+	require.NoError(t, err)
+	defer f.Stop()
+
+	err = f.AddClientProviderBalance(abi.NewTokenAmount(5e18))
+	require.NoError(t, err)
+
+	tempdir := t.TempDir()
+	log.Debugw("using tempdir", "dir", tempdir)
+
+	// create a random file
+	randomFilepath, err := testutil.CreateRandomFile(tempdir, 5, randomFileSize)
+	require.NoError(t, err)
+
+	carFile := filepath.Join(tempdir, "test.car")
+	dataSegmentFile := filepath.Join(tempdir, "datasegment.dat")
+
+	// pack it into the car
+	rootCid, err := createCar(t, carFile, []string{randomFilepath})
+	require.NoError(t, err)
+
+	// pack the car into a data segment piece twice so that we have two segments
+	makeDataSegmentPiece(t, dataSegmentFile, []string{carFile, carFile})
+
+	// Start a web server to serve the car files
+	log.Debug("starting webserver")
+	server, err := testutil.HttpTestFileServer(t, tempdir)
+	require.NoError(t, err)
+	defer server.Close()
+
+	// Create a new dummy deal
+	log.Debug("creating dummy deal")
+	dealUuid := uuid.New()
+
+	// Make a deal
+	res, err := f.MakeDummyDeal(dealUuid, dataSegmentFile, rootCid, server.URL+"/"+filepath.Base(dataSegmentFile), false)
+	require.NoError(t, err)
+	require.True(t, res.Result.Accepted)
+	log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res))
+
+	time.Sleep(2 * time.Second)
+
+	// Wait for the first deal to be added to a sector and cleaned up so space is made
+	err = f.WaitForDealAddedToSector(dealUuid)
+	require.NoError(t, err)
+	time.Sleep(100 * time.Millisecond)
+
+}
+
+func makeDataSegmentPiece(t *testing.T, dataSegmentFile string, subPieces []string) {
+	readers := make([]io.Reader, 0)
+	deals := make([]abi.PieceInfo, 0)
+	for _, sp := range subPieces {
+		arg, err := os.Open(sp)
+		require.NoError(t, err)
+
+		readers = append(readers, arg)
+		cp := new(commp.Calc)
+		io.Copy(cp, arg)
+		rawCommP, size, err := cp.Digest()
+		require.NoError(t, err)
+
+		arg.Seek(0, io.SeekStart)
+		c, _ := commcid.DataCommitmentV1ToCID(rawCommP)
+		subdeal := abi.PieceInfo{
+			Size:     abi.PaddedPieceSize(size),
+			PieceCID: c,
+		}
+		deals = append(deals, subdeal)
+	}
+	require.NotEqual(t, 0, len(deals))
+
+	_, size, err := datasegment.ComputeDealPlacement(deals)
+	require.NoError(t, err)
+
+	overallSize := abi.PaddedPieceSize(size)
+	// we need to make this the 'next' power of 2 in order to have space for the index
+	next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256)))
+
+	a, err := datasegment.NewAggregate(abi.PaddedPieceSize(next), deals)
+	require.NoError(t, err)
+	out, err := a.AggregateObjectReader(readers)
+	require.NoError(t, err)
+
+	// open output file
+	fo, err := os.Create(dataSegmentFile)
+	require.NoError(t, err)
+	defer fo.Close()
+
+	written, err := io.Copy(fo, out)
+	require.NoError(t, err)
+	require.NotZero(t, written)
+}
+
+func createCar(t *testing.T, carFile string, files []string) (cid.Cid, error) {
+	// make a cid with the right length that we eventually will patch with the root.
+	hasher, err := multihash.GetHasher(multihash.SHA2_256)
+	if err != nil {
+		return cid.Undef, err
+	}
+	digest := hasher.Sum([]byte{})
+	hash, err := multihash.Encode(digest, multihash.SHA2_256)
+	if err != nil {
+		return cid.Undef, err
+	}
+	proxyRoot := cid.NewCidV1(uint64(multicodec.DagPb), hash)
+
+	options := []car.Option{}
+
+	cdest, err := blockstore.OpenReadWrite(carFile, []cid.Cid{proxyRoot}, options...)
+
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	// Write the unixfs blocks into the store.
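+	// writeFiles (defined below) builds a UnixFS DAG for each input path and,
+	// unless noWrap is set, wraps the entries in a single directory node whose
+	// CID becomes the root that replaces the placeholder above.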
+	root, err := writeFiles(context.Background(), false, cdest, files...)
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	if err := cdest.Finalize(); err != nil {
+		return cid.Undef, err
+	}
+	// re-open/finalize with the final root.
+	return root, car.ReplaceRootsInFile(carFile, []cid.Cid{root})
+}
+
+func writeFiles(ctx context.Context, noWrap bool, bs *blockstore.ReadWrite, paths ...string) (cid.Cid, error) {
+	ls := cidlink.DefaultLinkSystem()
+	ls.TrustedStorage = true
+	ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) {
+		cl, ok := l.(cidlink.Link)
+		if !ok {
+			return nil, fmt.Errorf("not a cidlink")
+		}
+		blk, err := bs.Get(ctx, cl.Cid)
+		if err != nil {
+			return nil, err
+		}
+		return bytes.NewBuffer(blk.RawData()), nil
+	}
+	ls.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
+		buf := bytes.NewBuffer(nil)
+		return buf, func(l ipld.Link) error {
+			cl, ok := l.(cidlink.Link)
+			if !ok {
+				return fmt.Errorf("not a cidlink")
+			}
+			blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid)
+			if err != nil {
+				return err
+			}
+			bs.Put(ctx, blk)
+			return nil
+		}, nil
+	}
+
+	topLevel := make([]dagpb.PBLink, 0, len(paths))
+	for _, p := range paths {
+		l, size, err := builder.BuildUnixFSRecursive(p, &ls)
+		if err != nil {
+			return cid.Undef, err
+		}
+		if noWrap {
+			rcl, ok := l.(cidlink.Link)
+			if !ok {
+				return cid.Undef, fmt.Errorf("could not interpret %s", l)
+			}
+			return rcl.Cid, nil
+		}
+		name := path.Base(p)
+		entry, err := builder.BuildUnixFSDirectoryEntry(name, int64(size), l)
+		if err != nil {
+			return cid.Undef, err
+		}
+		topLevel = append(topLevel, entry)
+	}
+
+	// make a directory for the file(s).
+	root, _, err := builder.BuildUnixFSDirectory(topLevel, &ls)
+	if err != nil {
+		return cid.Undef, err
+	}
+	rcl, ok := root.(cidlink.Link)
+	if !ok {
+		return cid.Undef, fmt.Errorf("could not interpret %s", root)
+	}
+
+	return rcl.Cid, nil
+}

From 8db688ac419e21eb7a400dde8a42929963036b86 Mon Sep 17 00:00:00 2001
From: Ivan Schasny
Date: Tue, 28 Nov 2023 18:09:37 +0000
Subject: [PATCH 07/10] Add podsi buffer size configuration

---
 node/config/def.go                     |  3 +++
 node/config/doc_gen.go                 | 15 +++++++++++++++
 node/config/types.go                   |  9 +++++++++
 node/modules/piecedirectory.go         |  3 ++-
 piecedirectory/piece_directory_test.go |  2 +-
 piecedirectory/piecedirectory.go       | 19 ++++++++-----------
 6 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/node/config/def.go b/node/config/def.go
index 989315a98..9843bebc4 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -225,6 +225,9 @@ func DefaultBoost() *Boost {
 			NChunks:         5,
 			AllowPrivateIPs: false,
 		},
+		ExperimentalConfig: ExperimentalConfig{
+			PodsiDataSegmentReaderBufferSize: 4e6, // 4MiB
+		},
 	}
 	return cfg
 }
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index f5fc43cd2..e9df5db1c 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -115,6 +115,12 @@ your node if metadata log is disabled`,
 
 			Comment: ``,
 		},
+		{
+			Name: "ExperimentalConfig",
+			Type: "ExperimentalConfig",
+
+			Comment: `Experimental config`,
+		},
 	},
 	"Common": []DocField{
 		{
@@ -452,6 +458,15 @@ If this parameter is not set, boost will serve data from the endpoint
 configured in SectorIndexApiInfo.`,
 		},
 	},
+	"ExperimentalConfig": []DocField{
+		{
+			Name: "PodsiDataSegmentReaderBufferSize",
+			Type: "int",
+
+			Comment: `DataSegmentReaderBufferSize sets the size of the read buffer to use for podsi deal index parsing.
+Default is 4 MiB.`, + }, + }, "FeeConfig": []DocField{ { Name: "MaxPublishDealsFee", diff --git a/node/config/types.go b/node/config/types.go index 8585d1105..dea8070b5 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -54,6 +54,9 @@ type Boost struct { LotusFees FeeConfig DAGStore lotus_config.DAGStoreConfig IndexProvider IndexProviderConfig + + // Experimental config + ExperimentalConfig ExperimentalConfig } func (b *Boost) GetDealmakingConfig() lotus_config.DealmakingConfig { @@ -445,3 +448,9 @@ type HttpDownloadConfig struct { // The default is false. AllowPrivateIPs bool } + +type ExperimentalConfig struct { + // DataSegmentReaderBufferSize sets the size of the read buffer to use for podsi deal index parsing. + // Default is 4 MiB. + PodsiDataSegmentReaderBufferSize int +} diff --git a/node/modules/piecedirectory.go b/node/modules/piecedirectory.go index d99088e40..1ad90fe5b 100644 --- a/node/modules/piecedirectory.go +++ b/node/modules/piecedirectory.go @@ -138,7 +138,8 @@ func NewPieceDirectory(cfg *config.Boost) func(lc fx.Lifecycle, maddr dtypes.Min pdctx, cancel := context.WithCancel(context.Background()) pd := piecedirectory.NewPieceDirectory(store, sa, cfg.LocalIndexDirectory.ParallelAddIndexLimit, - piecedirectory.WithAddIndexConcurrency(cfg.LocalIndexDirectory.AddIndexConcurrency)) + piecedirectory.WithAddIndexConcurrency(cfg.LocalIndexDirectory.AddIndexConcurrency), + piecedirectory.WithDataSegmentReaderBufferSize(cfg.ExperimentalConfig.PodsiDataSegmentReaderBufferSize)) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { err := sa.Start(ctx, log) diff --git a/piecedirectory/piece_directory_test.go b/piecedirectory/piece_directory_test.go index fb8584d04..2dac2c5e3 100644 --- a/piecedirectory/piece_directory_test.go +++ b/piecedirectory/piece_directory_test.go @@ -24,7 +24,7 @@ func TestSegmentParsing(t *testing.T) { rd, err := os.Open("testdata/segment.car") require.NoError(t, err) - recs, err := parsePieceWithDataSegmentIndex(pieceCid, carSize, rd) + recs, err := parsePieceWithDataSegmentIndex(pieceCid, carSize, rd, 4e6) // 4MiB buffer require.NoError(t, err) t.Log(recs) diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index 6cdfa1db4..c72fb821d 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -43,8 +43,7 @@ import ( var log = logging.Logger("piecedirectory") const ( - MaxCachedReaders = 128 - DataSegmentReaderBufferSize = 4e6 // 4MiB + MaxCachedReaders = 128 ) type settings struct { @@ -316,7 +315,7 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid // Try to parse data as containing a data segment index log.Debugw("add index: read index", "pieceCid", pieceCid) - recs, err := parsePieceWithDataSegmentIndexCustom(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader, ps.settings.dataSegmentReaderBufferSize) + recs, err := parsePieceWithDataSegmentIndex(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader, ps.settings.dataSegmentReaderBufferSize) if err != nil { log.Infow("add index: data segment check failed. 
falling back to car", "pieceCid", pieceCid, "err", err) // Iterate over all the blocks in the piece to extract the index records @@ -394,11 +393,7 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { return recs, nil } -func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { - return parsePieceWithDataSegmentIndexCustom(pieceCid, unpaddedSize, r, DataSegmentReaderBufferSize) -} - -func parsePieceWithDataSegmentIndexCustom(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader, dataSegmentReaderBufferSize int) ([]model.Record, error) { +func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader, dataSegmentReaderBufferSize int) ([]model.Record, error) { var readCount int32 ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() @@ -407,7 +402,7 @@ func parsePieceWithDataSegmentIndexCustom(pieceCid cid.Cid, unpaddedSize int64, if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { return nil, fmt.Errorf("could not seek to data segment index: %w", err) } - log.Debugf("podsi: took %s to seek to the start offset", time.Since(now).String()) + log.Infof("podsi: took %s to seek to the start offset", time.Since(now).String()) now = time.Now() @@ -446,7 +441,9 @@ func parsePieceWithDataSegmentIndexCustom(pieceCid cid.Cid, unpaddedSize int64, return nil, fmt.Errorf("no data segments found") } - log.Debugf("podsi: parsing and validating data segment index of %d segments took %s", len(segments), time.Since(now).String()) + log.Infow("podsi: parsing and validating data segment index", "buffer_mib", dataSegmentReaderBufferSize/1e6, "segments", len(segments), "time", time.Since(now).String(), "reads", readCount) + + readCount = 0 now = time.Now() @@ -475,7 +472,7 @@ func parsePieceWithDataSegmentIndexCustom(pieceCid cid.Cid, unpaddedSize int64, recs = append(recs, subRecs...) } - log.Debugf("podsi: parsing data segments of %d records took %.2f seconds and resulted into %d read operations", len(recs), time.Since(now).String(), readCount) + log.Infow("podsi: parsing data segments", "buffer_mib", dataSegmentReaderBufferSize/1e6, "records", len(recs), "time", time.Since(now).String(), "reads", readCount) return recs, nil } From d77b0a330512cacdf651d9a1be9f3a47a97d0146 Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Tue, 28 Nov 2023 18:14:43 +0000 Subject: [PATCH 08/10] made configuration int64 --- node/config/def.go | 2 +- node/config/doc_gen.go | 6 +++--- node/config/types.go | 4 ++-- node/modules/piecedirectory.go | 2 +- piecedirectory/piecedirectory.go | 10 +++++----- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/node/config/def.go b/node/config/def.go index 9843bebc4..f8f0c1f37 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -226,7 +226,7 @@ func DefaultBoost() *Boost { AllowPrivateIPs: false, }, ExperimentalConfig: ExperimentalConfig{ - PodsiDataSegmentReaderBufferSize: 4e6, // 4MiB + PodsiDataSegmentReaderBufferSizeBytes: 4e6, // 4MiB }, } return cfg diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index e9df5db1c..3c6909b4c 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -460,10 +460,10 @@ configured in SectorIndexApiInfo.`, }, "ExperimentalConfig": []DocField{ { - Name: "PodsiDataSegmentReaderBufferSize", - Type: "int", + Name: "PodsiDataSegmentReaderBufferSizeBytes", + Type: "int64", - Comment: `DataSegmentReaderBufferSize sets the size of the read buffer to use for podsi deal index parsing. 
+ Comment: `PodsiDataSegmentReaderBufferSizeBytes sets the size of the read buffer to use for podsi deal index parsing. Default is 4 MiB.`, }, }, diff --git a/node/config/types.go b/node/config/types.go index dea8070b5..5cd0adafe 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -450,7 +450,7 @@ type HttpDownloadConfig struct { } type ExperimentalConfig struct { - // DataSegmentReaderBufferSize sets the size of the read buffer to use for podsi deal index parsing. + // PodsiDataSegmentReaderBufferSizeBytes sets the size of the read buffer to use for podsi deal index parsing. // Default is 4 MiB. - PodsiDataSegmentReaderBufferSize int + PodsiDataSegmentReaderBufferSizeBytes int64 } diff --git a/node/modules/piecedirectory.go b/node/modules/piecedirectory.go index 1ad90fe5b..4cf2e71f2 100644 --- a/node/modules/piecedirectory.go +++ b/node/modules/piecedirectory.go @@ -139,7 +139,7 @@ func NewPieceDirectory(cfg *config.Boost) func(lc fx.Lifecycle, maddr dtypes.Min pd := piecedirectory.NewPieceDirectory(store, sa, cfg.LocalIndexDirectory.ParallelAddIndexLimit, piecedirectory.WithAddIndexConcurrency(cfg.LocalIndexDirectory.AddIndexConcurrency), - piecedirectory.WithDataSegmentReaderBufferSize(cfg.ExperimentalConfig.PodsiDataSegmentReaderBufferSize)) + piecedirectory.WithDataSegmentReaderBufferSize(cfg.ExperimentalConfig.PodsiDataSegmentReaderBufferSizeBytes)) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { err := sa.Start(ctx, log) diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index c72fb821d..cf2ece337 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -48,7 +48,7 @@ const ( type settings struct { addIndexConcurrency int - dataSegmentReaderBufferSize int + dataSegmentReaderBufferSize int64 } type Option func(*settings) @@ -59,7 +59,7 @@ func WithAddIndexConcurrency(c int) Option { } } -func WithDataSegmentReaderBufferSize(size int) Option { +func WithDataSegmentReaderBufferSize(size int64) Option { return func(s *settings) { s.dataSegmentReaderBufferSize = size } @@ -393,7 +393,7 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { return recs, nil } -func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader, dataSegmentReaderBufferSize int) ([]model.Record, error) { +func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader, dataSegmentReaderBufferSize int64) ([]model.Record, error) { var readCount int32 ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() @@ -411,7 +411,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type ctx, cancel := context.WithCancel(context.Background()) go func() { cr := countingReader{Reader: r, cnt: &readCount} - br := bufio.NewReaderSize(&cr, dataSegmentReaderBufferSize) + br := bufio.NewReaderSize(&cr, int(dataSegmentReaderBufferSize)) parseIndexErr = datasegment.ParseDataSegmentIndexAsync(ctx, br, results) close(results) }() @@ -459,7 +459,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", i, segOffset, err) } cr := countingReader{Reader: r, cnt: &readCount} - br := bufio.NewReaderSize(&cr, dataSegmentReaderBufferSize) + br := bufio.NewReaderSize(&cr, int(dataSegmentReaderBufferSize)) lr := io.LimitReader(br, int64(segSize)) subRecs, err := parseRecordsFromCar(lr) if err != nil { From 8099ece90e9569b5594a576f2eab05c8a3fc86d0 Mon Sep 
17 00:00:00 2001 From: Ivan Schasny Date: Wed, 29 Nov 2023 13:32:29 +0000 Subject: [PATCH 09/10] test --- piecedirectory/piecedirectory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index cf2ece337..4a5ef7cbe 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -100,6 +100,7 @@ func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThro for _, opt := range opts { opt(pd.settings) } + pd.settings.dataSegmentReaderBufferSize = 127 * (pd.settings.dataSegmentReaderBufferSize / 127) if pd.settings.addIndexConcurrency == 0 { pd.settings.addIndexConcurrency = config.DefaultAddIndexConcurrency From ca4b8cf0d4c75c2412a28712f3995064a1511318 Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Thu, 30 Nov 2023 11:00:59 +0000 Subject: [PATCH 10/10] wip --- itests/dummydeal_podsi_test.go | 2 +- piecedirectory/piecedirectory.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go index f74bc000e..7a98b5a64 100644 --- a/itests/dummydeal_podsi_test.go +++ b/itests/dummydeal_podsi_test.go @@ -35,7 +35,7 @@ import ( ) func TestDummyPodsiDealOnline(t *testing.T) { - randomFileSize := int(1e6) + randomFileSize := int(4e6) ctx := context.Background() log := framework.Log diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index 4a5ef7cbe..c4d170348 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -92,15 +92,13 @@ func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThro addIdxThrottleSize: addIndexThrottleSize, addIdxThrottle: make(chan struct{}, addIndexThrottleSize), settings: &settings{ - addIndexConcurrency: config.DefaultAddIndexConcurrency, - dataSegmentReaderBufferSize: DataSegmentReaderBufferSize, + addIndexConcurrency: config.DefaultAddIndexConcurrency, }, } for _, opt := range opts { opt(pd.settings) } - pd.settings.dataSegmentReaderBufferSize = 127 * (pd.settings.dataSegmentReaderBufferSize / 127) if pd.settings.addIndexConcurrency == 0 { pd.settings.addIndexConcurrency = config.DefaultAddIndexConcurrency
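
A minimal, self-contained sketch of the buffered counting-reader pattern these commits converge on. It is not part of the patches above; all names are illustrative, and only countingReader mirrors the helper added in PATCH 05. The 4e6 buffer size mirrors the PodsiDataSegmentReaderBufferSizeBytes default.

// sketch.go: wrap a reader in a counter, then in a bufio.Reader, so that
// many small reads by a parser are served from a few large reads against
// the underlying source (e.g. a sector reader).
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"sync/atomic"
)

// countingReader counts how many Read calls actually reach the wrapped reader.
type countingReader struct {
	io.Reader
	cnt *int32
}

func (cr *countingReader) Read(p []byte) (int, error) {
	atomic.AddInt32(cr.cnt, 1)
	return cr.Reader.Read(p)
}

func main() {
	payload := bytes.Repeat([]byte{0xAB}, 1<<20) // 1 MiB stand-in for a piece
	chunk := make([]byte, 64)                    // parser-sized reads

	// Unbuffered: every 64-byte read hits the source.
	var direct int32
	src := &countingReader{Reader: bytes.NewReader(payload), cnt: &direct}
	for {
		if _, err := src.Read(chunk); err == io.EOF {
			break
		}
	}

	// Buffered: the same 64-byte consumer drains a large buffer instead,
	// so only a few reads reach the source.
	var buffered int32
	src2 := &countingReader{Reader: bytes.NewReader(payload), cnt: &buffered}
	br := bufio.NewReaderSize(src2, 4e6)
	for {
		if _, err := br.Read(chunk); err == io.EOF {
			break
		}
	}

	fmt.Printf("reads against source: direct=%d buffered=%d\n", direct, buffered)
}

With a 1 MiB payload the direct path issues thousands of 64-byte reads against the source while the buffered path issues only a handful of large ones, which is the same effect the "reads" counters in the podsi log lines above are measuring.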