Skip to content

Commit

Permalink
[FLINK-36558] Fix column metadata parsing compatibility with MySQL 8.…
Browse files Browse the repository at this point in the history
…0.17 / 8.0.18

Signed-off-by: yuxiqian <[email protected]>
  • Loading branch information
yuxiqian committed Oct 17, 2024
1 parent dd69756 commit 67e4101
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public enum ColumnType {
DATETIME_V2(18),
TIME_V2(19),
TYPED_ARRAY(20),
// TYPED_ARRAY enum value has been changed from 244 to 20 in MySQL 8.0.18. Since the JSON_ARRAY
// cast syntax was not added before MySQL 8.0.16, and the TYPED_ARRAY enum has been fixed in
// MySQL 8.0.18, so the only affected version is 8.0.17.
// https://github.com/mysql/mysql-server/commit/9082b6a820f3948fd563cc32a050f5e8775f2855#diff-b9bac49e04a17ad0503e56a4c53d979c90eb64618387d20b9ea2cf1dbf47e5e7L25
TYPED_ARRAY_OLD(244),
JSON(245),
NEWDECIMAL(246),
ENUM(247),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.io.IOException;

import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY;
import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY_OLD;

/**
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
*
* <p>Line 51 ~ 53: load column metadata bytes based on the length encoded before, instead of
* relying on readMetadata to parse it.
*
* <p>Line 93 ~ 98: process MYSQL_TYPE_TYPED_ARRAY metadata, imitated the code in canal <a
* href="https://github.com/alibaba/canal/blob/master/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java#L546">TableMapLogEvent#decodeFields</a>.
*
Expand All @@ -50,8 +54,11 @@ public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IO
eventData.setTable(inputStream.readZeroTerminatedString());
int numberOfColumns = inputStream.readPackedInteger();
eventData.setColumnTypes(inputStream.read(numberOfColumns));
inputStream.readPackedInteger(); // metadata length
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
int columnMetadataLength = inputStream.readPackedInteger(); // column metadata length
eventData.setColumnMetadata(
readMetadata(
new ByteArrayInputStream(inputStream.read(columnMetadataLength)),
eventData.getColumnTypes()));
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
int metadataLength = inputStream.available();
TableMapEventMetadata metadata = null;
Expand Down Expand Up @@ -93,7 +100,7 @@ private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes)
int[] metadata = new int[columnTypes.length];
for (int i = 0; i < columnTypes.length; i++) {
ColumnType columnType = ColumnType.byCode(columnTypes[i] & 0xFF);
if (columnType == TYPED_ARRAY) {
if (columnType == TYPED_ARRAY || columnType == TYPED_ARRAY_OLD) {
byte[] arrayType = inputStream.read(1);
columnType = ColumnType.byCode(arrayType[0] & 0xFF);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.table;

import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;

import static org.apache.flink.api.common.JobStatus.RUNNING;

/** Integration tests for MySQL Table source. */
@RunWith(Parameterized.class)
public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase {

private static final Logger LOG = LoggerFactory.getLogger(MySqlJsonArrayAsKeyIndexITCase.class);

private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

@Parameterized.Parameters(name = "incrementalSnapshot: {0}")
public static Object[] parameters() {
// MySQL 8.0.17 brought the `CAST(JSON_EXTRACT AS ARRAY)` syntax firstly, and originates the
// "extra 0 byte" bug.
// MySQL 8.0.18 changed the TYPED_ARRAY internal enum value from 244 to 20, but didn't fix
// the bug.
// MySQL 8.0.19 fixed this issue (eventually).
return new Object[][] {
new Object[] {MySqlVersion.V8_0_17},
new Object[] {MySqlVersion.V8_0_18},
new Object[] {MySqlVersion.V8_0_19}
};
}

private final MySqlVersion version;
private final MySqlContainer container;

public MySqlJsonArrayAsKeyIndexITCase(MySqlVersion version) {
this.version = version;
this.container = createMySqlContainer(version, "docker/server-gtids/expire-seconds/my.cnf");
}

@Before
public void before() {
LOG.info("Starting MySQL {} containers...", version);
Startables.deepStart(Stream.of(container)).join();
LOG.info("Container MySQL {} is started.", version);
}

@After
public void after() {
LOG.info("Stopping MySQL {} containers...", version);
container.stop();
LOG.info("Container MySQL {} is stopped.", version);
}

@Test
public void testJsonArrayAsKeyIndex() {
UniqueDatabase jaakiDatabase =
new UniqueDatabase(container, "json_array_as_key", TEST_USER, TEST_PASSWORD);
jaakiDatabase.createAndInitialize();

String sourceDDL =
String.format(
"CREATE TABLE json_array_as_key (\n"
+ " id BIGINT NOT NULL,\n"
+ " PRIMARY KEY(id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'earliest-offset',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = 'true'"
+ ")",
container.getHost(),
container.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
jaakiDatabase.getDatabaseName(),
"json_array_as_key",
getServerId());
tEnv.executeSql(sourceDDL);

try (Connection connection = jaakiDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO json_array_as_key(id) VALUES (18),(19);");
statement.execute("DELETE FROM json_array_as_key WHERE id=19;");
} catch (Exception e) {
throw new RuntimeException(e);
}

// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM json_array_as_key");

try {
// wait for the source startup, we don't have a better way to wait it, use sleep for
// now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
} catch (Exception e) {
throw new RuntimeException(e);
}

CloseableIterator<Row> iterator = result.collect();

String[] expected =
new String[] {
// snapshot records
"+I[17]", "+I[18]", "+I[19]", "-D[19]",
};

assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));

try {
result.getJobClient().get().cancel().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
System.out.println("Gotta rows: " + rows);
}
return rows;
}

private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public enum MySqlVersion {
V5_5("5.5"),
V5_6("5.6"),
V5_7("5.7"),
V8_0_17("8.0.17"),
V8_0_18("8.0.18"),
V8_0_19("8.0.19"),
V8_0("8.0");

private String version;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the `License`); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an `AS IS` BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Create a table with complicated metadata to make sure the following
-- optional metadata parsing will not succeed unless the previous column
-- metadata has been correctly parsed.

CREATE TABLE `json_array_as_key` (
`id` bigint(20) unsigned NOT NULL,
`c2` bigint(20) NOT NULL DEFAULT '0',
`c3` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c4` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c5` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c6` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c7` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c8` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c9` int(11) DEFAULT '0',
`c10` int(11) DEFAULT '0',
`c11` int(11) NOT NULL DEFAULT '0',
`c12` int(11) DEFAULT '0',
`c13` json DEFAULT NULL,
`c14` json DEFAULT NULL,
`c15` json DEFAULT NULL,
`c16` json DEFAULT NULL,
`c17` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c18` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c19` tinyint(4) DEFAULT '0',
`c20` tinyint(4) NOT NULL DEFAULT '0',
`c21` tinyint(4) NOT NULL DEFAULT '0',
`c22` tinyint(4) NOT NULL DEFAULT '0',
`c23` tinyint(4) DEFAULT '0',
`c24` tinyint(3) unsigned NOT NULL DEFAULT '1',
`c25` int(10) unsigned NOT NULL DEFAULT '0',
`c26` int(10) unsigned NOT NULL DEFAULT '0',
`c27` int(10) unsigned NOT NULL DEFAULT '0',
`c28` int(10) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (`id`),
KEY `k6` ((cast(json_extract(`c13`,_utf8mb4'$[*]') as CHAR(32) ARRAY)))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

INSERT INTO json_array_as_key(ID) VALUES (17);

0 comments on commit 67e4101

Please sign in to comment.