Skip to content

Commit

Permalink
Fix binary-column data corruption
Browse files Browse the repository at this point in the history
This commit addresses a data corruption bug in the binlog streaming
phase, where the binlog writer incorrectly propagates values in
fixed-length BINARY columns that have trailing 0s in the original value.
These trailing 0s are removed from the binlog by the SQL master and
therefore do not show up in the WHERE clause for update/delete
statements executed by the binlog writer.

This commit is manually cherry-picked from
#159.
  • Loading branch information
Clemens Kolbitsch authored and shuhaowu committed Sep 22, 2021
1 parent b69ce73 commit 49c4c8a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
43 changes: 39 additions & 4 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,14 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol

switch v := value.(type) {
case string:
return appendEscapedString(buffer, v)
var rightPadLengthForBinaryColumn int = 0
// see appendEscapedString() for details why we need special
// handling of BINARY column types
if column.Type == schema.TYPE_BINARY {
rightPadLengthForBinaryColumn = int(column.FixedSize)
}

return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn)
case []byte:
return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON)
case bool:
Expand All @@ -437,7 +444,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol
case float32:
return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64)
case decimal.Decimal:
return appendEscapedString(buffer, v.String())
return appendEscapedString(buffer, v.String(), 0)
default:
panic(fmt.Sprintf("unsupported type %t", value))
}
Expand Down Expand Up @@ -481,10 +488,25 @@ func Int64Value(value interface{}) (int64, bool) {
//
// ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038
// ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758
func appendEscapedString(buffer []byte, value string) []byte {
//
// We also need to support right-padding of the generated string using 0-bytes
// to mimic what a MySQL server would do for BINARY columns (with fixed length).
//
// ref: https://github.com/Shopify/ghostferry/pull/159
//
// This is specifically mentioned in the link below:
//
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
//
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {
buffer = append(buffer, '\'')

for i := 0; i < len(value); i++ {
var i int
for i = 0; i < len(value); i++ {
c := value[i]
if c == '\'' {
buffer = append(buffer, '\'', '\'')
Expand All @@ -493,9 +515,22 @@ func appendEscapedString(buffer []byte, value string) []byte {
}
}

// continue 0-padding up to the desired length as provided by the
// caller
if i < rightPadToLengthWithZeroBytes {
buffer = rightPadBufferWithZeroBytes(buffer, rightPadToLengthWithZeroBytes-i)
}

return append(buffer, '\'')
}

// rightPadBufferWithZeroBytes appends padLength zero bytes (0x00) to buffer
// and returns the extended slice. A padLength of zero or less leaves buffer
// unchanged.
func rightPadBufferWithZeroBytes(buffer []byte, padLength int) []byte {
	// make() panics on a negative length, so bail out early to keep the
	// "nothing to pad" case a no-op.
	if padLength <= 0 {
		return buffer
	}

	// A freshly made byte slice is already zero-filled, so appending it in
	// one go yields the same bytes as appending '\x00' padLength times.
	return append(buffer, make([]byte, padLength)...)
}

func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte {
if isJSON {
// See https://bugs.mysql.com/bug.php?id=98496
Expand Down
50 changes: 50 additions & 0 deletions test/integration/types_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,56 @@ def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1
)
end

def test_decimal
  # The binlog writer handles DECIMAL values specially: they are converted to
  # a string before being inserted. BINARY columns require string data to be
  # right-padded, so this test makes sure that handling does not corrupt
  # decimal data along the way.
  create_database_sql = "CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}"
  create_table_sql = "CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data decimal, primary key(id))"
  select_all_sql = "SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}"

  [source_db, target_db].each do |connection|
    connection.query(create_database_sql)
    connection.query(create_table_sql)
  end

  # Seed row that is expected to be handled by the row copy.
  source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 2)")

  ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

  hook_ran = false
  ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
    # This hook follows the design of the helper method
    # execute_copy_data_in_fixed_size_binary_column below; see the detailed
    # comments there.
    copied_rows = target_db.query(select_all_sql)
    assert_equal 1, copied_rows.count
    copied_rows.each do |row|
      assert_equal 1, row["id"]
      assert_equal 2, row["data"]
    end

    # Changes issued on the source from inside this hook: one new row and one
    # update of the already-copied row.
    source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (3, 4)")
    source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 5 WHERE id = 1")

    hook_ran = true
  end

  ghostferry.run

  assert hook_ran
  assert_test_table_is_identical

  final_rows = target_db.query(select_all_sql)
  assert_equal 2, final_rows.count
  final_rows.each do |row|
    # Row 1 must carry the updated value; any other row must be row 3.
    if row["id"] == 1
      assert_equal 5, row["data"]
      next
    end

    assert_equal 3, row["id"]
    assert_equal 4, row["data"]
  end
end



private

def insert_json_on_source
Expand Down

0 comments on commit 49c4c8a

Please sign in to comment.