Skip to content

Commit

Permalink
Merge pull request #8102 from dolthub/fulghum/bugfix
Browse files Browse the repository at this point in the history
Bug fix: binlog heartbeat `nextLogPosition` field
  • Loading branch information
fulghum authored Jul 3, 2024
2 parents 79b64da + 3393dc2 commit a0f5626
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newBinlogStreamer() *binlogStreamer {
// is received over the stream (e.g. the connection closing) or the streamer is closed,
// through it's quit channel.
func (streamer *binlogStreamer) startStream(ctx *sql.Context, conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error {
if err := sendInitialEvents(ctx, conn, binlogFormat, binlogEventMeta); err != nil {
if err := sendInitialEvents(ctx, conn, binlogFormat, &binlogEventMeta); err != nil {
return err
}

Expand Down Expand Up @@ -185,7 +185,7 @@ func sendHeartbeat(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEve

// sendInitialEvents sends the initial binlog events (i.e. Rotate, FormatDescription) over a newly established binlog
// streaming connection.
func sendInitialEvents(_ *sql.Context, conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error {
func sendInitialEvents(_ *sql.Context, conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata) error {
err := sendRotateEvent(conn, binlogFormat, binlogEventMeta)
if err != nil {
return err
Expand All @@ -199,16 +199,20 @@ func sendInitialEvents(_ *sql.Context, conn *mysql.Conn, binlogFormat *mysql.Bin
return conn.FlushBuffer()
}

func sendRotateEvent(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error {
func sendRotateEvent(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata) error {
binlogFilePosition := uint64(0)
binlogEventMeta.NextLogPosition = uint32(binlogFilePosition)

binlogEvent := mysql.NewRotateEvent(*binlogFormat, binlogEventMeta, binlogFilePosition, binlogFilename)
// The Rotate event sent at the start of a stream is a "virtual" event that isn't actually
// recorded to the binary log file, but sent to the replica so it knows what file is being
// read from. Because it is virtual, we do NOT update the nextLogPosition field of
// BinlogEventMetadata.
binlogEvent := mysql.NewRotateEvent(*binlogFormat, *binlogEventMeta, binlogFilePosition, binlogFilename)
return conn.WriteBinlogEvent(binlogEvent, false)
}

func sendFormatDescription(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error {
binlogEvent := mysql.NewFormatDescriptionEvent(*binlogFormat, binlogEventMeta)
func sendFormatDescription(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata) error {
binlogEvent := mysql.NewFormatDescriptionEvent(*binlogFormat, *binlogEventMeta)
binlogEventMeta.NextLogPosition += binlogEvent.Length()
return conn.WriteBinlogEvent(binlogEvent, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,14 @@ func TestBinlogPrimary_Heartbeats(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars)
setupForDoltToMySqlReplication()
startReplication(t, doltPort)

primaryDatabase.MustExec("create table db01.heartbeatTest(pk int);")
// Start replication, with a 45s delay before any commands are sent to the primary.
// This gives enough time for the first heartbeat event to be sent, before any user
// initiated binlog events, so we can test that scenario.
startReplicationWithDelay(t, doltPort, 45*time.Second)

// Insert a row every second, for 70s, which gives the server a chance to send two heartbeats
primaryDatabase.MustExec("create table db01.heartbeatTest(pk int);")
endTime := time.Now().Add(70 * time.Second)
maxInsertValue := 0
for time.Now().Before(endTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,15 +681,24 @@ func stopDoltSqlServer(t *testing.T) {
}
}

// startReplication starts up replication on the replica, connecting to |port| on the primary,
// creates the test database, db01, on the primary, and ensures it gets replicated to the replica.
func startReplication(t *testing.T, port int) {
startReplicationWithDelay(t, port, 100*time.Millisecond)
}

// startReplication starts up replication on the replica, connecting to |port| on the primary,
// pauses for |delay| before creating the test database, db01, on the primary, and ensures it
// gets replicated to the replica.
func startReplicationWithDelay(t *testing.T, port int, delay time.Duration) {
replicaDatabase.MustExec("SET @@GLOBAL.server_id=123;")
replicaDatabase.MustExec(
fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+
"SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+
"SOURCE_PORT=%v, SOURCE_AUTO_POSITION=1, SOURCE_CONNECT_RETRY=5;", port))

replicaDatabase.MustExec("start replica;")
time.Sleep(100 * time.Millisecond)
time.Sleep(delay)

// Look to see if the test database, db01, has been created yet. If not, create it and wait for it to
// replicate to the replica. Note that when re-starting replication in certain tests, we can't rely on
Expand Down

0 comments on commit a0f5626

Please sign in to comment.