Skip to content

Commit

Permalink
reduce checksum failures
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Mar 7, 2024
1 parent 7b48d57 commit 5fbc94e
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 79 deletions.
42 changes: 18 additions & 24 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,7 @@ func (c *Checker) setInvalid(newVal bool) {
c.isInvalid = newVal
}

func (c *Checker) Run(ctx context.Context) error {
c.Lock()
c.startTime = time.Now()
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
if err := c.chunker.Open(); err != nil {
return err
}
c.Unlock()
func (c *Checker) initConnPool(ctx context.Context) error {
// Try and catch up before we apply a table lock,
// since we will need to catch up again with the lock held
// and we want to minimize that.
Expand All @@ -273,42 +264,45 @@ func (c *Checker) Run(ctx context.Context) error {
// Lock the source table in a trx
// so the connection is not used by others
c.logger.Info("starting checksum operation, this will require a table lock")
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, false, c.dbConfig, c.logger)
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, true, c.dbConfig, c.logger)
if err != nil {
return err
}
defer serverLock.Close()

// With the lock held, flush one more time under the lock tables.
// Because we know canal is up to date this now guarantees
// we have everything in the new table.
if err := c.feed.Flush(ctx); err != nil {
if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil {
return err
}

// Assert that the change set is empty. This should always
// be the case because we are under a lock.
if !c.feed.AllChangesFlushed() {
return errors.New("not all changes flushed")
}

// Create a set of connections which can be used to checksum
// The table. They MUST be created before the lock is released
// with REPEATABLE-READ and a consistent snapshot (or dummy read)
// to initialize the read-view.
c.trxPool, err = dbconn.NewTrxPool(ctx, c.db, c.concurrency, c.dbConfig)
if err != nil {
return err
}
return err
}

// Assert that the change set is still empty.
// It should always be empty while we are still under the lock.
if c.feed.GetDeltaLen() > 0 {
return errors.New("the changeset is not empty, can not run checksum")
func (c *Checker) Run(ctx context.Context) error {
c.Lock()
c.startTime = time.Now()
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
if err := c.chunker.Open(); err != nil {
return err
}
c.Unlock()

// We can now unlock the table before starting the checksumming.
if err = serverLock.Close(); err != nil {
// initConnPool initializes the connection pool.
// This is done under a table lock, which is acquired inside this func
// and released when the func returns.
if err := c.initConnPool(ctx); err != nil {
return err
}
c.logger.Info("table unlocked, starting checksum")
Expand Down
61 changes: 31 additions & 30 deletions pkg/checksum/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ import (
)

func TestBasicChecksum(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2, _t1_chkpnt")
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, _t1_new, _t1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _t1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _t1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _t1_new VALUES (1, 2, 3)")

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "t1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "t2")
t2 := table.NewTableInfo(db, "test", "_t1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand Down Expand Up @@ -88,20 +88,20 @@ func TestBasicValidation(t *testing.T) {
}

func TestFixCorrupt(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS fixcorruption_t1, fixcorruption_t2, _fixcorruption_t1_chkpnt")
testutils.RunSQL(t, "DROP TABLE IF EXISTS fixcorruption_t1, _fixcorruption_t1_new, _fixcorruption_t1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE fixcorruption_t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE fixcorruption_t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _fixcorruption_t1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _fixcorruption_t1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO fixcorruption_t1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO fixcorruption_t2 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO fixcorruption_t2 VALUES (2, 2, 3)") // corrupt
testutils.RunSQL(t, "INSERT INTO _fixcorruption_t1_new VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _fixcorruption_t1_new VALUES (2, 2, 3)") // corrupt

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "fixcorruption_t1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "fixcorruption_t2")
t2 := table.NewTableInfo(db, "test", "_fixcorruption_t1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand Down Expand Up @@ -131,20 +131,20 @@ func TestFixCorrupt(t *testing.T) {
}

func TestCorruptChecksum(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2, _t1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _t1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO t2 VALUES (2, 2, 3)") // corrupt
testutils.RunSQL(t, "DROP TABLE IF EXISTS chkpcorruptt1, _chkpcorruptt1_new, _chkpcorruptt1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE chkpcorruptt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _chkpcorruptt1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _chkpcorruptt1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO chkpcorruptt1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _chkpcorruptt1_new VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _chkpcorruptt1_new VALUES (2, 2, 3)") // corrupt

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "t1")
t1 := table.NewTableInfo(db, "test", "chkpcorruptt1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "t2")
t2 := table.NewTableInfo(db, "test", "_chkpcorruptt1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand All @@ -164,18 +164,19 @@ func TestCorruptChecksum(t *testing.T) {
}

func TestBoundaryCases(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2")
testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2.2, '')") // null vs empty string
testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2.2, NULL)") // should not compare
testutils.RunSQL(t, "DROP TABLE IF EXISTS checkert1, _checkert1_new, _checkert1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE checkert1 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _checkert1_new (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _checkert1_chkpnt (a INT NOT NULL)")
testutils.RunSQL(t, "INSERT INTO checkert1 VALUES (1, 2.2, '')") // null vs empty string
testutils.RunSQL(t, "INSERT INTO _checkert1_new VALUES (1, 2.2, NULL)") // should not compare

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "t1")
t1 := table.NewTableInfo(db, "test", "checkert1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "t2")
t2 := table.NewTableInfo(db, "test", "_checkert1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand All @@ -193,14 +194,14 @@ func TestBoundaryCases(t *testing.T) {
assert.Error(t, checker.Run(context.Background()))

// UPDATE t1 to also be NULL
testutils.RunSQL(t, "UPDATE t1 SET c = NULL")
testutils.RunSQL(t, "UPDATE checkert1 SET c = NULL")
checker, err = NewChecker(db, t1, t2, feed, NewCheckerDefaultConfig())
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background()))
}

func TestChangeDataTypeDatetime(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS tdatetime, tdatetime2")
testutils.RunSQL(t, "DROP TABLE IF EXISTS tdatetime, _tdatetime_new")
testutils.RunSQL(t, `CREATE TABLE tdatetime (
id bigint NOT NULL AUTO_INCREMENT primary key,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand All @@ -209,7 +210,7 @@ func TestChangeDataTypeDatetime(t *testing.T) {
activated_at timestamp NULL DEFAULT NULL,
deactivated_at timestamp NULL DEFAULT NULL
)`)
testutils.RunSQL(t, `CREATE TABLE tdatetime2 (
testutils.RunSQL(t, `CREATE TABLE _tdatetime_new (
id bigint NOT NULL AUTO_INCREMENT primary key,
created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
Expand All @@ -230,7 +231,7 @@ func TestChangeDataTypeDatetime(t *testing.T) {
('2023-05-26 06:24:24', '2023-05-28 23:45:01', '2023-05-26 06:24:23', '2023-05-26 06:24:42', '2023-05-28 23:45:01'),
('2023-05-28 23:46:07', '2023-05-29 00:57:55', '2023-05-28 23:46:05', '2023-05-28 23:46:05', NULL ),
('2023-05-28 23:53:34', '2023-05-29 00:57:56', '2023-05-28 23:53:33', '2023-05-28 23:58:09', NULL );`)
testutils.RunSQL(t, `INSERT INTO tdatetime2 SELECT * FROM tdatetime`)
testutils.RunSQL(t, `INSERT INTO _tdatetime_new SELECT * FROM tdatetime`)
// The checkpoint table is required for blockwait, structure doesn't matter.
testutils.RunSQL(t, "CREATE TABLE IF NOT EXISTS _tdatetime_chkpnt (id int)")

Expand All @@ -239,7 +240,7 @@ func TestChangeDataTypeDatetime(t *testing.T) {

t1 := table.NewTableInfo(db, "test", "tdatetime")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "tdatetime2")
t2 := table.NewTableInfo(db, "test", "_tdatetime_new")
assert.NoError(t, t2.SetInfo(context.TODO())) // fails
logger := logrus.New()

Expand Down
14 changes: 1 addition & 13 deletions pkg/dbconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3026,12 +3026,7 @@ func newDSN(dsn string, config *DBConfig) (string, error) {
ops = append(ops, fmt.Sprintf("%s=%s", "innodb_lock_wait_timeout", url.QueryEscape(fmt.Sprint(config.InnodbLockWaitTimeout))))
ops = append(ops, fmt.Sprintf("%s=%s", "lock_wait_timeout", url.QueryEscape(fmt.Sprint(config.LockWaitTimeout))))
ops = append(ops, fmt.Sprintf("%s=%s", "range_optimizer_max_mem_size", url.QueryEscape(fmt.Sprint(config.RangeOptimizerMaxMemSize))))

if config.Aurora20Compatible {
ops = append(ops, fmt.Sprintf("%s=%s", "tx_isolation", url.QueryEscape(`"read-committed"`))) // Aurora 2.0
} else {
ops = append(ops, fmt.Sprintf("%s=%s", "transaction_isolation", url.QueryEscape(`"read-committed"`))) // MySQL 8.0
}
ops = append(ops, fmt.Sprintf("%s=%s", "transaction_isolation", url.QueryEscape(`"read-committed"`)))
// go driver options, should set:
// character_set_client, character_set_connection, character_set_results
ops = append(ops, fmt.Sprintf("%s=%s", "charset", "binary"))
Expand All @@ -3054,13 +3049,6 @@ func New(inputDSN string, config *DBConfig) (*sql.DB, error) {
}
if err := db.Ping(); err != nil {
utils.ErrInErr(db.Close())
if config.Aurora20Compatible {
return nil, err // Already tried it, didn't work.
}
// This could be because of transaction_isolation vs. tx_isolation.
// Try changing to Aurora 2.0 compatible mode and retrying.
// because config is a pointer, it will update future calls too.
config.Aurora20Compatible = true
return New(inputDSN, config)
}
db.SetMaxOpenConns(config.MaxOpenConnections)
Expand Down
1 change: 0 additions & 1 deletion pkg/dbconn/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type DBConfig struct {
InnodbLockWaitTimeout int
MaxRetries int
MaxOpenConnections int
Aurora20Compatible bool // use tx_isolation instead of transaction_isolation
RangeOptimizerMaxMemSize int64
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (r *Runner) Run(originalCtx context.Context) error {
return err
}
r.logger.Info("copy rows complete")
r.replClient.SetKeyAboveWatermarkOptimization(false) // should no longer be used.

// r.waitOnSentinel may return an error if there is
// some unexpected problem checking for the existence of
Expand Down Expand Up @@ -495,6 +496,7 @@ func (r *Runner) setup(ctx context.Context) error {
// If this is NOT nil then it will use this optimization when determining
// if it can ignore a KEY.
r.replClient.KeyAboveCopierCallback = r.copier.KeyAboveHighWatermark
r.replClient.SetKeyAboveWatermarkOptimization(true)

// Start routines in table and replication packages to
// Continuously update the min/max and estimated rows
Expand Down Expand Up @@ -800,9 +802,6 @@ func (r *Runner) getCurrentState() migrationState {

func (r *Runner) setCurrentState(s migrationState) {
atomic.StoreInt32((*int32)(&r.currentState), int32(s))
if s > stateCopyRows && r.replClient != nil {
r.replClient.SetKeyAboveWatermarkOptimization(false)
}
}

func (r *Runner) dumpCheckpoint(ctx context.Context) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,7 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) {
m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark
err = m.replClient.Run()
assert.NoError(t, err)
m.replClient.SetKeyAboveWatermarkOptimization(true)

// Now we are ready to start copying rows.
// Instead of calling m.copyRows() we will step through it manually.
Expand Down
25 changes: 18 additions & 7 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type Client struct {
table *table.TableInfo
newTable *table.TableInfo

disableKeyAboveWatermarkOptimization bool
disableDeltaMap bool // use queue instead
enableKeyAboveWatermark bool
disableDeltaMap bool // use queue instead

TableChangeNotificationCallback func()
KeyAboveCopierCallback func(interface{}) bool
Expand Down Expand Up @@ -146,10 +146,13 @@ func (c *Client) OnRow(e *canal.RowsEvent) error {
return fmt.Errorf("no primary key found for row: %#v", row)
}
atomic.AddInt64(&c.changesetRowsEventCount, 1)
// Important! We can only apply this optimization while in migrationStateCopyRows.
// If we do it too early, we might miss updates in-between starting the subscription,
// and opening the table in resume from checkpoint etc.
if !c.disableKeyAboveWatermarkOptimization && c.KeyAboveCopierCallback != nil && c.KeyAboveCopierCallback(key[0]) {

// The KeyAboveWatermark optimization has to be explicitly enabled.
// We enable it once all the setup has been done (since we create a repl client
// earlier in setup to ensure binary logs are available).
// We then disable the optimization after the copier phase has finished.
if c.KeyAboveWatermarkEnabled() && c.KeyAboveCopierCallback(key[0]) {
c.logger.Debugf("key above watermark: %v", key[0])
continue // key can be ignored
}
switch e.Action {
Expand All @@ -165,6 +168,14 @@ func (c *Client) OnRow(e *canal.RowsEvent) error {
return nil
}

// KeyAboveWatermarkEnabled returns true if the key-above-watermark
// optimization is enabled and it is currently safe to apply it
// (a copier callback must also be attached, otherwise there is
// no watermark to compare keys against).
func (c *Client) KeyAboveWatermarkEnabled() bool {
c.Lock()
defer c.Unlock()
return c.enableKeyAboveWatermark && c.KeyAboveCopierCallback != nil
}

// OnRotate is called when a rotate event is discovered via replication.
// We use this to capture the log file name, since only the position is caught on the row event.
func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error {
Expand All @@ -190,7 +201,7 @@ func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool) {
c.Lock()
defer c.Unlock()

c.disableKeyAboveWatermarkOptimization = !newVal
c.enableKeyAboveWatermark = newVal
}

// SetPos is used for resuming from a checkpoint.
Expand Down
1 change: 1 addition & 0 deletions pkg/repl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestReplClientComplex(t *testing.T) {
assert.NoError(t, err)
// Attach copier's keyabovewatermark to the repl client
client.KeyAboveCopierCallback = copier.KeyAboveHighWatermark
client.SetKeyAboveWatermarkOptimization(true)

assert.NoError(t, copier.Open4Test()) // need to manually open because we are not calling Run()

Expand Down
5 changes: 4 additions & 1 deletion pkg/table/chunker_optimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,13 @@ func (t *chunkerOptimistic) calculateNewTargetChunkSize() uint64 {
return uint64(newTargetRows)
}

// KeyAboveHighWatermark returns true if the key is above the high watermark.
// TRUE means that the row will be discarded so if there is any ambiguity,
// it's important to return FALSE.
func (t *chunkerOptimistic) KeyAboveHighWatermark(key interface{}) bool {
t.Lock()
defer t.Unlock()
if t.chunkPtr.IsNil() {
if t.chunkPtr.IsNil() && t.checkpointHighPtr.IsNil() {
return true // every key is above because we haven't started copying.
}
if t.finalChunkSent {
Expand Down

0 comments on commit 5fbc94e

Please sign in to comment.