From 81bc1db44a478c2f047297b13a0ae589a932c438 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 7 Mar 2024 09:09:11 -0700 Subject: [PATCH] reduce checksum failures --- pkg/checksum/checker.go | 42 ++++++++++------------- pkg/checksum/checker_test.go | 61 +++++++++++++++++---------------- pkg/dbconn/conn.go | 16 ++------- pkg/dbconn/dbconn.go | 1 - pkg/migration/runner.go | 5 ++- pkg/migration/runner_test.go | 1 + pkg/repl/client.go | 25 ++++++++++---- pkg/repl/client_test.go | 1 + pkg/table/chunker_optimistic.go | 5 ++- 9 files changed, 77 insertions(+), 80 deletions(-) diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go index b3e1cf4e..4f065444 100644 --- a/pkg/checksum/checker.go +++ b/pkg/checksum/checker.go @@ -254,16 +254,7 @@ func (c *Checker) setInvalid(newVal bool) { c.isInvalid = newVal } -func (c *Checker) Run(ctx context.Context) error { - c.Lock() - c.startTime = time.Now() - defer func() { - c.ExecTime = time.Since(c.startTime) - }() - if err := c.chunker.Open(); err != nil { - return err - } - c.Unlock() +func (c *Checker) initConnPool(ctx context.Context) error { // Try and catch up before we apply a table lock, // since we will need to catch up again with the lock held // and we want to minimize that. @@ -273,42 +264,45 @@ func (c *Checker) Run(ctx context.Context) error { // Lock the source table in a trx // so the connection is not used by others c.logger.Info("starting checksum operation, this will require a table lock") - serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, false, c.dbConfig, c.logger) + serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, true, c.dbConfig, c.logger) if err != nil { return err } defer serverLock.Close() - // With the lock held, flush one more time under the lock tables. // Because we know canal is up to date this now guarantees // we have everything in the new table. 
- if err := c.feed.Flush(ctx); err != nil { + if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil { return err } - // Assert that the change set is empty. This should always // be the case because we are under a lock. if !c.feed.AllChangesFlushed() { return errors.New("not all changes flushed") } - // Create a set of connections which can be used to checksum // The table. They MUST be created before the lock is released // with REPEATABLE-READ and a consistent snapshot (or dummy read) // to initialize the read-view. c.trxPool, err = dbconn.NewTrxPool(ctx, c.db, c.concurrency, c.dbConfig) - if err != nil { - return err - } + return err +} - // Assert that the change set is still empty. - // It should always be empty while we are still under the lock. - if c.feed.GetDeltaLen() > 0 { - return errors.New("the changeset is not empty, can not run checksum") +func (c *Checker) Run(ctx context.Context) error { + c.Lock() + c.startTime = time.Now() + defer func() { + c.ExecTime = time.Since(c.startTime) + }() + if err := c.chunker.Open(); err != nil { + return err } + c.Unlock() - // We can now unlock the table before starting the checksumming. - if err = serverLock.Close(); err != nil { + // initConnPool initializes the connection pool. // This is done under a table lock which is acquired in this func. // It is released when the func returns. 
+ if err := c.initConnPool(ctx); err != nil { return err } c.logger.Info("table unlocked, starting checksum") diff --git a/pkg/checksum/checker_test.go b/pkg/checksum/checker_test.go index 3a18eaec..68534336 100644 --- a/pkg/checksum/checker_test.go +++ b/pkg/checksum/checker_test.go @@ -17,19 +17,19 @@ import ( ) func TestBasicChecksum(t *testing.T) { - testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2, _t1_chkpnt") + testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, _t1_new, _t1_chkpnt") testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") - testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _t1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") testutils.RunSQL(t, "CREATE TABLE _t1_chkpnt (a INT)") // for binlog advancement testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2, 3)") - testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2, 3)") + testutils.RunSQL(t, "INSERT INTO _t1_new VALUES (1, 2, 3)") db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) t1 := table.NewTableInfo(db, "test", "t1") assert.NoError(t, t1.SetInfo(context.TODO())) - t2 := table.NewTableInfo(db, "test", "t2") + t2 := table.NewTableInfo(db, "test", "_t1_new") assert.NoError(t, t2.SetInfo(context.TODO())) logger := logrus.New() @@ -88,20 +88,20 @@ func TestBasicValidation(t *testing.T) { } func TestFixCorrupt(t *testing.T) { - testutils.RunSQL(t, "DROP TABLE IF EXISTS fixcorruption_t1, fixcorruption_t2, _fixcorruption_t1_chkpnt") + testutils.RunSQL(t, "DROP TABLE IF EXISTS fixcorruption_t1, _fixcorruption_t1_new, _fixcorruption_t1_chkpnt") testutils.RunSQL(t, "CREATE TABLE fixcorruption_t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") - testutils.RunSQL(t, "CREATE TABLE fixcorruption_t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _fixcorruption_t1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") 
testutils.RunSQL(t, "CREATE TABLE _fixcorruption_t1_chkpnt (a INT)") // for binlog advancement testutils.RunSQL(t, "INSERT INTO fixcorruption_t1 VALUES (1, 2, 3)") - testutils.RunSQL(t, "INSERT INTO fixcorruption_t2 VALUES (1, 2, 3)") - testutils.RunSQL(t, "INSERT INTO fixcorruption_t2 VALUES (2, 2, 3)") // corrupt + testutils.RunSQL(t, "INSERT INTO _fixcorruption_t1_new VALUES (1, 2, 3)") + testutils.RunSQL(t, "INSERT INTO _fixcorruption_t1_new VALUES (2, 2, 3)") // corrupt db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) t1 := table.NewTableInfo(db, "test", "fixcorruption_t1") assert.NoError(t, t1.SetInfo(context.TODO())) - t2 := table.NewTableInfo(db, "test", "fixcorruption_t2") + t2 := table.NewTableInfo(db, "test", "_fixcorruption_t1_new") assert.NoError(t, t2.SetInfo(context.TODO())) logger := logrus.New() @@ -131,20 +131,20 @@ func TestFixCorrupt(t *testing.T) { } func TestCorruptChecksum(t *testing.T) { - testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2, _t1_chkpnt") - testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") - testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") - testutils.RunSQL(t, "CREATE TABLE _t1_chkpnt (a INT)") // for binlog advancement - testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2, 3)") - testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2, 3)") - testutils.RunSQL(t, "INSERT INTO t2 VALUES (2, 2, 3)") // corrupt + testutils.RunSQL(t, "DROP TABLE IF EXISTS chkpcorruptt1, _chkpcorruptt1_new, _chkpcorruptt1_chkpnt") + testutils.RunSQL(t, "CREATE TABLE chkpcorruptt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _chkpcorruptt1_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _chkpcorruptt1_chkpnt (a INT)") // for binlog advancement + testutils.RunSQL(t, "INSERT INTO chkpcorruptt1 VALUES (1, 2, 3)") + testutils.RunSQL(t, "INSERT INTO _chkpcorruptt1_new VALUES 
(1, 2, 3)") + testutils.RunSQL(t, "INSERT INTO _chkpcorruptt1_new VALUES (2, 2, 3)") // corrupt db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) - t1 := table.NewTableInfo(db, "test", "t1") + t1 := table.NewTableInfo(db, "test", "chkpcorruptt1") assert.NoError(t, t1.SetInfo(context.TODO())) - t2 := table.NewTableInfo(db, "test", "t2") + t2 := table.NewTableInfo(db, "test", "_chkpcorruptt1_new") assert.NoError(t, t2.SetInfo(context.TODO())) logger := logrus.New() @@ -164,18 +164,19 @@ func TestCorruptChecksum(t *testing.T) { } func TestBoundaryCases(t *testing.T) { - testutils.RunSQL(t, "DROP TABLE IF EXISTS t1, t2") - testutils.RunSQL(t, "CREATE TABLE t1 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))") - testutils.RunSQL(t, "CREATE TABLE t2 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))") - testutils.RunSQL(t, "INSERT INTO t1 VALUES (1, 2.2, '')") // null vs empty string - testutils.RunSQL(t, "INSERT INTO t2 VALUES (1, 2.2, NULL)") // should not compare + testutils.RunSQL(t, "DROP TABLE IF EXISTS checkert1, _checkert1_new, _checkert1_chkpnt") + testutils.RunSQL(t, "CREATE TABLE checkert1 (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _checkert1_new (a INT NOT NULL, b FLOAT, c VARCHAR(255), PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _checkert1_chkpnt (a INT NOT NULL)") + testutils.RunSQL(t, "INSERT INTO checkert1 VALUES (1, 2.2, '')") // null vs empty string + testutils.RunSQL(t, "INSERT INTO _checkert1_new VALUES (1, 2.2, NULL)") // should not compare db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) - t1 := table.NewTableInfo(db, "test", "t1") + t1 := table.NewTableInfo(db, "test", "checkert1") assert.NoError(t, t1.SetInfo(context.TODO())) - t2 := table.NewTableInfo(db, "test", "t2") + t2 := table.NewTableInfo(db, "test", "_checkert1_new") assert.NoError(t, t2.SetInfo(context.TODO())) logger := logrus.New() @@ 
-193,14 +194,14 @@ func TestBoundaryCases(t *testing.T) { assert.Error(t, checker.Run(context.Background())) // UPDATE t1 to also be NULL - testutils.RunSQL(t, "UPDATE t1 SET c = NULL") + testutils.RunSQL(t, "UPDATE checkert1 SET c = NULL") checker, err = NewChecker(db, t1, t2, feed, NewCheckerDefaultConfig()) assert.NoError(t, err) assert.NoError(t, checker.Run(context.Background())) } func TestChangeDataTypeDatetime(t *testing.T) { - testutils.RunSQL(t, "DROP TABLE IF EXISTS tdatetime, tdatetime2") + testutils.RunSQL(t, "DROP TABLE IF EXISTS tdatetime, _tdatetime_new") testutils.RunSQL(t, `CREATE TABLE tdatetime ( id bigint NOT NULL AUTO_INCREMENT primary key, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -209,7 +210,7 @@ func TestChangeDataTypeDatetime(t *testing.T) { activated_at timestamp NULL DEFAULT NULL, deactivated_at timestamp NULL DEFAULT NULL )`) - testutils.RunSQL(t, `CREATE TABLE tdatetime2 ( + testutils.RunSQL(t, `CREATE TABLE _tdatetime_new ( id bigint NOT NULL AUTO_INCREMENT primary key, created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), @@ -230,7 +231,7 @@ func TestChangeDataTypeDatetime(t *testing.T) { ('2023-05-26 06:24:24', '2023-05-28 23:45:01', '2023-05-26 06:24:23', '2023-05-26 06:24:42', '2023-05-28 23:45:01'), ('2023-05-28 23:46:07', '2023-05-29 00:57:55', '2023-05-28 23:46:05', '2023-05-28 23:46:05', NULL ), ('2023-05-28 23:53:34', '2023-05-29 00:57:56', '2023-05-28 23:53:33', '2023-05-28 23:58:09', NULL );`) - testutils.RunSQL(t, `INSERT INTO tdatetime2 SELECT * FROM tdatetime`) + testutils.RunSQL(t, `INSERT INTO _tdatetime_new SELECT * FROM tdatetime`) // The checkpoint table is required for blockwait, structure doesn't matter. 
testutils.RunSQL(t, "CREATE TABLE IF NOT EXISTS _tdatetime_chkpnt (id int)") @@ -239,7 +240,7 @@ func TestChangeDataTypeDatetime(t *testing.T) { t1 := table.NewTableInfo(db, "test", "tdatetime") assert.NoError(t, t1.SetInfo(context.TODO())) - t2 := table.NewTableInfo(db, "test", "tdatetime2") + t2 := table.NewTableInfo(db, "test", "_tdatetime_new") assert.NoError(t, t2.SetInfo(context.TODO())) // fails logger := logrus.New() diff --git a/pkg/dbconn/conn.go b/pkg/dbconn/conn.go index abe81fc7..0d2ea73b 100644 --- a/pkg/dbconn/conn.go +++ b/pkg/dbconn/conn.go @@ -3026,12 +3026,7 @@ func newDSN(dsn string, config *DBConfig) (string, error) { ops = append(ops, fmt.Sprintf("%s=%s", "innodb_lock_wait_timeout", url.QueryEscape(fmt.Sprint(config.InnodbLockWaitTimeout)))) ops = append(ops, fmt.Sprintf("%s=%s", "lock_wait_timeout", url.QueryEscape(fmt.Sprint(config.LockWaitTimeout)))) ops = append(ops, fmt.Sprintf("%s=%s", "range_optimizer_max_mem_size", url.QueryEscape(fmt.Sprint(config.RangeOptimizerMaxMemSize)))) - - if config.Aurora20Compatible { - ops = append(ops, fmt.Sprintf("%s=%s", "tx_isolation", url.QueryEscape(`"read-committed"`))) // Aurora 2.0 - } else { - ops = append(ops, fmt.Sprintf("%s=%s", "transaction_isolation", url.QueryEscape(`"read-committed"`))) // MySQL 8.0 - } + ops = append(ops, fmt.Sprintf("%s=%s", "transaction_isolation", url.QueryEscape(`"read-committed"`))) // go driver options, should set: // character_set_client, character_set_connection, character_set_results ops = append(ops, fmt.Sprintf("%s=%s", "charset", "binary")) @@ -3054,14 +3049,7 @@ func New(inputDSN string, config *DBConfig) (*sql.DB, error) { } if err := db.Ping(); err != nil { utils.ErrInErr(db.Close()) - if config.Aurora20Compatible { - return nil, err // Already tried it, didn't work. - } - // This could be because of transaction_isolation vs. tx_isolation. - // Try changing to Aurora 2.0 compatible mode and retrying. 
- // because config is a pointer, it will update future calls too. - config.Aurora20Compatible = true - return New(inputDSN, config) + return nil, err } db.SetMaxOpenConns(config.MaxOpenConnections) db.SetConnMaxLifetime(maxConnLifetime) diff --git a/pkg/dbconn/dbconn.go b/pkg/dbconn/dbconn.go index 29f632ac..90ca83f7 100644 --- a/pkg/dbconn/dbconn.go +++ b/pkg/dbconn/dbconn.go @@ -28,7 +28,6 @@ type DBConfig struct { InnodbLockWaitTimeout int MaxRetries int MaxOpenConnections int - Aurora20Compatible bool // use tx_isolation instead of transaction_isolation RangeOptimizerMaxMemSize int64 } diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index 4f3ded2a..2418982b 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -233,6 +233,7 @@ func (r *Runner) Run(originalCtx context.Context) error { return err } r.logger.Info("copy rows complete") + r.replClient.SetKeyAboveWatermarkOptimization(false) // should no longer be used. // r.waitOnSentinel may return an error if there is // some unexpected problem checking for the existence of @@ -495,6 +496,7 @@ func (r *Runner) setup(ctx context.Context) error { // If this is NOT nil then it will use this optimization when determining // if it can ignore a KEY. 
r.replClient.KeyAboveCopierCallback = r.copier.KeyAboveHighWatermark + r.replClient.SetKeyAboveWatermarkOptimization(true) // Start routines in table and replication packages to // Continuously update the min/max and estimated rows @@ -800,9 +802,6 @@ func (r *Runner) getCurrentState() migrationState { func (r *Runner) setCurrentState(s migrationState) { atomic.StoreInt32((*int32)(&r.currentState), int32(s)) - if s > stateCopyRows && r.replClient != nil { - r.replClient.SetKeyAboveWatermarkOptimization(false) - } } func (r *Runner) dumpCheckpoint(ctx context.Context) error { diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index 36ee6197..9cacd769 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -1305,6 +1305,7 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) { m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark err = m.replClient.Run() assert.NoError(t, err) + m.replClient.SetKeyAboveWatermarkOptimization(true) // Now we are ready to start copying rows. // Instead of calling m.copyRows() we will step through it manually. diff --git a/pkg/repl/client.go b/pkg/repl/client.go index d3b14dd1..2a00f7ab 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -68,8 +68,8 @@ type Client struct { table *table.TableInfo newTable *table.TableInfo - disableKeyAboveWatermarkOptimization bool - disableDeltaMap bool // use queue instead + enableKeyAboveWatermark bool + disableDeltaMap bool // use queue instead TableChangeNotificationCallback func() KeyAboveCopierCallback func(interface{}) bool @@ -146,10 +146,13 @@ func (c *Client) OnRow(e *canal.RowsEvent) error { return fmt.Errorf("no primary key found for row: %#v", row) } atomic.AddInt64(&c.changesetRowsEventCount, 1) - // Important! We can only apply this optimization while in migrationStateCopyRows. 
- // If we do it too early, we might miss updates in-between starting the subscription, - // and opening the table in resume from checkpoint etc. - if !c.disableKeyAboveWatermarkOptimization && c.KeyAboveCopierCallback != nil && c.KeyAboveCopierCallback(key[0]) { + + // The KeyAboveWatermark optimization has to be enabled. + // We enable it once all the setup has been done (since we create a repl client + // earlier in setup to ensure binary logs are available). + // We then disable the optimization after the copier phase has finished. + if c.KeyAboveWatermarkEnabled() && c.KeyAboveCopierCallback(key[0]) { + c.logger.Debugf("key above watermark: %v", key[0]) continue // key can be ignored } switch e.Action { @@ -165,6 +168,14 @@ func (c *Client) OnRow(e *canal.RowsEvent) error { return nil } +// KeyAboveWatermarkEnabled returns true if the key above watermark optimization is enabled, +// and it's also safe to do so. +func (c *Client) KeyAboveWatermarkEnabled() bool { + c.Lock() + defer c.Unlock() + return c.enableKeyAboveWatermark && c.KeyAboveCopierCallback != nil +} + // OnRotate is called when a rotate event is discovered via replication. // We use this to capture the log file name, since only the position is caught on the row event. func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error { @@ -190,7 +201,7 @@ func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool) { c.Lock() defer c.Unlock() - c.disableKeyAboveWatermarkOptimization = !newVal + c.enableKeyAboveWatermark = newVal } // SetPos is used for resuming from a checkpoint. 
diff --git a/pkg/repl/client_test.go b/pkg/repl/client_test.go index b23327d4..a6e003b9 100644 --- a/pkg/repl/client_test.go +++ b/pkg/repl/client_test.go @@ -89,6 +89,7 @@ func TestReplClientComplex(t *testing.T) { assert.NoError(t, err) // Attach copier's keyabovewatermark to the repl client client.KeyAboveCopierCallback = copier.KeyAboveHighWatermark + client.SetKeyAboveWatermarkOptimization(true) assert.NoError(t, copier.Open4Test()) // need to manually open because we are not calling Run() diff --git a/pkg/table/chunker_optimistic.go b/pkg/table/chunker_optimistic.go index 55e88313..48c9dbc3 100644 --- a/pkg/table/chunker_optimistic.go +++ b/pkg/table/chunker_optimistic.go @@ -408,10 +408,13 @@ func (t *chunkerOptimistic) calculateNewTargetChunkSize() uint64 { return uint64(newTargetRows) } +// KeyAboveHighWatermark returns true if the key is above the high watermark. +// TRUE means that the row will be discarded so if there is any ambiguity, +// it's important to return FALSE. func (t *chunkerOptimistic) KeyAboveHighWatermark(key interface{}) bool { t.Lock() defer t.Unlock() - if t.chunkPtr.IsNil() { + if t.chunkPtr.IsNil() && t.checkpointHighPtr.IsNil() { return true // every key is above because we haven't started copying. } if t.finalChunkSent {