From 0d797a6021ea2f3973f8dc74c0c741b695e85a12 Mon Sep 17 00:00:00 2001 From: Thorne <46524102+shiyiky@users.noreply.github.com> Date: Fri, 19 Apr 2024 15:49:32 +0800 Subject: [PATCH] [FLINK-34903][cdc][mysql] Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables (#3186) --- docs/content.zh/docs/connectors/mysql.md | 8 ++++ docs/content/docs/connectors/mysql.md | 8 ++++ .../mysql/factory/MySqlDataSourceFactory.java | 27 ++++++++--- .../mysql/source/MySqlDataSourceOptions.java | 12 +++++ .../source/MySqlDataSourceFactoryTest.java | 45 +++++++++++++++++++ 5 files changed, 95 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/connectors/mysql.md b/docs/content.zh/docs/connectors/mysql.md index 564dd3dbf2..3410cdea02 100644 --- a/docs/content.zh/docs/connectors/mysql.md +++ b/docs/content.zh/docs/connectors/mysql.md @@ -107,6 +107,14 @@ pipeline: 需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* + + tables.exclude + optional + (none) + String + 需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。
+ 用法和tables参数相同 + schema-change.enabled optional diff --git a/docs/content/docs/connectors/mysql.md b/docs/content/docs/connectors/mysql.md index c6ef38252e..879920fec7 100644 --- a/docs/content/docs/connectors/mysql.md +++ b/docs/content/docs/connectors/mysql.md @@ -109,6 +109,14 @@ pipeline: If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.
eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* + + tables.exclude + optional + (none) + String + Table names of the MySQL database to exclude. The exclusion is applied after the tables option has selected the tables to capture. Table names also support regular expressions, so that multiple tables matching an expression can be excluded.
+ The usage is the same as the tables parameter + schema-change.enabled optional diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 744041d25a..dc9972fe0f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -42,9 +42,11 @@ import java.time.Duration; import java.time.ZoneId; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; @@ -70,6 +72,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; @@ -93,6 
+96,7 @@ public DataSource createDataSource(Context context) { String username = config.get(USERNAME); String password = config.get(PASSWORD); String tables = config.get(TABLES); + String tablesExclude = config.get(TABLES_EXCLUDE); String serverId = validateAndGetServerId(config); ZoneId serverTimeZone = getServerTimeZone(config); @@ -151,12 +155,25 @@ public DataSource createDataSource(Context context) { .jdbcProperties(getJdbcProperties(configMap)); Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); - String[] capturedTables = getTableList(configFactory.createConfig(0), selectors); - if (capturedTables.length == 0) { + List capturedTables = getTableList(configFactory.createConfig(0), selectors); + if (capturedTables.isEmpty()) { throw new IllegalArgumentException( "Cannot find any table by the option 'tables' = " + tables); } - configFactory.tableList(capturedTables); + if (tablesExclude != null) { + Selectors selectExclude = + new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); + List excludeTables = getTableList(configFactory.createConfig(0), selectExclude); + if (!excludeTables.isEmpty()) { + capturedTables.removeAll(excludeTables); + } + if (capturedTables.isEmpty()) { + throw new IllegalArgumentException( + "Cannot find any table with by the option 'tables.exclude' = " + + tablesExclude); + } + } + configFactory.tableList(capturedTables.toArray(new String[0])); return new MySqlDataSource(configFactory); } @@ -211,11 +228,11 @@ public String identifier() { private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; - private static String[] getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) { + private static List getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) { return MySqlSchemaUtils.listTables(sourceConfig, null).stream() .filter(selectors::isMatch) 
.map(TableId::toString) - .toArray(String[]::new); + .collect(Collectors.toList()); } private static StartupOptions getStartupOptions(Configuration config) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 7c7dcadb83..e852eb3d7a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -231,4 +231,16 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); + + @Experimental + public static final ConfigOption TABLES_EXCLUDE = + ConfigOptions.key("tables.exclude") + .stringType() + .noDefaultValue() + .withDescription( + "Table names of the MySQL tables to Exclude. Regular expressions are supported. " + + "It is important to note that the dot (.) is treated as a delimiter for database and table names. " + + "If there is a need to use a dot (.) in a regular expression to match any character, " + + "it is necessary to escape the dot with a backslash." + + "eg. 
db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 025415abb4..59dcdb6cfc 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -32,6 +32,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; @@ -79,6 +80,50 @@ public void testNoMatchedTable() { .hasMessageContaining("Cannot find any table by the option 'tables' = " + tables); } + @Test + public void testExcludeTable() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + 
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*"); + String tableExclude = inventoryDatabase.getDatabaseName() + ".orders"; + options.put(TABLES_EXCLUDE.key(), tableExclude); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + assertThat(dataSource.getSourceConfig().getTableList()) + .isNotEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".orders")) + .isEqualTo( + Arrays.asList( + inventoryDatabase.getDatabaseName() + ".customers", + inventoryDatabase.getDatabaseName() + ".products")); + } + + @Test + public void testExcludeAllTable() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + String tableExclude = inventoryDatabase.getDatabaseName() + ".prod\\.*"; + options.put(TABLES_EXCLUDE.key(), tableExclude); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot find any table with by the option 'tables.exclude' = " + + tableExclude); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration;