diff --git a/docs/content.zh/docs/connectors/mysql.md b/docs/content.zh/docs/connectors/mysql.md
index 564dd3dbf2..3410cdea02 100644
--- a/docs/content.zh/docs/connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/mysql.md
@@ -107,6 +107,14 @@ pipeline:
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
+
+ tables.exclude |
+ optional |
+ (none) |
+ String |
+ 需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。
+ 用法和tables参数相同 |
+
schema-change.enabled |
optional |
diff --git a/docs/content/docs/connectors/mysql.md b/docs/content/docs/connectors/mysql.md
index c6ef38252e..879920fec7 100644
--- a/docs/content/docs/connectors/mysql.md
+++ b/docs/content/docs/connectors/mysql.md
@@ -109,6 +109,14 @@ pipeline:
If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.
eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
+
+ tables.exclude |
+ optional |
+ (none) |
+ String |
+ Table name of the MySQL database to exclude, parameter will have an exclusion effect after the tables parameter. The table-name also supports regular expressions to exclude multiple tables that satisfy the regular expressions.
+ The usage is the same as the tables parameter |
+
schema-change.enabled |
optional |
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 744041d25a..dc9972fe0f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -42,9 +42,11 @@
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -70,6 +72,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -93,6 +96,7 @@ public DataSource createDataSource(Context context) {
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String tables = config.get(TABLES);
+ String tablesExclude = config.get(TABLES_EXCLUDE);
String serverId = validateAndGetServerId(config);
ZoneId serverTimeZone = getServerTimeZone(config);
@@ -151,12 +155,25 @@ public DataSource createDataSource(Context context) {
.jdbcProperties(getJdbcProperties(configMap));
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
- String[] capturedTables = getTableList(configFactory.createConfig(0), selectors);
- if (capturedTables.length == 0) {
+ List capturedTables = getTableList(configFactory.createConfig(0), selectors);
+ if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " + tables);
}
- configFactory.tableList(capturedTables);
+ if (tablesExclude != null) {
+ Selectors selectExclude =
+ new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
+ List excludeTables = getTableList(configFactory.createConfig(0), selectExclude);
+ if (!excludeTables.isEmpty()) {
+ capturedTables.removeAll(excludeTables);
+ }
+ if (capturedTables.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Cannot find any table with by the option 'tables.exclude' = "
+ + tablesExclude);
+ }
+ }
+ configFactory.tableList(capturedTables.toArray(new String[0]));
return new MySqlDataSource(configFactory);
}
@@ -211,11 +228,11 @@ public String identifier() {
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
- private static String[] getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
+ private static List getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
.filter(selectors::isMatch)
.map(TableId::toString)
- .toArray(String[]::new);
+ .collect(Collectors.toList());
}
private static StartupOptions getStartupOptions(Configuration config) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 7c7dcadb83..e852eb3d7a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -231,4 +231,16 @@ public class MySqlDataSourceOptions {
.defaultValue(true)
.withDescription(
"Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");
+
+ @Experimental
+ public static final ConfigOption TABLES_EXCLUDE =
+ ConfigOptions.key("tables.exclude")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Table names of the MySQL tables to Exclude. Regular expressions are supported. "
+ + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+ + "If there is a need to use a dot (.) in a regular expression to match any character, "
+ + "it is necessary to escape the dot with a backslash."
+ + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index 025415abb4..59dcdb6cfc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -32,6 +32,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
@@ -79,6 +80,50 @@ public void testNoMatchedTable() {
.hasMessageContaining("Cannot find any table by the option 'tables' = " + tables);
}
+ @Test
+ public void testExcludeTable() {
+ inventoryDatabase.createAndInitialize();
+ Map options = new HashMap<>();
+ options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+ options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+ options.put(USERNAME.key(), TEST_USER);
+ options.put(PASSWORD.key(), TEST_PASSWORD);
+ options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
+ String tableExclude = inventoryDatabase.getDatabaseName() + ".orders";
+ options.put(TABLES_EXCLUDE.key(), tableExclude);
+ Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+ MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+ MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
+ assertThat(dataSource.getSourceConfig().getTableList())
+ .isNotEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".orders"))
+ .isEqualTo(
+ Arrays.asList(
+ inventoryDatabase.getDatabaseName() + ".customers",
+ inventoryDatabase.getDatabaseName() + ".products"));
+ }
+
+ @Test
+ public void testExcludeAllTable() {
+ inventoryDatabase.createAndInitialize();
+ Map options = new HashMap<>();
+ options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+ options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+ options.put(USERNAME.key(), TEST_USER);
+ options.put(PASSWORD.key(), TEST_PASSWORD);
+ options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
+ String tableExclude = inventoryDatabase.getDatabaseName() + ".prod\\.*";
+ options.put(TABLES_EXCLUDE.key(), tableExclude);
+ Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+ MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+ assertThatThrownBy(() -> factory.createDataSource(context))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Cannot find any table with by the option 'tables.exclude' = "
+ + tableExclude);
+ }
+
class MockContext implements Factory.Context {
Configuration factoryConfiguration;