Add support for deferred cutover #245

Merged: 20 commits, merged Dec 3, 2023

Changes from 17 commits
10 changes: 10 additions & 0 deletions USAGE.md
@@ -135,3 +135,13 @@ The replication throttler only affects the copy-rows operation, and does not app
- Adjusting the configuration of your replicas to increase the parallel replication threads
- Temporarily disabling durability on the replica (i.e. `SET GLOBAL sync_binlog=0` and `SET GLOBAL innodb_flush_log_at_trx_commit=0`)
- Increasing the `replica-max-lag` or disabling replica lag checking temporarily

### defer-cutover

The "defer cutover" feature makes spirit wait to perform the final cutover until a "sentinel" table has been dropped. This is similar to the --postpone-cut-over-flag-file feature of gh-ost.

If defer-cutover is true, Spirit will create a "sentinel" table in the same schema as the table being altered; the name of the sentinel table will use the pattern `_<table>_sentinel`. Spirit will never delete the sentinel table on its own. It will block for 48 hours waiting for the sentinel table to be dropped by the operator, after which it will exit with an error.
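For example, to release a deferred cutover for a hypothetical table named `orders` (substitute your own table and schema), drop the sentinel table that Spirit created:

```sql
-- The sentinel table lives in the same schema as the table being altered
-- and follows the naming pattern _<table>_sentinel.
DROP TABLE _orders_sentinel;
```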

You can resume a migration from its checkpoint, and Spirit will once again wait for you to drop the sentinel table. Alternatively, you can drop the sentinel table before restarting Spirit; the migration will then resume from checkpoint and complete the cutover without waiting, even if defer-cutover is still enabled for the migration.

If you start a migration and realize that you forgot to set defer-cutover, worry not! You can manually create a sentinel table using the pattern `_<table>_sentinel`, and Spirit will detect the table before the cutover is completed and block as though defer-cutover had been enabled from the beginning.
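As a minimal sketch, again using the hypothetical `orders` table: the definition below mirrors the one Spirit creates itself, though Spirit only checks that the sentinel table exists, not what columns it has.

```sql
-- Create the sentinel table in the same schema as the table being altered.
CREATE TABLE _orders_sentinel (id int NOT NULL PRIMARY KEY);
```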
1 change: 1 addition & 0 deletions pkg/migration/migration.go
@@ -24,6 +24,7 @@ type Migration struct {
LockWaitTimeout time.Duration `name:"lock-wait-timeout" help:"The DDL lock_wait_timeout required for checksum and cutover" optional:"" default:"30s"`
SkipPreRunChecks bool `name:"i-understand-mysql57-is-not-supported" hidden:"" default:"false"`
SkipDropAfterCutover bool `name:"skip-drop-after-cutover" help:"Keep old table after completing cutover" optional:"" default:"false"`
DeferCutOver bool `name:"defer-cutover" help:"Defer cutover until sentinel table is dropped" optional:"" default:"false"`
}

func (m *Migration) Run() error {
2 changes: 2 additions & 0 deletions pkg/migration/migration_test.go
@@ -18,6 +18,8 @@ import (
func TestMain(m *testing.M) {
checkpointDumpInterval = 100 * time.Millisecond
statusInterval = 10 * time.Millisecond // the status will be accurate to 1ms
sentinelCheckInterval = 100 * time.Millisecond
sentinelWaitLimit = 10 * time.Second
os.Exit(m.Run())
}

98 changes: 95 additions & 3 deletions pkg/migration/runner.go
@@ -34,6 +34,7 @@ const (
stateAnalyzeTable
stateChecksum
statePostChecksum // second mass apply
stateWaitingOnSentinelTable
stateCutOver
stateClose
stateErrCleanup
@@ -44,6 +45,8 @@
checkpointDumpInterval = 50 * time.Second
tableStatUpdateInterval = 5 * time.Minute
statusInterval = 30 * time.Second
sentinelCheckInterval = 1 * time.Second
sentinelWaitLimit = 48 * time.Hour
)

func (s migrationState) String() string {
@@ -60,6 +63,8 @@
return "checksum"
case statePostChecksum:
return "postChecksum"
case stateWaitingOnSentinelTable:
return "waitingOnSentinelTable"
case stateCutOver:
return "cutOver"
case stateClose:
@@ -87,7 +92,8 @@ type Runner struct {
checkerLock sync.Mutex

// Track some key statistics.
startTime time.Time
startTime time.Time
sentinelWaitStartTime time.Time

// Used by the test-suite and some post-migration output.
// Indicates if certain optimizations applied.
@@ -219,6 +225,18 @@ func (r *Runner) Run(originalCtx context.Context) error {
}
r.logger.Info("copy rows complete")

// r.waitOnSentinelTable may return an error if there is
// some unexpected problem checking for the existence of
// the sentinel table OR if sentinelWaitLimit is exceeded.
// This function is invoked even if DeferCutOver is false
// because it's possible that the sentinel table was created
// manually after the migration started.
r.sentinelWaitStartTime = time.Now()
r.setCurrentState(stateWaitingOnSentinelTable)
if err := r.waitOnSentinelTable(ctx); err != nil {
return err
}

// Perform steps to prepare for final cutover.
// This includes computing an optional checksum,
// catching up on replClient apply, running ANALYZE TABLE so
@@ -407,6 +425,13 @@ func (r *Runner) setup(ctx context.Context) error {
if err := r.createCheckpointTable(ctx); err != nil {
return err
}

if r.migration.DeferCutOver {
if err := r.createSentinelTable(ctx); err != nil {
return err
}
}

r.copier, err = row.NewCopier(r.db, r.table, r.newTable, &row.CopierConfig{
Concurrency: r.migration.Threads,
TargetChunkTime: r.migration.TargetChunkTime,
@@ -479,15 +504,15 @@ func (r *Runner) tableChangeNotification() {
}
r.setCurrentState(stateErrCleanup)
// Write this to the logger, so it can be captured by the initiator.
r.logger.Error("table definition changed during migration")
r.logger.Errorf("table definition of %s changed during migration", r.table.QuotedName)
// Invalidate the checkpoint, so we don't try to resume.
// If we don't do this, the migration will permanently be blocked from proceeding.
// Letting it start again is the better choice.
if err := r.dropCheckpoint(context.Background()); err != nil {
r.logger.Errorf("could not remove checkpoint. err: %v", err)
}
// We can't do anything about it, just panic
panic("table definition changed during migration")
panic(fmt.Sprintf("table definition of %s changed during migration", r.table.QuotedName))
}

func (r *Runner) dropCheckpoint(ctx context.Context) error {
@@ -646,6 +671,20 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {
return nil
}

func (r *Runner) sentinelTableName() string {
return fmt.Sprintf("_%s_sentinel", r.table.TableName)
}

func (r *Runner) createSentinelTable(ctx context.Context) error {
if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.table.SchemaName, r.sentinelTableName()); err != nil {
return err
}
if err := dbconn.Exec(ctx, r.db, "CREATE TABLE %n.%n (id int NOT NULL PRIMARY KEY)", r.table.SchemaName, r.sentinelTableName()); err != nil {
return err
}
return nil
}

func (r *Runner) cleanup(ctx context.Context) error {
if r.newTable != nil {
if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.newTable.SchemaName, r.newTable.TableName); err != nil {
@@ -931,10 +970,63 @@ func (r *Runner) dumpStatus(ctx context.Context) {
r.db.Stats().InUse,
)
r.checkerLock.Unlock()
case stateWaitingOnSentinelTable:
r.logger.Infof("migration status: state=%s sentinel-table=%s.%s total-time=%s sentinel-wait-time=%s sentinel-max-wait-time=%s conns-in-use=%d",
r.getCurrentState().String(),
r.table.SchemaName,
r.sentinelTableName(),
time.Since(r.startTime).Round(time.Second),
time.Since(r.sentinelWaitStartTime).Round(time.Second),
sentinelWaitLimit,
r.db.Stats().InUse,
)
default:
// For the linter:
// Status for all other states
}
}
}
}

func (r *Runner) sentinelTableExists(ctx context.Context) (bool, error) {
sql := "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
var sentinelTableExists int
err := r.db.QueryRowContext(ctx, sql, r.table.SchemaName, r.sentinelTableName()).Scan(&sentinelTableExists)
if err != nil {
return false, err
}
return sentinelTableExists > 0, nil
}

// Check every sentinelCheckInterval up to sentinelWaitLimit to see if sentinelTable has been dropped
func (r *Runner) waitOnSentinelTable(ctx context.Context) error {
if sentinelExists, err := r.sentinelTableExists(ctx); err != nil {
return err
} else if !sentinelExists {
// Sentinel table does not exist, we can proceed with cutover
return nil
}

r.logger.Warnf("cutover deferred while sentinel table %s.%s exists; will wait %s", r.table.SchemaName, r.sentinelTableName(), sentinelWaitLimit)

timer := time.NewTimer(sentinelWaitLimit)

ticker := time.NewTicker(sentinelCheckInterval)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
sentinelExists, err := r.sentinelTableExists(ctx)
if err != nil {
return err
}
if !sentinelExists {
// Sentinel table has been dropped, we can proceed with cutover
r.logger.Infof("sentinel table dropped at %s", t)
return nil
}
case <-timer.C:
return errors.New("timed out waiting for sentinel table to be dropped")
}
}
}