Skip to content

Commit

Permalink
parse gtid for parseHandler if MySQL works in GTID_MODE, and begin to…
Browse files Browse the repository at this point in the history
… startWithGTID after mysqldump is done (go-mysql-org#444)

* add gtid set of MySQL after mysqldump go-mysql-org#439
  • Loading branch information
jianhaiqing authored and siddontang committed Dec 7, 2019
1 parent 07ca784 commit 08dd4b5
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 3 deletions.
10 changes: 10 additions & 0 deletions canal/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
return nil
}

func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) {
if h.gset != nil {
err = h.gset.Update(gtidsets)
} else {
h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
}
return err
}

func (h *dumpParseHandler) Data(db string, table string, values []string) error {
if err := h.c.ctx.Err(); err != nil {
return err
Expand Down Expand Up @@ -167,6 +176,7 @@ func (c *Canal) dump() error {

pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
c.master.Update(pos)
c.master.UpdateGTIDSet(h.gset)
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
gset := c.master.GTIDSet()
if gset == nil {
if gset == nil || gset.String() == "" {
pos := c.master.Position()
s, err := c.syncer.StartSync(pos)
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions dump/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/mysql"
)

// use docker mysql for test
Expand Down Expand Up @@ -113,16 +114,88 @@ func (s *schemaTestSuite) TestDump(c *C) {
}

type testParseHandler struct {
gset mysql.GTIDSet
}

func (h *testParseHandler) BinLog(name string, pos uint64) error {
return nil
}

func (h *testParseHandler) GtidSet(gtidsets string) (err error) {
if h.gset != nil {
err = h.gset.Update(gtidsets)
} else {
h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
}
return err
}

func (h *testParseHandler) Data(schema string, table string, values []string) error {
return nil
}

type GtidParseTest struct {
gset mysql.GTIDSet
}

func (h *GtidParseTest) UpdateGtidSet(gtidStr string) (err error) {
if h.gset != nil {
err = h.gset.Update(gtidStr)
} else {
h.gset, err = mysql.ParseGTIDSet("mysql", gtidStr)
}
return err
}

func (s *parserTestSuite) TestParseGtidExp(c *C) {
// binlogExp := regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
// gtidExp := regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)")
tbls := []struct {
input string
expected string
}{
{`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76,
2337be48-0456-11e9-bd1c-00505690543b:1-7,
41d816cd-0455-11e9-be42-005056901a22:1-2,
5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,
75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,
780ad602-0456-11e9-8bcd-005056901a22:1-516653148,
92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,
c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,
cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,
cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,
cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,
d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,
e7574090-b123-11e8-8bb4-005056a29643:1-12'
`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7,41d816cd-0455-11e9-be42-005056901a22:1-2,5f1eea9e-b1e5-11e8-bc77-005056a221ed:1-144609156,75848cdb-8131-11e7-b6fc-1c1b0de85e7b:1-151378598,780ad602-0456-11e9-8bcd-005056901a22:1-516653148,92809ddd-1e3c-11e9-9d04-00505690f6ab:1-11858565,c59598c7-0467-11e9-bbbe-005056901a22:1-226464969,cbd7809d-0433-11e9-b1cf-00505690543b:1-18233950,cca778e9-8cdf-11e8-94d0-005056a247b1:1-303899574,cf80679b-7695-11e8-8873-1c1b0d9a4ab9:1-12836047,d0951f24-1e21-11e9-bb2e-00505690b730:1-4758092,e7574090-b123-11e8-8bb4-005056a29643:1-12"},
{`SET @@GLOBAL.GTID_PURGED='071a84e8-b253-11e8-8472-005056a27e86:1-76,
2337be48-0456-11e9-bd1c-00505690543b:1-7';
`, "071a84e8-b253-11e8-8472-005056a27e86:1-76,2337be48-0456-11e9-bd1c-00505690543b:1-7"},
{`SET @@GLOBAL.GTID_PURGED='c0977f88-3104-11e9-81e1-00505690245b:1-274559';
`, "c0977f88-3104-11e9-81e1-00505690245b:1-274559"},
{`CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.008995', MASTER_LOG_POS=102052485;`, ""},
}

for _, tt := range tbls {
reader := strings.NewReader(tt.input)
var handler = new(testParseHandler)

Parse(reader, handler, true)

if tt.expected == "" {
if handler.gset != nil {
c.Assert(handler.gset, IsNil)
} else {
continue
}
}
expectedGtidset, err := mysql.ParseGTIDSet("mysql", tt.expected)
c.Assert(err, IsNil)
c.Assert(expectedGtidset.Equal(handler.gset), IsTrue)
}

}

func (s *parserTestSuite) TestParseFindTable(c *C) {
tbl := []struct {
sql string
Expand Down
16 changes: 15 additions & 1 deletion dump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ var (
type ParseHandler interface {
// Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos;
BinLog(name string, pos uint64) error

GtidSet(gtidsets string) error
Data(schema string, table string, values []string) error
}

var binlogExp *regexp.Regexp
var useExp *regexp.Regexp
var valuesExp *regexp.Regexp
var gtidExp *regexp.Regexp

func init() {
binlogExp = regexp.MustCompile("^CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\\d+);")
useExp = regexp.MustCompile("^USE `(.+)`;")
valuesExp = regexp.MustCompile("^INSERT INTO `(.+?)` VALUES \\((.+)\\);$")
// The pattern will only match MySQL GTID, as you know SET GLOBAL gtid_slave_pos='0-1-4' is used for MariaDB.
//SET @@GLOBAL.GTID_PURGED='1638041a-0457-11e9-bb9f-00505690b730:1-429405150';
gtidExp = regexp.MustCompile("(\\w{8}(-\\w{4}){3}-\\w{12}:\\d+-\\d+)")
}

// Parse the dump data with Dumper generate.
Expand All @@ -55,6 +59,16 @@ func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
})

if parseBinlogPos && !binlogParsed {
// parsed gtid set from mysqldump
// gtid comes before binlog file-positon
if m := gtidExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
gtidStr := m[0][1]
if gtidStr != "" {
if err := h.GtidSet(gtidStr); err != nil {
return errors.Trace(err)
}
}
}
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
name := m[0][1]
pos, err := strconv.ParseUint(m[0][2], 10, 64)
Expand Down
6 changes: 5 additions & 1 deletion mysql/mysql_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"
"github.com/siddontang/go/hack"
)

Expand Down Expand Up @@ -398,6 +398,10 @@ func (s *MysqlGTIDSet) Equal(o GTIDSet) bool {
return false
}

if len(sub.Sets) != len(s.Sets) {
return false
}

for key, set := range sub.Sets {
o, ok := s.Sets[key]
if !ok {
Expand Down

0 comments on commit 08dd4b5

Please sign in to comment.