Skip to content

Commit

Permalink
[FLINK-34689][MySQL][Feature] check binlog_row_value_options (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SML0127 authored Apr 12, 2024
1 parent 1b0b9f8 commit af7665d
Showing 1 changed file with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class MySqlValidator implements Validator {

private static final String BINLOG_FORMAT_ROW = "ROW";
private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL";
private static final String DEFAULT_BINLOG_ROW_VALUE_OPTIONS = "";

private final Properties dbzProperties;
private final MySqlSourceConfig sourceConfig;
Expand All @@ -70,6 +71,7 @@ public void validate() {
checkVersion(connection);
checkBinlogFormat(connection);
checkBinlogRowImage(connection);
checkBinlogRowValueOptions(connection);
checkTimeZone(connection);
} catch (SQLException ex) {
throw new TableException(
Expand Down Expand Up @@ -159,6 +161,30 @@ private void checkBinlogRowImage(JdbcConnection connection) throws SQLException
}
}

/** Check whether the binlog row value options is empty. */
private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLException {
String rowValueOptions =
connection
.queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'",
rs ->
rs.next()
? rs.getString(2)
: DEFAULT_BINLOG_ROW_VALUE_OPTIONS)
.trim()
.toUpperCase();
// This setting was introduced in MySQL 8.0+ with default of empty string ''
// For older versions, assume empty string ''
if (!DEFAULT_BINLOG_ROW_VALUE_OPTIONS.equals(rowValueOptions)) {
throw new ValidationException(
String.format(
"The MySQL server is configured with binlog_row_value_options=%s, which is possible to cause losing some binlog events "
+ "for the mysql cdc connector. Please remove the binlog_row_value_options setting in the MySQL server and rerun the job."
+ "See more details at https://dev.mysql.com/doc/refman/8.0/en/replication-features-json.html.",
rowValueOptions));
}
}

/** Check whether the server timezone matches the configured timezone. */
private void checkTimeZone(JdbcConnection connection) throws SQLException {
String timeZoneProperty = dbzProperties.getProperty("database.serverTimezone");
Expand Down

0 comments on commit af7665d

Please sign in to comment.