From 5fff4407bee1b6ae0a17a87543ffb012140cbfaa Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 24 Feb 2025 09:36:36 +0200 Subject: [PATCH 1/2] Throttler app name: map token values Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/base/app_throttle.go | 6 ++++++ go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go | 8 ++++++++ .../tabletserver/throttle/throttlerapp/app_test.go | 6 ++++++ 3 files changed, 20 insertions(+) diff --git a/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go b/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go index 36d57bdc973..5347b9ad42d 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go +++ b/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go @@ -18,6 +18,8 @@ package base import ( "time" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) // AppThrottle is the definition for an app throttling instruction @@ -39,3 +41,7 @@ func NewAppThrottle(appName string, expireAt time.Time, ratio float64, exempt bo } return result } + +func (at *AppThrottle) Name() throttlerapp.Name { + return throttlerapp.Name(at.AppName) +} diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index fe9be91992c..2df0ced7411 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -42,6 +42,14 @@ func (n Name) Concatenate(other Name) Name { func (n Name) SplitStrings() []string { return strings.Split(n.String(), ":") } +func (n Name) SplitMap() map[string]bool { + split := n.SplitStrings() + result := make(map[string]bool, len(split)) + for _, s := range split { + result[s] = true + } + return result +} const ( // AllName is a special catch-all name for all apps diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app_test.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app_test.go index 0a5969dd7de..3d8af3e09c0 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app_test.go @@ -57,15 +57,21 @@ func TestSplit(t *testing.T) { n := Name("vreplication:vcopier") parts := n.SplitStrings() assert.Equal(t, []string{"vreplication", "vcopier"}, parts) + m := n.SplitMap() + assert.Equal(t, map[string]bool{"vreplication": true, "vcopier": true}, m) } { n := VReplicationName parts := n.SplitStrings() assert.Equal(t, []string{"vreplication"}, parts) + m := n.SplitMap() + assert.Equal(t, map[string]bool{"vreplication": true}, m) } { n := Name("") parts := n.SplitStrings() assert.Equal(t, []string{""}, parts) + m := n.SplitMap() + assert.Equal(t, map[string]bool{"": true}, m) } } From 91d7089dfcb8cba492f8c1af25aff5076584e0d6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 24 Feb 2025 09:38:00 +0200 Subject: [PATCH 2/2] Check throttle ratio for specific migration UUID, report highest of that and online-ddl ratio Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../scheduler/onlineddl_scheduler_test.go | 53 +++++++++++++++++-- go/vt/vttablet/onlineddl/executor.go | 16 ++++-- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 8547431ddd3..55046569761 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -260,7 +260,7 @@ func TestMain(m *testing.M) { clusterInstance.VtTabletExtraArgs = []string{ "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", - "--migration_check_interval", "5s", + "--migration_check_interval", "2s", "--watch_replication_stream", } clusterInstance.VtGateExtraArgs = []string{} @@ -889,8 +889,8 @@ func testScheduler(t *testing.T) { onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) // ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running - t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait - t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait + t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait + t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait testAllowConcurrent(t, "t1", t1uuid, 1) testAllowConcurrent(t, "t2", t2uuid, 1) @@ -903,6 +903,20 @@ func testScheduler(t *testing.T) { // both should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0) + assert.EqualValues(t, 1.0, userThrotteRatio) + } + // t2uuid migration is not in 'running' state, hence 'user_throttle_ratio' is not updated + rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0) + assert.EqualValues(t, 0, userThrotteRatio) + } }) t.Run("check ready to complete (before)", func(t *testing.T) { @@ -918,6 +932,39 @@ func testScheduler(t *testing.T) { // both should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning) + + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0) + assert.EqualValues(t, 0, userThrotteRatio) + } + rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0) + assert.EqualValues(t, 0, userThrotteRatio) + } + }) + t.Run("throttle t2", func(t *testing.T) { + throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid)) + time.Sleep(ensureStateNotChangedTime) + rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0) + assert.EqualValues(t, 1.0, userThrotteRatio) + } + }) + t.Run("unthrottle t2", func(t *testing.T) { + throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid)) + time.Sleep(ensureStateNotChangedTime) + rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0) + assert.EqualValues(t, 0, userThrotteRatio) + } }) t.Run("complete t2", func(t *testing.T) { // Issue a complete and wait for successful completion diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 3a3d1a9906e..1165e9e05d1 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2987,12 +2987,12 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i return countRunnning, cancellable, nil } - var currentUserThrottleRatio float64 + var onlineddlUserThrottleRatio float64 // No point in reviewing throttler info if it's not enabled&open for _, app := range e.lagThrottler.ThrottledApps() { if throttlerapp.OnlineDDLName.Equals(app.AppName) { - currentUserThrottleRatio = app.Ratio + onlineddlUserThrottleRatio = app.Ratio break } } @@ -3029,7 +3029,17 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i uuidsFoundRunning[uuid] = true - _ = e.updateMigrationUserThrottleRatio(ctx, uuid, currentUserThrottleRatio) + var migrationUserThrottleRatio float64 + for _, app := range e.lagThrottler.ThrottledApps() { + m := app.Name().SplitMap() + if m[uuid] { + migrationUserThrottleRatio = app.Ratio + break + } + } + userThrottleRatio := max(onlineddlUserThrottleRatio, migrationUserThrottleRatio) + _ = e.updateMigrationUserThrottleRatio(ctx, uuid, userThrottleRatio) + switch strategySetting.Strategy { case schema.DDLStrategyOnline, schema.DDLStrategyVitess: reviewVReplRunningMigration := func() error {