From d869c70c26af2f1339e5ee2a43f98f1e0e056b03 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Mon, 13 Jan 2025 16:36:35 -0800 Subject: [PATCH 01/10] /go/libraries/doltcore/sqle/dsess: parallelize new sqle db stuff --- .../sqle/dsess/autoincrement_tracker.go | 47 +++++----- .../doltcore/sqle/dsess/globalstate.go | 88 ++++++++++++++----- 2 files changed, 90 insertions(+), 45 deletions(-) mode change 100755 => 100644 go/libraries/doltcore/sqle/dsess/globalstate.go diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index 4dda09c08a..6a5a69dfb3 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -23,6 +23,7 @@ import ( "github.com/dolthub/go-mysql-server/sql" gmstypes "github.com/dolthub/go-mysql-server/sql/types" + "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" @@ -398,34 +399,38 @@ func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName stri } func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltdb.Rootish) error { - for _, root := range roots { - r, err := root.ResolveRootValue(ctx) - if err != nil { - return err - } + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(128) - err = r.IterTables(ctx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) { - if !schema.HasAutoIncrement(sch) { - return false, nil + for _, root := range roots { + eg.Go(func() error { + if egCtx.Err() != nil { + return egCtx.Err() } - seq, err := table.GetAutoIncrementValue(ctx) - if err != nil { - return true, err + r, rerr := root.ResolveRootValue(egCtx) + if rerr != nil { + return rerr } + return r.IterTables(ctx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) { + if !schema.HasAutoIncrement(sch) { + return false, nil + } - tableNameStr := tableName.ToLower().Name - if oldValue, loaded := a.sequences.LoadOrStore(tableNameStr, seq); loaded && seq > oldValue.(uint64) { - a.sequences.Store(tableNameStr, seq) - } + seq, iErr := table.GetAutoIncrementValue(egCtx) + if iErr != nil { + return true, iErr + } - return false, nil - }) + tableNameStr := tableName.ToLower().Name + if oldValue, loaded := a.sequences.LoadOrStore(tableNameStr, seq); loaded && seq > oldValue.(uint64) { + a.sequences.Store(tableNameStr, seq) + } - if err != nil { - return err - } + return false, nil + }) + }) } - return nil + return eg.Wait() } diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go old mode 100755 new mode 100644 index dd675d099b..fb4190fec9 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/dolthub/go-mysql-server/sql" + "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -36,39 +37,78 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol return GlobalStateImpl{}, err } + eg2, egCtx := errgroup.WithContext(ctx) + eg2.SetLimit(128) + rootRefs := make([]ref.DoltRef, 0, len(branches)+len(remotes)) rootRefs = append(rootRefs, branches...) rootRefs = append(rootRefs, remotes...) - var roots []doltdb.Rootish + rootChan := make(chan doltdb.Rootish, len(rootRefs)) + + wg := sync.WaitGroup{} + + eg, _ := errgroup.WithContext(egCtx) + eg.Go(func() error { + wg.Wait() + close(rootChan) + return nil + }) + for _, b := range rootRefs { - switch b.GetType() { - case ref.BranchRefType: - wsRef, err := ref.WorkingSetRefForHead(b) - if err != nil { - return GlobalStateImpl{}, err + wg.Add(1) + + eg2.Go(func() error { + defer wg.Done() + + if egCtx.Err() != nil { + return egCtx.Err() } - ws, err := db.ResolveWorkingSet(ctx, wsRef) - if err == doltdb.ErrWorkingSetNotFound { - // use the branch head if there isn't a working set for it - cm, err := db.ResolveCommitRef(ctx, b) - if err != nil { - return GlobalStateImpl{}, err + switch b.GetType() { + case ref.BranchRefType: + wsRef, rerr := ref.WorkingSetRefForHead(b) + if rerr != nil { + return rerr } - roots = append(roots, cm) - } else if err != nil { - return GlobalStateImpl{}, err - } else { - roots = append(roots, ws) - } - case ref.RemoteRefType: - cm, err := db.ResolveCommitRef(ctx, b) - if err != nil { - return GlobalStateImpl{}, err + + ws, rerr := db.ResolveWorkingSet(egCtx, wsRef) + if rerr == doltdb.ErrWorkingSetNotFound { + // use the branch head if there isn't a working set for it + cm, rerr := db.ResolveCommitRef(egCtx, b) + if rerr != nil { + return rerr + } + rootChan <- cm + } else if err != nil { + return rerr + } else { + rootChan <- ws + } + case ref.RemoteRefType: + cm, rerr := db.ResolveCommitRef(egCtx, b) + if rerr != nil { + return rerr + } + rootChan <- cm } - roots = append(roots, cm) - } + return nil + }) + } + + err = eg2.Wait() + if err != nil { + return GlobalStateImpl{}, err + } + + err = eg.Wait() + if err != nil { + return GlobalStateImpl{}, err + } + + var roots []doltdb.Rootish + for root := range rootChan { + roots = append(roots, root) } tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...) From eb23dcd260aaf10ff2a947af5bf8067577d55dc5 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Mon, 13 Jan 2025 17:33:18 -0800 Subject: [PATCH 02/10] /go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go: wip, nil prob --- go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index 6a5a69dfb3..10415505e1 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -412,7 +412,8 @@ func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltd if rerr != nil { return rerr } - return r.IterTables(ctx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) { + + return r.IterTables(egCtx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) { if !schema.HasAutoIncrement(sch) { return false, nil } From fa6c12f249ff35fd3e9a10e593579842abb59cc7 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Mon, 13 Jan 2025 17:37:26 -0800 Subject: [PATCH 03/10] /go/libraries/doltcore/sqle/dsess/globalstate.go: try just changing ai --- .../doltcore/sqle/dsess/globalstate.go | 88 +++++-------------- 1 file changed, 24 insertions(+), 64 deletions(-) mode change 100644 => 100755 go/libraries/doltcore/sqle/dsess/globalstate.go diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go old mode 100644 new mode 100755 index fb4190fec9..dd675d099b --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -19,7 +19,6 @@ import ( "sync" "github.com/dolthub/go-mysql-server/sql" - "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -37,78 +36,39 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol return GlobalStateImpl{}, err } - eg2, egCtx := errgroup.WithContext(ctx) - eg2.SetLimit(128) - rootRefs := make([]ref.DoltRef, 0, len(branches)+len(remotes)) rootRefs = append(rootRefs, branches...) rootRefs = append(rootRefs, remotes...) - rootChan := make(chan doltdb.Rootish, len(rootRefs)) - - wg := sync.WaitGroup{} - - eg, _ := errgroup.WithContext(egCtx) - eg.Go(func() error { - wg.Wait() - close(rootChan) - return nil - }) - + var roots []doltdb.Rootish for _, b := range rootRefs { - wg.Add(1) - - eg2.Go(func() error { - defer wg.Done() - - if egCtx.Err() != nil { - return egCtx.Err() + switch b.GetType() { + case ref.BranchRefType: + wsRef, err := ref.WorkingSetRefForHead(b) + if err != nil { + return GlobalStateImpl{}, err } - switch b.GetType() { - case ref.BranchRefType: - wsRef, rerr := ref.WorkingSetRefForHead(b) - if rerr != nil { - return rerr - } - - ws, rerr := db.ResolveWorkingSet(egCtx, wsRef) - if rerr == doltdb.ErrWorkingSetNotFound { - // use the branch head if there isn't a working set for it - cm, rerr := db.ResolveCommitRef(egCtx, b) - if rerr != nil { - return rerr - } - rootChan <- cm - } else if err != nil { - return rerr - } else { - rootChan <- ws + ws, err := db.ResolveWorkingSet(ctx, wsRef) + if err == doltdb.ErrWorkingSetNotFound { + // use the branch head if there isn't a working set for it + cm, err := db.ResolveCommitRef(ctx, b) + if err != nil { + return GlobalStateImpl{}, err } - case ref.RemoteRefType: - cm, rerr := db.ResolveCommitRef(egCtx, b) - if rerr != nil { - return rerr - } - rootChan <- cm + roots = append(roots, cm) + } else if err != nil { + return GlobalStateImpl{}, err + } else { + roots = append(roots, ws) } - return nil - }) - } - - err = eg2.Wait() - if err != nil { - return GlobalStateImpl{}, err - } - - err = eg.Wait() - if err != nil { - return GlobalStateImpl{}, err - } - - var roots []doltdb.Rootish - for root := range rootChan { - roots = append(roots, root) + case ref.RemoteRefType: + cm, err := db.ResolveCommitRef(ctx, b) + if err != nil { + return GlobalStateImpl{}, err + } + roots = append(roots, cm) + } } tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...) From 53ba7ca521239c3446d23fdc97f9af1da74521cd Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 09:33:34 -0800 Subject: [PATCH 04/10] temp: add some timing logs --- .../doltcore/sqle/dsess/autoincrement_tracker.go | 15 ++++++++++++++- go/libraries/doltcore/sqle/dsess/globalstate.go | 12 ++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) mode change 100755 => 100644 go/libraries/doltcore/sqle/dsess/globalstate.go diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index 10415505e1..841bcae165 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -16,13 +16,16 @@ package dsess import ( "context" + "fmt" "io" "math" "strings" "sync" + "time" "github.com/dolthub/go-mysql-server/sql" gmstypes "github.com/dolthub/go-mysql-server/sql/types" + "github.com/fatih/color" "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -399,17 +402,20 @@ func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName stri } func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltdb.Rootish) error { + start := time.Now() eg, egCtx := errgroup.WithContext(ctx) eg.SetLimit(128) for _, root := range roots { eg.Go(func() error { if egCtx.Err() != nil { + fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: ctx error: elapsed: %v\n", time.Since(start)) return egCtx.Err() } r, rerr := root.ResolveRootValue(egCtx) if rerr != nil { + fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: resolve root error: elapsed: %v\n", time.Since(start)) return rerr } @@ -420,6 +426,7 @@ func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltd seq, iErr := table.GetAutoIncrementValue(egCtx) if iErr != nil { + fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: IterTables: get autoincrement value error: elapsed: %v\n", time.Since(start)) return true, iErr } @@ -433,5 +440,11 @@ func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltd }) } - return eg.Wait() + err := eg.Wait() + if err != nil { + return err + } + + fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: success: elapsed: %v\n", time.Since(start)) + return nil } diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go old mode 100755 new mode 100644 index dd675d099b..43d2f0b04f --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -16,9 +16,12 @@ package dsess import ( "context" + "fmt" "sync" + "time" "github.com/dolthub/go-mysql-server/sql" + "github.com/fatih/color" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -26,13 +29,16 @@ import ( ) func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.DoltDB) (GlobalStateImpl, error) { + start := time.Now() branches, err := db.GetBranches(ctx) if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: get branches error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } remotes, err := db.GetRemoteRefs(ctx) if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: get remote refs error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } @@ -46,6 +52,7 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol case ref.BranchRefType: wsRef, err := ref.WorkingSetRefForHead(b) if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: working set error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } @@ -54,10 +61,12 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol // use the branch head if there isn't a working set for it cm, err := db.ResolveCommitRef(ctx, b) if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } roots = append(roots, cm) } else if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve working set error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } else { roots = append(roots, ws) @@ -65,12 +74,15 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol case ref.RemoteRefType: cm, err := db.ResolveCommitRef(ctx, b) if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit remote ref error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } roots = append(roots, cm) } } + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: success: elapsed: %v\n", time.Since(start)) + tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...) if err != nil { return GlobalStateImpl{}, err From 4035348eabb4001079cfad9aa59de429df999f88 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 10:10:57 -0800 Subject: [PATCH 05/10] /go/libraries/doltcore/sqle/dsess/globalstate.go: parallelize global state also --- .../doltcore/sqle/dsess/globalstate.go | 89 +++++++++++++------ 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index 43d2f0b04f..868809edd2 100644 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -22,6 +22,7 @@ import ( "github.com/dolthub/go-mysql-server/sql" "github.com/fatih/color" + "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -46,43 +47,75 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol rootRefs = append(rootRefs, branches...) rootRefs = append(rootRefs, remotes...) - var roots []doltdb.Rootish + rootRefsChan := make(chan doltdb.Rootish, len(rootRefs)) + eg, egCtx := errgroup.WithContext(ctx) + wg := sync.WaitGroup{} + + eg.Go(func() error { + defer close(rootRefsChan) + wg.Wait() + return nil + }) + for _, b := range rootRefs { - switch b.GetType() { - case ref.BranchRefType: - wsRef, err := ref.WorkingSetRefForHead(b) - if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: working set error: elapsed: %v\n", time.Since(start)) - return GlobalStateImpl{}, err - } + wg.Add(1) + eg.Go(func() error { + defer wg.Done() + switch b.GetType() { + case ref.BranchRefType: + wsRef, err := ref.WorkingSetRefForHead(b) + if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: working set error: elapsed: %v\n", time.Since(start)) + return err + } - ws, err := db.ResolveWorkingSet(ctx, wsRef) - if err == doltdb.ErrWorkingSetNotFound { - // use the branch head if there isn't a working set for it - cm, err := db.ResolveCommitRef(ctx, b) + ws, err := db.ResolveWorkingSet(egCtx, wsRef) + if err == doltdb.ErrWorkingSetNotFound { + // use the branch head if there isn't a working set for it + cm, err := db.ResolveCommitRef(egCtx, b) + if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit error: elapsed: %v\n", time.Since(start)) + return err + } + rootRefsChan <- cm + } else if err != nil { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve working set error: elapsed: %v\n", time.Since(start)) + return err + } else { + rootRefsChan <- ws + } + case ref.RemoteRefType: + cm, err := db.ResolveCommitRef(egCtx, b) if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit error: elapsed: %v\n", time.Since(start)) - return GlobalStateImpl{}, err + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit remote ref error: elapsed: %v\n", time.Since(start)) + return err } - roots = append(roots, cm) - } else if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve working set error: elapsed: %v\n", time.Since(start)) - return GlobalStateImpl{}, err - } else { - roots = append(roots, ws) - } - case ref.RemoteRefType: - cm, err := db.ResolveCommitRef(ctx, b) - if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit remote ref error: elapsed: %v\n", time.Since(start)) - return GlobalStateImpl{}, err + rootRefsChan <- cm } - roots = append(roots, cm) - } + return nil + }) + } + + err = eg.Wait() + if err != nil { + return GlobalStateImpl{}, err + } + + if len(rootRefsChan) != len(rootRefs) { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: rootRefsChan does not equal rootRefs\n") } fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: success: elapsed: %v\n", time.Since(start)) + var roots []doltdb.Rootish + for rootRef := range rootRefsChan { + roots = append(roots, rootRef) + } + + if len(roots) != len(rootRefs) { + fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: roots does not equal rootRefs\n") + } + tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...) if err != nil { return GlobalStateImpl{}, err From bff342afe5a6d1629590d6b13059ea32da1cf789 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 10:20:26 -0800 Subject: [PATCH 06/10] /go/libraries/doltcore/sqle/dsess: remove logs --- .../sqle/dsess/autoincrement_tracker.go | 15 +------------- .../doltcore/sqle/dsess/globalstate.go | 20 ------------------- 2 files changed, 1 insertion(+), 34 deletions(-) diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index 841bcae165..10415505e1 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -16,16 +16,13 @@ package dsess import ( "context" - "fmt" "io" "math" "strings" "sync" - "time" "github.com/dolthub/go-mysql-server/sql" gmstypes "github.com/dolthub/go-mysql-server/sql/types" - "github.com/fatih/color" "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -402,20 +399,17 @@ func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName stri } func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltdb.Rootish) error { - start := time.Now() eg, egCtx := errgroup.WithContext(ctx) eg.SetLimit(128) for _, root := range roots { eg.Go(func() error { if egCtx.Err() != nil { - fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: ctx error: elapsed: %v\n", time.Since(start)) return egCtx.Err() } r, rerr := root.ResolveRootValue(egCtx) if rerr != nil { - fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: resolve root error: elapsed: %v\n", time.Since(start)) return rerr } @@ -426,7 +420,6 @@ func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltd seq, iErr := table.GetAutoIncrementValue(egCtx) if iErr != nil { - fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: IterTables: get autoincrement value error: elapsed: %v\n", time.Since(start)) return true, iErr } @@ -440,11 +433,5 @@ func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltd }) } - err := eg.Wait() - if err != nil { - return err - } - - fmt.Fprintf(color.Output, "DUSTIN: InitWithRoots: success: elapsed: %v\n", time.Since(start)) - return nil + return eg.Wait() } diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index 868809edd2..525de71572 100644 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -16,12 +16,9 @@ package dsess import ( "context" - "fmt" "sync" - "time" "github.com/dolthub/go-mysql-server/sql" - "github.com/fatih/color" "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -30,16 +27,13 @@ import ( ) func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.DoltDB) (GlobalStateImpl, error) { - start := time.Now() branches, err := db.GetBranches(ctx) if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: get branches error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } remotes, err := db.GetRemoteRefs(ctx) if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: get remote refs error: elapsed: %v\n", time.Since(start)) return GlobalStateImpl{}, err } @@ -65,7 +59,6 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol case ref.BranchRefType: wsRef, err := ref.WorkingSetRefForHead(b) if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: working set error: elapsed: %v\n", time.Since(start)) return err } @@ -74,12 +67,10 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol // use the branch head if there isn't a working set for it cm, err := db.ResolveCommitRef(egCtx, b) if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit error: elapsed: %v\n", time.Since(start)) return err } rootRefsChan <- cm } else if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve working set error: elapsed: %v\n", time.Since(start)) return err } else { rootRefsChan <- ws @@ -87,7 +78,6 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol case ref.RemoteRefType: cm, err := db.ResolveCommitRef(egCtx, b) if err != nil { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit remote ref error: elapsed: %v\n", time.Since(start)) return err } rootRefsChan <- cm @@ -101,21 +91,11 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol return GlobalStateImpl{}, err } - if len(rootRefsChan) != len(rootRefs) { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: rootRefsChan does not equal rootRefs\n") - } - - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: success: elapsed: %v\n", time.Since(start)) - var roots []doltdb.Rootish for rootRef := range rootRefsChan { roots = append(roots, rootRef) } - if len(roots) != len(rootRefs) { - fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: roots does not equal rootRefs\n") - } - tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...) if err != nil { return GlobalStateImpl{}, err From ddecc470e1c0a62fe828d1d82926c0c3507d36c7 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 10:29:19 -0800 Subject: [PATCH 07/10] /go/libraries/doltcore/sqle/dsess/globalstate.go: check for ctx error --- go/libraries/doltcore/sqle/dsess/globalstate.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index 525de71572..bc6345852d 100644 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -55,6 +55,10 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol wg.Add(1) eg.Go(func() error { defer wg.Done() + if egCtx.Err() != nil { + return egCtx.Err() + } + switch b.GetType() { case ref.BranchRefType: wsRef, err := ref.WorkingSetRefForHead(b) From 8c1eda8ccd961d43669eb83b5d56e969058d6da7 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 10:39:43 -0800 Subject: [PATCH 08/10] /go/libraries/doltcore/sqle/dsess/globalstate.go: trying to fix closed channel send error --- go/libraries/doltcore/sqle/dsess/globalstate.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index bc6345852d..034f662468 100644 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -45,11 +45,11 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol eg, egCtx := errgroup.WithContext(ctx) wg := sync.WaitGroup{} - eg.Go(func() error { - defer close(rootRefsChan) - wg.Wait() - return nil - }) + //eg.Go(func() error { + // defer close(rootRefsChan) + // wg.Wait() + // return nil + //}) for _, b := range rootRefs { wg.Add(1) @@ -90,6 +90,8 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol }) } + wg.Wait() + close(rootRefsChan) err = eg.Wait() if err != nil { return GlobalStateImpl{}, err From a819aa2c0d4a932ee47584723864232ecd18d809 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 10:52:49 -0800 Subject: [PATCH 09/10] /go/libraries/doltcore/sqle/dsess/globalstate.go: fix again --- go/libraries/doltcore/sqle/dsess/globalstate.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index 034f662468..96871146b2 100644 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -45,12 +45,6 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol eg, egCtx := errgroup.WithContext(ctx) wg := sync.WaitGroup{} - //eg.Go(func() error { - // defer close(rootRefsChan) - // wg.Wait() - // return nil - //}) - for _, b := range rootRefs { wg.Add(1) eg.Go(func() error { @@ -90,8 +84,10 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol }) } + // prevent sending on closed channel wg.Wait() close(rootRefsChan) + err = eg.Wait() if err != nil { return GlobalStateImpl{}, err From 93355c512b92bd2e0cb2810e2c04f118856cb313 Mon Sep 17 00:00:00 2001 From: coffeegoddd Date: Tue, 14 Jan 2025 11:32:32 -0800 Subject: [PATCH 10/10] /go/libraries/doltcore/sqle/dsess/globalstate.go: pr feedback --- .../doltcore/sqle/dsess/globalstate.go | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index 96871146b2..8315997460 100644 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -41,14 +41,13 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol rootRefs = append(rootRefs, branches...) rootRefs = append(rootRefs, remotes...) - rootRefsChan := make(chan doltdb.Rootish, len(rootRefs)) + roots := make([]doltdb.Rootish, len(rootRefs)) eg, egCtx := errgroup.WithContext(ctx) - wg := sync.WaitGroup{} + eg.SetLimit(128) - for _, b := range rootRefs { - wg.Add(1) + for idx, b := range rootRefs { + idx, b := idx, b eg.Go(func() error { - defer wg.Done() if egCtx.Err() != nil { return egCtx.Err() } @@ -67,37 +66,28 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol if err != nil { return err } - rootRefsChan <- cm + roots[idx] = cm } else if err != nil { return err } else { - rootRefsChan <- ws + roots[idx] = ws } case ref.RemoteRefType: cm, err := db.ResolveCommitRef(egCtx, b) if err != nil { return err } - rootRefsChan <- cm + roots[idx] = cm } return nil }) } - // prevent sending on closed channel - wg.Wait() - close(rootRefsChan) - err = eg.Wait() if err != nil { return GlobalStateImpl{}, err } - var roots []doltdb.Rootish - for rootRef := range rootRefsChan { - roots = append(roots, rootRef) - } - tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...) if err != nil { return GlobalStateImpl{}, err