Commit fd49f30

MySQL CDC: support excluding tables.
loserwang1024 committed Aug 25, 2024
1 parent 060d203 commit fd49f30
11 changed files with 224 additions and 32 deletions.
@@ -28,7 +28,6 @@
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
-import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -43,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashMap;
@@ -171,21 +172,23 @@ public DataSource createDataSource(Context context) {
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

+List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
if (scanBinlogNewlyAddedTableEnabled) {
String newTables = validateTableAndReturnDebeziumStyle(tables);
configFactory.tableList(newTables);
+configFactory.excludeTableList(tablesExclude);
+
} else {
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
-List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
+List<String> capturedTables = getTableList(tableIds, selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " + tables);
}
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
-List<String> excludeTables =
-getTableList(configFactory.createConfig(0), selectExclude);
+List<String> excludeTables = getTableList(tableIds, selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
@@ -201,8 +204,7 @@ public DataSource createDataSource(Context context) {
String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
-List<TableId> tableIds =
-MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
+
for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
String[] splits = chunkKeyColumn.split(":");
if (splits.length == 2) {
@@ -284,8 +286,9 @@ public String identifier() {
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";

-private static List<String> getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
-return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
+private static List<String> getTableList(
+@Nullable List<TableId> tableIdList, Selectors selectors) {
+return tableIdList.stream()
.filter(selectors::isMatch)
.map(TableId::toString)
.collect(Collectors.toList());
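The non-binlog branch above resolves the capture list eagerly: list every table once, keep those matching the `tables` patterns, then subtract those matching the exclude patterns. A minimal self-contained sketch of that include-then-exclude resolution, using plain `java.util.regex` in place of the connector's `Selectors` and `MySqlSchemaUtils` (all names, patterns, and data here are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class IncludeExcludeDemo {

    // Stand-in for Selectors#isMatch: does "db.table" match any comma-separated pattern?
    static boolean matchesAny(String tableId, String patterns) {
        return Arrays.stream(patterns.split(","))
                .map(String::trim)
                .anyMatch(p -> Pattern.compile(p).matcher(tableId).matches());
    }

    public static void main(String[] args) {
        // Stand-in for MySqlSchemaUtils.listTables(...): the full table inventory.
        List<String> allTables =
                Arrays.asList("app_db.address_beijing", "app_db.address_shanghai", "app_db.orders");

        String tables = "app_db\\.address_.*";             // include patterns
        String tablesExclude = "app_db\\.address_beijing"; // exclude patterns

        // Include first, mirroring getTableList(tableIds, selectors) ...
        List<String> capturedTables =
                allTables.stream()
                        .filter(t -> matchesAny(t, tables))
                        .collect(Collectors.toCollection(ArrayList::new));

        // ... then drop the excluded tables, mirroring capturedTables.removeAll(excludeTables).
        capturedTables.removeIf(t -> matchesAny(t, tablesExclude));

        System.out.println(capturedTables); // prints [app_db.address_shanghai]
    }
}

In the `scanBinlogNewlyAddedTableEnabled` branch no eager resolution is possible, since matching tables may be created later; that is why the factory instead hands `tablesExclude` to `configFactory.excludeTableList(...)` and leaves the filtering to the runtime readers.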
@@ -237,7 +237,9 @@ private synchronized MySqlAntlrDdlParser getParser() {
private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<CreateTableEvent> createTableEventCache = new ArrayList<>();
-List<TableId> capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
+List<TableId> capturedTableIds =
+listTables(
+jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
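Schema discovery now receives the database filter and the table filter as two separate predicates rather than Debezium's bundled `getTableFilters()`. A sketch of the shape such a two-predicate `listTables` helper might take, with an in-memory catalog standing in for the JDBC metadata queries (the helper and its data are hypothetical, not the connector's actual `TableDiscoveryUtils`):

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ListTablesDemo {

    // Hypothetical analogue of listTables(jdbc, databaseFilter, tableFilter):
    // filter databases first, then filter the fully qualified tables.
    static List<String> listTables(
            Map<String, List<String>> catalog,
            Predicate<String> databaseFilter,
            Predicate<String> tableFilter) {
        return catalog.entrySet().stream()
                .filter(e -> databaseFilter.test(e.getKey()))
                .flatMap(e -> e.getValue().stream().map(t -> e.getKey() + "." + t))
                .filter(tableFilter)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<String>> catalog =
                Map.of(
                        "app_db", List.of("address_beijing", "orders"),
                        "other_db", List.of("t1"));

        List<String> captured =
                listTables(
                        catalog,
                        db -> db.equals("app_db"),
                        table -> !table.endsWith("address_beijing"));

        System.out.println(captured); // prints [app_db.orders]
    }
}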
@@ -60,6 +60,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.Lists;

import java.nio.file.Files;
import java.nio.file.Path;
@@ -91,6 +92,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
@@ -187,6 +189,42 @@ public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
.isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing"));
}

+@Test
+public void testScanBinlogNewlyAddedTableEnabledAndExcludeTables() throws Exception {
+List<String> tables = Collections.singletonList("address_\\.*");
+Map<String, String> options = new HashMap<>();
+options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+options.put(TABLES_EXCLUDE.key(), customDatabase.getDatabaseName() + ".address_beijing");
+options.put(SCAN_STARTUP_MODE.key(), "timestamp");
+options.put(
+SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
+
+FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options);
+StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+env.enableCheckpointing(200);
+DataStreamSource<Event> source =
+env.fromSource(
+sourceProvider.getSource(),
+WatermarkStrategy.noWatermarks(),
+MySqlDataSourceFactory.IDENTIFIER,
+new EventTypeInfo());
+
+TypeSerializer<Event> serializer =
+source.getTransformation().getOutputType().createSerializer(env.getConfig());
+CheckpointedCollectResultBuffer<Event> resultBuffer =
+new CheckpointedCollectResultBuffer<>(serializer);
+String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+CollectResultIterator<Event> iterator =
+addCollector(env, source, resultBuffer, serializer, accumulatorName);
+env.executeAsync("AddNewlyTablesWhenReadingBinlog");
+initialAddressTables(
+getConnection(), Lists.newArrayList("address_beijing", "address_shanghai"));
+List<Event> actual = fetchResults(iterator, 4);
+assertThat(((ChangeEvent) actual.get(0)).tableId())
+.isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_shanghai"));
+}
+
@Test
public void testAddNewTableOneByOneSingleParallelism() throws Exception {
TestParam testParam =
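The new test drives the exclude option together with the binlog-phase table discovery end to end: both address tables are created after the job starts, yet only `address_shanghai` is emitted. Outside of tests, the same pair of options is set wherever the source's configuration map is assembled; a hedged sketch (the string keys are assumed to be the ones defined by `MySqlDataSourceOptions`):

import java.util.HashMap;
import java.util.Map;

public class ExcludeOptionDemo {
    public static void main(String[] args) {
        // Capture every address_* table, but never address_beijing — not even one
        // created after the job has switched to the binlog-reading phase.
        Map<String, String> options = new HashMap<>();
        options.put("tables", "app_db.address_\\.*");                // assumed key of TABLES
        options.put("tables.exclude", "app_db.address_beijing");     // assumed key of TABLES_EXCLUDE
        options.put("scan.binlog.newly-added-table.enabled", "true"); // assumed key
        System.out.println(options);
    }
}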
@@ -193,7 +193,9 @@ public static List<TableId> discoverCapturedTables(

final List<TableId> capturedTableIds;
try {
-capturedTableIds = TableDiscoveryUtils.listTables(jdbc, sourceConfig.getTableFilters());
+capturedTableIds =
+TableDiscoveryUtils.listTables(
+jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter());
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured tables", e);
}
@@ -40,7 +40,6 @@
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -81,7 +80,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
// tableId -> the max splitHighWatermark
private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
private final Set<TableId> pureBinlogPhaseTables;
-private Tables.TableFilter capturedTableFilter;
+private Predicate<TableId> capturedTableFilter;
private final StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();

@@ -100,8 +99,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
configureFilter();
statefulTaskContext.configure(currentBinlogSplit);
-this.capturedTableFilter =
-statefulTaskContext.getConnectorConfig().getTableFilters().dataCollectionFilter();
+this.capturedTableFilter = statefulTaskContext.getSourceConfig().getTableFilter();
this.queue = statefulTaskContext.getQueue();
this.binlogSplitReadTask =
new MySqlBinlogSplitReadTask(
@@ -247,7 +245,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
} else if (RecordUtils.isSchemaChangeEvent(sourceRecord)) {
if (RecordUtils.isTableChangeRecord(sourceRecord)) {
TableId tableId = RecordUtils.getTableId(sourceRecord);
-return capturedTableFilter.isIncluded(tableId);
+return capturedTableFilter.test(tableId);
} else {
// Not related to changes in table structure, like `CREATE/DROP DATABASE`, skip it
return false;
@@ -270,7 +268,7 @@ private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position)
if (!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
// the new added sharding table without history records
return !maxSplitHighWatermarkMap.containsKey(tableId)
-&& capturedTableFilter.isIncluded(tableId);
+&& capturedTableFilter.test(tableId);
}
return false;
}
@@ -280,7 +278,7 @@ private void configureFilter() {
currentBinlogSplit.getFinishedSnapshotSplitInfos();
Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new HashMap<>();
Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
-// specific offset mode
+// stream-only startup mode, i.e. no finished snapshot splits
if (finishedSplitInfos.isEmpty()) {
for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
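`BinlogSplitReader` now holds a plain `Predicate<TableId>` taken from the source config instead of Debezium's `Tables.TableFilter`, so `shouldEmit` and the pure-binlog-phase check consult one predicate that already folds the exclude patterns in. A minimal sketch of composing such a predicate from an include and an exclude regex (illustrative only; the connector builds its predicate from the parsed option lists, and this demo uses plain strings instead of `TableId`):

import java.util.function.Predicate;
import java.util.regex.Pattern;

public class TableFilterDemo {

    // Include tables matching includeRegex, minus those matching excludeRegex.
    static Predicate<String> tableFilter(String includeRegex, String excludeRegex) {
        Predicate<String> include = Pattern.compile(includeRegex).asMatchPredicate();
        Predicate<String> exclude =
                excludeRegex == null
                        ? t -> false
                        : Pattern.compile(excludeRegex).asMatchPredicate();
        return include.and(exclude.negate());
    }

    public static void main(String[] args) {
        Predicate<String> filter =
                tableFilter("app_db\\.address_.*", "app_db\\.address_beijing");

        // The reader would call filter.test(tableId) when deciding whether to emit:
        System.out.println(filter.test("app_db.address_shanghai")); // true
        System.out.println(filter.test("app_db.address_beijing"));  // false
    }
}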
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.schema;

import org.apache.flink.cdc.common.utils.Predicates;

import io.debezium.relational.TableId;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Selectors for filtering tables. */
public class Selectors {

private List<Selector> selectors;

private Selectors() {}

/**
* A {@link Selector} that determines whether a table identified by a given {@link TableId} is
* to be included.
*/
private static class Selector {
private final Predicate<String> namespacePred;
private final Predicate<String> tableNamePred;

public Selector(String namespace, String schemaName, String tableName) {
this.namespacePred =
namespace == null ? (namespacePred) -> false : Predicates.includes(namespace);
this.tableNamePred =
tableName == null ? (tableNamePred) -> false : Predicates.includes(tableName);
}

public boolean isMatch(TableId tableId) {

String namespace = tableId.catalog();

if (namespace == null || namespace.isEmpty()) {
return tableNamePred.test(tableId.table());
}
return namespacePred.test(tableId.catalog()) && tableNamePred.test(tableId.table());
}
}

/** Matches the {@link TableId} against the {@link Selector}s. */
public boolean isMatch(TableId tableId) {
for (Selector selector : selectors) {
if (selector.isMatch(tableId)) {
return true;
}
}
return false;
}

/** Builder for {@link Selectors}. */
public static class SelectorsBuilder {

private List<Selector> selectors;

/**
* The {@link TableId} used in the MySQL CDC connector maps the database name to the catalog.
*
* @param tableInclusions comma-separated list of database.table patterns to include
* @return this builder
*/
public SelectorsBuilder includeTables(String tableInclusions) {

if (tableInclusions == null || tableInclusions.isEmpty()) {
throw new IllegalArgumentException(
"Invalid table inclusion pattern cannot be null or empty");
}

List<Selector> selectors = new ArrayList<>();
Set<String> tableSplitSet =
Predicates.setOf(
tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str);
for (String tableSplit : tableSplitSet) {
List<String> tableIdList =
Predicates.listOf(
tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str);
Iterator<String> iterator = tableIdList.iterator();
if (tableIdList.size() == 1) {
selectors.add(new Selector(null, null, iterator.next()));
} else if (tableIdList.size() == 2) {
selectors.add(new Selector(iterator.next(), null, iterator.next()));
} else {
throw new IllegalArgumentException(
"Invalid table inclusion pattern: " + tableInclusions);
}
}
this.selectors = selectors;
return this;
}

public Selectors build() {
Selectors selectors = new Selectors();
selectors.selectors = this.selectors;
return selectors;
}
}
}
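A usage sketch of the class above. `includeTables` splits each comma-separated pattern on unescaped dots into a database part and a table part, which is why regex dots inside table patterns must be escaped (e.g. `address_\.*`). The demo assumes it lives in the same package as `Selectors`, and the `TableId` values are illustrative:

package org.apache.flink.cdc.connectors.mysql.schema;

import io.debezium.relational.TableId;

public class SelectorsDemo {
    public static void main(String[] args) {
        Selectors selectors =
                new Selectors.SelectorsBuilder()
                        .includeTables("app_db.address_\\.*,app_db.orders")
                        .build();

        // MySQL maps the database name to TableId's catalog field; the schema part stays null.
        System.out.println(selectors.isMatch(new TableId("app_db", null, "orders")));    // true
        System.out.println(selectors.isMatch(new TableId("app_db", null, "customers"))); // false
        System.out.println(selectors.isMatch(new TableId("other_db", null, "orders")));  // false
    }
}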
@@ -601,11 +601,7 @@ private static MySqlChunkSplitter createChunkSplitter(
return new MySqlChunkSplitter(
mySqlSchema,
sourceConfig,
-tableId != null
-&& sourceConfig
-.getTableFilters()
-.dataCollectionFilter()
-.isIncluded(tableId)
+tableId != null && sourceConfig.getTableFilter().test(tableId)
? chunkSplitterState
: ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}