# Source file: test/integration/ruby/ghostferry_integration/db_manager.rb
# Introduced by commit 6e7d65cb55f5070ff0a61e41ae1acae8d6f91d46
# ("Added some DB helpers", Shuhao Wu, Tue, 11 Sep 2018 15:42:29 -0400).

require "logger"

require "mysql2"

module GhostferryIntegration
  # Helper methods that seed and verify the source and target databases used
  # by the integration tests. Nothing fancy for now, but functional. This
  # mirrors some of the methods available to the integration tests written in
  # Go.
  class DbManager
    # Ports of the database servers; the first entry is used as the source and
    # the second as the target (see #source and #target).
    KNOWN_PORTS = [29291, 29292].freeze

    DEFAULT_DB = "gftest".freeze
    DEFAULT_TABLE = "test_table_1".freeze

    # Builds the backtick-quoted, fully qualified name of a table.
    #
    # @param db [String] database name
    # @param table [String] table name
    # @return [String] a name of the form `db`.`table`
    def self.full_table_name(db, table)
      "`#{db}`.`#{table}`"
    end

    DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE).freeze

    # Builds the connection options for the server listening on the given port
    # of 127.0.0.1, connecting as root with an empty password and utf8mb4.
    #
    # @param port [Integer] port of the database server
    # @return [Hash] options suitable for Mysql2::Client.new
    def self.default_db_config(port:)
      {
        host: "127.0.0.1",
        port: port,
        username: "root",
        password: "",
        encoding: "utf8mb4",
        collation: "utf8mb4_unicode_ci",
      }
    end

    # Runs the given block between BEGIN and COMMIT on the connection. If the
    # block raises a StandardError, ROLLBACK is issued and the error is
    # re-raised.
    #
    # @param connection [#query] connection on which the statements are issued
    # @raise [ArgumentError] if no block is given
    def self.transaction(connection)
      raise ArgumentError, "must pass a block" unless block_given?

      begin
        connection.query("BEGIN")
        yield
      rescue
        connection.query("ROLLBACK")
        raise
      else
        connection.query("COMMIT")
      end
    end

    # Opens one connection per port, in the order given.
    #
    # @param ports [Array<Integer>] server ports; index 0 is the source and
    #   index 1 is the target
    # @param logger [Logger, nil] logger to keep; when nil, a DEBUG-level logger
    #   writing to STDOUT is created
    def initialize(ports: KNOWN_PORTS, logger: nil)
      @ports = ports

      @connections = ports.map do |port|
        Mysql2::Client.new(self.class.default_db_config(port: port))
      end

      @logger = logger
      if @logger.nil?
        @logger = Logger.new(STDOUT)
        @logger.level = Logger::DEBUG
      end
    end

    # Do not use these methods in a separate thread as these are the raw mysql2
    # connections, which are not threadsafe.
    #
    # If you need to use connections in another thread for some reason, create
    # your own connections via source_db_config, such as in the case of the
    # DataWriter.
    def source
      @connections[0]
    end

    # Raw connection to the target database. The threading caveat documented
    # on #source applies here as well.
    def target
      @connections[1]
    end

    # @return [Hash] connection options for the source database
    def source_db_config
      self.class.default_db_config(port: @ports[0])
    end

    # @return [Hash] connection options for the target database
    def target_db_config
      self.class.default_db_config(port: @ports[1])
    end

    # Creates the database and table if they do not exist yet, then inserts
    # rows of random data inside a single transaction.
    #
    # @param connection [Mysql2::Client] connection to seed
    # @param database_name [String] database to create and use
    # @param table_name [String] table to create and fill
    # @param number_of_rows [Integer] number of rows to insert; 0 creates the
    #   schema only
    def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
      full_table_name = self.class.full_table_name(database_name, table_name)

      # Quote the database name the same way full_table_name and reset_data do.
      connection.query("CREATE DATABASE IF NOT EXISTS `#{database_name}`")
      connection.query("CREATE TABLE IF NOT EXISTS #{full_table_name} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")

      self.class.transaction(connection) do
        insert_statement = connection.prepare("INSERT INTO #{full_table_name} (id, data) VALUES (?, ?)")

        begin
          number_of_rows.times do
            # A nil id lets auto_increment assign the primary key.
            insert_statement.execute(nil, GhostferryIntegration.rand_data)
          end
        ensure
          # Release the prepared statement instead of leaving it open on the
          # long-lived connection.
          insert_statement.close
        end
      end
    end

    # Seeds the source with the default table and random data, punches holes
    # into it, and creates the same (empty) table on the target.
    def seed_simple_database_with_single_table
      # Setup the source database with data.
      max_id = 1111
      seed_random_data(source, number_of_rows: max_id)

      # Create some holes in the data. The ids are drawn at random from
      # 1..max_id, so the same id may be drawn more than once.
      delete_statement = source.prepare("DELETE FROM #{DEFAULT_FULL_TABLE_NAME} WHERE id = ?")
      begin
        140.times do
          delete_statement.execute(Random.rand(max_id) + 1)
        end
      ensure
        delete_statement.close
      end

      # Setup the target database with no data but the correct schema.
      seed_random_data(target, number_of_rows: 0)
    end

    # Drops the default database on every connection.
    def reset_data
      @connections.each do |connection|
        connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`")
      end
    end

    # Collects the metrics of each table on both the source and the target.
    # The target is sampled at the id of the row sampled on the source.
    #
    # @param tables [Array<String>] fully qualified, already quoted table names
    # @return [Array(Hash, Hash)] source metrics and target metrics, each keyed
    #   by table name
    def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME])
      source_metrics = {}
      target_metrics = {}

      tables.each do |table|
        source_metrics[table] = table_metric(source, table)

        # An empty source table yields no sample row. Pass a nil id in that
        # case rather than failing on nil["id"].
        sample_row = source_metrics[table][:sample_row]
        sample_id = sample_row && sample_row["id"]

        target_metrics[table] = table_metric(target, table, sample_id: sample_id)
      end

      [source_metrics, target_metrics]
    end

    # Computes the checksum, row count, and one sample row of a table.
    #
    # @param conn [Mysql2::Client] connection to query
    # @param table [String] fully qualified, already quoted table name
    # @param sample_id [Integer, nil] id of the row to sample; when nil, a
    #   random row is sampled
    # @return [Hash] with the keys :checksum, :row_count and :sample_row;
    #   :sample_row is nil when no row matches
    def table_metric(conn, table, sample_id: nil)
      checksum = conn.query("CHECKSUM TABLE #{table}").first["Checksum"]
      row_count = conn.query("SELECT COUNT(*) AS cnt FROM #{table}").first["cnt"]

      sample_query =
        if sample_id.nil?
          "SELECT * FROM #{table} ORDER BY RAND() LIMIT 1"
        else
          "SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1"
        end

      {
        checksum: checksum,
        row_count: row_count,
        sample_row: conn.query(sample_query).first,
      }
    end
  end
end