Skip to content

Commit

Permalink
Merge pull request #308 from Shopify/binary-column-fix
Browse files Browse the repository at this point in the history
Fixes Ghostferry for BINARY columns
  • Loading branch information
shuhaowu authored Sep 22, 2021
2 parents 8e83826 + 49c4c8a commit 8f5e083
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 4 deletions.
43 changes: 39 additions & 4 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,14 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol

switch v := value.(type) {
case string:
return appendEscapedString(buffer, v)
var rightPadLengthForBinaryColumn int = 0
// see appendEscapedString() for details why we need special
// handling of BINARY column types
if column.Type == schema.TYPE_BINARY {
rightPadLengthForBinaryColumn = int(column.FixedSize)
}

return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn)
case []byte:
return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON)
case bool:
Expand All @@ -437,7 +444,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol
case float32:
return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64)
case decimal.Decimal:
return appendEscapedString(buffer, v.String())
return appendEscapedString(buffer, v.String(), 0)
default:
panic(fmt.Sprintf("unsupported type %t", value))
}
Expand Down Expand Up @@ -481,10 +488,25 @@ func Int64Value(value interface{}) (int64, bool) {
//
// ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038
// ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758
func appendEscapedString(buffer []byte, value string) []byte {
//
// We also need to support right-padding of the generated string using 0-bytes
// to mimic what a MySQL server would do for BINARY columns (with fixed length).
//
// ref: https://github.com/Shopify/ghostferry/pull/159
//
// This is specifically mentioned in the the below link:
//
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
//
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {
buffer = append(buffer, '\'')

for i := 0; i < len(value); i++ {
var i int
for i = 0; i < len(value); i++ {
c := value[i]
if c == '\'' {
buffer = append(buffer, '\'', '\'')
Expand All @@ -493,9 +515,22 @@ func appendEscapedString(buffer []byte, value string) []byte {
}
}

// continue 0-padding up to the desired length as provided by the
// caller
if i < rightPadToLengthWithZeroBytes {
buffer = rightPadBufferWithZeroBytes(buffer, rightPadToLengthWithZeroBytes-i)
}

return append(buffer, '\'')
}

func rightPadBufferWithZeroBytes(buffer []byte, padLength int) []byte {
for i := 0; i < padLength; i++ {
buffer = append(buffer, '\x00')
}
return buffer
}

func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte {
if isJSON {
// See https://bugs.mysql.com/bug.php?id=98496
Expand Down
154 changes: 154 additions & 0 deletions test/integration/types_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,103 @@ def test_unsigned_bigint_pk_with_inline_verifier
end
end

def test_copy_data_in_fixed_size_binary_column
# Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
#
# We explicitly test with a value that is shorter than the max column
# size - MySQL will 0-pad the value up the full length of the BINARY column,
# but the MySQL replication binlogs will *not* contain these 0-bytes.
#
# As a result, the binlog writer must explicitly add then when building
# update/delete statements, as the WHERE clause would not match existing
# rows in the target DB
inserted_data = "ABC"
execute_copy_data_in_fixed_size_binary_column(
column_size: 4,
inserted_data: inserted_data,
expected_inserted_data: "#{inserted_data}\x00",
updated_data: "EFGH"
)
end

def test_copy_data_in_fixed_size_binary_column__value_completely_filled
# Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
#
# NOTE: This test is interesting (beyond what is covered above already),
# because it seems the server strips the trailing 0-bytes before sending
# them to the binlog even when the trailing 0-bytes are inserted by the user.
inserted_data = "ABC\x00"
execute_copy_data_in_fixed_size_binary_column(
column_size: 4,
inserted_data: inserted_data,
expected_inserted_data: inserted_data,
updated_data: "EFGH"
)
end

def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1
# Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
#
# slight variation to cover the corner-case where there is no data in the
# column at all and the entire value is 0-padded (here, only 1 byte)
execute_copy_data_in_fixed_size_binary_column(
column_size: 1,
inserted_data: "",
expected_inserted_data: "\x00",
updated_data: "A"
)
end

def test_decimal
# decimals are treated specially in binlog writing (they are inserted after
# conversion to string), so we add this test to make sure we don't corrupt
# data in the process of handling BINARY column (which requires a right pad
# for string data).
[source_db, target_db].each do |db|
db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data decimal, primary key(id))")
end

source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 2)")

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

row_copy_called = false
ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
# this hook follows the design in the helper method
# execute_copy_data_in_fixed_size_binary_column below. See detailed
# comments there
res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, res.count
res.each do |row|
assert_equal 1, row["id"]
assert_equal 2, row["data"]
end

source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (3, 4)")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 5 WHERE id = 1")

row_copy_called = true
end

ghostferry.run

assert row_copy_called
assert_test_table_is_identical
res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 2, res.count
res.each do |row|
if row["id"] == 1
assert_equal 5, row["data"]
else
assert_equal 3, row["id"]
assert_equal 4, row["data"]
end
end
end



private

def insert_json_on_source
Expand All @@ -310,4 +407,61 @@ def insert_json_on_source
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')")
end

def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:)
# test for the BINARY columns needing 0-byte padding
#
# Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
[source_db, target_db].each do |db|
db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data BINARY(#{column_size}), primary key(id))")
end

source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, _binary'#{inserted_data}')")

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

row_copy_called = false
ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
# select row from the target and then make sure the data with 0 padding
# is present. We do this to make sure there are no races in the test
res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, res.count
res.each do |row|
assert_equal 1, row["id"]
assert_equal expected_inserted_data, row["data"]
end

# now that the target is guaranteed to be in the same state as the
# source, trigger an update that will cause the binlog to stream an
# entry that needs the 0-byte padding
#
# NOTE: If we use BINLOG_STREAMING_STARTED as hook instead, we race
# with getting the update into the batch-copy instead of into the
# streaming
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = _binary'#{updated_data}' WHERE id = 1")

# NOTE: We move this flag to the end of the callback to make sure that
# we don't confuse ourselves if the callback crashes before completing
row_copy_called = true
end

ghostferry.run

# make sure the test framework called the expected hooks above - otherwise
# the test doesn't make much sense
assert row_copy_called
assert_test_table_is_identical

# just being paranoid here: make sure the test outcome is as expected. It
# should be, since we made sure the tables have the same checksums, but it
# helps understand what the test code does
res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, res.count
res.each do |row|
assert_equal 1, row["id"]
assert_equal updated_data, row["data"]
end

end
end

0 comments on commit 8f5e083

Please sign in to comment.