Skip to content

Commit

Permalink
Send off transaction events as soon as we see a GTID event.
Browse files Browse the repository at this point in the history
Multiple `TableMap` events can occur inside a single transaction, so a transaction cannot be attributed to a single table up front, and `TableMap` events cannot be used to filter out events early. Instead, we'll just skip them.

Also adds some basic test cases for the transaction streaming.
  • Loading branch information
arthurschreiber committed Oct 7, 2024
1 parent c6c877e commit b031166
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 32 deletions.
39 changes: 7 additions & 32 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ func rowsEventToBinlogEntry(eventType replication.EventType, rowsEvent *replicat
}

type Transaction struct {
DatabaseName string
TableName string
SequenceNumber int64
LastCommitted int64
Changes chan *BinlogEntry
Expand Down Expand Up @@ -231,13 +229,13 @@ groups:
SequenceNumber: binlogEvent.SequenceNumber,
LastCommitted: binlogEvent.LastCommitted,
Changes: make(chan *BinlogEntry, 1000),
// Table and Schema aren't known until the following TableMapEvent
DatabaseName: "",
TableName: "",
}

previousSequenceNumber = binlogEvent.SequenceNumber

// We are good to send the transaction, the transaction events arrive async
this.migrationContext.Log.Infof("sending transaction: %d %d", group.SequenceNumber, group.LastCommitted)
transactionsChannel <- group
default:
this.migrationContext.Log.Infof("Ignoring Event: %+v", ev.Event)
continue
Expand All @@ -260,6 +258,8 @@ groups:
this.migrationContext.Log.Infof("QueryEvent: %+v", binlogEvent)
this.migrationContext.Log.Infof("Query: %s", binlogEvent.Query)

close(group.Changes)

// wait for the next event group
continue groups
}
Expand All @@ -275,33 +275,6 @@ groups:

// Next event should be a table map event

ev, err = this.binlogStreamer.GetEvent(ctx)
if err != nil {
close(group.Changes)
return err
}
this.migrationContext.Log.Infof("2 - Event: %s", ev.Header.EventType)

switch binlogEvent := ev.Event.(type) {
case *replication.TableMapEvent:
// TODO: Can we be smart here and short circuit processing groups for tables that don't match the table in the migration context?

this.migrationContext.Log.Infof("sending transaction: %d %d", group.SequenceNumber, group.LastCommitted)

group.TableName = string(binlogEvent.Table)
group.DatabaseName = string(binlogEvent.Schema)
// we are good to send the transaction, the transaction events arrive async
transactionsChannel <- group
default:
this.migrationContext.Log.Infof("unexpected Event: %+v", ev.Event)

close(group.Changes)

// TODO: handle the group - we want to make sure we process the group's LastCommitted and SequenceNumber

continue groups
}

events:
// Now we can start processing the group
for {
Expand All @@ -313,6 +286,8 @@ groups:
this.migrationContext.Log.Infof("3 - Event: %s", ev.Header.EventType)

switch binlogEvent := ev.Event.(type) {
case *replication.TableMapEvent:
this.migrationContext.Log.Infof("TableMapEvent for %s.%s: %+v", binlogEvent.Schema, binlogEvent.Table, binlogEvent)
case *replication.RowsEvent:
binlogEntry, err := rowsEventToBinlogEntry(ev.Header.EventType, binlogEvent, this.currentCoordinates)
if err != nil {
Expand Down
243 changes: 243 additions & 0 deletions go/binlog/gomysql_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package binlog

import (
"database/sql"
"testing"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

// getCurrentBinlogCoordinates runs SHOW MASTER STATUS on db and returns the
// reported binlog file name and position as BinlogCoordinates. The test is
// failed immediately if the row cannot be scanned.
func getCurrentBinlogCoordinates(t *testing.T, db *sql.DB) mysql.BinlogCoordinates {
	var (
		logFile string
		logPos  int64
		// The remaining three columns are not used by the callers; they are
		// scanned only because Scan needs one destination per column.
		doDB, ignoreDB, gtidSet string
	)

	row := db.QueryRow("SHOW MASTER STATUS")
	scanErr := row.Scan(&logFile, &logPos, &doDB, &ignoreDB, &gtidSet)
	require.NoError(t, scanErr)

	return mysql.BinlogCoordinates{
		LogFile: logFile,
		LogPos:  logPos,
	}
}

// getMigrationContext builds a fresh MigrationContext for the tests in this
// file: the inspector connection targets root (empty password) at
// localhost:3306, SetConnectionConfig is invoked with "innodb", and the
// replica server id is set to 99999.
func getMigrationContext() *base.MigrationContext {
	inspectorConfig := &mysql.ConnectionConfig{
		Key:      mysql.InstanceKey{Hostname: "localhost", Port: 3306},
		User:     "root",
		Password: "",
	}

	mc := base.NewMigrationContext()
	mc.InspectorConnectionConfig = inspectorConfig
	// Must run after InspectorConnectionConfig is assigned, as in the
	// original statement order.
	mc.SetConnectionConfig("innodb")
	mc.ReplicaServerId = 99999

	return mc
}

// prepareDatabase resets the `testing` schema to a known state: it drops the
// schema if present, recreates it, and creates the two InnoDB tables
// gh_ost_test and gh_ost_test2 (auto-increment id, varchar name).
//
// Any failing statement aborts the calling test immediately.
func prepareDatabase(t *testing.T, db *sql.DB) {
	t.Helper()

	statements := []string{
		// IF EXISTS keeps the very first run on a fresh server from failing
		// because there is nothing to drop yet.
		"DROP DATABASE IF EXISTS testing",
		"CREATE DATABASE testing",
		"CREATE TABLE testing.gh_ost_test (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB",
		"CREATE TABLE testing.gh_ost_test2 (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB",
	}

	for _, stmt := range statements {
		_, err := db.Exec(stmt)
		require.NoError(t, err, "executing %q", stmt)
	}
}

// TestStreamTransactionSingleAutoCommitChange checks that one auto-committed
// INSERT is delivered as a transaction whose Changes channel yields exactly
// one entry before being closed.
//
// The test opens a connection with the DSN "root:@/", so it presumably needs
// a locally reachable MySQL server with binary logging enabled.
func TestStreamTransactionSingleAutoCommitChange(t *testing.T) {
	db, err := sql.Open("mysql", "root:@/")
	require.NoError(t, err)
	defer db.Close()

	prepareDatabase(t, db)

	// Taken after the schema setup so that the stream starts behind the
	// setup statements.
	binlogCoordinates := getCurrentBinlogCoordinates(t, db)

	migrationContext := getMigrationContext()
	migrationContext.DatabaseName = "testing"
	migrationContext.OriginalTableName = "gh_ost_test"
	migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

	transactionsChan := make(chan *Transaction)

	ctx, cancel := context.WithCancel(context.Background())

	// streamerDone is closed when the streaming goroutine has returned.
	streamerDone := make(chan struct{})

	// Stop the streamer and wait for it on every exit path. The channel is
	// drained rather than closed: this test is the receiver, and closing
	// while the streamer may still send would panic. Waiting also guarantees
	// the goroutine never reports on t after the test has finished.
	defer func() {
		cancel()
		for {
			select {
			case extra := <-transactionsChan:
				t.Errorf("unexpected additional transaction: %+v", extra)
			case <-streamerDone:
				return
			}
		}
	}()

	go func() {
		defer close(streamerDone)

		reader := NewGoMySQLReader(migrationContext)
		reader.ConnectBinlogStreamer(binlogCoordinates)

		// t.Errorf instead of require: FailNow may only be called from the
		// goroutine running the test.
		if err := reader.StreamTransactions(ctx, transactionsChan); err != context.Canceled {
			t.Errorf("StreamTransactions returned %v, want %v", err, context.Canceled)
		}
	}()

	_, err = db.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
	require.NoError(t, err)

	// Do not block forever if the streamer dies before delivering anything.
	var tx *Transaction
	select {
	case tx = <-transactionsChan:
	case <-streamerDone:
		t.Fatal("binlog streamer exited before delivering a transaction")
	}
	require.NotNil(t, tx)

	var changes []*BinlogEntry
	for change := range tx.Changes {
		changes = append(changes, change)
	}
	require.Len(t, changes, 1)
}

// TestStreamTransactionSingleChangeInTransaction checks that one INSERT
// wrapped in an explicit BEGIN/COMMIT is delivered as a transaction whose
// Changes channel yields exactly one entry before being closed.
//
// The test opens a connection with the DSN "root:@/", so it presumably needs
// a locally reachable MySQL server with binary logging enabled.
func TestStreamTransactionSingleChangeInTransaction(t *testing.T) {
	db, err := sql.Open("mysql", "root:@/")
	require.NoError(t, err)
	defer db.Close()

	prepareDatabase(t, db)

	// Taken after the schema setup so that the stream starts behind the
	// setup statements.
	binlogCoordinates := getCurrentBinlogCoordinates(t, db)

	migrationContext := getMigrationContext()
	migrationContext.DatabaseName = "testing"
	migrationContext.OriginalTableName = "gh_ost_test"
	migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

	transactionsChan := make(chan *Transaction)

	ctx, cancel := context.WithCancel(context.Background())

	// streamerDone is closed when the streaming goroutine has returned.
	streamerDone := make(chan struct{})

	// Stop the streamer and wait for it on every exit path. The channel is
	// drained rather than closed: this test is the receiver, and closing
	// while the streamer may still send would panic. Waiting also guarantees
	// the goroutine never reports on t after the test has finished.
	defer func() {
		cancel()
		for {
			select {
			case extra := <-transactionsChan:
				t.Errorf("unexpected additional transaction: %+v", extra)
			case <-streamerDone:
				return
			}
		}
	}()

	go func() {
		defer close(streamerDone)

		reader := NewGoMySQLReader(migrationContext)
		reader.ConnectBinlogStreamer(binlogCoordinates)

		// t.Errorf instead of require: FailNow may only be called from the
		// goroutine running the test.
		if err := reader.StreamTransactions(ctx, transactionsChan); err != context.Canceled {
			t.Errorf("StreamTransactions returned %v, want %v", err, context.Canceled)
		}
	}()

	sqlTx, err := db.Begin()
	require.NoError(t, err)

	_, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
	require.NoError(t, err)

	err = sqlTx.Commit()
	require.NoError(t, err)

	// Do not block forever if the streamer dies before delivering anything.
	var tx *Transaction
	select {
	case tx = <-transactionsChan:
	case <-streamerDone:
		t.Fatal("binlog streamer exited before delivering a transaction")
	}
	require.NotNil(t, tx)

	var changes []*BinlogEntry
	for change := range tx.Changes {
		changes = append(changes, change)
	}
	require.Len(t, changes, 1)
}

// TestStreamTransactionMultipleChangesInTransaction checks that three INSERTs
// committed in one explicit transaction are delivered as a single transaction
// whose Changes channel yields exactly three entries before being closed.
//
// The test opens a connection with the DSN "root:@/", so it presumably needs
// a locally reachable MySQL server with binary logging enabled.
func TestStreamTransactionMultipleChangesInTransaction(t *testing.T) {
	db, err := sql.Open("mysql", "root:@/")
	require.NoError(t, err)
	defer db.Close()

	prepareDatabase(t, db)

	// Taken after the schema setup so that the stream starts behind the
	// setup statements.
	binlogCoordinates := getCurrentBinlogCoordinates(t, db)

	migrationContext := getMigrationContext()
	migrationContext.DatabaseName = "testing"
	migrationContext.OriginalTableName = "gh_ost_test"
	migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

	transactionsChan := make(chan *Transaction)

	ctx, cancel := context.WithCancel(context.Background())

	// streamerDone is closed when the streaming goroutine has returned.
	streamerDone := make(chan struct{})

	// Stop the streamer and wait for it on every exit path, including early
	// require failures. The channel is drained rather than closed: this test
	// is the receiver, and closing while the streamer may still send would
	// panic. Waiting also guarantees the goroutine never reports on t after
	// the test has finished.
	defer func() {
		cancel()
		for {
			select {
			case extra := <-transactionsChan:
				t.Errorf("unexpected additional transaction: %+v", extra)
			case <-streamerDone:
				return
			}
		}
	}()

	go func() {
		defer close(streamerDone)

		reader := NewGoMySQLReader(migrationContext)
		reader.ConnectBinlogStreamer(binlogCoordinates)

		// t.Errorf instead of require: FailNow may only be called from the
		// goroutine running the test.
		if err := reader.StreamTransactions(ctx, transactionsChan); err != context.Canceled {
			t.Errorf("StreamTransactions returned %v, want %v", err, context.Canceled)
		}
	}()

	sqlTx, err := db.Begin()
	require.NoError(t, err)

	for _, stmt := range []string{
		"INSERT INTO testing.gh_ost_test (name) VALUES ('test1')",
		"INSERT INTO testing.gh_ost_test (name) VALUES ('test2')",
		"INSERT INTO testing.gh_ost_test (name) VALUES ('test3')",
	} {
		_, err = sqlTx.Exec(stmt)
		require.NoError(t, err)
	}

	err = sqlTx.Commit()
	require.NoError(t, err)

	// Do not block forever if the streamer dies before delivering anything.
	var tx *Transaction
	select {
	case tx = <-transactionsChan:
	case <-streamerDone:
		t.Fatal("binlog streamer exited before delivering a transaction")
	}
	require.NotNil(t, tx)

	var changes []*BinlogEntry
	for change := range tx.Changes {
		changes = append(changes, change)
	}
	require.Len(t, changes, 3)
}

// TestStreamTransactionWithDDL checks that an ALTER TABLE statement is
// delivered as a transaction whose Changes channel is closed without
// yielding any entry.
//
// The test opens a connection with the DSN "root:@/", so it presumably needs
// a locally reachable MySQL server with binary logging enabled.
func TestStreamTransactionWithDDL(t *testing.T) {
	db, err := sql.Open("mysql", "root:@/")
	require.NoError(t, err)
	defer db.Close()

	prepareDatabase(t, db)

	// Taken after the schema setup so that the stream starts behind the
	// setup statements.
	binlogCoordinates := getCurrentBinlogCoordinates(t, db)

	migrationContext := getMigrationContext()
	migrationContext.DatabaseName = "testing"
	migrationContext.OriginalTableName = "gh_ost_test"
	migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

	transactionsChan := make(chan *Transaction)

	ctx, cancel := context.WithCancel(context.Background())

	// streamerDone is closed when the streaming goroutine has returned.
	streamerDone := make(chan struct{})

	// Stop the streamer and wait for it on every exit path. The channel is
	// drained rather than closed: this test is the receiver, and closing
	// while the streamer may still send would panic. Waiting also guarantees
	// the goroutine never reports on t after the test has finished.
	defer func() {
		cancel()
		for {
			select {
			case extra := <-transactionsChan:
				t.Errorf("unexpected additional transaction: %+v", extra)
			case <-streamerDone:
				return
			}
		}
	}()

	go func() {
		defer close(streamerDone)

		reader := NewGoMySQLReader(migrationContext)
		reader.ConnectBinlogStreamer(binlogCoordinates)

		// t.Errorf instead of require: FailNow may only be called from the
		// goroutine running the test.
		if err := reader.StreamTransactions(ctx, transactionsChan); err != context.Canceled {
			t.Errorf("StreamTransactions returned %v, want %v", err, context.Canceled)
		}
	}()

	_, err = db.Exec("ALTER TABLE testing.gh_ost_test ADD COLUMN age INT")
	require.NoError(t, err)

	// Do not block forever if the streamer dies before delivering anything.
	var tx *Transaction
	select {
	case tx = <-transactionsChan:
	case <-streamerDone:
		t.Fatal("binlog streamer exited before delivering a transaction")
	}
	require.NotNil(t, tx)

	var changes []*BinlogEntry
	for change := range tx.Changes {
		changes = append(changes, change)
	}
	require.Len(t, changes, 0)
}

0 comments on commit b031166

Please sign in to comment.