Skip to content

Commit

Permalink
Merge pull request #179 from Shopify/ignore_interrupt
Browse files Browse the repository at this point in the history
Ignore termination signals during cutover
  • Loading branch information
xliang6 authored Apr 28, 2020
2 parents d2df941 + eeaa7b1 commit 8cb5a5e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 24 deletions.
28 changes: 18 additions & 10 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -81,7 +82,7 @@ type Ferry struct {

StartTime time.Time
DoneTime time.Time
OverallState string
OverallState atomic.Value

logger *logrus.Entry

Expand Down Expand Up @@ -289,7 +290,7 @@ func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error) {
// Initialize all the components of Ghostferry and connect to the Database
func (f *Ferry) Initialize() (err error) {
f.StartTime = time.Now().Truncate(time.Second)
f.OverallState = StateStarting
f.OverallState.Store(StateStarting)

f.logger = logrus.WithField("tag", "ferry")
f.rowCopyCompleteCh = make(chan struct{})
Expand Down Expand Up @@ -520,7 +521,7 @@ func (f *Ferry) Start() error {
// Wait for the background tasks to finish.
func (f *Ferry) Run() {
f.logger.Info("starting ferry run")
f.OverallState = StateCopying
f.OverallState.Store(StateCopying)

ctx, shutdown := context.WithCancel(context.Background())

Expand Down Expand Up @@ -564,7 +565,14 @@ func (f *Ferry) Run() {
s := <-c
if ctx.Err() == nil {
// Ghostferry is still running
f.ErrorHandler.Fatal("user_interrupt", fmt.Errorf("signal received: %v", s.String()))
if f.OverallState.Load() != StateCutover {
// Save state dump and exit if not during the cutover stage
f.ErrorHandler.Fatal("user_interrupt", fmt.Errorf("signal received: %v", s.String()))
} else {
// Log and ignore the signal during cutover
f.logger.Warnf("Received signal: %s during cutover. "+
"Refusing to interrupt and will attempt to complete the run.", s.String())
}
} else {
// shutdown() has been called and Ghostferry is done.
os.Exit(0)
Expand Down Expand Up @@ -614,7 +622,7 @@ func (f *Ferry) Run() {

if f.Verifier != nil {
f.logger.Info("calling VerifyBeforeCutover")
f.OverallState = StateVerifyBeforeCutover
f.OverallState.Store(StateVerifyBeforeCutover)

metrics.Measure("VerifyBeforeCutover", nil, 1.0, func() {
err := f.Verifier.VerifyBeforeCutover()
Expand All @@ -626,11 +634,11 @@ func (f *Ferry) Run() {
}

f.logger.Info("data copy is complete, waiting for cutover")
f.OverallState = StateWaitingForCutover
f.OverallState.Store(StateWaitingForCutover)
f.waitUntilAutomaticCutoverIsTrue()

f.logger.Info("entering cutover phase, notifying caller that row copy is complete")
f.OverallState = StateCutover
f.OverallState.Store(StateCutover)
f.notifyRowCopyComplete()

// Cutover is a cooperative activity between the Ghostferry library and
Expand All @@ -646,7 +654,7 @@ func (f *Ferry) Run() {
binlogWg.Wait()

f.logger.Info("ghostferry run is complete, shutting down auxiliary services")
f.OverallState = StateDone
f.OverallState.Store(StateDone)
f.DoneTime = time.Now()

shutdown()
Expand Down Expand Up @@ -742,7 +750,7 @@ func (f *Ferry) SerializeStateToJSON() (string, error) {

func (f *Ferry) Progress() *Progress {
s := &Progress{
CurrentState: f.OverallState,
CurrentState: f.OverallState.Load().(string),
CustomPayload: f.Config.ProgressCallback.Payload,
VerifierType: f.VerifierType,
}
Expand Down Expand Up @@ -832,7 +840,7 @@ func (f *Ferry) ensureInitialized() {
// Note: the constructor shouldn't have a large number of positional arguments
// so it is more readable.
//
if f.OverallState == "" {
if f.OverallState.Load() == nil || f.OverallState.Load() == "" {
panic("ferry has not been initialized")
}
}
Expand Down
7 changes: 4 additions & 3 deletions status_deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"sort"
"sync/atomic"
"time"

"github.com/siddontang/go-mysql/mysql"
Expand All @@ -26,7 +27,7 @@ type StatusDeprecated struct {
SourceHostPort string
TargetHostPort string

OverallState string
OverallState atomic.Value
StartTime time.Time
CurrentTime time.Time
TimeTaken time.Duration
Expand Down Expand Up @@ -63,7 +64,7 @@ func FetchStatusDeprecated(f *Ferry, v Verifier) *StatusDeprecated {
status.SourceHostPort = fmt.Sprintf("%s:%d", f.Source.Host, f.Source.Port)
status.TargetHostPort = fmt.Sprintf("%s:%d", f.Target.Host, f.Target.Port)

status.OverallState = f.OverallState
status.OverallState.Store(f.OverallState.Load())
status.StartTime = f.StartTime
status.CurrentTime = time.Now()
if f.DoneTime.IsZero() {
Expand Down Expand Up @@ -202,7 +203,7 @@ func FetchStatusDeprecated(f *Ferry, v Verifier) *StatusDeprecated {
status.VerificationDone = result.IsDone()

// We can only run the verifier if we're not copying and not verifying
status.VerifierAvailable = status.OverallState != StateStarting && status.OverallState != StateCopying && (!status.VerificationStarted || status.VerificationDone)
status.VerifierAvailable = status.OverallState.Load() != StateStarting && status.OverallState.Load() != StateCopying && (!status.VerificationStarted || status.VerificationDone)
status.VerificationResult = result.VerificationResult
status.VerificationErr = err
} else {
Expand Down
28 changes: 17 additions & 11 deletions test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def test_interrupt_resume_with_writes_to_source
assert_test_table_is_identical
end

def test_interrupt_resume_when_table_has_completed
# Start a run of Ghostferry expecting to be interrupted
def test_interrupt_ignored_when_table_has_completed
# Start a run of Ghostferry expecting the termination signal to be ignored
datawriter = new_source_datawriter
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

Expand All @@ -89,19 +89,25 @@ def test_interrupt_resume_when_table_has_completed
ghostferry.send_signal("TERM")
end

dumped_state = ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)
with_env('CI', nil) do
ghostferry.run

# Resume ghostferry from interrupted state
datawriter = new_source_datawriter
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
assert_test_table_is_identical

start_datawriter_with_ghostferry(datawriter, ghostferry)
stop_datawriter_during_cutover(datawriter, ghostferry)
assert ghostferry.logrus_lines["ferry"].length > 0

ghostferry.run(dumped_state)
found_signal = false
ghostferry.logrus_lines["ferry"].each do |line|
if line["msg"].start_with?("Received signal: ")
found_signal = true
assert line["msg"].match?("Received signal: terminated during cutover. " \
"Refusing to interrupt and will attempt to complete the run.")
end
end

assert(found_signal, "Expected to receive a termination signal")
end

assert_test_table_is_identical
end

def test_interrupt_resume_will_not_emit_binlog_position_for_inline_verifier_if_no_verification_is_used
Expand Down

0 comments on commit 8cb5a5e

Please sign in to comment.