Skip to content

Commit

Permalink
Synchronize start_datawriter_with_ghostferry
Browse files Browse the repository at this point in the history
Speed up the setup by creating threads and connecting earlier in initialize. Includes synchronization on DataWriter threads, so that calling #start ensures a first write to db before returning.

This attempts to improve flakiness of test_interrupt_resume_inline_verifier_with_datawriter.
  • Loading branch information
grodowski committed Dec 9, 2024
1 parent 25f3578 commit 95f5d10
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions test/helpers/data_writer_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def initialize(db_config,
@started = false
@stop_requested = false

@start_cmd = Queue.new
@started_callback_cmd = Queue.new
start_synchronized_datawriter_threads

@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
Expand All @@ -54,29 +58,10 @@ def initialize(db_config,

def start(&on_write)
raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started
@started = true
@number_of_writers.times do |i|
@threads << Thread.new do
@logger.info("starting data writer thread #{i}")

n = 0
begin
connection = Mysql2::Client.new(@db_config)

until @stop_requested do
write_data(connection, &on_write)
# Kind of makes the following race condition a bit better...
# https://github.com/Shopify/ghostferry/issues/280
sleep(0.03) if n > 10
n += 1
end
ensure
connection.close
end

@logger.info("stopped data writer thread #{i} with a total of #{n} data writes")
end
end
@number_of_writers.times { @start_cmd << on_write }
@started_callback_cmd.pop
@started = true
end

def stop_and_join
Expand Down Expand Up @@ -143,5 +128,32 @@ def random_real_id(connection, table)
raise "No rows in the database?" if result.first.nil?
result.first["id"]
end

private

def start_synchronized_datawriter_threads
@number_of_writers.times do |i|
@threads << Thread.new do
@logger.info("data writer thread in wait mode #{i}")
connection = Mysql2::Client.new(@db_config)
on_write = @start_cmd.pop
@logger.info("starting data writer thread #{i}")

n = 0
begin
until @stop_requested do
write_data(connection, &on_write)
@started_callback_cmd << n unless @started
n += 1
# Kind of makes the following race condition a bit better...
# https://github.com/Shopify/ghostferry/issues/280
sleep(0.03)
end
end

@logger.info("stopped data writer thread #{i} with a total of #{n} data writes")
end
end
end
end
end

0 comments on commit 95f5d10

Please sign in to comment.