Skip to content

Commit

Permalink
Merge pull request #376 from Shopify/grodowski/flaky-test-fix-round-2
Browse files Browse the repository at this point in the history
Fix flaky test with DataWriter (part 2)
  • Loading branch information
grodowski authored Dec 11, 2024
2 parents 9118912 + 7750793 commit 51d3960
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 29 deletions.
8 changes: 4 additions & 4 deletions sharding/test/trivial_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func setupSingleTableDatabase(f *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) {
testhelpers.SeedInitialData(sourceDB, "gftest", "table1", 100)
testhelpers.SeedInitialData(sourceDB, "gftest", "table1", 1000)
testhelpers.SeedInitialData(targetDB, "gftest", "table1", 0)

testhelpers.AddTenantID(sourceDB, "gftest", "table1", 3)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestSelectiveCopyDataWithoutAnyWritesToSource(t *testing.T) {
assert.Equal(t, 0, count)

rows := testcase.AssertQueriesHaveEqualResult("SELECT * FROM gftest.table1 WHERE tenant_id = 2")
assert.Equal(t, 33, len(rows))
assert.Equal(t, 333, len(rows))
}

func TestSelectiveCopyDataWithInsertLoadOnOtherTenants(t *testing.T) {
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestSelectiveCopyDataWithInsertLoadOnOtherTenants(t *testing.T) {
assert.Equal(t, 0, count)

rows := testcase.AssertQueriesHaveEqualResult("SELECT * FROM gftest.table1 WHERE tenant_id = 2")
assert.Equal(t, 33, len(rows))
assert.Equal(t, 333, len(rows))
}

func TestSelectiveCopyDataWithInsertLoadOnAllTenants(t *testing.T) {
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestSelectiveCopyDataWithInsertLoadOnAllTenants(t *testing.T) {
assert.Equal(t, 0, count)

rows := testcase.AssertQueriesHaveEqualResult("SELECT * FROM gftest.table1 WHERE tenant_id = 2")
assert.True(t, len(rows) > 33)
assert.True(t, len(rows) > 333)
}

type ChangeShardingKeyDataWriter struct {
Expand Down
53 changes: 31 additions & 22 deletions test/helpers/data_writer_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def initialize(db_config,
@started = false
@stop_requested = false

@start_cmd = Queue.new
start_synchronized_datawriter_threads

@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
Expand All @@ -54,29 +57,9 @@ def initialize(db_config,

def start(&on_write)
raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started
@started = true
@number_of_writers.times do |i|
@threads << Thread.new do
@logger.info("starting data writer thread #{i}")

n = 0
begin
connection = Mysql2::Client.new(@db_config)

until @stop_requested do
write_data(connection, &on_write)
# Kind of makes the following race condition a bit better...
# https://github.com/Shopify/ghostferry/issues/280
sleep(0.03) if n > 10
n += 1
end
ensure
connection.close
end

@logger.info("stopped data writer thread #{i} with a total of #{n} data writes")
end
end
@number_of_writers.times { @start_cmd << on_write }
@started = true
end

def stop_and_join
Expand Down Expand Up @@ -143,5 +126,31 @@ def random_real_id(connection, table)
raise "No rows in the database?" if result.first.nil?
result.first["id"]
end

private

def start_synchronized_datawriter_threads
@number_of_writers.times do |i|
@threads << Thread.new do
connection = Mysql2::Client.new(@db_config)
@logger.info("data writer thread in wait mode #{i}")
on_write = @start_cmd.pop
@logger.info("starting data writer thread #{i}")

n = 0
until @stop_requested do
write_data(connection, &on_write)
n += 1
# Kind of makes the following race condition a bit better...
# https://github.com/Shopify/ghostferry/issues/280
sleep(0.03)
end

@logger.info("stopped data writer thread #{i} with a total of #{n} data writes")
ensure
connection&.close
end
end
end
end
end
2 changes: 1 addition & 1 deletion test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def test_interrupt_resume_inline_verifier_with_datawriter
batches_written = 0
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1
if batches_written >= 2
if batches_written >= 5
ghostferry.term_and_wait_for_exit
end
end
Expand Down
3 changes: 1 addition & 2 deletions testhelpers/data_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"math/rand"
"sync"

sql "github.com/Shopify/ghostferry/sqlwrapper"

sq "github.com/Masterminds/squirrel"
sql "github.com/Shopify/ghostferry/sqlwrapper"
)

var dataletters = []rune("abcdefghijklmnopqrstuvwxyz")
Expand Down

0 comments on commit 51d3960

Please sign in to comment.