From ceaba06ca5d482775048e8c192e64d1cd936868d Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Sun, 3 May 2020 15:13:17 -0700 Subject: [PATCH] Introduce "Lock Strategy" option With this commit, we allow using an in-application lock in ghostferry, instead of using the source DB as lock. The lock is required to avoid race conditions between the data iteration/copy and the binlog writer. The default behavior is preserved; a new option "LockStrategy" allows moving the lock from the source DB into ghostferry, or disabling the lock altogether. This fixes #169 Change-Id: I20f1d2a189078a3877f831c7a98e8ca956620cc7 --- binlog_writer.go | 17 ++++++++++++ config.go | 25 ++++++++++++++++++ cursor.go | 24 +++++++++++++++-- data_iterator.go | 33 +++++++++++++++++++++++- examples/copydb/run-on-replica.conf.json | 1 + ferry.go | 2 ++ iterative_verifier.go | 2 +- state_tracker.go | 22 ++++++++++++++++ test/helpers/ghostferry_helper.rb | 4 +++ test/integration/trivial_test.rb | 9 +++++++ test/lib/go/integrationferry.go | 4 +++ 11 files changed, 139 insertions(+), 4 deletions(-) diff --git a/binlog_writer.go b/binlog_writer.go index 8743993e..1da4a64b 100644 --- a/binlog_writer.go +++ b/binlog_writer.go @@ -4,6 +4,7 @@ import ( "fmt" sql "github.com/Shopify/ghostferry/sqlwrapper" + "sync" "github.com/sirupsen/logrus" ) @@ -16,6 +17,7 @@ type BinlogWriter struct { BatchSize int WriteRetries int + LockStrategy string ErrorHandler ErrorHandler StateTracker *StateTracker @@ -79,6 +81,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error { WaitForThrottle(b.Throttler) queryBuffer := []byte(sql.AnnotateStmt("BEGIN;\n", b.DB.Marginalia)) + locksToObtain := make(map[string]*sync.RWMutex) for _, ev := range events { eventDatabaseName := ev.Database() @@ -98,6 +101,13 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error { queryBuffer = append(queryBuffer, sql.AnnotateStmt(sqlStmt, b.DB.Marginalia)...) queryBuffer = append(queryBuffer, ";\n"...) 
+ + if b.LockStrategy == LockStrategyInGhostferry { + fullTableName := ev.TableSchema().Table.String() + if _, found := locksToObtain[fullTableName]; !found { + locksToObtain[fullTableName] = b.StateTracker.GetTableLock(fullTableName) + } + } } queryBuffer = append(queryBuffer, "COMMIT"...) @@ -105,6 +115,13 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error { startEv := events[0] endEv := events[len(events)-1] query := string(queryBuffer) + + for _, lock := range locksToObtain { + if lock != nil { + lock.Lock() + defer lock.Unlock() + } + } _, err := b.DB.Exec(query) if err != nil { return fmt.Errorf("exec query at pos %v -> %v (%d bytes): %v", startEv.BinlogPosition(), endEv.BinlogPosition(), len(query), err) diff --git a/config.go b/config.go index 69aaf166..44252019 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,10 @@ const ( VerifierTypeNoVerification = "NoVerification" DefaultMarginalia = "application:ghostferry" + + LockStrategySourceDB = "LockOnSourceDB" + LockStrategyInGhostferry = "LockInGhostferry" + LockStrategyNone = "None" ) type TLSConfig struct { @@ -412,6 +416,21 @@ type Config struct { // Optional: defaults to false AutomaticCutover bool + // This specifies how to prevent races between the data copy and binlog + // streaming. Possible values are: + // - LockOnSourceDB: obtain a table lock on the source table while copying + // data, which will prevent any type of data modification on the source + // DB; this is the strictest method but may interfere with the + // application trying to insert data, + // - LockInGhostferry: obtain a lock in ghostferry, preventing updates to + // the target DB while copying data; this should be sufficient in most + // scenarios, and + // - None: do not perform locking, assume the application does not update + // or delete data in a way that races may occur.
+ // + // Optional: defaults to "LockOnSourceDB" + LockStrategy string + // This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM // by dumping the current state to stdout and the error HTTP callback. // The dumped state can be used to resume Ghostferry. @@ -538,6 +557,12 @@ func (c *Config) ValidateConfig() error { } } + if c.LockStrategy == "" { + c.LockStrategy = LockStrategySourceDB + } else if c.LockStrategy != LockStrategySourceDB && c.LockStrategy != LockStrategyInGhostferry && c.LockStrategy != LockStrategyNone { + return fmt.Errorf("Invalid LockStrategy specified (set to %s)", c.LockStrategy) + } + if c.DBWriteRetries == 0 { c.DBWriteRetries = 5 } diff --git a/cursor.go b/cursor.go index 51709504..6cdf0a04 100644 --- a/cursor.go +++ b/cursor.go @@ -5,6 +5,7 @@ import ( "fmt" sql "github.com/Shopify/ghostferry/sqlwrapper" "strings" + "sync" "github.com/Masterminds/squirrel" "github.com/siddontang/go-mysql/schema" @@ -18,9 +19,24 @@ type SqlPreparer interface { type SqlDBWithFakeRollback struct { *sql.DB + lock *sync.RWMutex +} + +func NewSqlDBWithFakeRollback(db *sql.DB, lock *sync.RWMutex) *SqlDBWithFakeRollback { + tx := &SqlDBWithFakeRollback{ + DB: db, + lock: lock, + } + if lock != nil { + lock.Lock() + } + return tx } func (d *SqlDBWithFakeRollback) Rollback() error { + if d.lock != nil { + d.lock.Unlock() + } return nil } @@ -53,9 +69,12 @@ func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPagi } // returns a new Cursor with an embedded copy of itself -func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor { +func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64, tableLock *sync.RWMutex) *Cursor { cursor := c.NewCursor(table, startPaginationKey, maxPaginationKey) cursor.RowLock = false + // NOTE: We only allow internal table locking, if row-locking is disabled + // to avoid a potential 
deadlock + cursor.tableLock = tableLock return cursor } @@ -68,6 +87,7 @@ type Cursor struct { paginationKeyColumn *schema.TableColumn lastSuccessfulPaginationKey uint64 + tableLock *sync.RWMutex logger *logrus.Entry } @@ -101,7 +121,7 @@ func (c *Cursor) Each(f func(*RowBatch) error) error { return err } } else { - tx = &SqlDBWithFakeRollback{c.DB} + tx = NewSqlDBWithFakeRollback(c.DB, c.tableLock) } batch, paginationKeypos, err = c.Fetch(tx) diff --git a/data_iterator.go b/data_iterator.go index ffead901..7c9678f1 100644 --- a/data_iterator.go +++ b/data_iterator.go @@ -17,6 +17,7 @@ type DataIterator struct { ErrorHandler ErrorHandler CursorConfig *CursorConfig StateTracker *StateTracker + LockStrategy string targetPaginationKeys *sync.Map batchListeners []func(*RowBatch) error @@ -88,7 +89,37 @@ func (d *DataIterator) Run(tables []*TableSchema) { return } - cursor := d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(uint64)) + // NOTE: Using a lock to synchronize data iteration and binlog writing is + // necessary. It is possible that we read data on the source while the + // binlog receives an update to the same data. + // + // Example event sequence: + // 1) application writes table row version "v1" to the source + // 2) data iterator reads v1 + // 3) application updates row v1 to become v2 + // 4) binlog reader receives UPDATE command v1 -> v2 + // 5) binlog writer executes UPDATE v1 -> v2: this is a NOP due to how the + // writer formats UPDATE statements (v1 does not exist in the target, so + // the UPDATE has no rows to operate on) + // 6) batch writer inserts v1 + // Outcome: Source contains v2 while target contains v1. + // + // There are similar events for DELETE statements. INSERT should be safe. + // + // To avoid the problem, we use a lock from steps 2 to 6 to ensure the + // source data is not modified between reading from the source and writing + // the batch to the target. 
+ var cursor *Cursor + if d.LockStrategy == LockStrategySourceDB { + cursor = d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(uint64)) + } else { + var tableLock *sync.RWMutex + if d.LockStrategy == LockStrategyInGhostferry { + tableLock = d.StateTracker.GetTableLock(table.Table.String()) + } + cursor = d.CursorConfig.NewCursorWithoutRowLock(table, startPaginationKey, targetPaginationKeyInterface.(uint64), tableLock) + } + if d.SelectFingerprint { if len(cursor.ColumnsToSelect) == 0 { cursor.ColumnsToSelect = []string{"*"} diff --git a/examples/copydb/run-on-replica.conf.json b/examples/copydb/run-on-replica.conf.json index 042ecd4d..36ddeb48 100644 --- a/examples/copydb/run-on-replica.conf.json +++ b/examples/copydb/run-on-replica.conf.json @@ -22,6 +22,7 @@ }, "RunFerryFromReplica": true, + "LockStrategy": "LockInGhostferry", "SourceReplicationMaster": { "Host": "127.0.0.1", "Port": 29291, diff --git a/ferry.go b/ferry.go index 0493afc0..5cdca536 100644 --- a/ferry.go +++ b/ferry.go @@ -110,6 +110,7 @@ func (f *Ferry) NewDataIterator() *DataIterator { ReadRetries: f.Config.DBReadRetries, }, StateTracker: f.StateTracker, + LockStrategy: f.Config.LockStrategy, } if f.CopyFilter != nil { @@ -149,6 +150,7 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter { BatchSize: f.Config.BinlogEventBatchSize, WriteRetries: f.Config.DBWriteRetries, + LockStrategy: f.Config.LockStrategy, ErrorHandler: f.ErrorHandler, StateTracker: f.StateTracker, diff --git a/iterative_verifier.go b/iterative_verifier.go index 8a93fc3a..51e0cc30 100644 --- a/iterative_verifier.go +++ b/iterative_verifier.go @@ -384,7 +384,7 @@ func (v *IterativeVerifier) iterateAllTables(mismatchedPaginationKeyFunc func(ui func (v *IterativeVerifier) iterateTableFingerprints(table *TableSchema, mismatchedPaginationKeyFunc func(uint64, *TableSchema) error) error { // The cursor will stop iterating when it cannot find anymore rows, // so it will not iterate until MaxUint64. 
- cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64) + cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64, nil) // It only needs the PaginationKeys, not the entire row. cursor.ColumnsToSelect = []string{fmt.Sprintf("`%s`", table.GetPaginationColumn().Name)} diff --git a/state_tracker.go b/state_tracker.go index 6fafea10..e449c398 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -89,6 +89,7 @@ type StateTracker struct { lastSuccessfulPaginationKeys map[string]uint64 completedTables map[string]bool + tableLocks map[string]*sync.RWMutex iterationSpeedLog *ring.Ring } @@ -100,6 +101,7 @@ func NewStateTracker(speedLogCount int) *StateTracker { lastSuccessfulPaginationKeys: make(map[string]uint64), completedTables: make(map[string]bool), + tableLocks: make(map[string]*sync.RWMutex), iterationSpeedLog: newSpeedLogRing(speedLogCount), } } @@ -178,6 +180,26 @@ func (s *StateTracker) IsTableComplete(table string) bool { return s.completedTables[table] } +func (s *StateTracker) GetTableLock(table string) *sync.RWMutex { + s.CopyRWMutex.Lock() + defer s.CopyRWMutex.Unlock() + + // Table locks are needed only for synchronizing data copy and binlog + // writing. We return nil (no lock) once the table is marked completed, + // because the race is then no longer possible. + if s.completedTables[table] { + return nil + } + + if lock, found := s.tableLocks[table]; found { + return lock + } + + lock := &sync.RWMutex{} + s.tableLocks[table] = lock + return lock +} + // This is reasonably accurate if the rows copied are distributed uniformly // between paginationKey = 0 -> max(paginationKey). It would not be accurate if the distribution is // concentrated in a particular region.
diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index f35d4f47..1ed1dfa7 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -243,6 +243,10 @@ def start_ghostferry(resuming_state = nil) environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia] end + if @config[:lock_strategy] + environment["GHOSTFERRY_LOCK_STRATEGY"] = @config[:lock_strategy] + end + @logger.info("starting ghostferry test binary #{@compiled_binary_path}") Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| stdin.puts(resuming_state) unless resuming_state.nil? diff --git a/test/integration/trivial_test.rb b/test/integration/trivial_test.rb index 246d31ba..1f77115b 100644 --- a/test/integration/trivial_test.rb +++ b/test/integration/trivial_test.rb @@ -39,4 +39,13 @@ def test_logged_query_omits_columns end end end + + def test_lock_strategy_in_ghostferry + seed_simple_database_with_single_table + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { lock_strategy: "LockInGhostferry" }) + ghostferry.run + + assert_test_table_is_identical + end end diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go index f7fab888..9e4e0663 100644 --- a/test/lib/go/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -247,6 +247,10 @@ func NewStandardConfig() (*ghostferry.Config, error) { } } + if lockStrategy := os.Getenv("GHOSTFERRY_LOCK_STRATEGY"); lockStrategy != "" { + config.LockStrategy = lockStrategy + } + return config, config.ValidateConfig() }