[no-release-notes] go/store/nbs: During a GC process, take dependencies on chunks that are read through the ChunkStore. #8760

Open · wants to merge 7 commits into base: main
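This PR threads a GC dependency callback through every read path in the nbs layer: each chunkSource accessor now takes a keeperF and returns a gcBehavior alongside its previous results, so chunks read while a GC cycle is in flight are reported to the collector. A minimal, self-contained sketch of that convention, inferred from the hunks below — addr, store, and the free-standing get are illustrative stand-ins, while keeperF, gcBehavior_Continue, and gcBehavior_Block are the names the diff introduces:

package main

import "fmt"

// addr stands in for hash.Hash in this sketch.
type addr [20]byte

// keeperF records a GC dependency on a chunk address. A true return means the
// collector can no longer accept new dependencies, so the read must not
// proceed during the current cycle.
type keeperF func(addr) bool

type gcBehavior int

const (
	gcBehavior_Continue gcBehavior = iota // read satisfied; any dependency recorded
	gcBehavior_Block                      // wait for the GC cycle to finish, then retry
)

// get mirrors the pattern each accessor in the diff follows: look the chunk
// up, then consult |keeper| before handing the data back.
func get(store map[addr][]byte, h addr, keeper keeperF) ([]byte, gcBehavior, error) {
	data, ok := store[h]
	if !ok {
		return nil, gcBehavior_Continue, nil // absent here; caller tries other sources
	}
	if keeper != nil && keeper(h) {
		return nil, gcBehavior_Block, nil
	}
	return data, gcBehavior_Continue, nil
}

func main() {
	h := addr{1}
	store := map[addr][]byte{h: []byte("chunk bytes")}
	data, gcb, _ := get(store, h, func(addr) bool { return false })
	fmt.Println(string(data), gcb) // chunk bytes 0
}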
7 changes: 4 additions & 3 deletions go/store/nbs/archive_build.go
@@ -425,7 +425,7 @@ func gatherAllChunks(ctx context.Context, cs chunkSource, idx tableIndex, stats
return nil, nil, err
}

bytes, err := cs.get(ctx, h, stats)
bytes, _, err := cs.get(ctx, h, nil, stats)
if err != nil {
return nil, nil, err
}
@@ -907,7 +907,7 @@ func (csc *simpleChunkSourceCache) get(ctx context.Context, h hash.Hash, stats *
return chk, nil
}

bytes, err := csc.cs.get(ctx, h, stats)
bytes, _, err := csc.cs.get(ctx, h, nil, stats)
if bytes == nil || err != nil {
return nil, err
}
@@ -919,7 +919,8 @@ func (csc *simpleChunkSourceCache) get(ctx context.Context, h hash.Hash, stats *

// has returns true if the chunk is in the ChunkSource. This is not related to what is cached, just a helper.
func (csc *simpleChunkSourceCache) has(h hash.Hash) (bool, error) {
return csc.cs.has(h)
res, _, err := csc.cs.has(h, nil)
return res, err
}

// addresses get all chunk addresses of the ChunkSource as a hash.HashSet.
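Call sites like simpleChunkSourceCache above, which never run with a keeper, migrate mechanically: pass nil and discard the returned gcBehavior, which the accessors in this diff only ever set to gcBehavior_Continue when the keeper is nil. A hedged sketch of that adapter shape, reusing the addr, keeperF, and gcBehavior definitions from the sketch above (twoValueHas, threeValueHas, and adapter are illustrative names):

// twoValueHas is the old helper shape; threeValueHas is the widened one.
type twoValueHas interface{ has(h addr) (bool, error) }

type threeValueHas interface {
	has(h addr, keeper keeperF) (bool, gcBehavior, error)
}

// adapter narrows the new API back down for non-GC callers.
type adapter struct{ cs threeValueHas }

var _ twoValueHas = adapter{}

// has passes a nil keeper, so the gcBehavior can be safely dropped.
func (a adapter) has(h addr) (bool, error) {
	res, _, err := a.cs.has(h, nil)
	return res, err
}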
53 changes: 37 additions & 16 deletions go/store/nbs/archive_chunk_source.go
@@ -64,42 +64,63 @@ func openReader(file string) (io.ReaderAt, uint64, error) {
return f, uint64(stat.Size()), nil
}

func (acs archiveChunkSource) has(h hash.Hash) (bool, error) {
return acs.aRdr.has(h), nil
func (acs archiveChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) {
res := acs.aRdr.has(h)
if res && keeper != nil && keeper(h) {
return false, gcBehavior_Block, nil
}
return res, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) hasMany(addrs []hasRecord) (bool, error) {
func (acs archiveChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
// single threaded first pass.
foundAll := true
for i, addr := range addrs {
if acs.aRdr.has(*(addr.a)) {
h := *addr.a
if acs.aRdr.has(h) {
if keeper != nil && keeper(h) {
return false, gcBehavior_Block, nil
}
addrs[i].has = true
} else {
foundAll = false
}
}
return !foundAll, nil
return !foundAll, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
// ctx, stats ? NM4.
return acs.aRdr.get(h)
func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) {
res, err := acs.aRdr.get(h)
if err != nil {
return nil, gcBehavior_Continue, err
}
if res != nil && keeper != nil && keeper(h) {
return nil, gcBehavior_Block, nil
}
return res, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
// single threaded first pass.
foundAll := true
for i, req := range reqs {
data, err := acs.aRdr.get(*req.a)
if err != nil || data == nil {
h := *req.a
data, err := acs.aRdr.get(h)
if err != nil {
return true, gcBehavior_Continue, err
}
if data == nil {
foundAll = false
} else {
if keeper != nil && keeper(h) {
return true, gcBehavior_Block, nil
}
chunk := chunks.NewChunk(data)
found(ctx, &chunk)
reqs[i].found = true
}
}
return !foundAll, nil
return !foundAll, gcBehavior_Continue, nil
}

// iterate iterates over the archive chunks. The callback is called for each chunk in the archive. This is not optimized
@@ -146,14 +167,14 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
return archiveChunkSource{acs.file, rdr}, nil
}

func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) (map[hash.Hash]Range, error) {
return nil, errors.New("Archive chunk source does not support getRecordRanges")
func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord, _ keeperF) (map[hash.Hash]Range, gcBehavior, error) {
return nil, gcBehavior_Continue, errors.New("Archive chunk source does not support getRecordRanges")
}

func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) {
found(ctx, ChunkToCompressedChunk(*chk))
}, stats)
}, keeper, stats)
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
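Note the return convention hasMany and getMany keep through this change: the first result reads as "remaining" — true when some requested addresses were not found in this source, telling the caller to consult the next one. A compact sketch of that contract for hasMany, reusing the types from the first sketch (hasRecord here is trimmed to the two fields the sketch needs):

// hasRecord pairs a requested address with a found flag.
type hasRecord struct {
	a   *addr
	has bool
}

// hasMany marks each record found in |store| and reports remaining=true when
// any address was missed, mirroring the archiveChunkSource logic above.
func hasMany(store map[addr][]byte, addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
	foundAll := true
	for i := range addrs {
		h := *addrs[i].a
		if _, ok := store[h]; ok {
			if keeper != nil && keeper(h) {
				return false, gcBehavior_Block, nil
			}
			addrs[i].has = true
		} else {
			foundAll = false
		}
	}
	return !foundAll, gcBehavior_Continue, nil
}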
16 changes: 8 additions & 8 deletions go/store/nbs/archive_test.go
@@ -655,28 +655,28 @@ type testChunkSource struct {

var _ chunkSource = (*testChunkSource)(nil)

func (tcs *testChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
func (tcs *testChunkSource) get(_ context.Context, h hash.Hash, _ keeperF, _ *Stats) ([]byte, gcBehavior, error) {
for _, chk := range tcs.chunks {
if chk.Hash() == h {
return chk.Data(), nil
return chk.Data(), gcBehavior_Continue, nil
}
}
return nil, errors.New("not found")
return nil, gcBehavior_Continue, errors.New("not found")
}

func (tcs *testChunkSource) has(h hash.Hash) (bool, error) {
func (tcs *testChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) {
panic("never used")
}

func (tcs *testChunkSource) hasMany(addrs []hasRecord) (bool, error) {
func (tcs *testChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
panic("never used")
}

func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
panic("never used")
}

func (tcs *testChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
func (tcs *testChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
panic("never used")
}

@@ -700,7 +700,7 @@ func (tcs *testChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64,
panic("never used")
}

func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
panic("never used")
}

20 changes: 13 additions & 7 deletions go/store/nbs/aws_table_persister.go
@@ -115,25 +115,31 @@ func (s3p awsTablePersister) key(k string) string {
return k
}

func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
name, data, chunkCount, err := mt.write(haver, stats)

func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, gcBehavior_Continue, err
}
if gcb != gcBehavior_Continue {
return emptyChunkSource{}, gcb, nil
}

if chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, gcBehavior_Continue, nil
}

err = s3p.multipartUpload(ctx, bytes.NewReader(data), uint64(len(data)), name.String())

if err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, gcBehavior_Continue, err
}

tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name}
return newReaderFromIndexData(ctx, s3p.q, data, name, tra, s3BlockSize)
src, err := newReaderFromIndexData(ctx, s3p.q, data, name, tra, s3BlockSize)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}
return src, gcBehavior_Continue, nil
}

func (s3p awsTablePersister) multipartUpload(ctx context.Context, r io.Reader, sz uint64, key string) error {
22 changes: 11 additions & 11 deletions go/store/nbs/aws_table_persister_test.go
@@ -90,7 +90,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := makeFakeS3(t)
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}

src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
src, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
defer src.close()

@@ -108,7 +108,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := makeFakeS3(t)
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}}

src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
src, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
defer src.close()
if assert.True(mustUint32(src.count()) > 0) {
@@ -133,7 +133,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := makeFakeS3(t)
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}

src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{})
src, _, err := s3p.Persist(context.Background(), mt, existingTable, nil, &Stats{})
require.NoError(t, err)
defer src.close()
assert.True(mustUint32(src.count()) == 0)
@@ -148,7 +148,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}

_, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
_, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
assert.Error(err)
})
}
@@ -306,7 +306,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
for i := 0; i < len(chunks); i++ {
mt := newMemTable(uint64(2 * targetPartSize))
mt.addChunk(computeAddr(chunks[i]), chunks[i])
cs, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
cs, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
sources = append(sources, cs)
}
@@ -379,7 +379,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
}

var err error
sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{})
sources[i], _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
}
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
@@ -417,9 +417,9 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
rand.Read(medChunks[i])
mt.addChunk(computeAddr(medChunks[i]), medChunks[i])
}
cs1, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
cs1, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
cs2, err := s3p.Persist(context.Background(), mtb, nil, &Stats{})
cs2, _, err := s3p.Persist(context.Background(), mtb, nil, nil, &Stats{})
require.NoError(t, err)
sources := chunkSources{cs1, cs2}

@@ -450,7 +450,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
mt := newMemTable(uint64(2 * targetPartSize))
mt.addChunk(computeAddr(smallChunks[i]), smallChunks[i])
var err error
sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{})
sources[i], _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
}

@@ -461,7 +461,7 @@
}

var err error
cs, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
cs, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
sources = append(sources, cs)

@@ -474,7 +474,7 @@
mt.addChunk(computeAddr(medChunks[i]), medChunks[i])
}

cs, err = s3p.Persist(context.Background(), mt, nil, &Stats{})
cs, _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
sources = append(sources, cs)

36 changes: 22 additions & 14 deletions go/store/nbs/bs_persister.go
@@ -45,12 +45,16 @@ var _ tableFilePersister = &blobstorePersister{}

// Persist makes the contents of mt durable. Chunks already present in
// |haver| may be dropped in the process.
func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
address, data, chunkCount, err := mt.write(haver, stats)
func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, err
} else if chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, gcBehavior_Continue, err
}
if gcb != gcBehavior_Continue {
return emptyChunkSource{}, gcb, nil
}
if chunkCount == 0 {
return emptyChunkSource{}, gcBehavior_Continue, nil
}
name := address.String()

@@ -59,24 +63,28 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver

// first write table records and tail (index+footer) as separate blobs
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() (err error) {
_, err = bsp.bs.Put(ectx, name+tableRecordsExt, int64(len(records)), bytes.NewBuffer(records))
return
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(len(records)), bytes.NewBuffer(records))
return err
})
eg.Go(func() (err error) {
_, err = bsp.bs.Put(ectx, name+tableTailExt, int64(len(tail)), bytes.NewBuffer(tail))
return
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableTailExt, int64(len(tail)), bytes.NewBuffer(tail))
return err
})
if err = eg.Wait(); err != nil {
return nil, err
return nil, gcBehavior_Continue, err
}

// then concatenate into a final blob
if _, err = bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}); err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, gcBehavior_Continue, err
}
rdr := &bsTableReaderAt{name, bsp.bs}
return newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize)
src, err := newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}
return src, gcBehavior_Continue, nil
}

// ConjoinAll implements tablePersister.
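Both persisters now share the same control flow: write the memtable with the keeper, surface any non-Continue gcBehavior before a single byte is uploaded, and only then build the chunk source. A hedged sketch of that flow, reusing the types from the first sketch — the persist helper and its function-valued parameters are illustrative, while memTable.write's widened return is taken from the diff:

// persist abstracts the shape shared by awsTablePersister.Persist and
// blobstorePersister.Persist in the hunks above.
func persist(
	write func(keeperF) (data []byte, chunkCount uint32, gcb gcBehavior, err error),
	upload func([]byte) error,
	keeper keeperF,
) (gcBehavior, error) {
	data, chunkCount, gcb, err := write(keeper)
	if err != nil {
		return gcBehavior_Continue, err
	}
	if gcb != gcBehavior_Continue {
		return gcb, nil // abort before upload; the caller retries after the GC cycle
	}
	if chunkCount == 0 {
		return gcBehavior_Continue, nil // nothing novel to persist
	}
	return gcBehavior_Continue, upload(data)
}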
4 changes: 2 additions & 2 deletions go/store/nbs/cmp_chunk_table_writer_test.go
@@ -51,7 +51,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
found := make([]CompressedChunk, 0)

eg, egCtx := errgroup.WithContext(ctx)
_, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, &Stats{})
_, _, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, nil, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())

@@ -146,7 +146,7 @@ func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader)
reqs := toGetRecords(hashes)
found := make([]*chunks.Chunk, 0)
eg, ctx := errgroup.WithContext(ctx)
_, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, &Stats{})
_, _, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, nil, &Stats{})
if err != nil {
return nil, err
}