diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index e828b5d372..82c52cac65 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -156,7 +156,26 @@ var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block") // GCFinalizer interface. type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) -// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed. +// A MarkAndSweeper is returned from MarkAndSweepChunks and allows a caller +// to save chunks, and all chunks reachable from them, from the source store +// into the destination store. |SaveHashes| is called one or more times, +// passing in hashes which should be saved. Then |Close| is called and the +// |GCFinalizer| is used to complete the process. +type MarkAndSweeper interface { + // Ensures that the chunks corresponding to the passed hashes, and all + // the chunks reachable from them, are copied into the destination + // store. Passed and reachable chunks are filtered by the |filter| that + // was supplied to |MarkAndSweepChunks|. It is safe to pass a given + // hash more than once; it will only ever be copied once. + // + // A call to this function blocks until the entire transitive set of + // chunks is accessed and copied. + SaveHashes(context.Context, []hash.Hash) error + + Close(context.Context) (GCFinalizer, error) +} + +// A GCFinalizer is returned from a MarkAndSweeper after it is closed. // // A GCFinalizer is a handle to one or more table files which has been // constructed as part of the GC process. It can be used to add the table files @@ -210,17 +229,12 @@ type ChunkStoreGarbageCollector interface { // addChunk function must not be called after this function. EndGC() - // MarkAndSweepChunks is expected to read chunk addresses off of - // |hashes|, which represent chunks which should be copied into the - // provided |dest| store. Once |hashes| is closed, - // MarkAndSweepChunks is expected to update the contents of the store - // to only include the chunk whose addresses which were sent along on - // |hashes|. - // - // This behavior is a little different for ValueStore.GC()'s - // interactions with generational stores. See ValueStore and - // NomsBlockStore/GenerationalNBS for details. - MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) + // MarkAndSweepChunks returns a handle that can be used to supply + // hashes which should be saved into |dest|. The hashes are + // filtered through the |filter| and their references are walked with + // |getAddrs|, each of those addresses being filtered and copied as + // well. + MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) // Count returns the number of chunks in the store. 
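For orientation, here is a minimal caller-side sketch of the new flow, mirroring the call sequence in the updated TestNBSCopyGC further down this diff (BeginGC, MarkAndSweepChunks, one or more SaveHashes calls, Close, then the GCFinalizer). The function name sweepRoots, the pass-through filter, and the placement in package chunks are illustrative assumptions, not part of the change:

// Illustrative sketch only -- not part of this change. It assumes it lives in
// package chunks so the identifiers above resolve, and that the caller has
// already entered a GC session (BeginGC / EndGC), as the updated
// TestNBSCopyGC does.
package chunks

import (
	"context"
	"errors"

	"github.com/dolthub/dolt/go/store/hash"
)

func sweepRoots(ctx context.Context, store ChunkStoreGarbageCollector, getAddrs GetAddrsCurry, roots []hash.Hash) error {
	// Keep every hash we are asked about; a generational caller could
	// instead supply a HasManyFunc that prunes hashes already stored
	// elsewhere.
	keepEverything := func(ctx context.Context, hs hash.HashSet) (hash.HashSet, error) {
		return hs, nil
	}

	// A nil dest collects into the source store itself, as in the updated
	// NBS test; other implementations may require dest to be the store.
	sweeper, err := store.MarkAndSweepChunks(ctx, getAddrs, keepEverything, nil, GCMode_Full)
	if err != nil {
		return err
	}

	// SaveHashes may be called any number of times before Close; a hash
	// passed more than once is only copied once.
	if err := sweeper.SaveHashes(ctx, roots); err != nil {
		_, cerr := sweeper.Close(ctx)
		return errors.Join(err, cerr)
	}

	finalizer, err := sweeper.Close(ctx)
	if err != nil {
		return err
	}
	return finalizer.SwapChunksInStore(ctx)
}
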
Count() (uint32, error) diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 8253c04828..93643c17c1 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -360,7 +360,49 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error { return nil } -func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) { +type msvMarkAndSweeper struct { + ms *MemoryStoreView + + getAddrs GetAddrsCurry + filter HasManyFunc + + keepers map[hash.Hash]Chunk +} + +func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error { + newAddrs := hash.NewHashSet(hashes...) + for { + for h := range i.keepers { + delete(newAddrs, h) + } + filtered, err := i.filter(ctx, newAddrs) + if err != nil { + return err + } + if len(filtered) == 0 { + break + } + newAddrs = make(hash.HashSet) + for h := range filtered { + c, err := i.ms.Get(ctx, h) + if err != nil { + return err + } + i.keepers[h] = c + err = i.getAddrs(c)(ctx, newAddrs, NoopPendingRefExists) + if err != nil { + return err + } + } + } + return nil +} + +func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) { + return msvGcFinalizer{i.ms, i.keepers}, nil +} + +func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) { if dest != ms { panic("unsupported") } @@ -371,28 +413,12 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan } ms.mu.Unlock() - keepers := make(map[hash.Hash]Chunk, ms.storage.Len()) - -LOOP: - for { - select { - case hs, ok := <-hashes: - if !ok { - break LOOP - } - for _, h := range hs { - c, err := ms.Get(ctx, h) - if err != nil { - return nil, err - } - keepers[h] = c - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - return msvGcFinalizer{ms, keepers}, nil + return &msvMarkAndSweeper{ + ms: ms, + getAddrs: getAddrs, + filter: filter, + keepers: make(map[hash.Hash]Chunk), + }, nil } func (ms *MemoryStoreView) Count() (uint32, error) { diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index 24e14219db..36e7467bbb 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() { collector.EndGC() } -func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) { +func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) { collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector) if !ok || dest != s { return nil, ErrUnsupportedOperation } - return collector.MarkAndSweepChunks(ctx, hashes, collector, mode) + return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, mode) } func (s *TestStoreView) Count() (uint32, error) { diff --git a/go/store/hash/hash.go b/go/store/hash/hash.go index 6edd7e3000..d36db36d88 100644 --- a/go/store/hash/hash.go +++ b/go/store/hash/hash.go @@ -222,6 +222,14 @@ func (hs HashSet) Empty() { } } +func (hs HashSet) ToSlice() HashSlice { + ret := make(HashSlice, 0, len(hs)) + for h := range hs { + ret = append(ret, h) + } + return ret +} + func (hs HashSet) String() string { var sb strings.Builder sb.Grow(len(hs)*34 + 100) diff --git a/go/store/nbs/generational_chunk_store.go 
b/go/store/nbs/generational_chunk_store.go index c2396ecc2b..64846797ad 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -481,8 +481,8 @@ func (gcs *GenerationalNBS) EndGC() { gcs.newGen.EndGC() } -func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) { - return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode) +func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { + return markAndSweepChunks(ctx, gcs.newGen, gcs, dest, getAddrs, filter, mode) } func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 57e6f086a1..1ca852da04 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() { nbsMW.nbs.EndGC() } -func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) { - return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode) +func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { + return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode) } func (nbsMW *NBSMetricWrapper) Count() (uint32, error) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index ab0bcad62d..7ddc44a001 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1598,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() { nbs.cond.Broadcast() } -func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) { - return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode) +func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { + return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode) } -func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) { +func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode) (chunks.MarkAndSweeper, error) { ops := nbs.SupportedOperations() if !ops.CanGC || !ops.CanPrune { return nil, chunks.ErrUnsupportedOperation @@ -1647,18 +1647,127 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom destNBS = nbs } - specs, err := copyMarkedChunks(ctx, hashes, src, destNBS) + tfp, ok := destNBS.p.(tableFilePersister) + if !ok { + return nil, fmt.Errorf("NBS does not support copying garbage collection") + } + + gcc, err := newGarbageCollectionCopier() if err != nil { return nil, err } - if ctx.Err() != nil { - return nil, ctx.Err() + + return &markAndSweeper{ + src: src, + dest: destNBS, + getAddrs: getAddrs, + 
filter: filter, + visited: make(hash.HashSet), + tfp: tfp, + gcc: gcc, + mode: mode, + }, nil +} + +type markAndSweeper struct { + src NBSCompressedChunkStore + dest *NomsBlockStore + getAddrs chunks.GetAddrsCurry + filter chunks.HasManyFunc + + visited hash.HashSet + + tfp tableFilePersister + gcc *gcCopier + mode chunks.GCMode +} + +func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) error { + toVisit := make(hash.HashSet, len(hashes)) + for _, h := range hashes { + if _, ok := i.visited[h]; !ok { + toVisit.Insert(h) + } + } + var err error + var mu sync.Mutex + first := true + for { + if !first { + copy := toVisit.Copy() + for h := range toVisit { + if _, ok := i.visited[h]; ok { + delete(copy, h) + } + } + toVisit = copy + } + + toVisit, err = i.filter(ctx, toVisit) + if err != nil { + return err + } + if len(toVisit) == 0 { + break + } + + first = false + nextToVisit := make(hash.HashSet) + + found := 0 + var addErr error + err = i.src.GetManyCompressed(ctx, toVisit, func(ctx context.Context, cc CompressedChunk) { + mu.Lock() + defer mu.Unlock() + if addErr != nil { + return + } + found += 1 + if cc.IsGhost() { + // Ghost chunks encountered on the walk can be left alone --- they + // do not bring their dependencies, and because of how generational + // store works, they will still be ghost chunks + // in the store after the GC is finished. + return + } + addErr = i.gcc.addChunk(ctx, cc) + if addErr != nil { + return + } + c, err := cc.ToChunk() + if err != nil { + addErr = err + return + } + addErr = i.getAddrs(c)(ctx, nextToVisit, func(h hash.Hash) bool { return false }) + }) + if err != nil { + return err + } + if addErr != nil { + return addErr + } + if found != len(toVisit) { + return fmt.Errorf("dangling references requested during GC. GC not successful. %v", toVisit) + } + + i.visited.InsertAll(toVisit) + + toVisit = nextToVisit + } + return nil +} + +func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) { + specs, err := i.gcc.copyTablesToDir(ctx, i.tfp) + if err != nil { + return nil, err } return gcFinalizer{ - nbs: destNBS, + nbs: i.dest, specs: specs, - mode: mode, + mode: i.mode, }, nil } @@ -1684,55 +1793,6 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error { return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode) } -func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) { - tfp, ok := dest.p.(tableFilePersister) - if !ok { - return nil, fmt.Errorf("NBS does not support copying garbage collection") - } - - gcc, err := newGarbageCollectionCopier() - if err != nil { - return nil, err - } - - // TODO: We should clean up gcc on error. - -LOOP: - for { - select { - case hs, ok := <-keepChunks: - if !ok { - break LOOP - } - var addErr error - mu := new(sync.Mutex) - hashset := hash.NewHashSet(hs...) - found := 0 - err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) { - mu.Lock() - defer mu.Unlock() - if addErr != nil { - return - } - found += 1 - addErr = gcc.addChunk(ctx, c) - }) - if err != nil { - return nil, err - } - if addErr != nil { - return nil, addErr - } - if found != len(hashset) { - return nil, fmt.Errorf("dangling references requested during GC. GC not successful. 
%v", hashset) - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } - return gcc.copyTablesToDir(ctx, tfp) -} - func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { for _, v := range nbs.tables.novel { err := v.iterateAllChunks(ctx, cb) diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index a1eb211c1d..02c92bb7a9 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -334,14 +334,18 @@ func TestNBSCopyGC(t *testing.T) { require.NoError(t, err) require.True(t, ok) - keepChan := make(chan []hash.Hash, numChunks) + require.NoError(t, st.BeginGC(nil)) + noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { + return hashes, nil + } + sweeper, err := st.MarkAndSweepChunks(ctx, noopGetAddrs, noopFilter, nil, chunks.GCMode_Full) + require.NoError(t, err) + keepersSlice := make([]hash.Hash, 0, len(keepers)) for h := range keepers { - keepChan <- []hash.Hash{h} + keepersSlice = append(keepersSlice, h) } - close(keepChan) - - require.NoError(t, st.BeginGC(nil)) - finalizer, err := st.MarkAndSweepChunks(ctx, keepChan, nil, chunks.GCMode_Full) + require.NoError(t, sweeper.SaveHashes(ctx, keepersSlice)) + finalizer, err := sweeper.Close(ctx) require.NoError(t, err) require.NoError(t, finalizer.SwapChunksInStore(ctx)) st.EndGC() diff --git a/go/store/types/parallel_ref_walker.go b/go/store/types/parallel_ref_walker.go deleted file mode 100644 index 0258ed8518..0000000000 --- a/go/store/types/parallel_ref_walker.go +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright 2020 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package types - -import ( - "context" - - "golang.org/x/sync/errgroup" - - "github.com/dolthub/dolt/go/store/hash" -) - -type parallelRefWalkerWork struct { - vals ValueSlice - res chan []hash.Hash -} - -type parallelRefWalker struct { - ctx context.Context - eg *errgroup.Group - concurrency int - nbf *NomsBinFormat - work chan parallelRefWalkerWork -} - -func (w *parallelRefWalker) goWork() error { - for { - select { - case <-w.ctx.Done(): - return w.ctx.Err() - case work, ok := <-w.work: - if !ok { - return nil - } - var res []hash.Hash - for _, v := range work.vals { - err := v.walkRefs(w.nbf, func(r Ref) error { - res = append(res, r.TargetHash()) - return nil - }) - if err != nil { - return err - } - } - select { - case work.res <- res: - break - case <-w.ctx.Done(): - return w.ctx.Err() - } - } - } -} - -func (w *parallelRefWalker) sendWork(work parallelRefWalkerWork) error { - select { - case w.work <- work: - return nil - case <-w.ctx.Done(): - return w.ctx.Err() - } -} - -func (w *parallelRefWalker) sendAllWork(vals ValueSlice) (int, chan []hash.Hash, error) { - resCh := make(chan []hash.Hash, w.concurrency) - i, numSent := 0, 0 - step := len(vals)/w.concurrency + 1 - for i < len(vals) { - j := i + step - if j > len(vals) { - j = len(vals) - } - if err := w.sendWork(parallelRefWalkerWork{ - vals[i:j], - resCh, - }); err != nil { - return 0, nil, err - } - i = j - numSent++ - } - return numSent, resCh, nil -} - -func (w *parallelRefWalker) GetRefs(visited hash.HashSet, vals ValueSlice) ([]hash.Hash, error) { - res := []hash.Hash{} - numSent, resCh, err := w.sendAllWork(vals) - if err != nil { - return nil, err - } - for i := 0; i < numSent; i++ { - select { - case b := <-resCh: - for _, r := range b { - if !visited.Has(r) { - res = append(res, r) - visited.Insert(r) - } - } - case <-w.ctx.Done(): - return nil, w.ctx.Err() - } - } - return res, nil -} - -func (w *parallelRefWalker) GetRefSet(visited hash.HashSet, vals ValueSlice) (hash.HashSet, error) { - res := make(hash.HashSet) - numSent, resCh, err := w.sendAllWork(vals) - if err != nil { - return nil, err - } - for i := 0; i < numSent; i++ { - select { - case b := <-resCh: - for _, r := range b { - if !visited.Has(r) { - res[r] = struct{}{} - visited.Insert(r) - } - } - case <-w.ctx.Done(): - return nil, w.ctx.Err() - } - } - return res, nil -} - -func (w *parallelRefWalker) Close() error { - close(w.work) - return w.eg.Wait() -} - -// |parallelRefWalker| provides a way to walk the |Ref|s in a |ValueSlice| -// using background worker threads to exploit hardware parallelism in cases -// where walking the merkle-DAG can become CPU bound. Construct a -// |parallelRefWalker| with a configured level of |concurrency| and then call -// |GetRefs(hash.HashSet, ValueSlice)| with the |ValueSlice| to get back a -// slice of |hash.Hash| for all the |Ref|s which appear in the values of -// |ValueSlice|. |GetRefs| will not return any |Ref|s which already appear in -// the |visited| set, and it will add all |Ref|s returned to the |visited| set. -// The worker threads should be shutdown with |Close()| after the walker is no -// longer needed. -// -// If any errors are encountered when walking |Ref|s, |parallelRefWalker| will -// enter a terminal error state where it will always return a non-|nil| -// |error|. A |parallelRefWalker| will also enter a terminal error state if the -// |ctx| provided to |newParallelRefWalker| is canceled or exceeds its -// deadline. |GetRefs| must not be called on |parallelRefWalker| after |Close| -// is called. 
-func newParallelRefWalker(ctx context.Context, nbf *NomsBinFormat, concurrency int) *parallelRefWalker { - eg, ctx := errgroup.WithContext(ctx) - res := ¶llelRefWalker{ - ctx, - eg, - concurrency, - nbf, - make(chan parallelRefWalkerWork, concurrency), - } - for i := 0; i < concurrency; i++ { - res.eg.Go(res.goWork) - } - return res -} diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 8d9cc8266c..7d0984831d 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -25,11 +25,8 @@ import ( "context" "errors" "fmt" - "runtime" "sync" - "golang.org/x/sync/errgroup" - "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" @@ -608,16 +605,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe return err } - if root == (hash.Hash{}) { + if root.IsEmpty() { // empty root return nil } - oldGenRefs, err = oldGenHasMany(ctx, oldGenRefs) - if err != nil { - return err - } - newGenRefs.Insert(root) var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer @@ -728,170 +720,44 @@ func (lvs *ValueStore) gc(ctx context.Context, src, dest chunks.ChunkStoreGarbageCollector, safepointF func() error, finalize func() hash.HashSet) (chunks.GCFinalizer, error) { - keepChunks := make(chan []hash.Hash, gcBuffSize) - - var gcFinalizer chunks.GCFinalizer - - eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - var err error - gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest, chksMode) - return err - }) - - keepHashes := func(hs []hash.Hash) error { - select { - case keepChunks <- hs: - return nil - case <-ctx.Done(): - return ctx.Err() - } - } - - concurrency := runtime.GOMAXPROCS(0) - 1 - if concurrency < 1 { - concurrency = 1 + sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode) + if err != nil { + return nil, err } - walker := newParallelRefWalker(ctx, lvs.nbf, concurrency) - - eg.Go(func() error { - defer walker.Close() - - err := lvs.gcProcessRefs(ctx, toVisit, keepHashes, walker, hashFilter, safepointF, finalize) - if err != nil { - return err - } - - // NOTE: We do not defer this close here. When keepChunks - // closes, it signals to NBSStore.MarkAndSweepChunks that we - // are done walking the references. If gcProcessRefs returns an - // error, we did not successfully walk all references and we do - // not want MarkAndSweepChunks finishing its work, swapping - // table files, etc. It would be racing with returning an error - // here. Instead, we have returned the error above and that - // will force it to fail when the errgroup ctx fails. 
- close(keepChunks) - return nil - }) - - err := eg.Wait() - return gcFinalizer, err -} - -func (lvs *ValueStore) gcProcessRefs(ctx context.Context, - initialToVisit hash.HashSet, keepHashes func(hs []hash.Hash) error, - walker *parallelRefWalker, hashFilter chunks.HasManyFunc, - safepointF func() error, - finalize func() hash.HashSet) error { - visited := make(hash.HashSet) - - process := func(initialToVisit hash.HashSet) error { - visited.InsertAll(initialToVisit) - toVisitCount := len(initialToVisit) - toVisit := []hash.HashSet{initialToVisit} - for toVisitCount > 0 { - batches := makeBatches(toVisit, toVisitCount) - toVisit = make([]hash.HashSet, len(batches)+1) - toVisitCount = 0 - for i, batch := range batches { - vals, err := lvs.ReadManyValues(ctx, batch) - if err != nil { - return err - } - for i, v := range vals { - if v == nil { - return fmt.Errorf("gc failed, dangling reference requested %v", batch[i]) - } - } - // GC skips ghost values, but other ref walkers don't. Filter them out here. - realVals := make(ValueSlice, 0, len(vals)) - nonGhostBatch := make([]hash.Hash, 0, len(vals)) - for _, v := range vals { - h, err := v.Hash(lvs.Format()) - if err != nil { - return err - } - if _, ok := v.(GhostValue); ok { - visited.Insert(h) // Can't visit a ghost. That would be spooky. - } else { - realVals = append(realVals, v) - nonGhostBatch = append(nonGhostBatch, h) - } - } - vals = realVals - - if err := keepHashes(nonGhostBatch); err != nil { - return err - } - - hashes, err := walker.GetRefSet(visited, vals) - if err != nil { - return err - } - - // continue processing - hashes, err = hashFilter(ctx, hashes) - if err != nil { - return err - } - - toVisit[i] = hashes - toVisitCount += len(hashes) - } - } - return nil - } - err := process(initialToVisit) + err = sweeper.SaveHashes(ctx, toVisit.ToSlice()) if err != nil { - return err + _, cErr := sweeper.Close(ctx) + return nil, errors.Join(err, cErr) } - - // We can accumulate hashes which which are already visited. We prune - // those here. + toVisit = nil // Before we call finalize(), we can process the current set of // NewGenToVisit. NewGen -> Finalize is going to block writes until // we are done, so its best to keep it as small as possible. next := lvs.readAndResetNewGenToVisit() - if len(next) > 0 { - nextCopy := next.Copy() - for h, _ := range nextCopy { - if visited.Has(h) { - next.Remove(h) - } - } - next, err = hashFilter(ctx, next) - if err != nil { - return err - } - err = process(next) - if err != nil { - return err - } + err = sweeper.SaveHashes(ctx, next.ToSlice()) + if err != nil { + _, cErr := sweeper.Close(ctx) + return nil, errors.Join(err, cErr) } + next = nil final := finalize() - finalCopy := final.Copy() - for h, _ := range finalCopy { - if visited.Has(h) { - final.Remove(h) - } - } - finalCopy = nil - final, err = hashFilter(ctx, final) - if err != nil { - return err - } - err = process(final) + err = sweeper.SaveHashes(ctx, final.ToSlice()) if err != nil { - return err + _, cErr := sweeper.Close(ctx) + return nil, errors.Join(err, cErr) } if safepointF != nil { - return safepointF() + err = safepointF() + if err != nil { + _, cErr := sweeper.Close(ctx) + return nil, errors.Join(err, cErr) + } } - return nil + return sweeper.Close(ctx) } // Close closes the underlying ChunkStore
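
A note on the new |filter| parameter: ValueStore.GC no longer pre-filters oldGenRefs through oldGenHasMany before the walk; instead, the HasManyFunc handed to MarkAndSweepChunks prunes every step of the traversal. Below is a hedged sketch of such a filter, built only from the standard ChunkStore.HasMany contract (which returns the absent hashes). The name absentFrom is hypothetical and, as in the sketch above, assume it sits in package chunks with the usual context and hash imports:

// absentFrom returns a HasManyFunc that keeps only the hashes missing from
// |other| (for example, the old generation), so chunks already stored there
// are neither copied again nor walked further. Illustrative sketch only.
func absentFrom(other ChunkStore) HasManyFunc {
	return func(ctx context.Context, hs hash.HashSet) (hash.HashSet, error) {
		// ChunkStore.HasMany reports which of |hs| are absent from the store.
		return other.HasMany(ctx, hs)
	}
}

Whether this is exactly the hashFilter that ValueStore passes in its generational path is not shown in this diff; the sketch only illustrates the intended shape of the parameter.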