diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 236a19fd..523459ee 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -6,6 +6,7 @@ require "tmpdir" require "webrick" require "cgi" +require "securerandom" module GhostferryHelper GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") @@ -47,11 +48,12 @@ module Status AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" end - attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines + attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393) @log_capturer = log_capturer @logger = log_capturer.logger + @tag = SecureRandom.hex[0..3] @main_path = main_path @config = config @@ -94,6 +96,7 @@ def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: # The main method to call to run a Ghostferry subprocess. def run(resuming_state = nil) + @logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})") resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) compile_binary @@ -112,21 +115,26 @@ def run(resuming_state = nil) # When using this method, you need to ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_interrupt(resuming_state = nil) + @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue GhostferryExitFailure + @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got GhostferryExitFailure") dumped_state = @stdout.join("") JSON.parse(dumped_state) else + @logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong") raise "Ghostferry did not get interrupted" end # Same as above - ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_failure(resuming_state = nil) + @logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue GhostferryExitFailure + @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got GhostferryExitFailure") else - raise "Ghostferry did not fail" + raise "[#{@tag}] Ghostferry did not fail" end def run_with_logs(resuming_state = nil) @@ -140,14 +148,14 @@ def run_with_logs(resuming_state = nil) def compile_binary return if File.exist?(@compiled_binary_path) - @logger.debug("compiling test binary to #{@compiled_binary_path}") + @logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}") rc = system( "go", "build", "-o", @compiled_binary_path, @main_path ) - raise "could not compile ghostferry" unless rc + raise "[#{@tag}] could not compile ghostferry" unless rc end def start_server @@ -179,7 +187,7 @@ def start_server resp.status = 400 @server.shutdown elsif statuses.size > 1 - @logger.warn("Got multiple statuses at once: #{statuses.inspect}") + @logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}") puts "Got multiple statuses at once: #{statuses.inspect}" end @@ -187,6 +195,7 @@ def start_server data = query["data"] + @logger.info("[#{@tag}] server: got / with #{statuses.inspect}") statuses.each do |status| next if @status_handlers[status].nil? @@ -202,6 +211,7 @@ def start_server @server.mount_proc "/callbacks/progress" do |req, resp| begin + @logger.info("[#{@tag}] server: got /callbacks/progress") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -216,6 +226,7 @@ def start_server @server.mount_proc "/callbacks/state" do |req, resp| begin + @logger.info("[#{@tag}] server: got /callbacks/state") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -228,14 +239,15 @@ def start_server end @server.mount_proc "/callbacks/error" do |req, resp| + @logger.info("[#{@tag}] server: got /callbacks/error") @error = JSON.parse(JSON.parse(req.body)["Payload"]) @callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil? end @server_thread = Thread.new do - @logger.debug("starting server thread") + @logger.debug("[#{@tag}] starting server thread") @server.start - @logger.debug("server thread stopped") + @logger.debug("[#{@tag}] server thread stopped") end end @@ -275,7 +287,7 @@ def start_ghostferry(resuming_state = nil) environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia] end - @logger.debug("starting ghostferry test binary #{@compiled_binary_path}") + @logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}") Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| stdin.puts(resuming_state) unless resuming_state.nil? stdin.close @@ -297,7 +309,7 @@ def start_ghostferry(resuming_state = nil) if reader == stdout @stdout << line - @logger.debug("stdout: #{line}") + @logger.debug("[#{@tag}] stdout: #{line}") elsif reader == stderr @stderr << line if json_log_line?(line) @@ -310,8 +322,11 @@ def start_ghostferry(resuming_state = nil) if logline["level"] == "error" @error_lines << logline end + + @logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer") + else + @logger.debug("[#{@tag}] stderr: #{line}") end - @logger.debug("stderr: #{line}") end end end @@ -320,9 +335,9 @@ def start_ghostferry(resuming_state = nil) @pid = 0 end - @logger.debug("ghostferry test binary exitted: #{@exit_status}") + @logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}") if @exit_status.exitstatus != 0 - raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + raise GhostferryExitFailure, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}" end end end @@ -336,14 +351,14 @@ def start_server_watchdog if (now - @last_message_time) > @message_timeout @server.shutdown @log_capturer.print_output - raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" + raise "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s" end sleep 1 end @server.shutdown - @logger.debug("server watchdog thread stopped") + @logger.debug("[#{@tag}] server watchdog thread stopped") end @server_watchdog_thread.abort_on_exception = true diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 1cbf8c96..b6baccc1 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -11,6 +11,7 @@ def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_in # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + info("test[09]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -32,6 +33,7 @@ def test_interrupt_and_resume_without_last_known_schema_cache # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + info("test[31]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -448,19 +450,26 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts end def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_source + @debug_me = true + info("test[452] start\n\n") ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) datawriter = new_source_datawriter start_datawriter_with_ghostferry(datawriter, ghostferry) + info("test[461] ghostferry#run_expecting_interrupt, no state\n\n") dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) + + info("test[466] ghostferry#run_expecting_interrupt, with state\n\n") ghostferry.run_expecting_interrupt(dumped_state) ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) stop_datawriter_during_cutover(datawriter, ghostferry) + + info("test[472] ghostferry#run_with_logs, with state\n\n") ghostferry.run_with_logs(dumped_state) assert_test_table_is_identical diff --git a/test/test_helper.rb b/test/test_helper.rb index d66c4194..6ca72ba1 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -11,7 +11,7 @@ require "data_writer_helper" class LogCapturer - attr_reader :logger + attr_reader :logger, :logger_device def initialize(level: Logger::DEBUG) @capture = ENV["DEBUG"] != "1" @@ -50,6 +50,7 @@ def new_ghostferry(filepath, config: {}) # Transform path to something ruby understands path = File.join(GO_CODE_PATH, filepath, "main.go") g = Ghostferry.new(path, config: config, log_capturer: @log_capturer) + info("[#{g.tag}] new_ghostferry: create") @ghostferry_instances << g g end @@ -57,11 +58,16 @@ def new_ghostferry(filepath, config: {}) def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) g = new_ghostferry(filepath, config: config) + info("[#{g.tag}] new_ghostferry_wiarc: register status hook") batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do batches_written += 1 + if batches_written >= after_batches_written + info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> true") g.send_signal("TERM") + else + info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> false") end end @@ -83,6 +89,10 @@ def setup_signal_watcher Signal.trap("TERM") { self.on_term } end + def info(msg) + @log_capturer.logger.info(msg) + end + ############## # Test Hooks # ############## @@ -105,6 +115,7 @@ def before_setup # Same thing with DataWriter as above @datawriter_instances = [] + @debug_me = nil end def after_teardown @@ -116,7 +127,7 @@ def after_teardown datawriter.stop_and_join end - @log_capturer.print_output if self.failure + @log_capturer.print_output if self.failure || @debug_me @log_capturer.reset super end