Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce checksum failures #274

Merged
merged 1 commit into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,7 @@ func (c *Checker) setInvalid(newVal bool) {
c.isInvalid = newVal
}

func (c *Checker) Run(ctx context.Context) error {
c.Lock()
c.startTime = time.Now()
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
if err := c.chunker.Open(); err != nil {
return err
}
c.Unlock()
func (c *Checker) initConnPool(ctx context.Context) error {
// Try and catch up before we apply a table lock,
// since we will need to catch up again with the lock held
// and we want to minimize that.
Expand All @@ -273,42 +264,45 @@ func (c *Checker) Run(ctx context.Context) error {
// Lock the source table in a trx
// so the connection is not used by others
c.logger.Info("starting checksum operation, this will require a table lock")
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, false, c.dbConfig, c.logger)
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, true, c.dbConfig, c.logger)
if err != nil {
return err
}
defer serverLock.Close()

// With the lock held, flush one more time under the lock tables.
// Because we know canal is up to date this now guarantees
// we have everything in the new table.
if err := c.feed.Flush(ctx); err != nil {
if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this is also an important change that reduces the chances of the AllChangesFlushed() check below failing due to parallelism, but I guess it is not the reason the migrations under consideration failed, as they failed in the checksum phase.

return err
}

// Assert that the change set is empty. This should always
// be the case because we are under a lock.
if !c.feed.AllChangesFlushed() {
return errors.New("not all changes flushed")
}

// Create a set of connections which can be used to checksum
// The table. They MUST be created before the lock is released
// with REPEATABLE-READ and a consistent snapshot (or dummy read)
// to initialize the read-view.
c.trxPool, err = dbconn.NewTrxPool(ctx, c.db, c.concurrency, c.dbConfig)
if err != nil {
return err
}
return err
}

// Assert that the change set is still empty.
// It should always be empty while we are still under the lock.
if c.feed.GetDeltaLen() > 0 {
return errors.New("the changeset is not empty, can not run checksum")
func (c *Checker) Run(ctx context.Context) error {
c.Lock()
c.startTime = time.Now()
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
if err := c.chunker.Open(); err != nil {
return err
}
c.Unlock()

// We can now unlock the table before starting the checksumming.
if err = serverLock.Close(); err != nil {
// initConnPool initialize the connection pool.
// This is done under a table lock which is acquired in this func.
// It is released as the func is returned.
if err := c.initConnPool(ctx); err != nil {
return err
}
c.logger.Info("table unlocked, starting checksum")
Expand Down
61 changes: 31 additions & 30 deletions pkg/checksum/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ import (
)

func TestBasicChecksum(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2, _t1_chkpnt")
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, _t1_new, _t1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _t1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _t1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _t1_new VALUES (1, 2, 3)")

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "t1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "t2")
t2 := table.NewTableInfo(db, "test", "_t1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand Down Expand Up @@ -88,20 +88,20 @@ func TestBasicValidation(t *testing.T) {
}

func TestFixCorrupt(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS fixcorruption_t1, fixcorruption_t2, _fixcorruption_t1_chkpnt")
testutils.RunSQL(t, "DROP TABLE IF EXISTS fixcorruption_t1, _fixcorruption_t1_new, _fixcorruption_t1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE fixcorruption_t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE fixcorruption_t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _fixcorruption_t1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _fixcorruption_t1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO fixcorruption_t1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO fixcorruption_t2 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO fixcorruption_t2 VALUES (2, 2, 3)") // corrupt
testutils.RunSQL(t, "INSERT INTO _fixcorruption_t1_new VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _fixcorruption_t1_new VALUES (2, 2, 3)") // corrupt

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "fixcorruption_t1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "fixcorruption_t2")
t2 := table.NewTableInfo(db, "test", "_fixcorruption_t1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand Down Expand Up @@ -131,20 +131,20 @@ func TestFixCorrupt(t *testing.T) {
}

func TestCorruptChecksum(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2, _t1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _t1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO t2 VALUES (2, 2, 3)") // corrupt
testutils.RunSQL(t, "DROP TABLE IF EXISTS chkpcorruptt1, _chkpcorruptt1_new, _chkpcorruptt1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE chkpcorruptt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _chkpcorruptt1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _chkpcorruptt1_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO chkpcorruptt1 VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _chkpcorruptt1_new VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _chkpcorruptt1_new VALUES (2, 2, 3)") // corrupt

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "t1")
t1 := table.NewTableInfo(db, "test", "chkpcorruptt1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "t2")
t2 := table.NewTableInfo(db, "test", "_chkpcorruptt1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand All @@ -164,18 +164,19 @@ func TestCorruptChecksum(t *testing.T) {
}

func TestBoundaryCases(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2")
testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2.2, '')") // null vs empty string
testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2.2, NULL)") // should not compare
testutils.RunSQL(t, "DROP TABLE IF EXISTS checkert1, _checkert1_new, _checkert1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE checkert1 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _checkert1_new (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _checkert1_chkpnt (a INT NOT NULL)")
testutils.RunSQL(t, "INSERT INTO checkert1 VALUES (1, 2.2, '')") // null vs empty string
testutils.RunSQL(t, "INSERT INTO _checkert1_new VALUES (1, 2.2, NULL)") // should not compare

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "t1")
t1 := table.NewTableInfo(db, "test", "checkert1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "t2")
t2 := table.NewTableInfo(db, "test", "_checkert1_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

Expand All @@ -193,14 +194,14 @@ func TestBoundaryCases(t *testing.T) {
assert.Error(t, checker.Run(context.Background()))

// UPDATE t1 to also be NULL
testutils.RunSQL(t, "UPDATE t1 SET c = NULL")
testutils.RunSQL(t, "UPDATE checkert1 SET c = NULL")
checker, err = NewChecker(db, t1, t2, feed, NewCheckerDefaultConfig())
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background()))
}

func TestChangeDataTypeDatetime(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS tdatetime, tdatetime2")
testutils.RunSQL(t, "DROP TABLE IF EXISTS tdatetime, _tdatetime_new")
testutils.RunSQL(t, `CREATE TABLE tdatetime (
id bigint NOT NULL AUTO_INCREMENT primary key,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand All @@ -209,7 +210,7 @@ func TestChangeDataTypeDatetime(t *testing.T) {
activated_at timestamp NULL DEFAULT NULL,
deactivated_at timestamp NULL DEFAULT NULL
)`)
testutils.RunSQL(t, `CREATE TABLE tdatetime2 (
testutils.RunSQL(t, `CREATE TABLE _tdatetime_new (
id bigint NOT NULL AUTO_INCREMENT primary key,
created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
Expand All @@ -230,7 +231,7 @@ func TestChangeDataTypeDatetime(t *testing.T) {
('2023-05-26 06:24:24', '2023-05-28 23:45:01', '2023-05-26 06:24:23', '2023-05-26 06:24:42', '2023-05-28 23:45:01'),
('2023-05-28 23:46:07', '2023-05-29 00:57:55', '2023-05-28 23:46:05', '2023-05-28 23:46:05', NULL ),
('2023-05-28 23:53:34', '2023-05-29 00:57:56', '2023-05-28 23:53:33', '2023-05-28 23:58:09', NULL );`)
testutils.RunSQL(t, `INSERT INTO tdatetime2 SELECT * FROM tdatetime`)
testutils.RunSQL(t, `INSERT INTO _tdatetime_new SELECT * FROM tdatetime`)
// The checkpoint table is required for blockwait, structure doesn't matter.
testutils.RunSQL(t, "CREATE TABLE IF NOT EXISTS _tdatetime_chkpnt (id int)")

Expand All @@ -239,7 +240,7 @@ func TestChangeDataTypeDatetime(t *testing.T) {

t1 := table.NewTableInfo(db, "test", "tdatetime")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "tdatetime2")
t2 := table.NewTableInfo(db, "test", "_tdatetime_new")
assert.NoError(t, t2.SetInfo(context.TODO())) // fails
logger := logrus.New()

Expand Down
16 changes: 2 additions & 14 deletions pkg/dbconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3026,12 +3026,7 @@ func newDSN(dsn string, config *DBConfig) (string, error) {
ops = append(ops, fmt.Sprintf("%s=%s", "innodb_lock_wait_timeout", url.QueryEscape(fmt.Sprint(config.InnodbLockWaitTimeout))))
ops = append(ops, fmt.Sprintf("%s=%s", "lock_wait_timeout", url.QueryEscape(fmt.Sprint(config.LockWaitTimeout))))
ops = append(ops, fmt.Sprintf("%s=%s", "range_optimizer_max_mem_size", url.QueryEscape(fmt.Sprint(config.RangeOptimizerMaxMemSize))))

if config.Aurora20Compatible {
ops = append(ops, fmt.Sprintf("%s=%s", "tx_isolation", url.QueryEscape(`"read-committed"`))) // Aurora 2.0
} else {
ops = append(ops, fmt.Sprintf("%s=%s", "transaction_isolation", url.QueryEscape(`"read-committed"`))) // MySQL 8.0
}
ops = append(ops, fmt.Sprintf("%s=%s", "transaction_isolation", url.QueryEscape(`"read-committed"`)))
// go driver options, should set:
// character_set_client, character_set_connection, character_set_results
ops = append(ops, fmt.Sprintf("%s=%s", "charset", "binary"))
Expand All @@ -3054,14 +3049,7 @@ func New(inputDSN string, config *DBConfig) (*sql.DB, error) {
}
if err := db.Ping(); err != nil {
utils.ErrInErr(db.Close())
if config.Aurora20Compatible {
return nil, err // Already tried it, didn't work.
}
// This could be because of transaction_isolation vs. tx_isolation.
// Try changing to Aurora 2.0 compatible mode and retrying.
// because config is a pointer, it will update future calls too.
config.Aurora20Compatible = true
return New(inputDSN, config)
return nil, err
}
db.SetMaxOpenConns(config.MaxOpenConnections)
db.SetConnMaxLifetime(maxConnLifetime)
Expand Down
1 change: 0 additions & 1 deletion pkg/dbconn/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type DBConfig struct {
InnodbLockWaitTimeout int
MaxRetries int
MaxOpenConnections int
Aurora20Compatible bool // use tx_isolation instead of transaction_isolation
RangeOptimizerMaxMemSize int64
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (r *Runner) Run(originalCtx context.Context) error {
return err
}
r.logger.Info("copy rows complete")
r.replClient.SetKeyAboveWatermarkOptimization(false) // should no longer be used.

// r.waitOnSentinel may return an error if there is
// some unexpected problem checking for the existence of
Expand Down Expand Up @@ -495,6 +496,7 @@ func (r *Runner) setup(ctx context.Context) error {
// If this is NOT nil then it will use this optimization when determining
// if it can ignore a KEY.
r.replClient.KeyAboveCopierCallback = r.copier.KeyAboveHighWatermark
r.replClient.SetKeyAboveWatermarkOptimization(true)

// Start routines in table and replication packages to
// Continuously update the min/max and estimated rows
Expand Down Expand Up @@ -800,9 +802,6 @@ func (r *Runner) getCurrentState() migrationState {

func (r *Runner) setCurrentState(s migrationState) {
atomic.StoreInt32((*int32)(&r.currentState), int32(s))
if s > stateCopyRows && r.replClient != nil {
r.replClient.SetKeyAboveWatermarkOptimization(false)
}
}

func (r *Runner) dumpCheckpoint(ctx context.Context) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,7 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) {
m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark
err = m.replClient.Run()
assert.NoError(t, err)
m.replClient.SetKeyAboveWatermarkOptimization(true)

// Now we are ready to start copying rows.
// Instead of calling m.copyRows() we will step through it manually.
Expand Down
25 changes: 18 additions & 7 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type Client struct {
table *table.TableInfo
newTable *table.TableInfo

disableKeyAboveWatermarkOptimization bool
disableDeltaMap bool // use queue instead
enableKeyAboveWatermark bool
disableDeltaMap bool // use queue instead

TableChangeNotificationCallback func()
KeyAboveCopierCallback func(interface{}) bool
Expand Down Expand Up @@ -146,10 +146,13 @@ func (c *Client) OnRow(e *canal.RowsEvent) error {
return fmt.Errorf("no primary key found for row: %#v", row)
}
atomic.AddInt64(&c.changesetRowsEventCount, 1)
// Important! We can only apply this optimization while in migrationStateCopyRows.
// If we do it too early, we might miss updates in-between starting the subscription,
// and opening the table in resume from checkpoint etc.
if !c.disableKeyAboveWatermarkOptimization && c.KeyAboveCopierCallback != nil && c.KeyAboveCopierCallback(key[0]) {

// The KeyAboveWatermark optimization has to be enabled
// We enable it once all the setup has been done (since we create a repl client
// earlier in setup to ensure binary logs are available).
// We then disable the optimization after the copier phase has finished.
if c.KeyAboveWatermarkEnabled() && c.KeyAboveCopierCallback(key[0]) {
c.logger.Debugf("key above watermark: %v", key[0])
continue // key can be ignored
}
switch e.Action {
Expand All @@ -165,6 +168,14 @@ func (c *Client) OnRow(e *canal.RowsEvent) error {
return nil
}

// KeyAboveWatermarkEnabled returns true if the key-above-watermark optimization
// is enabled and it is also safe to apply: the optimization requires a
// KeyAboveCopierCallback to have been registered, since that callback is what
// decides whether a given key is above the copier's high watermark.
func (c *Client) KeyAboveWatermarkEnabled() bool {
// Lock because enableKeyAboveWatermark is toggled concurrently via
// SetKeyAboveWatermarkOptimization.
c.Lock()
defer c.Unlock()
return c.enableKeyAboveWatermark && c.KeyAboveCopierCallback != nil
}

// OnRotate is called when a rotate event is discovered via replication.
// We use this to capture the log file name, since only the position is caught on the row event.
func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error {
Expand All @@ -190,7 +201,7 @@ func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool) {
c.Lock()
defer c.Unlock()

c.disableKeyAboveWatermarkOptimization = !newVal
c.enableKeyAboveWatermark = newVal
}

// SetPos is used for resuming from a checkpoint.
Expand Down
1 change: 1 addition & 0 deletions pkg/repl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestReplClientComplex(t *testing.T) {
assert.NoError(t, err)
// Attach copier's keyabovewatermark to the repl client
client.KeyAboveCopierCallback = copier.KeyAboveHighWatermark
client.SetKeyAboveWatermarkOptimization(true)

assert.NoError(t, copier.Open4Test()) // need to manually open because we are not calling Run()

Expand Down
5 changes: 4 additions & 1 deletion pkg/table/chunker_optimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,13 @@ func (t *chunkerOptimistic) calculateNewTargetChunkSize() uint64 {
return uint64(newTargetRows)
}

// KeyAboveHighWatermark returns true if the key is above the high watermark.
// TRUE means that the row will be discarded so if there is any ambiguity,
// it's important to return FALSE.
func (t *chunkerOptimistic) KeyAboveHighWatermark(key interface{}) bool {
t.Lock()
defer t.Unlock()
if t.chunkPtr.IsNil() {
if t.chunkPtr.IsNil() && t.checkpointHighPtr.IsNil() {
return true // every key is above because we haven't started copying.
}
if t.finalChunkSent {
Expand Down
Loading