Skip to content

Commit

Permalink
Integration test running Ghostferry binary
Browse files Browse the repository at this point in the history
Instead of running integration tests directly in Go, we run it in Ruby
by calling Ghostferry as a subprocess. The idea is that we would
eventually be able to test the interrupt/resume work more easily as well
as making integration tests easier to write.

In order to allow the ruby tests to inject race conditions for testing
purposes, we need a way for the ruby code to pause and restart the
execution of Go code at strategic locations. This is done with a local
Unix socket. At locations of interests, the Go code will send some
string via the Unix socket to the Ruby code. The ruby code will perform
some sort of callback (such as injecting data, or locking a row) and
then send a command back to the Go code via the same Unix socket,
allowing it to continue executing. Note that each time we send some
status to the ruby server, we do it in a new connection. This is to
avoid race conditions if one shared connection was used between all the
goroutines we employ inside Ghostferry.

Stdout and stderr are captured and they can be asserted
against/examined, more helper methods can be created later to give
better structures for the stdout/stderr output such as being able to
recognize a panic, or recognize particular stages of execution from
within the logs.

Lastly, the intention is to replace all integration tests with this
framework in the future as it would likely be more robust and cause less
issues such as goroutine leaks during the go test run. However, for the
time being we will have both integration tests in Go and Ruby.
  • Loading branch information
shuhaowu committed Oct 30, 2018
1 parent 4398c6f commit d022156
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 5 deletions.
12 changes: 7 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ VERSION_STR := $(VERSION)+$(DATETIME)+$(COMMIT)
LDFLAGS += -X github.com/Shopify/ghostferry.VersionString=$(VERSION_STR)

# Paths
GOBIN := $(GOPATH)/bin
BUILD_DIR := build
DEB_PREFIX := $(BUILD_DIR)/debian
SHARE_DIR := usr/share/ghostferry
BIN_DIR := usr/bin
GOBIN := $(GOPATH)/bin
BUILD_DIR := build
DEB_PREFIX := $(BUILD_DIR)/debian
SHARE_DIR := usr/share/ghostferry
BIN_DIR := usr/bin
RB_INTEGRATION_DIR := test/integration

# Targets
PROJECTS := copydb sharding
Expand Down Expand Up @@ -52,6 +53,7 @@ $(GOBIN):
test:
@go version
go test ./test ./copydb/test ./sharding/test -p 1 -v
cd $(RB_INTEGRATION_DIR) && bundle install && bundle exec ruby test.rb

clean:
rm -rf build
Expand Down
3 changes: 3 additions & 0 deletions dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ up:
- homebrew:
- glide
- mysql
- ruby: 2.5.1
- bundler:
gemfile: test/integration/Gemfile
- go:
version: 1.10.3
- custom:
Expand Down
2 changes: 2 additions & 0 deletions test/integration/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.bundle/
vendor/
5 changes: 5 additions & 0 deletions test/integration/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
source "https://rubygems.org"

gem "minitest"
gem "minitest-hooks"
gem "mysql2"
18 changes: 18 additions & 0 deletions test/integration/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
GEM
remote: https://rubygems.org/
specs:
minitest (5.11.3)
minitest-hooks (1.5.0)
minitest (> 5.3)
mysql2 (0.5.2)

PLATFORMS
ruby

DEPENDENCIES
minitest
minitest-hooks
mysql2

BUNDLED WITH
1.16.1
177 changes: 177 additions & 0 deletions test/integration/go/integrationferry/integrationferry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package integrationferry

import (
"fmt"
"net"
"os"
"sync"
"time"

"github.com/Shopify/ghostferry"
)

const (
socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH"
socketTimeout time.Duration = 30 * time.Second
maxMessageSize int = 256
)

const (
CommandContinue string = "CONTINUE"
)

const (
// Could only be sent once by the main thread
StatusReady string = "READY"
StatusBinlogStreamingStarted string = "BINLOG_STREAMING_STARTED"
StatusRowCopyCompleted string = "ROW_COPY_COMPLETED"
StatusDone string = "DONE"

// Could be sent by multiple goroutines in parallel
StatusBeforeRowCopy string = "BEFORE_ROW_COPY"
StatusAfterRowCopy string = "AFTER_ROW_COPY"
StatusBeforeBinlogApply string = "BEFORE_BINLOG_APPLY"
StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY"
)

type IntegrationFerry struct {
*ghostferry.Ferry
}

// =========================================
// Code for integration server communication
// =========================================
func (f *IntegrationFerry) connect() (net.Conn, error) {
socketAddress := os.Getenv(socketEnvName)
if socketAddress == "" {
return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName)
}

return net.DialTimeout("unix", socketAddress, socketTimeout)
}

func (f *IntegrationFerry) send(conn net.Conn, status string) error {
conn.SetDeadline(time.Now().Add(socketTimeout))

_, err := conn.Write([]byte(status))
return err
}

func (f *IntegrationFerry) receive(conn net.Conn) (string, error) {
conn.SetDeadline(time.Now().Add(socketTimeout))

var buf [maxMessageSize]byte

n, err := conn.Read(buf[:])
if err != nil {
return "", err
}

return string(buf[0:n]), nil
}

// Sends a status string to the integration server and block until we receive
// "CONTINUE" from the server.
//
// We need to establish a new connection to the integration server for each
// message as there are multiple goroutines sending messages simultaneously.
func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string) error {
conn, err := f.connect()
if err != nil {
return err
}
defer conn.Close()

err = f.send(conn, status)
if err != nil {
return err
}

command, err := f.receive(conn)
if err != nil {
return err
}

if command == CommandContinue {
return nil
}

return fmt.Errorf("unrecognized command %s from integration server", command)
}

// Method override for Start in order to send status to the integration
// server.
func (f *IntegrationFerry) Start() error {
f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error {
return f.SendStatusAndWaitUntilContinue(StatusBeforeRowCopy)
})

f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error {
return f.SendStatusAndWaitUntilContinue(StatusBeforeBinlogApply)
})

err := f.Ferry.Start()
if err != nil {
return err
}

f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error {
return f.SendStatusAndWaitUntilContinue(StatusAfterRowCopy)
})

f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error {
return f.SendStatusAndWaitUntilContinue(StatusAfterBinlogApply)
})

return nil
}

// ===========================================
// Code to handle an almost standard Ferry run
// ===========================================
// TODO: allow custom go code to hook into each stages so we can
// run things like the iterative verifier.
func (f *IntegrationFerry) Main() error {
var err error

err = f.SendStatusAndWaitUntilContinue(StatusReady)
if err != nil {
return err
}

err = f.Initialize()
if err != nil {
return err
}

err = f.Start()
if err != nil {
return err
}

err = f.SendStatusAndWaitUntilContinue(StatusBinlogStreamingStarted)
if err != nil {
return err
}

wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
f.Run()
}()

f.WaitUntilRowCopyIsComplete()
err = f.SendStatusAndWaitUntilContinue(StatusRowCopyCompleted)
if err != nil {
return err
}

// TODO: this method should return errors rather than calling
// the error handler to panic directly.
f.FlushBinlogAndStopStreaming()
wg.Wait()

return f.SendStatusAndWaitUntilContinue(StatusDone)
}
78 changes: 78 additions & 0 deletions test/integration/go/minimal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"encoding/json"
"io/ioutil"
"os"

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/test/integration/go/integrationferry"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/sirupsen/logrus"
)

func main() {
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetLevel(logrus.DebugLevel)

config := &ghostferry.Config{
Source: ghostferry.DatabaseConfig{
Host: "127.0.0.1",
Port: uint16(29291),
User: "root",
Pass: "",
Collation: "utf8mb4_unicode_ci",
Params: map[string]string{
"charset": "utf8mb4",
},
},

Target: ghostferry.DatabaseConfig{
Host: "127.0.0.1",
Port: uint16(29292),
User: "root",
Pass: "",
Collation: "utf8mb4_unicode_ci",
Params: map[string]string{
"charset": "utf8mb4",
},
},

AutomaticCutover: true,
TableFilter: &testhelpers.TestTableFilter{
DbsFunc: testhelpers.DbApplicabilityFilter([]string{"gftest"}),
TablesFunc: nil,
},

DumpStateToStdoutOnSignal: true,
}

resumeStateJSON, err := ioutil.ReadAll(os.Stdin)
if err != nil {
panic(err)
}

if len(resumeStateJSON) > 0 {
config.StateToResumeFrom = &ghostferry.SerializableState{}
err = json.Unmarshal(resumeStateJSON, config.StateToResumeFrom)
if err != nil {
panic(err)
}
}

err = config.ValidateConfig()
if err != nil {
panic(err)
}

f := &integrationferry.IntegrationFerry{
Ferry: &ghostferry.Ferry{
Config: config,
},
}

err = f.Main()
if err != nil {
panic(err)
}
}
15 changes: 15 additions & 0 deletions test/integration/ruby/ghostferry_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require "thread"

module GhostferryIntegration
# For random data generation
ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a

def self.rand_data(length: 32)
ALPHANUMERICS.sample(32).join("") + "👻⛴️"
end
end

require_relative "ghostferry_integration/db_manager"
require_relative "ghostferry_integration/data_writer"
require_relative "ghostferry_integration/ghostferry"
require_relative "ghostferry_integration/test_case"
Loading

0 comments on commit d022156

Please sign in to comment.