Skip to content

Commit

Permalink
[FLINK-34903][cdc][mysql] Add mysql-pipeline-connector with tables.ex…
Browse files Browse the repository at this point in the history
…clude option to exclude unnecessary tables (apache#3186)
  • Loading branch information
Thorne-coder authored Apr 19, 2024
1 parent d4ed7db commit 0d797a6
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 5 deletions.
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ pipeline:
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
</tr>
<tr>
<td>tables.exclude</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。<br>
用法和tables参数相同</td>
</tr>
<tr>
<td>schema-change.enabled</td>
<td>optional</td>
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ pipeline:
If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
</tr>
<tr>
<td>tables.exclude</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the MySQL database to exclude, parameter will have an exclusion effect after the tables parameter. The table-name also supports regular expressions to exclude multiple tables that satisfy the regular expressions. <br>
The usage is the same as the tables parameter</td>
</tr>
<tr>
<td>schema-change.enabled</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
Expand All @@ -70,6 +72,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
Expand All @@ -93,6 +96,7 @@ public DataSource createDataSource(Context context) {
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String tables = config.get(TABLES);
String tablesExclude = config.get(TABLES_EXCLUDE);

String serverId = validateAndGetServerId(config);
ZoneId serverTimeZone = getServerTimeZone(config);
Expand Down Expand Up @@ -151,12 +155,25 @@ public DataSource createDataSource(Context context) {
.jdbcProperties(getJdbcProperties(configMap));

Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
String[] capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.length == 0) {
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " + tables);
}
configFactory.tableList(capturedTables);
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
List<String> excludeTables = getTableList(configFactory.createConfig(0), selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table with by the option 'tables.exclude' = "
+ tablesExclude);
}
}
configFactory.tableList(capturedTables.toArray(new String[0]));

return new MySqlDataSource(configFactory);
}
Expand Down Expand Up @@ -211,11 +228,11 @@ public String identifier() {
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";

private static String[] getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
private static List<String> getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
.filter(selectors::isMatch)
.map(TableId::toString)
.toArray(String[]::new);
.collect(Collectors.toList());
}

private static StartupOptions getStartupOptions(Configuration config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,16 @@ public class MySqlDataSourceOptions {
.defaultValue(true)
.withDescription(
"Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");

@Experimental
public static final ConfigOption<String> TABLES_EXCLUDE =
ConfigOptions.key("tables.exclude")
.stringType()
.noDefaultValue()
.withDescription(
"Table names of the MySQL tables to Exclude. Regular expressions are supported. "
+ "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+ "If there is a need to use a dot (.) in a regular expression to match any character, "
+ "it is necessary to escape the dot with a backslash."
+ "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
Expand Down Expand Up @@ -79,6 +80,50 @@ public void testNoMatchedTable() {
.hasMessageContaining("Cannot find any table by the option 'tables' = " + tables);
}

@Test
public void testExcludeTable() {
inventoryDatabase.createAndInitialize();
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
String tableExclude = inventoryDatabase.getDatabaseName() + ".orders";
options.put(TABLES_EXCLUDE.key(), tableExclude);
Factory.Context context = new MockContext(Configuration.fromMap(options));

MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
assertThat(dataSource.getSourceConfig().getTableList())
.isNotEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".orders"))
.isEqualTo(
Arrays.asList(
inventoryDatabase.getDatabaseName() + ".customers",
inventoryDatabase.getDatabaseName() + ".products"));
}

@Test
public void testExcludeAllTable() {
inventoryDatabase.createAndInitialize();
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
String tableExclude = inventoryDatabase.getDatabaseName() + ".prod\\.*";
options.put(TABLES_EXCLUDE.key(), tableExclude);
Factory.Context context = new MockContext(Configuration.fromMap(options));

MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
assertThatThrownBy(() -> factory.createDataSource(context))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Cannot find any table with by the option 'tables.exclude' = "
+ tableExclude);
}

class MockContext implements Factory.Context {

Configuration factoryConfiguration;
Expand Down

0 comments on commit 0d797a6

Please sign in to comment.