Skip to content

Commit

Permalink
Merge pull request #344 from cashapp/mtocker-resume-checksum
Browse files Browse the repository at this point in the history
Add resume from checksum
  • Loading branch information
morgo authored Sep 4, 2024
2 parents 31b5fad + 219b7aa commit 5a7c164
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 27 deletions.
27 changes: 25 additions & 2 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Checker struct {
fixDifferences bool
differencesFound atomic.Uint64
recopyLock sync.Mutex
isResume bool
}

type CheckerConfig struct {
Expand All @@ -49,6 +50,7 @@ type CheckerConfig struct {
DBConfig *dbconn.DBConfig
Logger loggers.Advanced
FixDifferences bool
Watermark string // optional; defines a watermark to start from
}

func NewCheckerDefaultConfig() *CheckerConfig {
Expand Down Expand Up @@ -76,6 +78,14 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
if err != nil {
return nil, err
}
// If there is a watermark, we need to open the chunker at that watermark.
// Overwrite the previously attached chunker with a new one.
if config.Watermark != "" {
config.Logger.Warnf("opening checksum chunker at watermark: %s", config.Watermark)
if err := chunker.OpenAtWatermark(config.Watermark, newTable.MaxValue()); err != nil {
return nil, err
}
}
checksum := &Checker{
table: tbl,
newTable: newTable,
Expand All @@ -86,6 +96,7 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
dbConfig: config.DBConfig,
logger: config.Logger,
fixDifferences: config.FixDifferences,
isResume: config.Watermark != "",
}
return checksum, nil
}
Expand Down Expand Up @@ -158,6 +169,13 @@ func (c *Checker) RecentValue() string {
return fmt.Sprintf("%v", c.recentValue)
}

func (c *Checker) GetLowWatermark() (string, error) {
if c.chunker == nil {
return "", errors.New("chunker not initialized")
}
return c.chunker.GetLowWatermark()
}

func (c *Checker) inspectDifferences(trx *sql.Tx, chunk *table.Chunk) error {
sourceSubquery := fmt.Sprintf("SELECT CRC32(CONCAT(%s)) as row_checksum, %s FROM %s WHERE %s",
c.intersectColumns(),
Expand Down Expand Up @@ -362,8 +380,13 @@ func (c *Checker) Run(ctx context.Context) error {
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
if err := c.chunker.Open(); err != nil {
return err
// Open the chunker if it's not open.
// It will already be open if this is a resume from checkpoint.
// This is a little annoying, but just the way the chunker API works.
if !c.isResume {
if err := c.chunker.Open(); err != nil {
return err
}
}
c.Unlock()

Expand Down
33 changes: 33 additions & 0 deletions pkg/checksum/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,36 @@ func TestChangeDataTypeDatetime(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background())) // fails
}

func TestFromWatermark(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt")
testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)")

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "tfromwatermark")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
Logger: logger,
Concurrency: 4,
TargetBatchTime: time.Second,
})
assert.NoError(t, feed.Run())

config := NewCheckerDefaultConfig()
config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}"
checker, err := NewChecker(db, t1, t2, feed, config)
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background()))
}
78 changes: 57 additions & 21 deletions pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type Runner struct {
checker *checksum.Checker
checkerLock sync.Mutex

// used to recover direct to checksum.
checksumWatermark string

// Track some key statistics.
startTime time.Time
sentinelWaitStartTime time.Time
Expand Down Expand Up @@ -252,7 +255,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
go r.dumpCheckpointContinuously(ctx) // start periodically dumping the checkpoint.

// Perform the main copy rows task. This is where the majority
// of migrations usually spend time.
// of migrations usually spend time. It is not strictly necessary,
// but we always recopy the last-bit, even if we are resuming
// partially through the checksum.
r.setCurrentState(stateCopyRows)
if err := r.copier.Run(ctx); err != nil {
return err
Expand Down Expand Up @@ -630,7 +635,16 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {
if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.table.SchemaName, cpName); err != nil {
return err
}
if err := dbconn.Exec(ctx, r.db, "CREATE TABLE %n.%n (id int NOT NULL AUTO_INCREMENT PRIMARY KEY, low_watermark TEXT, binlog_name VARCHAR(255), binlog_pos INT, rows_copied BIGINT, rows_copied_logical BIGINT, alter_statement TEXT)",
if err := dbconn.Exec(ctx, r.db, `CREATE TABLE %n.%n (
id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
copier_watermark TEXT,
checksum_watermark TEXT,
binlog_name VARCHAR(255),
binlog_pos INT,
rows_copied BIGINT,
rows_copied_logical BIGINT,
alter_statement TEXT
)`,
r.table.SchemaName, cpName); err != nil {
return err
}
Expand Down Expand Up @@ -740,17 +754,21 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
// Make sure we can read from the new table.
if err := dbconn.Exec(ctx, r.db, "SELECT * FROM %n.%n LIMIT 1",
r.migration.Database, newName); err != nil {
return fmt.Errorf("could not read from table '%s'", newName)
return fmt.Errorf("could not find any checkpoints in table '%s'", newName)
}

query := fmt.Sprintf("SELECT low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
// We intentionally SELECT * FROM the checkpoint table because if the structure
// changes, we want this operation to fail. This will indicate that the checkpoint
// was created by either an earlier or later version of spirit, in which case
// we do not support recovery.
query := fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
r.migration.Database, cpName)
var lowWatermark, binlogName, alterStatement string
var binlogPos int
var copierWatermark, binlogName, alterStatement string
var id, binlogPos int
var rowsCopied, rowsCopiedLogical uint64
err := r.db.QueryRow(query).Scan(&lowWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
err := r.db.QueryRow(query).Scan(&id, &copierWatermark, &r.checksumWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
if err != nil {
return fmt.Errorf("could not read from table '%s'", cpName)
return fmt.Errorf("could not read from table '%s', err:%v", cpName, err)
}
if r.migration.Alter != alterStatement {
return ErrMismatchedAlter
Expand All @@ -777,8 +795,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
Logger: r.logger,
MetricsSink: r.metricsSink,
DBConfig: r.dbConfig,
}, lowWatermark, rowsCopied, rowsCopiedLogical)

}, copierWatermark, rowsCopied, rowsCopiedLogical)
if err != nil {
return err
}
Expand All @@ -796,9 +813,6 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
})

r.checkpointTable = table.NewTableInfo(r.db, r.table.SchemaName, cpName)
if err != nil {
return err
}

// Start the replClient now. This is because if the checkpoint is so old there
// are no longer binary log files, we want to abandon resume-from-checkpoint
Expand All @@ -808,7 +822,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
r.logger.Warnf("resuming from checkpoint failed because resuming from the previous binlog position failed. log-file: %s log-pos: %d", binlogName, binlogPos)
return err
}
r.logger.Warnf("resuming from checkpoint. low-watermark: %s log-file: %s log-pos: %d copy-rows: %d", lowWatermark, binlogName, binlogPos, rowsCopied)
r.logger.Warnf("resuming from checkpoint. copier-watermark: %s checksum-watermark: %s log-file: %s log-pos: %d copy-rows: %d", copierWatermark, r.checksumWatermark, binlogName, binlogPos, rowsCopied)
r.usedResumeFromCheckpoint = true
return nil
}
Expand All @@ -821,20 +835,24 @@ func (r *Runner) checksum(ctx context.Context) error {
// - background flushing
// - checkpoint thread
// - checksum "replaceChunk" DB connections
// Handle a case in the tests not having a dbConfig
// Handle a case just in the tests not having a dbConfig
if r.dbConfig == nil {
r.dbConfig = dbconn.NewDBConfig()
}
r.db.SetMaxOpenConns(r.dbConfig.MaxOpenConnections + 2)
var err error
for i := range 3 { // try the checksum up to 3 times.
if i > 0 {
r.checksumWatermark = "" // reset the watermark if we are retrying.
}
r.checkerLock.Lock()
r.checker, err = checksum.NewChecker(r.db, r.table, r.newTable, r.replClient, &checksum.CheckerConfig{
Concurrency: r.migration.Threads,
TargetChunkTime: r.migration.TargetChunkTime,
DBConfig: r.dbConfig,
Logger: r.logger,
FixDifferences: true, // we want to repair the differences.
Watermark: r.checksumWatermark,
})
r.checkerLock.Unlock()
if err != nil {
Expand Down Expand Up @@ -881,26 +899,44 @@ func (r *Runner) setCurrentState(s migrationState) {
atomic.StoreInt32((*int32)(&r.currentState), int32(s))
}

// dumpCheckpoint is called approximately every minute.
// It writes the current state of the migration to the checkpoint table,
// which can be used in recovery. Previously resuming from checkpoint
// would always restart at the copier, but it can now also resume at
// the checksum phase.
func (r *Runner) dumpCheckpoint(ctx context.Context) error {
// Retrieve the binlog position first and under a mutex.
// Currently, it never advances, but it's possible it might in future
// and this race condition is missed.
binlog := r.replClient.GetBinlogApplyPosition()
lowWatermark, err := r.copier.GetLowWatermark()
copierWatermark, err := r.copier.GetLowWatermark()
if err != nil {
return err // it might not be ready, we can try again.
}
// We only dump the checksumWatermark if we are in >= checksum state.
// We require a mutex because the checker can be replaced during
// operation, leaving a race condition.
var checksumWatermark string
if r.getCurrentState() >= stateChecksum {
r.checkerLock.Lock()
defer r.checkerLock.Unlock()
if r.checker != nil {
checksumWatermark, err = r.checker.GetLowWatermark()
if err != nil {
return err
}
}
}
copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount)
logicalCopyRows := atomic.LoadUint64(&r.copier.CopyRowsLogicalCount)
// Note: when we dump the lowWatermark to the log, we are exposing the PK values,
// when using the composite chunker are based on actual user-data.
// We believe this is OK but may change it in the future. Please do not
// add any other fields to this log line.
r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", lowWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?)",
r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", copierWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?, %?)",
r.checkpointTable.SchemaName,
r.checkpointTable.TableName,
lowWatermark,
copierWatermark,
checksumWatermark,
binlog.Name,
binlog.Pos,
copyRows,
Expand Down
94 changes: 90 additions & 4 deletions pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,20 @@ func TestCheckpointRestore(t *testing.T) {
// from issue #125
watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}"
binlog := r.replClient.GetBinlogApplyPosition()
query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)",
r.checkpointTable.QuotedName)
_, err = r.db.ExecContext(context.TODO(), query, watermark, binlog.Name, binlog.Pos, 0, 0, r.migration.Alter)
err = dbconn.Exec(context.TODO(), r.db, `INSERT INTO %n.%n
(copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement)
VALUES
(%?, %?, %?, %?, %?, %?, %?)`,
r.checkpointTable.SchemaName,
r.checkpointTable.TableName,
watermark,
"",
binlog.Name,
binlog.Pos,
0,
0,
r.migration.Alter,
)
assert.NoError(t, err)

r2, err := NewRunner(&Migration{
Expand All @@ -944,6 +955,81 @@ func TestCheckpointRestore(t *testing.T) {
assert.True(t, r2.usedResumeFromCheckpoint)
}

func TestCheckpointResumeDuringChecksum(t *testing.T) {
tbl := `CREATE TABLE cptresume (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id2 INT NOT NULL,
pad VARCHAR(100) NOT NULL default 0)`
cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`)
testutils.RunSQL(t, tbl)
testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`)

r, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)

// Call r.Run() with our context in a go-routine.
// When we see that we are waiting on the sentinel table,
// we then manually start the first bits of checksum, and then close()
// We should be able to resume from the checkpoint into the checksum state.
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := r.Run(ctx)
assert.Error(t, err) // context cancelled
}()
for {
// Wait for the sentinel table.
if r.getCurrentState() >= stateWaitingOnSentinelTable {
break
}
time.Sleep(time.Millisecond)
}

assert.NoError(t, r.checksum(context.TODO())) // run the checksum, the original Run is blocked on sentinel.
assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the watermark.
cancel() // unblock the original waiting on sentinel.
assert.NoError(t, r.Close()) // close the run.

// drop the sentinel table.
testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`)

// insert a couple more rows (should not change anything)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('b', 100) FROM dual`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('c', 100) FROM dual`)

// Start again as a new runner,
r2, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)
err = r2.Run(context.Background())
assert.NoError(t, err)
assert.True(t, r2.usedResumeFromCheckpoint)
assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark
}

func TestCheckpointDifferentRestoreOptions(t *testing.T) {
tbl := `CREATE TABLE cpt1difft1 (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down Expand Up @@ -2663,7 +2749,7 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {

go func() {
err := runner.Run(ctx)
assert.ErrorContains(t, err, "context canceled") // it gets interrupted as soon as there is a checkpoint saved.
assert.Error(t, err) // it gets interrupted as soon as there is a checkpoint saved.
}()

// wait until a checkpoint is saved (which means copy is in progress)
Expand Down

1 comment on commit 5a7c164

@chikitai

This comment was marked as spam.

Please sign in to comment.