From d295ffc5c8856e1443269c29020beb8621c65286 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 9 Dec 2024 15:28:49 -0800 Subject: [PATCH] Add ict and tests Update docs for ICT Assert selection vector wip ict impl add tmp ict fixup for writes remove non_ict schema fix commit Remove cdf changes for ict remove unused imports Add clarifying comment for inCommitTimestamp Add documentation for ICT Revert "Remove cdf changes for ict" This reverts commit e2e38cb11df5e931d793d7f13072e31fb2178ae6. Fix ict reading Address nits make ICT only work if it is the first row in a commit Rename and patch comments Fix naming referring to CommitInfo Patch up docs --- kernel/src/table_changes/log_replay.rs | 56 +++++++++++--------- kernel/src/table_changes/log_replay/tests.rs | 35 ++++++++++++ kernel/src/table_changes/scan_file.rs | 17 ++++-- 3 files changed, 81 insertions(+), 27 deletions(-) diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 89951a39b..99ef71672 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -7,15 +7,17 @@ use std::sync::{Arc, LazyLock}; use crate::actions::schemas::GetStructField; use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor}; use crate::actions::{ - get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME, - PROTOCOL_NAME, REMOVE_NAME, + get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, COMMIT_INFO_NAME, + METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, }; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_name, ColumnName}; use crate::path::ParsedLogPath; use crate::scan::data_skipping::DataSkippingFilter; use crate::scan::state::DvInfo; -use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType}; +use crate::schema::{ + ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType, +}; use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema}; use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported}; use crate::table_properties::TableProperties; @@ -78,6 +80,12 @@ pub(crate) fn table_changes_action_iter( /// Deletion vector resolution affects whether a remove action is selected in the second /// phase, so we must perform it ahead of time in phase 1. /// - Ensure that reading is supported on any protocol updates. +/// - Extract the in-commit timestamps from [`CommitInfo`] actions if they are present. These are +/// generated when in-commit timestamps (ICT) table feature is enabled. This must be done in the +/// first phase because the second phase lazily transforms engine data with an extra timestamp +/// column, so the timestamp must be known ahead of time. Note that when ICT is enabled, CommitInfo +/// should be the first action in every commit. +/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps /// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`] /// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema /// compatibility is checked through schema equality. This will be expanded in the future to @@ -93,12 +101,6 @@ pub(crate) fn table_changes_action_iter( /// /// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors /// -/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo -/// actions to find the timestamp. These are generated when incommit timestamps is enabled. -/// This must be done in the first phase because the second phase lazily transforms engine data with -/// an extra timestamp column. Thus, the timestamp must be known ahead of time. -/// See https://github.com/delta-io/delta-kernel-rs/issues/559 -/// /// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every /// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the /// actions using [`add_transform_expr`], and generating selection vectors with the following rules: @@ -118,14 +120,8 @@ struct LogReplayScanner { // The commit file that this replay scanner will operate on. commit_file: ParsedLogPath, // The timestamp associated with this commit. This is the file modification time - // from the commit's [`FileMeta`]. - // - // - // TODO when incommit timestamps are supported: If there is a [`CommitInfo`] with a timestamp - // generated by in-commit timestamps, that timestamp will be used instead. - // - // Note: This will be used once an expression is introduced to transform the engine data in - // [`TableChangesScanData`] + // from the commit's [`FileMeta`]. If in-commit timestamps feature is enabled, this will be the + // in-commit timestamp from the [`CommitInfo`] action. timestamp: i64, } @@ -136,6 +132,7 @@ impl LogReplayScanner { /// 2. Construct a map from path to deletion vector of remove actions that share the same path /// as an add action. /// 3. Perform validation on each protocol and metadata action in the commit. + /// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present. /// /// For more details, see the documentation for [`LogReplayScanner`]. fn try_new( @@ -143,8 +140,6 @@ impl LogReplayScanner { commit_file: ParsedLogPath, table_schema: &SchemaRef, ) -> DeltaResult { - let visitor_schema = PreparePhaseVisitor::schema(); - // Note: We do not perform data skipping yet because we need to visit all add and // remove actions for deletion vector resolution to be correct. // @@ -156,22 +151,25 @@ impl LogReplayScanner { // vectors are resolved so that we can skip both actions in the pair. let action_iter = engine.get_json_handler().read_json_files( &[commit_file.location.clone()], - visitor_schema, + PreparePhaseVisitor::schema(), None, // not safe to apply data skipping yet )?; let mut remove_dvs = HashMap::default(); let mut add_paths = HashSet::default(); let mut has_cdc_action = false; - for actions in action_iter { + let mut timestamp = commit_file.location.last_modified; + for (i, actions) in action_iter.enumerate() { let actions = actions?; let mut visitor = PreparePhaseVisitor { add_paths: &mut add_paths, remove_dvs: &mut remove_dvs, has_cdc_action: &mut has_cdc_action, + commit_timestamp: &mut timestamp, protocol: None, metadata_info: None, + is_first_batch: i == 0, }; visitor.visit_rows_of(actions.as_ref())?; @@ -202,7 +200,7 @@ impl LogReplayScanner { remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path)); } Ok(LogReplayScanner { - timestamp: commit_file.location.last_modified, + timestamp, commit_file, has_cdc_action, remove_dvs, @@ -220,7 +218,6 @@ impl LogReplayScanner { has_cdc_action, remove_dvs, commit_file, - // TODO: Add the timestamp as a column with an expression timestamp, } = self; let remove_dvs = Arc::new(remove_dvs); @@ -274,15 +271,19 @@ struct PreparePhaseVisitor<'a> { has_cdc_action: &'a mut bool, add_paths: &'a mut HashSet, remove_dvs: &'a mut HashMap, + commit_timestamp: &'a mut i64, + is_first_batch: bool, } impl PreparePhaseVisitor<'_> { fn schema() -> Arc { + let ict_type = StructField::new("inCommitTimestamp", DataType::LONG, true); Arc::new(StructType::new(vec![ Option::::get_struct_field(ADD_NAME), Option::::get_struct_field(REMOVE_NAME), Option::::get_struct_field(CDC_NAME), Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), + StructField::new(COMMIT_INFO_NAME, StructType::new([ict_type]), true), ])) } } @@ -314,6 +315,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> { (INTEGER, column_name!("protocol.minWriterVersion")), (string_list.clone(), column_name!("protocol.readerFeatures")), (string_list, column_name!("protocol.writerFeatures")), + (LONG, column_name!("commitInfo.inCommitTimestamp")), ]; let (types, names) = types_and_names.into_iter().unzip(); (names, types).into() @@ -323,7 +325,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> { fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> { require!( - getters.len() == 16, + getters.len() == 17, Error::InternalError(format!( "Wrong number of PreparePhaseVisitor getters: {}", getters.len() @@ -354,6 +356,12 @@ impl RowVisitor for PreparePhaseVisitor<'_> { let protocol = ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?; self.protocol = Some(protocol); + } else if let Some(in_commit_timestamp) = + getters[16].get_long(i, "commitInfo.inCommitTimestamp")? + { + if self.is_first_batch && i == 0 { + *self.commit_timestamp = in_commit_timestamp; + } } } Ok(()) diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 35c4a99f8..e0147a156 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -1,6 +1,7 @@ use super::table_changes_action_iter; use super::TableChangesScanData; use crate::actions::deletion_vector::DeletionVectorDescriptor; +use crate::actions::CommitInfo; use crate::actions::{Add, Cdc, Metadata, Protocol, Remove}; use crate::engine::sync::SyncEngine; use crate::expressions::Scalar; @@ -609,3 +610,37 @@ async fn file_meta_timestamp() { let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); assert_eq!(scanner.timestamp, file_meta_ts); } + +#[tokio::test] +async fn table_changes_in_commit_timestamp() { + let engine = Arc::new(SyncEngine::new()); + let mut mock_table = LocalMockTable::new(); + + let timestamp = 12345678; + + mock_table + .commit([ + Action::CommitInfo(CommitInfo { + in_commit_timestamp: Some(timestamp), + ..Default::default() + }), + Action::Add(Add { + path: "fake_path_1".into(), + data_change: true, + ..Default::default() + }), + ]) + .await; + + let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + .unwrap() + .into_iter(); + + let commit = commits.next().unwrap(); + let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); + assert_eq!(scanner.timestamp, timestamp); + + let iter = scanner.into_scan_batches(engine, None).unwrap(); + let sv = result_to_sv(iter); + assert_eq!(sv, vec![false, true]); +} diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index f428e09df..fd207daed 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -244,7 +244,7 @@ mod tests { use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType}; use crate::actions::deletion_vector::DeletionVectorDescriptor; - use crate::actions::{Add, Cdc, Remove}; + use crate::actions::{Add, Cdc, CommitInfo, Remove}; use crate::engine::sync::SyncEngine; use crate::log_segment::LogSegment; use crate::scan::state::DvInfo; @@ -312,6 +312,12 @@ mod tests { ..Default::default() }; + let cdc_timestamp = 12345678; + let commit_info = CommitInfo { + in_commit_timestamp: Some(cdc_timestamp), + ..Default::default() + }; + mock_table .commit([ Action::Remove(remove_paired.clone()), @@ -319,7 +325,12 @@ mod tests { Action::Remove(remove.clone()), ]) .await; - mock_table.commit([Action::Cdc(cdc.clone())]).await; + mock_table + .commit([ + Action::CommitInfo(commit_info.clone()), + Action::Cdc(cdc.clone()), + ]) + .await; mock_table .commit([Action::Remove(remove_no_partition.clone())]) .await; @@ -386,7 +397,7 @@ mod tests { }, partition_values: cdc.partition_values, commit_version: 1, - commit_timestamp: timestamps[1], + commit_timestamp: cdc_timestamp, remove_dv: None, }, CdfScanFile {