Skip to content

Commit

Permalink
Online DDL: consider UUID throttle ratios in user_throttle_ratio (#…
Browse files Browse the repository at this point in the history
…17850)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Feb 25, 2025
1 parent 81ce29c commit 82d21e5
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 6 deletions.
53 changes: 50 additions & 3 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = []string{
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--migration_check_interval", "2s",
"--watch_replication_stream",
}
clusterInstance.VtGateExtraArgs = []string{}
Expand Down Expand Up @@ -889,8 +889,8 @@ func testScheduler(t *testing.T) {
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait

testAllowConcurrent(t, "t1", t1uuid, 1)
testAllowConcurrent(t, "t2", t2uuid, 1)
Expand All @@ -903,6 +903,20 @@ func testScheduler(t *testing.T) {
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)

rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 1.0, userThrotteRatio)
}
// t2uuid migration is not in 'running' state, hence 'user_throttle_ratio' is not updated
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})

t.Run("check ready to complete (before)", func(t *testing.T) {
Expand All @@ -918,6 +932,39 @@ func testScheduler(t *testing.T) {
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)

rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})
t.Run("throttle t2", func(t *testing.T) {
throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid))
time.Sleep(ensureStateNotChangedTime)
rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 1.0, userThrotteRatio)
}
})
t.Run("unthrottle t2", func(t *testing.T) {
throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid))
time.Sleep(ensureStateNotChangedTime)
rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})
t.Run("complete t2", func(t *testing.T) {
// Issue a complete and wait for successful completion
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2987,12 +2987,12 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
return countRunnning, cancellable, nil
}

var currentUserThrottleRatio float64
var onlineddlUserThrottleRatio float64

// No point in reviewing throttler info if it's not enabled&open
for _, app := range e.lagThrottler.ThrottledApps() {
if throttlerapp.OnlineDDLName.Equals(app.AppName) {
currentUserThrottleRatio = app.Ratio
onlineddlUserThrottleRatio = app.Ratio
break
}
}
Expand Down Expand Up @@ -3029,7 +3029,17 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i

uuidsFoundRunning[uuid] = true

_ = e.updateMigrationUserThrottleRatio(ctx, uuid, currentUserThrottleRatio)
var migrationUserThrottleRatio float64
for _, app := range e.lagThrottler.ThrottledApps() {
m := app.Name().SplitMap()
if m[uuid] {
migrationUserThrottleRatio = app.Ratio
break
}
}
userThrottleRatio := max(onlineddlUserThrottleRatio, migrationUserThrottleRatio)
_ = e.updateMigrationUserThrottleRatio(ctx, uuid, userThrottleRatio)

switch strategySetting.Strategy {
case schema.DDLStrategyOnline, schema.DDLStrategyVitess:
reviewVReplRunningMigration := func() error {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/base/app_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package base

import (
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// AppThrottle is the definition for an app throttling instruction
Expand All @@ -39,3 +41,7 @@ func NewAppThrottle(appName string, expireAt time.Time, ratio float64, exempt bo
}
return result
}

func (at *AppThrottle) Name() throttlerapp.Name {
return throttlerapp.Name(at.AppName)
}
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func (n Name) Concatenate(other Name) Name {
func (n Name) SplitStrings() []string {
return strings.Split(n.String(), ":")
}
func (n Name) SplitMap() map[string]bool {
split := n.SplitStrings()
result := make(map[string]bool, len(split))
for _, s := range split {
result[s] = true
}
return result
}

const (
// AllName is a special catch-all name for all apps
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttlerapp/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,21 @@ func TestSplit(t *testing.T) {
n := Name("vreplication:vcopier")
parts := n.SplitStrings()
assert.Equal(t, []string{"vreplication", "vcopier"}, parts)
m := n.SplitMap()
assert.Equal(t, map[string]bool{"vreplication": true, "vcopier": true}, m)
}
{
n := VReplicationName
parts := n.SplitStrings()
assert.Equal(t, []string{"vreplication"}, parts)
m := n.SplitMap()
assert.Equal(t, map[string]bool{"vreplication": true}, m)
}
{
n := Name("")
parts := n.SplitStrings()
assert.Equal(t, []string{""}, parts)
m := n.SplitMap()
assert.Equal(t, map[string]bool{"": true}, m)
}
}

0 comments on commit 82d21e5

Please sign in to comment.