Skip to content

Commit

Permalink
Check throttle ratio for specific migration UUID, report highest of t…
Browse files Browse the repository at this point in the history
…hat and online-ddl ratio

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Feb 24, 2025
1 parent 5fff440 commit 91d7089
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
53 changes: 50 additions & 3 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = []string{
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--migration_check_interval", "2s",
"--watch_replication_stream",
}
clusterInstance.VtGateExtraArgs = []string{}
Expand Down Expand Up @@ -889,8 +889,8 @@ func testScheduler(t *testing.T) {
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait

testAllowConcurrent(t, "t1", t1uuid, 1)
testAllowConcurrent(t, "t2", t2uuid, 1)
Expand All @@ -903,6 +903,20 @@ func testScheduler(t *testing.T) {
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)

rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 1.0, userThrotteRatio)
}
// t2uuid migration is not in 'running' state, hence 'user_throttle_ratio' is not updated
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})

t.Run("check ready to complete (before)", func(t *testing.T) {
Expand All @@ -918,6 +932,39 @@ func testScheduler(t *testing.T) {
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)

rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})
t.Run("throttle t2", func(t *testing.T) {
throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid))
time.Sleep(ensureStateNotChangedTime)
rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 1.0, userThrotteRatio)
}
})
t.Run("unthrottle t2", func(t *testing.T) {
throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid))
time.Sleep(ensureStateNotChangedTime)
rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})
t.Run("complete t2", func(t *testing.T) {
// Issue a complete and wait for successful completion
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2987,12 +2987,12 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
return countRunnning, cancellable, nil
}

var currentUserThrottleRatio float64
var onlineddlUserThrottleRatio float64

// No point in reviewing throttler info if it's not enabled&open
for _, app := range e.lagThrottler.ThrottledApps() {
if throttlerapp.OnlineDDLName.Equals(app.AppName) {
currentUserThrottleRatio = app.Ratio
onlineddlUserThrottleRatio = app.Ratio
break
}
}
Expand Down Expand Up @@ -3029,7 +3029,17 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i

uuidsFoundRunning[uuid] = true

_ = e.updateMigrationUserThrottleRatio(ctx, uuid, currentUserThrottleRatio)
var migrationUserThrottleRatio float64
for _, app := range e.lagThrottler.ThrottledApps() {
m := app.Name().SplitMap()
if m[uuid] {
migrationUserThrottleRatio = app.Ratio
break
}
}
userThrottleRatio := max(onlineddlUserThrottleRatio, migrationUserThrottleRatio)
_ = e.updateMigrationUserThrottleRatio(ctx, uuid, userThrottleRatio)

switch strategySetting.Strategy {
case schema.DDLStrategyOnline, schema.DDLStrategyVitess:
reviewVReplRunningMigration := func() error {
Expand Down

0 comments on commit 91d7089

Please sign in to comment.