Skip to content

Commit

Permalink
[connector] Flink Source supports 'scan.startup.timestamp' larger tha…
Browse files Browse the repository at this point in the history
…n max timestamp of bucket (#284)
  • Loading branch information
loserwang1024 authored Jan 18, 2025
1 parent d614576 commit bb6cc93
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,62 @@ void testReadPrimaryKeyPartitionedTable() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}

@Test
void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
tEnv.executeSql("create table timestamp_table (a int, b varchar) ");
TablePath tablePath = TablePath.of(DEFAULT_DB, "timestamp_table");

// write first bath records
List<InternalRow> rows =
Arrays.asList(
row(DATA1_ROW_TYPE, new Object[] {1, "v1"}),
row(DATA1_ROW_TYPE, new Object[] {2, "v2"}),
row(DATA1_ROW_TYPE, new Object[] {3, "v3"}));

writeRows(tablePath, rows, true);
Thread.sleep(100);
// startup time between write first and second batch records.
long currentTimeMillis = System.currentTimeMillis();

// startup timestamp is larger than current time.
assertThatThrownBy(
() ->
tEnv.executeSql(
String.format(
"select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ ",
currentTimeMillis
+ Duration.ofMinutes(5).toMillis()))
.await())
.hasStackTraceContaining(
String.format(
"the fetch timestamp %s is larger than the current timestamp",
currentTimeMillis + Duration.ofMinutes(5).toMillis()));

try (org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(
String.format(
"select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ ",
currentTimeMillis))
.collect()) {
Thread.sleep(100);
// write second batch record.
rows =
Arrays.asList(
row(DATA1_ROW_TYPE, new Object[] {4, "v4"}),
row(DATA1_ROW_TYPE, new Object[] {5, "v5"}),
row(DATA1_ROW_TYPE, new Object[] {6, "v6"}));
writeRows(tablePath, rows, true);
List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
int expectRecords = expected.size();
List<String> actual = new ArrayList<>(expectRecords);
for (int i = 0; i < expectRecords; i++) {
String row = rowIter.next().toString();
actual.add(row);
}
assertThat(actual).containsExactlyElementsOf(expected);
}
}

// -------------------------------------------------------------------------------------
// Fluss look source tests
// -------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,16 @@ public boolean equals(Object o) {
ListOffsetsResultForBucket that = (ListOffsetsResultForBucket) o;
return offset == that.offset;
}

@Override
public String toString() {
return "ListOffsetsResultForBucket{"
+ "offset="
+ offset
+ ", tableBucket="
+ tableBucket
+ ", error="
+ error
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@Internal
public abstract class ResultForBucket {
protected final TableBucket tableBucket;
private final ApiError error;
protected final ApiError error;

public ResultForBucket(TableBucket tableBucket) {
this(tableBucket, ApiError.NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import com.alibaba.fluss.utils.CloseableRegistry;
import com.alibaba.fluss.utils.FlussPaths;
import com.alibaba.fluss.utils.IOUtils;
import com.alibaba.fluss.utils.clock.Clock;
import com.alibaba.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
Expand Down Expand Up @@ -174,6 +175,7 @@ public final class Replica {
private final int tieredLogLocalSegments;
private final AtomicReference<Integer> leaderReplicaIdOpt = new AtomicReference<>();
private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
private final Clock clock;

/**
* storing the remote follower replicas' state, used to update leader's highWatermark and
Expand Down Expand Up @@ -213,7 +215,8 @@ public Replica(
ServerMetadataCache metadataCache,
FatalErrorHandler fatalErrorHandler,
BucketMetricGroup bucketMetricGroup,
TableDescriptor tableDescriptor)
TableDescriptor tableDescriptor,
Clock clock)
throws Exception {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
Expand Down Expand Up @@ -241,6 +244,7 @@ public Replica(
this.closeableRegistry = new CloseableRegistry();

this.logTablet = createLog(lazyHighWatermarkCheckpoint);
this.clock = clock;
registerMetrics();
}

Expand Down Expand Up @@ -357,7 +361,7 @@ public void makeLeader(NotifyLeaderAndIsrData data) throws IOException {

coordinatorEpoch = data.getCoordinatorEpoch();

long currentTimeMs = System.currentTimeMillis();
long currentTimeMs = clock.milliseconds();
// Updating the assignment and ISR state is safe if the bucket epoch is
// larger or equal to the current bucket epoch.
updateAssignmentAndIsr(data.getReplicas(), true, data.getIsr());
Expand Down Expand Up @@ -563,7 +567,7 @@ private void mayFlushKv(long newHighWatermark) {
*/
private Optional<CompletedSnapshot> initKvTablet() {
checkNotNull(kvManager);
long startTime = System.currentTimeMillis();
long startTime = clock.milliseconds();
LOG.info("Start to init kv tablet for {} of table {}.", tableBucket, physicalPath);

// todo: we may need to handle the following cases:
Expand Down Expand Up @@ -618,7 +622,7 @@ private Optional<CompletedSnapshot> initKvTablet() {
tableBucket, physicalPath),
e);
}
long endTime = System.currentTimeMillis();
long endTime = clock.milliseconds();
LOG.info(
"Init kv tablet for {} of {} finish, cost {} ms.",
physicalPath,
Expand All @@ -632,7 +636,7 @@ private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTab
Path kvDbPath = kvTabletDir.resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING);
KvSnapshotDownloadSpec downloadSpec =
new KvSnapshotDownloadSpec(completedSnapshot.getKvSnapshotHandle(), kvDbPath);
long start = System.currentTimeMillis();
long start = clock.milliseconds();
LOG.info("Start to download kv snapshot {} to directory {}.", completedSnapshot, kvDbPath);
KvSnapshotDataDownloader kvSnapshotDataDownloader =
snapshotContext.getSnapshotDataDownloader();
Expand All @@ -641,7 +645,7 @@ private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTab
} catch (Exception e) {
throw new IOException("Fail to download kv snapshot.", e);
}
long end = System.currentTimeMillis();
long end = clock.milliseconds();
LOG.info(
"Download kv snapshot {} to directory {} finish, cost {} ms.",
completedSnapshot,
Expand All @@ -664,7 +668,7 @@ private Optional<CompletedSnapshot> getLatestSnapshot(TableBucket tableBucket) {
}

private void recoverKvTablet(long startRecoverLogOffset) {
long start = System.currentTimeMillis();
long start = clock.milliseconds();
checkNotNull(kvTablet, "kv tablet should not be null.");
try {
KvRecoverHelper.KvRecoverContext recoverContext =
Expand All @@ -689,7 +693,7 @@ private void recoverKvTablet(long startRecoverLogOffset) {
tableBucket, physicalPath),
e);
}
long end = System.currentTimeMillis();
long end = clock.milliseconds();
LOG.info(
"Recover kv tablet for {} of table {} from log offset {} finish, cost {} ms.",
tableBucket,
Expand Down Expand Up @@ -800,8 +804,7 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in

// we may need to increment high watermark if isr could be down to 1 or the
// replica count is 1.
boolean hwIncreased =
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
boolean hwIncreased = maybeIncrementLeaderHW(logTablet, clock.milliseconds());

if (hwIncreased) {
tryCompleteDelayedOperations();
Expand Down Expand Up @@ -835,7 +838,7 @@ public LogAppendInfo putRecordsToLeader(
kv, "KvTablet for the replica to put kv records shouldn't be null.");
LogAppendInfo logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, schema);
// we may need to increment high watermark.
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
maybeIncrementLeaderHW(logTablet, clock.milliseconds());
return logAppendInfo;
});
}
Expand All @@ -848,7 +851,7 @@ public LogReadInfo fetchRecords(FetchParams fetchParams) throws IOException {
physicalPath));
}
if (fetchParams.isFromFollower()) {
long followerFetchTimeMs = System.currentTimeMillis();
long followerFetchTimeMs = clock.milliseconds();
LogReadInfo logReadInfo =
inReadLock(
leaderIsrUpdateLock,
Expand Down Expand Up @@ -1204,12 +1207,7 @@ public long getOffset(RemoteLogManager remoteLogManager, ListOffsetsParam listOf
return logTablet.localLogStartOffset();
}
} else if (offsetType == ListOffsetsParam.LATEST_OFFSET_TYPE) {
// the request is come from client.
if (listOffsetsParam.getFollowerServerId() < 0) {
return logTablet.getHighWatermark();
} else {
return logTablet.localLogEndOffset();
}
return getLatestOffset(listOffsetsParam.getFollowerServerId());
} else {
throw new IllegalArgumentException(
"Invalid list offset type: " + offsetType);
Expand All @@ -1227,15 +1225,19 @@ private long getOffsetByTimestamp(
}

long fetchTimestamp = startTimestampOpt.getAsLong();
// 1. if the fetch timestamp is larger than the local max timestamp, we will
// throw an invalidTimestamp exception
// 1. If the fetch timestamp is larger than current timestamp, we will throw an
// invalidTimestamp exception. If the fetch timestamp is larger than the local max timestamp
// but no larger than current timestamp, we will latest offset.
long localMaxTimestamp = logTablet.localMaxTimestamp();
if (fetchTimestamp > localMaxTimestamp) {
long currentTimestamp = clock.milliseconds();
if (fetchTimestamp > localMaxTimestamp && fetchTimestamp <= currentTimestamp) {
return getLatestOffset(listOffsetsParam.getFollowerServerId());
} else if (fetchTimestamp > currentTimestamp) {
throw new InvalidTimestampException(
String.format(
"Get offset error for table bucket %s, "
+ "the fetch timestamp %s is larger than the max timestamp %s",
tableBucket, fetchTimestamp, localMaxTimestamp));
+ "the fetch timestamp %s is larger than the current timestamp %s",
tableBucket, fetchTimestamp, currentTimestamp));
}

// 2. we will try to find offset from remote storage.
Expand All @@ -1248,6 +1250,15 @@ private long getOffsetByTimestamp(
return logTablet.lookupOffsetForTimestamp(fetchTimestamp);
}

private long getLatestOffset(int followerServerId) {
// the request is come from client.
if (followerServerId < 0) {
return logTablet.getHighWatermark();
} else {
return logTablet.localLogEndOffset();
}
}

/**
* Truncate the local log of this bucket to the specified offset and checkpoint the recovery
* point to this offset.
Expand Down Expand Up @@ -1526,7 +1537,7 @@ private boolean handleAdjustIsrUpdate(

// We may need to increment high watermark since ISR could be down to 1.
try {
return maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
return maybeIncrementLeaderHW(logTablet, clock.milliseconds());
} catch (IOException e) {
LOG.error("Failed to increment leader HW", e);
return false;
Expand Down Expand Up @@ -1625,7 +1636,7 @@ private List<Integer> getOutOfSyncFollowerReplicas(long maxLagTime) {
if (!currentState.isInflight()) {
Set<Integer> candidateReplicas = new HashSet<>(currentState.isr());
candidateReplicas.remove(localTabletServerId);
long currentTimeMillis = System.currentTimeMillis();
long currentTimeMillis = clock.milliseconds();
long leaderEndOffset = logTablet.localLogEndOffset();
for (int replicaId : candidateReplicas) {
if (isFollowerOutOfSync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import com.alibaba.fluss.utils.FileUtils;
import com.alibaba.fluss.utils.FlussPaths;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.clock.Clock;
import com.alibaba.fluss.utils.concurrent.Scheduler;

import org.slf4j.Logger;
Expand Down Expand Up @@ -181,6 +182,8 @@ public class ReplicaManager {
// for metrics
private final TabletServerMetricGroup serverMetricGroup;

private final Clock clock;

public ReplicaManager(
Configuration conf,
Scheduler scheduler,
Expand All @@ -193,7 +196,8 @@ public ReplicaManager(
CoordinatorGateway coordinatorGateway,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
FatalErrorHandler fatalErrorHandler,
TabletServerMetricGroup serverMetricGroup)
TabletServerMetricGroup serverMetricGroup,
Clock clock)
throws IOException {
this(
conf,
Expand All @@ -208,7 +212,8 @@ public ReplicaManager(
completedKvSnapshotCommitter,
fatalErrorHandler,
serverMetricGroup,
new RemoteLogManager(conf, zkClient, coordinatorGateway));
new RemoteLogManager(conf, zkClient, coordinatorGateway),
clock);
}

@VisibleForTesting
Expand All @@ -225,7 +230,8 @@ public ReplicaManager(
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
FatalErrorHandler fatalErrorHandler,
TabletServerMetricGroup serverMetricGroup,
RemoteLogManager remoteLogManager)
RemoteLogManager remoteLogManager,
Clock clock)
throws IOException {
this.conf = conf;
this.zkClient = zkClient;
Expand Down Expand Up @@ -262,6 +268,7 @@ public ReplicaManager(
zkClient, completedKvSnapshotCommitter, kvSnapshotResource, conf);
this.remoteLogManager = remoteLogManager;
this.serverMetricGroup = serverMetricGroup;
this.clock = clock;
registerMetrics();
}

Expand Down Expand Up @@ -1446,7 +1453,8 @@ protected Optional<Replica> maybeCreateReplica(NotifyLeaderAndIsrData data) {
metadataCache,
fatalErrorHandler,
bucketMetricGroup,
getTableDescriptor(tablePath, zkClient, schema));
getTableDescriptor(tablePath, zkClient, schema),
clock);
allReplicas.put(tb, new OnlineReplica(replica));
replicaOpt = Optional.of(replica);
} else if (hostedReplica instanceof OnlineReplica) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ protected void startServices() throws Exception {
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
scheduler.startup();

this.logManager =
LogManager.create(conf, zkClient, scheduler, SystemClock.getInstance());
SystemClock systemClock = SystemClock.getInstance();
this.logManager = LogManager.create(conf, zkClient, scheduler, systemClock);
logManager.startup();

this.kvManager = KvManager.create(conf, zkClient, logManager);
Expand Down Expand Up @@ -188,7 +188,8 @@ protected void startServices() throws Exception {
coordinatorGateway,
DefaultCompletedKvSnapshotCommitter.create(rpcClient, metadataCache),
this,
tabletServerMetricGroup);
tabletServerMetricGroup,
systemClock);
replicaManager.startup();

this.tabletService =
Expand Down
Loading

0 comments on commit bb6cc93

Please sign in to comment.