From 8c817bf58c6fc41b7f3e8240316f51b73012715d Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Mon, 9 Dec 2024 15:05:53 +0100 Subject: [PATCH] Synchronize start_datawriter_with_ghostferry Speed up the setup by creating threads and connecting earlier in initialize. Includes synchronization on DataWriter threads, so that calling #start ensures a first write to db before returning. This attempts to improve flakiness of test_interrupt_resume_inline_verifier_with_datawriter. --- test/helpers/data_writer_helper.rb | 58 ++++++++++++++++++------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/test/helpers/data_writer_helper.rb b/test/helpers/data_writer_helper.rb index d9a7c251..24535ac8 100644 --- a/test/helpers/data_writer_helper.rb +++ b/test/helpers/data_writer_helper.rb @@ -45,6 +45,10 @@ def initialize(db_config, @started = false @stop_requested = false + @start_cmd = Queue.new + @started_callback_cmd = Queue.new + start_synchronized_datawriter_threads + @logger = logger if @logger.nil? @logger = Logger.new(STDOUT) @@ -54,29 +58,10 @@ def initialize(db_config, def start(&on_write) raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started - @started = true - @number_of_writers.times do |i| - @threads << Thread.new do - @logger.info("starting data writer thread #{i}") - - n = 0 - begin - connection = Mysql2::Client.new(@db_config) - - until @stop_requested do - write_data(connection, &on_write) - # Kind of makes the following race condition a bit better... - # https://github.com/Shopify/ghostferry/issues/280 - sleep(0.03) if n > 10 - n += 1 - end - ensure - connection.close - end - @logger.info("stopped data writer thread #{i} with a total of #{n} data writes") - end - end + @number_of_writers.times { @start_cmd << on_write } + @started_callback_cmd.pop + @started = true end def stop_and_join @@ -143,5 +128,34 @@ def random_real_id(connection, table) raise "No rows in the database?" if result.first.nil? result.first["id"] end + + private + + def start_synchronized_datawriter_threads + @number_of_writers.times do |i| + @threads << Thread.new do + @logger.info("data writer thread in wait mode #{i}") + connection = Mysql2::Client.new(@db_config) + on_write = @start_cmd.pop + @logger.info("starting data writer thread #{i}") + + n = 0 + begin + until @stop_requested do + write_data(connection, &on_write) + @started_callback_cmd << n unless @started + n += 1 + # Kind of makes the following race condition a bit better... + # https://github.com/Shopify/ghostferry/issues/280 + sleep(0.03) + end + ensure + connection.close + end + + @logger.info("stopped data writer thread #{i} with a total of #{n} data writes") + end + end + end end end