diff --git a/dml_events.go b/dml_events.go
index abebbe120..5cc7d0c77 100644
--- a/dml_events.go
+++ b/dml_events.go
@@ -423,7 +423,14 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol
 
     switch v := value.(type) {
     case string:
-        return appendEscapedString(buffer, v)
+        var rightPadLengthForBinaryColumn int = 0
+        // see appendEscapedString() for details on why we need special
+        // handling of BINARY column types
+        if column.Type == schema.TYPE_BINARY {
+            rightPadLengthForBinaryColumn = int(column.FixedSize)
+        }
+
+        return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn)
     case []byte:
         return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON)
     case bool:
@@ -437,7 +444,7 @@
     case float32:
         return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64)
     case decimal.Decimal:
-        return appendEscapedString(buffer, v.String())
+        return appendEscapedString(buffer, v.String(), 0)
     default:
         panic(fmt.Sprintf("unsupported type %t", value))
     }
@@ -481,10 +488,25 @@ func Int64Value(value interface{}) (int64, bool) {
 //
 // ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038
 // ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758
-func appendEscapedString(buffer []byte, value string) []byte {
+//
+// We also need to support right-padding of the generated string using 0-bytes
+// to mimic what a MySQL server would do for BINARY columns (with fixed length).
+//
+// ref: https://github.com/Shopify/ghostferry/pull/159
+//
+// This is specifically mentioned in the link below:
+//
+// When BINARY values are stored, they are right-padded with the pad value
+// to the specified length. The pad value is 0x00 (the zero byte). Values
+// are right-padded with 0x00 for inserts, and no trailing bytes are removed
+// for retrievals.
+//
+// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
+func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {
     buffer = append(buffer, '\'')
 
-    for i := 0; i < len(value); i++ {
+    var i int
+    for i = 0; i < len(value); i++ {
         c := value[i]
         if c == '\'' {
             buffer = append(buffer, '\'', '\'')
@@ -493,9 +515,22 @@
         }
     }
 
+    // continue 0-padding up to the desired length as provided by the
+    // caller
+    if i < rightPadToLengthWithZeroBytes {
+        buffer = rightPadBufferWithZeroBytes(buffer, rightPadToLengthWithZeroBytes-i)
+    }
+
     return append(buffer, '\'')
 }
 
+func rightPadBufferWithZeroBytes(buffer []byte, padLength int) []byte {
+    for i := 0; i < padLength; i++ {
+        buffer = append(buffer, '\x00')
+    }
+    return buffer
+}
+
 func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte {
     if isJSON {
         // See https://bugs.mysql.com/bug.php?id=98496
diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb
index eaebe86a2..0b908e8d9 100644
--- a/test/integration/types_test.rb
+++ b/test/integration/types_test.rb
@@ -298,6 +298,103 @@ def test_unsigned_bigint_pk_with_inline_verifier
     end
   end
 
+  def test_copy_data_in_fixed_size_binary_column
+    # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
+    #
+    # We explicitly test with a value that is shorter than the max column
+    # size - MySQL will 0-pad the value up to the full length of the BINARY column,
+    # but the MySQL replication binlogs will *not* contain these 0-bytes.
+    #
+    # As a result, the binlog writer must explicitly add them when building
+    # update/delete statements, as the WHERE clause would otherwise not match
+    # existing rows in the target DB.
+    inserted_data = "ABC"
+    execute_copy_data_in_fixed_size_binary_column(
+      column_size: 4,
+      inserted_data: inserted_data,
+      expected_inserted_data: "#{inserted_data}\x00",
+      updated_data: "EFGH"
+    )
+  end
+
+  def test_copy_data_in_fixed_size_binary_column__value_completely_filled
+    # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
+    #
+    # NOTE: This test is interesting (beyond what is covered above already),
+    # because it seems the server strips the trailing 0-bytes before sending
+    # them to the binlog even when the trailing 0-bytes are inserted by the user.
+    inserted_data = "ABC\x00"
+    execute_copy_data_in_fixed_size_binary_column(
+      column_size: 4,
+      inserted_data: inserted_data,
+      expected_inserted_data: inserted_data,
+      updated_data: "EFGH"
+    )
+  end
+
+  def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1
+    # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
+    #
+    # slight variation to cover the corner-case where there is no data in the
+    # column at all and the entire value is 0-padded (here, only 1 byte)
+    execute_copy_data_in_fixed_size_binary_column(
+      column_size: 1,
+      inserted_data: "",
+      expected_inserted_data: "\x00",
+      updated_data: "A"
+    )
+  end
+
+  def test_decimal
+    # decimals are treated specially in binlog writing (they are inserted after
+    # conversion to string), so we add this test to make sure we don't corrupt
+    # data in the process of handling a BINARY column (which requires a right pad
+    # for string data).
+    [source_db, target_db].each do |db|
+      db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
+      db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data decimal, primary key(id))")
+    end
+
+    source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 2)")
+
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    row_copy_called = false
+    ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
+      # this hook follows the design of the helper method
+      # execute_copy_data_in_fixed_size_binary_column below. See the detailed
+      # comments there
+      res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
+      assert_equal 1, res.count
+      res.each do |row|
+        assert_equal 1, row["id"]
+        assert_equal 2, row["data"]
+      end
+
+      source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (3, 4)")
+      source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 5 WHERE id = 1")
+
+      row_copy_called = true
+    end
+
+    ghostferry.run
+
+    assert row_copy_called
+    assert_test_table_is_identical
+    res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
+    assert_equal 2, res.count
+    res.each do |row|
+      if row["id"] == 1
+        assert_equal 5, row["data"]
+      else
+        assert_equal 3, row["id"]
+        assert_equal 4, row["data"]
+      end
+    end
+  end
+
+
+
   private
 
   def insert_json_on_source
@@ -310,4 +407,61 @@ def insert_json_on_source
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')")
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')")
   end
+
+  def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:)
+    # test for BINARY columns needing 0-byte padding
+    #
+    # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
+    [source_db, target_db].each do |db|
+      db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
+      db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data BINARY(#{column_size}), primary key(id))")
+    end
+
+    source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, _binary'#{inserted_data}')")
+
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    row_copy_called = false
+    ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
+      # select the row from the target and then make sure the data with 0-padding
+      # is present. We do this to make sure there are no races in the test
+      res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
+      assert_equal 1, res.count
+      res.each do |row|
+        assert_equal 1, row["id"]
+        assert_equal expected_inserted_data, row["data"]
+      end
+
+      # now that the target is guaranteed to be in the same state as the
+      # source, trigger an update that will cause the binlog to stream an
+      # entry that needs the 0-byte padding
+      #
+      # NOTE: If we used BINLOG_STREAMING_STARTED as the hook instead, we would
+      # race with getting the update into the batch-copy instead of into the
+      # streaming
+      source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = _binary'#{updated_data}' WHERE id = 1")
+
+      # NOTE: We set this flag at the end of the callback to make sure that
+      # we don't confuse ourselves if the callback crashes before completing
+      row_copy_called = true
+    end
+
+    ghostferry.run
+
+    # make sure the test framework called the expected hooks above - otherwise
+    # the test doesn't make much sense
+    assert row_copy_called
+    assert_test_table_is_identical
+
+    # just being paranoid here: make sure the test outcome is as expected. It
+    # should be, since we made sure the tables have the same checksums, but it
+    # helps understand what the test code does
+    res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
+    assert_equal 1, res.count
+    res.each do |row|
+      assert_equal 1, row["id"]
+      assert_equal updated_data, row["data"]
+    end
+
+  end
 end
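For illustration, the following is a minimal, self-contained Go sketch of the padding behaviour this patch introduces. The function name escapeAndRightPad and the main wrapper are hypothetical and exist only for this sketch; the actual change lives in appendEscapedString and rightPadBufferWithZeroBytes above, and takes the pad length from the column's FixedSize for BINARY columns.

package main

import "fmt"

// escapeAndRightPad mirrors the behaviour added to appendEscapedString above:
// it single-quotes a value for a generated SQL statement and right-pads it
// with 0x00 bytes up to padTo, the fixed size of a BINARY column, so that
// generated WHERE clauses match the 0-padded rows stored on the target.
func escapeAndRightPad(value string, padTo int) []byte {
    buffer := []byte{'\''}

    var i int
    for i = 0; i < len(value); i++ {
        c := value[i]
        if c == '\'' {
            buffer = append(buffer, '\'', '\'')
        } else {
            buffer = append(buffer, c)
        }
    }

    // Pad based on the number of input bytes consumed, not the escaped
    // length, to mimic how MySQL stores BINARY(n) values.
    for ; i < padTo; i++ {
        buffer = append(buffer, '\x00')
    }

    return append(buffer, '\'')
}

func main() {
    // "ABC" stored in a BINARY(4) column compares equal to 'ABC\x00'.
    fmt.Printf("%q\n", escapeAndRightPad("ABC", 4))
}

Note that the pad length is compared against the count of input bytes rather than the length of the escaped buffer, matching the patch: escaping a quote doubles it in the output but does not eat into the 0-byte padding.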