From 7242904a216eeb9628ebf2002d2cffdace40e61e Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 9 Jan 2025 16:00:26 -0800 Subject: [PATCH] make ICT only work if it is the first row in a commit --- kernel/src/table_changes/log_replay.rs | 8 ++++++-- kernel/src/table_changes/scan_file.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index e9c32429d..0533bfce3 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -164,7 +164,7 @@ impl LogReplayScanner { let mut add_paths = HashSet::default(); let mut has_cdc_action = false; let mut timestamp = commit_file.location.last_modified; - for actions in action_iter { + for (i, actions) in action_iter.enumerate() { let actions = actions?; let mut visitor = PreparePhaseVisitor { @@ -174,6 +174,7 @@ impl LogReplayScanner { commit_timestamp: &mut timestamp, protocol: None, metadata_info: None, + is_first_batch: i == 0, }; visitor.visit_rows_of(actions.as_ref())?; @@ -276,6 +277,7 @@ struct PreparePhaseVisitor<'a> { add_paths: &'a mut HashSet, remove_dvs: &'a mut HashMap, commit_timestamp: &'a mut i64, + is_first_batch: bool, } impl PreparePhaseVisitor<'_> { fn schema() -> Arc { @@ -362,7 +364,9 @@ impl RowVisitor for PreparePhaseVisitor<'_> { } else if let Some(timestamp) = getters[16].get_long(i, "commitInfo.inCommitTimestamp")? { - *self.commit_timestamp = timestamp; + if self.is_first_batch && i == 0 { + *self.commit_timestamp = timestamp; + } } } Ok(()) diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 12cd75adf..8c993a514 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -327,8 +327,8 @@ mod tests { .await; mock_table .commit([ - Action::Cdc(cdc.clone()), Action::CommitInfo(commit_info.clone()), + Action::Cdc(cdc.clone()), ]) .await; mock_table