diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 9c17fd111d2..5e72e052665 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -179,14 +179,21 @@ func NewSqlEngine( "authentication_dolt_jwt": NewAuthenticateDoltJWTPlugin(config.JwksConfig), }) + statsPro := stats.NewProvider() + engine.Analyzer.Catalog.StatsProvider = statsPro + engine.Analyzer.ExecBuilder = rowexec.DefaultBuilder - sessFactory := doltSessionFactory(pro, mrEnv.Config(), bcController, config.Autocommit) + sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, config.Autocommit) sqlEngine.provider = pro sqlEngine.contextFactory = sqlContextFactory() sqlEngine.dsessFactory = sessFactory + sqlEngine.engine = engine - engine.Analyzer.Catalog.StatsProvider = stats.NewProvider() - engine.Analyzer.Catalog.StatsProvider.(*stats.Provider).Load(sql.NewContext(ctx), dbs) + // configuring stats depends on sessionBuilder + // sessionBuilder needs ref to statsProv + if err = statsPro.Configure(ctx, sqlEngine.NewDefaultContext, bThreads, pro, dbs); err != nil { + return nil, err + } // Load MySQL Db information if err = engine.Analyzer.Catalog.MySQLDb.LoadData(sql.NewEmptyContext(), data); err != nil { @@ -229,8 +236,6 @@ func NewSqlEngine( } } - sqlEngine.engine = engine - return sqlEngine, nil } @@ -386,9 +391,9 @@ func sqlContextFactory() contextFactory { } // doltSessionFactory returns a sessionFactory that creates a new DoltSession -func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory { +func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory { return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) { - doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc) + doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro) if err != nil { return nil, err } diff --git a/go/go.mod b/go/go.mod index 2fe1e57b448..a8388247fbf 100644 --- a/go/go.mod +++ b/go/go.mod @@ -57,7 +57,7 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/creasty/defaults v1.6.0 github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2 - github.com/dolthub/go-mysql-server v0.17.1-0.20240207124505-c0f397a6aaca + github.com/dolthub/go-mysql-server v0.17.1-0.20240207160654-5ed05eb1cc4b github.com/dolthub/swiss v0.1.0 github.com/goccy/go-json v0.10.2 github.com/google/go-github/v57 v57.0.0 diff --git a/go/go.sum b/go/go.sum index aec27389a53..a85585fabec 100644 --- a/go/go.sum +++ b/go/go.sum @@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U= github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y= github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168= -github.com/dolthub/go-mysql-server v0.17.1-0.20240207124505-c0f397a6aaca h1:tI3X4fIUTOT0N8n+GYkPNa384WlJoOBcztK5c5mBzjU= -github.com/dolthub/go-mysql-server v0.17.1-0.20240207124505-c0f397a6aaca/go.mod h1:ANK0a6tyjrZ2cOzDJT3nFsDp80xksI4UfeijFlvnjwE= +github.com/dolthub/go-mysql-server v0.17.1-0.20240207160654-5ed05eb1cc4b 
h1:FDNaFIT63vRLfn6cTOr0rsgceMlFeuulaUru54HqIIg=
+github.com/dolthub/go-mysql-server v0.17.1-0.20240207160654-5ed05eb1cc4b/go.mod h1:ANK0a6tyjrZ2cOzDJT3nFsDp80xksI4UfeijFlvnjwE=
 github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
 github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
 github.com/dolthub/jsonpath v0.0.2-0.20240201003050-392940944c15 h1:sfTETOpsrNJPDn2KydiCtDgVu6Xopq8k3JP8PjFT22s=
diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index 7246d270b56..869557a7730 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -1744,7 +1744,20 @@ func (ddb *DoltDB) SetStatisics(ctx context.Context, addr hash.Hash) error {
 	return err
 }
 
-var ErrNoStatistics = errors.New("No statistics found.")
+func (ddb *DoltDB) DropStatisics(ctx context.Context) error {
+	statsDs, err := ddb.db.GetDataset(ctx, ref.NewStatsRef().String())
+	if err != nil {
+		return err
+	}
+
+	_, err = ddb.db.Delete(ctx, statsDs, "")
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+var ErrNoStatistics = errors.New("no statistics found")
 
 // GetStatistics returns the value of the singleton ref.StatsRef for this database
 func (ddb *DoltDB) GetStatistics(ctx context.Context) (prolly.Map, error) {
diff --git a/go/libraries/doltcore/schema/statistic.go b/go/libraries/doltcore/schema/statistic.go
index b0754acdff3..21cf5e27d26 100644
--- a/go/libraries/doltcore/schema/statistic.go
+++ b/go/libraries/doltcore/schema/statistic.go
@@ -25,9 +25,9 @@ const StatsVersion int64 = 1
 
 const (
 	StatsQualifierColName  = "qualifier"
-	StatsDbColName         = "database"
-	StatsTableColName      = "table"
-	StatsIndexColName      = "index"
+	StatsDbColName         = "database_name"
+	StatsTableColName      = "table_name"
+	StatsIndexColName      = "index_name"
 	StatsPositionColName   = "position"
 	StatsCommitHashColName = "commit_hash"
 	StatsRowCountColName   = "row_count"
diff --git a/go/libraries/doltcore/sqle/database_provider.go b/go/libraries/doltcore/sqle/database_provider.go
index 11be7434f0b..818cd050588 100644
--- a/go/libraries/doltcore/sqle/database_provider.go
+++ b/go/libraries/doltcore/sqle/database_provider.go
@@ -625,6 +625,10 @@ func (p *DoltDatabaseProvider) ListDroppedDatabases(ctx *sql.Context) ([]string,
 	return p.droppedDatabaseManager.ListDroppedDatabases(ctx)
 }
 
+func (p *DoltDatabaseProvider) DbFactoryUrl() string {
+	return p.dbFactoryUrl
+}
+
 func (p *DoltDatabaseProvider) UndropDatabase(ctx *sql.Context, name string) (err error) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
diff --git a/go/libraries/doltcore/sqle/dprocedures/init.go b/go/libraries/doltcore/sqle/dprocedures/init.go
index bc1cc09b62e..b072b4a8537 100644
--- a/go/libraries/doltcore/sqle/dprocedures/init.go
+++ b/go/libraries/doltcore/sqle/dprocedures/init.go
@@ -47,6 +47,11 @@ var DoltProcedures = []sql.ExternalStoredProcedureDetails{
 	{Name: "dolt_revert", Schema: int64Schema("status"), Function: doltRevert},
 	{Name: "dolt_tag", Schema: int64Schema("status"), Function: doltTag},
 	{Name: "dolt_verify_constraints", Schema: int64Schema("violations"), Function: doltVerifyConstraints},
+
+	{Name: "dolt_stats_drop", Schema: stringSchema("message"), Function: statsFunc(statsDrop)},
+	{Name: "dolt_stats_restart", Schema: stringSchema("message"), Function: statsFunc(statsRestart)},
+	{Name: "dolt_stats_stop", Schema: stringSchema("message"), Function: statsFunc(statsStop)},
+	{Name: "dolt_stats_status", Schema: stringSchema("message"), Function: statsFunc(statsStatus)},
 }
 
 // 
stringSchema returns a non-nullable schema with all columns as LONGTEXT. diff --git a/go/libraries/doltcore/sqle/dprocedures/stats_funcs.go b/go/libraries/doltcore/sqle/dprocedures/stats_funcs.go new file mode 100644 index 00000000000..80771124e1d --- /dev/null +++ b/go/libraries/doltcore/sqle/dprocedures/stats_funcs.go @@ -0,0 +1,112 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dprocedures + +import ( + "fmt" + "strings" + + "github.com/dolthub/go-mysql-server/sql" + + "github.com/dolthub/dolt/go/libraries/doltcore/env" + "github.com/dolthub/dolt/go/libraries/doltcore/ref" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" +) + +func statsFunc(fn func(ctx *sql.Context) (interface{}, error)) func(ctx *sql.Context, args ...string) (sql.RowIter, error) { + return func(ctx *sql.Context, args ...string) (sql.RowIter, error) { + res, err := fn(ctx) + if err != nil { + return nil, err + } + return rowToIter(res), nil + } +} + +// AutoRefreshStatsProvider is a sql.StatsProvider that exposes hooks for +// observing and manipulating background database auto refresh threads. +type AutoRefreshStatsProvider interface { + sql.StatsProvider + CancelRefreshThread(string) + StartRefreshThread(*sql.Context, dsess.DoltDatabaseProvider, string, *env.DoltEnv) error + ThreadStatus(string) string +} + +// statsRestart tries to stop and then start a refresh thread +func statsRestart(ctx *sql.Context) (interface{}, error) { + dSess := dsess.DSessFromSess(ctx.Session) + statsPro := dSess.StatsProvider() + dbName := strings.ToLower(ctx.GetCurrentDatabase()) + + if afp, ok := statsPro.(AutoRefreshStatsProvider); ok { + pro := dSess.Provider() + newFs, err := pro.FileSystem().WithWorkingDir(dbName) + if err != nil { + return nil, fmt.Errorf("failed to restart stats collection: %w", err) + } + + dEnv := env.Load(ctx, env.GetCurrentUserHomeDir, newFs, pro.DbFactoryUrl(), "TODO") + + afp.CancelRefreshThread(dbName) + + err = afp.StartRefreshThread(ctx, pro, dbName, dEnv) + if err != nil { + return nil, fmt.Errorf("failed to restart collection: %w", err) + } + return fmt.Sprintf("restarted stats collection: %s", ref.StatsRef{}.String()), nil + } + return nil, fmt.Errorf("provider does not implement AutoRefreshStatsProvider") +} + +// statsStatus returns the last update for a stats thread +func statsStatus(ctx *sql.Context) (interface{}, error) { + dSess := dsess.DSessFromSess(ctx.Session) + dbName := strings.ToLower(ctx.GetCurrentDatabase()) + pro := dSess.StatsProvider() + if afp, ok := pro.(AutoRefreshStatsProvider); ok { + return afp.ThreadStatus(dbName), nil + } + return nil, fmt.Errorf("provider does not implement AutoRefreshStatsProvider") +} + +// statsStop cancels a refresh thread +func statsStop(ctx *sql.Context) (interface{}, error) { + dSess := dsess.DSessFromSess(ctx.Session) + statsPro := dSess.StatsProvider() + dbName := strings.ToLower(ctx.GetCurrentDatabase()) + + if afp, ok := statsPro.(AutoRefreshStatsProvider); ok { + 
afp.CancelRefreshThread(dbName) + return fmt.Sprintf("stopped thread: %s", dbName), nil + } + return nil, fmt.Errorf("provider does not implement AutoRefreshStatsProvider") +} + +// statsDrop deletes the stats ref +func statsDrop(ctx *sql.Context) (interface{}, error) { + dSess := dsess.DSessFromSess(ctx.Session) + pro := dSess.StatsProvider() + dbName := strings.ToLower(ctx.GetCurrentDatabase()) + + if afp, ok := pro.(AutoRefreshStatsProvider); ok { + // currently unsafe to drop stats while running refresh + afp.CancelRefreshThread(dbName) + } + err := pro.DropDbStats(ctx, dbName, true) + if err != nil { + return nil, fmt.Errorf("failed to drop stats: %w", err) + } + return fmt.Sprintf("deleted stats ref for %s", dbName), nil +} diff --git a/go/libraries/doltcore/sqle/dsess/dolt_session_test.go b/go/libraries/doltcore/sqle/dsess/dolt_session_test.go index 88288cd1311..e89a86f12f4 100644 --- a/go/libraries/doltcore/sqle/dsess/dolt_session_test.go +++ b/go/libraries/doltcore/sqle/dsess/dolt_session_test.go @@ -251,6 +251,10 @@ type emptyRevisionDatabaseProvider struct { sql.DatabaseProvider } +func (e emptyRevisionDatabaseProvider) DbFactoryUrl() string { + return "" +} + func (e emptyRevisionDatabaseProvider) UndropDatabase(ctx *sql.Context, dbName string) error { return nil } diff --git a/go/libraries/doltcore/sqle/dsess/session.go b/go/libraries/doltcore/sqle/dsess/session.go index f9c4012e3f0..652f4bd315e 100644 --- a/go/libraries/doltcore/sqle/dsess/session.go +++ b/go/libraries/doltcore/sqle/dsess/session.go @@ -25,6 +25,7 @@ import ( "github.com/dolthub/go-mysql-server/sql" sqltypes "github.com/dolthub/go-mysql-server/sql/types" + "github.com/shopspring/decimal" "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" @@ -58,6 +59,7 @@ type DoltSession struct { tempTables map[string][]sql.Table globalsConf config.ReadWriteConfig branchController *branch_control.Controller + statsProv sql.StatsProvider mu *sync.Mutex fs filesys.Filesys @@ -94,6 +96,7 @@ func NewDoltSession( pro DoltDatabaseProvider, conf config.ReadWriteConfig, branchController *branch_control.Controller, + statsProvider sql.StatsProvider, ) (*DoltSession, error) { username := conf.GetStringOrDefault(config.UserNameKey, "") email := conf.GetStringOrDefault(config.UserEmailKey, "") @@ -109,6 +112,7 @@ func NewDoltSession( tempTables: make(map[string][]sql.Table), globalsConf: globals, branchController: branchController, + statsProv: statsProvider, mu: &sync.Mutex{}, fs: pro.FileSystem(), } @@ -121,6 +125,11 @@ func (d *DoltSession) Provider() DoltDatabaseProvider { return d.provider } +// StatsProvider returns the sql.StatsProvider for this session. 
+func (d *DoltSession) StatsProvider() sql.StatsProvider { + return d.statsProv +} + // DSessFromSess retrieves a dolt session from a standard sql.Session func DSessFromSess(sess sql.Session) *DoltSession { return sess.(*DoltSession) @@ -1588,6 +1597,9 @@ func setPersistedValue(conf config.WritableConfig, key string, value interface{} return config.SetFloat(conf, key, float64(v)) case float64: return config.SetFloat(conf, key, v) + case decimal.Decimal: + f64, _ := v.Float64() + return config.SetFloat(conf, key, f64) case string: return config.SetString(conf, key, v) case bool: diff --git a/go/libraries/doltcore/sqle/dsess/session_db_provider.go b/go/libraries/doltcore/sqle/dsess/session_db_provider.go index ba23a979a9e..90194031fb7 100644 --- a/go/libraries/doltcore/sqle/dsess/session_db_provider.go +++ b/go/libraries/doltcore/sqle/dsess/session_db_provider.go @@ -74,6 +74,7 @@ type DoltDatabaseProvider interface { sql.MutableDatabaseProvider // FileSystem returns the filesystem used by this provider, rooted at the data directory for all databases. FileSystem() filesys.Filesys + DbFactoryUrl() string // FileSystemForDatabase returns a filesystem, with the working directory set to the root directory // of the requested database. If the requested database isn't found, a database not found error // is returned. diff --git a/go/libraries/doltcore/sqle/dsess/variables.go b/go/libraries/doltcore/sqle/dsess/variables.go index 09788be6269..7d6d92b79b2 100644 --- a/go/libraries/doltcore/sqle/dsess/variables.go +++ b/go/libraries/doltcore/sqle/dsess/variables.go @@ -57,6 +57,11 @@ const ( DoltClusterRoleVariable = "dolt_cluster_role" DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch" DoltClusterAckWritesTimeoutSecs = "dolt_cluster_ack_writes_timeout_secs" + + DoltStatsAutoRefreshEnabled = "dolt_stats_auto_refresh_enabled" + DoltStatsAutoRefreshThreshold = "dolt_stats_auto_refresh_threshold" + DoltStatsAutoRefreshInterval = "dolt_stats_auto_refresh_interval" + DoltStatsMemoryOnly = "dolt_stats_memory_only" ) const URLTemplateDatabasePlaceholder = "{database}" diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go index 3fcd0c68c97..a52d3cdb426 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "runtime" + "sync" "testing" "time" @@ -40,6 +41,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/stats" "github.com/dolthub/dolt/go/libraries/utils/config" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/types" @@ -2095,6 +2097,18 @@ func TestColumnDiffSystemTablePrepared(t *testing.T) { } } +func TestStatsFunctions(t *testing.T) { + harness := newDoltHarness(t) + defer harness.Close() + harness.Setup(setup.MydbData) + harness.configureStats = true + for _, test := range StatProcTests { + t.Run(test.Name, func(t *testing.T) { + enginetest.TestScript(t, harness, test) + }) + } +} + func TestDiffTableFunction(t *testing.T) { harness := newDoltHarness(t) defer harness.Close() @@ -3062,6 +3076,112 @@ func TestCreateDatabaseErrorCleansUp(t *testing.T) { require.True(t, isDir) } +// TestStatsAutoRefreshConcurrency tests some common concurrent patterns that stats +// refresh is subject to -- namely 
reading/writing the stats objects in (1) DML statements +// (2) auto refresh threads, and (3) manual ANALYZE statements. +// todo: the dolt_stat functions should be concurrency tested +func TestStatsAutoRefreshConcurrency(t *testing.T) { + // create engine + harness := newDoltHarness(t) + harness.Setup(setup.MydbData) + engine := mustNewEngine(t, harness) + defer engine.Close() + + enginetest.RunQueryWithContext(t, engine, harness, nil, `create table xy (x int primary key, y int, z int, key (z), key (y,z), key (y,z,x))`) + enginetest.RunQueryWithContext(t, engine, harness, nil, `create table uv (u int primary key, v int, w int, key (w), key (w,u), key (u,w,v))`) + + sqlDb, _ := harness.provider.BaseDatabase(harness.NewContext(), "mydb") + + // Setting an interval of 0 and a threshold of 0 will result + // in the stats being updated after every operation + intervalSec := time.Duration(0) + thresholdf64 := 0. + bThreads := sql.NewBackgroundThreads() + statsProv := engine.EngineAnalyzer().Catalog.StatsProvider.(*stats.Provider) + + // it is important to use new sessions for this test, to avoid working root conflicts + readCtx := enginetest.NewSession(harness) + writeCtx := enginetest.NewSession(harness) + newCtx := func(context.Context) (*sql.Context, error) { + return enginetest.NewSession(harness), nil + } + + err := statsProv.InitAutoRefresh(newCtx, sqlDb.Name(), bThreads, intervalSec, thresholdf64) + require.NoError(t, err) + + execQ := func(ctx *sql.Context, q string, id int, tag string) { + _, iter, err := engine.Query(ctx, q) + require.NoError(t, err) + _, err = sql.RowIterToRows(ctx, iter) + //fmt.Printf("%s %d\n", tag, id) + require.NoError(t, err) + } + + iters := 1_000 + { + // 3 threads to test auto-refresh/DML concurrency safety + // - auto refresh (read + write) + // - write (write only) + // - read (read only) + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + for i := 0; i < iters; i++ { + q := "select count(*) from xy a join xy b on a.x = b.x" + execQ(readCtx, q, i, "read") + q = "select count(*) from uv a join uv b on a.u = b.u" + execQ(readCtx, q, i, "read") + } + wg.Done() + }() + + go func() { + for i := 0; i < iters; i++ { + q := fmt.Sprintf("insert into xy values (%d,%d,%d)", i, i, i) + execQ(writeCtx, q, i, "write") + q = fmt.Sprintf("insert into uv values (%d,%d,%d)", i, i, i) + execQ(writeCtx, q, i, "write") + } + wg.Done() + }() + + wg.Wait() + } + + { + // 3 threads to test auto-refresh/manual ANALYZE concurrency + // - auto refresh (read + write) + // - add (read + write) + // - drop (write only) + + wg := sync.WaitGroup{} + wg.Add(2) + + analyzeAddCtx := enginetest.NewSession(harness) + analyzeDropCtx := enginetest.NewSession(harness) + + // hammer the provider with concurrent stat updates + go func() { + for i := 0; i < iters; i++ { + execQ(analyzeAddCtx, "analyze table xy,uv", i, "analyze create") + } + wg.Done() + }() + + go func() { + for i := 0; i < iters; i++ { + execQ(analyzeDropCtx, "analyze table xy drop histogram on (y,z)", i, "analyze drop yz") + execQ(analyzeDropCtx, "analyze table uv drop histogram on (w,u)", i, "analyze drop wu") + } + wg.Done() + }() + + wg.Wait() + } +} + // runMergeScriptTestsInBothDirections creates a new test run, named |name|, and runs the specified merge |tests| // in both directions (right to left merge, and left to right merge). 
If // |runAsPrepared| is true then the test scripts will be run using the prepared diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_harness.go b/go/libraries/doltcore/sqle/enginetest/dolt_harness.go index 93d5dfad2c7..81fd0a9c8ae 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_harness.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_harness.go @@ -43,6 +43,7 @@ import ( type DoltHarness struct { t *testing.T provider dsess.DoltDatabaseProvider + statsPro sql.StatsProvider multiRepoEnv *env.MultiRepoEnv session *dsess.DoltSession branchControl *branch_control.Controller @@ -52,6 +53,7 @@ type DoltHarness struct { resetData []setup.SetupScript engine *gms.Engine skipSetupCommit bool + configureStats bool useLocalFilesystem bool setupTestProcedures bool } @@ -189,11 +191,14 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) { require.True(t, ok) d.provider = doltProvider + statsPro := stats.NewProvider() + d.statsPro = statsPro + var err error - d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl) + d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl, d.statsPro) require.NoError(t, err) - e, err := enginetest.NewEngine(t, d, d.provider, d.setupData, stats.NewProvider()) + e, err := enginetest.NewEngine(t, d, d.provider, d.setupData, d.statsPro) if err != nil { return nil, err } @@ -214,6 +219,22 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) { } } + if d.configureStats { + bThreads := sql.NewBackgroundThreads() + e = e.WithBackgroundThreads(bThreads) + + dSess := dsess.DSessFromSess(ctx.Session) + dbCache := dSess.DatabaseCache(ctx) + + dsessDbs := make([]dsess.SqlDatabase, len(dbs)) + for i, dbName := range dbs { + dsessDbs[i], _ = dbCache.GetCachedRevisionDb(fmt.Sprintf("%s/main", dbName), dbName) + } + if err = statsPro.Configure(ctx, func(context.Context) (*sql.Context, error) { return d.NewSession(), nil }, bThreads, doltProvider, dsessDbs); err != nil { + return nil, err + } + } + return e, nil } @@ -226,7 +247,7 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) { // Get a fresh session if we are reusing the engine if !initializeEngine { var err error - d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl) + d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl, nil) require.NoError(t, err) } @@ -296,7 +317,7 @@ func (d *DoltHarness) newSessionWithClient(client sql.Client) *dsess.DoltSession localConfig := d.multiRepoEnv.Config() pro := d.session.Provider() - dSession, err := dsess.NewDoltSession(sql.NewBaseSessionWithClientServer("address", client, 1), pro.(dsess.DoltDatabaseProvider), localConfig, d.branchControl) + dSession, err := dsess.NewDoltSession(sql.NewBaseSessionWithClientServer("address", client, 1), pro.(dsess.DoltDatabaseProvider), localConfig, d.branchControl, d.statsPro) dSession.SetCurrentDatabase("mydb") require.NoError(d.t, err) return dSession @@ -318,6 +339,7 @@ func (d *DoltHarness) NewDatabases(names ...string) []sql.Database { d.closeProvider() d.engine = nil d.provider = nil + d.statsPro = stats.NewProvider() d.branchControl = branch_control.CreateDefaultController(context.Background()) @@ -327,7 +349,7 @@ func (d *DoltHarness) NewDatabases(names ...string) []sql.Database { d.provider = 
doltProvider var err error - d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), doltProvider, d.multiRepoEnv.Config(), d.branchControl) + d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), doltProvider, d.multiRepoEnv.Config(), d.branchControl, d.statsPro) require.NoError(d.t, err) // TODO: the engine tests should do this for us @@ -385,7 +407,7 @@ func (d *DoltHarness) NewReadOnlyEngine(provider sql.DatabaseProvider) (enginete } // reset the session as well since we have swapped out the database provider, which invalidates caching assumptions - d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), readOnlyProvider, d.multiRepoEnv.Config(), d.branchControl) + d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), readOnlyProvider, d.multiRepoEnv.Config(), d.branchControl, d.statsPro) require.NoError(d.t, err) return enginetest.NewEngineWithProvider(nil, d, readOnlyProvider), nil diff --git a/go/libraries/doltcore/sqle/enginetest/stats_queries.go b/go/libraries/doltcore/sqle/enginetest/stats_queries.go index a7d9cd36838..aac26dd8ea9 100644 --- a/go/libraries/doltcore/sqle/enginetest/stats_queries.go +++ b/go/libraries/doltcore/sqle/enginetest/stats_queries.go @@ -26,6 +26,7 @@ import ( "github.com/dolthub/go-mysql-server/sql/types" "github.com/stretchr/testify/require" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/stats" @@ -310,7 +311,7 @@ var DoltStatsIOTests = []queries.ScriptTest{ }, Assertions: []queries.ScriptTestAssertion{ { - Query: "select `database`, `table`, `index`, commit_hash, columns, types from dolt_statistics", + Query: "select database_name, table_name, index_name, commit_hash, columns, types from dolt_statistics", Expected: []sql.Row{ {"mydb", "xy", "primary", "f6la1u3ku5pucfctgrca2afq9vlr4nrs", "x", "bigint"}, {"mydb", "xy", "yz", "9ec31007jaqtahij0tmlmd7j9t9hl1he", "y,z", "int,varchar(500)"}, @@ -348,18 +349,18 @@ var DoltStatsIOTests = []queries.ScriptTest{ }, Assertions: []queries.ScriptTestAssertion{ { - Query: "select `database`, `table`, `index`, commit_hash, columns, types from dolt_statistics where `table` = 'xy'", + Query: "select database_name, table_name, index_name, commit_hash, columns, types from dolt_statistics where table_name = 'xy'", Expected: []sql.Row{ {"mydb", "xy", "primary", "f6la1u3ku5pucfctgrca2afq9vlr4nrs", "x", "bigint"}, {"mydb", "xy", "yz", "9ec31007jaqtahij0tmlmd7j9t9hl1he", "y,z", "int,varchar(500)"}, }, }, { - Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where `table` = 'xy'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName), + Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where table_name = 'xy'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName), Expected: []sql.Row{{uint64(6), uint64(6), uint64(0)}, {uint64(6), uint64(3), uint64(0)}}, }, { - Query: "select `table`, `index` from dolt_statistics", + Query: "select `table_name`, `index_name` from dolt_statistics", Expected: []sql.Row{ {"ab", "primary"}, {"ab", "bc"}, @@ -368,18 +369,137 @@ var DoltStatsIOTests = []queries.ScriptTest{ }, }, { - Query: "select `database`, `table`, `index`, commit_hash, columns, types from dolt_statistics where `table` = 'ab'", + Query: "select database_name, table_name, index_name, commit_hash, columns, types from 
dolt_statistics where table_name = 'ab'", Expected: []sql.Row{ {"mydb", "ab", "primary", "t6j206v6b9t8vnmhpcc2i57lom8kejk3", "a", "bigint"}, {"mydb", "ab", "bc", "sibnr73868rb5dqa76opfn4pkelhhqna", "b,c", "int,int"}, }, }, { - Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where `table` = 'ab'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName), + Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where table_name = 'ab'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName), Expected: []sql.Row{{uint64(6), uint64(6), uint64(0)}, {uint64(6), uint64(3), uint64(0)}}, }, }, }, + { + // only edited chunks are scanned and re-written + Name: "incremental stats updates", + SetUpScript: []string{ + "CREATE table xy (x bigint primary key, y int, z varchar(500), key(y,z));", + "insert into xy values (0,0,'a'), (2,0,'a'), (4,1,'a'), (6,2,'a')", + "analyze table xy", + "insert into xy values (1,0,'a'), (3,0,'a'), (5,2,'a'), (7,1,'a')", + "analyze table xy", + }, + Assertions: []queries.ScriptTestAssertion{ + { + Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where table_name = 'xy'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName), + Expected: []sql.Row{ + {uint64(8), uint64(8), uint64(0)}, + {uint64(8), uint64(3), uint64(0)}, + }, + }, + }, + }, +} + +var StatProcTests = []queries.ScriptTest{ + { + Name: "basic start, status, stop loop", + SetUpScript: []string{ + "CREATE table xy (x bigint primary key, y int, z varchar(500), key(y,z));", + "insert into xy values (0,0,'a'), (2,0,'a'), (4,1,'a'), (6,2,'a')", + }, + Assertions: []queries.ScriptTestAssertion{ + { + Query: "select count(*) from dolt_statistics", + ExpectedErrStr: doltdb.ErrNoStatistics.Error(), + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"no active stats thread"}}, + }, + // set refresh interval arbitrarily high to avoid updating when we restart + { + Query: "set @@PERSIST.dolt_stats_auto_refresh_interval = 1000;", + Expected: []sql.Row{{}}, + }, + { + Query: "set @@PERSIST.dolt_stats_auto_refresh_threshold = 0", + Expected: []sql.Row{{}}, + }, + { + Query: "call dolt_stats_restart()", + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"restarted thread: mydb"}}, + }, + { + Query: "set @@PERSIST.dolt_stats_auto_refresh_interval = 0;", + Expected: []sql.Row{{}}, + }, + // new restart picks up 0-interval, will start refreshing immediately + { + Query: "call dolt_stats_restart()", + }, + { + Query: "select sleep(.1)", + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"updated to hash: vogi4fq0fe8n8rqa80pbsujlmmaljsoo"}}, + }, + { + Query: "select count(*) from dolt_statistics", + Expected: []sql.Row{{2}}, + }, + // kill refresh thread + { + Query: "call dolt_stats_stop()", + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"cancelled thread: mydb"}}, + }, + // insert without refresh thread will not update stats + { + Query: "insert into xy values (1,0,'a'), (3,0,'a'), (5,2,'a'), (7,1,'a')", + }, + { + Query: "select sleep(.1)", + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"cancelled thread: mydb"}}, + }, + // manual analyze will update stats + { + Query: "analyze table xy", + Expected: []sql.Row{{"xy", "analyze", "status", "OK"}}, + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"updated to hash: fhnmdo8psvs10od36pqfi0g4cvvu732h"}}, + }, + { + Query: "select 
count(*) from dolt_statistics", + Expected: []sql.Row{{2}}, + }, + // kill refresh thread and delete stats ref + { + Query: "call dolt_stats_drop()", + }, + { + Query: "call dolt_stats_status()", + Expected: []sql.Row{{"dropped"}}, + }, + { + Query: "select count(*) from dolt_statistics", + ExpectedErrStr: doltdb.ErrNoStatistics.Error(), + }, + }, + }, } // TestProviderReloadScriptWithEngine runs the test script given with the engine provided. diff --git a/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go b/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go index 3004f9ba3e8..a01eaf06f8c 100644 --- a/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go +++ b/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go @@ -33,6 +33,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/env" dsql "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/stats" "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/store/types" @@ -141,7 +142,7 @@ func innerInit(h *DoltHarness, dEnv *env.DoltEnv) error { return err } - ctx := dsql.NewTestSQLCtxWithProvider(context.Background(), pro) + ctx := dsql.NewTestSQLCtxWithProvider(context.Background(), pro, stats.NewProvider()) h.sess = ctx.Session.(*dsess.DoltSession) dbs := h.engine.Analyzer.Catalog.AllDatabases(ctx) @@ -303,6 +304,7 @@ func sqlNewEngine(dEnv *env.DoltEnv) (*sqle.Engine, dsess.DoltDatabaseProvider, } pro = pro.WithDbFactoryUrl(doltdb.InMemDoltDB) + engine := sqle.NewDefault(pro) return engine, pro, nil diff --git a/go/libraries/doltcore/sqle/sqlddl_test.go b/go/libraries/doltcore/sqle/sqlddl_test.go index cda688e710e..a9f2e698622 100644 --- a/go/libraries/doltcore/sqle/sqlddl_test.go +++ b/go/libraries/doltcore/sqle/sqlddl_test.go @@ -1113,7 +1113,7 @@ func newTestEngine(ctx context.Context, dEnv *env.DoltEnv) (*gms.Engine, *sql.Co panic(err) } - doltSession, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, dEnv.Config.WriteableConfig(), nil) + doltSession, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, dEnv.Config.WriteableConfig(), nil, nil) if err != nil { panic(err) } diff --git a/go/libraries/doltcore/sqle/stats/auto_refresh.go b/go/libraries/doltcore/sqle/stats/auto_refresh.go new file mode 100644 index 00000000000..2dcccc7ce00 --- /dev/null +++ b/go/libraries/doltcore/sqle/stats/auto_refresh.go @@ -0,0 +1,356 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package stats + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/dolthub/go-mysql-server/sql" + types2 "github.com/dolthub/go-mysql-server/sql/types" + + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/prolly" + "github.com/dolthub/dolt/go/store/prolly/tree" +) + +const asyncAutoRefreshStats = "async_auto_refresh_stats" + +func (p *Provider) Configure(ctx context.Context, ctxFactory func(ctx context.Context) (*sql.Context, error), bThreads *sql.BackgroundThreads, pro *sqle.DoltDatabaseProvider, dbs []dsess.SqlDatabase) error { + p.SetStarter(NewInitDatabaseHook(p, ctxFactory, bThreads, nil)) + + if _, disabled, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsMemoryOnly); disabled == int8(1) { + return nil + } + + loadCtx, err := ctxFactory(ctx) + if err != nil { + return err + } + if err := p.Load(loadCtx, dbs); err != nil { + return err + } + if _, enabled, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshEnabled); enabled == int8(1) { + _, threshold, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshThreshold) + _, interval, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshInterval) + interval64, _, _ := types2.Int64.Convert(interval) + intervalSec := time.Second * time.Duration(interval64.(int64)) + thresholdf64 := threshold.(float64) + + for _, db := range dbs { + if err := p.InitAutoRefresh(ctxFactory, db.Name(), bThreads, intervalSec, thresholdf64); err != nil { + return err + } + } + pro.InitDatabaseHook = NewInitDatabaseHook(p, ctxFactory, bThreads, pro.InitDatabaseHook) + pro.DropDatabaseHook = NewDropDatabaseHook(p, ctxFactory, pro.DropDatabaseHook) + } + return nil +} + +func (p *Provider) InitAutoRefresh(ctxFactory func(ctx context.Context) (*sql.Context, error), dbName string, bThreads *sql.BackgroundThreads, checkInterval time.Duration, updateThresh float64) error { + // this is only called after initial statistics are finished loading + // launch a thread that periodically checks freshness + + // retain handle to cancel on drop database + // todo: add Cancel(name) to sql.BackgroundThreads interface + p.mu.Lock() + defer p.mu.Unlock() + + dropDbCtx, dbStatsCancel := context.WithCancel(context.Background()) + p.cancelers[dbName] = dbStatsCancel + + return bThreads.Add(fmt.Sprintf("%s_%s", asyncAutoRefreshStats, dbName), func(ctx context.Context) { + timer := time.NewTimer(checkInterval) + for { + // wake up checker on interval + select { + case <-ctx.Done(): + timer.Stop() + return + case <-dropDbCtx.Done(): + timer.Stop() + return + case <-timer.C: + sqlCtx, err := ctxFactory(ctx) + if err != nil { + return + } + + sqlCtx.GetLogger().Debugf("starting statistics refresh check for '%s': %s", dbName, time.Now().String()) + timer.Reset(checkInterval) + + // Iterate all dbs, tables, indexes. Each db will collect + // []indexMeta above refresh threshold. We read and process those + // chunks' statistics. We merge updated chunks with precomputed + // chunks. The full set of statistics for each database lands + // 1) in the provider's most recent set of database statistics, and + // 2) on disk in the database's statistics ref'd prolly.Map. 
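+				//
+				// Illustrative numbers (not from this patch): with an update
+				// threshold of 0.5, an index whose previous histogram covered
+				// 100 chunks is only rebuilt once the count of new plus deleted
+				// chunks exceeds 50; smaller deltas are skipped until a later tick.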
+				curStats := p.getStats(dbName)
+				if curStats == nil {
+					curStats = newDbStats(dbName)
+				}
+
+				newStats := make(map[sql.StatQualifier]*DoltStats)
+				var deletedStats []sql.StatQualifier
+				qualExists := make(map[sql.StatQualifier]bool)
+				tableExistsAndSkipped := make(map[string]bool)
+
+				// important: update session references every loop
+				dSess := dsess.DSessFromSess(sqlCtx.Session)
+				prov := dSess.Provider()
+				ddb, ok := dSess.GetDoltDB(sqlCtx, dbName)
+				if !ok {
+					sqlCtx.GetLogger().Debugf("statistics refresh error: database not found %s", dbName)
+					continue
+				}
+
+				sqlDb, err := prov.Database(sqlCtx, dbName)
+				if err != nil {
+					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+					continue
+				}
+
+				tables, err := sqlDb.GetTableNames(sqlCtx)
+				if err != nil {
+					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+					continue
+				}
+
+				for _, table := range tables {
+					sqlTable, ok, err := sqlDb.GetTableInsensitive(sqlCtx, table)
+					if err != nil {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+						continue
+					}
+					if !ok {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: table not found %s", table)
+						continue
+					}
+
+					var dTab *doltdb.Table
+					switch t := sqlTable.(type) {
+					case *sqle.AlterableDoltTable:
+						dTab, err = t.DoltTable.DoltTable(sqlCtx)
+					case *sqle.WritableDoltTable:
+						dTab, err = t.DoltTable.DoltTable(sqlCtx)
+					case *sqle.DoltTable:
+						dTab, err = t.DoltTable(sqlCtx)
+					default:
+						err = fmt.Errorf("failed to unwrap dolt table from type: %T", sqlTable)
+					}
+					if err != nil {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+						continue
+					}
+
+					tableHash, err := dTab.GetRowDataHash(ctx)
+					if err != nil {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+						continue
+					}
+
+					if curStats.getLatestHash(table) == tableHash {
+						// no data changes since last check
+						tableExistsAndSkipped[table] = true
+						sqlCtx.GetLogger().Debugf("statistics refresh: table hash unchanged since last check: %s", tableHash)
+						continue
+					} else {
+						sqlCtx.GetLogger().Debugf("statistics refresh: new table hash: %s", tableHash)
+					}
+
+					iat, ok := sqlTable.(sql.IndexAddressableTable)
+					if !ok {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: table does not support indexes %s", table)
+						continue
+					}
+
+					indexes, err := iat.GetIndexes(sqlCtx)
+					if err != nil {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+						continue
+					}
+
+					// collect indexes and ranges to be updated
+					var idxMetas []indexMeta
+					for _, index := range indexes {
+						qual := sql.NewStatQualifier(dbName, table, strings.ToLower(index.ID()))
+						qualExists[qual] = true
+						curStat := curStats.getIndexStats(qual)
+						if curStat == nil {
+							curStat = NewDoltStats()
+							curStat.Qual = qual
+
+							cols := make([]string, len(index.Expressions()))
+							tablePrefix := fmt.Sprintf("%s.", table)
+							for i, c := range index.Expressions() {
+								cols[i] = strings.TrimPrefix(strings.ToLower(c), tablePrefix)
+							}
+							curStat.Columns = cols
+						}
+						sqlCtx.GetLogger().Debugf("statistics refresh index: %s", qual.String())
+
+						updateMeta, err := newIdxMeta(sqlCtx, curStat, dTab, index, curStat.Columns)
+						if err != nil {
+							sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+							continue
+						}
+						curCnt := float64(len(curStat.active))
+						updateCnt := float64(len(updateMeta.updateChunks))
+						deleteCnt := float64(len(curStat.active) - len(updateMeta.preexisting))
+						sqlCtx.GetLogger().Debugf("statistics current: %d, new: %d, delete: %d", int(curCnt), int(updateCnt), int(deleteCnt))
+
+						if curCnt == 0 || (deleteCnt+updateCnt)/curCnt > updateThresh {
+							sqlCtx.GetLogger().Debugf("statistics updating: %s", updateMeta.qual)
+							// mark index for updating
+							idxMetas = append(idxMetas, updateMeta)
+							// update latest hash if we haven't already
+							curStats.setLatestHash(table, tableHash)
+						}
+					}
+					// get new buckets for index chunks to update
+					newTableStats, err := updateStats(sqlCtx, sqlTable, dTab, indexes, idxMetas)
+					if err != nil {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+						continue
+					}
+
+					// merge new chunks with preexisting chunks
+					for _, updateMeta := range idxMetas {
+						stat := newTableStats[updateMeta.qual]
+						if stat != nil {
+							newStats[updateMeta.qual] = mergeStatUpdates(stat, updateMeta)
+						}
+					}
+				}
+
+				func() {
+					curStats.mu.Lock()
+					defer curStats.mu.Unlock()
+					for _, s := range curStats.stats {
+						// a dropped table or index leaves a hole in the stats;
+						// this is separate from the threshold check
+						if !tableExistsAndSkipped[s.Qual.Table()] && !qualExists[s.Qual] {
+							// only delete stats we've verified are deleted
+							deletedStats = append(deletedStats, s.Qual)
+						}
+					}
+				}()
+
+				prevMap := curStats.getCurrentMap()
+				if prevMap.KeyDesc().Count() == 0 {
+					kd, vd := schema.StatsTableDoltSchema.GetMapDescriptors()
+					prevMap, err = prolly.NewMapFromTuples(ctx, ddb.NodeStore(), kd, vd)
+					if err != nil {
+						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+						continue
+					}
+				}
+
+				if len(deletedStats) == 0 && len(newStats) == 0 {
+					continue
+				}
+
+				if len(deletedStats) > 0 {
+					sqlCtx.GetLogger().Debugf("statistics refresh: deleting stats %#v", deletedStats)
+				}
+				delMap, err := deleteStats(sqlCtx, prevMap, deletedStats...)
+				if err != nil {
+					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+					continue
+				}
+
+				newMap, err := flushStats(sqlCtx, delMap, newStats)
+				if err != nil {
+					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+					continue
+				}
+
+				curStats.setCurrentMap(newMap)
+				for q, s := range newStats {
+					curStats.setIndexStats(q, s)
+				}
+				p.setStats(dbName, curStats)
+				err = ddb.SetStatisics(ctx, newMap.HashOf())
+				if err != nil {
+					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
+					continue
+				}
+			}
+		}
+	})
+}
+
+func newIdxMeta(ctx *sql.Context, curStats *DoltStats, doltTable *doltdb.Table, sqlIndex sql.Index, cols []string) (indexMeta, error) {
+	var idx durable.Index
+	var err error
+	if strings.EqualFold(sqlIndex.ID(), "PRIMARY") {
+		idx, err = doltTable.GetRowData(ctx)
+	} else {
+		idx, err = doltTable.GetIndexRowData(ctx, sqlIndex.ID())
+	}
+	if err != nil {
+		return indexMeta{}, err
+	}
+
+	prollyMap := durable.ProllyMapFromIndex(idx)
+
+	// get newest histogram target level hashes
+	levelNodes, err := tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
+	if err != nil {
+		return indexMeta{}, err
+	}
+
+	var addrs []hash.Hash
+	var preservedStats []DoltBucket
+	var missingAddrs float64
+	var missingChunks []tree.Node
+	var missingOffsets [][]uint64
+	var offset uint64
+	for _, n := range levelNodes {
+		// Compare the previous histogram chunks to the newest tree chunks.
+		// Partition the newest chunks into 1) preserved or 2) missing.
+		// Missing chunks will need to be scanned on a stats update, so
+		// track the (start, end) ordinal offsets to simplify the read iter.
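+		//
+		// Hypothetical example: level nodes with tree counts 100, 120, and 80
+		// cover ordinals [0,100), [100,220), and [220,300); only the ranges
+		// whose chunk addresses are absent from |curStats.active| are recorded
+		// in |missingOffsets| and re-scanned.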
+ treeCnt, err := n.TreeCount() + if err != nil { + return indexMeta{}, err + } + + addrs = append(addrs, n.HashOf()) + if bucketIdx, ok := curStats.active[n.HashOf()]; !ok { + missingChunks = append(missingChunks, n) + missingOffsets = append(missingOffsets, []uint64{offset, offset + uint64(treeCnt)}) + missingAddrs++ + } else { + preservedStats = append(preservedStats, curStats.Histogram[bucketIdx]) + } + offset += uint64(treeCnt) + } + return indexMeta{ + qual: curStats.Qual, + cols: cols, + updateChunks: missingChunks, + updateOrdinals: missingOffsets, + preexisting: preservedStats, + allAddrs: addrs, + }, nil +} diff --git a/go/libraries/doltcore/sqle/stats/initdbhook.go b/go/libraries/doltcore/sqle/stats/initdbhook.go new file mode 100644 index 00000000000..a44f4d05151 --- /dev/null +++ b/go/libraries/doltcore/sqle/stats/initdbhook.go @@ -0,0 +1,58 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stats + +import ( + "context" + "time" + + "github.com/dolthub/go-mysql-server/sql" + types2 "github.com/dolthub/go-mysql-server/sql/types" + + "github.com/dolthub/dolt/go/libraries/doltcore/env" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" +) + +func NewInitDatabaseHook(statsProv *Provider, ctxFactory func(ctx context.Context) (*sql.Context, error), bThreads *sql.BackgroundThreads, orig sqle.InitDatabaseHook) sqle.InitDatabaseHook { + return func(ctx *sql.Context, pro *sqle.DoltDatabaseProvider, name string, denv *env.DoltEnv) error { + if orig != nil { + err := orig(ctx, pro, name, denv) + if err != nil { + return err + } + } + _, threshold, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshThreshold) + _, interval, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshInterval) + interval64, _, _ := types2.Int64.Convert(interval) + intervalSec := time.Second * time.Duration(interval64.(int64)) + thresholdf64 := threshold.(float64) + return statsProv.InitAutoRefresh(ctxFactory, name, bThreads, intervalSec, thresholdf64) + } +} + +func NewDropDatabaseHook(statsProv *Provider, ctxFactory func(ctx context.Context) (*sql.Context, error), orig sqle.DropDatabaseHook) sqle.DropDatabaseHook { + return func(name string) { + if orig != nil { + orig(name) + } + ctx, err := ctxFactory(context.Background()) + if err != nil { + return + } + statsProv.CancelRefreshThread(name) + statsProv.DropDbStats(ctx, name, false) + } +} diff --git a/go/libraries/doltcore/sqle/stats/read.go b/go/libraries/doltcore/sqle/stats/read.go index 3efe708d125..3e6083f5a30 100644 --- a/go/libraries/doltcore/sqle/stats/read.go +++ b/go/libraries/doltcore/sqle/stats/read.go @@ -36,13 +36,13 @@ import ( ) func loadStats(ctx *sql.Context, db dsess.SqlDatabase, m prolly.Map) (*dbStats, error) { - dbStat := &dbStats{db: db.Name(), active: make(map[hash.Hash]int), stats: make(map[sql.StatQualifier]*DoltStats)} + dbStat := newDbStats(db.Name()) iter, err := dtables.NewStatsIter(ctx, m) if err 
!= nil {
 		return nil, err
 	}
 
-	currentStat := &DoltStats{}
+	currentStat := NewDoltStats()
 	var lowerBound sql.Row
 	for {
 		row, err := iter.Next(ctx)
@@ -123,9 +123,14 @@
 			}
 			currentStat.fds = fds
 			currentStat.colSet = colSet
+			currentStat.updateActive()
 			dbStat.stats[currentStat.Qual] = currentStat
 		}
-		currentStat = &DoltStats{Qual: qual, Columns: columns, LowerBound: lowerBound}
+
+		currentStat = NewDoltStats()
+		currentStat.Qual = qual
+		currentStat.Columns = columns
+		currentStat.LowerBound = lowerBound
 	}
 
 	if currentStat.Histogram == nil {
@@ -148,7 +153,7 @@
 			UpperBound: boundRow,
 		}
 
-		dbStat.active[commit] = position
+		currentStat.active[commit] = position
 		currentStat.Histogram = append(currentStat.Histogram, bucket)
 		currentStat.RowCount += uint64(rowCount)
 		currentStat.DistinctCount += uint64(distinctCount)
@@ -157,6 +162,17 @@
 		currentStat.CreatedAt = createdAt
 	}
 }
+	currentStat.LowerBound, err = loadLowerBound(ctx, currentStat.Qual)
+	if err != nil {
+		return nil, err
+	}
+	fds, colSet, err := loadFuncDeps(ctx, db, currentStat.Qual)
+	if err != nil {
+		return nil, err
+	}
+	currentStat.fds = fds
+	currentStat.colSet = colSet
+	currentStat.updateActive()
+	dbStat.setIndexStats(currentStat.Qual, currentStat)
-	dbStat.stats[currentStat.Qual] = currentStat
 	return dbStat, nil
 }
diff --git a/go/libraries/doltcore/sqle/stats/stats_provider.go b/go/libraries/doltcore/sqle/stats/stats_provider.go
index 775132dabc4..f9d0755fb9d 100644
--- a/go/libraries/doltcore/sqle/stats/stats_provider.go
+++ b/go/libraries/doltcore/sqle/stats/stats_provider.go
@@ -15,6 +15,7 @@
 package stats
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"strings"
@@ -25,18 +26,27 @@ import (
 	"github.com/dolthub/go-mysql-server/sql/stats"
 
 	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
+	"github.com/dolthub/dolt/go/libraries/doltcore/env"
 	"github.com/dolthub/dolt/go/libraries/doltcore/schema"
+	"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables"
 	"github.com/dolthub/dolt/go/store/hash"
 	"github.com/dolthub/dolt/go/store/prolly"
+	"github.com/dolthub/dolt/go/store/prolly/tree"
 )
 
 var ErrFailedToLoad = errors.New("failed to load statistics")
 
 type DoltStats struct {
-	level  int
-	chunks []hash.Hash
+	mu *sync.Mutex
+	// chunks is a list of addresses for the histogram fanout level
+	chunks []hash.Hash
+	// active maps a chunk/bucket address to its position in
+	// the histogram. 
1-indexed to differentiate from an empty + // field on disk + active map[hash.Hash]int + RowCount uint64 DistinctCount uint64 NullCount uint64 @@ -52,12 +62,17 @@ type DoltStats struct { colSet sql.ColSet } +func NewDoltStats() *DoltStats { + return &DoltStats{mu: &sync.Mutex{}, active: make(map[hash.Hash]int)} +} + func DoltStatsFromSql(stat sql.Statistic) (*DoltStats, error) { hist, err := DoltHistFromSql(stat.Histogram(), stat.Types()) if err != nil { return nil, err } return &DoltStats{ + mu: &sync.Mutex{}, Qual: stat.Qualifier(), RowCount: stat.RowCount(), DistinctCount: stat.DistinctCount(), @@ -74,7 +89,35 @@ func DoltStatsFromSql(stat sql.Statistic) (*DoltStats, error) { }, nil } +func (s *DoltStats) updateActive() { + s.mu.Lock() + defer s.mu.Unlock() + newActive := make(map[hash.Hash]int) + for i, hash := range s.chunks { + newActive[hash] = i + } + s.active = newActive +} + +func (s *DoltStats) updateCounts() { + s.mu.Lock() + defer s.mu.Unlock() + var newDistinct uint64 + var newRows uint64 + var newNulls uint64 + for _, b := range s.Histogram { + newDistinct += b.DistinctCount + newRows += b.RowCount + newNulls += b.NullCount + } + s.RowCount = newRows + s.DistinctCount = newDistinct + s.NullCount = newNulls +} + func (s *DoltStats) toSql() sql.Statistic { + s.mu.Lock() + defer s.mu.Unlock() typStrs := make([]string, len(s.Types)) for i, typ := range s.Types { typStrs[i] = typ.String() @@ -142,16 +185,21 @@ func (s DoltHistogram) toSql() []*stats.Bucket { } type indexMeta struct { - db string - table string - index string - cols []string + qual sql.StatQualifier + cols []string + updateChunks []tree.Node + // [start, stop] ordinals for each chunk for update + updateOrdinals [][]uint64 + preexisting []DoltBucket + allAddrs []hash.Hash } func NewProvider() *Provider { return &Provider{ - mu: &sync.Mutex{}, - dbStats: make(map[string]*dbStats), + mu: &sync.Mutex{}, + dbStats: make(map[string]*dbStats), + cancelers: make(map[string]context.CancelFunc), + status: make(map[string]string), } } @@ -162,29 +210,127 @@ type Provider struct { mu *sync.Mutex latestRootAddr hash.Hash dbStats map[string]*dbStats + cancelers map[string]context.CancelFunc + starter sqle.InitDatabaseHook + status map[string]string } // each database has one statistics table that is a collection of the // table stats in the database type dbStats struct { - db string - active map[hash.Hash]int - stats map[sql.StatQualifier]*DoltStats - currentMap prolly.Map + mu *sync.Mutex + db string + stats map[sql.StatQualifier]*DoltStats + currentMap prolly.Map + latestRoot *doltdb.RootValue + latestTableHashes map[string]hash.Hash +} + +func newDbStats(dbName string) *dbStats { + return &dbStats{ + mu: &sync.Mutex{}, + db: dbName, + stats: make(map[sql.StatQualifier]*DoltStats), + latestTableHashes: make(map[string]hash.Hash), + } } var _ sql.StatsProvider = (*Provider)(nil) -// Init scans the statistics tables, populating the |stats| attribute. -// Statistics are not available for reading until we've finished loading. 
-func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
+func (p *Provider) StartRefreshThread(ctx *sql.Context, pro dsess.DoltDatabaseProvider, name string, env *env.DoltEnv) error {
+	err := p.starter(ctx, pro.(*sqle.DoltDatabaseProvider), name, env)
+	if err != nil {
+		p.UpdateStatus(name, fmt.Sprintf("error restarting thread %s: %s", name, err.Error()))
+		return err
+	}
+	p.UpdateStatus(name, fmt.Sprintf("restarted thread: %s", name))
+	return nil
+}
+
+func (p *Provider) SetStarter(hook sqle.InitDatabaseHook) {
+	p.starter = hook
+}
+
+func (p *Provider) CancelRefreshThread(dbName string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if cancel, ok := p.cancelers[dbName]; ok {
+		cancel()
+		p.status[dbName] = fmt.Sprintf("cancelled thread: %s", dbName)
+	}
+}
+
+func (p *Provider) ThreadStatus(dbName string) string {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if msg, ok := p.status[dbName]; ok {
+		return msg
+	}
+	return "no active stats thread"
+}
+
+func (p *Provider) setStats(dbName string, s *dbStats) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.dbStats[dbName] = s
+	if s != nil && len(s.stats) > 0 {
+		p.status[dbName] = fmt.Sprintf("updated to hash: %s", s.currentMap.HashOf())
+	}
+}
+
+func (p *Provider) getStats(dbName string) *dbStats {
 	p.mu.Lock()
 	defer p.mu.Unlock()
+	return p.dbStats[dbName]
+}
+
+func (s *dbStats) getLatestHash(tableName string) hash.Hash {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.latestTableHashes[tableName]
+}
+
+func (s *dbStats) setLatestHash(tableName string, h hash.Hash) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.latestTableHashes[tableName] = h
+}
+
+func (s *dbStats) getCurrentMap() prolly.Map {
+	return s.currentMap
+}
+
+func (s *dbStats) setCurrentMap(m prolly.Map) {
+	s.currentMap = m
+}
+
+func (s *dbStats) getIndexStats(qual sql.StatQualifier) *DoltStats {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.stats[qual]
+}
+
+func (s *dbStats) setIndexStats(qual sql.StatQualifier, stat *DoltStats) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.stats[qual] = stat
+}
+
+func (s *dbStats) dropIndexStats(qual sql.StatQualifier) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	delete(s.stats, qual)
+}
+
+// Init scans the statistics tables, populating the |stats| attribute.
+// Statistics are not available for reading until we've finished loading.
+func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
 	for _, db := range dbs {
 		// set map keys so concurrent orthogonal writes are OK
-		p.dbStats[strings.ToLower(db.Name())] = &dbStats{db: strings.ToLower(db.Name()), stats: make(map[sql.StatQualifier]*DoltStats)}
+		p.setStats(strings.ToLower(db.Name()), newDbStats(strings.ToLower(db.Name())))
 	}
+
 	eg, ctx := ctx.NewErrgroup()
 	for _, db := range dbs {
 		// copy closure variables
@@ -216,7 +362,7 @@ func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
 			} else if err != nil {
 				return err
 			}
-			p.dbStats[dbName] = stats
+			p.setStats(dbName, stats)
 			return nil
 		})
 	}
@@ -225,8 +371,10 @@ func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
 
 func (p *Provider) GetTableStats(ctx *sql.Context, db, table string) ([]sql.Statistic, error) {
 	var ret []sql.Statistic
-	if dbStats := p.dbStats[strings.ToLower(db)]; dbStats != nil {
-		for qual, stat := range p.dbStats[strings.ToLower(db)].stats {
+	if dbStat := p.getStats(strings.ToLower(db)); dbStat != nil {
+		dbStat.mu.Lock()
+		defer dbStat.mu.Unlock()
+		for qual, stat := range dbStat.stats {
 			if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Tab) {
 				ret = append(ret, stat.toSql())
 			}
@@ -241,49 +389,75 @@ func (p *Provider) SetStats(ctx *sql.Context, stats sql.Statistic) error {
 		return err
 	}
 	dbName := strings.ToLower(stats.Qualifier().Database)
-	if _, ok := p.dbStats[dbName]; !ok {
-		p.dbStats[dbName] = &dbStats{db: dbName, stats: make(map[sql.StatQualifier]*DoltStats)}
+	stat := p.getStats(dbName)
+	if stat == nil {
+		stat = newDbStats(dbName)
 	}
-	p.dbStats[dbName].stats[stats.Qualifier()] = doltStats
+	stat.setIndexStats(stats.Qualifier(), doltStats)
+	p.setStats(dbName, stat)
 	return nil
 }
 
 func (p *Provider) GetStats(ctx *sql.Context, qual sql.StatQualifier, cols []string) (sql.Statistic, bool) {
-	if dbStats := p.dbStats[strings.ToLower(qual.Database)]; dbStats != nil {
-		if s, ok := p.dbStats[strings.ToLower(qual.Database)].stats[qual]; ok {
-			return s.toSql(), true
+	if stat := p.getStats(strings.ToLower(qual.Database)); stat != nil {
+		idxStat := stat.getIndexStats(qual)
+		if idxStat != nil {
+			return idxStat.toSql(), true
 		}
 	}
 	return nil, false
 }
 
+func (p *Provider) DropDbStats(ctx *sql.Context, db string, flush bool) error {
+	p.setStats(db, nil)
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.status[db] = "dropped"
+	if flush {
+		dSess := dsess.DSessFromSess(ctx.Session)
+		ddb, ok := dSess.GetDoltDB(ctx, db)
+		if !ok {
+			return nil
+		}
+		return ddb.DropStatisics(ctx)
+	}
+	return nil
+}
+
 func (p *Provider) DropStats(ctx *sql.Context, qual sql.StatQualifier, cols []string) error {
-	if dbStats := p.dbStats[strings.ToLower(qual.Database)]; dbStats != nil {
-		delete(p.dbStats[strings.ToLower(qual.Database)].stats, qual)
+	if stat := p.getStats(strings.ToLower(qual.Database)); stat != nil {
+		stat.dropIndexStats(qual)
+		p.UpdateStatus(qual.Db(), fmt.Sprintf("dropped statistic: %s", qual.String()))
 	}
 	return nil
 }
 
+func (p *Provider) UpdateStatus(db string, msg string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.status[db] = msg
+}
+
 func (p *Provider) RowCount(ctx *sql.Context, db, table string) (uint64, error) {
-	var cnt uint64
-	if dbStats := p.dbStats[strings.ToLower(db)]; dbStats != nil {
-		for qual, s := range p.dbStats[strings.ToLower(db)].stats {
-			if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) {
-				if s.RowCount > cnt {
-					cnt = s.RowCount
-				}
+	if dbStat := p.getStats(strings.ToLower(db)); 
+
 func (p *Provider) RowCount(ctx *sql.Context, db, table string) (uint64, error) {
-	var cnt uint64
-	if dbStats := p.dbStats[strings.ToLower(db)]; dbStats != nil {
-		for qual, s := range p.dbStats[strings.ToLower(db)].stats {
-			if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) {
-				if s.RowCount > cnt {
-					cnt = s.RowCount
-				}
+	if dbStat := p.getStats(strings.ToLower(db)); dbStat != nil {
+		dbStat.mu.Lock()
+		defer dbStat.mu.Unlock()
+		for qual, s := range dbStat.stats {
+			if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) && strings.EqualFold(qual.Index(), "primary") {
+				return s.RowCount, nil
 			}
 		}
 	}
-	return cnt, nil
+	return 0, nil
 }
 
 func (p *Provider) DataLength(_ *sql.Context, db, table string) (uint64, error) {
-	var avgSize uint64
-	for meta, s := range p.dbStats[strings.ToLower(db)].stats {
-		if strings.EqualFold(db, meta.Database) && strings.EqualFold(table, meta.Table()) {
-			if s.AvgSize > avgSize {
-				avgSize = s.AvgSize
+	if dbStat := p.getStats(strings.ToLower(db)); dbStat != nil {
+		dbStat.mu.Lock()
+		defer dbStat.mu.Unlock()
+		for qual, s := range dbStat.stats {
+			if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) && strings.EqualFold(qual.Index(), "primary") {
+				return s.AvgSize, nil
 			}
 		}
 	}
@@ -291,7 +465,9 @@ func (p *Provider) DataLength(_ *sql.Context, db, table string) (uint64, error)
 }
 
 func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db string) error {
+	tableName := strings.ToLower(table.Name())
 	dbName := strings.ToLower(db)
+
 	iat, ok := table.(sql.IndexAddressableTable)
 	if !ok {
 		return nil
@@ -301,7 +477,42 @@ func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db strin
 		return err
 	}
 
-	tablePrefix := fmt.Sprintf("%s.", strings.ToLower(table.Name()))
+	// it's important to update session references on every call
+	dSess := dsess.DSessFromSess(ctx.Session)
+	prov := dSess.Provider()
+	sqlDb, err := prov.Database(ctx, dbName)
+	if err != nil {
+		return err
+	}
+	sqlTable, ok, err := sqlDb.GetTableInsensitive(ctx, tableName)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		return fmt.Errorf("error creating statistics for table: %s; table not found", tableName)
+	}
+
+	var dTab *doltdb.Table
+	switch t := sqlTable.(type) {
+	case *sqle.AlterableDoltTable:
+		dTab, err = t.DoltTable.DoltTable(ctx)
+	case *sqle.WritableDoltTable:
+		dTab, err = t.DoltTable.DoltTable(ctx)
+	case *sqle.DoltTable:
+		dTab, err = t.DoltTable(ctx)
+	default:
+		return fmt.Errorf("failed to unwrap dolt table from type: %T", sqlTable)
+	}
+	if err != nil {
+		return err
+	}
+
+	curStats := p.getStats(dbName)
+	if curStats == nil {
+		curStats = newDbStats(dbName)
+	}
+
+	tablePrefix := fmt.Sprintf("%s.", tableName)
 	var idxMetas []indexMeta
 	for _, idx := range indexes {
 		cols := make([]string, len(idx.Expressions()))
@@ -309,34 +520,37 @@ func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db strin
 			cols[i] = strings.TrimPrefix(strings.ToLower(c), tablePrefix)
 		}
 
-		idxMeta := indexMeta{
-			db:    db,
-			table: strings.ToLower(table.Name()),
-			index: strings.ToLower(idx.ID()),
-			cols:  cols,
+		qual := sql.NewStatQualifier(db, table.Name(), strings.ToLower(idx.ID()))
+		curStat := curStats.getIndexStats(qual)
+		if curStat == nil {
+			curStat = NewDoltStats()
+			curStat.Qual = qual
+		}
+		idxMeta, err := newIdxMeta(ctx, curStat, dTab, idx, cols)
+		if err != nil {
+			return err
 		}
 		idxMetas = append(idxMetas, idxMeta)
 	}
 
-	newStats, err := refreshStats(ctx, indexes, idxMetas)
+	newTableStats, err := updateStats(ctx, sqlTable, dTab, indexes, idxMetas)
 	if err != nil {
 		return err
 	}
 
-	sess := dsess.DSessFromSess(ctx.Session)
-	ddb, ok := sess.GetDoltDB(ctx, dbName)
-	if !ok {
-		return fmt.Errorf("database not found in session for stats update: %s", db)
+	// merge new chunks with preexisting chunks
+	newStats := make(map[sql.StatQualifier]*DoltStats)
+	for _, idxMeta := range idxMetas {
+		stat := newTableStats[idxMeta.qual]
+		newStats[idxMeta.qual] = mergeStatUpdates(stat, idxMeta)
 	}
 
-	if _, ok := p.dbStats[dbName]; !ok {
-		p.dbStats[dbName] = &dbStats{db: strings.ToLower(db), stats: make(map[sql.StatQualifier]*DoltStats)}
-	}
-	for qual, stats := range newStats {
-		p.dbStats[dbName].stats[qual] = stats
+	ddb, ok := dSess.GetDoltDB(ctx, dbName)
+	if !ok {
+		return fmt.Errorf("database not found in session for stats update: %s", db)
 	}
 
-	prevMap := p.dbStats[dbName].currentMap
+	prevMap := curStats.currentMap
 	if prevMap.KeyDesc().Count() == 0 {
 		kd, vd := schema.StatsTableDoltSchema.GetMapDescriptors()
 		prevMap, err = prolly.NewMapFromTuples(ctx, ddb.NodeStore(), kd, vd)
@@ -349,7 +563,12 @@ func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db strin
 		return err
 	}
 
-	p.dbStats[dbName].currentMap = newMap
+	curStats.setCurrentMap(newMap)
+	for k, v := range newStats {
+		curStats.setIndexStats(k, v)
+	}
+
+	p.setStats(dbName, curStats)
 	return ddb.SetStatisics(ctx, newMap.HashOf())
 }
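
The merge step above reconciles freshly built buckets with preexisting ones by walking the index's current chunk addresses with two cursors; mergeStatUpdates in update.go below implements it. A self-contained sketch with simplified types (not part of this patch):

package main

import "fmt"

type bucket struct {
	chunk string // address of the chunk this bucket summarizes
	rows  int
}

// mergeHistograms walks the index's chunk addresses in order: buckets
// whose chunks are untouched are reused from the old histogram, and
// rebuilt buckets are spliced in from the new one. Both inputs are
// assumed to be in chunk order, like the real histograms.
func mergeHistograms(allChunks []string, old, fresh []bucket) []bucket {
	var merged []bucket
	var i, j int
	for _, addr := range allChunks {
		if i < len(old) && old[i].chunk == addr {
			merged = append(merged, old[i])
			i++
		} else if j < len(fresh) && fresh[j].chunk == addr {
			merged = append(merged, fresh[j])
			j++
		}
	}
	return merged
}

func main() {
	all := []string{"c1", "c4"} // c3's rows were rewritten into a new chunk c4
	old := []bucket{{"c1", 10}, {"c3", 7}}
	fresh := []bucket{{"c4", 9}}
	fmt.Println(mergeHistograms(all, old, fresh)) // [{c1 10} {c4 9}]
}
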
diff --git a/go/libraries/doltcore/sqle/stats/refresh.go b/go/libraries/doltcore/sqle/stats/update.go
similarity index 65%
rename from go/libraries/doltcore/sqle/stats/refresh.go
rename to go/libraries/doltcore/sqle/stats/update.go
index d7b179f6ea5..cb8546ec284 100644
--- a/go/libraries/doltcore/sqle/stats/refresh.go
+++ b/go/libraries/doltcore/sqle/stats/update.go
@@ -18,7 +18,6 @@ import (
 	"container/heap"
 	"context"
 	"errors"
-	"fmt"
 	"io"
 	"strings"
 	"time"
@@ -28,9 +27,7 @@
 
 	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
 	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
-	"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
-	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
-	"github.com/dolthub/dolt/go/store/hash"
+	"github.com/dolthub/dolt/go/store/prolly"
 	"github.com/dolthub/dolt/go/store/prolly/tree"
 	"github.com/dolthub/dolt/go/store/val"
 )
@@ -40,121 +37,103 @@ const (
 	mcvCnt       = 3
 )
 
-// refreshStats builds histograms for each index statistic metadata
-// indicated in |newStats|.
-func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (map[sql.StatQualifier]*DoltStats, error) {
-	dSess := dsess.DSessFromSess(ctx.Session)
-	prov := dSess.Provider()
-	db, err := prov.Database(ctx, idxMetas[0].db)
-	if err != nil {
-		return nil, err
-	}
-	tab, ok, err := db.GetTableInsensitive(ctx, idxMetas[0].table)
-	if err != nil {
-		return nil, err
-	}
-	if !ok {
-		return nil, fmt.Errorf("error creating statistics for table: %s; table not found", idxMetas[0].table)
-	}
-
+// updateStats builds histograms for a list of index statistic metadata.
+// We only read the chunk ranges indicated by |indexMeta.updateOrdinals|.
+// If the returned buckets are a subset of the index, the caller is
+// responsible for reconciling the difference.
+func updateStats(ctx *sql.Context, sqlTable sql.Table, dTab *doltdb.Table, indexes []sql.Index, idxMetas []indexMeta) (map[sql.StatQualifier]*DoltStats, error) {
 	nameToIdx := make(map[string]sql.Index)
 	for _, idx := range indexes {
 		nameToIdx[strings.ToLower(idx.ID())] = idx
 	}
 
-	var dTab *doltdb.Table
-	switch t := tab.(type) {
-	case *sqle.AlterableDoltTable:
-		dTab, err = t.DoltTable.DoltTable(ctx)
-	case *sqle.WritableDoltTable:
-		dTab, err = t.DoltTable.DoltTable(ctx)
-	case *sqle.DoltTable:
-		dTab, err = t.DoltTable(ctx)
-	default:
-		return nil, fmt.Errorf("failed to unwrap dolt table from type: %T", tab)
-	}
-	if err != nil {
-		return nil, err
-	}
-
 	ret := make(map[sql.StatQualifier]*DoltStats)
-	for i, meta := range idxMetas {
+	for _, meta := range idxMetas {
 		var idx durable.Index
 		var err error
-		if strings.EqualFold(meta.index, "PRIMARY") {
+		if strings.EqualFold(meta.qual.Index(), "PRIMARY") {
 			idx, err = dTab.GetRowData(ctx)
 		} else {
-			idx, err = dTab.GetIndexRowData(ctx, meta.index)
+			idx, err = dTab.GetIndexRowData(ctx, meta.qual.Index())
 		}
 		if err != nil {
 			return nil, err
 		}
+
 		prollyMap := durable.ProllyMapFromIndex(idx)
 		keyBuilder := val.NewTupleBuilder(prollyMap.KeyDesc())
-		buffPool := prollyMap.NodeStore().Pool()
 
-		firstIter, err := prollyMap.IterOrdinalRange(ctx, 0, 1)
-		if err != nil {
-			return nil, err
-		}
-		keyBytes, _, err := firstIter.Next(ctx)
+		sqlIdx := nameToIdx[strings.ToLower(meta.qual.Index())]
+		fds, colSet, err := stats.IndexFds(meta.qual.Table(), sqlTable.Schema(), sqlIdx)
 		if err != nil {
 			return nil, err
 		}
-		for i := range keyBuilder.Desc.Types {
-			keyBuilder.PutRaw(i, keyBytes.GetField(i))
-		}
-
-		prefixLen := len(meta.cols)
-		firstKey := keyBuilder.BuildPrefixNoRecycle(buffPool, prefixLen)
-		firstRow := make(sql.Row, prefixLen)
-		for i := 0; i < prefixLen; i++ {
-			firstRow[i], err = tree.GetField(ctx, prollyMap.KeyDesc(), i, firstKey, prollyMap.NodeStore())
-			if err != nil {
-				return nil, err
-			}
-		}
 
 		var types []sql.Type
-		for _, cet := range indexes[i].ColumnExpressionTypes() {
+		for _, cet := range nameToIdx[strings.ToLower(meta.qual.Index())].ColumnExpressionTypes() {
 			types = append(types, cet.Type)
 		}
 
-		// find level
-		levelNodes, err := tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
-		if err != nil {
+		if cnt, err := prollyMap.Count(); err != nil {
 			return nil, err
+		} else if cnt == 0 {
+			// table is empty
+			ret[meta.qual] = NewDoltStats()
+			ret[meta.qual].chunks = meta.allAddrs
+			ret[meta.qual].CreatedAt = time.Now()
+			ret[meta.qual].Columns = meta.cols
+			ret[meta.qual].Types = types
+			ret[meta.qual].Qual = meta.qual
+
+			ret[meta.qual].fds = fds
+			ret[meta.qual].colSet = colSet
+			continue
 		}
-		var addrs []hash.Hash
-		for _, n := range levelNodes {
-			addrs = append(addrs, n.HashOf())
+
+		firstRow, err := firstRowForIndex(ctx, prollyMap, keyBuilder, len(meta.cols))
+		if err != nil {
+			return nil, err
 		}
 
-		qual := sql.NewStatQualifier(meta.db, meta.table, meta.index)
-		updater := newBucketBuilder(qual, len(meta.cols), prollyMap.KeyDesc())
-		ret[qual] = &DoltStats{
-			level:     levelNodes[0].Level(),
-			chunks:    addrs,
-			CreatedAt: time.Now(),
-			Columns:   meta.cols,
-			Types:     types,
-			Qual:      qual,
+		// find the histogram level if we don't already have chunks to update
+		if len(meta.updateChunks) == 0 {
+			levelNodes, err := tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
+			if err != nil {
+				return nil, err
+			}
+			var chunks []tree.Node
+			var offsets [][]uint64
+			var offset uint64
+			for _, n := range levelNodes {
+				chunks = append(chunks, n)
+				treeCnt, err := n.TreeCount()
+				if err != nil {
+					return nil, err
+				}
+				offsets = append(offsets, []uint64{offset, offset + uint64(treeCnt)})
+				offset += uint64(treeCnt)
+			}
+			meta.updateChunks = chunks
+			meta.updateOrdinals = offsets
 		}
 
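+		// each chunk in meta.updateChunks maps to exactly one histogram
+		// bucket; only its [start, stop) ordinal range is re-read below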
+		updater := newBucketBuilder(meta.qual, len(meta.cols), prollyMap.KeyDesc())
+		ret[meta.qual] = NewDoltStats()
+		ret[meta.qual].chunks = meta.allAddrs
+		ret[meta.qual].CreatedAt = time.Now()
+		ret[meta.qual].Columns = meta.cols
+		ret[meta.qual].Types = types
+		ret[meta.qual].Qual = meta.qual
+
 		var start, stop uint64
 		// read leaf rows for each bucket
-		for i, _ := range levelNodes {
+		for i, chunk := range meta.updateChunks {
 			// each node is a bucket
 			updater.newBucket()
 
 			// we read exclusive range [node first key, next node first key)
-			start = stop
-			leafCnt, err := levelNodes[i].TreeCount()
-			if err != nil {
-				return nil, err
-			}
-			stop = start + uint64(leafCnt)
+			start, stop = meta.updateOrdinals[i][0], meta.updateOrdinals[i][1]
 			iter, err := prollyMap.IterOrdinalRange(ctx, start, stop)
 			if err != nil {
 				return nil, err
@@ -172,7 +151,7 @@ func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (
 				keyBuilder.PutRaw(i, keyBytes.GetField(i))
 			}
 
-			updater.add(keyBuilder.BuildPrefixNoRecycle(buffPool, updater.prefixLen))
+			updater.add(keyBuilder.BuildPrefixNoRecycle(prollyMap.Pool(), updater.prefixLen))
 			keyBuilder.Recycle()
 		}
 
@@ -181,16 +160,10 @@ func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (
 		if err != nil {
 			return nil, err
 		}
-		bucket.Chunk = addrs[i]
+		bucket.Chunk = chunk.HashOf()
 		ret[updater.qual].Histogram = append(ret[updater.qual].Histogram, bucket)
 	}
 
-	sqlIdx := nameToIdx[strings.ToLower(qual.Index())]
-	fds, colSet, err := stats.IndexFds(qual.Table(), tab.Schema(), sqlIdx)
-	if err != nil {
-		return nil, err
-	}
-
 	ret[updater.qual].DistinctCount = uint64(updater.globalDistinct)
 	ret[updater.qual].RowCount = uint64(updater.globalCount)
 	ret[updater.qual].LowerBound = firstRow
@@ -200,6 +173,65 @@ func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (
 	return ret, nil
 }
 
+func mergeStatUpdates(newStats *DoltStats, idxMeta indexMeta) *DoltStats {
+	if len(newStats.Histogram) == len(idxMeta.allAddrs) {
+		newStats.updateActive()
+		return newStats
+	}
+	oldHist := idxMeta.preexisting
+	var mergeHist DoltHistogram
+	newHist := newStats.Histogram
+	var i, j int
+	for _, chunkAddr := range idxMeta.allAddrs {
+		if i < len(oldHist) && oldHist[i].Chunk == chunkAddr {
+			mergeHist = append(mergeHist, oldHist[i])
+			i++
+		} else if j < len(newHist) && newHist[j].Chunk == chunkAddr {
+			mergeHist = append(mergeHist, newHist[j])
+			j++
+		}
+	}
+
+	newStats.Histogram = mergeHist
+	newStats.chunks = idxMeta.allAddrs
+	newStats.updateActive()
+	newStats.updateCounts()
+	return newStats
+}
+
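+// firstRowForIndex returns the |prefixLen|-column prefix of the index's
+// first key, which seeds the histogram's lower bound; it returns nil
+// for an empty index.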
+func firstRowForIndex(ctx *sql.Context, prollyMap prolly.Map, keyBuilder *val.TupleBuilder, prefixLen int) (sql.Row, error) {
+	if cnt, err := prollyMap.Count(); err != nil {
+		return nil, err
+	} else if cnt == 0 {
+		return nil, nil
+	}
+
+	buffPool := prollyMap.NodeStore().Pool()
+
+	// first row is ordinal 0
+	firstIter, err := prollyMap.IterOrdinalRange(ctx, 0, 1)
+	if err != nil {
+		return nil, err
+	}
+	keyBytes, _, err := firstIter.Next(ctx)
+	if err != nil {
+		return nil, err
+	}
+	for i := range keyBuilder.Desc.Types {
+		keyBuilder.PutRaw(i, keyBytes.GetField(i))
+	}
+
+	firstKey := keyBuilder.BuildPrefixNoRecycle(buffPool, prefixLen)
+	firstRow := make(sql.Row, prefixLen)
+	for i := 0; i < prefixLen; i++ {
+		firstRow[i], err = tree.GetField(ctx, prollyMap.KeyDesc(), i, firstKey, prollyMap.NodeStore())
+		if err != nil {
+			return nil, err
+		}
+	}
+	return firstRow, nil
+}
+
 func newBucketBuilder(qual sql.StatQualifier, prefixLen int, tupleDesc val.TupleDesc) *bucketBuilder {
 	return &bucketBuilder{
 		qual:      qual,
diff --git a/go/libraries/doltcore/sqle/stats/refresh_test.go b/go/libraries/doltcore/sqle/stats/update_test.go
similarity index 100%
rename from go/libraries/doltcore/sqle/stats/refresh_test.go
rename to go/libraries/doltcore/sqle/stats/update_test.go
diff --git a/go/libraries/doltcore/sqle/stats/write.go b/go/libraries/doltcore/sqle/stats/write.go
index 8bc80104928..c2e27445a31 100644
--- a/go/libraries/doltcore/sqle/stats/write.go
+++ b/go/libraries/doltcore/sqle/stats/write.go
@@ -24,21 +24,33 @@ import (
 
 	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
 	"github.com/dolthub/dolt/go/libraries/doltcore/schema"
+	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
 	"github.com/dolthub/dolt/go/store/prolly"
 	"github.com/dolthub/dolt/go/store/prolly/tree"
 	stypes "github.com/dolthub/dolt/go/store/types"
 	"github.com/dolthub/dolt/go/store/val"
 )
 
+// Roughly 200 20-byte addresses fit in a ~4k chunk. Chunk sizes are
+// approximate, but certainly shouldn't reach the square of the
+// expected size.
+const maxBucketFanout = 200 * 200
+
 func newStatsTable(ctx *sql.Context, ns tree.NodeStore, vrw stypes.ValueReadWriter) (*doltdb.Table, error) {
 	return doltdb.CreateEmptyTable(ctx, ns, vrw, schema.StatsTableDoltSchema)
 }
 
 // flushStats writes a set of table statistics to the given node store, and returns a new prolly.Map
 func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualifier]*DoltStats) (prolly.Map, error) {
+	if _, disabled, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsMemoryOnly); disabled == int8(1) {
+		// do not write to disk
+		return prolly.Map{}, nil
+	}
+
 	sch := schema.StatsTableDoltSchema
 	kd, vd := sch.GetMapDescriptors()
 
 	m := prev.Mutate()
 	pool := prev.NodeStore().Pool()
 
 	keyBuilder := val.NewTupleBuilder(kd)
@@ -56,10 +68,10 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
 		}
 		return b.String()
 	}
-	var pos int64
 	for qual, stats := range tableStats {
+		var pos int64
 
-		// delete previous entries for this index
+		// delete previous entries for this index -> (db, table, index, pos)
 		keyBuilder.PutString(0, qual.Database)
 		keyBuilder.PutString(1, qual.Table())
 		keyBuilder.PutString(2, qual.Index())
@@ -68,11 +80,11 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
 		keyBuilder.PutString(0, qual.Database)
 		keyBuilder.PutString(1, qual.Table())
 		keyBuilder.PutString(2, qual.Index())
-		keyBuilder.PutInt64(3, 10000)
+		keyBuilder.PutInt64(3, maxBucketFanout+1)
 		maxKey := keyBuilder.Build(pool)
 
-		// there is a limit on the number of buckets for a given index, iter
-		// will terminate after we run over.
+		// there is a limit on the number of buckets for a given index; iter
+		// will terminate before reaching maxBucketFanout
 		iter, err := prev.IterKeyRange(ctx, firstKey, maxKey)
 		if err != nil {
 			return prolly.Map{}, err
@@ -100,6 +112,13 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
 		}
 		typesStr := typesB.String()
 
+		if len(stats.Types) != len(stats.Columns) {
+			ctx.GetLogger().Println(stats.Qual.String())
+			ctx.GetLogger().Println(typesStr)
+			ctx.GetLogger().Println(strings.Join(stats.Columns, ","))
+			panic("invalid statistic")
+		}
+
 		for _, h := range stats.Histogram {
 			var upperBoundElems []string
 			for _, v := range h.UpperBound {
@@ -139,3 +158,54 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
 
 	return m.Map(ctx)
 }
+
+func deleteStats(ctx *sql.Context, prev prolly.Map, quals ...sql.StatQualifier) (prolly.Map, error) {
+	if cnt, err := prev.Count(); err != nil {
+		return prolly.Map{}, err
+	} else if cnt == 0 {
+		return prev, nil
+	}
+
+	sch := schema.StatsTableDoltSchema
+	kd, _ := sch.GetMapDescriptors()
+	m := prev.Mutate()
+	pool := prev.NodeStore().Pool()
+
+	keyBuilder := val.NewTupleBuilder(kd)
+
+	for _, qual := range quals {
+		// delete previous entries for this index -> (db, table, index, pos)
+		keyBuilder.PutString(0, qual.Database)
+		keyBuilder.PutString(1, qual.Table())
+		keyBuilder.PutString(2, qual.Index())
+		keyBuilder.PutInt64(3, 0)
+		firstKey := keyBuilder.Build(pool)
+		keyBuilder.PutString(0, qual.Database)
+		keyBuilder.PutString(1, qual.Table())
+		keyBuilder.PutString(2, qual.Index())
+		keyBuilder.PutInt64(3, maxBucketFanout+1)
+		maxKey := keyBuilder.Build(pool)
+
+		// there is a limit on the number of buckets for a given index; iter
+		// will terminate before reaching maxBucketFanout
+		iter, err := prev.IterKeyRange(ctx, firstKey, maxKey)
+		if err != nil {
+			return prolly.Map{}, err
+		}
+
+		for {
+			k, _, err := iter.Next(ctx)
+			if errors.Is(err, io.EOF) {
+				break
+			} else if err != nil {
+				return prolly.Map{}, err
+			}
+			err = m.Put(ctx, k, nil)
+			if err != nil {
+				return prolly.Map{}, err
+			}
+		}
+	}
+	return m.Map(ctx)
+}
diff --git a/go/libraries/doltcore/sqle/system_variables.go b/go/libraries/doltcore/sqle/system_variables.go
index fad1ffdda07..00774d3d8d2 100644
--- a/go/libraries/doltcore/sqle/system_variables.go
+++ b/go/libraries/doltcore/sqle/system_variables.go
@@ -195,6 +195,34 @@ func AddDoltSystemVariables() {
 			Type:    types.NewSystemBoolType("dolt_dont_merge_json"),
 			Default: int8(0),
 		},
+		{
+			Name:    dsess.DoltStatsAutoRefreshEnabled,
+			Dynamic: true,
+			Scope:   sql.SystemVariableScope_Global,
+			Type:    types.NewSystemBoolType(dsess.DoltStatsAutoRefreshEnabled),
+			Default: int8(0),
+		},
+		{
+			Name:    dsess.DoltStatsMemoryOnly,
+			Dynamic: true,
+			Scope:   sql.SystemVariableScope_Global,
+			Type:    types.NewSystemBoolType(dsess.DoltStatsMemoryOnly),
+			Default: int8(0),
+		},
+		{
+			Name:    dsess.DoltStatsAutoRefreshThreshold,
+			Dynamic: true,
+			Scope:   sql.SystemVariableScope_Global,
+			Type:    types.NewSystemDoubleType(dsess.DoltStatsAutoRefreshThreshold, 0, 10),
+			Default: float64(.5),
+		},
+		{
+			Name:    dsess.DoltStatsAutoRefreshInterval,
+			Dynamic: true,
+			Scope:   sql.SystemVariableScope_Global,
+			Type:    types.NewSystemIntType(dsess.DoltStatsAutoRefreshInterval, 0, 1<<10, false),
+			Default: 120,
+		},
 	})
 }
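
The variables registered above can also be read back from Go. A hedged sketch mirroring the sql.SystemVariables.GetGlobal call that flushStats uses; it assumes a go-mysql-server dependency, that registration has already run, and that the concrete value types (int8 bool, float64, int64) match the system types declared above:

package main

import (
	"fmt"

	"github.com/dolthub/go-mysql-server/sql"
)

// statsSettings reads the auto-refresh knobs by their registered names,
// which are the same names the bats tests below set via SET @@persist.
func statsSettings() (enabled bool, threshold float64, interval int64) {
	if sql.SystemVariables == nil {
		return // registry not initialized yet
	}
	if _, v, ok := sql.SystemVariables.GetGlobal("dolt_stats_auto_refresh_enabled"); ok {
		enabled = v == int8(1)
	}
	if _, v, ok := sql.SystemVariables.GetGlobal("dolt_stats_auto_refresh_threshold"); ok {
		threshold, _ = v.(float64)
	}
	if _, v, ok := sql.SystemVariables.GetGlobal("dolt_stats_auto_refresh_interval"); ok {
		interval, _ = v.(int64)
	}
	return
}

func main() {
	fmt.Println(statsSettings())
}
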
diff --git a/go/libraries/doltcore/sqle/testutil.go b/go/libraries/doltcore/sqle/testutil.go
index 52af039bf27..d5192f2e3cf 100644
--- a/go/libraries/doltcore/sqle/testutil.go
+++ b/go/libraries/doltcore/sqle/testutil.go
@@ -110,8 +110,8 @@ func ExecuteSql(dEnv *env.DoltEnv, root *doltdb.RootValue, statements string) (*
 	return db.GetRoot(ctx)
 }
 
-func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider) *sql.Context {
-	s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config2.NewMapConfig(make(map[string]string)), branch_control.CreateDefaultController(ctx))
+func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider, statsPro sql.StatsProvider) *sql.Context {
+	s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config2.NewMapConfig(make(map[string]string)), branch_control.CreateDefaultController(ctx), statsPro)
 	if err != nil {
 		panic(err)
 	}
@@ -132,7 +132,8 @@ func NewTestEngine(dEnv *env.DoltEnv, ctx context.Context, db dsess.SqlDatabase)
 	}
 
 	engine := sqle.NewDefault(pro)
-	sqlCtx := NewTestSQLCtxWithProvider(ctx, pro)
+
+	sqlCtx := NewTestSQLCtxWithProvider(ctx, pro, nil)
 	sqlCtx.SetCurrentDatabase(db.Name())
 	return engine, sqlCtx, nil
 }
diff --git a/integration-tests/bats/stats.bats b/integration-tests/bats/stats.bats
new file mode 100644
index 00000000000..e8c56b67b12
--- /dev/null
+++ b/integration-tests/bats/stats.bats
@@ -0,0 +1,350 @@
+#!/usr/bin/env bats
+load $BATS_TEST_DIRNAME/helper/common.bash
+load $BATS_TEST_DIRNAME/helper/query-server-common.bash
+
+setup() {
+    skiponwindows "tests are flaky on Windows"
+    if [ "$SQL_ENGINE" = "remote-engine" ]; then
+      skip "This test connects to a server directly, SQL_ENGINE is not needed."
+    fi
+
+    setup_common
+
+    TMPDIRS=$(pwd)/tmpdirs
+    mkdir -p $TMPDIRS/{repo1,repo2}
+
+    cd $TMPDIRS/repo1
+    dolt init
+
+    dolt sql <50% of rows
+    dolt sql -q "delete from xy where x > 500"
+
+    sleep 1
+
+    run dolt sql -r csv -q "select count(*) from dolt_statistics"
+    [ "$status" -eq 0 ]
+    [ "${lines[1]}" = "4" ]
+}
+
+@test "stats: add/delete table" {
+    cd repo1
+
+    dolt sql -q "insert into ab values (0,0), (1,0), (2,0)"
+
+    # setting variables doesn't hang or error
+    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
+    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5"
+    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"
+
+    start_sql_server
+
+    sleep 1
+
+    run dolt sql -r csv -q "select count(*) from dolt_statistics"
+    [ "$status" -eq 0 ]
+    [ "${lines[1]}" = "2" ]
+
+    # add table
+    dolt sql -q "create table xy (x int primary key, y int)"
+    # schema changes don't impact the table hash
+    dolt sql -q "insert into xy values (0,0)"
+
+    sleep 1
+
+    run dolt sql -r csv -q "select count(*) from dolt_statistics where table_name = 'xy'"
+    [ "$status" -eq 0 ]
+    [ "${lines[1]}" = "1" ]
+
+    dolt sql -q "truncate table xy"
+
+    sleep 1
+
+    dolt sql -q "select * from xy"
+
+    dolt sql -q "select * from dolt_statistics where table_name = 'xy'"
+
+    run dolt sql -r csv -q "select count(*) from dolt_statistics where table_name = 'xy'"
+    [ "$status" -eq 0 ]
+    [ "${lines[1]}" = "0" ]
+
+    dolt sql -q "drop table xy"
+
+    run dolt sql -r csv -q "select count(*) from dolt_statistics where table_name = 'xy'"
+    [ "$status" -eq 0 ]
+    [ "${lines[1]}" = "0" ]
+}
+
+@test "stats: add/delete index" {
+    cd repo2
+
+    dolt sql -q "insert into xy values (0,0), (1,0), (2,0)"
+
+    # setting variables doesn't hang or error
+    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
+    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5"
+    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"
+
+    start_sql_server
+
+    sleep 1
+
+    run dolt sql -r csv -q "select count(*) from dolt_statistics"
+    [ "$status" -eq 0 ]
+    [ 
"${lines[1]}" = "2" ] + + # delete secondary + dolt sql -q "alter table xy drop index yx" + # schema changes don't impact the table hash + dolt sql -q "insert into xy values (3,0)" + + sleep 1 + + run dolt sql -r csv -q "select count(*) from dolt_statistics" + [ "$status" -eq 0 ] + [ "${lines[1]}" = "1" ] + + dolt sql -q "alter table xy add index yx (y,x)" + # row change to impact table hash + dolt sql -q "insert into xy values (4,0)" + + sleep 1 + + run dolt sql -r csv -q "select count(*) from dolt_statistics" + [ "$status" -eq 0 ] + [ "${lines[1]}" = "2" ] +} + +@test "stats: most common values" { + cd repo2 + + dolt sql -q "alter table xy add index (y)" + dolt sql -q "insert into xy values (0,0), (1,0), (2,0), (3,0), (4,0), (5,0)" + + # setting variables doesn't hang or error + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;" + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5" + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;" + + # auto refresh can only initialize at server startup + start_sql_server + + # need to trigger at least one refresh cycle + sleep 1 + + run dolt sql -r csv -q "select mcv1 from dolt_statistics where index_name = 'y'" + [ "$status" -eq 0 ] + [ "${lines[1]}" = "0" ] + + sleep 1 + + dolt sql -q "update xy set y = 2 where x between 0 and 3" + + sleep 1 + + run dolt sql -r csv -q "select mcv1 as mcv from dolt_statistics where index_name = 'y' union select mcv2 as mcv from dolt_statistics where index_name = 'y' order by mcv" + [ "$status" -eq 0 ] + [ "${lines[1]}" = "0" ] + [ "${lines[2]}" = "2" ] +} + +@test "stats: multi db" { + cd repo1 + dolt sql -q "insert into ab values (0,0), (1,1)" + + cd ../repo2 + dolt sql -q "insert into ab values (0,0), (1,1)" + dolt sql -q "insert into xy values (0,0), (1,1)" + + cd .. + start_sql_server + sleep 1 + stop_sql_server + + run dolt sql -r csv -q "select database_name, table_name, index_name from dolt_statistics" + [ "$status" -eq 1 ] + [[ "$output" =~ "no statistics found" ]] || false + + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;" + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = 0.5" + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;" + + start_sql_server + sleep 1 + + dolt sql -q "use repo1" + run dolt sql -r csv -q "select database_name, table_name, index_name from dolt_statistics order by index_name" + [ "$status" -eq 0 ] + [ "${lines[0]}" = "database_name,table_name,index_name" ] + [ "${lines[1]}" = "repo1,ab,ba" ] + [ "${lines[2]}" = "repo1,ab,primary" ] + + run dolt sql -r csv -q "select database_name, table_name, index_name from repo2.dolt_statistics order by index_name" + [ "$status" -eq 0 ] + [ "${lines[0]}" = "database_name,table_name,index_name" ] + [ "${lines[1]}" = "repo2,ab,ba" ] + [ "${lines[2]}" = "repo2,ab,primary" ] + [ "${lines[3]}" = "repo2,xy,primary" ] + [ "${lines[4]}" = "repo2,xy,yx" ] +} + +@test "stats: add/delete database" { + cd repo1 + + # setting variables doesn't hang or error + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;" + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5" + dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;" + + start_sql_server + + dolt sql -q "insert into ab values (0,0), (1,0), (2,0)" + dolt sql <