Skip to content

Commit

Permalink
Merge pull request #8752 from dolthub/aaron/gc-walk-in-chunkstore
Browse files Browse the repository at this point in the history
go/store/{nbs,types}: GC: Move the reference walk from types to nbs.
  • Loading branch information
reltuk authored Jan 16, 2025
2 parents d2c94f3 + f5a69f5 commit 47d9ff7
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 436 deletions.
38 changes: 26 additions & 12 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,26 @@ var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")
// GCFinalizer interface.
type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)

// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
// A MarkAndSweeper is returned from MarkAndSweepChunks and allows a caller
// to save chunks, and all chunks reachable from them, from the source store
// into the destination store. |SaveHashes| is called one or more times,
// passing in hashes which should be saved. Then |Close| is called and the
// |GCFinalizer| is used to complete the process.
type MarkAndSweeper interface {
// Ensures that the chunks corresponding to the passed hashes, and all
// the chunks reachable from them, are copied into the destination
// store. Passed and reachable chunks are filtered by the |filter| that
// was supplied to |MarkAndSweepChunks|. It is safe to pass a given
// hash more than once; it will only ever be copied once.
//
// A call to this function blocks until the entire transitive set of
// chunks is accessed and copied.
SaveHashes(context.Context, []hash.Hash) error

Close(context.Context) (GCFinalizer, error)
}

// A GCFinalizer is returned from a MarkAndSweeper after it is closed.
//
// A GCFinalizer is a handle to one or more table files which has been
// constructed as part of the GC process. It can be used to add the table files
Expand Down Expand Up @@ -210,17 +229,12 @@ type ChunkStoreGarbageCollector interface {
// addChunk function must not be called after this function.
EndGC()

// MarkAndSweepChunks is expected to read chunk addresses off of
// |hashes|, which represent chunks which should be copied into the
// provided |dest| store. Once |hashes| is closed,
// MarkAndSweepChunks is expected to update the contents of the store
// to only include the chunk whose addresses which were sent along on
// |hashes|.
//
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)
// MarkAndSweepChunks returns a handle that can be used to supply
// hashes which should be saved into |dest|. The hashes are
// filtered through the |filter| and their references are walked with
// |getAddrs|, each of those addresses being filtered and copied as
// well.
MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
72 changes: 49 additions & 23 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,49 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
// msvMarkAndSweeper implements MarkAndSweeper for MemoryStoreView. It
// accumulates every chunk that should survive the collection in |keepers|,
// which Close hands off to the returned GCFinalizer.
type msvMarkAndSweeper struct {
	ms *MemoryStoreView

	// getAddrs, applied to a chunk, yields a function that inserts the
	// chunk's outgoing references into a HashSet.
	getAddrs GetAddrsCurry
	// filter is applied to candidate hashes before they are fetched;
	// only hashes it returns are copied (see SaveHashes).
	filter HasManyFunc

	// keepers holds the chunks visited so far; membership also serves as
	// the "already visited" set during the walk in SaveHashes.
	keepers map[hash.Hash]Chunk
}

// SaveHashes implements MarkAndSweeper. It takes the requested |hashes| and
// repeatedly expands the frontier through |getAddrs| until the transitive
// closure — as admitted by |filter| — has been pulled into |keepers|.
// Passing a hash that was already saved is a no-op.
func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
	pending := hash.NewHashSet(hashes...)
	for {
		// Drop anything we have already kept on an earlier pass.
		for visited := range i.keepers {
			delete(pending, visited)
		}
		filtered, err := i.filter(ctx, pending)
		if err != nil {
			return err
		}
		if len(filtered) == 0 {
			// Frontier is exhausted; the closure is fully saved.
			return nil
		}
		pending = make(hash.HashSet)
		for h := range filtered {
			c, err := i.ms.Get(ctx, h)
			if err != nil {
				return err
			}
			i.keepers[h] = c
			// Accumulate this chunk's outgoing references for the next pass.
			if err = i.getAddrs(c)(ctx, pending, NoopPendingRefExists); err != nil {
				return err
			}
		}
	}
}

// Close implements MarkAndSweeper, finishing the traversal and returning a
// GCFinalizer that wraps the accumulated set of kept chunks.
func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) {
	finalizer := msvGcFinalizer{i.ms, i.keepers}
	return finalizer, nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
if dest != ms {
panic("unsupported")
}
Expand All @@ -371,28 +413,12 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan
}
ms.mu.Unlock()

keepers := make(map[hash.Hash]Chunk, ms.storage.Len())

LOOP:
for {
select {
case hs, ok := <-hashes:
if !ok {
break LOOP
}
for _, h := range hs {
c, err := ms.Get(ctx, h)
if err != nil {
return nil, err
}
keepers[h] = c
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

return msvGcFinalizer{ms, keepers}, nil
return &msvMarkAndSweeper{
ms: ms,
getAddrs: getAddrs,
filter: filter,
keepers: make(map[hash.Hash]Chunk),
}, nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector, mode)
return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, mode)
}

func (s *TestStoreView) Count() (uint32, error) {
Expand Down
8 changes: 8 additions & 0 deletions go/store/hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ func (hs HashSet) Empty() {
}
}

// ToSlice returns the members of the set collected into a HashSlice. The
// order is unspecified, since it follows Go's randomized map iteration.
func (hs HashSet) ToSlice() HashSlice {
	sl := make(HashSlice, 0, len(hs))
	for member := range hs {
		sl = append(sl, member)
	}
	return sl
}

func (hs HashSet) String() string {
var sb strings.Builder
sb.Grow(len(hs)*34 + 100)
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
return markAndSweepChunks(ctx, gcs.newGen, gcs, dest, getAddrs, filter, mode)
}

func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode)
}

func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
Expand Down
174 changes: 117 additions & 57 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
nbs.cond.Broadcast()
}

func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode)
}

func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return nil, chunks.ErrUnsupportedOperation
Expand Down Expand Up @@ -1647,18 +1647,127 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
destNBS = nbs
}

specs, err := copyMarkedChunks(ctx, hashes, src, destNBS)
tfp, ok := destNBS.p.(tableFilePersister)
if !ok {
return nil, fmt.Errorf("NBS does not support copying garbage collection")
}

gcc, err := newGarbageCollectionCopier()
if err != nil {
return nil, err
}
if ctx.Err() != nil {
return nil, ctx.Err()

return &markAndSweeper{
src: src,
dest: destNBS,
getAddrs: getAddrs,
filter: filter,
visited: make(hash.HashSet),
tfp: tfp,
gcc: gcc,
mode: mode,
}, nil
}

// markAndSweeper is the NBS implementation of chunks.MarkAndSweeper. It
// walks chunk references from |src|, copying kept chunks through |gcc|
// toward table files destined for |dest|.
type markAndSweeper struct {
	// src is the store the reference walk reads chunks from.
	src NBSCompressedChunkStore
	// dest is the store the resulting table files will be installed into.
	dest *NomsBlockStore
	// getAddrs, applied to a chunk, yields a function that inserts the
	// chunk's outgoing references into a HashSet.
	getAddrs chunks.GetAddrsCurry
	// filter is applied to candidate hashes before each fetch pass; only
	// hashes it returns are visited (see SaveHashes).
	filter chunks.HasManyFunc

	// visited records hashes already processed, so repeated SaveHashes
	// calls do not re-copy the same chunks.
	visited hash.HashSet

	// tfp is where Close writes the accumulated table files.
	tfp tableFilePersister
	// gcc accumulates the copied chunks during the walk.
	gcc *gcCopier
	// mode is threaded through to the gcFinalizer returned by Close.
	mode chunks.GCMode
}

// SaveHashes implements chunks.MarkAndSweeper. It copies the chunks named by
// |hashes|, and everything transitively reachable from them through
// |getAddrs|, from |src| into the gcCopier. Candidates are filtered through
// |filter| on every pass, and |visited| guarantees each chunk is copied at
// most once across calls. Blocks until the whole closure has been processed.
//
// Fixes relative to the original body: the local named `copy` shadowed the
// predeclared `copy` builtin (renamed to `pruned`), and the inline
// always-false pending-ref callback is replaced with
// chunks.NoopPendingRefExists for consistency with the MemoryStoreView
// implementation of the same walk.
func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
	toVisit := make(hash.HashSet, len(hashes))
	for _, h := range hashes {
		if _, ok := i.visited[h]; !ok {
			toVisit.Insert(h)
		}
	}
	var err error
	var mu sync.Mutex
	first := true
	for {
		if !first {
			// Prune anything visited by a previous pass of this loop.
			pruned := toVisit.Copy()
			for h := range toVisit {
				if _, ok := i.visited[h]; ok {
					delete(pruned, h)
				}
			}
			toVisit = pruned
		}

		toVisit, err = i.filter(ctx, toVisit)
		if err != nil {
			return err
		}
		if len(toVisit) == 0 {
			break
		}

		first = false
		nextToVisit := make(hash.HashSet)

		found := 0
		var addErr error
		err = i.src.GetManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) {
			// The callback may be invoked concurrently; serialize access
			// to found/addErr/nextToVisit.
			mu.Lock()
			defer mu.Unlock()
			if addErr != nil {
				return
			}
			found += 1
			if cc.IsGhost() {
				// Ghost chunks encountered on the walk can be left alone --- they
				// do not bring their dependencies, and because of how generational
				// store works, they will still be ghost chunks
				// in the store after the GC is finished.
				return
			}
			addErr = i.gcc.addChunk(ctx, cc)
			if addErr != nil {
				return
			}
			c, err := cc.ToChunk()
			if err != nil {
				addErr = err
				return
			}
			addErr = i.getAddrs(c)(ctx, nextToVisit, chunks.NoopPendingRefExists)
		})
		if err != nil {
			return err
		}
		if addErr != nil {
			return addErr
		}
		// Every requested, filtered hash must exist in src; otherwise the
		// walk encountered a dangling reference and the GC must abort.
		if found != len(toVisit) {
			return fmt.Errorf("dangling references requested during GC. GC not successful. %v", toVisit)
		}

		i.visited.InsertAll(toVisit)

		toVisit = nextToVisit
	}
	return nil
}

// Close implements chunks.MarkAndSweeper. It flushes the copied chunks to
// table files in the destination persister and returns a gcFinalizer that
// can install those table files into the destination store.
//
// NOTE(review): this span contained leftover pre-change diff lines (struct
// fields referencing the removed `destNBS`/`mode` locals); only the
// post-change version is kept.
func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) {
	specs, err := i.gcc.copyTablesToDir(ctx, i.tfp)
	if err != nil {
		return nil, err
	}
	return gcFinalizer{
		nbs:   i.dest,
		specs: specs,
		mode:  i.mode,
	}, nil
}

Expand All @@ -1684,55 +1793,6 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
}

func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
tfp, ok := dest.p.(tableFilePersister)
if !ok {
return nil, fmt.Errorf("NBS does not support copying garbage collection")
}

gcc, err := newGarbageCollectionCopier()
if err != nil {
return nil, err
}

// TODO: We should clean up gcc on error.

LOOP:
for {
select {
case hs, ok := <-keepChunks:
if !ok {
break LOOP
}
var addErr error
mu := new(sync.Mutex)
hashset := hash.NewHashSet(hs...)
found := 0
err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
mu.Lock()
defer mu.Unlock()
if addErr != nil {
return
}
found += 1
addErr = gcc.addChunk(ctx, c)
})
if err != nil {
return nil, err
}
if addErr != nil {
return nil, addErr
}
if found != len(hashset) {
return nil, fmt.Errorf("dangling references requested during GC. GC not successful. %v", hashset)
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
return gcc.copyTablesToDir(ctx, tfp)
}

func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
for _, v := range nbs.tables.novel {
err := v.iterateAllChunks(ctx, cb)
Expand Down
Loading

0 comments on commit 47d9ff7

Please sign in to comment.