Skip to content

Commit

Permalink
Merge pull request #176 from Shopify/target-verification
Browse files Browse the repository at this point in the history
Add Target verification against data corruption
  • Loading branch information
fjordan authored Apr 29, 2020
2 parents ee5d539 + 910c87e commit 50a7760
Show file tree
Hide file tree
Showing 25 changed files with 488 additions and 69 deletions.
Binary file added .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ source "https://rubygems.org"
gem "minitest"
gem "minitest-hooks"
gem "mysql2"
gem "byebug"
gem "pry-byebug"

gem "rake"
12 changes: 10 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
GEM
remote: https://rubygems.org/
specs:
byebug (11.1.1)
byebug (11.1.3)
coderay (1.1.2)
method_source (1.0.0)
minitest (5.11.3)
minitest-hooks (1.5.0)
minitest (> 5.3)
mysql2 (0.5.2)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-byebug (3.9.0)
byebug (~> 11.0)
pry (~> 0.13.0)
rake (12.3.2)

PLATFORMS
ruby

DEPENDENCIES
byebug
minitest
minitest-hooks
mysql2
pry-byebug
rake

BUNDLED WITH
Expand Down
2 changes: 2 additions & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio
s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
"position": s.lastStreamedBinlogPosition.Pos,
"host": s.DBConfig.Host,
"port": s.DBConfig.Port,
}).Info("starting binlog streaming")

s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition)
Expand Down
2 changes: 1 addition & 1 deletion binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
}

if b.StateTracker != nil {
b.StateTracker.UpdateLastResumableBinlogPosition(events[len(events)-1].ResumableBinlogPosition())
b.StateTracker.UpdateLastResumableSourceBinlogPosition(events[len(events)-1].ResumableBinlogPosition())
}

return nil
Expand Down
19 changes: 19 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type DatabaseConfig struct {
// SQL annotations used to differentiate Ghostferry's DMLs
// against other actor's. This will default to the defaultMarginalia
// constant above if not set.
//
// This is used to ensure any changes to the Target during the move process
// are performed only by Ghostferry (until cutover). Otherwise, the modification
// will be identified as data corruption and fail the move.
Marginalia string
}

Expand Down Expand Up @@ -490,6 +494,21 @@ type Config struct {
// 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric or is a composite key without a FallbackColumn specified.
// 3. Use the FallbackColumn pagination column, if configured. Fail if we cannot find this column in the table.
CascadingPaginationColumnConfig *CascadingPaginationColumnConfig

// SkipTargetVerification is used to enable or disable target verification during moves.
// This feature is currently only available while using the InlineVerifier.
//
// This does so by inspecting the annotations (configured as Marginalia in the DatabaseConfig above)
// and will fail the move unless all applicable DMLs (as identified by the sharding key) sent to the
// Target were sent from Ghostferry.
//
// NOTE:
// The Target database must be configured with binlog_rows_query_log_events
// set to "ON" for this to function properly. Ghostferry not allow the move
// process to begin if this is enabled and the above option is set to "OFF".
//
// Required: defaults to false
SkipTargetVerification bool
}

func (c *Config) ValidateConfig() error {
Expand Down
2 changes: 2 additions & 0 deletions copydb/copydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func (this *CopydbFerry) Run() {
// should be identical.
copyWG.Wait()

this.Ferry.StopTargetVerifier()

// This is where you cutover from using the source database to
// using the target database.
logrus.Info("ghostferry main operations has terminated but the control server remains online")
Expand Down
24 changes: 11 additions & 13 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/siddontang/go-mysql/schema"
)

var annotationRegex = regexp.MustCompile(`/\*(.*?)\*/`)
var annotationRegex = regexp.MustCompile(`^/\*(.*?)\*/`)

type RowData []interface{}

Expand Down Expand Up @@ -49,7 +49,7 @@ type DMLEvent interface {
PaginationKey() (uint64, error)
BinlogPosition() mysql.Position
ResumableBinlogPosition() mysql.Position
Annotations() ([]string, error)
Annotation() (string, error)
}

// The base of DMLEvent to provide the necessary methods.
Expand Down Expand Up @@ -80,21 +80,19 @@ func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position {
return e.resumablePos
}

// Annotations will return all comments prefixed to the SQL string
func (e *DMLEventBase) Annotations() ([]string, error) {
// Annotation will return the first prefixed comment on the SQL string,
// or an error if the query attribute of the DMLEvent is not set
func (e *DMLEventBase) Annotation() (string, error) {
if e.query == nil {
return nil, errors.New("could not get query from DML event")
return "", errors.New("could not get query from DML event")
}

captured := annotationRegex.FindAllStringSubmatch(string(e.query), -1)

var matches []string
for _, match := range captured {
if len(match) > 1 {
matches = append(matches, match[1])
}
captured := annotationRegex.FindStringSubmatch(string(e.query))
if len(captured) > 1 {
return captured[1], nil
}
return matches, nil

return "", nil
}

func NewDMLEventBase(table *TableSchema, pos, resumablePos mysql.Position, query []byte) *DMLEventBase {
Expand Down
82 changes: 69 additions & 13 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

var (
zeroPosition siddontangmysql.Position
VersionString string = "?.?.?+??????????????+???????"
WebUiBasedir string = ""
)
Expand Down Expand Up @@ -60,6 +61,9 @@ type Ferry struct {
BinlogStreamer *BinlogStreamer
BinlogWriter *BinlogWriter

targetVerifierWg *sync.WaitGroup
TargetVerifier *TargetVerifier

DataIterator *DataIterator
BatchWriter *BatchWriter

Expand Down Expand Up @@ -121,12 +125,12 @@ func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator {
return dataIterator
}

func (f *Ferry) NewBinlogStreamer() *BinlogStreamer {
func (f *Ferry) NewBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig) *BinlogStreamer {
f.ensureInitialized()

return &BinlogStreamer{
DB: f.SourceDB,
DBConfig: f.Source,
DB: db,
DBConfig: dbConf,
MyServerId: f.Config.MyServerId,
ErrorHandler: f.ErrorHandler,
Filter: f.CopyFilter,
Expand Down Expand Up @@ -362,7 +366,6 @@ func (f *Ferry) Initialize() (err error) {
return err
}

var zeroPosition siddontangmysql.Position
// Ensures the query to check for position is executable.
_, err = f.WaitUntilReplicaIsCaughtUpToMaster.IsCaughtUp(zeroPosition, 1)
if err != nil {
Expand Down Expand Up @@ -435,7 +438,17 @@ func (f *Ferry) Initialize() (err error) {

// The iterative verifier needs the binlog streamer so this has to be first.
// Eventually this can be moved below the verifier initialization.
f.BinlogStreamer = f.NewBinlogStreamer()
f.BinlogStreamer = f.NewBinlogStreamer(f.SourceDB, f.Config.Source)

if !f.Config.SkipTargetVerification {
targetVerifier, err := NewTargetVerifier(f.TargetDB, f.StateTracker, f.NewBinlogStreamer(f.TargetDB, f.Config.Target))
if err != nil {
return err
}

f.TargetVerifier = targetVerifier
}

f.BinlogWriter = f.NewBinlogWriter()
f.DataIterator = f.NewDataIterator()
f.BatchWriter = f.NewBatchWriter()
Expand Down Expand Up @@ -495,12 +508,25 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
var pos siddontangmysql.Position
var sourcePos siddontangmysql.Position
var targetPos siddontangmysql.Position

var err error
if f.StateToResumeFrom != nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition())
sourcePos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinSourceBinlogPosition())
} else {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
sourcePos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
}
if err != nil {
return err
}

if !f.Config.SkipTargetVerification {
if f.StateToResumeFrom != nil && f.StateToResumeFrom.LastStoredBinlogPositionForTargetVerifier != zeroPosition {
targetPos, err = f.TargetVerifier.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.LastStoredBinlogPositionForTargetVerifier)
} else {
targetPos, err = f.TargetVerifier.BinlogStreamer.ConnectBinlogStreamerToMysql()
}
}
if err != nil {
return err
Expand All @@ -509,9 +535,13 @@ func (f *Ferry) Start() error {
// If we don't set this now, there is a race condition where Ghostferry
// is terminated with some rows copied but no binlog events are written.
// This guarentees that we are able to restart from a valid location.
f.StateTracker.UpdateLastResumableBinlogPosition(pos)
f.StateTracker.UpdateLastResumableSourceBinlogPosition(sourcePos)
if f.inlineVerifier != nil {
f.StateTracker.UpdateLastResumableBinlogPositionForInlineVerifier(pos)
f.StateTracker.UpdateLastResumableSourceBinlogPositionForInlineVerifier(sourcePos)
}

if !f.Config.SkipTargetVerification {
f.StateTracker.UpdateLastResumableBinlogPositionForTargetVerifier(targetPos)
}

return nil
Expand Down Expand Up @@ -591,23 +621,31 @@ func (f *Ferry) Run() {
}

binlogWg := &sync.WaitGroup{}
binlogWg.Add(2)

binlogWg.Add(1)
go func() {
defer binlogWg.Done()
f.BinlogWriter.Run()
}()

binlogWg.Add(1)
go func() {
defer binlogWg.Done()

f.BinlogStreamer.Run()
f.BinlogWriter.Stop()
}()

if !f.Config.SkipTargetVerification {
f.targetVerifierWg = &sync.WaitGroup{}
f.targetVerifierWg.Add(1)
go func() {
defer f.targetVerifierWg.Done()
f.TargetVerifier.BinlogStreamer.Run()
}()
}

dataIteratorWg := &sync.WaitGroup{}
dataIteratorWg.Add(1)

go func() {
defer dataIteratorWg.Done()
f.DataIterator.Run(f.Tables.AsSlice())
Expand Down Expand Up @@ -732,6 +770,13 @@ func (f *Ferry) FlushBinlogAndStopStreaming() {
f.BinlogStreamer.FlushAndStop()
}

func (f *Ferry) StopTargetVerifier() {
if !f.Config.SkipTargetVerification {
f.TargetVerifier.BinlogStreamer.FlushAndStop()
f.targetVerifierWg.Wait()
}
}

func (f *Ferry) SerializeStateToJSON() (string, error) {
if f.StateTracker == nil {
err := errors.New("no valid StateTracker")
Expand Down Expand Up @@ -892,5 +937,16 @@ func (f *Ferry) checkConnectionForBinlogFormat(db *sql.DB) error {
}
}

if !f.Config.SkipTargetVerification {
row = db.QueryRow("SHOW VARIABLES LIKE 'binlog_rows_query_log_events'")
err = row.Scan(&name, &value)
if err != nil {
return err
}
if strings.ToUpper(value) != "ON" {
return fmt.Errorf("binlog_rows_query_log_events must be ON, not %s", value)
}
}

return nil
}
4 changes: 2 additions & 2 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
v.verifyDuringCutoverStarted.Set(true)

mismatchFound, mismatches, err := v.verifyAllEventsInStore()
if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
Expand Down Expand Up @@ -569,8 +570,7 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error {
}

if v.StateTracker != nil {
ev := evs[len(evs)-1]
v.StateTracker.UpdateLastResumableBinlogPositionForInlineVerifier(ev.ResumableBinlogPosition())
v.StateTracker.UpdateLastResumableSourceBinlogPositionForInlineVerifier(evs[len(evs)-1].ResumableBinlogPosition())
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func (r *ShardingFerry) Run() {

r.Ferry.Throttler.SetDisabled(false)

r.Ferry.StopTargetVerifier()

metrics.Measure("CutoverUnlock", nil, 1.0, func() {
err = r.config.CutoverUnlock.Post(&client)
})
Expand Down
Loading

0 comments on commit 50a7760

Please sign in to comment.