From 6059678aefa70571481202b3ba1bc2c22afa80bd Mon Sep 17 00:00:00 2001 From: Cameron Morgan Date: Wed, 31 Jul 2024 09:46:16 -0700 Subject: [PATCH 1/3] cast json to string --- dml_events.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/dml_events.go b/dml_events.go index a87b1c13..14410ba5 100644 --- a/dml_events.go +++ b/dml_events.go @@ -42,14 +42,14 @@ type RowData []interface{} // https://github.com/Shopify/ghostferry/issues/165. // // In summary: -// - This code receives values from both go-sql-driver/mysql and -// go-mysql-org/go-mysql. -// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte -// slice for unsigned integer. -// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for -// unsigned integer. -// - We currently make this function deal with both cases. In the future we can -// investigate alternative solutions. +// - This code receives values from both go-sql-driver/mysql and +// go-mysql-org/go-mysql. +// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte +// slice for unsigned integer. +// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for +// unsigned integer. +// - We currently make this function deal with both cases. In the future we can +// investigate alternative solutions. func (r RowData) GetUint64(colIdx int) (uint64, error) { u64, ok := Uint64Value(r[colIdx]) if ok { @@ -292,6 +292,12 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re ) } for i, col := range table.Columns { + if col.Type == schema.TYPE_JSON { + bytes, ok := row[i].([]uint8) + if ok { + row[i] = string(bytes) + } + } if col.IsUnsigned { switch v := row[i].(type) { case int64: @@ -501,10 +507,10 @@ func Int64Value(value interface{}) (int64, bool) { // // This is specifically mentioned in the the below link: // -// When BINARY values are stored, they are right-padded with the pad value -// to the specified length. The pad value is 0x00 (the zero byte). Values -// are right-padded with 0x00 for inserts, and no trailing bytes are removed -// for retrievals. +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. // // ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte { From b5bdc2d2d9dbc88c89f0abf4aab65a46a84b49dc Mon Sep 17 00:00:00 2001 From: Cameron Morgan Date: Wed, 31 Jul 2024 09:49:40 -0700 Subject: [PATCH 2/3] integration test --- test/integration/types_test.rb | 44 +++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb index 0b908e8d..40b4589b 100644 --- a/test/integration/types_test.rb +++ b/test/integration/types_test.rb @@ -1,13 +1,14 @@ require "test_helper" class TypesTest < GhostferryTestCase - JSON_OBJ = '{"data": {"quote": "\\\'", "value": [1]}}' + JSON_OBJ = '{"data": {"float": 32.0, "quote": "\\\'", "value": [1]}}' EMPTY_JSON = '{}' JSON_ARRAY = '[\"test_data\", \"test_data_2\"]' JSON_NULL = 'null' JSON_TRUE = 'true' JSON_FALSE = 'false' JSON_NUMBER = '42' + JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART = '32.0' def test_json_colum_not_null_with_no_default_is_invalid_this_is_fine # See: https://bugs.mysql.com/bug.php?id=98496 @@ -103,10 +104,10 @@ def test_json_data_insert # with a JSON column is broken on 5.7. # See: https://bugs.mysql.com/bug.php?id=87847 res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}") - assert_equal 16, res.first["cnt"] + assert_equal 18, res.first["cnt"] expected = [ - {"id"=>1, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"}, + {"id"=>1, "data"=>"{\"data\": {\"float\": 32.0, \"quote\": \"'\", \"value\": [1]}}"}, {"id"=>2, "data"=>"[\"test_data\", \"test_data_2\"]"}, {"id"=>3, "data"=>"{}"}, {"id"=>4, "data"=>nil}, @@ -114,15 +115,17 @@ def test_json_data_insert {"id"=>6, "data"=>"true"}, {"id"=>7, "data"=>"false"}, {"id"=>8, "data"=>"42"}, - - {"id"=>9, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"}, - {"id"=>10, "data"=>"[\"test_data\", \"test_data_2\"]"}, - {"id"=>11, "data"=>"{}"}, - {"id"=>12, "data"=>nil}, - {"id"=>13, "data"=>"null"}, - {"id"=>14, "data"=>"true"}, - {"id"=>15, "data"=>"false"}, - {"id"=>16, "data"=>"42"}, + {"id"=>9, "data"=>"32.0"}, + + {"id"=>10, "data"=>"{\"data\": {\"float\": 32.0, \"quote\": \"'\", \"value\": [1]}}"}, + {"id"=>11, "data"=>"[\"test_data\", \"test_data_2\"]"}, + {"id"=>12, "data"=>"{}"}, + {"id"=>13, "data"=>nil}, + {"id"=>14, "data"=>"null"}, + {"id"=>15, "data"=>"true"}, + {"id"=>16, "data"=>"false"}, + {"id"=>17, "data"=>"42"}, + {"id"=>18, "data"=>"32.0"}, ] res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME} ORDER BY id ASC") @@ -194,15 +197,16 @@ def test_json_data_update loop do sleep 0.1 res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}") - if res.first["cnt"] == 8 + if res.first["cnt"] == 9 source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{EMPTY_JSON}' WHERE id = 1") source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_ARRAY}' WHERE id = 2") source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = NULL WHERE id = 3") source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ}' WHERE id = 4") source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_TRUE}' WHERE id = 5") source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FALSE}' WHERE id = 6") - source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NUMBER}' WHERE id = 7") + source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART}' WHERE id = 7") source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NULL}' WHERE id = 8") + source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NUMBER}' WHERE id = 9") break end @@ -217,17 +221,18 @@ def test_json_data_update refute timedout, "failed due to time out while waiting for the 4 insert binlogs to be written to the target" res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}") - assert_equal 8, res.first["cnt"] + assert_equal 9, res.first["cnt"] expected = [ {"id"=>1, "data"=>"{}"}, {"id"=>2, "data"=>"[\"test_data\", \"test_data_2\"]"}, {"id"=>3, "data"=>nil}, - {"id"=>4, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"}, + {"id"=>4, "data"=>"{\"data\": {\"float\": 32.0, \"quote\": \"'\", \"value\": [1]}}"}, {"id"=>5, "data"=>"true"}, {"id"=>6, "data"=>"false"}, - {"id"=>7, "data"=>"42"}, + {"id"=>7, "data"=>"32.0"}, {"id"=>8, "data"=>"null"}, + {"id"=>9, "data"=>"42"}, ] res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME} ORDER BY id ASC") @@ -319,7 +324,7 @@ def test_copy_data_in_fixed_size_binary_column def test_copy_data_in_fixed_size_binary_column__value_completely_filled # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258 - # + # # NOTE: This test is interesting (beyond what is covered above already), # because it seems the server strips the trailing 0-bytes before sending # them to the binlog even when the trailing 0-bytes are inserted by the user. @@ -334,7 +339,7 @@ def test_copy_data_in_fixed_size_binary_column__value_completely_filled def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1 # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258 - # + # # slight variation to cover the corner-case where there is no data in the # column at all and the entire value is 0-padded (here, only 1 byte) execute_copy_data_in_fixed_size_binary_column( @@ -406,6 +411,7 @@ def insert_json_on_source source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_TRUE}')") source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')") source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART}')") end def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:) From 8e44c230c5b588f62dcec417004d965007495d64 Mon Sep 17 00:00:00 2001 From: Cameron Morgan Date: Wed, 31 Jul 2024 10:03:58 -0700 Subject: [PATCH 3/3] more --- dml_events.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/dml_events.go b/dml_events.go index 14410ba5..9145e1ef 100644 --- a/dml_events.go +++ b/dml_events.go @@ -543,16 +543,6 @@ func rightPadBufferWithZeroBytes(buffer []byte, padLength int) []byte { } func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte { - if isJSON { - // See https://bugs.mysql.com/bug.php?id=98496 - if len(value) == 0 { - value = []byte("null") - } - - buffer = append(buffer, "CAST("...) - } else { - buffer = append(buffer, "_binary"...) - } buffer = append(buffer, '\'') @@ -567,10 +557,6 @@ func appendEscapedBuffer(buffer, value []byte, isJSON bool) []byte { buffer = append(buffer, '\'') - if isJSON { - buffer = append(buffer, " AS JSON)"...) - } - return buffer }