[FLINK-35600] Add timestamp for low and high watermark #3415

Open · wants to merge 6 commits into master
@@ -154,6 +154,11 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
}
final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
lowWatermark
.getOffset()
.put(
BinlogOffset.TIMESTAMP_KEY,
String.valueOf(clock.currentTime().getEpochSecond()));
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
@@ -187,6 +192,11 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
} else {
// Get the current binlog offset as HW
highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
highWatermark
.getOffset()
.put(
BinlogOffset.TIMESTAMP_KEY,
String.valueOf(clock.currentTime().getEpochSecond()));
}

LOG.info(
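The new lines above stamp both the low and the high watermark with the capture time in epoch seconds, stored in the offset's key-value map under `BinlogOffset.TIMESTAMP_KEY`. A minimal, self-contained sketch of that mechanism, assuming the offset is just a `Map<String, String>` and using `"ts_sec"` as a stand-in key (both are illustrative assumptions; the real code goes through `BinlogOffset` and `clock.currentTime()`):

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class WatermarkTimestampSketch {
    // Assumed stand-in for BinlogOffset.TIMESTAMP_KEY; the real constant's value may differ.
    private static final String TIMESTAMP_KEY = "ts_sec";

    public static void main(String[] args) {
        // A binlog offset is essentially a map of string keys to string values.
        Map<String, String> offset = new HashMap<>();
        offset.put("file", "mysql-bin.000003");
        offset.put("pos", "154");

        // Stamp the capture time in epoch seconds, mirroring what the PR does
        // for the low and high watermarks.
        offset.put(TIMESTAMP_KEY, String.valueOf(Instant.now().getEpochSecond()));

        // Later, the timestamp can be read back when offsets are compared.
        long timestampSec = Long.parseLong(offset.getOrDefault(TIMESTAMP_KEY, "0"));
        System.out.println("Watermark captured at epoch second " + timestampSec);
    }
}
```

The comparison change below relies on this timestamp being present when offsets from the snapshot phase and the binlog phase are compared.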
@@ -251,7 +251,12 @@ public int compareTo(BinlogOffset that) {
}

// The completed events are the same, so compare the row number ...
-return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
+if (this.getRestartSkipRows() != that.getRestartSkipRows()) {
+    return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
+}
+
+// The skip rows are the same, so compare the timestamp ...
+return Long.compare(this.getTimestampSec(), that.getTimestampSec());
}

public boolean isAtOrBefore(BinlogOffset that) {
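With this change the comparison falls through one more level: completed events, then restart skip rows, then the newly stored timestamp. A minimal sketch of that ordering using an illustrative standalone class (the field names and the `Comparator` are assumptions for illustration, not the real `BinlogOffset` API):

```java
import java.util.Comparator;

// Illustrative stand-in for the relevant BinlogOffset fields.
public class OffsetSketch {
    final long restartSkipEvents;
    final long restartSkipRows;
    final long timestampSec;

    OffsetSketch(long restartSkipEvents, long restartSkipRows, long timestampSec) {
        this.restartSkipEvents = restartSkipEvents;
        this.restartSkipRows = restartSkipRows;
        this.timestampSec = timestampSec;
    }

    // Same tiebreak order as the patched compareTo: events, then rows, then timestamp.
    static final Comparator<OffsetSketch> ORDER =
            Comparator.comparingLong((OffsetSketch o) -> o.restartSkipEvents)
                    .thenComparingLong(o -> o.restartSkipRows)
                    .thenComparingLong(o -> o.timestampSec);

    public static void main(String[] args) {
        OffsetSketch earlier = new OffsetSketch(3, 2, 1_700_000_000L);
        OffsetSketch later = new OffsetSketch(3, 2, 1_700_000_100L);
        // Positionally identical offsets are now ordered by when they were captured.
        System.out.println(ORDER.compare(earlier, later)); // negative: earlier < later
    }
}
```

Two offsets that are positionally indistinguishable, as can happen in the GTID-less scenario exercised by the test below, are then ordered by capture time.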
@@ -106,6 +106,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
private final UniqueDatabase inventoryDatabase8 =
new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);

private static final MySqlContainer MYSQL_CONTAINER_NOGTID =
createMySqlContainer(MySqlVersion.V5_7, "docker/server/my.cnf");
private final UniqueDatabase customerDatabaseNoGtid =
new UniqueDatabase(MYSQL_CONTAINER_NOGTID, "customer", TEST_USER, TEST_PASSWORD);

private BinaryLogClient binaryLogClient;
private MySqlConnection mySqlConnection;

@@ -890,6 +895,69 @@ public void testRestoreFromCheckpointWithTimestampStartingOffset() throws Exception
assertThat(eventFilter.test(event)).isFalse();
}

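/**
 * Compares offsets produced in the snapshot phase against those seen in the binlog phase
 * on a container without GTIDs. A row is inserted while the snapshot splits are being read;
 * since that change is already covered by the snapshot phase, the subsequent binlog phase
 * is expected to emit no data change records.
 */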
@Test
public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception {
Startables.deepStart(Stream.of(MYSQL_CONTAINER_NOGTID)).join();
// Preparations
customerDatabaseNoGtid.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(
MYSQL_CONTAINER_NOGTID, customerDatabaseNoGtid, new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);

// step-1: split snapshot
List<MySqlSnapshotSplit> snapshotSplits =
getMySqlSplits(new String[] {"customers"}, sourceConfig, customerDatabaseNoGtid);

final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final SnapshotSplitReader snapshotSplitReader =
new SnapshotSplitReader(statefulTaskContext, 0);

// step-1: read snapshot splits firstly
List<SourceRecord> snapshotRecords = new ArrayList<>();
for (int i = 0; i < snapshotSplits.size(); i++) {
MySqlSplit sqlSplit = snapshotSplits.get(i);
List<SourceRecord> sourceRecords = pollRecordsFromReader(snapshotSplitReader, sqlSplit);
snapshotRecords.addAll(sourceRecords);
// mock binlog event after read chunk1
if (i == 0) {
mySqlConnection.execute(
"INSERT INTO "
+ customerDatabaseNoGtid.qualifiedTableName("customers")
+ " VALUES(999999, 'user_22','Shanghai','123567891234')");
}
}
snapshotSplitReader.close();

// step-2: create binlog split reader
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
getFinishedSplitsInfo(snapshotSplits, snapshotRecords);
BinlogOffset startingOffset = getStartingOffsetOfBinlogSplit(finishedSplitsInfo);
Map<TableId, TableChange> tableSchemas = new HashMap<>();
for (MySqlSplit mySqlSplit : snapshotSplits) {
tableSchemas.putAll(mySqlSplit.getTableSchemas());
}
MySqlSplit binlogSplit =
new MySqlBinlogSplit(
"binlog-split",
startingOffset,
BinlogOffset.ofNonStopping(),
finishedSplitsInfo,
tableSchemas,
finishedSplitsInfo.size());

// step-3: test read binlog split
BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
binlogReader.submitSplit(binlogSplit);

List<SourceRecord> sourceRecords =
pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord);
MYSQL_CONTAINER_NOGTID.stop();
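// The binlog phase should emit nothing: the inserted row was already captured during the snapshot phase.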
assertTrue(sourceRecords.isEmpty());
}

private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
return createBinlogReader(sourceConfig, false);
}
@@ -944,6 +1012,29 @@ private List<SourceRecord> pollRecordsFromReader(
return records;
}

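/** Submits the given snapshot split (if the reader has finished its previous one) and drains every record it emits. */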
private List<SourceRecord> pollRecordsFromReader(
SnapshotSplitReader reader, MySqlSplit sqlSplit) {
List<SourceRecord> records = new ArrayList<>();
if (reader.isFinished()) {
reader.submitSplit(sqlSplit);
}
Iterator<SourceRecords> res;
try {
while ((res = reader.pollSplitRecords()) != null) {
while (res.hasNext()) {
Iterator<SourceRecord> iterator = res.next().iterator();
while (iterator.hasNext()) {
SourceRecord sourceRecord = iterator.next();
records.add(sourceRecord);
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Polling action was interrupted", e);
}
return records;
}

private List<String> readBinlogSplits(
DataType dataType, BinlogSplitReader reader, int expectedSize) {
List<String> actual = new ArrayList<>();
@@ -1172,10 +1263,10 @@ private List<String> formatResult(List<SourceRecord> records, DataType dataType)
}

private List<MySqlSnapshotSplit> getMySqlSplits(
-String[] captureTables, MySqlSourceConfig sourceConfig) {
+String[] captureTables, MySqlSourceConfig sourceConfig, UniqueDatabase database) {
List<String> captureTableIds =
Arrays.stream(captureTables)
-.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
+.map(tableName -> database.getDatabaseName() + "." + tableName)
.collect(Collectors.toList());
List<TableId> remainingTables =
captureTableIds.stream().map(TableId::parse).collect(Collectors.toList());
@@ -1197,6 +1288,11 @@ private List<MySqlSnapshotSplit> getMySqlSplits(
return mySqlSplits;
}

private List<MySqlSnapshotSplit> getMySqlSplits(
String[] captureTables, MySqlSourceConfig sourceConfig) {
return getMySqlSplits(captureTables, sourceConfig, customerDatabase);
}

private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) {
return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables);
}