From b69ce739df869f3ac78176040e1c24c3a069db55 Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Tue, 21 Sep 2021 15:20:27 +0000 Subject: [PATCH 1/2] Added test for BINARY column data corruption This was manually picked from https://github.com/Shopify/ghostferry/pull/159. I made some minor adjustments to the comments and the function names to be more clear. Co-authored-by: Clemens Kolbitsch Co-authored-by: Shuhao Wu --- test/integration/types_test.rb | 104 +++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb index eaebe86a..3504d403 100644 --- a/test/integration/types_test.rb +++ b/test/integration/types_test.rb @@ -298,6 +298,53 @@ def test_unsigned_bigint_pk_with_inline_verifier end end + def test_copy_data_in_fixed_size_binary_column + # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258 + # + # We explicitly test with a value that is shorter than the max column + # size - MySQL will 0-pad the value up to the full length of the BINARY column, + # but the MySQL replication binlogs will *not* contain these 0-bytes. + # + # As a result, the binlog writer must explicitly add them when building + # update/delete statements, as the WHERE clause would not match existing + # rows in the target DB + inserted_data = "ABC" + execute_copy_data_in_fixed_size_binary_column( + column_size: 4, + inserted_data: inserted_data, + expected_inserted_data: "#{inserted_data}\x00", + updated_data: "EFGH" + ) + end + + def test_copy_data_in_fixed_size_binary_column__value_completely_filled + # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258 + # + # NOTE: This test is interesting (beyond what is covered above already), + # because it seems the server strips the trailing 0-bytes before sending + # them to the binlog even when the trailing 0-bytes are inserted by the user. 
+ inserted_data = "ABC\x00" + execute_copy_data_in_fixed_size_binary_column( + column_size: 4, + inserted_data: inserted_data, + expected_inserted_data: inserted_data, + updated_data: "EFGH" + ) + end + + def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1 + # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258 + # + # slight variation to cover the corner-case where there is no data in the + # column at all and the entire value is 0-padded (here, only 1 byte) + execute_copy_data_in_fixed_size_binary_column( + column_size: 1, + inserted_data: "", + expected_inserted_data: "\x00", + updated_data: "A" + ) + end + private def insert_json_on_source @@ -310,4 +357,61 @@ def insert_json_on_source source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')") source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')") end + + def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:) + # test for the BINARY columns needing 0-byte padding + # + # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258 + [source_db, target_db].each do |db| + db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}") + db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data BINARY(#{column_size}), primary key(id))") + end + + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, _binary'#{inserted_data}')") + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + row_copy_called = false + ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do + # select row from the target and then make sure the data with 0 padding + # is present. 
We do this to make sure there are no races in the test + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 1, res.count + res.each do |row| + assert_equal 1, row["id"] + assert_equal expected_inserted_data, row["data"] + end + + # now that the target is guaranteed to be in the same state as the + # source, trigger an update that will cause the binlog to stream an + # entry that needs the 0-byte padding + # + # NOTE: If we use BINLOG_STREAMING_STARTED as hook instead, we race + # with getting the update into the batch-copy instead of into the + # streaming + source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = _binary'#{updated_data}' WHERE id = 1") + + # NOTE: We move this flag to the end of the callback to make sure that + # we don't confuse ourselves if the callback crashes before completing + row_copy_called = true + end + + ghostferry.run + + # make sure the test framework called the expected hooks above - otherwise + # the test doesn't make much sense + assert row_copy_called + assert_test_table_is_identical + + # just being paranoid here: make sure the test outcome is as expected. It + # should be, since we made sure the tables have the same checksums, but it + # helps understand what the test code does + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 1, res.count + res.each do |row| + assert_equal 1, row["id"] + assert_equal updated_data, row["data"] + end + + end end From 49c4c8a4ef54298a4cc647c9c8f94bfe98cf1bfe Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Wed, 22 Sep 2021 15:07:17 +0000 Subject: [PATCH 2/2] Fix binary-column data corruption This commit addresses a data corruption bug in the binlog streaming phase, where the binlog writer incorrectly propagates values in fixed-length BINARY columns that have trailing 0s in the original value. 
These trailing 0s are removed from the binlog by the SQL master and therefore do not show up in the WHERE clause for update/delete statements executed by the binlog writer. This commit is manually cherry-picked from https://github.com/Shopify/ghostferry/pull/159. --- dml_events.go | 43 ++++++++++++++++++++++++++--- test/integration/types_test.rb | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/dml_events.go b/dml_events.go index abebbe12..5cc7d0c7 100644 --- a/dml_events.go +++ b/dml_events.go @@ -423,7 +423,14 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol switch v := value.(type) { case string: - return appendEscapedString(buffer, v) + var rightPadLengthForBinaryColumn int = 0 + // see appendEscapedString() for details why we need special + // handling of BINARY column types + if column.Type == schema.TYPE_BINARY { + rightPadLengthForBinaryColumn = int(column.FixedSize) + } + + return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn) case []byte: return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON) case bool: @@ -437,7 +444,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol case float32: return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64) case decimal.Decimal: - return appendEscapedString(buffer, v.String()) + return appendEscapedString(buffer, v.String(), 0) default: panic(fmt.Sprintf("unsupported type %t", value)) } @@ -481,10 +488,25 @@ func Int64Value(value interface{}) (int64, bool) { // // ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038 // ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758 -func appendEscapedString(buffer []byte, value string) []byte { +// +// We also need to support right-padding of the generated string using 0-bytes +// to mimic what a MySQL server would do for BINARY columns (with 
fixed length). +// +// ref: https://github.com/Shopify/ghostferry/pull/159 +// +// This is specifically mentioned in the below link: +// +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. +// +// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html +func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte { buffer = append(buffer, '\'') - for i := 0; i < len(value); i++ { + var i int + for i = 0; i < len(value); i++ { c := value[i] if c == '\'' { buffer = append(buffer, '\'', '\'') @@ -493,9 +515,22 @@ } } + // continue 0-padding up to the desired length as provided by the + // caller + if i < rightPadToLengthWithZeroBytes { + buffer = rightPadBufferWithZeroBytes(buffer, rightPadToLengthWithZeroBytes-i) + } + return append(buffer, '\'') } +func rightPadBufferWithZeroBytes(buffer []byte, padLength int) []byte { + for i := 0; i < padLength; i++ { + buffer = append(buffer, '\x00') + } + return buffer +} + func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte { if isJSON { // See https://bugs.mysql.com/bug.php?id=98496 diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb index 3504d403..0b908e8d 100644 --- a/test/integration/types_test.rb +++ b/test/integration/types_test.rb @@ -345,6 +345,56 @@ def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1 ) end + def test_decimal + # decimals are treated specially in binlog writing (they are inserted after + # conversion to string), so we add this test to make sure we don't corrupt + # data in the process of handling BINARY column (which requires a right pad + # for string data). 
+ [source_db, target_db].each do |db| + db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}") + db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data decimal, primary key(id))") + end + + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 2)") + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + row_copy_called = false + ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do + # this hook follows the design in the helper method + # execute_copy_data_in_fixed_size_binary_column below. See detailed + # comments there + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 1, res.count + res.each do |row| + assert_equal 1, row["id"] + assert_equal 2, row["data"] + end + + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (3, 4)") + source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 5 WHERE id = 1") + + row_copy_called = true + end + + ghostferry.run + + assert row_copy_called + assert_test_table_is_identical + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 2, res.count + res.each do |row| + if row["id"] == 1 + assert_equal 5, row["data"] + else + assert_equal 3, row["id"] + assert_equal 4, row["data"] + end + end + end + + + private def insert_json_on_source