Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce "Lock Strategy" option #182

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

sql "github.com/Shopify/ghostferry/sqlwrapper"
"sync"

"github.com/sirupsen/logrus"
)
Expand All @@ -16,6 +17,7 @@ type BinlogWriter struct {

BatchSize int
WriteRetries int
LockStrategy string

ErrorHandler ErrorHandler
StateTracker *StateTracker
Expand Down Expand Up @@ -79,6 +81,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
WaitForThrottle(b.Throttler)

queryBuffer := []byte(sql.AnnotateStmt("BEGIN;\n", b.DB.Marginalia))
locksToObtain := make(map[string]*sync.RWMutex)

for _, ev := range events {
eventDatabaseName := ev.Database()
Expand All @@ -98,13 +101,27 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {

queryBuffer = append(queryBuffer, sql.AnnotateStmt(sqlStmt, b.DB.Marginalia)...)
queryBuffer = append(queryBuffer, ";\n"...)

if b.LockStrategy == LockStrategyInGhostferry {
fullTableName := ev.TableSchema().Table.String()
if _, found := locksToObtain[fullTableName]; !found {
locksToObtain[fullTableName] = b.StateTracker.GetTableLock(fullTableName)
}
}
}

queryBuffer = append(queryBuffer, "COMMIT"...)

startEv := events[0]
endEv := events[len(events)-1]
query := string(queryBuffer)

for _, lock := range locksToObtain {
if lock != nil {
lock.Lock()
defer lock.Unlock()
}
}
_, err := b.DB.Exec(query)
if err != nil {
return fmt.Errorf("exec query at pos %v -> %v (%d bytes): %v", startEv.BinlogPosition(), endEv.BinlogPosition(), len(query), err)
Expand Down
25 changes: 25 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
VerifierTypeNoVerification = "NoVerification"

DefaultMarginalia = "application:ghostferry"

LockStrategySourceDB = "LockOnSourceDB"
LockStrategyInGhostferry = "LockInGhostferry"
LockStrategyNone = "None"
)

type TLSConfig struct {
Expand Down Expand Up @@ -412,6 +416,21 @@ type Config struct {
// Optional: defaults to false
AutomaticCutover bool

// This specifies how to prevent races between the data copy and binlog
// streaming. Possible values are:
// - LockOnSourceDB: obtain a table lock on the source table while copying
// data, which will prevent any type of data modification on the source
// DB; this is the strictest method but may interfere with the
// application trying to insert data,
// - LockInGhostferry: obtain a lock in ghostferry, preventing updates to
// the target DB while copying data; this should be sufficient in most
// scenarios, and
// - None: do not perform locking, assume the application does not update
// or delete data in a way that races may occur.
//
// Optional: defaults to "LockOnSourceDB"
LockStrategy string

// This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM
// by dumping the current state to stdout and the error HTTP callback.
// The dumped state can be used to resume Ghostferry.
Expand Down Expand Up @@ -538,6 +557,12 @@ func (c *Config) ValidateConfig() error {
}
}

if c.LockStrategy == "" {
c.LockStrategy = LockStrategySourceDB
} else if c.LockStrategy != LockStrategySourceDB && c.LockStrategy != LockStrategyInGhostferry && c.LockStrategy != LockStrategyNone {
return fmt.Errorf("Invalid LockStrategy specified (set to %s)", c.LockStrategy)
}

if c.DBWriteRetries == 0 {
c.DBWriteRetries = 5
}
Expand Down
24 changes: 22 additions & 2 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"strings"
"sync"

"github.com/Masterminds/squirrel"
"github.com/siddontang/go-mysql/schema"
Expand All @@ -18,9 +19,24 @@ type SqlPreparer interface {

// SqlDBWithFakeRollback wraps a *sql.DB and adds a Rollback method that does
// not roll anything back, so the wrapper can be assigned to the same variable
// as a real transaction. It may additionally carry a table lock that is held
// from construction until the first call to Rollback.
//
// A SqlDBWithFakeRollback is meant to be used by a single goroutine; Rollback
// itself is not synchronized.
type SqlDBWithFakeRollback struct {
	*sql.DB
	// lock is the table lock currently held by this wrapper, or nil when no
	// lock was requested or it has already been released.
	lock *sync.RWMutex
}

// NewSqlDBWithFakeRollback wraps db and, when lock is non-nil, acquires lock
// exclusively before returning (blocking until it is available). The lock is
// released by calling Rollback on the returned value.
func NewSqlDBWithFakeRollback(db *sql.DB, lock *sync.RWMutex) *SqlDBWithFakeRollback {
	if lock != nil {
		lock.Lock()
	}
	return &SqlDBWithFakeRollback{
		DB:   db,
		lock: lock,
	}
}

// Rollback releases the table lock acquired at construction, if any, and
// always returns nil. It is safe to call more than once: the lock reference
// is cleared after the first release, so later calls neither crash with
// "Unlock of unlocked RWMutex" nor release a lock that some other goroutine
// has acquired in the meantime.
func (d *SqlDBWithFakeRollback) Rollback() error {
	if d.lock != nil {
		d.lock.Unlock()
		d.lock = nil
	}
	return nil
}

Expand Down Expand Up @@ -53,9 +69,12 @@ func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPagi
}

// NewCursorWithoutRowLock returns a new Cursor created through NewCursor
// (which embeds a copy of this CursorConfig) with RowLock switched off.
// tableLock is stored on the cursor as its in-process table lock; it may be
// nil, in which case the cursor carries no table lock.
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor {
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64, tableLock *sync.RWMutex) *Cursor {
cursor := c.NewCursor(table, startPaginationKey, maxPaginationKey)
cursor.RowLock = false
// NOTE: Internal table locking is only allowed when row-locking is
// disabled, to avoid a potential deadlock between the two mechanisms.
cursor.tableLock = tableLock
return cursor
}

Expand All @@ -68,6 +87,7 @@ type Cursor struct {

paginationKeyColumn *schema.TableColumn
lastSuccessfulPaginationKey uint64
tableLock *sync.RWMutex
logger *logrus.Entry
}

Expand Down Expand Up @@ -101,7 +121,7 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
return err
}
} else {
tx = &SqlDBWithFakeRollback{c.DB}
tx = NewSqlDBWithFakeRollback(c.DB, c.tableLock)
}

batch, paginationKeypos, err = c.Fetch(tx)
Expand Down
33 changes: 32 additions & 1 deletion data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type DataIterator struct {
ErrorHandler ErrorHandler
CursorConfig *CursorConfig
StateTracker *StateTracker
LockStrategy string

targetPaginationKeys *sync.Map
batchListeners []func(*RowBatch) error
Expand Down Expand Up @@ -88,7 +89,37 @@ func (d *DataIterator) Run(tables []*TableSchema) {
return
}

cursor := d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(uint64))
// NOTE: Using a lock to synchronize data iteration and binlog writing is
// necessary. It is possible that we read data on the source while the
// binlog receives an update to the same data.
//
// Example event sequence:
// 1) application writes table row version "v1" to the source
// 2) data iterator reads v1
// 3) application updates row v1 to become v2
// 4) binlog reader receives UPDATE command v1 -> v2
// 5) binlog writer executes UPDATE v1 -> v2: this is a NOP due to how the
// writer formats UPDATE statements (v1 does not exist in the target, so
// the UPDATE has no rows to operate on)
// 6) batch writer inserts v1
// Outcome: Source contains v2 while target contains v1.
//
// There are similar events for DELETE statements. INSERT should be safe.
//
// To avoid the problem, we use a lock from steps 2 to 6 to ensure the
// source data is not modified between reading from the source and writing
// the batch to the target.
var cursor *Cursor
if d.LockStrategy == LockStrategySourceDB {
cursor = d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(uint64))
} else {
var tableLock *sync.RWMutex
if d.LockStrategy == LockStrategyInGhostferry {
tableLock = d.StateTracker.GetTableLock(table.Table.String())
}
cursor = d.CursorConfig.NewCursorWithoutRowLock(table, startPaginationKey, targetPaginationKeyInterface.(uint64), tableLock)
}

if d.SelectFingerprint {
if len(cursor.ColumnsToSelect) == 0 {
cursor.ColumnsToSelect = []string{"*"}
Expand Down
1 change: 1 addition & 0 deletions examples/copydb/run-on-replica.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
},

"RunFerryFromReplica": true,
"LockStrategy": "LockInGhostferry",
"SourceReplicationMaster": {
"Host": "127.0.0.1",
"Port": 29291,
Expand Down
2 changes: 2 additions & 0 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (f *Ferry) NewDataIterator() *DataIterator {
ReadRetries: f.Config.DBReadRetries,
},
StateTracker: f.StateTracker,
LockStrategy: f.Config.LockStrategy,
}

if f.CopyFilter != nil {
Expand Down Expand Up @@ -149,6 +150,7 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter {

BatchSize: f.Config.BinlogEventBatchSize,
WriteRetries: f.Config.DBWriteRetries,
LockStrategy: f.Config.LockStrategy,

ErrorHandler: f.ErrorHandler,
StateTracker: f.StateTracker,
Expand Down
2 changes: 1 addition & 1 deletion iterative_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (v *IterativeVerifier) iterateAllTables(mismatchedPaginationKeyFunc func(ui
func (v *IterativeVerifier) iterateTableFingerprints(table *TableSchema, mismatchedPaginationKeyFunc func(uint64, *TableSchema) error) error {
// The cursor will stop iterating when it cannot find anymore rows,
// so it will not iterate until MaxUint64.
cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64)
cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64, nil)

// It only needs the PaginationKeys, not the entire row.
cursor.ColumnsToSelect = []string{fmt.Sprintf("`%s`", table.GetPaginationColumn().Name)}
Expand Down
22 changes: 22 additions & 0 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type StateTracker struct {

lastSuccessfulPaginationKeys map[string]uint64
completedTables map[string]bool
tableLocks map[string]*sync.RWMutex

iterationSpeedLog *ring.Ring
}
Expand All @@ -100,6 +101,7 @@ func NewStateTracker(speedLogCount int) *StateTracker {

lastSuccessfulPaginationKeys: make(map[string]uint64),
completedTables: make(map[string]bool),
tableLocks: make(map[string]*sync.RWMutex),
iterationSpeedLog: newSpeedLogRing(speedLogCount),
}
}
Expand Down Expand Up @@ -178,6 +180,26 @@ func (s *StateTracker) IsTableComplete(table string) bool {
return s.completedTables[table]
}

// GetTableLock returns the mutex used to synchronize the data copy and the
// binlog writing for the given table, creating and registering it on first
// use. It returns nil when the table is already marked as completed, because
// the copy/binlog race cannot occur for such a table and callers can skip
// locking entirely.
func (s *StateTracker) GetTableLock(table string) *sync.RWMutex {
	s.CopyRWMutex.Lock()
	defer s.CopyRWMutex.Unlock()

	// Completed tables no longer need synchronization: hand out a nil lock.
	if done := s.completedTables[table]; done {
		return nil
	}

	tableLock, ok := s.tableLocks[table]
	if !ok {
		// First request for this table: create the lock and remember it so
		// every later caller receives the same instance.
		tableLock = &sync.RWMutex{}
		s.tableLocks[table] = tableLock
	}
	return tableLock
}

// This is reasonably accurate if the rows copied are distributed uniformly
// between paginationKey = 0 -> max(paginationKey). It would not be accurate if the distribution is
// concentrated in a particular region.
Expand Down
4 changes: 4 additions & 0 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ def start_ghostferry(resuming_state = nil)
environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia]
end

if @config[:lock_strategy]
environment["GHOSTFERRY_LOCK_STRATEGY"] = @config[:lock_strategy]
end

@logger.info("starting ghostferry test binary #{@compiled_binary_path}")
Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr|
stdin.puts(resuming_state) unless resuming_state.nil?
Expand Down
9 changes: 9 additions & 0 deletions test/integration/trivial_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ def test_logged_query_omits_columns
end
end
end

# Runs a complete ferry with the "LockInGhostferry" lock strategy on a single
# seeded table and checks that the test table ends up identical.
def test_lock_strategy_in_ghostferry
  seed_simple_database_with_single_table

  lock_config = { lock_strategy: "LockInGhostferry" }
  ferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: lock_config)
  ferry.run

  assert_test_table_is_identical
end
end
4 changes: 4 additions & 0 deletions test/lib/go/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ func NewStandardConfig() (*ghostferry.Config, error) {
}
}

if lockStrategy := os.Getenv("GHOSTFERRY_LOCK_STRATEGY"); lockStrategy != "" {
config.LockStrategy = lockStrategy
}

return config, config.ValidateConfig()
}

Expand Down