[FLINK-34634][cdc-base][mysql] Fix that restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (apache#3134)
loserwang1024 authored Apr 12, 2024
1 parent af7665d commit 48ca862
Showing 15 changed files with 181 additions and 65 deletions.
@@ -39,10 +39,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -92,7 +92,7 @@ public SnapshotSplitAssigner(
currentParallelism,
new ArrayList<>(),
new ArrayList<>(),
new HashMap<>(),
new LinkedHashMap<>(),
new HashMap<>(),
new HashMap<>(),
INITIAL_ASSIGNING,
@@ -143,7 +143,17 @@ private SnapshotSplitAssigner(
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
// When the job is restored from a savepoint, sort the existing and newly added tables
// so that the enumerator only sends StreamSplitMetaEvents for the newly added tables
this.assignedSplits =
assignedSplits.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.collect(
Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(o, o2) -> o,
LinkedHashMap::new));
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
@@ -230,6 +240,7 @@ private void captureNewlyAddedTables() {
tableSchemas
.entrySet()
.removeIf(schema -> tablesToRemove.contains(schema.getKey()));
LOG.info("Enumerator remove tables after restart: {}", tablesToRemove);
remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
remainingTables.removeAll(tablesToRemove);
alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
@@ -303,9 +314,7 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
"The assigner is not ready to offer finished split information, this should not be called");
}
final List<SchemalessSnapshotSplit> assignedSnapshotSplit =
assignedSplits.values().stream()
.sorted(Comparator.comparing(SourceSplitBase::splitId))
.collect(Collectors.toList());
new ArrayList<>(assignedSplits.values());
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
Offset finishedOffset = splitFinishedOffsets.get(split.splitId());
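The constructor change above is the heart of the ordering fix: on restore, the assigned splits are sorted once into a LinkedHashMap, so splits recovered from state always iterate ahead of splits for newly added tables, and getFinishedSplitInfos can drop its per-call sort in favor of the map's insertion order. A minimal, self-contained sketch of the idiom (illustrative string keys stand in for the real splitId -> SchemalessSnapshotSplit entries):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class OrderedRestoreSketch {
        public static void main(String[] args) {
            // Restored state arrives in arbitrary (HashMap) order.
            Map<String, String> restored = Map.of("table_b:1", "b1", "table_a:0", "a0");
            Map<String, String> ordered =
                    restored.entrySet().stream()
                            .sorted(Map.Entry.comparingByKey())
                            .collect(
                                    Collectors.toMap(
                                            Map.Entry::getKey,
                                            Map.Entry::getValue,
                                            (first, second) -> first, // keys are unique; keep first
                                            LinkedHashMap::new));
            System.out.println(ordered); // {table_a:0=a0, table_b:1=b1}
            // Entries put into the map later (newly captured tables) append after
            // the restored ones, so only their meta groups need to be re-sent.
        }
    }

Keeping the order stable in the map itself means every later consumer sees the same grouping without re-sorting.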
@@ -307,8 +307,22 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent requestEvent)
finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
LOG.warn(
"Total finished split size of subtask {} is {}, while total finished split size of enumerator is only {}. Try to truncate it",
subTask,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator);
StreamSplitMetaEvent metadataEvent =
new StreamSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
null,
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaToSend =
finishedSnapshotSplitMeta.get(requestMetaGroupId);
StreamSplitMetaEvent metadataEvent =
@@ -317,13 +331,17 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent requestEvent)
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
.collect(Collectors.toList()));
.collect(Collectors.toList()),
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
LOG.error(
"Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",
requestMetaGroupId,
finishedSnapshotSplitMeta.size() - 1);
throw new FlinkRuntimeException(
String.format(
"The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.",
requestMetaGroupId,
finishedSnapshotSplitMeta.size() - 1,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator));
}
}

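The new first branch above is the enumerator's half of a truncation handshake: when a restored reader reports more finished splits than the enumerator still tracks (some tables were removed before meta synchronization completed), the reply carries a null payload and the enumerator's smaller, authoritative total. A self-contained sketch of that decision under hypothetical type names (the real code uses StreamSplitMetaEvent and FlinkRuntimeException); the sendBinlogMeta change further below mirrors the same logic on the MySQL side:

    import java.util.List;

    class MetaReplySketch {

        // payload == null signals "truncate your finished-split list to `total`,
        // then request meta again"; otherwise payload carries one serialized group.
        record MetaReply(String splitId, int groupId, List<byte[]> payload, int total) {}

        static MetaReply reply(
                String splitId,
                int groupId,
                int readerTotal,
                int enumeratorTotal,
                List<List<byte[]>> metaGroups) {
            if (readerTotal > enumeratorTotal) {
                // The reader restored more finished splits than the enumerator
                // still tracks, e.g. a table was removed before meta sync completed.
                return new MetaReply(splitId, groupId, null, enumeratorTotal);
            }
            if (groupId < metaGroups.size()) {
                return new MetaReply(splitId, groupId, metaGroups.get(groupId), enumeratorTotal);
            }
            throw new IllegalStateException(
                    "Invalid meta group id " + groupId
                            + ", valid range is [0, " + (metaGroups.size() - 1) + "]");
        }
    }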
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;

import javax.annotation.Nullable;

import java.util.List;

/**
@@ -43,10 +45,17 @@ public class StreamSplitMetaEvent implements SourceEvent {
*/
private final List<byte[]> metaGroup;

public StreamSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
private final int totalFinishedSplitSize;

public StreamSplitMetaEvent(
String splitId,
int metaGroupId,
@Nullable List<byte[]> metaGroup,
int totalFinishedSplitSize) {
this.splitId = splitId;
this.metaGroupId = metaGroupId;
this.metaGroup = metaGroup;
this.totalFinishedSplitSize = totalFinishedSplitSize;
}

public String getSplitId() {
@@ -60,4 +69,8 @@ public int getMetaGroupId() {
public List<byte[]> getMetaGroup() {
return metaGroup;
}

public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
}
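With the @Nullable metaGroup and the added total, one event type now encodes both replies. A short usage sketch, assuming the StreamSplitMetaEvent class above (the literals are illustrative):

    import java.util.List;

    public class MetaEventUsageSketch {
        public static void main(String[] args) {
            List<byte[]> group0 = List.of(new byte[] {1}, new byte[] {2}); // pretend-serialized infos
            // Normal reply: one serialized meta group plus the authoritative total.
            StreamSplitMetaEvent normal = new StreamSplitMetaEvent("stream-split", 0, group0, 4);
            // Truncation reply: a null group and a smaller total tell the reader
            // to trim its restored finished-split list and request again.
            StreamSplitMetaEvent truncate = new StreamSplitMetaEvent("stream-split", 0, null, 2);
            System.out.println(normal.getTotalFinishedSplitSize() + " / " + truncate.getMetaGroup());
        }
    }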
@@ -33,9 +33,13 @@ public class StreamSplitMetaRequestEvent implements SourceEvent {
private final String splitId;
private final int requestMetaGroupId;

public StreamSplitMetaRequestEvent(String splitId, int requestMetaGroupId) {
private final int totalFinishedSplitSize;

public StreamSplitMetaRequestEvent(
String splitId, int requestMetaGroupId, int totalFinishedSplitSize) {
this.splitId = splitId;
this.requestMetaGroupId = requestMetaGroupId;
this.totalFinishedSplitSize = totalFinishedSplitSize;
}

public String getSplitId() {
@@ -45,4 +49,8 @@ public String getSplitId() {
public int getRequestMetaGroupId() {
return requestMetaGroupId;
}

public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
}
@@ -21,6 +21,8 @@

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

@@ -29,11 +31,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** The split to describe the change log of database table(s). */
public class StreamSplit extends SourceSplitBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplit.class);
public static final String STREAM_SPLIT_ID = "stream-split";

private final Offset startingOffset;
@@ -179,9 +183,20 @@ public static StreamSplit filterOutdatedSplitInfos(
*/
public static StreamSplit filterOutdatedSplitInfos(
StreamSplit streamSplit, Predicate<TableId> currentTableFilter) {

Set<TableId> tablesToRemove =
streamSplit.getFinishedSnapshotSplitInfos().stream()
.filter(i -> !currentTableFilter.test(i.getTableId()))
.map(split -> split.getTableId())
.collect(Collectors.toSet());
if (tablesToRemove.isEmpty()) {
return streamSplit;
}

LOG.info("Reader remove tables after restart: {}", tablesToRemove);
List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
streamSplit.getFinishedSnapshotSplitInfos().stream()
.filter(i -> currentTableFilter.test(i.getTableId()))
.filter(i -> !tablesToRemove.contains(i.getTableId()))
.collect(Collectors.toList());
Map<TableId, TableChange> previousTableSchemas = streamSplit.getTableSchemas();
Map<TableId, TableChange> newTableSchemas = new HashMap<>();
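filterOutdatedSplitInfos now collects the removed tables up front, which enables both the early return when nothing changed and the log line. A self-contained model of the flow (plain strings stand in for io.debezium TableId, and the schema-map/split-copy tail that the diff truncates is elided); the essential point is that when the finished-split list shrinks, the split's total finished split size must shrink with it:

    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class FilterOutdatedSketch {
        public static void main(String[] args) {
            // Table ids of the restored finished snapshot splits.
            List<String> finishedSplitTables = List.of("db.a", "db.a", "db.b");
            Predicate<String> currentTableFilter = t -> t.equals("db.a"); // db.b was removed

            Set<String> tablesToRemove =
                    finishedSplitTables.stream()
                            .filter(t -> !currentTableFilter.test(t))
                            .collect(Collectors.toSet());
            if (tablesToRemove.isEmpty()) {
                return; // fast path: reuse the split unchanged
            }
            List<String> kept =
                    finishedSplitTables.stream()
                            .filter(t -> !tablesToRemove.contains(t))
                            .collect(Collectors.toList());
            System.out.println(kept + ", new total = " + kept.size()); // [db.a, db.a], new total = 2
        }
    }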
@@ -418,11 +418,20 @@ private void fillMetaDataForStreamSplit(StreamSplitMetaEvent metadataEvent) {
StreamSplit streamSplit = uncompletedStreamSplits.get(metadataEvent.getSplitId());
if (streamSplit != null) {
final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
final int expectedMetaGroupId =
getNextMetaGroupId(
streamSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
if (receivedMetaGroupId == expectedMetaGroupId) {
if (receivedTotalFinishedSplitSize < streamSplit.getTotalFinishedSplitSize()) {
LOG.warn(
"Source reader {} received an out-of-bound finished split size. The received finished split size is {}, but the expected size is {}; truncating it",
subtaskId,
receivedTotalFinishedSplitSize,
streamSplit.getTotalFinishedSplitSize());
streamSplit = toNormalStreamSplit(streamSplit, receivedTotalFinishedSplitSize);
uncompletedStreamSplits.put(streamSplit.splitId(), streamSplit);
} else if (receivedMetaGroupId == expectedMetaGroupId) {
Set<String> existedSplitsOfLastGroup =
getExistedSplitsOfLastGroup(
streamSplit.getFinishedSnapshotSplitInfos(),
@@ -461,7 +470,8 @@ private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
streamSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
StreamSplitMetaRequestEvent splitMetaRequestEvent =
new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId);
new StreamSplitMetaRequestEvent(
splitId, nextMetaGroupId, streamSplit.getTotalFinishedSplitSize());
context.sendSourceEventToCoordinator(splitMetaRequestEvent);
} else {
LOG.info("The meta of stream split {} has been collected success", splitId);
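On the reader side, the truncation check deliberately runs before the group-id comparison. Under the grouping rule we assume from how getNextMetaGroupId is used above (meta groups arrive whole, so the next group id is simply count / groupSize), a reader restored with a stale, larger total would otherwise keep requesting a group the enumerator can never serve:

    public class MetaGroupMath {
        // Assumed grouping rule; the actual getNextMetaGroupId implementation
        // is not shown in this diff.
        static int nextMetaGroupId(int numReceivedSplitInfos, int splitMetaGroupSize) {
            return numReceivedSplitInfos / splitMetaGroupSize;
        }

        public static void main(String[] args) {
            int groupSize = 2; // matches 'chunk-meta.group.size' = '2' in the test below
            System.out.println(nextMetaGroupId(0, groupSize)); // 0: nothing received yet
            System.out.println(nextMetaGroupId(4, groupSize)); // 2: groups 0 and 1 complete
            // A reader restored with 6 finished splits asks for group 3; if the
            // enumerator now tracks only 4 (groups 0-1), group 3 can never be
            // served, so the enumerator answers with a truncation event instead.
            System.out.println(nextMetaGroupId(6, groupSize)); // 3
        }
    }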
@@ -42,7 +42,6 @@
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -948,20 +947,11 @@ private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint(
// Close sink upsert materialize to show more clear test output.
Configuration tableConfig = new Configuration();
tableConfig.setString("table.exec.sink.upsert-materialize", "none");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
if (finishedSavePointPath != null) {
// restore from savepoint
// hack for test to visit protected TestStreamEnvironment#getConfiguration() method
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> clazz =
classLoader.loadClass(
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
Field field = clazz.getDeclaredField("configuration");
field.setAccessible(true);
Configuration configuration = (Configuration) field.get(env);
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
tableConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
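The test change above swaps reflection into StreamExecutionEnvironment's protected configuration field for plain public API: the savepoint path goes into the Configuration before the environment is created. A minimal sketch of the resulting pattern (the helper name and path handling are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class SavepointEnvSketch {
        static StreamExecutionEnvironment envFromSavepoint(String finishedSavePointPath) {
            Configuration conf = new Configuration();
            conf.setString("table.exec.sink.upsert-materialize", "none");
            if (finishedSavePointPath != null) {
                // The path must be set *before* the environment is created; that
                // is what makes the reflective hack unnecessary.
                conf.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
            }
            return StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }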
@@ -987,6 +977,7 @@ private String getCreateTableStatement(
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s',"
+ " 'chunk-meta.group.size' = '2',"
+ " 'heartbeat.interval.ms' = '100',"
+ " 'scan.full-changelog' = 'true',"
+ " 'scan.newly-added-table.enabled' = 'true'"
@@ -253,6 +253,7 @@ private void captureNewlyAddedTables() {
.entrySet()
.removeIf(schema -> tablesToRemove.contains(schema.getKey()));
remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
LOG.info("Enumerator remove tables after restart: {}", tablesToRemove);
remainingTables.removeAll(tablesToRemove);
alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
}
@@ -298,22 +298,40 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent)
finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

if (binlogSplitMeta.size() > requestMetaGroupId) {
final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
LOG.warn(
"Total finished split size of subtask {} is {}, while total finished split size of Enumerator is only {}. Try to truncate it",
subTask,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator);
BinlogSplitMetaEvent metadataEvent =
new BinlogSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
null,
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else if (binlogSplitMeta.size() > requestMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
BinlogSplitMetaEvent metadataEvent =
new BinlogSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
.collect(Collectors.toList()));
.collect(Collectors.toList()),
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
LOG.error(
"The enumerator received invalid request meta group id {}, the valid meta group id range is [0, {}]",
requestMetaGroupId,
binlogSplitMeta.size() - 1);
throw new FlinkRuntimeException(
String.format(
"The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.",
requestMetaGroupId,
binlogSplitMeta.size() - 1,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator));
}
}

@@ -22,6 +22,8 @@
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;

import javax.annotation.Nullable;

import java.util.List;

/**
@@ -43,10 +45,17 @@ public class BinlogSplitMetaEvent implements SourceEvent {
*/
private final List<byte[]> metaGroup;

public BinlogSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
private final int totalFinishedSplitSize;

public BinlogSplitMetaEvent(
String splitId,
int metaGroupId,
@Nullable List<byte[]> metaGroup,
int totalFinishedSplitSize) {
this.splitId = splitId;
this.metaGroupId = metaGroupId;
this.metaGroup = metaGroup;
this.totalFinishedSplitSize = totalFinishedSplitSize;
}

public String getSplitId() {
@@ -60,4 +69,8 @@ public int getMetaGroupId() {
public List<byte[]> getMetaGroup() {
return metaGroup;
}

public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
}