Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Joos <[email protected]>
  • Loading branch information
arthurschreiber and danieljoos committed Oct 8, 2024
1 parent b031166 commit f81a790
Show file tree
Hide file tree
Showing 10 changed files with 724 additions and 230 deletions.
8 changes: 5 additions & 3 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
return nil
}

// rowsEventToBinlogEntry processes MySQL RowsEvent into our BinlogEntry for later application.
// RowsEventToBinlogEntry processes MySQL RowsEvent into our BinlogEntry for later application.
// copied from handleRowEvents
func rowsEventToBinlogEntry(eventType replication.EventType, rowsEvent *replication.RowsEvent, binlogCoords mysql.BinlogCoordinates) (*BinlogEntry, error) {
func RowsEventToBinlogEntry(eventType replication.EventType, rowsEvent *replication.RowsEvent, binlogCoords mysql.BinlogCoordinates) (*BinlogEntry, error) {
dml := ToEventDML(eventType.String())
if dml == NotDML {
return nil, fmt.Errorf("Unknown DML type: %s", eventType.String())
Expand Down Expand Up @@ -289,12 +289,14 @@ groups:
case *replication.TableMapEvent:
this.migrationContext.Log.Infof("TableMapEvent for %s.%s: %+v", binlogEvent.Schema, binlogEvent.Table, binlogEvent)
case *replication.RowsEvent:
binlogEntry, err := rowsEventToBinlogEntry(ev.Header.EventType, binlogEvent, this.currentCoordinates)
binlogEntry, err := RowsEventToBinlogEntry(ev.Header.EventType, binlogEvent, this.currentCoordinates)
if err != nil {
close(group.Changes)
return err
}
this.migrationContext.Log.Infof("RowsEvent: %v", binlogEvent)
group.Changes <- binlogEntry
this.migrationContext.Log.Infof("Length of group.Changes: %d", len(group.Changes))
case *replication.XIDEvent:
this.migrationContext.Log.Infof("XIDEvent: %+v", binlogEvent)
this.migrationContext.Log.Infof("COMMIT for transaction")
Expand Down
2 changes: 1 addition & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (this *Applier) InitDBConnections() (err error) {
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)
this.singletonDB.SetMaxOpenConns(16)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
if err != nil {
return err
Expand Down
Loading

0 comments on commit f81a790

Please sign in to comment.