Skip to content

Commit

Permalink
[no-release-notes] go/store/nbs: Make CompressedChunk support ghost c…
Browse files Browse the repository at this point in the history
…hunks and make and GhostBlockStore support GetManyCompressed.

As part of ongoing work on GC, we are moving the references walk from
store/types to be the responsibility of the ChunkStore itself. It is most
natural to express it if a generational store's GetManyCompressed can return
ghost chunks as well.
  • Loading branch information
reltuk committed Jan 15, 2025
1 parent 6392c11 commit f45fa52
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 36 deletions.
3 changes: 3 additions & 0 deletions go/store/datas/pull/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ func (p *Puller) Pull(ctx context.Context) error {
if err != nil {
return err
}
if cChk.IsGhost() {
return fmt.Errorf("attempted to push or pull ghost chunk: %w", nbs.ErrGhostChunkRequested)
}
if len(cChk.FullCompressedChunk) == 0 {
return errors.New("failed to get all chunks.")
}
Expand Down
6 changes: 6 additions & 0 deletions go/store/nbs/cmp_chunk_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (tw *CmpChunkTableWriter) GetMD5() []byte {

// AddCmpChunk adds a compressed chunk
func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error {
if c.IsGhost() {
// Ghost chunks cannot be written to a table file. They should
// always be filtered by the write processes before landing
// here.
return ErrGhostChunkRequested
}
if len(c.CompressedData) == 0 {
panic("NBS blocks cannot be zero length")
}
Expand Down
42 changes: 10 additions & 32 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
)

var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil)
var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil)

type GenerationalNBS struct {
oldGen *NomsBlockStore
Expand Down Expand Up @@ -143,60 +143,38 @@ func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo
}

func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
mu := &sync.Mutex{}
var mu sync.Mutex
notInOldGen := hashes.Copy()
err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) {
func() {
mu.Lock()
defer mu.Unlock()
delete(notInOldGen, chunk.Hash())
}()

mu.Lock()
delete(notInOldGen, chunk.Hash())
mu.Unlock()
found(ctx, chunk)
})

if err != nil {
return err
}

if len(notInOldGen) == 0 {
return nil
}

notFound := notInOldGen.Copy()
err = gcs.newGen.GetManyCompressed(ctx, notInOldGen, func(ctx context.Context, chunk CompressedChunk) {
func() {
mu.Lock()
defer mu.Unlock()
delete(notFound, chunk.Hash())
}()
mu.Lock()
delete(notFound, chunk.Hash())
mu.Unlock()
found(ctx, chunk)
})
if err != nil {
return err
}

if len(notFound) == 0 {
return nil
}

// We are definitely missing some chunks. Check if any are ghost chunks, mainly to give a better error message.
// The missing chunks may be ghost chunks.
if gcs.ghostGen != nil {
// If any of the hashes are in the ghost store.
ghostFound := false
err := gcs.ghostGen.GetMany(ctx, hashes, func(ctx context.Context, chunk *chunks.Chunk) {
// This should be true for all chunks in the ghost store.
if chunk.IsGhost() {
ghostFound = true
}
})

if err != nil {
return err
}
if ghostFound {
return ErrGhostChunkRequested
}
return gcs.ghostGen.GetManyCompressed(ctx, notFound, found)
}
return nil
}
Expand Down
14 changes: 12 additions & 2 deletions go/store/nbs/ghost_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type GhostBlockStore struct {
ghostObjectsFile string
}

// We use the Has, HasMany, Get, GetMany, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported.
var _ chunks.ChunkStore = &GhostBlockStore{}
// We use the Has, HasMany, Get, GetMany, GetManyCompressed, and PersistGhostHashes methods from the ChunkStore interface. All other methods are not supported.
var _ chunks.ChunkStore = (*GhostBlockStore)(nil)
var _ NBSCompressedChunkStore = (*GenerationalNBS)(nil)

// NewGhostBlockStore returns a new GhostBlockStore instance. Currently the only parameter is the path to the directory
// where we will create a text file called ghostObjects.txt. This file will contain the hashes of the ghost objects. Creation
Expand Down Expand Up @@ -87,6 +88,15 @@ func (g GhostBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found
return nil
}

func (g GhostBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
for h := range hashes {
if g.skippedRefs.Has(h) {
found(ctx, NewGhostCompressedChunk(h))
}
}
return nil
}

func (g *GhostBlockStore) PersistGhostHashes(ctx context.Context, hashes hash.HashSet) error {
if hashes.Size() == 0 {
return fmt.Errorf("runtime error. PersistGhostHashes called with empty hash set")
Expand Down
17 changes: 15 additions & 2 deletions go/store/nbs/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CompressedChunk struct {

// CompressedData is just the snappy encoded byte buffer that stores the chunk data
CompressedData []byte

// true if the chunk is a ghost chunk.
ghost bool
}

// NewCompressedChunk creates a CompressedChunk
Expand All @@ -64,14 +67,20 @@ func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) {
return CompressedChunk{H: h, FullCompressedChunk: buff, CompressedData: compressedData}, nil
}

func NewGhostCompressedChunk(h hash.Hash) CompressedChunk {
return CompressedChunk{H: h, ghost: true}
}

// ToChunk snappy decodes the compressed data and returns a chunks.Chunk
func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) {
data, err := snappy.Decode(nil, cmp.CompressedData)
if cmp.IsGhost() {
return *chunks.NewGhostChunk(cmp.H), nil
}

data, err := snappy.Decode(nil, cmp.CompressedData)
if err != nil {
return chunks.Chunk{}, err
}

return chunks.NewChunkWithHash(cmp.H, data), nil
}

Expand All @@ -96,6 +105,10 @@ func (cmp CompressedChunk) IsEmpty() bool {
return len(cmp.CompressedData) == 0 || (len(cmp.CompressedData) == 1 && cmp.CompressedData[0] == 0)
}

func (cmp CompressedChunk) IsGhost() bool {
return cmp.ghost
}

// CompressedSize returns the size of this CompressedChunk.
func (cmp CompressedChunk) CompressedSize() int {
return len(cmp.CompressedData)
Expand Down

0 comments on commit f45fa52

Please sign in to comment.