diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel b/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel index 49f341c7b164e..99407d712be13 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel @@ -17,7 +17,7 @@ go_test( ], embed = [":heap"], flaky = True, - shard_count = 14, + shard_count = 15, deps = [ "//pkg/testkit/testsetup", "@com_github_stretchr_testify//require", diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go index 67b42ea428e26..185ccfa05e788 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go +++ b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go @@ -16,6 +16,7 @@ // 1. Use the `errors` package from PingCAP. // 2. Use generics to define the `heapData` struct. // 3. Add a peak API. +// 4. Add an IsEmpty API. package heap @@ -291,6 +292,13 @@ func (h *Heap[K, V]) IsClosed() bool { return h.closed } +// IsEmpty returns true if the heap is empty. +func (h *Heap[K, V]) IsEmpty() bool { + h.lock.RLock() + defer h.lock.RUnlock() + return len(h.data.queue) == 0 +} + // NewHeap returns a Heap which can be used to queue up items to process. func NewHeap[K comparable, V any](keyFn KeyFunc[K, V], lessFn LessFunc[V]) *Heap[K, V] { h := &Heap[K, V]{ diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go index 5b119661be043..0fba17556fd55 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go +++ b/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go @@ -16,6 +16,7 @@ // 1. Use "github.com/stretchr/testify/require" to do assertions. // 2. Test max heap instead of min heap. // 3. Add a test for the peak API. +// 4. Add a test for the IsEmpty API. package heap @@ -345,3 +346,14 @@ func TestHeapAddAfterClose(t *testing.T) { err = h.BulkAdd([]testHeapObject{mkHeapObj("test", 1)}) require.EqualError(t, err, closedMsg) } + +func TestHeap_IsEmpty(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + require.True(t, h.IsEmpty()) + + h.Add(mkHeapObj("foo", 10)) + require.False(t, h.IsEmpty()) + + h.Pop() + require.True(t, h.IsEmpty()) +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index d6b5dd6b0bbbd..895d425f02755 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -10,11 +10,13 @@ go_library( "job.go", "non_partitioned_table_analysis_job.go", "queue.go", + "queue_v2.go", "static_partitioned_table_analysis_job.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue", visibility = ["//visibility:public"], deps = [ + "//pkg/ddl/notifier", "//pkg/infoschema", "//pkg/meta/model", "//pkg/sessionctx", @@ -22,6 +24,7 @@ go_library( "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", + "//pkg/statistics/handle/autoanalyze/internal/heap", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", @@ -29,6 +32,7 @@ go_library( "//pkg/util", "//pkg/util/intest", "//pkg/util/timeutil", + "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//oracle", "@org_uber_go_zap//:zap", ], @@ -46,10 +50,11 @@ go_test( "main_test.go", "non_partitioned_table_analysis_job_test.go", "queue_test.go", + "queue_v2_test.go", "static_partitioned_table_analysis_job_test.go", ], flaky = True, - shard_count = 29, + shard_count = 32, deps = [ ":priorityqueue", "//pkg/meta/model", diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2.go new file mode 100644 index 0000000000000..6af099e9683de --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2.go @@ -0,0 +1,256 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package priorityqueue + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/ddl/notifier" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util" + "go.uber.org/zap" +) + +const ( + lastAnalysisDurationRefreshInterval = time.Minute * 10 +) + +// AnalysisPriorityQueueV2 is a priority queue for TableAnalysisJobs. +type AnalysisPriorityQueueV2 struct { + inner *heap.Heap[int64, AnalysisJob] + statsHandle statstypes.StatsHandle + calculator *PriorityCalculator + autoAnalysisTimeWindow atomic.Value // stores *autoAnalysisTimeWindow + + ctx context.Context + cancel context.CancelFunc + wg util.WaitGroupWrapper + + // initialized is a flag to check if the queue is initialized. + initialized atomic.Bool +} + +// NewAnalysisPriorityQueueV2 creates a new AnalysisPriorityQueue2. +func NewAnalysisPriorityQueueV2(handle statstypes.StatsHandle) *AnalysisPriorityQueueV2 { + ctx, cancel := context.WithCancel(context.Background()) + + return &AnalysisPriorityQueueV2{ + statsHandle: handle, + calculator: NewPriorityCalculator(), + ctx: ctx, + cancel: cancel, + } +} + +// IsInitialized checks if the priority queue is initialized. +func (pq *AnalysisPriorityQueueV2) IsInitialized() bool { + return pq.initialized.Load() +} + +// Initialize initializes the priority queue. +func (pq *AnalysisPriorityQueueV2) Initialize() error { + if pq.initialized.Load() { + statslogutil.StatsLogger().Warn("Priority queue already initialized") + return nil + } + + start := time.Now() + defer func() { + statslogutil.StatsLogger().Info("Priority queue initialized", zap.Duration("duration", time.Since(start))) + }() + + keyFunc := func(job AnalysisJob) (int64, error) { + return job.GetTableID(), nil + } + // We want the job with the highest weight to be at the top of the priority queue. + lessFunc := func(a, b AnalysisJob) bool { + return a.GetWeight() > b.GetWeight() + } + pq.inner = heap.NewHeap(keyFunc, lessFunc) + if err := pq.init(); err != nil { + pq.Close() + return errors.Trace(err) + } + + // Start a goroutine to maintain the priority queue. + pq.wg.Run(pq.run) + pq.initialized.Store(true) + return nil +} + +// init initializes the priority queue and adds jobs to it. +func (pq *AnalysisPriorityQueueV2) init() error { + return statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error { + parameters := exec.GetAutoAnalyzeParameters(sctx) + err := pq.setAutoAnalysisTimeWindow(parameters) + if err != nil { + return err + } + if !pq.IsWithinTimeWindow() { + return nil + } + timeWindow := pq.autoAnalysisTimeWindow.Load().(*AutoAnalysisTimeWindow) + + err = FetchAllTablesAndBuildAnalysisJobs( + sctx, + parameters, + *timeWindow, + pq.statsHandle, + pq.Push, + ) + if err != nil { + return errors.Trace(err) + } + return nil + }, statsutil.FlagWrapTxn) +} + +// run maintains the priority queue. +func (pq *AnalysisPriorityQueueV2) run() { + defer func() { + if r := recover(); r != nil { + statslogutil.StatsLogger().Error("Priority queue panicked", zap.Any("recover", r), zap.Stack("stack")) + } + }() + + timeRefreshInterval := time.NewTicker(lastAnalysisDurationRefreshInterval) + defer timeRefreshInterval.Stop() + + for { + select { + case <-pq.ctx.Done(): + statslogutil.StatsLogger().Info("Priority queue stopped") + return + case <-timeRefreshInterval.C: + statslogutil.StatsLogger().Info("Start to refresh last analysis durations of jobs") + pq.RefreshLastAnalysisDuration() + } + } +} + +// RefreshLastAnalysisDuration refreshes the last analysis duration of all jobs in the priority queue. +func (pq *AnalysisPriorityQueueV2) RefreshLastAnalysisDuration() { + if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error { + parameters := exec.GetAutoAnalyzeParameters(sctx) + if err := pq.setAutoAnalysisTimeWindow(parameters); err != nil { + return errors.Trace(err) + } + if !pq.IsWithinTimeWindow() { + statslogutil.StatsLogger().Debug("Not within the auto analyze time window, skip refreshing last analysis duration") + return nil + } + start := time.Now() + defer func() { + statslogutil.StatsLogger().Info("Last analysis duration refreshed", zap.Duration("duration", time.Since(start))) + }() + jobs := pq.inner.List() + for _, job := range jobs { + indicators := job.GetIndicators() + currentTs, err := getStartTs(sctx) + if err != nil { + return errors.Trace(err) + } + + jobFactory := NewAnalysisJobFactory(sctx, 0, currentTs) + tableStats, ok := pq.statsHandle.Get(job.GetTableID()) + if !ok { + statslogutil.StatsLogger().Warn("Table stats not found during refreshing last analysis duration", + zap.Int64("tableID", job.GetTableID()), + zap.String("job", job.String()), + ) + err := pq.inner.Delete(job) + if err != nil { + statslogutil.StatsLogger().Error("Failed to delete job from priority queue", + zap.Error(err), + zap.String("job", job.String()), + ) + } + } + indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats) + job.SetIndicators(indicators) + job.SetWeight(pq.calculator.CalculateWeight(job)) + if err := pq.inner.Update(job); err != nil { + statslogutil.StatsLogger().Error("Failed to add job to priority queue", + zap.Error(err), + zap.String("job", job.String()), + ) + } + } + return nil + }, statsutil.FlagWrapTxn); err != nil { + statslogutil.StatsLogger().Error("Failed to refresh last analysis duration", zap.Error(err)) + } +} + +func (*AnalysisPriorityQueueV2) handleDDLEvent(_ context.Context, _ sessionctx.Context, _ notifier.SchemaChangeEvent) { + // TODO: Handle the ddl event. + // Only care about the add index event. +} + +// Push pushes a job into the priority queue. +func (pq *AnalysisPriorityQueueV2) Push(job AnalysisJob) error { + return pq.inner.Add(job) +} + +// Pop pops a job from the priority queue. +func (pq *AnalysisPriorityQueueV2) Pop() (AnalysisJob, error) { + return pq.inner.Pop() +} + +// IsEmpty checks whether the priority queue is empty. +func (pq *AnalysisPriorityQueueV2) IsEmpty() bool { + return pq.inner.IsEmpty() +} + +func (pq *AnalysisPriorityQueueV2) setAutoAnalysisTimeWindow( + parameters map[string]string, +) error { + start, end, err := exec.ParseAutoAnalysisWindow( + parameters[variable.TiDBAutoAnalyzeStartTime], + parameters[variable.TiDBAutoAnalyzeEndTime], + ) + if err != nil { + return errors.Wrap(err, "parse auto analyze period failed") + } + timeWindow := NewAutoAnalysisTimeWindow(start, end) + pq.autoAnalysisTimeWindow.Store(&timeWindow) + return nil +} + +// IsWithinTimeWindow checks if the current time is within the auto analyze time window. +func (pq *AnalysisPriorityQueueV2) IsWithinTimeWindow() bool { + window := pq.autoAnalysisTimeWindow.Load().(*AutoAnalysisTimeWindow) + return window.IsWithinTimeWindow(time.Now()) +} + +// Close closes the priority queue. +func (pq *AnalysisPriorityQueueV2) Close() { + if !pq.initialized.Load() { + return + } + + pq.cancel() + pq.wg.Wait() + pq.inner.Close() +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2_test.go new file mode 100644 index 0000000000000..0db34e1d2ee16 --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2_test.go @@ -0,0 +1,159 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package priorityqueue_test + +import ( + "context" + "testing" + "time" + + pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" +) + +func TestAnalysisPriorityQueueV2(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (a int)") + tk.MustExec("create table t2 (a int)") + tk.MustExec("insert into t1 values (1)") + tk.MustExec("insert into t2 values (1)") + statistics.AutoAnalyzeMinCnt = 0 + defer func() { + statistics.AutoAnalyzeMinCnt = 1000 + }() + + ctx := context.Background() + handle := dom.StatsHandle() + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + require.NoError(t, handle.Update(ctx, dom.InfoSchema())) + + pq := priorityqueue.NewAnalysisPriorityQueueV2(handle) + defer pq.Close() + + t.Run("Initialize", func(t *testing.T) { + err := pq.Initialize() + require.NoError(t, err) + require.True(t, pq.IsInitialized()) + + // Test double initialization + err = pq.Initialize() + require.NoError(t, err) + }) + + t.Run("IsEmpty And Pop", func(t *testing.T) { + require.False(t, pq.IsEmpty()) + + poppedJob, err := pq.Pop() + require.NoError(t, err) + require.NotNil(t, poppedJob) + + poppedJob, err = pq.Pop() + require.NoError(t, err) + require.NotNil(t, poppedJob) + + require.True(t, pq.IsEmpty()) + }) +} + +func TestIsWithinTimeWindow(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + handle := dom.StatsHandle() + pq := priorityqueue.NewAnalysisPriorityQueueV2(handle) + err := pq.Initialize() + require.NoError(t, err) + require.True(t, pq.IsWithinTimeWindow()) + pq.Close() + + tk.MustExec("set global tidb_auto_analyze_start_time = '00:00 +0000'") + tk.MustExec("set global tidb_auto_analyze_end_time = '00:00 +0000'") + // Reset the priority queue with the new time window. + pq = priorityqueue.NewAnalysisPriorityQueueV2(handle) + err = pq.Initialize() + require.NoError(t, err) + require.False(t, pq.IsWithinTimeWindow()) + pq.Close() +} + +func TestRefreshLastAnalysisDuration(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + handle := dom.StatsHandle() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (a int)") + tk.MustExec("create table t2 (a int)") + tk.MustExec("insert into t1 values (1)") + tk.MustExec("insert into t2 values (1)") + statistics.AutoAnalyzeMinCnt = 0 + defer func() { + statistics.AutoAnalyzeMinCnt = 1000 + }() + + ctx := context.Background() + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + require.NoError(t, handle.Update(ctx, dom.InfoSchema())) + schema := pmodel.NewCIStr("test") + tbl1, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t1")) + require.NoError(t, err) + tbl2, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t2")) + require.NoError(t, err) + + pq := priorityqueue.NewAnalysisPriorityQueueV2(handle) + defer pq.Close() + require.NoError(t, pq.Initialize()) + + // Check current jobs. + job1, err := pq.Pop() + require.NoError(t, err) + require.Equal(t, tbl1.Meta().ID, job1.GetTableID()) + oldLastAnalysisDuration1 := job1.GetIndicators().LastAnalysisDuration + require.Equal(t, time.Minute*30, oldLastAnalysisDuration1) + job2, err := pq.Pop() + require.NoError(t, err) + require.Equal(t, tbl2.Meta().ID, job2.GetTableID()) + oldLastAnalysisDuration2 := job2.GetIndicators().LastAnalysisDuration + require.Equal(t, time.Minute*30, oldLastAnalysisDuration2) + + // Push the jobs back to the queue + require.NoError(t, pq.Push(job1)) + require.NoError(t, pq.Push(job2)) + + // Analyze the tables + tk.MustExec("analyze table t1") + tk.MustExec("analyze table t2") + require.NoError(t, handle.Update(ctx, dom.InfoSchema())) + + // Call RefreshLastAnalysisDuration + pq.RefreshLastAnalysisDuration() + + // Check if the jobs' last analysis durations and weights have been updated + updatedJob1, err := pq.Pop() + require.NoError(t, err) + require.NotZero(t, updatedJob1.GetWeight()) + require.NotZero(t, updatedJob1.GetIndicators().LastAnalysisDuration) + require.NotEqual(t, oldLastAnalysisDuration1, updatedJob1.GetIndicators().LastAnalysisDuration) + + updatedJob2, err := pq.Pop() + require.NoError(t, err) + require.NotZero(t, updatedJob2.GetWeight()) + require.NotZero(t, updatedJob2.GetIndicators().LastAnalysisDuration) + require.NotEqual(t, oldLastAnalysisDuration2, updatedJob2.GetIndicators().LastAnalysisDuration) +}