diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index f44f2cb5e6..8f2b5fa59b 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -331,6 +331,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { this.filters, this.target_size.to_owned(), writer_properties, + this.preserve_insertion_order, )?; let metrics = plan .execute( @@ -877,12 +878,15 @@ pub fn create_merge_plan( filters: &[PartitionFilter], target_size: Option, writer_properties: WriterProperties, + preserve_insertion_order: bool, ) -> Result { let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size()); let partitions_keys = &snapshot.metadata().partition_columns; let (operations, metrics) = match optimize_type { - OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?, + OptimizeType::Compact => { + build_compaction_plan(snapshot, filters, target_size, preserve_insertion_order)? + } OptimizeType::ZOrder(zorder_columns) => { build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)? } @@ -958,6 +962,7 @@ fn build_compaction_plan( snapshot: &DeltaTableState, filters: &[PartitionFilter], target_size: i64, + perserve_insertion_order: bool, ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> { let mut metrics = Metrics::default(); @@ -985,8 +990,13 @@ fn build_compaction_plan( } for (_, file) in partition_files.values_mut() { - // Sort files by size: largest to smallest - file.sort_by(|a, b| b.size.cmp(&a.size)); + if perserve_insertion_order { + // sort files by modification date + file.sort_by(|a, b| b.last_modified.cmp(&a.last_modified)); + } else { + // Sort files by size: largest to smallest + file.sort_by(|a, b| b.size.cmp(&a.size)); + } } let mut operations: HashMap, Vec)> = HashMap::new(); diff --git a/crates/core/tests/command_optimize.rs b/crates/core/tests/command_optimize.rs index 4826647750..b40e0c3b32 100644 --- a/crates/core/tests/command_optimize.rs +++ b/crates/core/tests/command_optimize.rs @@ -289,6 +289,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { &filter, None, WriterProperties::builder().build(), + false, )?; let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); @@ -351,6 +352,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { &filter, None, WriterProperties::builder().build(), + false, )?; let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); @@ -410,6 +412,7 @@ async fn test_commit_interval() -> Result<(), Box> { &[], None, WriterProperties::builder().build(), + false, )?; let metrics = plan