From 3a13213766785ca1bdf0ee0921e06797dee53f78 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Mon, 13 Jan 2025 15:29:42 -0800 Subject: [PATCH] /go/libraries/doltcore/sql/dsess: parallelize sql.NewDatabase work --- .../sqle/dsess/autoincrement_tracker.go | 67 ++++++++++------ .../doltcore/sqle/dsess/globalstate.go | 77 +++++++++++++------ 2 files changed, 95 insertions(+), 49 deletions(-) mode change 100755 => 100644 go/libraries/doltcore/sqle/dsess/globalstate.go diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index 4dda09c08a3..80e462c589d 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -23,6 +23,7 @@ import ( "github.com/dolthub/go-mysql-server/sql" gmstypes "github.com/dolthub/go-mysql-server/sql/types" + "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" @@ -398,34 +399,52 @@ func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName stri } func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltdb.Rootish) error { - for _, root := range roots { - r, err := root.ResolveRootValue(ctx) - if err != nil { - return err - } + rootsChan := make(chan doltdb.Rootish, len(roots)) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case root, ok := <-rootsChan: + if !ok { + return nil + } + r, rerr := root.ResolveRootValue(egCtx) + if rerr != nil { + return rerr + } - err = r.IterTables(ctx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) { - if !schema.HasAutoIncrement(sch) { - return false, nil - } + rerr = r.IterTables(egCtx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) { + if !schema.HasAutoIncrement(sch) { + return false, nil + } - seq, err := table.GetAutoIncrementValue(ctx) - if err != nil { - return true, err - } + seq, rerr := table.GetAutoIncrementValue(egCtx) + if rerr != nil { + return true, rerr + } - tableNameStr := tableName.ToLower().Name - if oldValue, loaded := a.sequences.LoadOrStore(tableNameStr, seq); loaded && seq > oldValue.(uint64) { - a.sequences.Store(tableNameStr, seq) - } + tableNameStr := tableName.ToLower().Name + if oldValue, loaded := a.sequences.LoadOrStore(tableNameStr, seq); loaded && seq > oldValue.(uint64) { + a.sequences.Store(tableNameStr, seq) + } - return false, nil - }) + return false, nil + }) - if err != nil { - return err + if rerr != nil { + return rerr + } + } } - } - - return nil + }) + eg.Go(func() error { + defer close(rootsChan) + for _, root := range roots { + rootsChan <- root + } + return nil + }) + return eg.Wait() } diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go old mode 100755 new mode 100644 index dd675d099bc..0e23ec20f89 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/dolthub/go-mysql-server/sql" + "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -36,39 +37,65 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol return GlobalStateImpl{}, err } + rootRefsChan := make(chan ref.DoltRef, len(branches)+len(remotes)) rootRefs := make([]ref.DoltRef, 0, len(branches)+len(remotes)) rootRefs = append(rootRefs, branches...) rootRefs = append(rootRefs, remotes...) - var roots []doltdb.Rootish - for _, b := range rootRefs { - switch b.GetType() { - case ref.BranchRefType: - wsRef, err := ref.WorkingSetRefForHead(b) - if err != nil { - return GlobalStateImpl{}, err - } - ws, err := db.ResolveWorkingSet(ctx, wsRef) - if err == doltdb.ErrWorkingSetNotFound { - // use the branch head if there isn't a working set for it - cm, err := db.ResolveCommitRef(ctx, b) - if err != nil { - return GlobalStateImpl{}, err + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case r, ok := <-rootRefsChan: + if !ok { + return nil } - roots = append(roots, cm) - } else if err != nil { - return GlobalStateImpl{}, err - } else { - roots = append(roots, ws) - } - case ref.RemoteRefType: - cm, err := db.ResolveCommitRef(ctx, b) - if err != nil { - return GlobalStateImpl{}, err + switch r.GetType() { + case ref.BranchRefType: + wsRef, rerr := ref.WorkingSetRefForHead(r) + if rerr != nil { + return rerr + } + + ws, rerr := db.ResolveWorkingSet(egCtx, wsRef) + if rerr == doltdb.ErrWorkingSetNotFound { + // use the branch head if there isn't a working set for it + cm, rerr := db.ResolveCommitRef(egCtx, r) + if rerr != nil { + return rerr + } + roots = append(roots, cm) + } else if rerr != nil { + return rerr + } else { + roots = append(roots, ws) + } + case ref.RemoteRefType: + cm, rerr := db.ResolveCommitRef(egCtx, r) + if rerr != nil { + return rerr + } + roots = append(roots, cm) + } + } - roots = append(roots, cm) } + }) + + eg.Go(func() error { + defer close(rootRefsChan) + for _, r := range rootRefs { + rootRefsChan <- r + } + return nil + }) + + err = eg.Wait() + if err != nil { + return GlobalStateImpl{}, err } tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...)