Skip to content

Commit

Permalink
Enable file merging by last modification time using preserve-insertio…
Browse files Browse the repository at this point in the history
…n-order

This change leverages the previously unused `preserve-insertion-order`
configuration to enable merging files sorted by their last modification
time during compaction. This is particularly beneficial for append-only
workloads, improving data locality after optimize runs by merging files
that were created around similar times.
  • Loading branch information
esarili committed Jan 24, 2025
1 parent f67e828 commit a03f097
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
16 changes: 13 additions & 3 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.filters,
this.target_size.to_owned(),
writer_properties,
this.preserve_insertion_order,
)?;
let metrics = plan
.execute(
Expand Down Expand Up @@ -877,12 +878,15 @@ pub fn create_merge_plan(
filters: &[PartitionFilter],
target_size: Option<i64>,
writer_properties: WriterProperties,
preserve_insertion_order: bool,
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
let partitions_keys = &snapshot.metadata().partition_columns;

let (operations, metrics) = match optimize_type {
OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?,
OptimizeType::Compact => {
build_compaction_plan(snapshot, filters, target_size, preserve_insertion_order)?
}
OptimizeType::ZOrder(zorder_columns) => {
build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)?
}
Expand Down Expand Up @@ -958,6 +962,7 @@ fn build_compaction_plan(
snapshot: &DeltaTableState,
filters: &[PartitionFilter],
target_size: i64,
perserve_insertion_order: bool,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

Expand Down Expand Up @@ -985,8 +990,13 @@ fn build_compaction_plan(
}

for (_, file) in partition_files.values_mut() {
// Sort files by size: largest to smallest
file.sort_by(|a, b| b.size.cmp(&a.size));
if perserve_insertion_order {
// sort files by modification date
file.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
} else {
// Sort files by size: largest to smallest
file.sort_by(|a, b| b.size.cmp(&a.size));
}
}

let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
Expand Down
3 changes: 3 additions & 0 deletions crates/core/tests/command_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
false,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
Expand Down Expand Up @@ -351,6 +352,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
false,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
Expand Down Expand Up @@ -410,6 +412,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
&[],
None,
WriterProperties::builder().build(),
false,
)?;

let metrics = plan
Expand Down

0 comments on commit a03f097

Please sign in to comment.