From d0221567568c8e99c807900c78de4dedaa267709 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 11 Sep 2018 15:39:57 -0400 Subject: [PATCH] Integration test running Ghostferry binary Instead of running integration tests directly in Go, we run them in Ruby by calling Ghostferry as a subprocess. The idea is that we would eventually be able to test the interrupt/resume work more easily as well as make integration tests easier to write. In order to allow the Ruby tests to inject race conditions for testing purposes, we need a way for the Ruby code to pause and restart the execution of Go code at strategic locations. This is done with a local Unix socket. At locations of interest, the Go code will send some string via the Unix socket to the Ruby code. The Ruby code will perform some sort of callback (such as injecting data, or locking a row) and then send a command back to the Go code via the same Unix socket, allowing it to continue executing. Note that each time we send some status to the Ruby server, we do it in a new connection. This is to avoid the race conditions that would arise if one shared connection were used between all the goroutines we employ inside Ghostferry. Stdout and stderr are captured and they can be asserted against/examined; more helper methods can be created later to give better structure to the stdout/stderr output, such as being able to recognize a panic, or recognize particular stages of execution from within the logs. Lastly, the intention is to replace all integration tests with this framework in the future, as it would likely be more robust and cause fewer issues such as goroutine leaks during the go test run. However, for the time being we will have both integration tests in Go and Ruby. 
--- Makefile | 12 +- dev.yml | 3 + test/integration/.gitignore | 2 + test/integration/Gemfile | 5 + test/integration/Gemfile.lock | 18 ++ .../go/integrationferry/integrationferry.go | 177 ++++++++++++ test/integration/go/minimal.go | 78 ++++++ .../ruby/ghostferry_integration.rb | 15 + .../ruby/ghostferry_integration/ghostferry.rb | 256 ++++++++++++++++++ 9 files changed, 561 insertions(+), 5 deletions(-) create mode 100644 test/integration/.gitignore create mode 100644 test/integration/Gemfile create mode 100644 test/integration/Gemfile.lock create mode 100644 test/integration/go/integrationferry/integrationferry.go create mode 100644 test/integration/go/minimal.go create mode 100644 test/integration/ruby/ghostferry_integration.rb create mode 100644 test/integration/ruby/ghostferry_integration/ghostferry.rb diff --git a/Makefile b/Makefile index 1100bbb3..c25d2f48 100644 --- a/Makefile +++ b/Makefile @@ -13,11 +13,12 @@ VERSION_STR := $(VERSION)+$(DATETIME)+$(COMMIT) LDFLAGS += -X github.com/Shopify/ghostferry.VersionString=$(VERSION_STR) # Paths -GOBIN := $(GOPATH)/bin -BUILD_DIR := build -DEB_PREFIX := $(BUILD_DIR)/debian -SHARE_DIR := usr/share/ghostferry -BIN_DIR := usr/bin +GOBIN := $(GOPATH)/bin +BUILD_DIR := build +DEB_PREFIX := $(BUILD_DIR)/debian +SHARE_DIR := usr/share/ghostferry +BIN_DIR := usr/bin +RB_INTEGRATION_DIR := test/integration # Targets PROJECTS := copydb sharding @@ -52,6 +53,7 @@ $(GOBIN): test: @go version go test ./test ./copydb/test ./sharding/test -p 1 -v + cd $(RB_INTEGRATION_DIR) && bundle install && bundle exec ruby test.rb clean: rm -rf build diff --git a/dev.yml b/dev.yml index e47fbda2..82c478fc 100644 --- a/dev.yml +++ b/dev.yml @@ -4,6 +4,9 @@ up: - homebrew: - glide - mysql + - ruby: 2.5.1 + - bundler: + gemfile: test/integration/Gemfile - go: version: 1.10.3 - custom: diff --git a/test/integration/.gitignore b/test/integration/.gitignore new file mode 100644 index 00000000..4541a7b9 --- /dev/null +++ 
b/test/integration/.gitignore @@ -0,0 +1,2 @@ +.bundle/ +vendor/ diff --git a/test/integration/Gemfile b/test/integration/Gemfile new file mode 100644 index 00000000..cac02822 --- /dev/null +++ b/test/integration/Gemfile @@ -0,0 +1,5 @@ +source "https://rubygems.org" + +gem "minitest" +gem "minitest-hooks" +gem "mysql2" diff --git a/test/integration/Gemfile.lock b/test/integration/Gemfile.lock new file mode 100644 index 00000000..265557af --- /dev/null +++ b/test/integration/Gemfile.lock @@ -0,0 +1,18 @@ +GEM + remote: https://rubygems.org/ + specs: + minitest (5.11.3) + minitest-hooks (1.5.0) + minitest (> 5.3) + mysql2 (0.5.2) + +PLATFORMS + ruby + +DEPENDENCIES + minitest + minitest-hooks + mysql2 + +BUNDLED WITH + 1.16.1 diff --git a/test/integration/go/integrationferry/integrationferry.go b/test/integration/go/integrationferry/integrationferry.go new file mode 100644 index 00000000..b5aa0c17 --- /dev/null +++ b/test/integration/go/integrationferry/integrationferry.go @@ -0,0 +1,177 @@ +package integrationferry + +import ( + "fmt" + "net" + "os" + "sync" + "time" + + "github.com/Shopify/ghostferry" +) + +const ( + socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + socketTimeout time.Duration = 30 * time.Second + maxMessageSize int = 256 +) + +const ( + CommandContinue string = "CONTINUE" +) + +const ( + // Could only be sent once by the main thread + StatusReady string = "READY" + StatusBinlogStreamingStarted string = "BINLOG_STREAMING_STARTED" + StatusRowCopyCompleted string = "ROW_COPY_COMPLETED" + StatusDone string = "DONE" + + // Could be sent by multiple goroutines in parallel + StatusBeforeRowCopy string = "BEFORE_ROW_COPY" + StatusAfterRowCopy string = "AFTER_ROW_COPY" + StatusBeforeBinlogApply string = "BEFORE_BINLOG_APPLY" + StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY" +) + +type IntegrationFerry struct { + *ghostferry.Ferry +} + +// ========================================= +// Code for integration server communication +// 
========================================= +func (f *IntegrationFerry) connect() (net.Conn, error) { + socketAddress := os.Getenv(socketEnvName) + if socketAddress == "" { + return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName) + } + + return net.DialTimeout("unix", socketAddress, socketTimeout) +} + +func (f *IntegrationFerry) send(conn net.Conn, status string) error { + conn.SetDeadline(time.Now().Add(socketTimeout)) + + _, err := conn.Write([]byte(status)) + return err +} + +func (f *IntegrationFerry) receive(conn net.Conn) (string, error) { + conn.SetDeadline(time.Now().Add(socketTimeout)) + + var buf [maxMessageSize]byte + + n, err := conn.Read(buf[:]) + if err != nil { + return "", err + } + + return string(buf[0:n]), nil +} + +// SendStatusAndWaitUntilContinue sends a status string to the integration +// server and blocks until we receive "CONTINUE" from the server. +// +// We need to establish a new connection to the integration server for each +// message as there are multiple goroutines sending messages simultaneously. +func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string) error { + conn, err := f.connect() + if err != nil { + return err + } + defer conn.Close() + + err = f.send(conn, status) + if err != nil { + return err + } + + command, err := f.receive(conn) + if err != nil { + return err + } + + if command == CommandContinue { + return nil + } + + return fmt.Errorf("unrecognized command %s from integration server", command) +} + +// Method override for Start in order to send status to the integration +// server. 
+func (f *IntegrationFerry) Start() error { + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeRowCopy) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeBinlogApply) + }) + + err := f.Ferry.Start() + if err != nil { + return err + } + + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterRowCopy) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterBinlogApply) + }) + + return nil +} + +// =========================================== +// Code to handle an almost standard Ferry run +// =========================================== +// TODO: allow custom Go code to hook into each stage so we can +// run things like the iterative verifier. +func (f *IntegrationFerry) Main() error { + var err error + + err = f.SendStatusAndWaitUntilContinue(StatusReady) + if err != nil { + return err + } + + err = f.Initialize() + if err != nil { + return err + } + + err = f.Start() + if err != nil { + return err + } + + err = f.SendStatusAndWaitUntilContinue(StatusBinlogStreamingStarted) + if err != nil { + return err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + f.Run() + }() + + f.WaitUntilRowCopyIsComplete() + err = f.SendStatusAndWaitUntilContinue(StatusRowCopyCompleted) + if err != nil { + return err + } + + // TODO: this method should return errors rather than calling + // the error handler to panic directly. 
+ f.FlushBinlogAndStopStreaming() + wg.Wait() + + return f.SendStatusAndWaitUntilContinue(StatusDone) +} diff --git a/test/integration/go/minimal.go b/test/integration/go/minimal.go new file mode 100644 index 00000000..2508aef6 --- /dev/null +++ b/test/integration/go/minimal.go @@ -0,0 +1,78 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "os" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/test/integration/go/integrationferry" + "github.com/Shopify/ghostferry/testhelpers" + "github.com/sirupsen/logrus" +) + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetLevel(logrus.DebugLevel) + + config := &ghostferry.Config{ + Source: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29291), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + Target: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29292), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + AutomaticCutover: true, + TableFilter: &testhelpers.TestTableFilter{ + DbsFunc: testhelpers.DbApplicabilityFilter([]string{"gftest"}), + TablesFunc: nil, + }, + + DumpStateToStdoutOnSignal: true, + } + + resumeStateJSON, err := ioutil.ReadAll(os.Stdin) + if err != nil { + panic(err) + } + + if len(resumeStateJSON) > 0 { + config.StateToResumeFrom = &ghostferry.SerializableState{} + err = json.Unmarshal(resumeStateJSON, config.StateToResumeFrom) + if err != nil { + panic(err) + } + } + + err = config.ValidateConfig() + if err != nil { + panic(err) + } + + f := &integrationferry.IntegrationFerry{ + Ferry: &ghostferry.Ferry{ + Config: config, + }, + } + + err = f.Main() + if err != nil { + panic(err) + } +} diff --git a/test/integration/ruby/ghostferry_integration.rb b/test/integration/ruby/ghostferry_integration.rb new file mode 100644 index 00000000..29aa877c --- /dev/null +++ 
b/test/integration/ruby/ghostferry_integration.rb @@ -0,0 +1,15 @@ +require "thread" + +module GhostferryIntegration + # Character pool for random data generation; rand_data draws `length` characters from it (with replacement) and appends multi-byte emoji + ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a + + def self.rand_data(length: 32) + Array.new(length) { ALPHANUMERICS.sample }.join("") + "👻⛴️" + end +end + +require_relative "ghostferry_integration/db_manager" +require_relative "ghostferry_integration/data_writer" +require_relative "ghostferry_integration/ghostferry" +require_relative "ghostferry_integration/test_case" diff --git a/test/integration/ruby/ghostferry_integration/ghostferry.rb b/test/integration/ruby/ghostferry_integration/ghostferry.rb new file mode 100644 index 00000000..8454da26 --- /dev/null +++ b/test/integration/ruby/ghostferry_integration/ghostferry.rb @@ -0,0 +1,256 @@ +require "json" +require "logger" +require "open3" +require "socket" +require "thread" +require "tmpdir" + +module GhostferryIntegration + class GhostferryExitFailure < StandardError + end + + class Ghostferry + # Manages compiling, running, and communicating with Ghostferry. 
+ # + # To use this class: + # + # ghostferry = Ghostferry.new("path/to/main.go") + # ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do + # # do custom work here, such as injecting data into the database + # end + # ghostferry.run + + ENV_KEY_SOCKET_PATH = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + + SOCKET_PATH = ENV[ENV_KEY_SOCKET_PATH] || "/tmp/ghostferry-integration.sock" + MAX_MESSAGE_SIZE = 256 + + CONTINUE = "CONTINUE" + + module Status + # This should be in sync with integrationferry.go + READY = "READY" + BINLOG_STREAMING_STARTED = "BINLOG_STREAMING_STARTED" + ROW_COPY_COMPLETED = "ROW_COPY_COMPLETED" + DONE = "DONE" + + BEFORE_ROW_COPY = "BEFORE_ROW_COPY" + AFTER_ROW_COPY = "AFTER_ROW_COPY" + BEFORE_BINLOG_APPLY = "BEFORE_BINLOG_APPLY" + AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" + end + + attr_reader :stdout, :stderr, :exit_status, :pid + + def initialize(main_path, logger: nil, message_timeout: 30) + @main_path = main_path + @message_timeout = message_timeout + @logger = logger + if @logger.nil? + @logger = Logger.new(STDOUT) + @logger.level = Logger::DEBUG + end + + @tempdir = Dir.mktmpdir("ghostferry-integration") + + # full name relative to the ghostferry root dir, with / replaced with _ + # and the extension stripped. + full_path = File.absolute_path(@main_path) + full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory + binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") + @compiled_binary_path = File.join(@tempdir, binary_name) + + reset_state + end + + def reset_state + @status_handlers = {} + @stop_requested = false + + @server = nil + @server_started_notifier = Queue.new + + @pid = 0 + @exit_status = nil + @stdout = [] + @stderr = [] + end + + def on_status(status, &block) + raise "must specify a block" unless block_given? 
+ @status_handlers[status] = block + end + + def compile_binary + return if File.exist?(@compiled_binary_path) + + @logger.info("compiling test binary to #{@compiled_binary_path}") + rc = system( + "go", "build", + "-o", @compiled_binary_path, + @main_path + ) + + raise "could not compile ghostferry" unless rc + end + + def start_server + @server_thread = Thread.new do + @logger.info("starting integration test server") + @server = UNIXServer.new(SOCKET_PATH) + @server_started_notifier.push(true) + + reads = [@server] + last_message_time = Time.now + + while (!@stop_requested && @exit_status.nil?) do + ready = IO.select(reads, nil, nil, 0.2) + + if ready.nil? + next if Time.now - last_message_time < @message_timeout + + raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" + end + + last_message_time = Time.now + + # Each client should send one message, expect a message back, and + # then close the connection. + # + # This is done because there are many goroutines operating in + # parallel and sending messages over a shared connection would result + # in multiplexing issues. Using separate connections gets around this + # problem. + ready[0].each do |socket| + if socket == @server + # A new message is to be sent by a goroutine + client = @server.accept_nonblock + reads << client + elsif socket.eof? + # A message was complete + @logger.warn("client disconnected?") + socket.close + reads.delete(socket) + else + # Receiving a message + status = socket.read_nonblock(MAX_MESSAGE_SIZE) + @logger.debug("server received status: #{status}") + + @status_handlers[status].call unless @status_handlers[status].nil? 
+ socket.write(CONTINUE) + + reads.delete(socket) + end + end + end + + @server.close + @logger.info("server thread stopped") + end + end + + def start_ghostferry(resuming_state = nil) + @subprocess_thread = Thread.new do + Thread.current.report_on_exception = false + + environment = { + ENV_KEY_SOCKET_PATH => SOCKET_PATH + } + + @logger.info("starting ghostferry test binary #{@compiled_binary_path}") + Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| + stdin.puts(resuming_state) unless resuming_state.nil? + stdin.close + + @pid = wait_thr.pid + + reads = [stdout, stderr] + until reads.empty? do + ready_reads, _, _ = IO.select(reads) + ready_reads.each do |reader| + line = reader.gets + if line.nil? + # EOF effectively + reads.delete(reader) + next + end + + if reader == stdout + @stdout << line + @logger.debug("stdout: #{line}") + elsif reader == stderr + @stderr << line + @logger.debug("stderr: #{line}") + end + end + end + + @exit_status = wait_thr.value + @pid = 0 + end + + @logger.info("ghostferry test binary exitted: #{@exit_status}") + if @exit_status.exitstatus != 0 + raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + end + end + end + + def wait_until_server_has_started + @server_started_notifier.pop + @logger.info("integration test server started and listening for connection") + end + + def wait_until_ghostferry_run_is_complete + # Server thread should always join first because the loop within it + # should exit if @exit_status != nil. + @server_thread.join if @server_thread + @subprocess_thread.join if @subprocess_thread + end + + def remove_socket + File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH) + end + + def remove_binary + FileUtils.remove_entry(@tempdir) unless @tempdir.nil? 
+ end + + def send_signal(signal) + Process.kill(signal, @pid) if @pid != 0 + end + + def stop_and_cleanup + @stop_requested = true + send_signal("KILL") + begin + wait_until_ghostferry_run_is_complete + rescue GhostferryExitFailure + # ignore + end + reset_state + end + + def run(resuming_state = nil) + resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) + + compile_binary + start_server + wait_until_server_has_started + start_ghostferry(resuming_state) + wait_until_ghostferry_run_is_complete + ensure + remove_socket + end + + # TODO: Need to stop the integration server + def run_expecting_interrupt(resuming_state = nil) + run(resuming_state) + rescue GhostferryExitFailure + dumped_state = @stdout.join("") + JSON.parse(dumped_state) + else + raise "Ghostferry did not get interrupted" + end + end +end