go/store/{nbs,types}: GC: Move the reference walk from types to nbs. #8752

Merged · 1 commit · Jan 16, 2025
38 changes: 26 additions & 12 deletions go/store/chunks/chunk_store.go
@@ -156,7 +156,26 @@ var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")
// GCFinalizer interface.
type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)

// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
// A MarkAndSweeper is returned from MarkAndSweepChunks and allows a caller
// to save chunks, and all chunks reachable from them, from the source store
// into the destination store. |SaveHashes| is called one or more times,
// passing in hashes which should be saved. Then |Close| is called and the
// |GCFinalizer| is used to complete the process.
type MarkAndSweeper interface {
// Ensures that the chunks corresponding to the passed hashes, and all
// the chunks reachable from them, are copied into the destination
// store. Passed and reachable chunks are filtered by the |filter| that
// was supplied to |MarkAndSweepChunks|. It is safe to pass a given
// hash more than once; it will only ever be copied once.
//
// A call to this function blocks until the entire transitive set of
// chunks is accessed and copied.
SaveHashes(context.Context, []hash.Hash) error

Close(context.Context) (GCFinalizer, error)
}

// A GCFinalizer is returned from a MarkAndSweeper after it is closed.
//
// A GCFinalizer is a handle to one or more table files which has been
// constructed as part of the GC process. It can be used to add the table files
@@ -210,17 +229,12 @@ type ChunkStoreGarbageCollector interface {
// addChunk function must not be called after this function.
EndGC()

// MarkAndSweepChunks is expected to read chunk addresses off of
// |hashes|, which represent chunks which should be copied into the
// provided |dest| store. Once |hashes| is closed,
// MarkAndSweepChunks is expected to update the contents of the store
// to only include the chunk whose addresses which were sent along on
// |hashes|.
//
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)
// MarkAndSweepChunks returns a handle that can be used to supply
// hashes which should be saved into |dest|. The hashes are
// filtered through the |filter| and their references are walked with
// |getAddrs|, each of those addresses being filtered and copied as
// well.
MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
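For orientation, here is a minimal sketch (not part of the PR) of how a caller might drive the new interface. It assumes a store implementing chunks.ChunkStoreGarbageCollector, that SwapChunksInStore is the finalizer method used to complete a full collection (as the finalizers in this diff suggest), and it omits the surrounding BeginGC/EndGC bookkeeping that ValueStore.GC performs. The helper name runGC and the import paths are illustrative.

```go
package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// runGC is a hypothetical driver for the new API: obtain a MarkAndSweeper,
// feed it the root hashes whose transitive closure should survive, then
// close it and swap the resulting table files into the store.
func runGC(
	ctx context.Context,
	collector chunks.ChunkStoreGarbageCollector,
	dest chunks.ChunkStore,
	getAddrs chunks.GetAddrsCurry,
	filter chunks.HasManyFunc,
	mode chunks.GCMode,
	roots []hash.Hash,
) error {
	sweeper, err := collector.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode)
	if err != nil {
		return err
	}
	// SaveHashes may be called any number of times; each call blocks until
	// the entire transitive set reachable from |roots| has been copied.
	if err := sweeper.SaveHashes(ctx, roots); err != nil {
		return err
	}
	finalizer, err := sweeper.Close(ctx)
	if err != nil {
		return err
	}
	// The GCFinalizer swaps the newly written table files into the store.
	return finalizer.SwapChunksInStore(ctx)
}
```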
72 changes: 49 additions & 23 deletions go/store/chunks/memory_store.go
@@ -360,7 +360,49 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
type msvMarkAndSweeper struct {
ms *MemoryStoreView

getAddrs GetAddrsCurry
filter HasManyFunc

keepers map[hash.Hash]Chunk
}

func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
newAddrs := hash.NewHashSet(hashes...)
for {
for h := range i.keepers {
delete(newAddrs, h)
}
filtered, err := i.filter(ctx, newAddrs)
if err != nil {
return err
}
if len(filtered) == 0 {
break
}
newAddrs = make(hash.HashSet)
for h := range filtered {
c, err := i.ms.Get(ctx, h)
if err != nil {
return err
}
i.keepers[h] = c
err = i.getAddrs(c)(ctx, newAddrs, NoopPendingRefExists)
if err != nil {
return err
}
}
}
return nil
}

func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) {
return msvGcFinalizer{i.ms, i.keepers}, nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
if dest != ms {
panic("unsupported")
}
@@ -371,28 +413,12 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan
}
ms.mu.Unlock()

keepers := make(map[hash.Hash]Chunk, ms.storage.Len())

LOOP:
for {
select {
case hs, ok := <-hashes:
if !ok {
break LOOP
}
for _, h := range hs {
c, err := ms.Get(ctx, h)
if err != nil {
return nil, err
}
keepers[h] = c
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

return msvGcFinalizer{ms, keepers}, nil
return &msvMarkAndSweeper{
ms: ms,
getAddrs: getAddrs,
filter: filter,
keepers: make(map[hash.Hash]Chunk),
}, nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
@@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector, mode)
return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, mode)
}

func (s *TestStoreView) Count() (uint32, error) {
8 changes: 8 additions & 0 deletions go/store/hash/hash.go
@@ -222,6 +222,14 @@ func (hs HashSet) Empty() {
}
}

func (hs HashSet) ToSlice() HashSlice {
ret := make(HashSlice, 0, len(hs))
for h := range hs {
ret = append(ret, h)
}
return ret
}

func (hs HashSet) String() string {
var sb strings.Builder
sb.Grow(len(hs)*34 + 100)
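The new ToSlice helper pairs naturally with the SaveHashes API above, which takes a []hash.Hash. A small illustrative helper (not part of the PR), assuming a chunks.MarkAndSweeper is already in hand:

```go
package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// saveRoots deduplicates the given root hashes via a HashSet and hands them
// to the sweeper in the slice form that SaveHashes expects.
func saveRoots(ctx context.Context, sweeper chunks.MarkAndSweeper, roots ...hash.Hash) error {
	set := hash.NewHashSet(roots...)
	return sweeper.SaveHashes(ctx, set.ToSlice())
}
```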
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
@@ -481,8 +481,8 @@ func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
return markAndSweepChunks(ctx, gcs.newGen, gcs, dest, getAddrs, filter, mode)
}

func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
@@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode)
}

func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
174 changes: 117 additions & 57 deletions go/store/nbs/store.go
@@ -1598,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
nbs.cond.Broadcast()
}

func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode)
}

func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return nil, chunks.ErrUnsupportedOperation
@@ -1647,18 +1647,127 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
destNBS = nbs
}

specs, err := copyMarkedChunks(ctx, hashes, src, destNBS)
tfp, ok := destNBS.p.(tableFilePersister)
if !ok {
return nil, fmt.Errorf("NBS does not support copying garbage collection")
}

gcc, err := newGarbageCollectionCopier()
Contributor:
somewhat unrelated, but if this embedded tfp their relationship might be clearer

Contributor (author):
Good suggestion! I'll take a pass and potentially send out a separate PR :)

if err != nil {
return nil, err
}
if ctx.Err() != nil {
return nil, ctx.Err()

return &markAndSweeper{
src: src,
dest: destNBS,
getAddrs: getAddrs,
filter: filter,
visited: make(hash.HashSet),
tfp: tfp,
gcc: gcc,
mode: mode,
}, nil
}

type markAndSweeper struct {
src NBSCompressedChunkStore
dest *NomsBlockStore
getAddrs chunks.GetAddrsCurry
filter chunks.HasManyFunc
Contributor:
I'm hazy on what filter does; when would we discard hashes?

Contributor (author):

Great question. It's used for generational GC. So, when we collect newgen -> oldgen, we're walking refs and we want to stop the walk anytime we walk into the old gen. Then, after those chunks are in the old gen, when we collect newgen -> newgen, we want to stop the walk once again anytime we walk into the old gen. (A sketch of such a filter follows the struct below.)


visited hash.HashSet

tfp tableFilePersister
gcc *gcCopier
mode chunks.GCMode
}
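To illustrate the filter semantics described in the review discussion above, here is a hypothetical HasManyFunc for the newgen -> oldgen case. It assumes the old generation is exposed as a chunks.ChunkStore whose HasMany returns the subset of the passed hashes it does not contain (mirroring the HasManyFunc signature); the function name and import paths are illustrative only.

```go
package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// oldGenBoundaryFilter keeps only hashes that are absent from the old
// generation, so the mark-and-sweep walk stops as soon as it reaches chunks
// that already live in (or have been copied into) the old gen.
func oldGenBoundaryFilter(oldGen chunks.ChunkStore) chunks.HasManyFunc {
	return func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
		// HasMany returns the hashes the store does NOT have; anything the
		// old gen already holds is dropped from the walk.
		return oldGen.HasMany(ctx, hashes)
	}
}
```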

func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
toVisit := make(hash.HashSet, len(hashes))
for _, h := range hashes {
if _, ok := i.visited[h]; !ok {
toVisit.Insert(h)
}
}
var err error
var mu sync.Mutex
first := true
for {
if !first {
copy := toVisit.Copy()
for h := range toVisit {
if _, ok := i.visited[h]; ok {
delete(copy, h)
}
}
toVisit = copy
}

toVisit, err = i.filter(ctx, toVisit)
if err != nil {
return err
}
if len(toVisit) == 0 {
break
}

first = false
nextToVisit := make(hash.HashSet)

found := 0
var addErr error
err = i.src.GetManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) {
mu.Lock()
defer mu.Unlock()
if addErr != nil {
return
}
found += 1
if cc.IsGhost() {
// Ghost chunks encountered on the walk can be left alone --- they
// do not bring their dependencies, and because of how generational
// store works, they will still be ghost chunks
// in the store after the GC is finished.
return
}
addErr = i.gcc.addChunk(ctx, cc)
if addErr != nil {
return
}
c, err := cc.ToChunk()
if err != nil {
addErr = err
return
}
addErr = i.getAddrs(c)(ctx, nextToVisit, func(h hash.Hash) bool { return false })
})
if err != nil {
return err
}
if addErr != nil {
return addErr
}
if found != len(toVisit) {
return fmt.Errorf("dangling references requested during GC. GC not successful. %v", toVisit)
}

i.visited.InsertAll(toVisit)

toVisit = nextToVisit
}
return nil
}

func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) {
specs, err := i.gcc.copyTablesToDir(ctx, i.tfp)
if err != nil {
return nil, err
}

return gcFinalizer{
nbs: destNBS,
nbs: i.dest,
specs: specs,
mode: mode,
mode: i.mode,
}, nil
}

@@ -1684,55 +1793,6 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
}

func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
tfp, ok := dest.p.(tableFilePersister)
if !ok {
return nil, fmt.Errorf("NBS does not support copying garbage collection")
}

gcc, err := newGarbageCollectionCopier()
if err != nil {
return nil, err
}

// TODO: We should clean up gcc on error.

LOOP:
for {
select {
case hs, ok := <-keepChunks:
if !ok {
break LOOP
}
var addErr error
mu := new(sync.Mutex)
hashset := hash.NewHashSet(hs...)
found := 0
err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
mu.Lock()
defer mu.Unlock()
if addErr != nil {
return
}
found += 1
addErr = gcc.addChunk(ctx, c)
})
if err != nil {
return nil, err
}
if addErr != nil {
return nil, addErr
}
if found != len(hashset) {
return nil, fmt.Errorf("dangling references requested during GC. GC not successful. %v", hashset)
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
return gcc.copyTablesToDir(ctx, tfp)
}

func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
for _, v := range nbs.tables.novel {
err := v.iterateAllChunks(ctx, cb)