diff --git a/dml_events.go b/dml_events.go index abebbe12..5cc7d0c7 100644 --- a/dml_events.go +++ b/dml_events.go @@ -423,7 +423,14 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol switch v := value.(type) { case string: - return appendEscapedString(buffer, v) + var rightPadLengthForBinaryColumn int = 0 + // see appendEscapedString() for details why we need special + // handling of BINARY column types + if column.Type == schema.TYPE_BINARY { + rightPadLengthForBinaryColumn = int(column.FixedSize) + } + + return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn) case []byte: return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON) case bool: @@ -437,7 +444,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol case float32: return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64) case decimal.Decimal: - return appendEscapedString(buffer, v.String()) + return appendEscapedString(buffer, v.String(), 0) default: panic(fmt.Sprintf("unsupported type %t", value)) } @@ -481,10 +488,25 @@ func Int64Value(value interface{}) (int64, bool) { // // ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038 // ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758 -func appendEscapedString(buffer []byte, value string) []byte { +// +// We also need to support right-padding of the generated string using 0-bytes +// to mimic what a MySQL server would do for BINARY columns (with fixed length). +// +// ref: https://github.com/Shopify/ghostferry/pull/159 +// +// This is specifically mentioned in the below link: +// +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. 
+// +// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html +func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte { buffer = append(buffer, '\'') - for i := 0; i < len(value); i++ { + var i int + for i = 0; i < len(value); i++ { c := value[i] if c == '\'' { buffer = append(buffer, '\'', '\'') @@ -493,9 +515,22 @@ func appendEscapedString(buffer []byte, value string) []byte { } } + // continue 0-padding up to the desired length as provided by the + // caller + if i < rightPadToLengthWithZeroBytes { + buffer = rightPadBufferWithZeroBytes(buffer, rightPadToLengthWithZeroBytes-i) + } + return append(buffer, '\'') } +func rightPadBufferWithZeroBytes(buffer []byte, padLength int) []byte { + for i := 0; i < padLength; i++ { + buffer = append(buffer, '\x00') + } + return buffer +} + func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte { if isJSON { // See https://bugs.mysql.com/bug.php?id=98496 diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb index 3504d403..0b908e8d 100644 --- a/test/integration/types_test.rb +++ b/test/integration/types_test.rb @@ -345,6 +345,56 @@ def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1 ) end + def test_decimal + # decimals are treated specially in binlog writing (they are inserted after + # conversion to string), so we add this test to make sure we don't corrupt + # data in the process of handling BINARY column (which requires a right pad + # for string data). 
+ [source_db, target_db].each do |db| + db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}") + db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data decimal, primary key(id))") + end + + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 2)") + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + row_copy_called = false + ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do + # this hook follows the design in the helper method + # execute_copy_data_in_fixed_size_binary_column below. See detailed + # comments there + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 1, res.count + res.each do |row| + assert_equal 1, row["id"] + assert_equal 2, row["data"] + end + + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (3, 4)") + source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 5 WHERE id = 1") + + row_copy_called = true + end + + ghostferry.run + + assert row_copy_called + assert_test_table_is_identical + res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}") + assert_equal 2, res.count + res.each do |row| + if row["id"] == 1 + assert_equal 5, row["data"] + else + assert_equal 3, row["id"] + assert_equal 4, row["data"] + end + end + end + + + private def insert_json_on_source