Skip to content

Commit

Permalink
Merge pull request #8612 from dolthub/aaron/gc-gen-takes-journal-meta…
Browse files Browse the repository at this point in the history
…data

go/store/nbs: Fixing GCGen to be more correct.
  • Loading branch information
reltuk authored Nov 28, 2024
2 parents 0a5a3bf + 60ca378 commit f7a9ab4
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 42 deletions.
9 changes: 8 additions & 1 deletion go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ type GCFinalizer interface {
SwapChunksInStore(ctx context.Context) error
}

type GCMode int

const (
GCMode_Default GCMode = iota
GCMode_Full
)

// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
type ChunkStoreGarbageCollector interface {
ChunkStore
Expand Down Expand Up @@ -213,7 +220,7 @@ type ChunkStoreGarbageCollector interface {
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error)
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
2 changes: 1 addition & 1 deletion go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
if dest != ms {
panic("unsupported")
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector)
return collector.MarkAndSweepChunks(ctx, hashes, collector, mode)
}

func (s *TestStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/archive_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
if err != nil {
return err
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/conjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
newContents := manifestContents{
nbfVers: upstream.nbfVers,
root: upstream.root,
lock: generateLockHash(upstream.root, specs, appendixSpecs),
lock: generateLockHash(upstream.root, specs, appendixSpecs, nil),
gcGen: upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest)
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
}

func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing *journal
}

prev := mc.lock
next := generateLockHash(mc.root, mc.specs, mc.appendix)
next := generateLockHash(mc.root, mc.specs, mc.appendix, nil)
mc.lock = next

mc, err = backing.Update(ctx, prev, mc, &Stats{}, nil)
Expand Down
7 changes: 6 additions & 1 deletion go/store/nbs/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func formatSpecs(specs []tableSpec, tableInfo []string) {
// persisted manifest against the lock hash it saw last time it loaded the
// contents of a manifest. If they do not match, the client must not update
// the persisted manifest.
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) hash.Hash {
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec, extra []byte) hash.Hash {
blockHash := sha512.New()
blockHash.Write(root[:])
for _, spec := range appendix {
Expand All @@ -511,6 +511,11 @@ func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) h
for _, spec := range specs {
blockHash.Write(spec.name[:])
}
if len(extra) > 0 {
blockHash.Write([]byte{0})
blockHash.Write(extra)
}
blockHash.Write([]byte{0})
var h []byte
h = blockHash.Sum(h) // Appends hash to h
return hash.New(h[:hash.ByteLen])
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
}

func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
Expand Down
47 changes: 32 additions & 15 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
Expand Down Expand Up @@ -266,7 +267,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
return contents, nil
}

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

// ensure we don't drop existing appendices
if contents.appendix != nil && len(contents.appendix) > 0 {
Expand Down Expand Up @@ -423,7 +424,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
newAppendixSpecs := append([]tableSpec{}, upstreamAppendixSpecs...)
contents.appendix = append(newAppendixSpecs, appendixSpecs...)

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)
return contents, nil
case ManifestAppendixOption_Set:
if len(appendixSpecs) < 1 {
Expand All @@ -438,7 +439,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
// append new appendix specs to contents.appendix
contents.appendix = append([]tableSpec{}, appendixSpecs...)

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)
return contents, nil
default:
return manifestContents{}, ErrUnsupportedManifestAppendixOption
Expand Down Expand Up @@ -480,7 +481,7 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has
s := tableSpec{name: h, chunkCount: c}
contents.specs = append(contents.specs, s)
}
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

store.mm.LockForUpdate()
defer func() {
Expand Down Expand Up @@ -1307,7 +1308,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
newContents := manifestContents{
nbfVers: nbs.upstream.nbfVers,
root: current,
lock: generateLockHash(current, specs, appendixSpecs),
lock: generateLockHash(current, specs, appendixSpecs, nil),
gcGen: nbs.upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
Expand Down Expand Up @@ -1597,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
nbs.cond.Broadcast()
}

func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest)
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
}

func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return nil, chunks.ErrUnsupportedOperation
Expand All @@ -1611,12 +1612,20 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
nbs.mu.RLock()
defer nbs.mu.RUnlock()

// check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix)
// Check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix, []byte("full"))
if nbs.upstream.gcGen == gcGenCheck {
fmt.Fprintf(color.Error, "check against full gcGen passed; nothing to collect")
return chunks.ErrNothingToCollect
}

if mode != chunks.GCMode_Full {
// Allow a non-full GC to match the no-op work check as well.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix, nil)
if nbs.upstream.gcGen == gcGenCheck {
fmt.Fprintf(color.Error, "check against nil gcGen passed; nothing to collect")
return chunks.ErrNothingToCollect
}
}
return nil
}
err := precheck()
Expand Down Expand Up @@ -1649,12 +1658,14 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
return gcFinalizer{
nbs: destNBS,
specs: specs,
mode: mode,
}, nil
}

type gcFinalizer struct {
nbs *NomsBlockStore
specs []tableSpec
mode chunks.GCMode
}

func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc, error) {
Expand All @@ -1670,7 +1681,7 @@ func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc
}

func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
return gcf.nbs.swapTables(ctx, gcf.specs)
return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
}

func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
Expand Down Expand Up @@ -1744,7 +1755,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c
return nil
}

func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) {
func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mode chunks.GCMode) (err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()

Expand All @@ -1756,12 +1767,18 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
}
}()

newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{})
newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{}, nil)
var extra []byte
if mode == chunks.GCMode_Full {
extra = []byte("full")
}
newGCGen := generateLockHash(nbs.upstream.root, specs, []tableSpec{}, extra)

newContents := manifestContents{
nbfVers: nbs.upstream.nbfVers,
root: nbs.upstream.root,
lock: newLock,
gcGen: newLock,
gcGen: newGCGen,
specs: specs,
}

Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func TestNBSCopyGC(t *testing.T) {
go func() {
require.NoError(t, st.BeginGC(nil))
var finalizer chunks.GCFinalizer
finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil)
finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil, chunks.GCMode_Full)
if msErr == nil {
msErr = finalizer.SwapChunksInStore(ctx)
}
Expand Down
13 changes: 9 additions & 4 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
gcs, gcsOK := lvs.cs.(chunks.GenerationalCS)
collector, collectorOK := lvs.cs.(chunks.ChunkStoreGarbageCollector)

var chksMode chunks.GCMode

if gcsOK && collectorOK {
oldGen := gcs.OldGen()
newGen := gcs.NewGen()
Expand All @@ -586,8 +588,10 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
switch mode {
case GCModeDefault:
oldGenHasMany = oldGen.HasMany
chksMode = chunks.GCMode_Default
case GCModeFull:
oldGenHasMany = unfilteredHashFunc
chksMode = chunks.GCMode_Full
default:
return fmt.Errorf("unsupported GCMode %v", mode)
}
Expand Down Expand Up @@ -617,7 +621,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer
oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet {
oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, chksMode, collector, oldGen, nil, func() hash.HashSet {
n := lvs.transitionToNewGenGC()
newGenRefs.InsertAll(n)
return make(hash.HashSet)
Expand All @@ -638,7 +642,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
oldGenHasMany = newFileHasMany
}

newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
Expand Down Expand Up @@ -685,7 +689,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var finalizer chunks.GCFinalizer
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC)
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
Expand Down Expand Up @@ -719,6 +723,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
func (lvs *ValueStore) gc(ctx context.Context,
toVisit hash.HashSet,
hashFilter chunks.HasManyFunc,
chksMode chunks.GCMode,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
Expand All @@ -729,7 +734,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
var err error
gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest)
gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest, chksMode)
return err
})

Expand Down
Loading

0 comments on commit f7a9ab4

Please sign in to comment.