From 48ca8623bb8fa405adb56dbe505dbad10902db89 Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Fri, 12 Apr 2024 10:15:46 +0800 Subject: [PATCH] [FLINK-34634][cdc-base][mysql] Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (#3134) --- .../assigner/SnapshotSplitAssigner.java | 21 ++++++++---- .../IncrementalSourceEnumerator.java | 32 +++++++++++++++---- .../meta/events/StreamSplitMetaEvent.java | 15 ++++++++- .../events/StreamSplitMetaRequestEvent.java | 10 +++++- .../base/source/meta/split/StreamSplit.java | 17 +++++++++- .../reader/IncrementalSourceReader.java | 14 ++++++-- .../mongodb/source/NewlyAddedTableITCase.java | 17 +++------- .../assigners/MySqlSnapshotSplitAssigner.java | 1 + .../enumerator/MySqlSourceEnumerator.java | 32 +++++++++++++++---- .../source/events/BinlogSplitMetaEvent.java | 15 ++++++++- .../events/BinlogSplitMetaRequestEvent.java | 10 +++++- .../source/reader/MySqlSourceReader.java | 16 ++++++++-- .../mysql/source/split/MySqlBinlogSplit.java | 16 +++++++++- .../mysql/source/NewlyAddedTableITCase.java | 15 +++------ .../source/NewlyAddedTableITCase.java | 15 +++------ 15 files changed, 181 insertions(+), 65 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index 48c55cd8a5..d424e89b70 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -39,10 +39,10 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -92,7 +92,7 @@ public SnapshotSplitAssigner( currentParallelism, new ArrayList<>(), new ArrayList<>(), - new HashMap<>(), + new LinkedHashMap<>(), new HashMap<>(), new HashMap<>(), INITIAL_ASSIGNING, @@ -143,7 +143,17 @@ private SnapshotSplitAssigner( this.currentParallelism = currentParallelism; this.alreadyProcessedTables = alreadyProcessedTables; this.remainingSplits = remainingSplits; - this.assignedSplits = assignedSplits; + // When job restore from savepoint, sort the existing tables and newly added tables + // to let enumerator only send newly added tables' StreamSplitMetaEvent + this.assignedSplits = + assignedSplits.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (o, o2) -> o, + LinkedHashMap::new)); this.tableSchemas = tableSchemas; this.splitFinishedOffsets = splitFinishedOffsets; this.assignerStatus = assignerStatus; @@ -230,6 +240,7 @@ private void captureNewlyAddedTables() { tableSchemas .entrySet() .removeIf(schema -> tablesToRemove.contains(schema.getKey())); + LOG.info("Enumerator remove tables after restart: {}", tablesToRemove); remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId())); remainingTables.removeAll(tablesToRemove); alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId)); @@ -303,9 +314,7 @@ public List getFinishedSplitInfos() { "The assigner is not ready to offer finished split information, this should not be called"); } final List assignedSnapshotSplit = - assignedSplits.values().stream() - .sorted(Comparator.comparing(SourceSplitBase::splitId)) - .collect(Collectors.toList()); + new ArrayList<>(assignedSplits.values()); List finishedSnapshotSplitInfos = new ArrayList<>(); for (SchemalessSnapshotSplit split : assignedSnapshotSplit) { Offset finishedOffset = splitFinishedOffsets.get(split.splitId()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java index af137ab40b..05dcc6bd27 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java @@ -307,8 +307,22 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize()); } final int requestMetaGroupId = requestEvent.getRequestMetaGroupId(); - - if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) { + final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize(); + final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size(); + if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) { + LOG.warn( + "Total finished split size of subtask {} is {}, while total finished split size of enumerator is only {}. Try to truncate it", + subTask, + totalFinishedSplitSizeOfReader, + totalFinishedSplitSizeOfEnumerator); + StreamSplitMetaEvent metadataEvent = + new StreamSplitMetaEvent( + requestEvent.getSplitId(), + requestMetaGroupId, + null, + totalFinishedSplitSizeOfEnumerator); + context.sendEventToSourceReader(subTask, metadataEvent); + } else if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) { List metaToSend = finishedSnapshotSplitMeta.get(requestMetaGroupId); StreamSplitMetaEvent metadataEvent = @@ -317,13 +331,17 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent requestMetaGroupId, metaToSend.stream() .map(FinishedSnapshotSplitInfo::serialize) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + totalFinishedSplitSizeOfEnumerator); context.sendEventToSourceReader(subTask, metadataEvent); } else { - LOG.error( - "Received invalid request meta group id {}, the invalid meta group id range is [0, {}]", - requestMetaGroupId, - finishedSnapshotSplitMeta.size() - 1); + throw new FlinkRuntimeException( + String.format( + "The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.", + requestMetaGroupId, + finishedSnapshotSplitMeta.size() - 1, + totalFinishedSplitSizeOfReader, + totalFinishedSplitSizeOfEnumerator)); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java index 4bf1922f62..8f9f580e96 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java @@ -22,6 +22,8 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader; +import javax.annotation.Nullable; + import java.util.List; /** @@ -43,10 +45,17 @@ public class StreamSplitMetaEvent implements SourceEvent { */ private final List metaGroup; - public StreamSplitMetaEvent(String splitId, int metaGroupId, List metaGroup) { + private final int totalFinishedSplitSize; + + public StreamSplitMetaEvent( + String splitId, + int metaGroupId, + @Nullable List metaGroup, + int totalFinishedSplitSize) { this.splitId = splitId; this.metaGroupId = metaGroupId; this.metaGroup = metaGroup; + this.totalFinishedSplitSize = totalFinishedSplitSize; } public String getSplitId() { @@ -60,4 +69,8 @@ public int getMetaGroupId() { public List getMetaGroup() { return metaGroup; } + + public int getTotalFinishedSplitSize() { + return totalFinishedSplitSize; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java index 272f1967a0..8d657c2735 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java @@ -33,9 +33,13 @@ public class StreamSplitMetaRequestEvent implements SourceEvent { private final String splitId; private final int requestMetaGroupId; - public StreamSplitMetaRequestEvent(String splitId, int requestMetaGroupId) { + private final int totalFinishedSplitSize; + + public StreamSplitMetaRequestEvent( + String splitId, int requestMetaGroupId, int totalFinishedSplitSize) { this.splitId = splitId; this.requestMetaGroupId = requestMetaGroupId; + this.totalFinishedSplitSize = totalFinishedSplitSize; } public String getSplitId() { @@ -45,4 +49,8 @@ public String getSplitId() { public int getRequestMetaGroupId() { return requestMetaGroupId; } + + public int getTotalFinishedSplitSize() { + return totalFinishedSplitSize; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java index 53bc21d681..f4143364ad 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java @@ -21,6 +21,8 @@ import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -29,11 +31,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; /** The split to describe the change log of database table(s). */ public class StreamSplit extends SourceSplitBase { + private static final Logger LOG = LoggerFactory.getLogger(StreamSplit.class); public static final String STREAM_SPLIT_ID = "stream-split"; private final Offset startingOffset; @@ -179,9 +183,20 @@ public static StreamSplit appendFinishedSplitInfos( */ public static StreamSplit filterOutdatedSplitInfos( StreamSplit streamSplit, Predicate currentTableFilter) { + + Set tablesToRemove = + streamSplit.getFinishedSnapshotSplitInfos().stream() + .filter(i -> !currentTableFilter.test(i.getTableId())) + .map(split -> split.getTableId()) + .collect(Collectors.toSet()); + if (tablesToRemove.isEmpty()) { + return streamSplit; + } + + LOG.info("Reader remove tables after restart: {}", tablesToRemove); List allFinishedSnapshotSplitInfos = streamSplit.getFinishedSnapshotSplitInfos().stream() - .filter(i -> currentTableFilter.test(i.getTableId())) + .filter(i -> !tablesToRemove.contains(i.getTableId())) .collect(Collectors.toList()); Map previousTableSchemas = streamSplit.getTableSchemas(); Map newTableSchemas = new HashMap<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java index de01475323..7be9785fd2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java @@ -418,11 +418,20 @@ private void fillMetaDataForStreamSplit(StreamSplitMetaEvent metadataEvent) { StreamSplit streamSplit = uncompletedStreamSplits.get(metadataEvent.getSplitId()); if (streamSplit != null) { final int receivedMetaGroupId = metadataEvent.getMetaGroupId(); + final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize(); final int expectedMetaGroupId = getNextMetaGroupId( streamSplit.getFinishedSnapshotSplitInfos().size(), sourceConfig.getSplitMetaGroupSize()); - if (receivedMetaGroupId == expectedMetaGroupId) { + if (receivedTotalFinishedSplitSize < streamSplit.getTotalFinishedSplitSize()) { + LOG.warn( + "Source reader {} receives out of bound finished split size. The received finished split size is {}, but expected is {}, truncate it", + subtaskId, + receivedTotalFinishedSplitSize, + streamSplit.getTotalFinishedSplitSize()); + streamSplit = toNormalStreamSplit(streamSplit, receivedTotalFinishedSplitSize); + uncompletedStreamSplits.put(streamSplit.splitId(), streamSplit); + } else if (receivedMetaGroupId == expectedMetaGroupId) { Set existedSplitsOfLastGroup = getExistedSplitsOfLastGroup( streamSplit.getFinishedSnapshotSplitInfos(), @@ -461,7 +470,8 @@ private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) { streamSplit.getFinishedSnapshotSplitInfos().size(), sourceConfig.getSplitMetaGroupSize()); StreamSplitMetaRequestEvent splitMetaRequestEvent = - new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId); + new StreamSplitMetaRequestEvent( + splitId, nextMetaGroupId, streamSplit.getTotalFinishedSplitSize()); context.sendSourceEventToCoordinator(splitMetaRequestEvent); } else { LOG.info("The meta of stream split {} has been collected success", splitId); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java index dc86f637af..3b1964856b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java @@ -42,7 +42,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; -import java.lang.reflect.Field; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -948,20 +947,11 @@ private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint( // Close sink upsert materialize to show more clear test output. Configuration tableConfig = new Configuration(); tableConfig.setString("table.exec.sink.upsert-materialize", "none"); - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(tableConfig); if (finishedSavePointPath != null) { - // restore from savepoint - // hack for test to visit protected TestStreamEnvironment#getConfiguration() method - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - Class clazz = - classLoader.loadClass( - "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment"); - Field field = clazz.getDeclaredField("configuration"); - field.setAccessible(true); - Configuration configuration = (Configuration) field.get(env); - configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath); + tableConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath); } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(tableConfig); env.setParallelism(parallelism); env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L)); @@ -987,6 +977,7 @@ private String getCreateTableStatement( + " 'password' = '%s'," + " 'database' = '%s'," + " 'collection' = '%s'," + + " 'chunk-meta.group.size' = '2'," + " 'heartbeat.interval.ms' = '100'," + " 'scan.full-changelog' = 'true'," + " 'scan.newly-added-table.enabled' = 'true'" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 1cefdfe8ce..e2dd426a62 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -253,6 +253,7 @@ private void captureNewlyAddedTables() { .entrySet() .removeIf(schema -> tablesToRemove.contains(schema.getKey())); remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId())); + LOG.info("Enumerator remove tables after restart: {}", tablesToRemove); remainingTables.removeAll(tablesToRemove); alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId)); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index a311978054..50d7607b60 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -298,8 +298,22 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize()); } final int requestMetaGroupId = requestEvent.getRequestMetaGroupId(); - - if (binlogSplitMeta.size() > requestMetaGroupId) { + final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize(); + final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size(); + if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) { + LOG.warn( + "Total finished split size of subtask {} is {}, while total finished split size of Enumerator is only {}. Try to truncate it", + subTask, + totalFinishedSplitSizeOfReader, + totalFinishedSplitSizeOfEnumerator); + BinlogSplitMetaEvent metadataEvent = + new BinlogSplitMetaEvent( + requestEvent.getSplitId(), + requestMetaGroupId, + null, + totalFinishedSplitSizeOfEnumerator); + context.sendEventToSourceReader(subTask, metadataEvent); + } else if (binlogSplitMeta.size() > requestMetaGroupId) { List metaToSend = binlogSplitMeta.get(requestMetaGroupId); BinlogSplitMetaEvent metadataEvent = new BinlogSplitMetaEvent( @@ -307,13 +321,17 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven requestMetaGroupId, metaToSend.stream() .map(FinishedSnapshotSplitInfo::serialize) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + totalFinishedSplitSizeOfEnumerator); context.sendEventToSourceReader(subTask, metadataEvent); } else { - LOG.error( - "The enumerator received invalid request meta group id {}, the valid meta group id range is [0, {}]", - requestMetaGroupId, - binlogSplitMeta.size() - 1); + throw new FlinkRuntimeException( + String.format( + "The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.", + requestMetaGroupId, + binlogSplitMeta.size() - 1, + totalFinishedSplitSizeOfReader, + totalFinishedSplitSizeOfEnumerator)); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java index 60688bbd0a..2c8fe0b1ed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java @@ -22,6 +22,8 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; +import javax.annotation.Nullable; + import java.util.List; /** @@ -43,10 +45,17 @@ public class BinlogSplitMetaEvent implements SourceEvent { */ private final List metaGroup; - public BinlogSplitMetaEvent(String splitId, int metaGroupId, List metaGroup) { + private final int totalFinishedSplitSize; + + public BinlogSplitMetaEvent( + String splitId, + int metaGroupId, + @Nullable List metaGroup, + int totalFinishedSplitSize) { this.splitId = splitId; this.metaGroupId = metaGroupId; this.metaGroup = metaGroup; + this.totalFinishedSplitSize = totalFinishedSplitSize; } public String getSplitId() { @@ -60,4 +69,8 @@ public int getMetaGroupId() { public List getMetaGroup() { return metaGroup; } + + public int getTotalFinishedSplitSize() { + return totalFinishedSplitSize; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java index d3a2b8f333..6ffef86796 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java @@ -32,9 +32,13 @@ public class BinlogSplitMetaRequestEvent implements SourceEvent { private final String splitId; private final int requestMetaGroupId; - public BinlogSplitMetaRequestEvent(String splitId, int requestMetaGroupId) { + private final int totalFinishedSplitSize; + + public BinlogSplitMetaRequestEvent( + String splitId, int requestMetaGroupId, int totalFinishedSplitSize) { this.splitId = splitId; this.requestMetaGroupId = requestMetaGroupId; + this.totalFinishedSplitSize = totalFinishedSplitSize; } public String getSplitId() { @@ -44,4 +48,8 @@ public String getSplitId() { public int getRequestMetaGroupId() { return requestMetaGroupId; } + + public int getTotalFinishedSplitSize() { + return totalFinishedSplitSize; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index 6df83a648a..253d9dc621 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -383,7 +383,8 @@ private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) { binlogSplit.getFinishedSnapshotSplitInfos().size(), sourceConfig.getSplitMetaGroupSize()); BinlogSplitMetaRequestEvent splitMetaRequestEvent = - new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId); + new BinlogSplitMetaRequestEvent( + splitId, nextMetaGroupId, binlogSplit.getTotalFinishedSplitSize()); context.sendSourceEventToCoordinator(splitMetaRequestEvent); } else { LOG.info("Source reader {} collects meta of binlog split success", subtaskId); @@ -395,11 +396,22 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) { MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId()); if (binlogSplit != null) { final int receivedMetaGroupId = metadataEvent.getMetaGroupId(); + final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize(); final int expectedMetaGroupId = ChunkUtils.getNextMetaGroupId( binlogSplit.getFinishedSnapshotSplitInfos().size(), sourceConfig.getSplitMetaGroupSize()); - if (receivedMetaGroupId == expectedMetaGroupId) { + if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) { + LOG.warn( + "Source reader {} receives out of bound finished split size. The received finished split size is {}, but expected is {}, truncate it", + subtaskId, + receivedTotalFinishedSplitSize, + binlogSplit.getTotalFinishedSplitSize()); + binlogSplit = + MySqlBinlogSplit.toNormalBinlogSplit( + binlogSplit, receivedTotalFinishedSplitSize); + uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit); + } else if (receivedMetaGroupId == expectedMetaGroupId) { List newAddedMetadataGroup; Set existedSplitsOfLastGroup = getExistedSplitsOfLastGroup( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java index d67d1989fc..033844ab30 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java @@ -22,6 +22,8 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.history.TableChanges.TableChange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -29,10 +31,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** The split to describe the binlog of MySql table(s). */ public class MySqlBinlogSplit extends MySqlSplit { + private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplit.class); private final BinlogOffset startingOffset; private final BinlogOffset endingOffset; @@ -183,9 +187,19 @@ public static MySqlBinlogSplit appendFinishedSplitInfos( */ public static MySqlBinlogSplit filterOutdatedSplitInfos( MySqlBinlogSplit binlogSplit, Tables.TableFilter currentTableFilter) { + Set tablesToRemove = + binlogSplit.getFinishedSnapshotSplitInfos().stream() + .filter(i -> !currentTableFilter.isIncluded(i.getTableId())) + .map(split -> split.getTableId()) + .collect(Collectors.toSet()); + if (tablesToRemove.isEmpty()) { + return binlogSplit; + } + + LOG.info("Reader remove tables after restart: {}", tablesToRemove); List allFinishedSnapshotSplitInfos = binlogSplit.getFinishedSnapshotSplitInfos().stream() - .filter(i -> currentTableFilter.isIncluded(i.getTableId())) + .filter(i -> !tablesToRemove.contains(i.getTableId())) .collect(Collectors.toList()); return new MySqlBinlogSplit( binlogSplit.splitId, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java index 2d78d733df..ac23918d28 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java @@ -57,7 +57,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; -import java.lang.reflect.Field; import java.sql.SQLException; import java.time.ZoneId; import java.util.ArrayList; @@ -893,6 +892,7 @@ private String getCreateTableStatement( + " 'database-name' = '%s'," + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.chunk.size' = '2'," + + " 'chunk-meta.group.size' = '2'," + " 'server-time-zone' = 'UTC'," + " 'server-id' = '%s'," + " 'scan.newly-added-table.enabled' = 'true'" @@ -919,19 +919,12 @@ private String getCreateTableStatement( private StreamExecutionEnvironment getStreamExecutionEnvironment( String finishedSavePointPath, int parallelism) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Configuration configuration = new Configuration(); if (finishedSavePointPath != null) { - // restore from savepoint - // hack for test to visit protected TestStreamEnvironment#getConfiguration() method - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - Class clazz = - classLoader.loadClass( - "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment"); - Field field = clazz.getDeclaredField("configuration"); - field.setAccessible(true); - Configuration configuration = (Configuration) field.get(env); configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath); } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(parallelism); env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java index 19e624f5f9..c28395f9d6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java @@ -43,7 +43,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; -import java.lang.reflect.Field; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -904,19 +903,12 @@ private String triggerSavepointWithRetry(JobClient jobClient, String savepointDi private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint( String finishedSavePointPath, int parallelism) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Configuration configuration = new Configuration(); if (finishedSavePointPath != null) { - // restore from savepoint - // hack for test to visit protected TestStreamEnvironment#getConfiguration() method - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - Class clazz = - classLoader.loadClass( - "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment"); - Field field = clazz.getDeclaredField("configuration"); - field.setAccessible(true); - Configuration configuration = (Configuration) field.get(env); configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath); } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(parallelism); env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L)); @@ -945,6 +937,7 @@ private String getCreateTableStatement( + " 'table-name' = '%s'," + " 'slot.name' = '%s', " + " 'scan.incremental.snapshot.chunk.size' = '2'," + + " 'chunk-meta.group.size' = '2'," + " 'scan.newly-added-table.enabled' = 'true'" + " %s" + ")",