Skip to content

Commit

Permalink
DM/mariadb: sync the gtid executed in slave during master-slave repli…
Browse files Browse the repository at this point in the history
…cation (#10753)

close #10741
  • Loading branch information
okJiang authored Mar 15, 2024
1 parent a589797 commit 720920a
Show file tree
Hide file tree
Showing 18 changed files with 462 additions and 45 deletions.
65 changes: 65 additions & 0 deletions .github/workflows/dm_mariadb_master_down_and_up.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
name: Mariadb Master Down and Up

on:
push:
branches:
- test-*
schedule:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8
workflow_dispatch:

jobs:
mariadb-master-down-and-up:
name: mariadb-master-down-and-up
runs-on: ubuntu-20.04

steps:
- name: Set up Go env
uses: actions/setup-go@v3
with:
go-version: '1.21'

- name: Check out code
uses: actions/checkout@v2

- name: Cache go modules
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-ticdc-${{ hashFiles('go.sum') }}

- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
with:
path: tools/bin
key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}

- name: Build DM binary
run: make dm_integration_test_build

- name: Setup containers
run: |
docker-compose -f ./dm/tests/mariadb_master_down_and_up/docker-compose.yml up -d
- name: Run test cases
run: |
bash ./dm/tests/mariadb_master_down_and_up/case.sh
- name: Copy logs to hack permission
if: ${{ always() }}
run: |
mkdir ./logs
sudo cp -r -L /tmp/dm_test/mariadb_master_down_and_up/master/log ./logs/master
sudo cp -r -L /tmp/dm_test/mariadb_master_down_and_up/worker1/log ./logs/worker1
sudo chown -R runner ./logs
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
- name: Upload logs
continue-on-error: true
uses: actions/upload-artifact@v2
if: ${{ always() }}
with:
name: upstream-switch-logs
path: |
./logs
18 changes: 13 additions & 5 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{mariaGTID.DomainID: mariaGTID}
gtidSet.Sets = map[uint32]map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: {
mariaGTID.ServerID: mariaGTID,
},
}
clone = gtidSet
default:
err = terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor)
Expand Down Expand Up @@ -203,11 +207,15 @@ func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet)
}
if len(mariaGTIDs.Sets) != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(mariaGTIDs.Sets), gSet)
}
gtidCount := 0
var mariaGTID *gmysql.MariadbGTID
for _, mariaGTID = range mariaGTIDs.Sets {
for _, set := range mariaGTIDs.Sets {
gtidCount += len(set)
for _, mariaGTID = range set {
}
}
if gtidCount != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(gtidCount, gSet)
}
return mariaGTID, nil
default:
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestGenCommonFileHeader(t *testing.T) {

// MariaDB
flavor = gmysql.MariaDBFlavor
gSetStr = "1-2-12,2-2-3,3-3-8,4-4-4"
gSetStr = "1-2-12,2-2-3,3-3-8,3-4-9"

gSet, err = gtid.ParserGTID(flavor, gSetStr)
require.Nil(t, err)
Expand Down
37 changes: 21 additions & 16 deletions dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,27 +730,32 @@ func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32,
payload := new(bytes.Buffer)

// Number of GTIDs, 4 bytes
numOfGTIDs := uint32(len(mariaDBGSet.Sets))
numOfGTIDs := uint32(0)
for _, set := range mariaDBGSet.Sets {
numOfGTIDs += uint32(len(set))
}
err := binary.Write(payload, binary.LittleEndian, numOfGTIDs)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Number of GTIDs %d", numOfGTIDs)
}

for _, mGTID := range mariaDBGSet.Sets {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
for _, set := range mariaDBGSet.Sets {
for _, mGTID := range set {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ func TestGenRowsEvent(t *testing.T) {
require.Equal(t, tableID, rowsEvBody.TableID)
require.Equal(t, uint64(len(rows[0])), rowsEvBody.ColumnCount)
require.Equal(t, 0, rowsEvBody.Version) // WRITE_ROWS_EVENTv0
require.Nil(t, rowsEvBody.ExtraData)
require.Equal(t, rows, rowsEvBody.Rows)

// multi rows, with different length, invalid
Expand Down Expand Up @@ -664,10 +663,10 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.True(t, ok)
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 1)
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID], gtidListEvBody.GTIDs[0])
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID][gtidListEvBody.GTIDs[0].ServerID], gtidListEvBody.GTIDs[0])

// valid gSet with multi GTIDs
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4")
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,3-4-4")
require.Nil(t, err)
require.NotNil(t, gSet)
mGSet, ok = gSet.(*gmysql.MariadbGTIDSet)
Expand All @@ -684,7 +683,9 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 4)
for _, mGTID := range gtidListEvBody.GTIDs {
mGTID2, ok := mGSet.Sets[mGTID.DomainID]
set, ok := mGSet.Sets[mGTID.DomainID]
require.True(t, ok)
mGTID2, ok := set[mGTID.ServerID]
require.True(t, ok)
require.Equal(t, *mGTID2, mGTID)
}
Expand Down
6 changes: 5 additions & 1 deletion dm/pkg/binlog/event/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs)
}
prevGTID, ok := prevGSet.Sets[mariaGTID.DomainID]
set, ok := prevGSet.Sets[mariaGTID.DomainID]
if !ok {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
prevGTID, ok := set[mariaGTID.ServerID]
if !ok || prevGTID.ServerID != mariaGTID.ServerID || prevGTID.SequenceNumber != mariaGTID.SequenceNumber {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
Expand Down
16 changes: 1 addition & 15 deletions dm/relay/local_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,7 @@ func (r *BinlogReader) parseFile(
Pos: uint32(ev.Position),
}
r.tctx.L().Info("rotate binlog", zap.Stringer("position", currentPos))
case *replication.GTIDEvent:
if r.prevGset == nil {
state.latestPos = int64(e.Header.LogPos)
break
}
gtidStr, err2 := event.GetGTIDStr(e)
if err2 != nil {
return errors.Trace(err2)
}
state.skipGTID, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
state.latestPos = int64(e.Header.LogPos)
case *replication.MariadbGTIDEvent:
case *replication.GTIDEvent, *replication.MariadbGTIDEvent:
if r.prevGset == nil {
state.latestPos = int64(e.Header.LogPos)
break
Expand Down
150 changes: 150 additions & 0 deletions dm/tests/mariadb_master_down_and_up/case.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/bin/bash

set -exu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
PATH=$CUR/../_utils:$PATH # for sync_diff_inspector

source $CUR/lib.sh

function clean_data() {
echo "-------clean_data--------"

exec_sql $slave_port "stop slave;"
exec_sql $slave_port "reset master;"

exec_sql $master_port "drop database if exists db1;"
exec_sql $master_port "drop database if exists db2;"
exec_sql $master_port "drop database if exists ${db};"
exec_sql $slave_port "drop database if exists db1;"
exec_sql $slave_port "drop database if exists db2;"
exec_sql $slave_port "drop database if exists ${db};"
exec_sql $slave_port "reset master;"
exec_tidb $tidb_port "drop database if exists db1;"
exec_tidb $tidb_port "drop database if exists db2;"
exec_tidb $tidb_port "drop database if exists ${db};"
rm -rf /tmp/dm_test
}

function cleanup_process() {
echo "-------cleanup_process--------"
pkill -hup dm-worker.test 2>/dev/null || true
pkill -hup dm-master.test 2>/dev/null || true
pkill -hup dm-syncer.test 2>/dev/null || true
}

function setup_replica() {
echo "-------setup_replica--------"

master_status=($(get_master_status))
master_gtid=$(exec_sql $master_port "select binlog_gtid_pos('${master_status[0]}', ${master_status[1]})" | awk 'NR==2')
exec_sql $slave_port "set global gtid_slave_pos = '$master_gtid';"

# master --> slave
change_master_to_gtid $slave_port $master_port
}

function run_dm_components_and_create_source() {
echo "-------run_dm_components--------"

pkill -9 dm-master || true
pkill -9 dm-worker || true

run_dm_master $WORK_DIR/master $MASTER_PORT $CUR/conf/dm-master.toml
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"alive" 1

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $CUR/conf/dm-worker1.toml
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"free" 1
if [ "$1" = "relay" ]; then
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source create $CUR/conf/source1_relay.yaml" \
"\"result\": true" 2
else
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source create $CUR/conf/source1.yaml" \
"\"result\": true" 2
fi

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"alive" 1 \
"bound" 1
}

function gen_full_data() {
echo "-------gen_full_data--------"

exec_sql $master_port "create database ${db} collate latin1_bin;"
exec_sql $master_port "create table ${db}.${tb}(id int primary key, a int);"
for i in $(seq 1 100); do
exec_sql $master_port "insert into ${db}.${tb} values($i,$i);"
done
}

function start_task() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $CUR/conf/task-pessimistic.yaml --remove-meta" \
"\"result\": true" 2
}

function verify_result() {
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}

function clean_task() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task task_pessimistic" \
"\"result\": true" 2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source stop mysql-replica-01" \
"\"result\": true" 2
}

function test_master_down_and_up() {
cleanup_process
clean_data
install_sync_diff
setup_replica
gen_full_data
run_dm_components_and_create_source $1
start_task
verify_result
echo "-------start test--------"

for i in $(seq 201 250); do
exec_sql $master_port "insert into ${db}.${tb} values($i,$i);"
done
verify_result

# make master down
docker-compose -f $CUR/docker-compose.yml pause mariadb_master
# execute sqls in slave
for i in $(seq 401 450); do
exec_sql $slave_port "insert into ${db}.${tb} values($i,$i);"
done
verify_result

# make master up
docker-compose -f $CUR/docker-compose.yml unpause mariadb_master
for i in $(seq 501 550); do
exec_sql $master_port "insert into ${db}.${tb} values($i,$i);"
done

verify_result

clean_task
echo "CASE=test_master_down_and_up $1 success"
}

function run() {
wait_mysql 3306 1
wait_mysql 3307 2
test_master_down_and_up no_relay
test_master_down_and_up relay
}

run
Loading

0 comments on commit 720920a

Please sign in to comment.