diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go
index 72f15d94f0..349bf239d1 100644
--- a/go/store/nbs/archive_build.go
+++ b/go/store/nbs/archive_build.go
@@ -425,7 +425,7 @@ func gatherAllChunks(ctx context.Context, cs chunkSource, idx tableIndex, stats
 			return nil, nil, err
 		}
 
-		bytes, err := cs.get(ctx, h, stats)
+		bytes, _, err := cs.get(ctx, h, nil, stats)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -907,7 +907,7 @@ func (csc *simpleChunkSourceCache) get(ctx context.Context, h hash.Hash, stats *
 		return chk, nil
 	}
 
-	bytes, err := csc.cs.get(ctx, h, stats)
+	bytes, _, err := csc.cs.get(ctx, h, nil, stats)
 	if bytes == nil || err != nil {
 		return nil, err
 	}
@@ -919,7 +919,8 @@ func (csc *simpleChunkSourceCache) get(ctx context.Context, h hash.Hash, stats *
 
 // has returns true if the chunk is in the ChunkSource. This is not related to what is cached, just a helper.
 func (csc *simpleChunkSourceCache) has(h hash.Hash) (bool, error) {
-	return csc.cs.has(h)
+	res, _, err := csc.cs.has(h, nil)
+	return res, err
 }
 
 // addresses get all chunk addresses of the ChunkSource as a hash.HashSet.
diff --git a/go/store/nbs/archive_chunk_source.go b/go/store/nbs/archive_chunk_source.go
index 3ccd0183c5..1b1f55d826 100644
--- a/go/store/nbs/archive_chunk_source.go
+++ b/go/store/nbs/archive_chunk_source.go
@@ -64,42 +64,63 @@ func openReader(file string) (io.ReaderAt, uint64, error) {
 	return f, uint64(stat.Size()), nil
 }
 
-func (acs archiveChunkSource) has(h hash.Hash) (bool, error) {
-	return acs.aRdr.has(h), nil
+func (acs archiveChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) {
+	res := acs.aRdr.has(h)
+	if res && keeper != nil && keeper(h) {
+		return false, gcBehavior_Block, nil
+	}
+	return res, gcBehavior_Continue, nil
 }
 
-func (acs archiveChunkSource) hasMany(addrs []hasRecord) (bool, error) {
+func (acs archiveChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
 	// single threaded first pass.
 	foundAll := true
 	for i, addr := range addrs {
-		if acs.aRdr.has(*(addr.a)) {
+		h := *addr.a
+		if acs.aRdr.has(h) {
+			if keeper != nil && keeper(h) {
+				return false, gcBehavior_Block, nil
+			}
 			addrs[i].has = true
 		} else {
 			foundAll = false
 		}
 	}
-	return !foundAll, nil
+	return !foundAll, gcBehavior_Continue, nil
 }
 
-func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
-	// ctx, stats ? NM4.
-	return acs.aRdr.get(h)
+func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) {
+	res, err := acs.aRdr.get(h)
+	if err != nil {
+		return nil, gcBehavior_Continue, err
+	}
+	if res != nil && keeper != nil && keeper(h) {
+		return nil, gcBehavior_Block, nil
+	}
+	return res, gcBehavior_Continue, nil
 }
 
-func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
+func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
 	// single threaded first pass.
foundAll := true for i, req := range reqs { - data, err := acs.aRdr.get(*req.a) - if err != nil || data == nil { + h := *req.a + data, err := acs.aRdr.get(h) + if err != nil { + return true, gcBehavior_Continue, err + } + if data == nil { foundAll = false } else { + if keeper != nil && keeper(h) { + return true, gcBehavior_Block, nil + } chunk := chunks.NewChunk(data) found(ctx, &chunk) reqs[i].found = true } } - return !foundAll, nil + return !foundAll, gcBehavior_Continue, nil } // iterate iterates over the archive chunks. The callback is called for each chunk in the archive. This is not optimized @@ -146,14 +167,14 @@ func (acs archiveChunkSource) clone() (chunkSource, error) { return archiveChunkSource{acs.file, rdr}, nil } -func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) (map[hash.Hash]Range, error) { - return nil, errors.New("Archive chunk source does not support getRecordRanges") +func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord, _ keeperF) (map[hash.Hash]Range, gcBehavior, error) { + return nil, gcBehavior_Continue, errors.New("Archive chunk source does not support getRecordRanges") } -func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { +func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) { found(ctx, ChunkToCompressedChunk(*chk)) - }, stats) + }, keeper, stats) } func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error { diff --git a/go/store/nbs/archive_test.go b/go/store/nbs/archive_test.go index da3c324cf2..c78d3b5710 100644 --- a/go/store/nbs/archive_test.go +++ b/go/store/nbs/archive_test.go @@ -655,28 +655,28 @@ type testChunkSource struct { var _ chunkSource = (*testChunkSource)(nil) -func (tcs *testChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) { +func (tcs *testChunkSource) get(_ context.Context, h hash.Hash, _ keeperF, _ *Stats) ([]byte, gcBehavior, error) { for _, chk := range tcs.chunks { if chk.Hash() == h { - return chk.Data(), nil + return chk.Data(), gcBehavior_Continue, nil } } - return nil, errors.New("not found") + return nil, gcBehavior_Continue, errors.New("not found") } -func (tcs *testChunkSource) has(h hash.Hash) (bool, error) { +func (tcs *testChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) { panic("never used") } -func (tcs *testChunkSource) hasMany(addrs []hasRecord) (bool, error) { +func (tcs *testChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { panic("never used") } -func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { +func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { panic("never used") } -func (tcs *testChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { +func (tcs *testChunkSource) getManyCompressed(ctx context.Context, eg 
*errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { panic("never used") } @@ -700,7 +700,7 @@ func (tcs *testChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, panic("never used") } -func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) { +func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) { panic("never used") } diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index cc58ffea89..816a931462 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -115,25 +115,31 @@ func (s3p awsTablePersister) key(k string) string { return k } -func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { - name, data, chunkCount, err := mt.write(haver, stats) - +func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { + name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { - return emptyChunkSource{}, err + return emptyChunkSource{}, gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return emptyChunkSource{}, gcb, nil } if chunkCount == 0 { - return emptyChunkSource{}, nil + return emptyChunkSource{}, gcBehavior_Continue, nil } err = s3p.multipartUpload(ctx, bytes.NewReader(data), uint64(len(data)), name.String()) if err != nil { - return emptyChunkSource{}, err + return emptyChunkSource{}, gcBehavior_Continue, err } tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name} - return newReaderFromIndexData(ctx, s3p.q, data, name, tra, s3BlockSize) + src, err := newReaderFromIndexData(ctx, s3p.q, data, name, tra, s3BlockSize) + if err != nil { + return emptyChunkSource{}, gcBehavior_Continue, err + } + return src, gcBehavior_Continue, nil } func (s3p awsTablePersister) multipartUpload(ctx context.Context, r io.Reader, sz uint64, key string) error { diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index 4ab92c1651..3187f2e08b 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -90,7 +90,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc := makeFakeS3(t) s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} - src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) + src, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) defer src.close() @@ -108,7 +108,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc := makeFakeS3(t) s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}} - src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) + src, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) defer src.close() if assert.True(mustUint32(src.count()) > 0) { @@ -133,7 +133,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc := makeFakeS3(t) s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} - src, err := s3p.Persist(context.Background(), mt, 
existingTable, &Stats{}) + src, _, err := s3p.Persist(context.Background(), mt, existingTable, nil, &Stats{}) require.NoError(t, err) defer src.close() assert.True(mustUint32(src.count()) == 0) @@ -148,7 +148,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1} s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} - _, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) + _, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) assert.Error(err) }) } @@ -306,7 +306,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { for i := 0; i < len(chunks); i++ { mt := newMemTable(uint64(2 * targetPartSize)) mt.addChunk(computeAddr(chunks[i]), chunks[i]) - cs, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) + cs, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) sources = append(sources, cs) } @@ -379,7 +379,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { } var err error - sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{}) + sources[i], _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) } src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{}) @@ -417,9 +417,9 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { rand.Read(medChunks[i]) mt.addChunk(computeAddr(medChunks[i]), medChunks[i]) } - cs1, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) + cs1, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) - cs2, err := s3p.Persist(context.Background(), mtb, nil, &Stats{}) + cs2, _, err := s3p.Persist(context.Background(), mtb, nil, nil, &Stats{}) require.NoError(t, err) sources := chunkSources{cs1, cs2} @@ -450,7 +450,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { mt := newMemTable(uint64(2 * targetPartSize)) mt.addChunk(computeAddr(smallChunks[i]), smallChunks[i]) var err error - sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{}) + sources[i], _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) } @@ -461,7 +461,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { } var err error - cs, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) + cs, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) sources = append(sources, cs) @@ -474,7 +474,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { mt.addChunk(computeAddr(medChunks[i]), medChunks[i]) } - cs, err = s3p.Persist(context.Background(), mt, nil, &Stats{}) + cs, _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) sources = append(sources, cs) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 9aca6ecd73..274bbcef2e 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -45,12 +45,16 @@ var _ tableFilePersister = &blobstorePersister{} // Persist makes the contents of mt durable. Chunks already present in // |haver| may be dropped in the process. 
-func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
-	address, data, chunkCount, err := mt.write(haver, stats)
+func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
+	address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
 	if err != nil {
-		return emptyChunkSource{}, err
-	} else if chunkCount == 0 {
-		return emptyChunkSource{}, nil
+		return emptyChunkSource{}, gcBehavior_Continue, err
+	}
+	if gcb != gcBehavior_Continue {
+		return emptyChunkSource{}, gcb, nil
+	}
+	if chunkCount == 0 {
+		return emptyChunkSource{}, gcBehavior_Continue, nil
 	}
 
 	name := address.String()
@@ -59,24 +63,28 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver
 	// first write table records and tail (index+footer) as separate blobs
 	eg, ectx := errgroup.WithContext(ctx)
-	eg.Go(func() (err error) {
-		_, err = bsp.bs.Put(ectx, name+tableRecordsExt, int64(len(records)), bytes.NewBuffer(records))
-		return
+	eg.Go(func() error {
+		_, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(len(records)), bytes.NewBuffer(records))
+		return err
 	})
-	eg.Go(func() (err error) {
-		_, err = bsp.bs.Put(ectx, name+tableTailExt, int64(len(tail)), bytes.NewBuffer(tail))
-		return
+	eg.Go(func() error {
+		_, err := bsp.bs.Put(ectx, name+tableTailExt, int64(len(tail)), bytes.NewBuffer(tail))
+		return err
 	})
 	if err = eg.Wait(); err != nil {
-		return nil, err
+		return nil, gcBehavior_Continue, err
 	}
 
 	// then concatenate into a final blob
 	if _, err = bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}); err != nil {
-		return emptyChunkSource{}, err
+		return emptyChunkSource{}, gcBehavior_Continue, err
 	}
 	rdr := &bsTableReaderAt{name, bsp.bs}
-	return newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize)
+	src, err := newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize)
+	if err != nil {
+		return emptyChunkSource{}, gcBehavior_Continue, err
+	}
+	return src, gcBehavior_Continue, nil
 }
 
 // ConjoinAll implements tablePersister.
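Note on the persister changes above: every Persist implementation now threads a keeper function into mt.write and must inspect the returned gcBehavior before treating the write as durable. The following is a minimal, self-contained sketch of that contract using hypothetical stand-in names (addr, keeper, persist), not the real nbs types or signatures:

package main

import "fmt"

type addr string

type gcBehavior int

const (
	gcBehaviorContinue gcBehavior = iota // stand-in for gcBehavior_Continue
	gcBehaviorBlock                      // stand-in for gcBehavior_Block
)

// keeper reports whether an address is claimed by an in-progress GC and so
// must not be depended on until the GC finishes.
type keeper func(addr) bool

// persist mimics a writer that consults |keep| and asks the caller to wait
// for GC when any written address is currently being kept.
func persist(chunks []addr, keep keeper) (gcBehavior, error) {
	for _, a := range chunks {
		if keep != nil && keep(a) {
			return gcBehaviorBlock, nil
		}
	}
	return gcBehaviorContinue, nil
}

func main() {
	blocked := true
	keep := func(a addr) bool { return blocked && a == "c1" }
	for {
		gcb, err := persist([]addr{"c0", "c1"}, keep)
		if err != nil {
			panic(err)
		}
		if gcb == gcBehaviorBlock {
			// Real callers wait on the store's GC condition variable here
			// (see waitForGC later in this diff); the sketch just clears the flag.
			blocked = false
			continue
		}
		fmt.Println("persisted")
		return
	}
}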
diff --git a/go/store/nbs/cmp_chunk_table_writer_test.go b/go/store/nbs/cmp_chunk_table_writer_test.go index 170cc43cb6..6f0508b2e8 100644 --- a/go/store/nbs/cmp_chunk_table_writer_test.go +++ b/go/store/nbs/cmp_chunk_table_writer_test.go @@ -51,7 +51,7 @@ func TestCmpChunkTableWriter(t *testing.T) { found := make([]CompressedChunk, 0) eg, egCtx := errgroup.WithContext(ctx) - _, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, &Stats{}) + _, _, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, nil, &Stats{}) require.NoError(t, err) require.NoError(t, eg.Wait()) @@ -146,7 +146,7 @@ func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader) reqs := toGetRecords(hashes) found := make([]*chunks.Chunk, 0) eg, ctx := errgroup.WithContext(ctx) - _, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, &Stats{}) + _, _, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, nil, &Stats{}) if err != nil { return nil, err } diff --git a/go/store/nbs/conjoiner_test.go b/go/store/nbs/conjoiner_test.go index 846aa4c7f4..a9f64aa222 100644 --- a/go/store/nbs/conjoiner_test.go +++ b/go/store/nbs/conjoiner_test.go @@ -63,7 +63,7 @@ func makeTestSrcs(t *testing.T, tableSizes []uint32, p tablePersister) (srcs chu c := nextChunk() mt.addChunk(computeAddr(c), c) } - cs, err := p.Persist(context.Background(), mt, nil, &Stats{}) + cs, _, err := p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) c, err := cs.clone() require.NoError(t, err) @@ -159,11 +159,11 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { var ok bool for _, act := range actualSrcs { var err error - ok, err = act.has(rec.a) + ok, _, err = act.has(rec.a, nil) require.NoError(t, err) var buf []byte if ok { - buf, err = act.get(ctx, rec.a, stats) + buf, _, err = act.get(ctx, rec.a, nil, stats) require.NoError(t, err) assert.Equal(t, rec.data, buf) break @@ -180,7 +180,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { mt := newMemTable(testMemTableSize) data := []byte{0xde, 0xad} mt.addChunk(computeAddr(data), data) - src, err := p.Persist(context.Background(), mt, nil, &Stats{}) + src, _, err := p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) defer src.close() return tableSpec{src.hash(), mustUint32(src.count())} diff --git a/go/store/nbs/empty_chunk_source.go b/go/store/nbs/empty_chunk_source.go index 5df2696c33..8d00c820de 100644 --- a/go/store/nbs/empty_chunk_source.go +++ b/go/store/nbs/empty_chunk_source.go @@ -34,24 +34,24 @@ import ( type emptyChunkSource struct{} -func (ecs emptyChunkSource) has(h hash.Hash) (bool, error) { - return false, nil +func (ecs emptyChunkSource) has(h hash.Hash, _ keeperF) (bool, gcBehavior, error) { + return false, gcBehavior_Continue, nil } -func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) { - return true, nil +func (ecs emptyChunkSource) hasMany(addrs []hasRecord, _ keeperF) (bool, gcBehavior, error) { + return true, gcBehavior_Continue, nil } -func (ecs emptyChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { - return nil, nil +func (ecs emptyChunkSource) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) { + return nil, 
gcBehavior_Continue, nil } -func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { - return true, nil +func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { + return true, gcBehavior_Continue, nil } -func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { - return true, nil +func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { + return true, gcBehavior_Continue, nil } func (ecs emptyChunkSource) count() (uint32, error) { @@ -74,8 +74,8 @@ func (ecs emptyChunkSource) reader(context.Context) (io.ReadCloser, uint64, erro return io.NopCloser(&bytes.Buffer{}), 0, nil } -func (ecs emptyChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) { - return map[hash.Hash]Range{}, nil +func (ecs emptyChunkSource) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) { + return map[hash.Hash]Range{}, gcBehavior_Continue, nil } func (ecs emptyChunkSource) currentSize() uint64 { diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index 5868b98217..83175088f2 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -86,16 +86,23 @@ func (ftp *fsTablePersister) Exists(ctx context.Context, name hash.Hash, chunkCo return archiveFileExists(ctx, ftp.dir, name) } -func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { +func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { t1 := time.Now() defer stats.PersistLatency.SampleTimeSince(t1) - name, data, chunkCount, err := mt.write(haver, stats) + name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { - return emptyChunkSource{}, err + return emptyChunkSource{}, gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return emptyChunkSource{}, gcb, nil } - return ftp.persistTable(ctx, name, data, chunkCount, stats) + src, err := ftp.persistTable(ctx, name, data, chunkCount, stats) + if err != nil { + return emptyChunkSource{}, gcBehavior_Continue, err + } + return src, gcBehavior_Continue, nil } func (ftp *fsTablePersister) Path() string { diff --git a/go/store/nbs/file_table_persister_test.go b/go/store/nbs/file_table_persister_test.go index 00e57d2fc9..4adde94986 100644 --- a/go/store/nbs/file_table_persister_test.go +++ b/go/store/nbs/file_table_persister_test.go @@ -96,7 +96,8 @@ func persistTableData(p tablePersister, chunx ...[]byte) (src chunkSource, err e return nil, fmt.Errorf("memTable too full to add %s", computeAddr(c)) } } - return p.Persist(context.Background(), mt, nil, &Stats{}) + src, _, err = p.Persist(context.Background(), mt, nil, nil, &Stats{}) + return src, err } func TestFSTablePersisterPersistNoData(t *testing.T) { @@ -113,7 +114,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) { defer file.RemoveAll(dir) fts := 
newFSTablePersister(dir, &UnlimitedQuotaProvider{}) - src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{}) + src, _, err := fts.Persist(context.Background(), mt, existingTable, nil, &Stats{}) require.NoError(t, err) assert.True(mustUint32(src.count()) == 0) @@ -177,7 +178,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) { } var err error - sources[0], err = fts.Persist(ctx, mt, nil, &Stats{}) + sources[0], _, err = fts.Persist(ctx, mt, nil, nil, &Stats{}) require.NoError(t, err) sources[1], err = sources[0].clone() require.NoError(t, err) diff --git a/go/store/nbs/frag/main.go b/go/store/nbs/frag/main.go index 424259916e..7eb9c2d8db 100644 --- a/go/store/nbs/frag/main.go +++ b/go/store/nbs/frag/main.go @@ -153,14 +153,14 @@ func main() { if i+1 == numGroups { // last group go func(i int) { defer wg.Done() - reads[i], _, err = nbs.CalcReads(store, orderedChildren[i*branchFactor:].HashSet(), 0) + reads[i], _, _, err = nbs.CalcReads(store, orderedChildren[i*branchFactor:].HashSet(), 0, nil) d.PanicIfError(err) }(i) continue } go func(i int) { defer wg.Done() - reads[i], _, err = nbs.CalcReads(store, orderedChildren[i*branchFactor:(i+1)*branchFactor].HashSet(), 0) + reads[i], _, _, err = nbs.CalcReads(store, orderedChildren[i*branchFactor:(i+1)*branchFactor].HashSet(), 0, nil) d.PanicIfError(err) }(i) } diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 64846797ad..e8790ec7e8 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -118,7 +118,9 @@ func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo return nil } - err = gcs.newGen.GetMany(ctx, notFound, func(ctx context.Context, chunk *chunks.Chunk) { + hashes = notFound + notFound = hashes.Copy() + err = gcs.newGen.GetMany(ctx, hashes, func(ctx context.Context, chunk *chunks.Chunk) { func() { mu.Lock() defer mu.Unlock() @@ -143,14 +145,18 @@ func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo } func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error { + return gcs.getManyCompressed(ctx, hashes, found, gcDependencyMode_TakeDependency) +} + +func (gcs *GenerationalNBS) getManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk), gcDepMode gcDependencyMode) error { var mu sync.Mutex notInOldGen := hashes.Copy() - err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) { + err := gcs.oldGen.getManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) { mu.Lock() delete(notInOldGen, chunk.Hash()) mu.Unlock() found(ctx, chunk) - }) + }, gcDepMode) if err != nil { return err } @@ -159,12 +165,12 @@ func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.H } notFound := notInOldGen.Copy() - err = gcs.newGen.GetManyCompressed(ctx, notInOldGen, func(ctx context.Context, chunk CompressedChunk) { + err = gcs.newGen.getManyCompressed(ctx, notInOldGen, func(ctx context.Context, chunk CompressedChunk) { mu.Lock() delete(notFound, chunk.Hash()) mu.Unlock() found(ctx, chunk) - }) + }, gcDepMode) if err != nil { return err } @@ -174,7 +180,7 @@ func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.H // The missing chunks may be ghost chunks. 
if gcs.ghostGen != nil { - return gcs.ghostGen.GetManyCompressed(ctx, notFound, found) + return gcs.ghostGen.getManyCompressed(ctx, notFound, found, gcDepMode) } return nil } @@ -202,14 +208,30 @@ func (gcs *GenerationalNBS) Has(ctx context.Context, h hash.Hash) (bool, error) } // HasMany returns a new HashSet containing any members of |hashes| that are absent from the store. -func (gcs *GenerationalNBS) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) { - gcs.newGen.mu.RLock() - defer gcs.newGen.mu.RUnlock() - return gcs.hasMany(toHasRecords(hashes)) +func (gcs *GenerationalNBS) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { + absent, err := gcs.newGen.HasMany(ctx, hashes) + if err != nil { + return nil, err + } + if len(absent) == 0 { + return nil, err + } + + absent, err = gcs.oldGen.HasMany(ctx, absent) + if err != nil { + return nil, err + } + if len(absent) == 0 || gcs.ghostGen == nil { + return nil, err + } + + return gcs.ghostGen.HasMany(ctx, absent) } -func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err error) { - absent, err = gcs.newGen.hasMany(recs) +// |refCheck| is called from write processes in newGen, so it is called with +// newGen.mu held. oldGen.mu is not held however. +func (gcs *GenerationalNBS) refCheck(recs []hasRecord) (hash.HashSet, error) { + absent, err := gcs.newGen.refCheck(recs) if err != nil { return nil, err } else if len(absent) == 0 { @@ -219,12 +241,11 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err absent, err = func() (hash.HashSet, error) { gcs.oldGen.mu.RLock() defer gcs.oldGen.mu.RUnlock() - return gcs.oldGen.hasMany(recs) + return gcs.oldGen.refCheck(recs) }() if err != nil { return nil, err } - if len(absent) == 0 || gcs.ghostGen == nil { return absent, nil } @@ -237,7 +258,7 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err // to Flush(). Put may be called concurrently with other calls to Put(), // Get(), GetMany(), Has() and HasMany(). func (gcs *GenerationalNBS) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error { - return gcs.newGen.putChunk(ctx, c, getAddrs, gcs.hasMany) + return gcs.newGen.putChunk(ctx, c, getAddrs, gcs.refCheck) } // Returns the NomsBinFormat with which this ChunkSource is compatible. @@ -277,7 +298,7 @@ func (gcs *GenerationalNBS) Root(ctx context.Context) (hash.Hash, error) { // persisted root hash from last to current (or keeps it the same). // If last doesn't match the root in persistent storage, returns false. 
func (gcs *GenerationalNBS) Commit(ctx context.Context, current, last hash.Hash) (bool, error) { - return gcs.newGen.commit(ctx, current, last, gcs.hasMany) + return gcs.newGen.commit(ctx, current, last, gcs.refCheck) } // Stats may return some kind of struct that reports statistics about the @@ -400,18 +421,18 @@ func (gcs *GenerationalNBS) AddTableFilesToManifest(ctx context.Context, fileIdT // PruneTableFiles deletes old table files that are no longer referenced in the manifest of the new or old gen chunkstores func (gcs *GenerationalNBS) PruneTableFiles(ctx context.Context) error { - err := gcs.oldGen.pruneTableFiles(ctx, gcs.hasMany) + err := gcs.oldGen.pruneTableFiles(ctx) if err != nil { return err } - return gcs.newGen.pruneTableFiles(ctx, gcs.hasMany) + return gcs.newGen.pruneTableFiles(ctx) } // SetRootChunk changes the root chunk hash from the previous value to the new root for the newgen cs func (gcs *GenerationalNBS) SetRootChunk(ctx context.Context, root, previous hash.Hash) error { - return gcs.newGen.setRootChunk(ctx, root, previous, gcs.hasMany) + return gcs.newGen.setRootChunk(ctx, root, previous, gcs.refCheck) } // SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing. diff --git a/go/store/nbs/ghost_store.go b/go/store/nbs/ghost_store.go index 11d23de6a6..9edd0fb40f 100644 --- a/go/store/nbs/ghost_store.go +++ b/go/store/nbs/ghost_store.go @@ -91,6 +91,10 @@ func (g GhostBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found } func (g GhostBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error { + return g.getManyCompressed(ctx, hashes, found, gcDependencyMode_TakeDependency) +} + +func (g GhostBlockStore) getManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk), gcDepMode gcDependencyMode) error { for h := range hashes { if g.skippedRefs.Has(h) { found(ctx, NewGhostCompressedChunk(h)) diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 8f415cbfa3..dc8deff8e1 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -239,17 +239,19 @@ func (j *ChunkJournal) IterateRoots(f func(root string, timestamp *time.Time) er } // Persist implements tablePersister. -func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { +func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { if j.backing.readOnly() { - return nil, errReadOnlyManifest + return nil, gcBehavior_Continue, errReadOnlyManifest } else if err := j.maybeInit(ctx); err != nil { - return nil, err + return nil, gcBehavior_Continue, err } if haver != nil { sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted. 
- if _, err := haver.hasMany(mt.order); err != nil { - return nil, err + if _, gcb, err := haver.hasMany(mt.order, keeper); err != nil { + return nil, gcBehavior_Continue, err + } else if gcb != gcBehavior_Continue { + return nil, gcb, nil } sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write } @@ -261,10 +263,10 @@ func (j *ChunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkRea c := chunks.NewChunkWithHash(hash.Hash(*record.a), mt.chunks[*record.a]) err := j.wr.writeCompressedChunk(ctx, ChunkToCompressedChunk(c)) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } } - return journalChunkSource{journal: j.wr}, nil + return journalChunkSource{journal: j.wr}, gcBehavior_Continue, nil } // ConjoinAll implements tablePersister. diff --git a/go/store/nbs/journal_chunk_source.go b/go/store/nbs/journal_chunk_source.go index c8dd8a4ac0..e7bf50dc4a 100644 --- a/go/store/nbs/journal_chunk_source.go +++ b/go/store/nbs/journal_chunk_source.go @@ -39,20 +39,29 @@ type journalChunkSource struct { var _ chunkSource = journalChunkSource{} -func (s journalChunkSource) has(h hash.Hash) (bool, error) { - return s.journal.hasAddr(h), nil +func (s journalChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) { + res := s.journal.hasAddr(h) + if res && keeper != nil && keeper(h) { + return false, gcBehavior_Block, nil + } + return res, gcBehavior_Continue, nil } -func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error) { +func (s journalChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { + missing := false for i := range addrs { - ok := s.journal.hasAddr(*addrs[i].a) + h := *addrs[i].a + ok := s.journal.hasAddr(h) if ok { + if keeper != nil && keeper(h) { + return true, gcBehavior_Block, nil + } addrs[i].has = true } else { missing = true } } - return + return missing, gcBehavior_Continue, nil } func (s journalChunkSource) getCompressed(ctx context.Context, h hash.Hash, _ *Stats) (CompressedChunk, error) { @@ -60,20 +69,23 @@ func (s journalChunkSource) getCompressed(ctx context.Context, h hash.Hash, _ *S return s.journal.getCompressedChunk(h) } -func (s journalChunkSource) get(ctx context.Context, h hash.Hash, _ *Stats) ([]byte, error) { +func (s journalChunkSource) get(ctx context.Context, h hash.Hash, keeper keeperF, _ *Stats) ([]byte, gcBehavior, error) { defer trace.StartRegion(ctx, "journalChunkSource.get").End() cc, err := s.journal.getCompressedChunk(h) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } else if cc.IsEmpty() { - return nil, nil + return nil, gcBehavior_Continue, nil + } + if keeper != nil && keeper(h) { + return nil, gcBehavior_Block, nil } ch, err := cc.ToChunk() if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } - return ch.Data(), nil + return ch.Data(), gcBehavior_Continue, nil } type journalRecord struct { @@ -83,7 +95,7 @@ type journalRecord struct { idx int } -func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { +func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { return s.getManyCompressed(ctx, eg, reqs, func(ctx context.Context, cc CompressedChunk) { ch, err := cc.ToChunk() if err != nil { @@ -94,7 +106,7 @@ func (s journalChunkSource) 
getMany(ctx context.Context, eg *errgroup.Group, req } chWHash := chunks.NewChunkWithHash(cc.Hash(), ch.Data()) found(ctx, &chWHash) - }, stats) + }, keeper, stats) } // getManyCompressed implements chunkReader. Here we (1) synchronously check @@ -103,7 +115,7 @@ func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, req // and then (4) asynchronously perform reads. We release the journal read // lock after returning when all reads are completed, which can be after the // function returns. -func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { +func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { defer trace.StartRegion(ctx, "journalChunkSource.getManyCompressed").End() var remaining bool @@ -114,11 +126,16 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup. if r.found { continue } - rang, ok := s.journal.ranges.get(*r.a) + h := *r.a + rang, ok := s.journal.ranges.get(h) if !ok { remaining = true continue } + if keeper != nil && keeper(h) { + s.journal.lock.RUnlock() + return true, gcBehavior_Block, nil + } jReqs = append(jReqs, journalRecord{r: rang, idx: i}) reqs[i].found = true } @@ -150,7 +167,7 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup. wg.Wait() s.journal.lock.RUnlock() }() - return remaining, nil + return remaining, gcBehavior_Continue, nil } func (s journalChunkSource) count() (uint32, error) { @@ -171,22 +188,26 @@ func (s journalChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, return rdr, uint64(sz), err } -func (s journalChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) { +func (s journalChunkSource) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) { ranges := make(map[hash.Hash]Range, len(requests)) for _, req := range requests { if req.found { continue } - rng, ok, err := s.journal.getRange(ctx, *req.a) + h := *req.a + rng, ok, err := s.journal.getRange(ctx, h) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } else if !ok { continue } + if keeper != nil && keeper(h) { + return nil, gcBehavior_Block, nil + } req.found = true // update |requests| - ranges[hash.Hash(*req.a)] = rng + ranges[h] = rng } - return ranges, nil + return ranges, gcBehavior_Continue, nil } // size implements chunkSource. 
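The journal and archive chunk sources above follow a single convention: when a keeper function is supplied and it claims an address the reader actually holds, the reader reports gcBehavior_Block instead of a hit, so the caller can wait for GC and retry. A rough sketch of that convention with illustrative stand-in types (mapReader, hasRecord here are not the real nbs interfaces):

package main

import "fmt"

type gcBehavior int

const (
	gcBehaviorContinue gcBehavior = iota
	gcBehaviorBlock
)

type hasRecord struct {
	addr string
	has  bool
}

type mapReader map[string][]byte

// hasMany marks the records it can satisfy and returns whether any remain
// unresolved, mirroring the (remaining, gcBehavior, error) shape in this diff.
func (m mapReader) hasMany(recs []hasRecord, keep func(string) bool) (bool, gcBehavior, error) {
	remaining := false
	for i := range recs {
		if recs[i].has {
			continue
		}
		if _, ok := m[recs[i].addr]; !ok {
			remaining = true
			continue
		}
		if keep != nil && keep(recs[i].addr) {
			// Found, but owned by an in-progress GC: tell the caller to block
			// rather than taking a dependency on the chunk right now.
			return true, gcBehaviorBlock, nil
		}
		recs[i].has = true
	}
	return remaining, gcBehaviorContinue, nil
}

func main() {
	r := mapReader{"a": []byte("x"), "b": []byte("y")}
	recs := []hasRecord{{addr: "a"}, {addr: "c"}}
	remaining, gcb, _ := r.hasMany(recs, func(h string) bool { return h == "b" })
	fmt.Println(remaining, gcb, recs[0].has) // true 0 true
}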
diff --git a/go/store/nbs/journal_test.go b/go/store/nbs/journal_test.go index 9486f1edf1..603d1610d4 100644 --- a/go/store/nbs/journal_test.go +++ b/go/store/nbs/journal_test.go @@ -67,14 +67,14 @@ func TestChunkJournalPersist(t *testing.T) { haver := emptyChunkSource{} for i := 0; i < iters; i++ { memTbl, chunkMap := randomMemTable(16) - source, err := j.Persist(ctx, memTbl, haver, stats) + source, _, err := j.Persist(ctx, memTbl, haver, nil, stats) assert.NoError(t, err) for h, ch := range chunkMap { - ok, err := source.has(h) + ok, _, err := source.has(h, nil) assert.NoError(t, err) assert.True(t, ok) - data, err := source.get(ctx, h, stats) + data, _, err := source.get(ctx, h, nil, stats) assert.NoError(t, err) assert.Equal(t, ch.Data(), data) } @@ -96,7 +96,7 @@ func TestReadRecordRanges(t *testing.T) { gets = append(gets, getRecord{a: &h, prefix: h.Prefix()}) } - jcs, err := j.Persist(ctx, mt, emptyChunkSource{}, &Stats{}) + jcs, _, err := j.Persist(ctx, mt, emptyChunkSource{}, nil, &Stats{}) require.NoError(t, err) rdr, sz, err := jcs.(journalChunkSource).journal.snapshot(context.Background()) @@ -108,11 +108,11 @@ func TestReadRecordRanges(t *testing.T) { require.NoError(t, err) assert.Equal(t, int(sz), n) - ranges, err := jcs.getRecordRanges(ctx, gets) + ranges, _, err := jcs.getRecordRanges(ctx, gets, nil) require.NoError(t, err) for h, rng := range ranges { - b, err := jcs.get(ctx, h, &Stats{}) + b, _, err := jcs.get(ctx, h, nil, &Stats{}) assert.NoError(t, err) ch1 := chunks.NewChunkWithHash(h, b) assert.Equal(t, data[h], ch1) diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go index df8c45946f..b15a62f4ff 100644 --- a/go/store/nbs/journal_writer_test.go +++ b/go/store/nbs/journal_writer_test.go @@ -228,7 +228,7 @@ func TestJournalWriterBootstrap(t *testing.T) { source := journalChunkSource{journal: j} for a, cc := range data { - buf, err := source.get(ctx, a, nil) + buf, _, err := source.get(ctx, a, nil, nil) require.NoError(t, err) ch, err := cc.ToChunk() require.NoError(t, err) diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go index cbffa34a72..1fd8c0ffcd 100644 --- a/go/store/nbs/mem_table.go +++ b/go/store/nbs/mem_table.go @@ -61,7 +61,7 @@ func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error } var stats Stats - name, data, count, err := mt.write(nil, &stats) + name, data, count, _, err := mt.write(nil, nil, &stats) if err != nil { return "", nil, err @@ -135,22 +135,27 @@ func (mt *memTable) uncompressedLen() (uint64, error) { return mt.totalData, nil } -func (mt *memTable) has(h hash.Hash) (bool, error) { +func (mt *memTable) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) { _, has := mt.chunks[h] - return has, nil + if has && keeper != nil && keeper(h) { + return false, gcBehavior_Block, nil + } + return has, gcBehavior_Continue, nil } -func (mt *memTable) hasMany(addrs []hasRecord) (bool, error) { +func (mt *memTable) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { var remaining bool for i, addr := range addrs { if addr.has { continue } - ok, err := mt.has(*addr.a) - + ok, gcb, err := mt.has(*addr.a, keeper) if err != nil { - return false, err + return false, gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return ok, gcb, nil } if ok { @@ -159,18 +164,25 @@ func (mt *memTable) hasMany(addrs []hasRecord) (bool, error) { remaining = true } } - return remaining, nil + return remaining, gcBehavior_Continue, nil } -func (mt *memTable) get(ctx 
context.Context, h hash.Hash, stats *Stats) ([]byte, error) { - return mt.chunks[h], nil +func (mt *memTable) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) { + c, ok := mt.chunks[h] + if ok && keeper != nil && keeper(h) { + return nil, gcBehavior_Block, nil + } + return c, gcBehavior_Continue, nil } -func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { +func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { var remaining bool for i, r := range reqs { data := mt.chunks[*r.a] if data != nil { + if keeper != nil && keeper(*r.a) { + return true, gcBehavior_Block, nil + } c := chunks.NewChunkWithHash(hash.Hash(*r.a), data) reqs[i].found = true found(ctx, &c) @@ -178,14 +190,17 @@ func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getR remaining = true } } - return remaining, nil + return remaining, gcBehavior_Continue, nil } -func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { +func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { var remaining bool for i, r := range reqs { data := mt.chunks[*r.a] if data != nil { + if keeper != nil && keeper(*r.a) { + return true, gcBehavior_Block, nil + } c := chunks.NewChunkWithHash(hash.Hash(*r.a), data) reqs[i].found = true found(ctx, ChunkToCompressedChunk(c)) @@ -194,7 +209,7 @@ func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, r } } - return remaining, nil + return remaining, gcBehavior_Continue, nil } func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) error { @@ -205,10 +220,11 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er return nil } -func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data []byte, count uint32, err error) { +func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, count uint32, gcb gcBehavior, err error) { + gcb = gcBehavior_Continue numChunks := uint64(len(mt.order)) if numChunks == 0 { - return hash.Hash{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks") + return hash.Hash{}, nil, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks") } maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData) // todo: memory quota @@ -217,10 +233,12 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data if haver != nil { sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted. 
- _, err := haver.hasMany(mt.order) - + _, gcb, err = haver.hasMany(mt.order, keeper) if err != nil { - return hash.Hash{}, nil, 0, err + return hash.Hash{}, nil, 0, gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return hash.Hash{}, nil, 0, gcb, err } sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write @@ -236,7 +254,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data tableSize, name, err := tw.finish() if err != nil { - return hash.Hash{}, nil, 0, err + return hash.Hash{}, nil, 0, gcBehavior_Continue, err } if count > 0 { @@ -246,7 +264,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name hash.Hash, data stats.ChunksPerPersist.Sample(uint64(count)) } - return name, buff[:tableSize], count, nil + return name, buff[:tableSize], count, gcBehavior_Continue, nil } func (mt *memTable) close() error { diff --git a/go/store/nbs/mem_table_test.go b/go/store/nbs/mem_table_test.go index 250f299463..42a1888a98 100644 --- a/go/store/nbs/mem_table_test.go +++ b/go/store/nbs/mem_table_test.go @@ -97,14 +97,14 @@ func TestMemTableAddHasGetChunk(t *testing.T) { assertChunksInReader(chunks, mt, assert) for _, c := range chunks { - data, err := mt.get(context.Background(), computeAddr(c), &Stats{}) + data, _, err := mt.get(context.Background(), computeAddr(c), nil, &Stats{}) require.NoError(t, err) assert.Equal(bytes.Compare(c, data), 0) } notPresent := []byte("nope") - assert.False(mt.has(computeAddr(notPresent))) - assert.Nil(mt.get(context.Background(), computeAddr(notPresent), &Stats{})) + assert.False(mt.has(computeAddr(notPresent), nil)) + assert.Nil(mt.get(context.Background(), computeAddr(notPresent), nil, &Stats{})) } func TestMemTableAddOverflowChunk(t *testing.T) { @@ -117,9 +117,9 @@ func TestMemTableAddOverflowChunk(t *testing.T) { bigAddr := computeAddr(big) mt := newMemTable(memTableSize) assert.Equal(mt.addChunk(bigAddr, big), chunkAdded) - assert.True(mt.has(bigAddr)) + assert.True(mt.has(bigAddr, nil)) assert.Equal(mt.addChunk(computeAddr(little), little), chunkNotAdded) - assert.False(mt.has(computeAddr(little))) + assert.False(mt.has(computeAddr(little), nil)) } { @@ -127,12 +127,12 @@ func TestMemTableAddOverflowChunk(t *testing.T) { bigAddr := computeAddr(big) mt := newMemTable(memTableSize) assert.Equal(mt.addChunk(bigAddr, big), chunkAdded) - assert.True(mt.has(bigAddr)) + assert.True(mt.has(bigAddr, nil)) assert.Equal(mt.addChunk(computeAddr(little), little), chunkAdded) - assert.True(mt.has(computeAddr(little))) + assert.True(mt.has(computeAddr(little), nil)) other := []byte("o") assert.Equal(mt.addChunk(computeAddr(other), other), chunkNotAdded) - assert.False(mt.has(computeAddr(other))) + assert.False(mt.has(computeAddr(other), nil)) } } @@ -158,7 +158,7 @@ func TestMemTableWrite(t *testing.T) { tr1, err := newTableReader(ti1, tableReaderAtFromBytes(td1), fileBlockSize) require.NoError(t, err) defer tr1.close() - assert.True(tr1.has(computeAddr(chunks[1]))) + assert.True(tr1.has(computeAddr(chunks[1]), nil)) td2, _, err := buildTable(chunks[2:]) require.NoError(t, err) @@ -167,9 +167,9 @@ func TestMemTableWrite(t *testing.T) { tr2, err := newTableReader(ti2, tableReaderAtFromBytes(td2), fileBlockSize) require.NoError(t, err) defer tr2.close() - assert.True(tr2.has(computeAddr(chunks[2]))) + assert.True(tr2.has(computeAddr(chunks[2]), nil)) - _, data, count, err := mt.write(chunkReaderGroup{tr1, tr2}, &Stats{}) + _, data, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, 
&Stats{}) require.NoError(t, err) assert.Equal(uint32(1), count) @@ -178,9 +178,9 @@ func TestMemTableWrite(t *testing.T) { outReader, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) defer outReader.close() - assert.True(outReader.has(computeAddr(chunks[0]))) - assert.False(outReader.has(computeAddr(chunks[1]))) - assert.False(outReader.has(computeAddr(chunks[2]))) + assert.True(outReader.has(computeAddr(chunks[0]), nil)) + assert.False(outReader.has(computeAddr(chunks[1]), nil)) + assert.False(outReader.has(computeAddr(chunks[2]), nil)) } type tableReaderAtAdapter struct { @@ -223,7 +223,7 @@ func TestMemTableSnappyWriteOutOfLine(t *testing.T) { } mt.snapper = &outOfLineSnappy{[]bool{false, true, false}} // chunks[1] should trigger a panic - assert.Panics(func() { mt.write(nil, &Stats{}) }) + assert.Panics(func() { mt.write(nil, nil, &Stats{}) }) } type outOfLineSnappy struct { @@ -244,72 +244,82 @@ func (o *outOfLineSnappy) Encode(dst, src []byte) []byte { type chunkReaderGroup []chunkReader -func (crg chunkReaderGroup) has(h hash.Hash) (bool, error) { +func (crg chunkReaderGroup) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) { for _, haver := range crg { - ok, err := haver.has(h) - + ok, gcb, err := haver.has(h, keeper) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return true, gcb, nil } if ok { - return true, nil + return true, gcb, nil } } - - return false, nil + return false, gcBehavior_Continue, nil } -func (crg chunkReaderGroup) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { +func (crg chunkReaderGroup) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) { for _, haver := range crg { - if data, err := haver.get(ctx, h, stats); err != nil { - return nil, err + if data, gcb, err := haver.get(ctx, h, keeper, stats); err != nil { + return nil, gcb, err + } else if gcb != gcBehavior_Continue { + return nil, gcb, nil } else if data != nil { - return data, nil + return data, gcb, nil } } - return nil, nil + return nil, gcBehavior_Continue, nil } -func (crg chunkReaderGroup) hasMany(addrs []hasRecord) (bool, error) { +func (crg chunkReaderGroup) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { for _, haver := range crg { - remaining, err := haver.hasMany(addrs) - + remaining, gcb, err := haver.hasMany(addrs, keeper) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return false, gcb, nil } - if !remaining { - return false, nil + return false, gcb, nil } } - return true, nil + return true, gcBehavior_Continue, nil } -func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { +func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { for _, haver := range crg { - remaining, err := haver.getMany(ctx, eg, reqs, found, stats) + remaining, gcb, err := haver.getMany(ctx, eg, reqs, found, keeper, stats) if err != nil { - return true, err + return true, gcb, err + } + if gcb != gcBehavior_Continue { + return true, gcb, nil } if !remaining { - return false, nil + return false, gcb, nil } } - return true, nil + return true, gcBehavior_Continue, nil } -func (crg chunkReaderGroup) getManyCompressed(ctx 
context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { +func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { for _, haver := range crg { - remaining, err := haver.getManyCompressed(ctx, eg, reqs, found, stats) + remaining, gcb, err := haver.getManyCompressed(ctx, eg, reqs, found, keeper, stats) if err != nil { - return true, err + return true, gcb, err + } + if gcb != gcBehavior_Continue { + return true, gcb, nil } if !remaining { - return false, nil + return false, gcb, nil } } - return true, nil + return true, gcBehavior_Continue, nil } func (crg chunkReaderGroup) count() (count uint32, err error) { diff --git a/go/store/nbs/no_conjoin_bs_persister.go b/go/store/nbs/no_conjoin_bs_persister.go index 053c9be710..98ed3a06a5 100644 --- a/go/store/nbs/no_conjoin_bs_persister.go +++ b/go/store/nbs/no_conjoin_bs_persister.go @@ -21,7 +21,6 @@ import ( "io" "time" - "github.com/fatih/color" "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/store/blobstore" @@ -40,27 +39,32 @@ var _ tableFilePersister = &noConjoinBlobstorePersister{} // Persist makes the contents of mt durable. Chunks already present in // |haver| may be dropped in the process. -func (bsp *noConjoinBlobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { - address, data, chunkCount, err := mt.write(haver, stats) +func (bsp *noConjoinBlobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { + address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { - return emptyChunkSource{}, err + return emptyChunkSource{}, gcBehavior_Continue, err + } else if gcb != gcBehavior_Continue { + return emptyChunkSource{}, gcb, nil } else if chunkCount == 0 { - return emptyChunkSource{}, nil + return emptyChunkSource{}, gcBehavior_Continue, nil } name := address.String() eg, ectx := errgroup.WithContext(ctx) - eg.Go(func() (err error) { - fmt.Fprintf(color.Output, "Persist: bs.Put: name: %s\n", name) - _, err = bsp.bs.Put(ectx, name, int64(len(data)), bytes.NewBuffer(data)) - return + eg.Go(func() error { + _, err := bsp.bs.Put(ectx, name, int64(len(data)), bytes.NewBuffer(data)) + return err }) if err = eg.Wait(); err != nil { - return nil, err + return nil, gcBehavior_Continue, err } rdr := &bsTableReaderAt{name, bsp.bs} - return newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize) + src, err := newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize) + if err != nil { + return nil, gcBehavior_Continue, err + } + return src, gcBehavior_Continue, nil } // ConjoinAll implements tablePersister. 
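The store-level callers (see the store.go changes below) pair these reader changes with a snapshot, unlocked read, retry loop: capture the table set and keeper under the lock, perform the read without the lock, and start over whenever the read reports gcBehavior_Block. A simplified sketch of that loop, again with stand-in types rather than the real NomsBlockStore fields:

package main

import (
	"fmt"
	"sync"
)

type gcBehavior int

const (
	gcBehaviorContinue gcBehavior = iota
	gcBehaviorBlock
)

type store struct {
	mu      sync.Mutex
	blocked bool // stands in for gcInProgress
	data    map[string][]byte
}

// read mimics a chunk-source get: it refuses to return data for an address
// that the keeper claims and asks the caller to retry instead.
func (s *store) read(h string, keep func(string) bool) ([]byte, gcBehavior) {
	if keep != nil && keep(h) {
		return nil, gcBehaviorBlock
	}
	return s.data[h], gcBehaviorContinue
}

func (s *store) get(h string) []byte {
	for {
		// Snapshot the GC state under the lock.
		s.mu.Lock()
		blocked := s.blocked
		keep := func(string) bool { return blocked }
		s.mu.Unlock()

		// Read without holding the lock.
		data, gcb := s.read(h, keep)
		if gcb == gcBehaviorBlock {
			// The real store waits on a sync.Cond until EndGC; the sketch
			// simply clears the flag and retries.
			s.mu.Lock()
			s.blocked = false
			s.mu.Unlock()
			continue
		}
		return data
	}
}

func main() {
	s := &store{blocked: true, data: map[string][]byte{"h": []byte("chunk")}}
	fmt.Printf("%s\n", s.get("h"))
}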
diff --git a/go/store/nbs/root_tracker_test.go b/go/store/nbs/root_tracker_test.go index f428f0817e..37d8248374 100644 --- a/go/store/nbs/root_tracker_test.go +++ b/go/store/nbs/root_tracker_test.go @@ -399,7 +399,7 @@ func interloperWrite(fm *fakeManifest, p tablePersister, rootChunk []byte, chunk persisted = append(chunks, rootChunk) var src chunkSource - src, err = p.Persist(context.Background(), createMemTable(persisted), nil, &Stats{}) + src, _, err = p.Persist(context.Background(), createMemTable(persisted), nil, nil, &Stats{}) if err != nil { return hash.Hash{}, nil, err } @@ -505,16 +505,18 @@ type fakeTablePersister struct { var _ tablePersister = fakeTablePersister{} -func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { +func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { if mustUint32(mt.count()) == 0 { - return emptyChunkSource{}, nil + return emptyChunkSource{}, gcBehavior_Continue, nil } - name, data, chunkCount, err := mt.write(haver, stats) + name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { - return emptyChunkSource{}, err + return emptyChunkSource{}, gcBehavior_Continue, err + } else if gcb != gcBehavior_Continue { + return emptyChunkSource{}, gcb, nil } else if chunkCount == 0 { - return emptyChunkSource{}, nil + return emptyChunkSource{}, gcBehavior_Continue, nil } ftp.mu.Lock() @@ -523,14 +525,14 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c ti, err := parseTableIndexByCopy(ctx, data, ftp.q) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } cs, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) if err != nil { - return emptyChunkSource{}, err + return emptyChunkSource{}, gcBehavior_Continue, err } - return chunkSourceAdapter{cs, name}, nil + return chunkSourceAdapter{cs, name}, gcBehavior_Continue, nil } func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) { @@ -661,7 +663,7 @@ func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractR return err } - data, err := src.get(ctx, h, nil) + data, _, err := src.get(ctx, h, nil, nil) if err != nil { return err } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 7ddc44a001..3a6d2fe73b 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -23,6 +23,7 @@ package nbs import ( "context" + "errors" "fmt" "io" "os" @@ -39,7 +40,6 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" - "github.com/pkg/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -89,6 +89,17 @@ type NBSCompressedChunkStore interface { GetManyCompressed(context.Context, hash.HashSet, func(context.Context, CompressedChunk)) error } +type gcDependencyMode int + +const ( + gcDependencyMode_TakeDependency gcDependencyMode = iota + gcDependencyMode_NoDependency +) + +type CompressedChunkStoreForGC interface { + getManyCompressed(context.Context, hash.HashSet, func(context.Context, CompressedChunk), gcDependencyMode) error +} + type NomsBlockStore struct { mm manifestManager p tablePersister @@ -99,8 +110,14 @@ type NomsBlockStore struct { tables tableSet upstream manifestContents - cond *sync.Cond + cond 
*sync.Cond + // |true| after BeginGC is called, and false once the corresponding EndGC call returns. gcInProgress bool + // When unlocked read operations are occurring against the + // block store, and they started when |gcInProgress == true|, + // this variable is incremented. EndGC will not return until + // no outstanding reads are in progress. + gcOutstandingReads int // keeperFunc is set when |gcInProgress| and appends to the GC sweep queue // or blocks on GC finalize keeperFunc func(hash.Hash) bool @@ -152,14 +169,14 @@ func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashe } func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) { - gr := toGetRecords(hashes) - ranges := make(map[hash.Hash]map[hash.Hash]Range) - - fn := func(css chunkSourceSet) error { + fn := func(css chunkSourceSet, gr []getRecord, ranges map[hash.Hash]map[hash.Hash]Range, keeper keeperF) (gcBehavior, error) { for _, cs := range css { - rng, err := cs.getRecordRanges(ctx, gr) + rng, gcb, err := cs.getRecordRanges(ctx, gr, keeper) if err != nil { - return err + return gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return gcb, nil } h := hash.Hash(cs.hash()) @@ -171,22 +188,60 @@ func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.Ha ranges[h] = rng } } - return nil + return gcBehavior_Continue, nil } - tables := func() tableSet { - nbs.mu.RLock() - defer nbs.mu.RUnlock() - return nbs.tables - }() + for { + nbs.mu.Lock() + tables, keeper, endRead := nbs.tables, nbs.keeperFunc, nbs.beginRead() + nbs.mu.Unlock() - if err := fn(tables.upstream); err != nil { - return nil, err + gr := toGetRecords(hashes) + ranges := make(map[hash.Hash]map[hash.Hash]Range) + + gcb, err := fn(tables.upstream, gr, ranges, keeper) + if needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err); err != nil { + return nil, err + } else if needsContinue { + continue + } + + gcb, err = fn(tables.novel, gr, ranges, keeper) + if needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err); err != nil { + return nil, err + } else if needsContinue { + continue + } + + return ranges, nil } - if err := fn(tables.novel); err != nil { - return nil, err +} + +func (nbs *NomsBlockStore) handleUnlockedRead(ctx context.Context, gcb gcBehavior, endRead func(), err error) (bool, error) { + if err != nil { + if endRead != nil { + nbs.mu.Lock() + endRead() + nbs.mu.Unlock() + } + return false, err + } + if gcb == gcBehavior_Block { + nbs.mu.Lock() + if endRead != nil { + endRead() + } + err := nbs.waitForGC(ctx) + nbs.mu.Unlock() + return true, err + } else { + if endRead != nil { + nbs.mu.Lock() + endRead() + nbs.mu.Unlock() + } + return false, nil } - return ranges, nil } func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error) { @@ -703,7 +758,9 @@ func (nbs *NomsBlockStore) WithoutConjoiner() *NomsBlockStore { } } -// Wait for GC to complete to continue with writes +// Wait for GC to complete to continue with ongoing operations. +// Called with nbs.mu held. When this function returns with a nil +// error, gcInProgress will be false.
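handleUnlockedRead above centralizes the bookkeeping for reads that run without nbs.mu held: the read registration is always released, errors are propagated, and only gcBehavior_Block asks the caller to wait for the GC and retry. A condensed, self-contained sketch of that decision logic follows; the locking around endRead is elided and all names are stand-ins, so this is an assumed shape rather than the dolt implementation.

package main

import (
	"context"
	"fmt"
)

type gcBehavior bool

const (
	gcBehavior_Continue gcBehavior = false
	gcBehavior_Block    gcBehavior = true
)

// handleUnlockedRead returns (retry, err). |endRead| may be nil when no GC was in
// progress when the read began; |waitForGC| blocks until the current GC finishes.
func handleUnlockedRead(ctx context.Context, gcb gcBehavior, endRead func(), waitForGC func(context.Context) error, err error) (bool, error) {
	if endRead != nil {
		endRead()
	}
	if err != nil {
		return false, err
	}
	if gcb == gcBehavior_Block {
		// The tables we read from are being collected; wait and ask the caller to retry.
		return true, waitForGC(ctx)
	}
	return false, nil
}

func main() {
	attempts := 0
	for {
		attempts++
		// Pretend the first unlocked read raced with a GC finalize.
		gcb := gcBehavior_Continue
		if attempts == 1 {
			gcb = gcBehavior_Block
		}
		retry, err := handleUnlockedRead(context.Background(), gcb, func() {}, func(context.Context) error { return nil }, nil)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		if retry {
			continue
		}
		fmt.Println("read succeeded on attempt", attempts)
		return
	}
}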
func (nbs *NomsBlockStore) waitForGC(ctx context.Context) error { stop := make(chan struct{}) defer close(stop) @@ -721,7 +778,7 @@ func (nbs *NomsBlockStore) waitForGC(ctx context.Context) error { } func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error { - return nbs.putChunk(ctx, c, getAddrs, nbs.hasMany) + return nbs.putChunk(ctx, c, getAddrs, nbs.refCheck) } func (nbs *NomsBlockStore) putChunk(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry, checker refCheck) error { @@ -787,11 +844,18 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, getAdd addChunkRes = nbs.mt.addChunk(ch.Hash(), ch.Data()) if addChunkRes == chunkNotAdded { - ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats) + ts, gcb, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.keeperFunc, nbs.hasCache, nbs.stats) if err != nil { nbs.handlePossibleDanglingRefError(err) return false, err } + if gcb == gcBehavior_Block { + retry = true + if err := nbs.waitForGC(ctx); err != nil { + return false, err + } + continue + } nbs.addPendingRefsToHasCache() nbs.tables = ts nbs.mt = newMemTable(nbs.mtSize) @@ -845,100 +909,131 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, nbs.stats.ChunksPerGet.Sample(1) }() - data, tables, err := func() ([]byte, chunkReader, error) { - var data []byte - nbs.mu.RLock() - defer nbs.mu.RUnlock() + for { + nbs.mu.Lock() if nbs.mt != nil { - var err error - data, err = nbs.mt.get(ctx, h, nbs.stats) - + data, gcb, err := nbs.mt.get(ctx, h, nbs.keeperFunc, nbs.stats) if err != nil { - return nil, nil, err + nbs.mu.Unlock() + return chunks.EmptyChunk, err + } + if gcb == gcBehavior_Block { + err = nbs.waitForGC(ctx) + nbs.mu.Unlock() + if err != nil { + return chunks.EmptyChunk, err + } + continue + } + if data != nil { + nbs.mu.Unlock() + return chunks.NewChunkWithHash(h, data), nil } } - return data, nbs.tables, nil - }() + tables, keeper, endRead := nbs.tables, nbs.keeperFunc, nbs.beginRead() + nbs.mu.Unlock() - if err != nil { - return chunks.EmptyChunk, err - } - - if data != nil { - return chunks.NewChunkWithHash(h, data), nil - } - - data, err = tables.get(ctx, h, nbs.stats) - - if err != nil { - return chunks.EmptyChunk, err - } + data, gcb, err := tables.get(ctx, h, keeper, nbs.stats) + needContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err) + if err != nil { + return chunks.EmptyChunk, err + } + if needContinue { + continue + } - if data != nil { - return chunks.NewChunkWithHash(h, data), nil + if data != nil { + return chunks.NewChunkWithHash(h, data), nil + } + return chunks.EmptyChunk, nil } - - return chunks.EmptyChunk, nil } func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error { ctx, span := tracer.Start(ctx, "nbs.GetMany", trace.WithAttributes(attribute.Int("num_hashes", len(hashes)))) - span.End() - return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, stats *Stats) (bool, error) { - return cr.getMany(ctx, eg, reqs, found, nbs.stats) - }) + defer span.End() + return nbs.getManyWithFunc(ctx, hashes, gcDependencyMode_TakeDependency, + func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, keeper keeperF, stats *Stats) (bool, gcBehavior, error) { + return cr.getMany(ctx, eg, reqs, found, keeper, nbs.stats) + }, + ) } func (nbs *NomsBlockStore) GetManyCompressed(ctx 
context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error { + return nbs.getManyCompressed(ctx, hashes, found, gcDependencyMode_TakeDependency) +} + +func (nbs *NomsBlockStore) getManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk), gcDepMode gcDependencyMode) error { ctx, span := tracer.Start(ctx, "nbs.GetManyCompressed", trace.WithAttributes(attribute.Int("num_hashes", len(hashes)))) defer span.End() - return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, stats *Stats) (bool, error) { - return cr.getManyCompressed(ctx, eg, reqs, found, nbs.stats) - }) + return nbs.getManyWithFunc(ctx, hashes, gcDepMode, + func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, keeper keeperF, stats *Stats) (bool, gcBehavior, error) { + return cr.getManyCompressed(ctx, eg, reqs, found, keeper, nbs.stats) + }, + ) } func (nbs *NomsBlockStore) getManyWithFunc( ctx context.Context, hashes hash.HashSet, - getManyFunc func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, stats *Stats) (bool, error), + gcDepMode gcDependencyMode, + getManyFunc func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, keeper keeperF, stats *Stats) (bool, gcBehavior, error), ) error { - t1 := time.Now() - reqs := toGetRecords(hashes) + if len(hashes) == 0 { + return nil + } + t1 := time.Now() defer func() { - if len(hashes) > 0 { - nbs.stats.GetLatency.SampleTimeSince(t1) - nbs.stats.ChunksPerGet.Sample(uint64(len(reqs))) - } + nbs.stats.GetLatency.SampleTimeSince(t1) + nbs.stats.ChunksPerGet.Sample(uint64(len(hashes))) }() - eg, ctx := errgroup.WithContext(ctx) - const ioParallelism = 16 - eg.SetLimit(ioParallelism) + for { + reqs := toGetRecords(hashes) + eg, ctx := errgroup.WithContext(ctx) + const ioParallelism = 16 + eg.SetLimit(ioParallelism) - tables, remaining, err := func() (tables chunkReader, remaining bool, err error) { - nbs.mu.RLock() - defer nbs.mu.RUnlock() - tables = nbs.tables - remaining = true + nbs.mu.Lock() + keeper := nbs.keeperFunc + if gcDepMode == gcDependencyMode_NoDependency { + keeper = nil + } if nbs.mt != nil { - remaining, err = getManyFunc(ctx, nbs.mt, eg, reqs, nbs.stats) + remaining, gcb, err := getManyFunc(ctx, nbs.mt, eg, reqs, keeper, nbs.stats) + if err != nil { + nbs.mu.Unlock() + return err + } + if gcb == gcBehavior_Block { + err = nbs.waitForGC(ctx) + nbs.mu.Unlock() + if err != nil { + return err + } + continue + } + if !remaining { + nbs.mu.Unlock() + return nil + } } - return - }() - if err != nil { - return err - } + tables, endRead := nbs.tables, nbs.beginRead() + nbs.mu.Unlock() - if remaining { - _, err = getManyFunc(ctx, tables, eg, reqs, nbs.stats) - } + _, gcb, err := getManyFunc(ctx, tables, eg, reqs, keeper, nbs.stats) + err = errors.Join(err, eg.Wait()) + needContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err) + if err != nil { + return err + } + if needContinue { + continue + } - if err != nil { - eg.Wait() - return err + return nil } - return eg.Wait() } func toGetRecords(hashes hash.HashSet) []getRecord { @@ -992,36 +1087,41 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { nbs.stats.AddressesPerHas.Sample(1) }() - has, tables, err := func() (bool, chunkReader, error) { - nbs.mu.RLock() - defer nbs.mu.RUnlock() - + for { + nbs.mu.Lock() if nbs.mt != nil { - has, err := nbs.mt.has(h) - + has, gcb, err := 
nbs.mt.has(h, nbs.keeperFunc) if err != nil { - return false, nil, err + nbs.mu.Unlock() + return false, err + } + if gcb == gcBehavior_Block { + err = nbs.waitForGC(ctx) + nbs.mu.Unlock() + if err != nil { + return false, err + } + continue + } + if has { + nbs.mu.Unlock() + return true, nil } - - return has, nbs.tables, nil } + tables, keeper, endRead := nbs.tables, nbs.keeperFunc, nbs.beginRead() + nbs.mu.Unlock() - return false, nbs.tables, nil - }() - - if err != nil { - return false, err - } - - if !has { - has, err = tables.has(h) - + has, gcb, err := tables.has(h, keeper) + needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err) if err != nil { return false, err } - } + if needsContinue { + continue + } - return has, nil + return has, nil + } } func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { @@ -1030,35 +1130,84 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha } t1 := time.Now() - defer nbs.stats.HasLatency.SampleTimeSince(t1) - nbs.stats.AddressesPerHas.SampleLen(hashes.Size()) + defer func() { + nbs.stats.HasLatency.SampleTimeSince(t1) + nbs.stats.AddressesPerHas.SampleLen(hashes.Size()) + }() - nbs.mu.RLock() - defer nbs.mu.RUnlock() - return nbs.hasMany(toHasRecords(hashes)) -} + for { + reqs := toHasRecords(hashes) -func (nbs *NomsBlockStore) hasManyInSources(srcs []hash.Hash, hashes hash.HashSet) (hash.HashSet, error) { - if hashes.Size() == 0 { - return nil, nil - } + nbs.mu.Lock() + if nbs.mt != nil { + remaining, gcb, err := nbs.mt.hasMany(reqs, nbs.keeperFunc) + if err != nil { + nbs.mu.Unlock() + return nil, err + } + if gcb == gcBehavior_Block { + err = nbs.waitForGC(ctx) + nbs.mu.Unlock() + if err != nil { + return nil, err + } + continue + } + if !remaining { + nbs.mu.Unlock() + return hash.HashSet{}, nil + } + } + tables, keeper, endRead := nbs.tables, nbs.keeperFunc, nbs.beginRead() + nbs.mu.Unlock() - t1 := time.Now() - defer nbs.stats.HasLatency.SampleTimeSince(t1) - nbs.stats.AddressesPerHas.SampleLen(hashes.Size()) + remaining, gcb, err := tables.hasMany(reqs, keeper) + needContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err) + if err != nil { + return nil, err + } + if needContinue { + continue + } - nbs.mu.RLock() - defer nbs.mu.RUnlock() + if !remaining { + return hash.HashSet{}, nil + } - records := toHasRecords(hashes) + absent := hash.HashSet{} + for _, r := range reqs { + if !r.has { + absent.Insert(*r.a) + } + } + return absent, nil + } +} + +// Operates a lot like |hasMany|, but without locking and without +// taking read dependencies on the checked references. Should only be +// used for the sanity checking on references for written chunks. 
+func (nbs *NomsBlockStore) refCheck(reqs []hasRecord) (hash.HashSet, error) { + if nbs.mt != nil { + remaining, _, err := nbs.mt.hasMany(reqs, nil) + if err != nil { + return nil, err + } + if !remaining { + return hash.HashSet{}, nil + } + } - _, err := nbs.tables.hasManyInSources(srcs, records) + remaining, _, err := nbs.tables.hasMany(reqs, nil) if err != nil { return nil, err } + if !remaining { + return hash.HashSet{}, nil + } absent := hash.HashSet{} - for _, r := range records { + for _, r := range reqs { if !r.has { absent.Insert(*r.a) } @@ -1066,36 +1215,32 @@ func (nbs *NomsBlockStore) hasManyInSources(srcs []hash.Hash, hashes hash.HashSe return absent, nil } -func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) { - tables, remaining, err := func() (tables chunkReader, remaining bool, err error) { - tables = nbs.tables +// Only used for a generational full GC, where the table files are +// added to the store and are then used to filter which chunks need to +// make it to the new generation. In this context, we do not need to +// worry about taking read dependencies on the requested chunks. Hence +// our handling of keeperFunc and gcBehavior below. +func (nbs *NomsBlockStore) hasManyInSources(srcs []hash.Hash, hashes hash.HashSet) (hash.HashSet, error) { + if hashes.Size() == 0 { + return nil, nil + } - remaining = true - if nbs.mt != nil { - remaining, err = nbs.mt.hasMany(reqs) + t1 := time.Now() + defer nbs.stats.HasLatency.SampleTimeSince(t1) + nbs.stats.AddressesPerHas.SampleLen(hashes.Size()) - if err != nil { - return nil, false, err - } - } + nbs.mu.RLock() + defer nbs.mu.RUnlock() - return tables, remaining, nil - }() + records := toHasRecords(hashes) + _, _, err := nbs.tables.hasManyInSources(srcs, records, nil) if err != nil { return nil, err } - if remaining { - _, err := tables.hasMany(reqs) - - if err != nil { - return nil, err - } - } - absent := hash.HashSet{} - for _, r := range reqs { + for _, r := range records { if !r.has { absent.Insert(*r.a) } @@ -1162,7 +1307,7 @@ func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { } func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash) (success bool, err error) { - return nbs.commit(ctx, current, last, nbs.hasMany) + return nbs.commit(ctx, current, last, nbs.refCheck) } func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash, checker refCheck) (success bool, err error) { @@ -1251,22 +1396,30 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has return handleOptimisticLockFailure(cached) } - if nbs.mt != nil { - cnt, err := nbs.mt.count() - - if err != nil { - return err - } - - if cnt > 0 { - ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats) + for { + if nbs.mt != nil { + cnt, err := nbs.mt.count() if err != nil { - nbs.handlePossibleDanglingRefError(err) return err } - nbs.addPendingRefsToHasCache() - nbs.tables, nbs.mt = ts, nil + if cnt > 0 { + ts, gcb, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.keeperFunc, nbs.hasCache, nbs.stats) + if err != nil { + nbs.handlePossibleDanglingRefError(err) + return err + } + if gcb == gcBehavior_Block { + err = nbs.waitForGC(ctx) + if err != nil { + return err + } + continue + } + nbs.addPendingRefsToHasCache() + nbs.tables, nbs.mt = ts, nil + } } + break } didConjoin, err := nbs.conjoinIfRequired(ctx) @@ -1555,12 +1708,11 @@ func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdTo // PruneTableFiles deletes 
old table files that are no longer referenced in the manifest. func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) { - return nbs.pruneTableFiles(ctx, nbs.hasMany) + return nbs.pruneTableFiles(ctx) } -func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context, checker refCheck) (err error) { +func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) { mtime := time.Now() - return nbs.p.PruneTableFiles(ctx, func() []hash.Hash { nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1593,16 +1745,41 @@ func (nbs *NomsBlockStore) EndGC() { if !nbs.gcInProgress { panic("EndGC called when gc was not in progress") } + for nbs.gcOutstandingReads > 0 { + nbs.cond.Wait() + } nbs.gcInProgress = false nbs.keeperFunc = nil nbs.cond.Broadcast() } +// beginRead() is called with |nbs.mu| held. It signals an ongoing +// read operation which will be operating against the existing table +// files without |nbs.mu| held. The read should be bracketed with a call +// to the returned |endRead|, which must be called with |nbs.mu| held +// if it is non-|nil|, and should not be called otherwise. +// +// If there is an ongoing GC operation when this call is made, it is +// guaranteed not to complete until the corresponding |endRead| call. +func (nbs *NomsBlockStore) beginRead() (endRead func()) { + if nbs.gcInProgress { + nbs.gcOutstandingReads += 1 + return func() { + nbs.gcOutstandingReads -= 1 + if nbs.gcOutstandingReads < 0 { + panic("impossible") + } + nbs.cond.Broadcast() + } + } + return nil +} + func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode) } -func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { +func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src CompressedChunkStoreForGC, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { ops := nbs.SupportedOperations() if !ops.CanGC || !ops.CanPrune { return nil, chunks.ErrUnsupportedOperation @@ -1670,7 +1847,7 @@ func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompres } type markAndSweeper struct { - src NBSCompressedChunkStore + src CompressedChunkStoreForGC dest *NomsBlockStore getAddrs chunks.GetAddrsCurry filter chunks.HasManyFunc @@ -1716,7 +1893,7 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err found := 0 var addErr error - err = i.src.GetManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) { + err = i.src.getManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) { mu.Lock() defer mu.Unlock() if addErr != nil { @@ -1740,7 +1917,7 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err return } addErr = i.getAddrs(c)(ctx, nextToVisit, func(h hash.Hash) bool { return false }) - }) + }, gcDependencyMode_NoDependency) if err != nil { return err } @@ -1897,7 +2074,7 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mo // SetRootChunk changes the root chunk hash from the previous value to the new root.
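beginRead and EndGC above cooperate through gcOutstandingReads and the condition variable: reads that start while a GC is in progress register themselves, and EndGC refuses to return until every such read has ended. A self-contained sketch of that handshake, with simplified fields and illustrative names only (an assumed model, not the code in this patch):

package main

import (
	"fmt"
	"sync"
	"time"
)

type gcCoordinator struct {
	mu               sync.Mutex
	cond             *sync.Cond
	gcInProgress     bool
	outstandingReads int
}

func (c *gcCoordinator) BeginGC() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.gcInProgress = true
}

func (c *gcCoordinator) EndGC() {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Block until every read that started during the GC has called its endRead.
	for c.outstandingReads > 0 {
		c.cond.Wait()
	}
	c.gcInProgress = false
	c.cond.Broadcast()
}

func (c *gcCoordinator) endRead() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.outstandingReads--
	c.cond.Broadcast()
}

func main() {
	c := &gcCoordinator{}
	c.cond = sync.NewCond(&c.mu)
	c.BeginGC()
	c.outstandingReads = 1 // pretend one unlocked read is in flight
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.endRead()
	}()
	c.EndGC() // returns only after the outstanding read ends
	fmt.Println("GC finished with no reads in flight")
}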
func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error { - return nbs.setRootChunk(ctx, root, previous, nbs.hasMany) + return nbs.setRootChunk(ctx, root, previous, nbs.refCheck) } func (nbs *NomsBlockStore) setRootChunk(ctx context.Context, root, previous hash.Hash, checker refCheck) error { @@ -1924,7 +2101,7 @@ func (nbs *NomsBlockStore) setRootChunk(ctx context.Context, root, previous hash } // CalcReads computes the number of IO operations necessary to fetch |hashes|. -func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64) (reads int, split bool, err error) { +func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64, keeper keeperF) (int, bool, gcBehavior, error) { reqs := toGetRecords(hashes) tables := func() (tables tableSet) { nbs.mu.RLock() @@ -1934,15 +2111,17 @@ func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64) (read return }() - reads, split, remaining, err := tableSetCalcReads(tables, reqs, blockSize) - + reads, split, remaining, gcb, err := tableSetCalcReads(tables, reqs, blockSize, keeper) if err != nil { - return 0, false, err + return 0, false, gcb, err + } + if gcb != gcBehavior_Continue { + return 0, false, gcb, nil } if remaining { - return 0, false, errors.New("failed to find all chunks") + return 0, false, gcBehavior_Continue, errors.New("failed to find all chunks") } - return + return reads, split, gcb, err } diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 02c92bb7a9..730d44df4a 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -209,7 +209,7 @@ func TestNBSPruneTableFiles(t *testing.T) { addrs.Insert(c.Hash()) return nil } - }, st.hasMany) + }, st.refCheck) require.NoError(t, err) require.True(t, ok) ok, err = st.Commit(ctx, st.upstream.root, st.upstream.root) @@ -377,7 +377,7 @@ func persistTableFileSources(t *testing.T, p tablePersister, numTableFiles int) require.True(t, ok) tableFileMap[fileIDHash] = uint32(i + 1) mapIds[i] = fileIDHash - cs, err := p.Persist(context.Background(), createMemTable(chunkData), nil, &Stats{}) + cs, _, err := p.Persist(context.Background(), createMemTable(chunkData), nil, nil, &Stats{}) require.NoError(t, err) require.NoError(t, cs.close()) diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go index b487422d07..c476398973 100644 --- a/go/store/nbs/table.go +++ b/go/store/nbs/table.go @@ -187,24 +187,41 @@ type extractRecord struct { err error } +// Returned by read methods that take a |keeperFunc|, this lets a +// caller know whether the operation was successful or if it needs to +// be retried. It may need to be retried if a GC is in progress but +// the dependencies indicated by the operation cannot be added to the +// GC process. In that case, the caller needs to wait until the GC is +// over and run the entire operation again. +type gcBehavior bool + +const ( + // Operation was successful, go forward with the result. + gcBehavior_Continue gcBehavior = false + // Operation needs to block until the GC is over and then retry. + gcBehavior_Block = true +) + +type keeperF func(hash.Hash) bool + type chunkReader interface { // has returns true if a chunk with addr |h| is present. - has(h hash.Hash) (bool, error) + has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) // hasMany sets hasRecord.has to true for each present hasRecord query, it returns // true if any hasRecord query was not found in this chunkReader. 
- hasMany(addrs []hasRecord) (bool, error) + hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) // get returns the chunk data for a chunk with addr |h| if present, and nil otherwise. - get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) + get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) // getMany sets getRecord.found to true, and calls |found| for each present getRecord query. // It returns true if any getRecord query was not found in this chunkReader. - getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) + getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) // getManyCompressed sets getRecord.found to true, and calls |found| for each present getRecord query. // It returns true if any getRecord query was not found in this chunkReader. - getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) + getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) // count returns the chunk count for this chunkReader. count() (uint32, error) @@ -226,7 +243,7 @@ type chunkSource interface { reader(context.Context) (io.ReadCloser, uint64, error) // getRecordRanges sets getRecord.found to true, and returns a Range for each present getRecord query. - getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) + getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) // index returns the tableIndex of this chunkSource. index() (tableIndex, error) diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index 5c230daa09..6d283fb3ec 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -47,7 +47,7 @@ type cleanupFunc func() type tablePersister interface { // Persist makes the contents of mt durable. Chunks already present in // |haver| may be dropped in the process. - Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) + Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) // ConjoinAll conjoins all chunks in |sources| into a single, new // chunkSource. It returns a |cleanupFunc| which can be called to diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index 3ff059fb48..c55d48c28a 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -178,7 +178,7 @@ func newTableReader(index tableIndex, r tableReaderAt, blockSize uint64) (tableR } // Scan across (logically) two ordered slices of address prefixes. 
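gcBehavior and keeperF above only define the contract; what a keeper actually does is left to the GC machinery. One plausible shape, sketched purely as an assumption (the field comment in store.go says the real keeperFunc "appends to the GC sweep queue or blocks on GC finalize"): record each address a reader touches while the GC is still marking, and start returning true once finalization begins so readers report gcBehavior_Block and retry.

package main

import (
	"fmt"
	"sync"
)

// hash64 is a stand-in for hash.Hash.
type hash64 [20]byte

type keeperF func(hash64) bool

type gcState struct {
	mu         sync.Mutex
	finalizing bool
	sweepQueue []hash64
}

// keeper returns a keeperF with the contract described above.
func (g *gcState) keeper() keeperF {
	return func(h hash64) bool {
		g.mu.Lock()
		defer g.mu.Unlock()
		if g.finalizing {
			return true // too late to add dependencies; caller must block and retry
		}
		g.sweepQueue = append(g.sweepQueue, h)
		return false // dependency recorded; caller may continue
	}
}

func main() {
	g := &gcState{}
	keep := g.keeper()

	var h hash64
	h[0] = 0xab
	fmt.Println("block while marking?", keep(h)) // false: h was queued for the sweep
	g.finalizing = true
	fmt.Println("block while finalizing?", keep(h)) // true: reader must wait and retry
	fmt.Println("queued dependencies:", len(g.sweepQueue))
}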
-func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { +func (tr tableReader) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { filterIdx := uint32(0) filterLen := uint32(tr.idx.chunkCount()) @@ -206,7 +206,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { } if filterIdx >= filterLen { - return true, nil + return true, gcBehavior_Continue, nil } if addr.prefix != tr.prefixes[filterIdx] { @@ -218,9 +218,12 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { for j := filterIdx; j < filterLen && addr.prefix == tr.prefixes[j]; j++ { m, err := tr.idx.entrySuffixMatches(j, addr.a) if err != nil { - return false, err + return false, gcBehavior_Continue, err } if m { + if keeper != nil && keeper(*addr.a) { + return true, gcBehavior_Block, nil + } addrs[i].has = true break } @@ -231,7 +234,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { } } - return remaining, nil + return remaining, gcBehavior_Continue, nil } func (tr tableReader) count() (uint32, error) { @@ -247,20 +250,27 @@ func (tr tableReader) index() (tableIndex, error) { } // returns true iff |h| can be found in this table. -func (tr tableReader) has(h hash.Hash) (bool, error) { +func (tr tableReader) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) { _, ok, err := tr.idx.lookup(&h) - return ok, err + if ok && keeper != nil && keeper(h) { + return false, gcBehavior_Block, nil + } + return ok, gcBehavior_Continue, err } // returns the storage associated with |h|, iff present. Returns nil if absent. On success, // the returned byte slice directly references the underlying storage. -func (tr tableReader) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { +func (tr tableReader) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) { e, found, err := tr.idx.lookup(&h) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } if !found { - return nil, nil + return nil, gcBehavior_Continue, nil + } + + if keeper != nil && keeper(h) { + return nil, gcBehavior_Block, nil } offset := e.Offset() @@ -270,30 +280,30 @@ func (tr tableReader) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byt n, err := tr.r.ReadAtWithStats(ctx, buff, int64(offset), stats) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } if n != int(length) { - return nil, errors.New("failed to read all data") + return nil, gcBehavior_Continue, errors.New("failed to read all data") } cmp, err := NewCompressedChunk(h, buff) if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } if len(cmp.CompressedData) == 0 { - return nil, errors.New("failed to get data") + return nil, gcBehavior_Continue, errors.New("failed to get data") } chnk, err := cmp.ToChunk() if err != nil { - return nil, err + return nil, gcBehavior_Continue, err } - return chnk.Data(), nil + return chnk.Data(), gcBehavior_Continue, nil } type offsetRec struct { @@ -380,26 +390,33 @@ func (tr tableReader) getMany( eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), - stats *Stats) (bool, error) { + keeper keeperF, + stats *Stats) (bool, gcBehavior, error) { // Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set // of table locations which must be read in order to satisfy the getMany operation. 
- offsetRecords, remaining, err := tr.findOffsets(reqs) + offsetRecords, remaining, gcb, err := tr.findOffsets(reqs, keeper) if err != nil { - return false, err + return false, gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return remaining, gcb, nil } err = tr.getManyAtOffsets(ctx, eg, offsetRecords, found, stats) - return remaining, err + return remaining, gcBehavior_Continue, err } -func (tr tableReader) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { +func (tr tableReader) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { // Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set // of table locations which must be read in order to satisfy the getMany operation. - offsetRecords, remaining, err := tr.findOffsets(reqs) + offsetRecords, remaining, gcb, err := tr.findOffsets(reqs, keeper) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return remaining, gcb, nil } err = tr.getManyCompressedAtOffsets(ctx, eg, offsetRecords, found, stats) - return remaining, err + return remaining, gcBehavior_Continue, err } func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, eg *errgroup.Group, offsetRecords offsetRecSlice, found func(context.Context, CompressedChunk), stats *Stats) error { @@ -498,7 +515,7 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc( // chunks remaining will be set to false upon return. If some are not here, // then remaining will be true. The result offsetRecSlice is sorted in offset // order. 
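findOffsets and hasMany both walk |reqs| and |tr.prefixes| as two prefix-sorted sequences, advancing a single table cursor and consulting the keeper only once a match is confirmed. A toy version of that merge-style scan, with suffix disambiguation and real hash types omitted (illustrative stand-ins only):

package main

import (
	"fmt"
	"sort"
)

type gcBehavior bool

const (
	gcBehavior_Continue gcBehavior = false
	gcBehavior_Block    gcBehavior = true
)

type keeperF func(uint64) bool

// findPresent walks two prefix-sorted slices in tandem, collecting which requests are
// present in the table. It returns remaining=true if any request was not found, and
// gcBehavior_Block as soon as the keeper rejects a confirmed match.
func findPresent(reqPrefixes []uint64, tablePrefixes []uint64, keeper keeperF) (found []uint64, remaining bool, gcb gcBehavior) {
	j := 0
	for _, p := range reqPrefixes {
		// Advance the table cursor until it catches up with this request's prefix.
		for j < len(tablePrefixes) && tablePrefixes[j] < p {
			j++
		}
		if j < len(tablePrefixes) && tablePrefixes[j] == p {
			if keeper != nil && keeper(p) {
				return nil, true, gcBehavior_Block
			}
			found = append(found, p)
			continue
		}
		remaining = true // not in this table; a later chunk source may still have it
	}
	return found, remaining, gcBehavior_Continue
}

func main() {
	reqs := []uint64{3, 7, 11}
	table := []uint64{1, 3, 5, 7, 9}
	sort.Slice(reqs, func(i, k int) bool { return reqs[i] < reqs[k] })
	found, remaining, gcb := findPresent(reqs, table, nil)
	fmt.Println(found, remaining, gcb) // [3 7] true false
}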
-func (tr tableReader) findOffsets(reqs []getRecord) (ors offsetRecSlice, remaining bool, err error) { +func (tr tableReader) findOffsets(reqs []getRecord, keeper keeperF) (ors offsetRecSlice, remaining bool, gcb gcBehavior, err error) { filterIdx := uint32(0) filterLen := uint32(len(tr.prefixes)) ors = make(offsetRecSlice, 0, len(reqs)) @@ -541,13 +558,16 @@ func (tr tableReader) findOffsets(reqs []getRecord) (ors offsetRecSlice, remaini for j := filterIdx; j < filterLen && req.prefix == tr.prefixes[j]; j++ { m, err := tr.idx.entrySuffixMatches(j, req.a) if err != nil { - return nil, false, err + return nil, false, gcBehavior_Continue, err } if m { + if keeper != nil && keeper(*req.a) { + return nil, false, gcBehavior_Block, nil + } reqs[i].found = true entry, err := tr.idx.indexEntry(j, nil) if err != nil { - return nil, false, err + return nil, false, gcBehavior_Continue, err } ors = append(ors, offsetRec{req.a, entry.Offset(), entry.Length()}) break @@ -560,7 +580,7 @@ func (tr tableReader) findOffsets(reqs []getRecord) (ors offsetRecSlice, remaini } sort.Sort(ors) - return ors, remaining, nil + return ors, remaining, gcBehavior_Continue, nil } func canReadAhead(fRec offsetRec, curStart, curEnd, blockSize uint64) (newEnd uint64, canRead bool) { @@ -584,12 +604,15 @@ func canReadAhead(fRec offsetRec, curStart, curEnd, blockSize uint64) (newEnd ui return fRec.offset + uint64(fRec.length), true } -func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error) { +func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64, keeper keeperF) (int, bool, gcBehavior, error) { var offsetRecords offsetRecSlice // Pass #1: Build the set of table locations which must be read in order to find all the elements of |reqs| which are present in this table. - offsetRecords, remaining, err = tr.findOffsets(reqs) + offsetRecords, remaining, gcb, err := tr.findOffsets(reqs, keeper) if err != nil { - return 0, false, err + return 0, false, gcb, err + } + if gcb != gcBehavior_Continue { + return 0, false, gcb, nil } // Now |offsetRecords| contains all locations within the table which must @@ -597,6 +620,7 @@ func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64) (reads int, // location). Scan forward, grouping sequences of reads into large physical // reads. 
+ var reads int var readStart, readEnd uint64 readStarted := false @@ -622,7 +646,7 @@ func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64) (reads int, readStarted = false } - return + return reads, remaining, gcBehavior_Continue, err } func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord) error { @@ -681,11 +705,14 @@ func (tr tableReader) reader(ctx context.Context) (io.ReadCloser, uint64, error) return r, sz, nil } -func (tr tableReader) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) { +func (tr tableReader) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) { // findOffsets sets getRecord.found - recs, _, err := tr.findOffsets(requests) + recs, _, gcb, err := tr.findOffsets(requests, keeper) if err != nil { - return nil, err + return nil, gcb, err + } + if gcb != gcBehavior_Continue { + return nil, gcb, nil } ranges := make(map[hash.Hash]Range, len(recs)) for _, r := range recs { @@ -694,7 +721,7 @@ func (tr tableReader) getRecordRanges(ctx context.Context, requests []getRecord) Length: r.length, } } - return ranges, nil + return ranges, gcBehavior_Continue, nil } func (tr tableReader) currentSize() uint64 { diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index 185743199a..88fd92de58 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -58,58 +58,62 @@ type tableSet struct { rl chan struct{} } -func (ts tableSet) has(h hash.Hash) (bool, error) { - f := func(css chunkSourceSet) (bool, error) { +func (ts tableSet) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) { + f := func(css chunkSourceSet) (bool, gcBehavior, error) { for _, haver := range css { - has, err := haver.has(h) - + has, gcb, err := haver.has(h, keeper) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return false, gcb, nil } - if has { - return true, nil + return true, gcBehavior_Continue, nil } } - return false, nil + return false, gcBehavior_Continue, nil } - novelHas, err := f(ts.novel) - + novelHas, gcb, err := f(ts.novel) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return false, gcb, nil } - if novelHas { - return true, nil + return true, gcBehavior_Continue, nil } return f(ts.upstream) } -func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { - f := func(css chunkSourceSet) (bool, error) { +func (ts tableSet) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { + f := func(css chunkSourceSet) (bool, gcBehavior, error) { for _, haver := range css { - has, err := haver.hasMany(addrs) - + has, gcb, err := haver.hasMany(addrs, keeper) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return false, gcb, nil } - if !has { - return false, nil + return false, gcBehavior_Continue, nil } } - return true, nil + return true, gcBehavior_Continue, nil } - remaining, err := f(ts.novel) - + remaining, gcb, err := f(ts.novel) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return remaining, gcb, err } - if !remaining { - return false, nil + return false, gcBehavior_Continue, nil } return f(ts.upstream) @@ -124,7 +128,10 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { // consulted. 
Only used for part of the GC workflow where we want to have // access to all chunks in the store but need to check for existing chunk // presence in only a subset of its files. -func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord) (remaining bool, err error) { +func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) { + var remaining bool + var err error + var gcb gcBehavior for _, rec := range addrs { if !rec.has { remaining = true @@ -132,7 +139,7 @@ func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord) (remain } } if !remaining { - return false, nil + return false, gcBehavior_Continue, nil } for _, srcAddr := range srcs { src, ok := ts.novel[srcAddr] @@ -142,83 +149,114 @@ func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord) (remain continue } } - remaining, err = src.hasMany(addrs) + remaining, gcb, err = src.hasMany(addrs, keeper) if err != nil { - return false, err + return false, gcb, err + } + if gcb != gcBehavior_Continue { + return false, gcb, nil } if !remaining { break } } - return remaining, nil + return remaining, gcBehavior_Continue, nil } -func (ts tableSet) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { +func (ts tableSet) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) { if err := ctx.Err(); err != nil { - return nil, err + return nil, gcBehavior_Continue, err } - f := func(css chunkSourceSet) ([]byte, error) { + f := func(css chunkSourceSet) ([]byte, gcBehavior, error) { for _, haver := range css { - data, err := haver.get(ctx, h, stats) - + data, gcb, err := haver.get(ctx, h, keeper, stats) if err != nil { - return nil, err + return nil, gcb, err + } + if gcb != gcBehavior_Continue { + return nil, gcb, nil } - if data != nil { - return data, nil + return data, gcBehavior_Continue, nil } } - - return nil, nil + return nil, gcBehavior_Continue, nil } - data, err := f(ts.novel) - + data, gcb, err := f(ts.novel) if err != nil { - return nil, err + return nil, gcb, err + } + if gcb != gcBehavior_Continue { + return nil, gcb, nil } - if data != nil { - return data, nil + return data, gcBehavior_Continue, nil } return f(ts.upstream) } -func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) { - f := func(css chunkSourceSet) bool { +func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { + f := func(css chunkSourceSet) (bool, gcBehavior, error) { for _, haver := range css { - remaining, err = haver.getMany(ctx, eg, reqs, found, stats) + remaining, gcb, err := haver.getMany(ctx, eg, reqs, found, keeper, stats) if err != nil { - return true + return true, gcb, err + } + if gcb != gcBehavior_Continue { + return true, gcb, nil } if !remaining { - return false + return false, gcb, nil } } - return true + return true, gcBehavior_Continue, nil + } + + remaining, gcb, err := f(ts.novel) + if err != nil { + return true, gcb, err + } + if gcb != gcBehavior_Continue { + return true, gcb, nil + } + if !remaining { + return false, gcBehavior_Continue, nil } - return f(ts.novel) && err == nil && f(ts.upstream), err + return f(ts.upstream) } -func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, 
CompressedChunk), stats *Stats) (remaining bool, err error) { - f := func(css chunkSourceSet) bool { +func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) { + f := func(css chunkSourceSet) (bool, gcBehavior, error) { for _, haver := range css { - remaining, err = haver.getManyCompressed(ctx, eg, reqs, found, stats) + remaining, gcb, err := haver.getManyCompressed(ctx, eg, reqs, found, keeper, stats) if err != nil { - return true + return true, gcb, err + } + if gcb != gcBehavior_Continue { + return true, gcb, nil } if !remaining { - return false + return false, gcBehavior_Continue, nil } } + return true, gcBehavior_Continue, nil + } - return true + remaining, gcb, err := f(ts.novel) + if err != nil { + return true, gcb, err + } + if gcb != gcBehavior_Continue { + return remaining, gcb, nil + } + if !remaining { + return false, gcBehavior_Continue, nil } - return f(ts.novel) && err == nil && f(ts.upstream), err + return f(ts.upstream) } func (ts tableSet) count() (uint32, error) { @@ -326,7 +364,7 @@ func (ts tableSet) Size() int { // append adds a memTable to an existing tableSet, compacting |mt| and // returning a new tableSet with newly compacted table added. -func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) { +func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, keeper keeperF, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, gcBehavior, error) { addrs := hash.NewHashSet() for _, getAddrs := range mt.getChildAddrs { getAddrs(ctx, addrs, func(h hash.Hash) bool { return hasCache.Contains(h) }) @@ -342,14 +380,17 @@ func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, h sort.Sort(hasRecordByPrefix(mt.pendingRefs)) absent, err := checker(mt.pendingRefs) if err != nil { - return tableSet{}, err + return tableSet{}, gcBehavior_Continue, err } else if absent.Size() > 0 { - return tableSet{}, fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String()) + return tableSet{}, gcBehavior_Continue, fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String()) } - cs, err := ts.p.Persist(ctx, mt, ts, stats) + cs, gcb, err := ts.p.Persist(ctx, mt, ts, keeper, stats) if err != nil { - return tableSet{}, err + return tableSet{}, gcBehavior_Continue, err + } + if gcb != gcBehavior_Continue { + return tableSet{}, gcb, nil } newTs := tableSet{ @@ -360,7 +401,7 @@ func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, h rl: ts.rl, } newTs.novel[cs.hash()] = cs - return newTs, nil + return newTs, gcBehavior_Continue, nil } // flatten returns a new tableSet with |upstream| set to the union of ts.novel @@ -500,11 +541,12 @@ func (ts tableSet) toSpecs() ([]tableSpec, error) { return tableSpecs, nil } -func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) { +func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64, keeper keeperF) (reads int, split, remaining bool, gcb gcBehavior, err error) { all := copyChunkSourceSet(ts.upstream) for a, cs := range ts.novel { all[a] = cs } + gcb = gcBehavior_Continue for _, tbl := range all { rdr, ok := tbl.(*fileTableReader) if !ok { @@ -514,9 +556,12 @@ func tableSetCalcReads(ts tableSet, reqs 
[]getRecord, blockSize uint64) (reads i var n int var more bool - n, more, err = rdr.calcReads(reqs, blockSize) + n, more, gcb, err = rdr.calcReads(reqs, blockSize, keeper) if err != nil { - return 0, false, false, err + return 0, false, false, gcb, err + } + if gcb != gcBehavior_Continue { + return 0, false, false, gcb, nil } reads += n diff --git a/go/store/nbs/table_set_test.go b/go/store/nbs/table_set_test.go index b7d54cbd09..e1cfcef3da 100644 --- a/go/store/nbs/table_set_test.go +++ b/go/store/nbs/table_set_test.go @@ -41,7 +41,7 @@ var hasManyHasAll = func([]hasRecord) (hash.HashSet, error) { func TestTableSetPrependEmpty(t *testing.T) { hasCache, err := lru.New2Q[hash.Hash, struct{}](1024) require.NoError(t, err) - ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).append(context.Background(), newMemTable(testMemTableSize), hasManyHasAll, hasCache, &Stats{}) + ts, _, err := newFakeTableSet(&UnlimitedQuotaProvider{}).append(context.Background(), newMemTable(testMemTableSize), hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) specs, err := ts.toSpecs() require.NoError(t, err) @@ -61,7 +61,7 @@ func TestTableSetPrepend(t *testing.T) { mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) hasCache, err := lru.New2Q[hash.Hash, struct{}](1024) require.NoError(t, err) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) firstSpecs, err := ts.toSpecs() @@ -71,7 +71,7 @@ func TestTableSetPrepend(t *testing.T) { mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) secondSpecs, err := ts.toSpecs() @@ -93,17 +93,17 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) { mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) hasCache, err := lru.New2Q[hash.Hash, struct{}](1024) require.NoError(t, err) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) specs, err = ts.toSpecs() @@ -124,17 +124,17 @@ func TestTableSetFlattenExcludesEmptyTable(t *testing.T) { mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) hasCache, err := lru.New2Q[hash.Hash, struct{}](1024) require.NoError(t, err) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = 
ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) ts, err = ts.flatten(context.Background()) @@ -146,7 +146,7 @@ func persist(t *testing.T, p tablePersister, chunks ...[]byte) { for _, c := range chunks { mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(c), c) - cs, err := p.Persist(context.Background(), mt, nil, &Stats{}) + cs, _, err := p.Persist(context.Background(), mt, nil, nil, &Stats{}) require.NoError(t, err) require.NoError(t, cs.close()) } @@ -164,7 +164,7 @@ func TestTableSetRebase(t *testing.T) { for _, c := range chunks { mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(c), c) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) } return ts @@ -213,13 +213,13 @@ func TestTableSetPhysicalLen(t *testing.T) { mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) hasCache, err := lru.New2Q[hash.Hash, struct{}](1024) require.NoError(t, err) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.append(context.Background(), mt, hasManyHasAll, hasCache, &Stats{}) + ts, _, err = ts.append(context.Background(), mt, hasManyHasAll, nil, hasCache, &Stats{}) require.NoError(t, err) assert.True(mustUint64(ts.physicalLen()) > indexSize(mustUint32(ts.count()))) diff --git a/go/store/nbs/table_test.go b/go/store/nbs/table_test.go index 596bebc689..e62bfc1618 100644 --- a/go/store/nbs/table_test.go +++ b/go/store/nbs/table_test.go @@ -62,7 +62,7 @@ func buildTable(chunks [][]byte) ([]byte, hash.Hash, error) { } func mustGetString(assert *assert.Assertions, ctx context.Context, tr tableReader, data []byte) string { - bytes, err := tr.get(ctx, computeAddr(data), &Stats{}) + bytes, _, err := tr.get(ctx, computeAddr(data), nil, &Stats{}) assert.NoError(err) return string(bytes) } @@ -106,13 +106,13 @@ func TestSimple(t *testing.T) { func assertChunksInReader(chunks [][]byte, r chunkReader, assert *assert.Assertions) { for _, c := range chunks { - assert.True(r.has(computeAddr(c))) + assert.True(r.has(computeAddr(c), nil)) } } func assertChunksNotInReader(chunks [][]byte, r chunkReader, assert *assert.Assertions) { for _, c := range chunks { - assert.False(r.has(computeAddr(c))) + assert.False(r.has(computeAddr(c), nil)) } } @@ -142,7 +142,7 @@ func TestHasMany(t *testing.T) { } sort.Sort(hasRecordByPrefix(hasAddrs)) - _, err = tr.hasMany(hasAddrs) + _, _, err = tr.hasMany(hasAddrs, nil) require.NoError(t, err) for _, ha := range hasAddrs { assert.True(ha.has, "Nothing for prefix %d", ha.prefix) @@ -192,7 +192,7 @@ func TestHasManySequentialPrefix(t *testing.T) { hasAddrs[0] = hasRecord{&addrs[1], addrs[1].Prefix(), 1, false} hasAddrs[1] = hasRecord{&addrs[2], addrs[2].Prefix(), 2, false} - _, err = tr.hasMany(hasAddrs) + _, _, err = 
tr.hasMany(hasAddrs, nil) require.NoError(t, err) for _, ha := range hasAddrs { @@ -246,7 +246,7 @@ func BenchmarkHasMany(b *testing.B) { b.Run("dense has many", func(b *testing.B) { var ok bool for i := 0; i < b.N; i++ { - ok, err = tr.hasMany(hrecs) + ok, _, err = tr.hasMany(hrecs, nil) } assert.False(b, ok) assert.NoError(b, err) @@ -254,7 +254,7 @@ func BenchmarkHasMany(b *testing.B) { b.Run("sparse has many", func(b *testing.B) { var ok bool for i := 0; i < b.N; i++ { - ok, err = tr.hasMany(sparse) + ok, _, err = tr.hasMany(sparse, nil) } assert.True(b, ok) assert.NoError(b, err) @@ -290,7 +290,7 @@ func TestGetMany(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) got := make([]*chunks.Chunk, 0) - _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, &Stats{}) + _, _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, nil, &Stats{}) require.NoError(t, err) require.NoError(t, eg.Wait()) @@ -324,13 +324,13 @@ func TestCalcReads(t *testing.T) { gb2 := []getRecord{getBatch[0], getBatch[2]} sort.Sort(getRecordByPrefix(getBatch)) - reads, remaining, err := tr.calcReads(getBatch, 0) + reads, remaining, _, err := tr.calcReads(getBatch, 0, nil) require.NoError(t, err) assert.False(remaining) assert.Equal(1, reads) sort.Sort(getRecordByPrefix(gb2)) - reads, remaining, err = tr.calcReads(gb2, 0) + reads, remaining, _, err = tr.calcReads(gb2, 0, nil) require.NoError(t, err) assert.False(remaining) assert.Equal(2, reads) @@ -398,8 +398,8 @@ func Test65k(t *testing.T) { for i := 0; i < count; i++ { data := dataFn(i) h := computeAddr(data) - assert.True(tr.has(computeAddr(data))) - bytes, err := tr.get(context.Background(), h, &Stats{}) + assert.True(tr.has(computeAddr(data), nil)) + bytes, _, err := tr.get(context.Background(), h, nil, &Stats{}) require.NoError(t, err) assert.Equal(string(data), string(bytes)) } @@ -407,8 +407,8 @@ func Test65k(t *testing.T) { for i := count; i < count*2; i++ { data := dataFn(i) h := computeAddr(data) - assert.False(tr.has(computeAddr(data))) - bytes, err := tr.get(context.Background(), h, &Stats{}) + assert.False(tr.has(computeAddr(data), nil)) + bytes, _, err := tr.get(context.Background(), h, nil, &Stats{}) require.NoError(t, err) assert.NotEqual(string(data), string(bytes)) } @@ -461,7 +461,7 @@ func doTestNGetMany(t *testing.T, count int) { eg, ctx := errgroup.WithContext(context.Background()) got := make([]*chunks.Chunk, 0) - _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, &Stats{}) + _, _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, nil, &Stats{}) require.NoError(t, err) require.NoError(t, eg.Wait())
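As the updated tests show, call sites that do not participate in GC coordination simply pass a nil keeper and discard the gcBehavior result. A small sketch of that calling convention against a hypothetical reader type (assumed names, not the nbs API):

package main

import (
	"fmt"
)

type gcBehavior bool

const gcBehavior_Continue gcBehavior = false

type keeperF func([20]byte) bool

// reader mimics the new chunkReader-style signature: every lookup now also returns a
// gcBehavior alongside its previous results.
type reader struct{ data map[[20]byte][]byte }

func (r reader) get(h [20]byte, keeper keeperF) ([]byte, gcBehavior, error) {
	b, ok := r.data[h]
	if !ok {
		return nil, gcBehavior_Continue, nil
	}
	if keeper != nil && keeper(h) {
		return nil, true, nil // gcBehavior_Block in the real code
	}
	return b, gcBehavior_Continue, nil
}

func main() {
	var h [20]byte
	h[0] = 1
	r := reader{data: map[[20]byte][]byte{h: []byte("chunk bytes")}}

	// Callers outside the GC path pass a nil keeper and ignore the gcBehavior result.
	data, _, err := r.get(h, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}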