Skip to content

Commit

Permalink
go/store/{nbs,types}: GC: Move the reference walk from types to nbs.
Browse files Browse the repository at this point in the history
Make the ChunkStore itself responsible for the reference walk, being given handles for walking references and excluding chunks as part of the GC process. This is an incremental step towards adding dependencies on read chunks during the GC process. The ChunkStore can better distinguish whether a read is part of the GC process itself or whether it came from the application layer. It also allows better management of cache impact and opens up the potential for improved memory usage.

This transformation gets rid of parallel reference walking and some manual batching which was present in the ValueStore implementation of reference walking. The parallel reference walking was necessary for reasonable performance in format __LD_1__, but it's actually not necessary in __DOLT__. For some use cases it's a slight win, but the simplification involved in getting rid of it is worth it for now.
  • Loading branch information
reltuk committed Jan 15, 2025
1 parent 1681cbd commit f5a69f5
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 436 deletions.
38 changes: 26 additions & 12 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,26 @@ var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")
// GCFinalizer interface.
type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)

// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
// A MarkAndSweeper is returned from MarkAndSweepChunks and allows a caller
// to save chunks, and all chunks reachable from them, from the source store
// into the destination store. |SaveHashes| is called one or more times,
// passing in hashes which should be saved. Then |Close| is called and the
// |GCFinalizer| is used to complete the process.
type MarkAndSweeper interface {
	// Ensures that the chunks corresponding to the passed hashes, and all
	// the chunks reachable from them, are copied into the destination
	// store. Passed and reachable chunks are filtered by the |filter| that
	// was supplied to |MarkAndSweepChunks|. It is safe to pass a given
	// hash more than once; it will only ever be copied once.
	//
	// A call to this function blocks until the entire transitive set of
	// chunks is accessed and copied.
	SaveHashes(context.Context, []hash.Hash) error

	// Close completes the mark-and-sweep process, returning a GCFinalizer
	// which is used to finish the GC. The MarkAndSweeper should not be
	// used after Close is called.
	Close(context.Context) (GCFinalizer, error)
}

// A GCFinalizer is returned from a MarkAndSweeper after it is closed.
//
// A GCFinalizer is a handle to one or more table files which has been
// constructed as part of the GC process. It can be used to add the table files
Expand Down Expand Up @@ -210,17 +229,12 @@ type ChunkStoreGarbageCollector interface {
// addChunk function must not be called after this function.
EndGC()

// MarkAndSweepChunks is expected to read chunk addresses off of
// |hashes|, which represent chunks which should be copied into the
// provided |dest| store. Once |hashes| is closed,
// MarkAndSweepChunks is expected to update the contents of the store
// to only include the chunk whose addresses which were sent along on
// |hashes|.
//
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)
// MarkAndSweepChunks returns a handle that can be used to supply
// hashes which should be saved into |dest|. The hashes are
// filtered through the |filter| and their references are walked with
// |getAddrs|, each of those addresses being filtered and copied as
// well.
MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
72 changes: 49 additions & 23 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,49 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
// msvMarkAndSweeper implements MarkAndSweeper for MemoryStoreView. It
// accumulates the transitive closure of saved chunks in |keepers|, which
// becomes the retained chunk set when Close constructs the finalizer.
type msvMarkAndSweeper struct {
	ms *MemoryStoreView

	// getAddrs extracts the outgoing chunk references from a chunk.
	getAddrs GetAddrsCurry
	// filter filters candidate hashes; only hashes it returns are
	// fetched, kept and walked.
	filter HasManyFunc

	// keepers holds every chunk visited so far.
	keepers map[hash.Hash]Chunk
}

// SaveHashes implements MarkAndSweeper. It fetches the chunks named by
// |hashes|, and everything transitively reachable from them, into
// |i.keepers|. Candidate hashes are run through |i.filter| before being
// fetched, and already-kept hashes are skipped, so it is safe to pass a
// given hash more than once.
func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
	newAddrs := hash.NewHashSet(hashes...)
	for {
		// Drop anything we have already kept. Iterating the (typically
		// smaller) frontier and probing |keepers| avoids a full scan of
		// the kept set on every round. Deleting entries while ranging a
		// map is permitted by the Go spec.
		for h := range newAddrs {
			if _, ok := i.keepers[h]; ok {
				delete(newAddrs, h)
			}
		}
		filtered, err := i.filter(ctx, newAddrs)
		if err != nil {
			return err
		}
		if len(filtered) == 0 {
			break
		}
		newAddrs = make(hash.HashSet)
		for h := range filtered {
			c, err := i.ms.Get(ctx, h)
			if err != nil {
				return err
			}
			i.keepers[h] = c
			// Collect this chunk's outgoing references for the next round.
			err = i.getAddrs(c)(ctx, newAddrs, NoopPendingRefExists)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// Close implements MarkAndSweeper, returning a GCFinalizer over the set of
// chunks accumulated by prior SaveHashes calls.
func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) {
	return msvGcFinalizer{i.ms, i.keepers}, nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
if dest != ms {
panic("unsupported")
}
Expand All @@ -371,28 +413,12 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan
}
ms.mu.Unlock()

keepers := make(map[hash.Hash]Chunk, ms.storage.Len())

LOOP:
for {
select {
case hs, ok := <-hashes:
if !ok {
break LOOP
}
for _, h := range hs {
c, err := ms.Get(ctx, h)
if err != nil {
return nil, err
}
keepers[h] = c
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

return msvGcFinalizer{ms, keepers}, nil
return &msvMarkAndSweeper{
ms: ms,
getAddrs: getAddrs,
filter: filter,
keepers: make(map[hash.Hash]Chunk),
}, nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
// MarkAndSweepChunks implements ChunkStoreGarbageCollector by delegating to
// the wrapped store, which must itself be a ChunkStoreGarbageCollector.
// Only collecting into this same view (|dest| == s) is supported.
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
	gc, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
	if !ok || dest != s {
		return nil, ErrUnsupportedOperation
	}
	return gc.MarkAndSweepChunks(ctx, getAddrs, filter, gc, mode)
}

func (s *TestStoreView) Count() (uint32, error) {
Expand Down
8 changes: 8 additions & 0 deletions go/store/hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ func (hs HashSet) Empty() {
}
}

// ToSlice returns the members of the set as a HashSlice. Ordering follows
// map iteration and is therefore unspecified.
func (hs HashSet) ToSlice() HashSlice {
	res := make(HashSlice, 0, len(hs))
	for h := range hs {
		res = append(res, h)
	}
	return res
}

func (hs HashSet) String() string {
var sb strings.Builder
sb.Grow(len(hs)*34 + 100)
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
// MarkAndSweepChunks implements chunks.ChunkStoreGarbageCollector,
// delegating to markAndSweepChunks with the new generation store as the
// NomsBlockStore and |gcs| itself as the chunk source.
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
	return markAndSweepChunks(ctx, gcs.newGen, gcs, dest, getAddrs, filter, mode)
}

func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
// MarkAndSweepChunks implements chunks.ChunkStoreGarbageCollector by
// delegating directly to the wrapped store; no metrics are recorded here.
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
	return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode)
}

func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
Expand Down
174 changes: 117 additions & 57 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
nbs.cond.Broadcast()
}

func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
// MarkAndSweepChunks implements chunks.ChunkStoreGarbageCollector,
// delegating to markAndSweepChunks with |nbs| as both the store and the
// chunk source.
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
	return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode)
}

func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return nil, chunks.ErrUnsupportedOperation
Expand Down Expand Up @@ -1647,18 +1647,127 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
destNBS = nbs
}

specs, err := copyMarkedChunks(ctx, hashes, src, destNBS)
tfp, ok := destNBS.p.(tableFilePersister)
if !ok {
return nil, fmt.Errorf("NBS does not support copying garbage collection")
}

gcc, err := newGarbageCollectionCopier()
if err != nil {
return nil, err
}
if ctx.Err() != nil {
return nil, ctx.Err()

return &markAndSweeper{
src: src,
dest: destNBS,
getAddrs: getAddrs,
filter: filter,
visited: make(hash.HashSet),
tfp: tfp,
gcc: gcc,
mode: mode,
}, nil
}

// markAndSweeper implements chunks.MarkAndSweeper for NBS-backed stores.
// It records the addresses already processed in |visited| and accumulates
// the copied chunks in |gcc|, which Close turns into table files in |tfp|.
type markAndSweeper struct {
	// src is the store chunks are read from during the walk.
	src NBSCompressedChunkStore
	// dest is the store the resulting gcFinalizer will target.
	dest *NomsBlockStore
	// getAddrs extracts the outgoing chunk references from a chunk.
	getAddrs chunks.GetAddrsCurry
	// filter filters candidate hashes; only hashes it returns are fetched
	// and copied.
	filter chunks.HasManyFunc

	// visited is the set of addresses already processed by SaveHashes.
	visited hash.HashSet

	// tfp is the persister Close writes the collected table files to.
	tfp tableFilePersister
	// gcc accumulates the chunks copied during the walk.
	gcc *gcCopier
	// mode is the GC mode, passed through to the gcFinalizer.
	mode chunks.GCMode
}

// SaveHashes implements chunks.MarkAndSweeper. It walks the transitive
// closure of |hashes|, copying every chunk which passes |i.filter| from
// |i.src| into the garbage collection copier. Addresses already processed
// by an earlier call are skipped, so passing a hash more than once is safe.
//
// Returns an error if a requested non-ghost chunk is absent from the
// source store, since that indicates a dangling reference.
func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
	toVisit := make(hash.HashSet, len(hashes))
	for _, h := range hashes {
		if _, ok := i.visited[h]; !ok {
			toVisit.Insert(h)
		}
	}
	var err error
	var mu sync.Mutex
	first := true
	for {
		if !first {
			// After the first round, |toVisit| comes from reference
			// walks and may contain already-visited addresses. The Go
			// spec permits deleting map entries during range, so we can
			// filter in place without copying the set.
			for h := range toVisit {
				if _, ok := i.visited[h]; ok {
					delete(toVisit, h)
				}
			}
		}

		toVisit, err = i.filter(ctx, toVisit)
		if err != nil {
			return err
		}
		if len(toVisit) == 0 {
			break
		}

		first = false
		nextToVisit := make(hash.HashSet)

		found := 0
		var addErr error
		err = i.src.GetManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) {
			// The callback is guarded by |mu|; it protects |found|,
			// |addErr| and |nextToVisit|.
			mu.Lock()
			defer mu.Unlock()
			if addErr != nil {
				return
			}
			found += 1
			if cc.IsGhost() {
				// Ghost chunks encountered on the walk can be left alone
				// --- their dependencies are not walked, and because of
				// how the generational store works, they will still be
				// ghost chunks in the store after the GC is finished.
				return
			}
			addErr = i.gcc.addChunk(ctx, cc)
			if addErr != nil {
				return
			}
			c, err := cc.ToChunk()
			if err != nil {
				addErr = err
				return
			}
			// Same no-op pending-ref check the chunks package uses.
			addErr = i.getAddrs(c)(ctx, nextToVisit, chunks.NoopPendingRefExists)
		})
		if err != nil {
			return err
		}
		if addErr != nil {
			return addErr
		}
		if found != len(toVisit) {
			return fmt.Errorf("dangling references requested during GC. GC not successful. %v", toVisit)
		}

		i.visited.InsertAll(toVisit)

		toVisit = nextToVisit
	}
	return nil
}

// Close implements chunks.MarkAndSweeper. It finishes the copy by writing
// the collected chunks out as table files via |i.tfp| and returns a
// gcFinalizer over the resulting table specs.
func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) {
	tableSpecs, err := i.gcc.copyTablesToDir(ctx, i.tfp)
	if err != nil {
		return nil, err
	}
	res := gcFinalizer{
		nbs:   i.dest,
		specs: tableSpecs,
		mode:  i.mode,
	}
	return res, nil
}

Expand All @@ -1684,55 +1793,6 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
}

// copyMarkedChunks reads batches of chunk addresses off of |keepChunks| and
// copies the corresponding chunks from |src| into new table files written
// through |dest|'s persister, returning the specs for the written tables.
// It returns an error if any requested chunk is missing from |src|, since
// that indicates a dangling reference.
func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
	tfp, ok := dest.p.(tableFilePersister)
	if !ok {
		return nil, fmt.Errorf("NBS does not support copying garbage collection")
	}

	gcc, err := newGarbageCollectionCopier()
	if err != nil {
		return nil, err
	}

	// TODO: We should clean up gcc on error.

LOOP:
	for {
		select {
		case hs, ok := <-keepChunks:
			if !ok {
				// Channel closed: every keep hash has been delivered.
				break LOOP
			}
			// The GetManyCompressed callback is guarded by |mu|, which
			// protects |addErr| and |found|.
			var addErr error
			mu := new(sync.Mutex)
			hashset := hash.NewHashSet(hs...)
			found := 0
			err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
				mu.Lock()
				defer mu.Unlock()
				if addErr != nil {
					return
				}
				found += 1
				addErr = gcc.addChunk(ctx, c)
			})
			if err != nil {
				return nil, err
			}
			if addErr != nil {
				return nil, addErr
			}
			if found != len(hashset) {
				return nil, fmt.Errorf("dangling references requested during GC. GC not successful. %v", hashset)
			}
		case <-ctx.Done():
			// Abort promptly if the surrounding GC is cancelled.
			return nil, ctx.Err()
		}
	}
	return gcc.copyTablesToDir(ctx, tfp)
}

func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
for _, v := range nbs.tables.novel {
err := v.iterateAllChunks(ctx, cb)
Expand Down
Loading

0 comments on commit f5a69f5

Please sign in to comment.