Skip to content

Commit

Permalink
Add ict and tests
Browse files Browse the repository at this point in the history
Update docs for ICT

Assert selection vector

wip ict impl

add tmp ict fixup for writes

remove non_ict schema

fix commit

Remove cdf changes for ict

remove unused imports

Add clarifying comment for inCommitTimestamp

Add documentation for ICT

Revert "Remove cdf changes for ict"

This reverts commit e2e38cb.

Fix ict reading

Address nits

make ICT only work if it is the first row in a commit

Rename and patch comments

Fix naming referring to CommitInfo

Patch up docs
  • Loading branch information
OussamaSaoudi-db authored and OussamaSaoudi committed Feb 4, 2025
1 parent 06d8dbb commit d295ffc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 27 deletions.
56 changes: 32 additions & 24 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ use std::sync::{Arc, LazyLock};
use crate::actions::schemas::GetStructField;
use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor};
use crate::actions::{
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME,
PROTOCOL_NAME, REMOVE_NAME,
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, COMMIT_INFO_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_name, ColumnName};
use crate::path::ParsedLogPath;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::state::DvInfo;
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
use crate::schema::{
ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType,
};
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
use crate::table_properties::TableProperties;
Expand Down Expand Up @@ -78,6 +80,12 @@ pub(crate) fn table_changes_action_iter(
/// Deletion vector resolution affects whether a remove action is selected in the second
/// phase, so we must perform it ahead of time in phase 1.
/// - Ensure that reading is supported on any protocol updates.
/// - Extract the in-commit timestamp from the [`CommitInfo`] action if it is present. It is
/// generated when the in-commit timestamps (ICT) table feature is enabled. This must be done in
/// the first phase because the second phase lazily transforms engine data with an extra
/// timestamp column, so the timestamp must be known ahead of time. Note that when ICT is
/// enabled, [`CommitInfo`] is expected to be the first action in every commit, so only the
/// first action of the commit is consulted for the timestamp.
/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps
/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
/// compatibility is checked through schema equality. This will be expanded in the future to
Expand All @@ -93,12 +101,6 @@ pub(crate) fn table_changes_action_iter(
///
/// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
///
/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo
/// actions to find the timestamp. These are generated when incommit timestamps is enabled.
/// This must be done in the first phase because the second phase lazily transforms engine data with
/// an extra timestamp column. Thus, the timestamp must be known ahead of time.
/// See https://github.com/delta-io/delta-kernel-rs/issues/559
///
/// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every
/// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the
/// actions using [`add_transform_expr`], and generating selection vectors with the following rules:
Expand All @@ -118,14 +120,8 @@ struct LogReplayScanner {
// The commit file that this replay scanner will operate on.
commit_file: ParsedLogPath,
// The timestamp associated with this commit. This is the file modification time
// from the commit's [`FileMeta`].
//
//
// TODO when incommit timestamps are supported: If there is a [`CommitInfo`] with a timestamp
// generated by in-commit timestamps, that timestamp will be used instead.
//
// Note: This will be used once an expression is introduced to transform the engine data in
// [`TableChangesScanData`]
// from the commit's [`FileMeta`]. If the in-commit timestamps feature is enabled, this is
// instead the in-commit timestamp read from the commit's [`CommitInfo`] action.
timestamp: i64,
}

Expand All @@ -136,15 +132,14 @@ impl LogReplayScanner {
/// 2. Construct a map from path to deletion vector of remove actions that share the same path
/// as an add action.
/// 3. Perform validation on each protocol and metadata action in the commit.
/// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present.
///
/// For more details, see the documentation for [`LogReplayScanner`].
fn try_new(
engine: &dyn Engine,
commit_file: ParsedLogPath,
table_schema: &SchemaRef,
) -> DeltaResult<Self> {
let visitor_schema = PreparePhaseVisitor::schema();

// Note: We do not perform data skipping yet because we need to visit all add and
// remove actions for deletion vector resolution to be correct.
//
Expand All @@ -156,22 +151,25 @@ impl LogReplayScanner {
// vectors are resolved so that we can skip both actions in the pair.
let action_iter = engine.get_json_handler().read_json_files(
&[commit_file.location.clone()],
visitor_schema,
PreparePhaseVisitor::schema(),
None, // not safe to apply data skipping yet
)?;

let mut remove_dvs = HashMap::default();
let mut add_paths = HashSet::default();
let mut has_cdc_action = false;
for actions in action_iter {
let mut timestamp = commit_file.location.last_modified;
for (i, actions) in action_iter.enumerate() {
let actions = actions?;

let mut visitor = PreparePhaseVisitor {
add_paths: &mut add_paths,
remove_dvs: &mut remove_dvs,
has_cdc_action: &mut has_cdc_action,
commit_timestamp: &mut timestamp,
protocol: None,
metadata_info: None,
is_first_batch: i == 0,
};
visitor.visit_rows_of(actions.as_ref())?;

Expand Down Expand Up @@ -202,7 +200,7 @@ impl LogReplayScanner {
remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
}
Ok(LogReplayScanner {
timestamp: commit_file.location.last_modified,
timestamp,
commit_file,
has_cdc_action,
remove_dvs,
Expand All @@ -220,7 +218,6 @@ impl LogReplayScanner {
has_cdc_action,
remove_dvs,
commit_file,
// TODO: Add the timestamp as a column with an expression
timestamp,
} = self;
let remove_dvs = Arc::new(remove_dvs);
Expand Down Expand Up @@ -274,15 +271,19 @@ struct PreparePhaseVisitor<'a> {
has_cdc_action: &'a mut bool,
add_paths: &'a mut HashSet<String>,
remove_dvs: &'a mut HashMap<String, DvInfo>,
commit_timestamp: &'a mut i64,
is_first_batch: bool,
}
impl PreparePhaseVisitor<'_> {
    /// Builds the read schema used when scanning a commit file during the prepare phase.
    ///
    /// The schema contains the nullable `add`, `remove`, `cdc`, `metaData` and `protocol`
    /// action columns, plus a nullable `commitInfo` struct that exposes only its
    /// `inCommitTimestamp` field.
    fn schema() -> Arc<StructType> {
        // Only the in-commit timestamp is projected out of `commitInfo`; it is nullable because
        // a `commitInfo` action may not carry one.
        let in_commit_timestamp = StructField::new("inCommitTimestamp", DataType::LONG, true);

        let fields = vec![
            Option::<Add>::get_struct_field(ADD_NAME),
            Option::<Remove>::get_struct_field(REMOVE_NAME),
            Option::<Cdc>::get_struct_field(CDC_NAME),
            Option::<Metadata>::get_struct_field(METADATA_NAME),
            Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
            StructField::new(
                COMMIT_INFO_NAME,
                StructType::new([in_commit_timestamp]),
                true,
            ),
        ];

        Arc::new(StructType::new(fields))
    }
}
Expand Down Expand Up @@ -314,6 +315,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
(INTEGER, column_name!("protocol.minWriterVersion")),
(string_list.clone(), column_name!("protocol.readerFeatures")),
(string_list, column_name!("protocol.writerFeatures")),
(LONG, column_name!("commitInfo.inCommitTimestamp")),
];
let (types, names) = types_and_names.into_iter().unzip();
(names, types).into()
Expand All @@ -323,7 +325,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> {

fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
require!(
getters.len() == 16,
getters.len() == 17,
Error::InternalError(format!(
"Wrong number of PreparePhaseVisitor getters: {}",
getters.len()
Expand Down Expand Up @@ -354,6 +356,12 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
let protocol =
ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?;
self.protocol = Some(protocol);
} else if let Some(in_commit_timestamp) =
getters[16].get_long(i, "commitInfo.inCommitTimestamp")?
{
if self.is_first_batch && i == 0 {
*self.commit_timestamp = in_commit_timestamp;
}
}
}
Ok(())
Expand Down
35 changes: 35 additions & 0 deletions kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::table_changes_action_iter;
use super::TableChangesScanData;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::CommitInfo;
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
use crate::expressions::Scalar;
Expand Down Expand Up @@ -609,3 +610,37 @@ async fn file_meta_timestamp() {
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, file_meta_ts);
}

/// Checks that a commit whose first action is a `CommitInfo` carrying an in-commit timestamp
/// makes the scanner report that timestamp, and that the resulting selection vector is
/// `[false, true]` for the `[CommitInfo, Add]` commit.
#[tokio::test]
async fn table_changes_in_commit_timestamp() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();

    let expected_ict = 12345678;

    // The `CommitInfo` action is deliberately written first in the commit.
    let commit_info = Action::CommitInfo(CommitInfo {
        in_commit_timestamp: Some(expected_ict),
        ..Default::default()
    });
    let add = Action::Add(Add {
        path: "fake_path_1".into(),
        data_change: true,
        ..Default::default()
    });
    mock_table.commit([commit_info, add]).await;

    // Only one commit was written, so the first entry of the segment is the one under test.
    let commit = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter()
        .next()
        .unwrap();

    let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
    assert_eq!(scanner.timestamp, expected_ict);

    let selection = result_to_sv(scanner.into_scan_batches(engine, None).unwrap());
    assert_eq!(selection, vec![false, true]);
}
17 changes: 14 additions & 3 deletions kernel/src/table_changes/scan_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mod tests {

use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{Add, Cdc, Remove};
use crate::actions::{Add, Cdc, CommitInfo, Remove};
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::scan::state::DvInfo;
Expand Down Expand Up @@ -312,14 +312,25 @@ mod tests {
..Default::default()
};

let cdc_timestamp = 12345678;
let commit_info = CommitInfo {
in_commit_timestamp: Some(cdc_timestamp),
..Default::default()
};

mock_table
.commit([
Action::Remove(remove_paired.clone()),
Action::Add(add_paired.clone()),
Action::Remove(remove.clone()),
])
.await;
mock_table.commit([Action::Cdc(cdc.clone())]).await;
mock_table
.commit([
Action::CommitInfo(commit_info.clone()),
Action::Cdc(cdc.clone()),
])
.await;
mock_table
.commit([Action::Remove(remove_no_partition.clone())])
.await;
Expand Down Expand Up @@ -386,7 +397,7 @@ mod tests {
},
partition_values: cdc.partition_values,
commit_version: 1,
commit_timestamp: timestamps[1],
commit_timestamp: cdc_timestamp,
remove_dv: None,
},
CdfScanFile {
Expand Down

0 comments on commit d295ffc

Please sign in to comment.