From f5a69f5aa1abdc76392939c85934baaffcba7e6f Mon Sep 17 00:00:00 2001
From: Aaron Son
Date: Tue, 14 Jan 2025 17:01:18 -0800
Subject: [PATCH] go/store/{nbs,types}: GC: Move the reference walk from types
 to nbs.

Make the ChunkStore itself responsible for the reference walk: it is
given handles for walking references and for excluding chunks as part
of the GC process.

This is an incremental step towards adding dependencies on read chunks
during the GC process. The ChunkStore can better distinguish whether a
read is part of the GC process itself or whether it came from the
application layer. It also allows better management of cache impact and
opens up the potential for better memory usage.

This transformation gets rid of the parallel reference walking and the
manual batching which were present in the ValueStore implementation of
the reference walk. Parallel reference walking was necessary for
reasonable performance in format __LD_1__, but it is not necessary in
__DOLT__. For some use cases it is a slight win, but the simplification
from removing it is worth it for now.
---
 go/store/chunks/chunk_store.go           |  38 +++--
 go/store/chunks/memory_store.go          |  72 ++++++---
 go/store/chunks/test_utils.go            |   4 +-
 go/store/hash/hash.go                    |   8 +
 go/store/nbs/generational_chunk_store.go |   4 +-
 go/store/nbs/nbs_metrics_wrapper.go      |   4 +-
 go/store/nbs/store.go                    | 174 ++++++++++++++--------
 go/store/nbs/store_test.go               |  16 +-
 go/store/types/parallel_ref_walker.go    | 176 ----------------------
 go/store/types/value_store.go            | 178 +++--------------------
 10 files changed, 238 insertions(+), 436 deletions(-)
 delete mode 100644 go/store/types/parallel_ref_walker.go

diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go
index e828b5d372..82c52cac65 100644
--- a/go/store/chunks/chunk_store.go
+++ b/go/store/chunks/chunk_store.go
@@ -156,7 +156,26 @@ var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")
 // GCFinalizer interface.
 type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)
 
-// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
+// A MarkAndSweeper is returned from MarkAndSweepChunks and allows a caller
+// to save chunks, and all chunks reachable from them, from the source store
+// into the destination store. |SaveHashes| is called one or more times,
+// passing in hashes which should be saved. Then |Close| is called and the
+// |GCFinalizer| is used to complete the process.
+type MarkAndSweeper interface {
+	// Ensures that the chunks corresponding to the passed hashes, and all
+	// the chunks reachable from them, are copied into the destination
+	// store. Passed and reachable chunks are filtered by the |filter| that
+	// was supplied to |MarkAndSweepChunks|. It is safe to pass a given
+	// hash more than once; it will only ever be copied once.
+	//
+	// A call to this function blocks until the entire transitive set of
+	// chunks is accessed and copied.
+	SaveHashes(context.Context, []hash.Hash) error
+
+	Close(context.Context) (GCFinalizer, error)
+}
+
+// A GCFinalizer is returned from a MarkAndSweeper after it is closed.
 //
 // A GCFinalizer is a handle to one or more table files which has been
 // constructed as part of the GC process. It can be used to add the table files
@@ -210,17 +229,12 @@ type ChunkStoreGarbageCollector interface {
 	// addChunk function must not be called after this function.
 	EndGC()
 
-	// MarkAndSweepChunks is expected to read chunk addresses off of
-	// |hashes|, which represent chunks which should be copied into the
-	// provided |dest| store. Once |hashes| is closed,
-	// MarkAndSweepChunks is expected to update the contents of the store
-	// to only include the chunk whose addresses which were sent along on
-	// |hashes|.
-	//
-	// This behavior is a little different for ValueStore.GC()'s
-	// interactions with generational stores. See ValueStore and
-	// NomsBlockStore/GenerationalNBS for details.
-	MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)
+	// MarkAndSweepChunks returns a handle that can be used to supply
+	// hashes which should be saved into |dest|. The hashes are
+	// filtered through the |filter| and their references are walked with
+	// |getAddrs|, each of those addresses being filtered and copied as
+	// well.
+	MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error)
 
 	// Count returns the number of chunks in the store.
 	Count() (uint32, error)
diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go
index 8253c04828..93643c17c1 100644
--- a/go/store/chunks/memory_store.go
+++ b/go/store/chunks/memory_store.go
@@ -360,7 +360,49 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
 	return nil
 }
 
-func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
+type msvMarkAndSweeper struct {
+	ms *MemoryStoreView
+
+	getAddrs GetAddrsCurry
+	filter   HasManyFunc
+
+	keepers map[hash.Hash]Chunk
+}
+
+func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
+	newAddrs := hash.NewHashSet(hashes...)
+	for {
+		for h := range i.keepers {
+			delete(newAddrs, h)
+		}
+		filtered, err := i.filter(ctx, newAddrs)
+		if err != nil {
+			return err
+		}
+		if len(filtered) == 0 {
+			break
+		}
+		newAddrs = make(hash.HashSet)
+		for h := range filtered {
+			c, err := i.ms.Get(ctx, h)
+			if err != nil {
+				return err
+			}
+			i.keepers[h] = c
+			err = i.getAddrs(c)(ctx, newAddrs, NoopPendingRefExists)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) {
+	return msvGcFinalizer{i.ms, i.keepers}, nil
+}
+
+func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
 	if dest != ms {
 		panic("unsupported")
 	}
@@ -371,28 +413,12 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan
 	}
 	ms.mu.Unlock()
 
-	keepers := make(map[hash.Hash]Chunk, ms.storage.Len())
-
-LOOP:
-	for {
-		select {
-		case hs, ok := <-hashes:
-			if !ok {
-				break LOOP
-			}
-			for _, h := range hs {
-				c, err := ms.Get(ctx, h)
-				if err != nil {
-					return nil, err
-				}
-				keepers[h] = c
-			}
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		}
-	}
-
-	return msvGcFinalizer{ms, keepers}, nil
+	return &msvMarkAndSweeper{
+		ms:       ms,
+		getAddrs: getAddrs,
+		filter:   filter,
+		keepers:  make(map[hash.Hash]Chunk),
+	}, nil
 }
 
 func (ms *MemoryStoreView) Count() (uint32, error) {
diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go
index 24e14219db..36e7467bbb 100644
--- a/go/store/chunks/test_utils.go
+++ b/go/store/chunks/test_utils.go
@@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
 	collector.EndGC()
 }
 
-func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
+func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
 	collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
 	if !ok || dest != s {
 		return nil, ErrUnsupportedOperation
 	}
-	return collector.MarkAndSweepChunks(ctx, hashes, collector, mode)
+	return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, mode)
 }
 
 func (s *TestStoreView) Count() (uint32, error) {
diff --git a/go/store/hash/hash.go b/go/store/hash/hash.go
index 6edd7e3000..d36db36d88 100644
--- a/go/store/hash/hash.go
+++ b/go/store/hash/hash.go
@@ -222,6 +222,14 @@ func (hs HashSet) Empty() {
 	}
 }
 
+func (hs HashSet) ToSlice() HashSlice {
+	ret := make(HashSlice, 0, len(hs))
+	for h := range hs {
+		ret = append(ret, h)
+	}
+	return ret
+}
+
 func (hs HashSet) String() string {
 	var sb strings.Builder
 	sb.Grow(len(hs)*34 + 100)
diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go
index c2396ecc2b..64846797ad 100644
--- a/go/store/nbs/generational_chunk_store.go
+++ b/go/store/nbs/generational_chunk_store.go
@@ -481,8 +481,8 @@ func (gcs *GenerationalNBS) EndGC() {
 	gcs.newGen.EndGC()
 }
 
-func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
-	return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
+func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
+	return markAndSweepChunks(ctx, gcs.newGen, gcs, dest, getAddrs, filter, mode)
 }
 
 func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go
index 57e6f086a1..1ca852da04 100644
--- a/go/store/nbs/nbs_metrics_wrapper.go
+++ b/go/store/nbs/nbs_metrics_wrapper.go
@@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
 	nbsMW.nbs.EndGC()
 }
 
-func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
-	return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
+func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
+	return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode)
 }
 
 func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go
index ab0bcad62d..7ddc44a001 100644
--- a/go/store/nbs/store.go
+++ b/go/store/nbs/store.go
@@ -1598,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
 	nbs.cond.Broadcast()
 }
 
-func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
-	return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
+func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
+	return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode)
 }
 
-func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
+func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) {
 	ops := nbs.SupportedOperations()
 	if !ops.CanGC || !ops.CanPrune {
 		return nil, chunks.ErrUnsupportedOperation
@@ -1647,18 +1647,127 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
 		destNBS = nbs
 	}
 
-	specs, err := copyMarkedChunks(ctx, hashes, src, destNBS)
+	tfp, ok := destNBS.p.(tableFilePersister)
+	if !ok {
+		return nil, fmt.Errorf("NBS does not support copying garbage collection")
+	}
+
+	gcc, err := newGarbageCollectionCopier()
 	if err != nil {
 		return nil, err
 	}
-	if ctx.Err() != nil {
-		return nil, ctx.Err()
+
+	return &markAndSweeper{
+		src:      src,
+		dest:     destNBS,
+		getAddrs: getAddrs,
+		filter:   filter,
+		visited:  make(hash.HashSet),
+		tfp:      tfp,
+		gcc:      gcc,
+		mode:     mode,
+	}, nil
+}
+
+type markAndSweeper struct {
+	src      NBSCompressedChunkStore
+	dest     *NomsBlockStore
+	getAddrs chunks.GetAddrsCurry
+	filter   chunks.HasManyFunc
+
+	visited hash.HashSet
+
+	tfp  tableFilePersister
+	gcc  *gcCopier
+	mode chunks.GCMode
+}
+
+func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error {
+	toVisit := make(hash.HashSet, len(hashes))
+	for _, h := range hashes {
+		if _, ok := i.visited[h]; !ok {
+			toVisit.Insert(h)
+		}
+	}
+	var err error
+	var mu sync.Mutex
+	first := true
+	for {
+		if !first {
+			copy := toVisit.Copy()
+			for h := range toVisit {
+				if _, ok := i.visited[h]; ok {
+					delete(copy, h)
+				}
+			}
+			toVisit = copy
+		}
+
+		toVisit, err = i.filter(ctx, toVisit)
+		if err != nil {
+			return err
+		}
+		if len(toVisit) == 0 {
+			break
+		}
+
+		first = false
+		nextToVisit := make(hash.HashSet)
+
+		found := 0
+		var addErr error
+		err = i.src.GetManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) {
+			mu.Lock()
+			defer mu.Unlock()
+			if addErr != nil {
+				return
+			}
+			found += 1
+			if cc.IsGhost() {
+				// Ghost chunks encountered on the walk can be left alone: their
+				// dependencies are not walked or copied, and because of how the
+				// generational store works, they will still be ghost chunks
+				// in the store after the GC is finished.
+				return
+			}
+			addErr = i.gcc.addChunk(ctx, cc)
+			if addErr != nil {
+				return
+			}
+			c, err := cc.ToChunk()
+			if err != nil {
+				addErr = err
+				return
+			}
+			addErr = i.getAddrs(c)(ctx, nextToVisit, func(h hash.Hash) bool { return false })
+		})
+		if err != nil {
+			return err
+		}
+		if addErr != nil {
+			return addErr
+		}
+		if found != len(toVisit) {
+			return fmt.Errorf("dangling references requested during GC. GC not successful. %v", toVisit)
+		}
+
+		i.visited.InsertAll(toVisit)
+
+		toVisit = nextToVisit
+	}
+	return nil
+}
+
+func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) {
+	specs, err := i.gcc.copyTablesToDir(ctx, i.tfp)
+	if err != nil {
+		return nil, err
 	}
 
 	return gcFinalizer{
-		nbs:   destNBS,
+		nbs:   i.dest,
 		specs: specs,
-		mode:  mode,
+		mode:  i.mode,
 	}, nil
 }
 
@@ -1684,55 +1793,6 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
 	return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
 }
 
-func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
-	tfp, ok := dest.p.(tableFilePersister)
-	if !ok {
-		return nil, fmt.Errorf("NBS does not support copying garbage collection")
-	}
-
-	gcc, err := newGarbageCollectionCopier()
-	if err != nil {
-		return nil, err
-	}
-
-	// TODO: We should clean up gcc on error.
-
-LOOP:
-	for {
-		select {
-		case hs, ok := <-keepChunks:
-			if !ok {
-				break LOOP
-			}
-			var addErr error
-			mu := new(sync.Mutex)
-			hashset := hash.NewHashSet(hs...)
-			found := 0
-			err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
-				mu.Lock()
-				defer mu.Unlock()
-				if addErr != nil {
-					return
-				}
-				found += 1
-				addErr = gcc.addChunk(ctx, c)
-			})
-			if err != nil {
-				return nil, err
-			}
-			if addErr != nil {
-				return nil, addErr
-			}
-			if found != len(hashset) {
-				return nil, fmt.Errorf("dangling references requested during GC. GC not successful. %v", hashset)
-			}
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		}
-	}
-	return gcc.copyTablesToDir(ctx, tfp)
-}
-
 func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
 	for _, v := range nbs.tables.novel {
 		err := v.iterateAllChunks(ctx, cb)
diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go
index a1eb211c1d..02c92bb7a9 100644
--- a/go/store/nbs/store_test.go
+++ b/go/store/nbs/store_test.go
@@ -334,14 +334,18 @@ func TestNBSCopyGC(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, ok)
 
-	keepChan := make(chan []hash.Hash, numChunks)
+	require.NoError(t, st.BeginGC(nil))
+	noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
+		return hashes, nil
+	}
+	sweeper, err := st.MarkAndSweepChunks(ctx, noopGetAddrs, noopFilter, nil, chunks.GCMode_Full)
+	require.NoError(t, err)
+	keepersSlice := make([]hash.Hash, 0, len(keepers))
 	for h := range keepers {
-		keepChan <- []hash.Hash{h}
+		keepersSlice = append(keepersSlice, h)
 	}
-	close(keepChan)
-
-	require.NoError(t, st.BeginGC(nil))
-	finalizer, err := st.MarkAndSweepChunks(ctx, keepChan, nil, chunks.GCMode_Full)
+	require.NoError(t, sweeper.SaveHashes(ctx, keepersSlice))
+	finalizer, err := sweeper.Close(ctx)
 	require.NoError(t, err)
 	require.NoError(t, finalizer.SwapChunksInStore(ctx))
 	st.EndGC()
diff --git a/go/store/types/parallel_ref_walker.go b/go/store/types/parallel_ref_walker.go
deleted file mode 100644
index 0258ed8518..0000000000
--- a/go/store/types/parallel_ref_walker.go
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright 2020 Dolthub, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package types
-
-import (
-	"context"
-
-	"golang.org/x/sync/errgroup"
-
-	"github.com/dolthub/dolt/go/store/hash"
-)
-
-type parallelRefWalkerWork struct {
-	vals ValueSlice
-	res  chan []hash.Hash
-}
-
-type parallelRefWalker struct {
-	ctx         context.Context
-	eg          *errgroup.Group
-	concurrency int
-	nbf         *NomsBinFormat
-	work        chan parallelRefWalkerWork
-}
-
-func (w *parallelRefWalker) goWork() error {
-	for {
-		select {
-		case <-w.ctx.Done():
-			return w.ctx.Err()
-		case work, ok := <-w.work:
-			if !ok {
-				return nil
-			}
-			var res []hash.Hash
-			for _, v := range work.vals {
-				err := v.walkRefs(w.nbf, func(r Ref) error {
-					res = append(res, r.TargetHash())
-					return nil
-				})
-				if err != nil {
-					return err
-				}
-			}
-			select {
-			case work.res <- res:
-				break
-			case <-w.ctx.Done():
-				return w.ctx.Err()
-			}
-		}
-	}
-}
-
-func (w *parallelRefWalker) sendWork(work parallelRefWalkerWork) error {
-	select {
-	case w.work <- work:
-		return nil
-	case <-w.ctx.Done():
-		return w.ctx.Err()
-	}
-}
-
-func (w *parallelRefWalker) sendAllWork(vals ValueSlice) (int, chan []hash.Hash, error) {
-	resCh := make(chan []hash.Hash, w.concurrency)
-	i, numSent := 0, 0
-	step := len(vals)/w.concurrency + 1
-	for i < len(vals) {
-		j := i + step
-		if j > len(vals) {
-			j = len(vals)
-		}
-		if err := w.sendWork(parallelRefWalkerWork{
-			vals[i:j],
-			resCh,
-		}); err != nil {
-			return 0, nil, err
-		}
-		i = j
-		numSent++
-	}
-	return numSent, resCh, nil
-}
-
-func (w *parallelRefWalker) GetRefs(visited hash.HashSet, vals ValueSlice) ([]hash.Hash, error) {
-	res := []hash.Hash{}
-	numSent, resCh, err := w.sendAllWork(vals)
-	if err != nil {
-		return nil, err
-	}
-	for i := 0; i < numSent; i++ {
-		select {
-		case b := <-resCh:
-			for _, r := range b {
-				if !visited.Has(r) {
-					res = append(res, r)
-					visited.Insert(r)
-				}
-			}
-		case <-w.ctx.Done():
-			return nil, w.ctx.Err()
-		}
-	}
-	return res, nil
-}
-
-func (w *parallelRefWalker) GetRefSet(visited hash.HashSet, vals ValueSlice) (hash.HashSet, error) {
-	res := make(hash.HashSet)
-	numSent, resCh, err := w.sendAllWork(vals)
-	if err != nil {
-		return nil, err
-	}
-	for i := 0; i < numSent; i++ {
-		select {
-		case b := <-resCh:
-			for _, r := range b {
-				if !visited.Has(r) {
-					res[r] = struct{}{}
-					visited.Insert(r)
-				}
-			}
-		case <-w.ctx.Done():
-			return nil, w.ctx.Err()
-		}
-	}
-	return res, nil
-}
-
-func (w *parallelRefWalker) Close() error {
-	close(w.work)
-	return w.eg.Wait()
-}
-
-// |parallelRefWalker| provides a way to walk the |Ref|s in a |ValueSlice|
-// using background worker threads to exploit hardware parallelism in cases
-// where walking the merkle-DAG can become CPU bound. Construct a
-// |parallelRefWalker| with a configured level of |concurrency| and then call
-// |GetRefs(hash.HashSet, ValueSlice)| with the |ValueSlice| to get back a
-// slice of |hash.Hash| for all the |Ref|s which appear in the values of
-// |ValueSlice|. |GetRefs| will not return any |Ref|s which already appear in
-// the |visited| set, and it will add all |Ref|s returned to the |visited| set.
-// The worker threads should be shutdown with |Close()| after the walker is no
-// longer needed.
-//
-// If any errors are encountered when walking |Ref|s, |parallelRefWalker| will
-// enter a terminal error state where it will always return a non-|nil|
-// |error|. A |parallelRefWalker| will also enter a terminal error state if the
-// |ctx| provided to |newParallelRefWalker| is canceled or exceeds its
-// deadline. |GetRefs| must not be called on |parallelRefWalker| after |Close|
-// is called.
-func newParallelRefWalker(ctx context.Context, nbf *NomsBinFormat, concurrency int) *parallelRefWalker {
-	eg, ctx := errgroup.WithContext(ctx)
-	res := &parallelRefWalker{
-		ctx,
-		eg,
-		concurrency,
-		nbf,
-		make(chan parallelRefWalkerWork, concurrency),
-	}
-	for i := 0; i < concurrency; i++ {
-		res.eg.Go(res.goWork)
-	}
-	return res
-}
diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go
index 8d9cc8266c..7d0984831d 100644
--- a/go/store/types/value_store.go
+++ b/go/store/types/value_store.go
@@ -25,11 +25,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"runtime"
 	"sync"
 
-	"golang.org/x/sync/errgroup"
-
 	"github.com/dolthub/dolt/go/store/chunks"
 	"github.com/dolthub/dolt/go/store/d"
 	"github.com/dolthub/dolt/go/store/hash"
@@ -608,16 +605,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
 		return err
 	}
 
-	if root == (hash.Hash{}) {
+	if root.IsEmpty() {
 		// empty root
 		return nil
 	}
 
-	oldGenRefs, err = oldGenHasMany(ctx, oldGenRefs)
-	if err != nil {
-		return err
-	}
-
 	newGenRefs.Insert(root)
 
 	var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer
@@ -728,170 +720,44 @@ func (lvs *ValueStore) gc(ctx context.Context,
 	src, dest chunks.ChunkStoreGarbageCollector,
 	safepointF func() error,
 	finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
-	keepChunks := make(chan []hash.Hash, gcBuffSize)
-
-	var gcFinalizer chunks.GCFinalizer
-
-	eg, ctx := errgroup.WithContext(ctx)
-	eg.Go(func() error {
-		var err error
-		gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest, chksMode)
-		return err
-	})
-
-	keepHashes := func(hs []hash.Hash) error {
-		select {
-		case keepChunks <- hs:
-			return nil
-		case <-ctx.Done():
-			return ctx.Err()
-		}
-	}
-
-	concurrency := runtime.GOMAXPROCS(0) - 1
-	if concurrency < 1 {
-		concurrency = 1
+	sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode)
+	if err != nil {
+		return nil, err
 	}
-	walker := newParallelRefWalker(ctx, lvs.nbf, concurrency)
-
-	eg.Go(func() error {
-		defer walker.Close()
-
-		err := lvs.gcProcessRefs(ctx, toVisit, keepHashes, walker, hashFilter, safepointF, finalize)
-		if err != nil {
-			return err
-		}
-
-		// NOTE: We do not defer this close here. When keepChunks
-		// closes, it signals to NBSStore.MarkAndSweepChunks that we
-		// are done walking the references. If gcProcessRefs returns an
-		// error, we did not successfully walk all references and we do
-		// not want MarkAndSweepChunks finishing its work, swapping
-		// table files, etc. It would be racing with returning an error
-		// here. Instead, we have returned the error above and that
-		// will force it to fail when the errgroup ctx fails.
-		close(keepChunks)
-		return nil
-	})
-
-	err := eg.Wait()
-	return gcFinalizer, err
-}
-
-func (lvs *ValueStore) gcProcessRefs(ctx context.Context,
-	initialToVisit hash.HashSet, keepHashes func(hs []hash.Hash) error,
-	walker *parallelRefWalker, hashFilter chunks.HasManyFunc,
-	safepointF func() error,
-	finalize func() hash.HashSet) error {
-	visited := make(hash.HashSet)
-
-	process := func(initialToVisit hash.HashSet) error {
-		visited.InsertAll(initialToVisit)
-		toVisitCount := len(initialToVisit)
-		toVisit := []hash.HashSet{initialToVisit}
-		for toVisitCount > 0 {
-			batches := makeBatches(toVisit, toVisitCount)
-			toVisit = make([]hash.HashSet, len(batches)+1)
-			toVisitCount = 0
-			for i, batch := range batches {
-				vals, err := lvs.ReadManyValues(ctx, batch)
-				if err != nil {
-					return err
-				}
-				for i, v := range vals {
-					if v == nil {
-						return fmt.Errorf("gc failed, dangling reference requested %v", batch[i])
-					}
-				}
-				// GC skips ghost values, but other ref walkers don't. Filter them out here.
-				realVals := make(ValueSlice, 0, len(vals))
-				nonGhostBatch := make([]hash.Hash, 0, len(vals))
-				for _, v := range vals {
-					h, err := v.Hash(lvs.Format())
-					if err != nil {
-						return err
-					}
-					if _, ok := v.(GhostValue); ok {
-						visited.Insert(h) // Can't visit a ghost. That would be spooky.
-					} else {
-						realVals = append(realVals, v)
-						nonGhostBatch = append(nonGhostBatch, h)
-					}
-				}
-				vals = realVals
-
-				if err := keepHashes(nonGhostBatch); err != nil {
-					return err
-				}
-
-				hashes, err := walker.GetRefSet(visited, vals)
-				if err != nil {
-					return err
-				}
-
-				// continue processing
-				hashes, err = hashFilter(ctx, hashes)
-				if err != nil {
-					return err
-				}
-
-				toVisit[i] = hashes
-				toVisitCount += len(hashes)
-			}
-		}
-		return nil
-	}
-	err := process(initialToVisit)
+	err = sweeper.SaveHashes(ctx, toVisit.ToSlice())
 	if err != nil {
-		return err
+		_, cErr := sweeper.Close(ctx)
+		return nil, errors.Join(err, cErr)
 	}
-
-	// We can accumulate hashes which which are already visited. We prune
-	// those here.
+	toVisit = nil
 
 	// Before we call finalize(), we can process the current set of
 	// NewGenToVisit. NewGen -> Finalize is going to block writes until
 	// we are done, so its best to keep it as small as possible.
 	next := lvs.readAndResetNewGenToVisit()
-	if len(next) > 0 {
-		nextCopy := next.Copy()
-		for h, _ := range nextCopy {
-			if visited.Has(h) {
-				next.Remove(h)
-			}
-		}
-		next, err = hashFilter(ctx, next)
-		if err != nil {
-			return err
-		}
-		err = process(next)
-		if err != nil {
-			return err
-		}
+	err = sweeper.SaveHashes(ctx, next.ToSlice())
+	if err != nil {
+		_, cErr := sweeper.Close(ctx)
+		return nil, errors.Join(err, cErr)
 	}
+	next = nil
 
 	final := finalize()
-	finalCopy := final.Copy()
-	for h, _ := range finalCopy {
-		if visited.Has(h) {
-			final.Remove(h)
-		}
-	}
-	finalCopy = nil
-	final, err = hashFilter(ctx, final)
-	if err != nil {
-		return err
-	}
-	err = process(final)
+	err = sweeper.SaveHashes(ctx, final.ToSlice())
 	if err != nil {
-		return err
+		_, cErr := sweeper.Close(ctx)
+		return nil, errors.Join(err, cErr)
 	}
 
 	if safepointF != nil {
-		return safepointF()
+		err = safepointF()
+		if err != nil {
+			_, cErr := sweeper.Close(ctx)
+			return nil, errors.Join(err, cErr)
+		}
 	}
-	return nil
+
+	return sweeper.Close(ctx)
 }
 
 // Close closes the underlying ChunkStore
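Reviewer note: for readers new to this area, the following is a minimal sketch of driving the new API end to end, modeled on the updated TestNBSCopyGC and ValueStore.gc above. The collectGarbage helper is invented for illustration; BeginGC(nil), the nil |dest|, and GCMode_Full all follow the updated test, while a generational caller would pass a real destination store and a non-trivial filter.

package example

import (
	"context"
	"errors"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// collectGarbage sketches one full GC pass over |st|, retaining the
// root set |keepers| and everything reachable from it via |getAddrs|.
func collectGarbage(ctx context.Context, st chunks.ChunkStoreGarbageCollector, getAddrs chunks.GetAddrsCurry, keepers hash.HashSet) error {
	// Writes are coordinated against the GC process between BeginGC and EndGC.
	if err := st.BeginGC(nil); err != nil {
		return err
	}
	defer st.EndGC()

	// Keep everything reachable; a generational caller would instead pass
	// a filter which drops chunks already present elsewhere.
	keepAll := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
		return hashes, nil
	}

	sweeper, err := st.MarkAndSweepChunks(ctx, getAddrs, keepAll, nil, chunks.GCMode_Full)
	if err != nil {
		return err
	}

	// SaveHashes may be called repeatedly; each call blocks until the
	// transitive closure of the passed hashes has been copied.
	if err := sweeper.SaveHashes(ctx, keepers.ToSlice()); err != nil {
		_, cErr := sweeper.Close(ctx)
		return errors.Join(err, cErr)
	}

	finalizer, err := sweeper.Close(ctx)
	if err != nil {
		return err
	}
	// Install the table files written during the mark phase.
	return finalizer.SwapChunksInStore(ctx)
}

Note how the keepers.ToSlice() call relies on the small HashSet.ToSlice helper added in go/store/hash/hash.go, and how the Close-on-error pattern mirrors the errors.Join handling in the new ValueStore.gc.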
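The |filter| argument is a chunks.HasManyFunc which, per its definition in chunk_store.go above, returns the subset of the passed hashes that are absent and therefore still need to be copied. As a hedged illustration of how a generational setup might use it, a caller could skip anything already present in an old-generation store; the oldGenFilter helper below is invented, but ChunkStore.HasMany has exactly the required shape:

package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// oldGenFilter adapts |oldGen| into a chunks.HasManyFunc: it reports
// only the hashes absent from |oldGen|, so the mark phase copies just
// the chunks the old generation does not already hold.
func oldGenFilter(oldGen chunks.ChunkStore) chunks.HasManyFunc {
	return func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
		// HasMany returns the absent subset, which is exactly the set
		// of chunks the GC still needs to visit and copy.
		return oldGen.HasMany(ctx, hashes)
	}
}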
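The |getAddrs| handle is applied in two steps at the call sites above: getAddrs(c) binds a chunk, and the returned closure accumulates that chunk's references into a hash.HashSet, consulting a pending-ref callback (NoopPendingRefExists in memory_store.go, a constant-false closure in store.go). The sketch below only illustrates that currying shape under those assumptions; decodeRefs stands in for format-specific deserialization and is not a real function in this codebase.

package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// decodeRefs is a hypothetical stand-in for decoding the addresses
// referenced by a chunk's serialized value.
func decodeRefs(data []byte) []hash.Hash { return nil }

// exampleGetAddrs mirrors the two-step call shape used by the sweepers:
// bind the chunk first, then walk its references into |addrs|, skipping
// any reference the |exists| callback reports as already pending.
func exampleGetAddrs(c chunks.Chunk) func(ctx context.Context, addrs hash.HashSet, exists func(hash.Hash) bool) error {
	return func(ctx context.Context, addrs hash.HashSet, exists func(hash.Hash) bool) error {
		for _, ref := range decodeRefs(c.Data()) {
			if !exists(ref) {
				addrs.Insert(ref)
			}
		}
		return nil
	}
}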