Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: consider UUID throttle ratios in user_throttle_ratio #17850

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = []string{
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--migration_check_interval", "2s",
"--watch_replication_stream",
}
clusterInstance.VtGateExtraArgs = []string{}
Expand Down Expand Up @@ -889,8 +889,8 @@ func testScheduler(t *testing.T) {
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait

testAllowConcurrent(t, "t1", t1uuid, 1)
testAllowConcurrent(t, "t2", t2uuid, 1)
Expand All @@ -903,6 +903,20 @@ func testScheduler(t *testing.T) {
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)

rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 1.0, userThrotteRatio)
}
// t2uuid migration is not in 'running' state, hence 'user_throttle_ratio' is not updated
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})

t.Run("check ready to complete (before)", func(t *testing.T) {
Expand All @@ -918,6 +932,39 @@ func testScheduler(t *testing.T) {
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)

rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
rs = onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})
t.Run("throttle t2", func(t *testing.T) {
throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid))
time.Sleep(ensureStateNotChangedTime)
rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 1.0, userThrotteRatio)
}
})
t.Run("unthrottle t2", func(t *testing.T) {
throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.Name(t2uuid))
time.Sleep(ensureStateNotChangedTime)
rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
userThrotteRatio := row.AsFloat64("user_throttle_ratio", 0)
assert.EqualValues(t, 0, userThrotteRatio)
}
})
t.Run("complete t2", func(t *testing.T) {
// Issue a complete and wait for successful completion
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2987,12 +2987,12 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
return countRunnning, cancellable, nil
}

var currentUserThrottleRatio float64
var onlineddlUserThrottleRatio float64

// No point in reviewing throttler info if it's not enabled&open
for _, app := range e.lagThrottler.ThrottledApps() {
if throttlerapp.OnlineDDLName.Equals(app.AppName) {
currentUserThrottleRatio = app.Ratio
onlineddlUserThrottleRatio = app.Ratio
break
}
}
Expand Down Expand Up @@ -3029,7 +3029,17 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i

uuidsFoundRunning[uuid] = true

_ = e.updateMigrationUserThrottleRatio(ctx, uuid, currentUserThrottleRatio)
var migrationUserThrottleRatio float64
for _, app := range e.lagThrottler.ThrottledApps() {
m := app.Name().SplitMap()
if m[uuid] {
migrationUserThrottleRatio = app.Ratio
break
}
}
userThrottleRatio := max(onlineddlUserThrottleRatio, migrationUserThrottleRatio)
_ = e.updateMigrationUserThrottleRatio(ctx, uuid, userThrottleRatio)

switch strategySetting.Strategy {
case schema.DDLStrategyOnline, schema.DDLStrategyVitess:
reviewVReplRunningMigration := func() error {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/base/app_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package base

import (
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// AppThrottle is the definition for an app throttling instruction
Expand All @@ -39,3 +41,7 @@ func NewAppThrottle(appName string, expireAt time.Time, ratio float64, exempt bo
}
return result
}

func (at *AppThrottle) Name() throttlerapp.Name {
return throttlerapp.Name(at.AppName)
}
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func (n Name) Concatenate(other Name) Name {
func (n Name) SplitStrings() []string {
return strings.Split(n.String(), ":")
}
func (n Name) SplitMap() map[string]bool {
split := n.SplitStrings()
result := make(map[string]bool, len(split))
for _, s := range split {
result[s] = true
}
return result
}

const (
// AllName is a special catch-all name for all apps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,21 @@ func TestSplit(t *testing.T) {
n := Name("vreplication:vcopier")
parts := n.SplitStrings()
assert.Equal(t, []string{"vreplication", "vcopier"}, parts)
m := n.SplitMap()
assert.Equal(t, map[string]bool{"vreplication": true, "vcopier": true}, m)
}
{
n := VReplicationName
parts := n.SplitStrings()
assert.Equal(t, []string{"vreplication"}, parts)
m := n.SplitMap()
assert.Equal(t, map[string]bool{"vreplication": true}, m)
}
{
n := Name("")
parts := n.SplitStrings()
assert.Equal(t, []string{""}, parts)
m := n.SplitMap()
assert.Equal(t, map[string]bool{"": true}, m)
}
}
Loading