diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java index 514b11b969..f97ef8ea7b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java @@ -50,6 +50,7 @@ public class MySqlValidator implements Validator { private static final String BINLOG_FORMAT_ROW = "ROW"; private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL"; + private static final String DEFAULT_BINLOG_ROW_VALUE_OPTIONS = ""; private final Properties dbzProperties; private final MySqlSourceConfig sourceConfig; @@ -70,6 +71,7 @@ public void validate() { checkVersion(connection); checkBinlogFormat(connection); checkBinlogRowImage(connection); + checkBinlogRowValueOptions(connection); checkTimeZone(connection); } catch (SQLException ex) { throw new TableException( @@ -159,6 +161,30 @@ private void checkBinlogRowImage(JdbcConnection connection) throws SQLException } } + /** Check whether the binlog row value options is empty. */ + private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLException { + String rowValueOptions = + connection + .queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'", + rs -> + rs.next() + ? rs.getString(2) + : DEFAULT_BINLOG_ROW_VALUE_OPTIONS) + .trim() + .toUpperCase(); + // This setting was introduced in MySQL 8.0+ with default of empty string '' + // For older versions, assume empty string '' + if (!DEFAULT_BINLOG_ROW_VALUE_OPTIONS.equals(rowValueOptions)) { + throw new ValidationException( + String.format( + "The MySQL server is configured with binlog_row_value_options=%s, which is possible to cause losing some binlog events " + + "for the mysql cdc connector. Please remove the binlog_row_value_options setting in the MySQL server and rerun the job." + + "See more details at https://dev.mysql.com/doc/refman/8.0/en/replication-features-json.html.", + rowValueOptions)); + } + } + /** Check whether the server timezone matches the configured timezone. */ private void checkTimeZone(JdbcConnection connection) throws SQLException { String timeZoneProperty = dbzProperties.getProperty("database.serverTimezone");