From fd49f30398a5ddc7b3ed6341b856df7de8205647 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Sat, 24 Aug 2024 15:49:04 +0800 Subject: [PATCH] mysql cdc support exclude table. --- .../mysql/factory/MySqlDataSourceFactory.java | 19 +-- .../reader/MySqlPipelineRecordEmitter.java | 4 +- .../MysqlPipelineNewlyAddedTableITCase.java | 38 ++++++ .../mysql/debezium/DebeziumUtils.java | 4 +- .../debezium/reader/BinlogSplitReader.java | 12 +- .../connectors/mysql/schema/Selectors.java | 119 ++++++++++++++++++ .../assigners/MySqlSnapshotSplitAssigner.java | 6 +- .../source/config/MySqlSourceConfig.java | 23 ++++ .../config/MySqlSourceConfigFactory.java | 7 ++ .../source/reader/MySqlSourceReader.java | 5 +- .../source/utils/TableDiscoveryUtils.java | 19 ++- 11 files changed, 224 insertions(+), 32 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 5f538eeb1c..64095bfa55 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -28,7 +28,6 @@ import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions; -import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -43,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.time.Duration; import java.time.ZoneId; import java.util.HashMap; @@ -171,12 +172,15 @@ public DataSource createDataSource(Context context) { .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); + List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); if (scanBinlogNewlyAddedTableEnabled) { String newTables = validateTableAndReturnDebeziumStyle(tables); configFactory.tableList(newTables); + configFactory.excludeTableList(tablesExclude); + } else { Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); - List capturedTables = getTableList(configFactory.createConfig(0), selectors); + List capturedTables = getTableList(tableIds, selectors); if (capturedTables.isEmpty()) { throw new IllegalArgumentException( "Cannot find any table by the option 'tables' = " + tables); @@ -184,8 +188,7 @@ public DataSource createDataSource(Context context) { if (tablesExclude != null) { Selectors selectExclude = new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); - List excludeTables = - getTableList(configFactory.createConfig(0), selectExclude); + List excludeTables = getTableList(tableIds, selectExclude); if (!excludeTables.isEmpty()) { capturedTables.removeAll(excludeTables); } @@ -201,8 +204,7 @@ public DataSource createDataSource(Context context) { String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); if (chunkKeyColumns != null) { Map chunkKeyColumnMap = new HashMap<>(); - List tableIds = - MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); + for (String chunkKeyColumn : chunkKeyColumns.split(";")) { String[] splits = chunkKeyColumn.split(":"); if (splits.length == 2) { @@ -284,8 +286,9 @@ public String identifier() { private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; - private static List getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) { - return MySqlSchemaUtils.listTables(sourceConfig, null).stream() + private static List getTableList( + @Nullable List tableIdList, Selectors selectors) { + return tableIdList.stream() .filter(selectors::isMatch) .map(TableId::toString) .collect(Collectors.toList()); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 909ed6c5b1..4f801e0fa1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -237,7 +237,9 @@ private synchronized MySqlAntlrDdlParser getParser() { private List generateCreateTableEvent(MySqlSourceConfig sourceConfig) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { List createTableEventCache = new ArrayList<>(); - List capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters()); + List capturedTableIds = + listTables( + jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter()); for (TableId tableId : capturedTableIds) { Schema schema = getSchema(jdbc, tableId); createTableEventCache.add( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java index 4cc1e952ab..8c9545e23f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java @@ -60,6 +60,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.testcontainers.shaded.com.google.common.collect.Lists; import java.nio.file.Files; import java.nio.file.Path; @@ -91,6 +92,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; @@ -187,6 +189,42 @@ public void testScanBinlogNewlyAddedTableEnabled() throws Exception { .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing")); } + @Test + public void testScanBinlogNewlyAddedTableEnabledAndExcludeTables() throws Exception { + List tables = Collections.singletonList("address_\\.*"); + Map options = new HashMap<>(); + options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.put(TABLES_EXCLUDE.key(), customDatabase.getDatabaseName() + ".address_beijing"); + options.put(SCAN_STARTUP_MODE.key(), "timestamp"); + options.put( + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis())); + + FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + env.enableCheckpointing(200); + DataStreamSource source = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()); + + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + CheckpointedCollectResultBuffer resultBuffer = + new CheckpointedCollectResultBuffer<>(serializer); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectResultIterator iterator = + addCollector(env, source, resultBuffer, serializer, accumulatorName); + env.executeAsync("AddNewlyTablesWhenReadingBinlog"); + initialAddressTables( + getConnection(), Lists.newArrayList("address_beijing", "address_shanghai")); + List actual = fetchResults(iterator, 4); + assertThat(((ChangeEvent) actual.get(0)).tableId()) + .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_shanghai")); + } + @Test public void testAddNewTableOneByOneSingleParallelism() throws Exception { TestParam testParam = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index ac3b20c4fe..7ca60be5bf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -193,7 +193,9 @@ public static List discoverCapturedTables( final List capturedTableIds; try { - capturedTableIds = TableDiscoveryUtils.listTables(jdbc, sourceConfig.getTableFilters()); + capturedTableIds = + TableDiscoveryUtils.listTables( + jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter()); } catch (SQLException e) { throw new FlinkRuntimeException("Failed to discover captured tables", e); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index e1ef3895f8..31173469b4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -40,7 +40,6 @@ import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics; import io.debezium.pipeline.DataChangeEvent; import io.debezium.relational.TableId; -import io.debezium.relational.Tables; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -81,7 +80,7 @@ public class BinlogSplitReader implements DebeziumReader the max splitHighWatermark private Map maxSplitHighWatermarkMap; private final Set pureBinlogPhaseTables; - private Tables.TableFilter capturedTableFilter; + private Predicate capturedTableFilter; private final StoppableChangeEventSourceContext changeEventSourceContext = new StoppableChangeEventSourceContext(); @@ -100,8 +99,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { this.currentBinlogSplit = mySqlSplit.asBinlogSplit(); configureFilter(); statefulTaskContext.configure(currentBinlogSplit); - this.capturedTableFilter = - statefulTaskContext.getConnectorConfig().getTableFilters().dataCollectionFilter(); + this.capturedTableFilter = statefulTaskContext.getSourceConfig().getTableFilter(); this.queue = statefulTaskContext.getQueue(); this.binlogSplitReadTask = new MySqlBinlogSplitReadTask( @@ -247,7 +245,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) { } else if (RecordUtils.isSchemaChangeEvent(sourceRecord)) { if (RecordUtils.isTableChangeRecord(sourceRecord)) { TableId tableId = RecordUtils.getTableId(sourceRecord); - return capturedTableFilter.isIncluded(tableId); + return capturedTableFilter.test(tableId); } else { // Not related to changes in table structure, like `CREATE/DROP DATABASE`, skip it return false; @@ -270,7 +268,7 @@ private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) if (!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) { // the new added sharding table without history records return !maxSplitHighWatermarkMap.containsKey(tableId) - && capturedTableFilter.isIncluded(tableId); + && capturedTableFilter.test(tableId); } return false; } @@ -280,7 +278,7 @@ private void configureFilter() { currentBinlogSplit.getFinishedSnapshotSplitInfos(); Map> splitsInfoMap = new HashMap<>(); Map tableIdBinlogPositionMap = new HashMap<>(); - // specific offset mode + // startup mode which is stream only if (finishedSplitInfos.isEmpty()) { for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) { tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java new file mode 100644 index 0000000000..5561dd3a1a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.schema; + +import org.apache.flink.cdc.common.utils.Predicates; + +import io.debezium.relational.TableId; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; + +/** Selectors for filtering tables. */ +public class Selectors { + + private List selectors; + + private Selectors() {} + + /** + * A {@link Selector} that determines whether a table identified by a given {@link TableId} is + * to be included. + */ + private static class Selector { + private final Predicate namespacePred; + private final Predicate tableNamePred; + + public Selector(String namespace, String schemaName, String tableName) { + this.namespacePred = + namespace == null ? (namespacePred) -> false : Predicates.includes(namespace); + this.tableNamePred = + tableName == null ? (tableNamePred) -> false : Predicates.includes(tableName); + } + + public boolean isMatch(TableId tableId) { + + String namespace = tableId.catalog(); + + if (namespace == null || namespace.isEmpty()) { + return tableNamePred.test(tableId.table()); + } + return namespacePred.test(tableId.catalog()) && tableNamePred.test(tableId.table()); + } + } + + /** Match the {@link TableId} against the {@link Selector}s. * */ + public boolean isMatch(TableId tableId) { + for (Selector selector : selectors) { + if (selector.isMatch(tableId)) { + return true; + } + } + return false; + } + + /** Builder for {@link Selectors}. */ + public static class SelectorsBuilder { + + private List selectors; + + /** + * Current {@link TableId} used in mysql cdc connector will map database name to catalog. + * + * @param tableInclusions + * @return + */ + public SelectorsBuilder includeTables(String tableInclusions) { + + if (tableInclusions == null || tableInclusions.isEmpty()) { + throw new IllegalArgumentException( + "Invalid table inclusion pattern cannot be null or empty"); + } + + List selectors = new ArrayList<>(); + Set tableSplitSet = + Predicates.setOf( + tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str); + for (String tableSplit : tableSplitSet) { + List tableIdList = + Predicates.listOf( + tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str); + Iterator iterator = tableIdList.iterator(); + if (tableIdList.size() == 1) { + selectors.add(new Selector(null, null, iterator.next())); + } else if (tableIdList.size() == 2) { + selectors.add(new Selector(iterator.next(), null, iterator.next())); + } else { + throw new IllegalArgumentException( + "Invalid table inclusion pattern: " + tableInclusions); + } + } + this.selectors = selectors; + return this; + } + + public Selectors build() { + Selectors selectors = new Selectors(); + selectors.selectors = this.selectors; + return selectors; + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 9ea69b11ad..89985ae2f5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -601,11 +601,7 @@ private static MySqlChunkSplitter createChunkSplitter( return new MySqlChunkSplitter( mySqlSchema, sourceConfig, - tableId != null - && sourceConfig - .getTableFilters() - .dataCollectionFilter() - .isIncluded(tableId) + tableId != null && sourceConfig.getTableFilter().test(tableId) ? chunkSplitterState : ChunkSplitterState.NO_SPLITTING_TABLE_STATE); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 2e19156e52..dd0ac78966 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.config; +import org.apache.flink.cdc.connectors.mysql.schema.Selectors; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.table.catalog.ObjectPath; @@ -24,6 +25,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; import javax.annotation.Nullable; @@ -32,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.function.Predicate; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -45,6 +48,7 @@ public class MySqlSourceConfig implements Serializable { private final String password; private final List databaseList; private final List tableList; + private final String excludeTableList; @Nullable private final ServerIdRange serverIdRange; private final StartupOptions startupOptions; private final int splitSize; @@ -77,6 +81,7 @@ public class MySqlSourceConfig implements Serializable { String password, List databaseList, List tableList, + @Nullable String excludeTableList, @Nullable ServerIdRange serverIdRange, StartupOptions startupOptions, int splitSize, @@ -101,6 +106,7 @@ public class MySqlSourceConfig implements Serializable { this.password = password; this.databaseList = checkNotNull(databaseList); this.tableList = checkNotNull(tableList); + this.excludeTableList = excludeTableList; this.serverIdRange = serverIdRange; this.startupOptions = checkNotNull(startupOptions); this.splitSize = splitSize; @@ -216,10 +222,27 @@ public MySqlConnectorConfig getMySqlConnectorConfig() { return dbzMySqlConfig; } + @Deprecated public RelationalTableFilters getTableFilters() { return dbzMySqlConfig.getTableFilters(); } + public Predicate getDatabaseFilter() { + RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters(); + return (String databaseName) -> tableFilters.databaseFilter().test(databaseName); + } + + public Predicate getTableFilter() { + RelationalTableFilters tableFilters = dbzMySqlConfig.getTableFilters(); + Selectors excludeTableFilter = + (excludeTableList == null + ? null + : new Selectors.SelectorsBuilder().includeTables(excludeTableList).build()); + return (TableId tableId) -> + tableFilters.dataCollectionFilter().isIncluded(tableId) + && (excludeTableFilter == null || !excludeTableFilter.isMatch(tableId)); + } + public Properties getJdbcProperties() { return jdbcProperties; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index c994c82410..8b65055ca1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -49,6 +49,7 @@ public class MySqlSourceConfigFactory implements Serializable { private ServerIdRange serverIdRange; private List databaseList; private List tableList; + private String excludeTableList; private String serverTimeZone = ZoneId.systemDefault().getId(); private StartupOptions startupOptions = StartupOptions.initial(); private int splitSize = MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(); @@ -102,6 +103,11 @@ public MySqlSourceConfigFactory tableList(String... tableList) { return this; } + public MySqlSourceConfigFactory excludeTableList(String tableInclusions) { + this.excludeTableList = tableInclusions; + return this; + } + /** Name of the MySQL database to use when connecting to the MySQL database server. */ public MySqlSourceConfigFactory username(String username) { this.username = username; @@ -360,6 +366,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { password, databaseList, tableList, + excludeTableList, serverIdRange, startupOptions, splitSize, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index ae30843639..4ffea7ce0d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -235,10 +235,7 @@ private void addSplits(List splits, boolean checkTableChangeForBinlo LOG.info("Source reader {} adds split {}", subtaskId, split); if (split.isSnapshotSplit()) { MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit(); - if (sourceConfig - .getTableFilters() - .dataCollectionFilter() - .isIncluded(split.asSnapshotSplit().getTableId())) { + if (sourceConfig.getTableFilter().test(split.asSnapshotSplit().getTableId())) { if (snapshotSplit.isSnapshotReadFinished()) { finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit); } else { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index c7b74b654d..dedeab2dd5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -24,7 +24,6 @@ import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlPartition; import io.debezium.jdbc.JdbcConnection; -import io.debezium.relational.RelationalTableFilters; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; import org.slf4j.Logger; @@ -35,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; /** Utilities to discovery matched tables. */ @@ -42,7 +42,8 @@ public class TableDiscoveryUtils { private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class); - public static List listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters) + public static List listTables( + JdbcConnection jdbc, Predicate databaseFilter, Predicate tableFilter) throws SQLException { final List capturedTableIds = new ArrayList<>(); // ------------------- @@ -57,7 +58,7 @@ public static List listTables(JdbcConnection jdbc, RelationalTableFilte rs -> { while (rs.next()) { String databaseName = rs.getString(1); - if (tableFilters.databaseFilter().test(databaseName)) { + if (databaseFilter.test(databaseName)) { databaseNames.add(databaseName); } } @@ -81,7 +82,7 @@ public static List listTables(JdbcConnection jdbc, RelationalTableFilte rs -> { while (rs.next()) { TableId tableId = new TableId(dbName, null, rs.getString(1)); - if (tableFilters.dataCollectionFilter().isIncluded(tableId)) { + if (tableFilter.test(tableId)) { capturedTableIds.add(tableId); LOG.info( "\t including table '{}' for further processing", @@ -106,7 +107,9 @@ public static Map discoverSchemaForCapturedTables( MySqlPartition partition, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) { final List capturedTableIds; try { - capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters()); + capturedTableIds = + listTables( + jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter()); } catch (SQLException e) { throw new FlinkRuntimeException("Failed to discover captured tables", e); } @@ -121,7 +124,11 @@ public static Map discoverSchemaForNewAddedTables( final List capturedTableIds; try { capturedTableIds = - listTables(jdbc, sourceConfig.getTableFilters()).stream() + listTables( + jdbc, + sourceConfig.getDatabaseFilter(), + sourceConfig.getTableFilter()) + .stream() .filter(tableId -> !existedTables.contains(tableId)) .collect(Collectors.toList()); } catch (SQLException e) {