From b0311665e223d7af4684d79c2b59b85b8ae01d63 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Mon, 7 Oct 2024 07:06:09 +0000 Subject: [PATCH] Send off transaction events as soon as we see a GTID event. Multiple `TableMap` events can happen inside of a single transaction, so we can't really use them to filter out events early or anything like that. Instead, we'll just skip them. Also adds some basic test cases for the transaction streaming. --- go/binlog/gomysql_reader.go | 39 +---- go/binlog/gomysql_reader_test.go | 243 +++++++++++++++++++++++++++++++ 2 files changed, 250 insertions(+), 32 deletions(-) create mode 100644 go/binlog/gomysql_reader_test.go diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 6566ac854..6f58d0387 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -169,8 +169,6 @@ func rowsEventToBinlogEntry(eventType replication.EventType, rowsEvent *replicat } type Transaction struct { - DatabaseName string - TableName string SequenceNumber int64 LastCommitted int64 Changes chan *BinlogEntry @@ -231,13 +229,13 @@ groups: SequenceNumber: binlogEvent.SequenceNumber, LastCommitted: binlogEvent.LastCommitted, Changes: make(chan *BinlogEntry, 1000), - // Table and Schema aren't known until the following TableMapEvent - DatabaseName: "", - TableName: "", } previousSequenceNumber = binlogEvent.SequenceNumber + // We are good to send the transaction, the transaction events arrive async + this.migrationContext.Log.Infof("sending transaction: %d %d", group.SequenceNumber, group.LastCommitted) + transactionsChannel <- group default: this.migrationContext.Log.Infof("Ignoring Event: %+v", ev.Event) continue @@ -260,6 +258,8 @@ groups: this.migrationContext.Log.Infof("QueryEvent: %+v", binlogEvent) this.migrationContext.Log.Infof("Query: %s", binlogEvent.Query) + close(group.Changes) + // wait for the next event group continue groups } @@ -275,33 +275,6 @@ groups: // Next event should be a table map event - ev, err = this.binlogStreamer.GetEvent(ctx) - if err != nil { - close(group.Changes) - return err - } - this.migrationContext.Log.Infof("2 - Event: %s", ev.Header.EventType) - - switch binlogEvent := ev.Event.(type) { - case *replication.TableMapEvent: - // TODO: Can we be smart here and short circuit processing groups for tables that don't match the table in the migration context? - - this.migrationContext.Log.Infof("sending transaction: %d %d", group.SequenceNumber, group.LastCommitted) - - group.TableName = string(binlogEvent.Table) - group.DatabaseName = string(binlogEvent.Schema) - // we are good to send the transaction, the transaction events arrive async - transactionsChannel <- group - default: - this.migrationContext.Log.Infof("unexpected Event: %+v", ev.Event) - - close(group.Changes) - - // TODO: handle the group - we want to make sure we process the group's LastCommitted and SequenceNumber - - continue groups - } - events: // Now we can start processing the group for { @@ -313,6 +286,8 @@ groups: this.migrationContext.Log.Infof("3 - Event: %s", ev.Header.EventType) switch binlogEvent := ev.Event.(type) { + case *replication.TableMapEvent: + this.migrationContext.Log.Infof("TableMapEvent for %s.%s: %+v", binlogEvent.Schema, binlogEvent.Table, binlogEvent) case *replication.RowsEvent: binlogEntry, err := rowsEventToBinlogEntry(ev.Header.EventType, binlogEvent, this.currentCoordinates) if err != nil { diff --git a/go/binlog/gomysql_reader_test.go b/go/binlog/gomysql_reader_test.go new file mode 100644 index 000000000..a73eceef5 --- /dev/null +++ b/go/binlog/gomysql_reader_test.go @@ -0,0 +1,243 @@ +package binlog + +import ( + "database/sql" + "testing" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/mysql" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" +) + +func getCurrentBinlogCoordinates(t *testing.T, db *sql.DB) mysql.BinlogCoordinates { + var file string + var position int64 + var binlogDoDb string + var binlogIgnoreDb string + var executedGtidSet string + + err := db.QueryRow("SHOW MASTER STATUS").Scan(&file, &position, &binlogDoDb, &binlogIgnoreDb, &executedGtidSet) + require.NoError(t, err) + + return mysql.BinlogCoordinates{LogFile: file, LogPos: position} +} + +func getMigrationContext() *base.MigrationContext { + migrationContext := base.NewMigrationContext() + migrationContext.InspectorConnectionConfig = &mysql.ConnectionConfig{ + Key: mysql.InstanceKey{ + Hostname: "localhost", + Port: 3306, + }, + User: "root", + Password: "", + } + migrationContext.SetConnectionConfig("innodb") + migrationContext.ReplicaServerId = 99999 + return migrationContext +} + +func prepareDatabase(t *testing.T, db *sql.DB) { + _, err := db.Exec("DROP DATABASE testing") + require.NoError(t, err) + + _, err = db.Exec("CREATE DATABASE testing") + require.NoError(t, err) + + _, err = db.Exec("CREATE TABLE testing.gh_ost_test (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB") + require.NoError(t, err) + + _, err = db.Exec("CREATE TABLE testing.gh_ost_test2 (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB") + require.NoError(t, err) +} + +func TestStreamTransactionSingleAutoCommitChange(t *testing.T) { + db, err := sql.Open("mysql", "root:@/") + require.NoError(t, err) + defer db.Close() + + prepareDatabase(t, db) + + binlogCoordinates := getCurrentBinlogCoordinates(t, db) + + migrationContext := getMigrationContext() + migrationContext.DatabaseName = "testing" + migrationContext.OriginalTableName = "gh_ost_test" + migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" + + transactionsChan := make(chan *Transaction) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + reader := NewGoMySQLReader(migrationContext) + reader.ConnectBinlogStreamer(binlogCoordinates) + + err := reader.StreamTransactions(ctx, transactionsChan) + require.Equal(t, err, context.Canceled) + }() + + _, err = db.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')") + require.NoError(t, err) + + tx := <-transactionsChan + + changes := make([]*BinlogEntry, 0) + for change := range tx.Changes { + changes = append(changes, change) + } + require.Len(t, changes, 1) + + cancel() + close(transactionsChan) + require.Len(t, transactionsChan, 0) +} + +func TestStreamTransactionSingleChangeInTransaction(t *testing.T) { + db, err := sql.Open("mysql", "root:@/") + require.NoError(t, err) + defer db.Close() + + prepareDatabase(t, db) + + binlogCoordinates := getCurrentBinlogCoordinates(t, db) + + migrationContext := getMigrationContext() + migrationContext.DatabaseName = "testing" + migrationContext.OriginalTableName = "gh_ost_test" + migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" + + transactionsChan := make(chan *Transaction) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + reader := NewGoMySQLReader(migrationContext) + reader.ConnectBinlogStreamer(binlogCoordinates) + + err := reader.StreamTransactions(ctx, transactionsChan) + require.Equal(t, err, context.Canceled) + }() + + sqlTx, err := db.Begin() + require.NoError(t, err) + + _, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')") + require.NoError(t, err) + + err = sqlTx.Commit() + require.NoError(t, err) + + tx := <-transactionsChan + + changes := make([]*BinlogEntry, 0) + for change := range tx.Changes { + changes = append(changes, change) + } + require.Len(t, changes, 1) + + cancel() + close(transactionsChan) + require.Len(t, transactionsChan, 0) +} + +func TestStreamTransactionMultipleChangesInTransaction(t *testing.T) { + db, err := sql.Open("mysql", "root:@/") + require.NoError(t, err) + defer db.Close() + + prepareDatabase(t, db) + + binlogCoordinates := getCurrentBinlogCoordinates(t, db) + + migrationContext := getMigrationContext() + migrationContext.DatabaseName = "testing" + migrationContext.OriginalTableName = "gh_ost_test" + migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" + + transactionsChan := make(chan *Transaction) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + reader := NewGoMySQLReader(migrationContext) + reader.ConnectBinlogStreamer(binlogCoordinates) + + err := reader.StreamTransactions(ctx, transactionsChan) + require.Equal(t, err, context.Canceled) + }() + + sqlTx, err := db.Begin() + require.NoError(t, err) + + _, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test1')") + require.NoError(t, err) + + _, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test2')") + require.NoError(t, err) + + _, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test3')") + require.NoError(t, err) + + err = sqlTx.Commit() + require.NoError(t, err) + + tx := <-transactionsChan + require.NotNil(t, tx) + + changes := make([]*BinlogEntry, 0) + for change := range tx.Changes { + changes = append(changes, change) + } + require.Len(t, changes, 3) + + cancel() + close(transactionsChan) + require.Len(t, transactionsChan, 0) +} + +func TestStreamTransactionWithDDL(t *testing.T) { + db, err := sql.Open("mysql", "root:@/") + require.NoError(t, err) + defer db.Close() + + prepareDatabase(t, db) + + binlogCoordinates := getCurrentBinlogCoordinates(t, db) + + migrationContext := getMigrationContext() + migrationContext.DatabaseName = "testing" + migrationContext.OriginalTableName = "gh_ost_test" + migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" + + transactionsChan := make(chan *Transaction) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + reader := NewGoMySQLReader(migrationContext) + reader.ConnectBinlogStreamer(binlogCoordinates) + + err := reader.StreamTransactions(ctx, transactionsChan) + require.Equal(t, err, context.Canceled) + }() + + _, err = db.Exec("ALTER TABLE testing.gh_ost_test ADD COLUMN age INT") + require.NoError(t, err) + + tx := <-transactionsChan + + changes := make([]*BinlogEntry, 0) + for change := range tx.Changes { + changes = append(changes, change) + } + require.Len(t, changes, 0) + + cancel() + close(transactionsChan) + require.Len(t, transactionsChan, 0) +}