Skip to content

Commit

Permalink
Merge pull request #8780 from dolthub/aaron/gc-safepoint-controller
Browse files Browse the repository at this point in the history
go/store/types: Move to a safepoint controller which will allow a caller better control over when to take actions while the GC is running.
  • Loading branch information
reltuk authored Jan 24, 2025
2 parents f19ceef + a6ff95c commit e75e954
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 63 deletions.
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
// until no possibly-stale ChunkStore state is retained in memory, or failing
// certain in-progress operations which cannot be finalized in a timely manner,
// etc.
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointController types.GCSafepointController) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
Expand Down Expand Up @@ -1772,7 +1772,7 @@ func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func()
return err
}

return collector.GC(ctx, mode, oldGen, newGen, safepointF)
return collector.GC(ctx, mode, oldGen, newGen, safepointController)
}

func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
Expand Down
131 changes: 81 additions & 50 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dprocedures

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)

Expand Down Expand Up @@ -57,6 +59,29 @@ func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {

var ErrServerPerformedGC = errors.New("this connection was established when this server performed an online garbage collection. this connection can no longer be used. please reconnect.")

type safepointController struct {
begin func(context.Context, func(hash.Hash) bool) error
preFinalize func(context.Context) error
postFinalize func(context.Context) error
cancel func()
}

func (sc safepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
return sc.begin(ctx, keeper)
}

func (sc safepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error {
return sc.preFinalize(ctx)
}

func (sc safepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error {
return sc.postFinalize(ctx)
}

func (sc safepointController) CancelSafepoint() {
sc.cancel()
}

func doDoltGC(ctx *sql.Context, args []string) (int, error) {
dbName := ctx.GetCurrentDatabase()

Expand Down Expand Up @@ -116,66 +141,72 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
mode = types.GCModeFull
}

// TODO: If we got a callback at the beginning and an
// (allowed-to-block) callback at the end, we could more
// gracefully tear things down.
err = ddb.GC(ctx, mode, func() error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
if role.(string) != "primary" {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
// TODO: Implement safepointController so that begin can capture inflight sessions
// and preFinalize can ensure they're all in a good place before returning.
sc := safepointController{
begin: func(context.Context, func(hash.Hash) bool) error { return nil },
preFinalize: func(context.Context) error { return nil },
postFinalize: func(context.Context) error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
if role.(string) != "primary" {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
}
if origepoch != epoch.(int) {
return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
}
} else {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
}
if origepoch != epoch.(int) {
return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
}
} else {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
}
}

killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
for _, p := range processes {
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}

// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
allgood := true
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
allgood = false
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}
if !allgood {
return errors.New("unable to establish safepoint.")

// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
processes := ctx.ProcessList.Processes()
allgood := true
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
allgood = false
ctx.ProcessList.Kill(p.Connection)
}
}
if !allgood {
return errors.New("unable to establish safepoint.")
}
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
})
},
cancel: func() {},
}

err = ddb.GC(ctx, mode, sc)
if err != nil {
return cmdFailure, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/sqle/enginetest/dolt_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -7671,7 +7671,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
},
},
{
Name: "drop temporary table behavior",
Name: "drop temporary table behavior",
Dialect: "mysql",
SetUpScript: []string{
"create table t (i int);",
Expand Down Expand Up @@ -7723,7 +7723,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
},
},
{
Query: "drop temporary table t;",
Query: "drop temporary table t;",
ExpectedErr: sql.ErrUnknownTable,
},
},
Expand Down
2 changes: 1 addition & 1 deletion go/store/datas/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type GarbageCollector interface {

// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error
}

// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
Expand Down
4 changes: 2 additions & 2 deletions go/store/datas/database_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,8 +1168,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
}

// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointController)
}

func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {
Expand Down
57 changes: 51 additions & 6 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,15 @@ const (
GCModeFull
)

type GCSafepointController interface {
BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error
EstablishPreFinalizeSafepoint(context.Context) error
EstablishPostFinalizeSafepoint(context.Context) error
CancelSafepoint()
}

// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error {
lvs.versOnce.Do(lvs.expectVersion)

lvs.transitionToOldGenGC()
Expand Down Expand Up @@ -600,6 +607,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}
defer collector.EndGC()

var callCancelSafepoint bool
if safepoint != nil {
err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
if err != nil {
return err
}
callCancelSafepoint = true
defer func() {
if callCancelSafepoint {
safepoint.CancelSafepoint()
}
}()
}

root, err := lvs.Root(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -634,10 +655,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
oldGenHasMany = newFileHasMany
}

newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepoint, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
callCancelSafepoint = false

err = newGenFinalizer.SwapChunksInStore(ctx)
if err != nil {
Expand Down Expand Up @@ -669,6 +691,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}
defer collector.EndGC()

var callCancelSafepoint bool
if safepoint != nil {
err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
if err != nil {
return err
}
callCancelSafepoint = true
defer func() {
if callCancelSafepoint {
safepoint.CancelSafepoint()
}
}()
}

root, err := lvs.Root(ctx)
if err != nil {
return err
Expand All @@ -682,10 +718,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var finalizer chunks.GCFinalizer
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepoint, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
callCancelSafepoint = false

err = finalizer.SwapChunksInStore(ctx)
if err != nil {
Expand Down Expand Up @@ -718,7 +755,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
hashFilter chunks.HasManyFunc,
chksMode chunks.GCMode,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
safepointController GCSafepointController,
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode)
if err != nil {
Expand All @@ -732,6 +769,14 @@ func (lvs *ValueStore) gc(ctx context.Context,
}
toVisit = nil

if safepointController != nil {
err = safepointController.EstablishPreFinalizeSafepoint(ctx)
if err != nil {
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}
}

// Before we call finalize(), we can process the current set of
// NewGenToVisit. NewGen -> Finalize is going to block writes until
// we are done, so its best to keep it as small as possible.
Expand All @@ -750,8 +795,8 @@ func (lvs *ValueStore) gc(ctx context.Context,
return nil, errors.Join(err, cErr)
}

if safepointF != nil {
err = safepointF()
if safepointController != nil {
err = safepointController.EstablishPostFinalizeSafepoint(ctx)
if err != nil {
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
Expand Down

0 comments on commit e75e954

Please sign in to comment.