diff --git a/Makefile b/Makefile index 1100bbb3..c25d2f48 100644 --- a/Makefile +++ b/Makefile @@ -13,11 +13,12 @@ VERSION_STR := $(VERSION)+$(DATETIME)+$(COMMIT) LDFLAGS += -X github.com/Shopify/ghostferry.VersionString=$(VERSION_STR) # Paths -GOBIN := $(GOPATH)/bin -BUILD_DIR := build -DEB_PREFIX := $(BUILD_DIR)/debian -SHARE_DIR := usr/share/ghostferry -BIN_DIR := usr/bin +GOBIN := $(GOPATH)/bin +BUILD_DIR := build +DEB_PREFIX := $(BUILD_DIR)/debian +SHARE_DIR := usr/share/ghostferry +BIN_DIR := usr/bin +RB_INTEGRATION_DIR := test/integration # Targets PROJECTS := copydb sharding @@ -52,6 +53,7 @@ $(GOBIN): test: @go version go test ./test ./copydb/test ./sharding/test -p 1 -v + cd $(RB_INTEGRATION_DIR) && bundle install && bundle exec ruby test.rb clean: rm -rf build diff --git a/dev.yml b/dev.yml index e47fbda2..82c478fc 100644 --- a/dev.yml +++ b/dev.yml @@ -4,6 +4,9 @@ up: - homebrew: - glide - mysql + - ruby: 2.5.1 + - bundler: + gemfile: test/integration/Gemfile - go: version: 1.10.3 - custom: diff --git a/test/integration/.gitignore b/test/integration/.gitignore new file mode 100644 index 00000000..4541a7b9 --- /dev/null +++ b/test/integration/.gitignore @@ -0,0 +1,2 @@ +.bundle/ +vendor/ diff --git a/test/integration/Gemfile b/test/integration/Gemfile new file mode 100644 index 00000000..cac02822 --- /dev/null +++ b/test/integration/Gemfile @@ -0,0 +1,5 @@ +source "https://rubygems.org" + +gem "minitest" +gem "minitest-hooks" +gem "mysql2" diff --git a/test/integration/Gemfile.lock b/test/integration/Gemfile.lock new file mode 100644 index 00000000..265557af --- /dev/null +++ b/test/integration/Gemfile.lock @@ -0,0 +1,18 @@ +GEM + remote: https://rubygems.org/ + specs: + minitest (5.11.3) + minitest-hooks (1.5.0) + minitest (> 5.3) + mysql2 (0.5.2) + +PLATFORMS + ruby + +DEPENDENCIES + minitest + minitest-hooks + mysql2 + +BUNDLED WITH + 1.16.1 diff --git a/test/integration/go/integrationferry/integrationferry.go b/test/integration/go/integrationferry/integrationferry.go new file mode 100644 index 00000000..b5aa0c17 --- /dev/null +++ b/test/integration/go/integrationferry/integrationferry.go @@ -0,0 +1,177 @@ +package integrationferry + +import ( + "fmt" + "net" + "os" + "sync" + "time" + + "github.com/Shopify/ghostferry" +) + +const ( + socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + socketTimeout time.Duration = 30 * time.Second + maxMessageSize int = 256 +) + +const ( + CommandContinue string = "CONTINUE" +) + +const ( + // Could only be sent once by the main thread + StatusReady string = "READY" + StatusBinlogStreamingStarted string = "BINLOG_STREAMING_STARTED" + StatusRowCopyCompleted string = "ROW_COPY_COMPLETED" + StatusDone string = "DONE" + + // Could be sent by multiple goroutines in parallel + StatusBeforeRowCopy string = "BEFORE_ROW_COPY" + StatusAfterRowCopy string = "AFTER_ROW_COPY" + StatusBeforeBinlogApply string = "BEFORE_BINLOG_APPLY" + StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY" +) + +type IntegrationFerry struct { + *ghostferry.Ferry +} + +// ========================================= +// Code for integration server communication +// ========================================= +func (f *IntegrationFerry) connect() (net.Conn, error) { + socketAddress := os.Getenv(socketEnvName) + if socketAddress == "" { + return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName) + } + + return net.DialTimeout("unix", socketAddress, socketTimeout) +} + +func (f *IntegrationFerry) send(conn net.Conn, status string) error { + conn.SetDeadline(time.Now().Add(socketTimeout)) + + _, err := conn.Write([]byte(status)) + return err +} + +func (f *IntegrationFerry) receive(conn net.Conn) (string, error) { + conn.SetDeadline(time.Now().Add(socketTimeout)) + + var buf [maxMessageSize]byte + + n, err := conn.Read(buf[:]) + if err != nil { + return "", err + } + + return string(buf[0:n]), nil +} + +// Sends a status string to the integration server and block until we receive +// "CONTINUE" from the server. +// +// We need to establish a new connection to the integration server for each +// message as there are multiple goroutines sending messages simultaneously. +func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string) error { + conn, err := f.connect() + if err != nil { + return err + } + defer conn.Close() + + err = f.send(conn, status) + if err != nil { + return err + } + + command, err := f.receive(conn) + if err != nil { + return err + } + + if command == CommandContinue { + return nil + } + + return fmt.Errorf("unrecognized command %s from integration server", command) +} + +// Method override for Start in order to send status to the integration +// server. +func (f *IntegrationFerry) Start() error { + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeRowCopy) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeBinlogApply) + }) + + err := f.Ferry.Start() + if err != nil { + return err + } + + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterRowCopy) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterBinlogApply) + }) + + return nil +} + +// =========================================== +// Code to handle an almost standard Ferry run +// =========================================== +// TODO: allow custom go code to hook into each stages so we can +// run things like the iterative verifier. +func (f *IntegrationFerry) Main() error { + var err error + + err = f.SendStatusAndWaitUntilContinue(StatusReady) + if err != nil { + return err + } + + err = f.Initialize() + if err != nil { + return err + } + + err = f.Start() + if err != nil { + return err + } + + err = f.SendStatusAndWaitUntilContinue(StatusBinlogStreamingStarted) + if err != nil { + return err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + f.Run() + }() + + f.WaitUntilRowCopyIsComplete() + err = f.SendStatusAndWaitUntilContinue(StatusRowCopyCompleted) + if err != nil { + return err + } + + // TODO: this method should return errors rather than calling + // the error handler to panic directly. + f.FlushBinlogAndStopStreaming() + wg.Wait() + + return f.SendStatusAndWaitUntilContinue(StatusDone) +} diff --git a/test/integration/go/minimal.go b/test/integration/go/minimal.go new file mode 100644 index 00000000..2508aef6 --- /dev/null +++ b/test/integration/go/minimal.go @@ -0,0 +1,78 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "os" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/test/integration/go/integrationferry" + "github.com/Shopify/ghostferry/testhelpers" + "github.com/sirupsen/logrus" +) + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetLevel(logrus.DebugLevel) + + config := &ghostferry.Config{ + Source: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29291), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + Target: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29292), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + AutomaticCutover: true, + TableFilter: &testhelpers.TestTableFilter{ + DbsFunc: testhelpers.DbApplicabilityFilter([]string{"gftest"}), + TablesFunc: nil, + }, + + DumpStateToStdoutOnSignal: true, + } + + resumeStateJSON, err := ioutil.ReadAll(os.Stdin) + if err != nil { + panic(err) + } + + if len(resumeStateJSON) > 0 { + config.StateToResumeFrom = &ghostferry.SerializableState{} + err = json.Unmarshal(resumeStateJSON, config.StateToResumeFrom) + if err != nil { + panic(err) + } + } + + err = config.ValidateConfig() + if err != nil { + panic(err) + } + + f := &integrationferry.IntegrationFerry{ + Ferry: &ghostferry.Ferry{ + Config: config, + }, + } + + err = f.Main() + if err != nil { + panic(err) + } +} diff --git a/test/integration/ruby/ghostferry_integration.rb b/test/integration/ruby/ghostferry_integration.rb new file mode 100644 index 00000000..29aa877c --- /dev/null +++ b/test/integration/ruby/ghostferry_integration.rb @@ -0,0 +1,15 @@ +require "thread" + +module GhostferryIntegration + # For random data generation + ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a + + def self.rand_data(length: 32) + ALPHANUMERICS.sample(32).join("") + "👻⛴️" + end +end + +require_relative "ghostferry_integration/db_manager" +require_relative "ghostferry_integration/data_writer" +require_relative "ghostferry_integration/ghostferry" +require_relative "ghostferry_integration/test_case" diff --git a/test/integration/ruby/ghostferry_integration/ghostferry.rb b/test/integration/ruby/ghostferry_integration/ghostferry.rb new file mode 100644 index 00000000..8454da26 --- /dev/null +++ b/test/integration/ruby/ghostferry_integration/ghostferry.rb @@ -0,0 +1,256 @@ +require "json" +require "logger" +require "open3" +require "socket" +require "thread" +require "tmpdir" + +module GhostferryIntegration + class GhostferryExitFailure < StandardError + end + + class Ghostferry + # Manages compiling, running, and communicating with Ghostferry. + # + # To use this class: + # + # ghostferry = Ghostferry.new("path/to/main.go") + # ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do + # # do custom work here, such as injecting data into the database + # end + # ghostferry.run + + ENV_KEY_SOCKET_PATH = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + + SOCKET_PATH = ENV[ENV_KEY_SOCKET_PATH] || "/tmp/ghostferry-integration.sock" + MAX_MESSAGE_SIZE = 256 + + CONTINUE = "CONTINUE" + + module Status + # This should be in sync with integrationferry.go + READY = "READY" + BINLOG_STREAMING_STARTED = "BINLOG_STREAMING_STARTED" + ROW_COPY_COMPLETED = "ROW_COPY_COMPLETED" + DONE = "DONE" + + BEFORE_ROW_COPY = "BEFORE_ROW_COPY" + AFTER_ROW_COPY = "AFTER_ROW_COPY" + BEFORE_BINLOG_APPLY = "BEFORE_BINLOG_APPLY" + AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" + end + + attr_reader :stdout, :stderr, :exit_status, :pid + + def initialize(main_path, logger: nil, message_timeout: 30) + @main_path = main_path + @message_timeout = message_timeout + @logger = logger + if @logger.nil? + @logger = Logger.new(STDOUT) + @logger.level = Logger::DEBUG + end + + @tempdir = Dir.mktmpdir("ghostferry-integration") + + # full name relative to the ghostferry root dir, with / replaced with _ + # and the extension stripped. + full_path = File.absolute_path(@main_path) + full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory + binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") + @compiled_binary_path = File.join(@tempdir, binary_name) + + reset_state + end + + def reset_state + @status_handlers = {} + @stop_requested = false + + @server = nil + @server_started_notifier = Queue.new + + @pid = 0 + @exit_status = nil + @stdout = [] + @stderr = [] + end + + def on_status(status, &block) + raise "must specify a block" unless block_given? + @status_handlers[status] = block + end + + def compile_binary + return if File.exists?(@compiled_binary_path) + + @logger.info("compiling test binary to #{@compiled_binary_path}") + rc = system( + "go", "build", + "-o", @compiled_binary_path, + @main_path + ) + + raise "could not compile ghostferry" unless rc + end + + def start_server + @server_thread = Thread.new do + @logger.info("starting integration test server") + @server = UNIXServer.new(SOCKET_PATH) + @server_started_notifier.push(true) + + reads = [@server] + last_message_time = Time.now + + while (!@stop_requested && @exit_status.nil?) do + ready = IO.select(reads, nil, nil, 0.2) + + if ready.nil? + next if Time.now - last_message_time < @message_timeout + + raise "ghostferry did not report to the integration test server for the last #{@message_timeout}" + end + + last_message_time = Time.now + + # Each client should send one message, expects a message back, and + # then close the connection. + # + # This is done because there are many goroutines operating in + # parallel and sending messages over a shared connection would result + # in multiplexing issues. Using separate connections gets around this + # problem. + ready[0].each do |socket| + if socket == @server + # A new message is to be sent by a goroutine + client = @server.accept_nonblock + reads << client + elsif socket.eof? + # A message was complete + @logger.warn("client disconnected?") + socket.close + reads.delete(socket) + else + # Receiving a message + status = socket.read_nonblock(MAX_MESSAGE_SIZE) + @logger.debug("server received status: #{status}") + + @status_handlers[status].call unless @status_handlers[status].nil? + socket.write(CONTINUE) + + reads.delete(socket) + end + end + end + + @server.close + @logger.info("server thread stopped") + end + end + + def start_ghostferry(resuming_state = nil) + @subprocess_thread = Thread.new do + Thread.current.report_on_exception = false + + environment = { + ENV_KEY_SOCKET_PATH => SOCKET_PATH + } + + @logger.info("starting ghostferry test binary #{@compiled_binary_path}") + Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| + stdin.puts(resuming_state) unless resuming_state.nil? + stdin.close + + @pid = wait_thr.pid + + reads = [stdout, stderr] + until reads.empty? do + ready_reads, _, _ = IO.select(reads) + ready_reads.each do |reader| + line = reader.gets + if line.nil? + # EOF effectively + reads.delete(reader) + next + end + + if reader == stdout + @stdout << line + @logger.debug("stdout: #{line}") + elsif reader == stderr + @stderr << line + @logger.debug("stderr: #{line}") + end + end + end + + @exit_status = wait_thr.value + @pid = 0 + end + + @logger.info("ghostferry test binary exitted: #{@exit_status}") + if @exit_status.exitstatus != 0 + raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + end + end + end + + def wait_until_server_has_started + @server_started_notifier.pop + @logger.info("integration test server started and listening for connection") + end + + def wait_until_ghostferry_run_is_complete + # Server thread should always join first because the loop within it + # should exit if @exit_status != nil. + @server_thread.join if @server_thread + @subprocess_thread.join if @subprocess_thread + end + + def remove_socket + File.unlink(SOCKET_PATH) if File.exists?(SOCKET_PATH) + end + + def remove_binary + FileUtils.remove_entry(@tempdir) unless @tempdir.nil? + end + + def send_signal(signal) + Process.kill(signal, @pid) if @pid != 0 + end + + def stop_and_cleanup + @stop_requested = true + send_signal("KILL") + begin + wait_until_ghostferry_run_is_complete + rescue GhostferryExitFailure + # ignore + end + reset_state + end + + def run(resuming_state = nil) + resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) + + compile_binary + start_server + wait_until_server_has_started + start_ghostferry(resuming_state) + wait_until_ghostferry_run_is_complete + ensure + remove_socket + end + + # TODO: Need to stop the integration server + def run_expecting_interrupt(resuming_state = nil) + run(resuming_state) + rescue GhostferryExitFailure + dumped_state = @stdout.join("") + JSON.parse(dumped_state) + else + raise "Ghostferry did not get interrupted" + end + end +end