-
Notifications
You must be signed in to change notification settings - Fork 72
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added some trivial integration tests and some interrupt/resume tests. Note the interrupt and resume works on master as of this commit without reconciler because the schema didn't change.
- Loading branch information
Showing
3 changed files
with
197 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
require "test_helper" | ||
|
||
require "json" | ||
|
||
class InterruptResumeTest < GhostferryTestCase | ||
def test_interrupt_resume_with_writes_to_source | ||
seed_simple_database_with_single_table | ||
|
||
# Start a ghostferry run expecting it to be interrupted. | ||
datawriter = new_source_datawriter | ||
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) | ||
|
||
start_datawriter_with_ghostferry(datawriter, ghostferry) | ||
|
||
batches_written = 0 | ||
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do | ||
batches_written += 1 | ||
if batches_written >= 2 | ||
ghostferry.send_signal("TERM") | ||
end | ||
end | ||
|
||
dumped_state = ghostferry.run_expecting_interrupt | ||
assert_basic_fields_exist_in_dumped_state(dumped_state) | ||
|
||
# Resume Ghostferry with dumped state | ||
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) | ||
|
||
# The datawriter is still writing to the database since earlier, so we need | ||
# to stop it during cutover. | ||
stop_datawriter_during_cutover(datawriter, ghostferry) | ||
|
||
ghostferry.run(dumped_state) | ||
|
||
assert_test_table_is_identical | ||
end | ||
|
||
def test_interrupt_resume_when_table_has_completed | ||
seed_simple_database_with_single_table | ||
|
||
# Start a run of Ghostferry expecting to be interrupted | ||
datawriter = new_source_datawriter | ||
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) | ||
|
||
start_datawriter_with_ghostferry(datawriter, ghostferry) | ||
stop_datawriter_during_cutover(datawriter, ghostferry) | ||
|
||
ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do | ||
ghostferry.send_signal("TERM") | ||
end | ||
|
||
dumped_state = ghostferry.run_expecting_interrupt | ||
assert_basic_fields_exist_in_dumped_state(dumped_state) | ||
|
||
# Resume ghostferry from interrupted state | ||
datawriter = new_source_datawriter | ||
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) | ||
|
||
start_datawriter_with_ghostferry(datawriter, ghostferry) | ||
stop_datawriter_during_cutover(datawriter, ghostferry) | ||
ghostferry.run(dumped_state) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
require "test_helper" | ||
|
||
class TrivialIntegrationTests < GhostferryTestCase | ||
def test_copy_data_without_any_writes_to_source | ||
seed_simple_database_with_single_table | ||
|
||
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) | ||
ghostferry.run | ||
|
||
assert_test_table_is_identical | ||
end | ||
|
||
def test_copy_data_with_writes_to_source | ||
seed_simple_database_with_single_table | ||
|
||
datawriter = new_source_datawriter | ||
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) | ||
|
||
start_datawriter_with_ghostferry(datawriter, ghostferry) | ||
stop_datawriter_during_cutover(datawriter, ghostferry) | ||
|
||
ghostferry.run | ||
assert_test_table_is_identical | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
require "logger" | ||
require "minitest" | ||
require "minitest/autorun" | ||
require "minitest/hooks/test" | ||
|
||
GO_CODE_PATH = File.join(File.absolute_path(File.dirname(__FILE__)), "lib", "go") | ||
|
||
require "db_helper" | ||
require "ghostferry_helper" | ||
require "data_writer_helper" | ||
|
||
Minitest.after_run do | ||
GhostferryHelper.remove_all_binaries | ||
end | ||
|
||
class GhostferryTestCase < Minitest::Test | ||
include Minitest::Hooks | ||
include GhostferryHelper | ||
include DbHelper | ||
include DataWriterHelper | ||
|
||
MINIMAL_GHOSTFERRY = "minimal.go" | ||
|
||
def new_ghostferry(filename) | ||
# Transform path to something ruby understands | ||
path = File.join(GO_CODE_PATH, filename) | ||
g = Ghostferry.new(path, logger: @logger) | ||
@ghostferry_instances << g | ||
g | ||
end | ||
|
||
def new_source_datawriter(*args) | ||
dw = DataWriter.new(source_db_config, *args, logger: @logger) | ||
@datawriter_instances << dw | ||
dw | ||
end | ||
|
||
############## | ||
# Test Hooks # | ||
############## | ||
|
||
def before_all | ||
@logger = Logger.new(STDOUT) | ||
if ENV["DEBUG"] == "1" | ||
@logger.level = Logger::DEBUG | ||
else | ||
@logger.level = Logger::INFO | ||
end | ||
|
||
initialize_db_connections | ||
end | ||
|
||
def before_setup | ||
reset_data | ||
|
||
# Any ghostferry instances created via the new_ghostferry method will be | ||
# pushed to here, which allows the test to kill the process after each test | ||
# should there be a hung process/failed test/errored test. | ||
@ghostferry_instances = [] | ||
|
||
# Same thing with DataWriter as above | ||
@datawriter_instances = [] | ||
end | ||
|
||
def after_teardown | ||
@ghostferry_instances.each do |ghostferry| | ||
ghostferry.kill | ||
end | ||
|
||
@datawriter_instances.each do |datawriter| | ||
datawriter.stop_and_join | ||
end | ||
end | ||
|
||
##################### | ||
# Assertion Helpers # | ||
##################### | ||
|
||
def assert_test_table_is_identical | ||
source, target = source_and_target_table_metrics | ||
|
||
assert source[DEFAULT_FULL_TABLE_NAME][:row_count] > 0 | ||
assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0 | ||
|
||
assert_equal( | ||
source[DEFAULT_FULL_TABLE_NAME][:checksum], | ||
target[DEFAULT_FULL_TABLE_NAME][:checksum], | ||
) | ||
|
||
assert_equal( | ||
source[DEFAULT_FULL_TABLE_NAME][:sample_row], | ||
target[DEFAULT_FULL_TABLE_NAME][:sample_row], | ||
) | ||
end | ||
|
||
# Use this method to assert the validity of the structure of the dumped | ||
# state. | ||
# | ||
# To actually assert the validity of the data within the dumped state, you | ||
# have to do it manually. | ||
def assert_basic_fields_exist_in_dumped_state(dumped_state) | ||
refute dumped_state.nil? | ||
refute dumped_state["GhostferryVersion"].nil? | ||
refute dumped_state["LastKnownTableSchemaCache"].nil? | ||
refute dumped_state["LastSuccessfulPrimaryKeys"].nil? | ||
refute dumped_state["CompletedTables"].nil? | ||
refute dumped_state["LastWrittenBinlogPosition"].nil? | ||
end | ||
end |