From 3251703baba52c9febe9499a203c2a0d6ed587de Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 21 Oct 2023 10:11:19 +0200 Subject: [PATCH] refactor(rust): prepare for multiple files in a node (#11918) --- .../src/physical_plan/executors/scan/csv.rs | 2 +- .../src/physical_plan/executors/scan/ipc.rs | 2 +- .../physical_plan/executors/scan/parquet.rs | 2 +- .../src/physical_plan/planner/lp.rs | 60 +++++++++++-------- crates/polars-lazy/src/utils.rs | 8 ++- crates/polars-pipe/src/pipeline/convert.rs | 8 ++- crates/polars-plan/src/dot.rs | 30 ++++++---- crates/polars-plan/src/logical_plan/alp.rs | 6 +- .../polars-plan/src/logical_plan/builder.rs | 10 ++-- .../src/logical_plan/conversion.rs | 8 +-- crates/polars-plan/src/logical_plan/format.rs | 22 +++++-- crates/polars-plan/src/logical_plan/mod.rs | 2 +- .../src/logical_plan/optimizer/cse.rs | 4 +- .../logical_plan/optimizer/file_caching.rs | 22 +++---- .../optimizer/predicate_pushdown/mod.rs | 9 +-- .../optimizer/projection_pushdown/mod.rs | 4 +- .../optimizer/slice_pushdown_lp.rs | 8 +-- 17 files changed, 122 insertions(+), 85 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs index 8542432b31bc..bac591b84f86 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs @@ -54,7 +54,7 @@ impl CsvExec { impl Executor for CsvExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let finger_print = FileFingerPrint { - path: self.path.clone(), + paths: Arc::new([self.path.clone()]), predicate: self .predicate .as_ref() diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index e5ee49c06a16..5256252d3a5d 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -34,7 
+34,7 @@ impl IpcExec { impl Executor for IpcExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let finger_print = FileFingerPrint { - path: self.path.clone(), + paths: Arc::new([self.path.clone()]), predicate: self .predicate .as_ref() diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 3579bd9de004..9f99c8580870 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -100,7 +100,7 @@ impl ParquetExec { impl Executor for ParquetExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let finger_print = FileFingerPrint { - path: self.path.clone(), + paths: Arc::new([self.path.clone()]), predicate: self .predicate .as_ref() diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 45af5e1f3a11..71fc599be227 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -189,7 +189,7 @@ pub fn create_physical_plan( ))) }, Scan { - path, + paths, file_info, output_schema, scan_type, @@ -213,35 +213,47 @@ pub fn create_physical_plan( #[cfg(feature = "csv")] FileScan::Csv { options: csv_options, - } => Ok(Box::new(executors::CsvExec { - path, - schema: file_info.schema, - options: csv_options, - predicate, - file_options, - })), + } => { + assert_eq!(paths.len(), 1); + let path = paths[0].clone(); + Ok(Box::new(executors::CsvExec { + path, + schema: file_info.schema, + options: csv_options, + predicate, + file_options, + })) + }, #[cfg(feature = "ipc")] - FileScan::Ipc { options } => Ok(Box::new(executors::IpcExec { - path, - schema: file_info.schema, - predicate, - options, - file_options, - })), + FileScan::Ipc { options } => { + assert_eq!(paths.len(), 1); + let path = paths[0].clone(); + Ok(Box::new(executors::IpcExec 
{ + path, + schema: file_info.schema, + predicate, + options, + file_options, + })) + }, #[cfg(feature = "parquet")] FileScan::Parquet { options, cloud_options, metadata - } => Ok(Box::new(executors::ParquetExec::new( - path, - file_info, - predicate, - options, - cloud_options, - file_options, - metadata - ))), + } => { + assert_eq!(paths.len(), 1); + let path = paths[0].clone(); + Ok(Box::new(executors::ParquetExec::new( + path, + file_info, + predicate, + options, + cloud_options, + file_options, + metadata + ))) + }, FileScan::Anonymous { function, .. diff --git a/crates/polars-lazy/src/utils.rs b/crates/polars-lazy/src/utils.rs index e8fa1ed4df79..fac410b109fb 100644 --- a/crates/polars-lazy/src/utils.rs +++ b/crates/polars-lazy/src/utils.rs @@ -6,13 +6,15 @@ use polars_plan::prelude::*; /// Get a set of the data source paths in this LogicalPlan pub(crate) fn agg_source_paths( root_lp: Node, - paths: &mut PlHashSet, + acc_paths: &mut PlHashSet, lp_arena: &Arena, ) { lp_arena.iter(root_lp).for_each(|(_, lp)| { use ALogicalPlan::*; - if let Scan { path, .. } = lp { - paths.insert(path.clone()); + if let Scan { paths, .. 
} = lp { + for path in paths.as_ref() { + acc_paths.insert(path.clone()); + } } }) } diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 0c2d48ffa89e..04fb4c287e62 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -68,7 +68,7 @@ where Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box) }, Scan { - path, + paths, file_info, file_options, predicate, @@ -87,8 +87,9 @@ where FileScan::Csv { options: csv_options, } => { + assert_eq!(paths.len(), 1); let src = sources::CsvSource::new( - path, + paths[0].clone(), file_info.schema, csv_options, file_options, @@ -102,8 +103,9 @@ where cloud_options, metadata, } => { + assert_eq!(paths.len(), 1); let src = sources::ParquetSource::new( - path, + paths[0].clone(), parquet_options, cloud_options, metadata, diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs index a581eb7dafe6..0c3bb9ce9085 100644 --- a/crates/polars-plan/src/dot.rs +++ b/crates/polars-plan/src/dot.rs @@ -1,5 +1,6 @@ +use std::borrow::Cow; use std::fmt::{Display, Write}; -use std::path::Path; +use std::path::PathBuf; use polars_core::prelude::*; @@ -150,9 +151,9 @@ impl LogicalPlan { count, } => { let fmt = if *count == usize::MAX { - "CACHE".to_string() + Cow::Borrowed("CACHE") } else { - format!("CACHE: {}times", *count) + Cow::Owned(format!("CACHE: {}times", *count)) }; let current_node = DotNode { branch: *cache_id, @@ -181,7 +182,7 @@ impl LogicalPlan { acc_str, prev_node, "PYTHON", - Path::new(""), + &[], options.with_columns.as_ref().map(|s| s.as_slice()), options.schema.len(), &options.predicate, @@ -312,7 +313,7 @@ impl LogicalPlan { } }, Scan { - path, + paths, file_info, predicate, scan_type, @@ -324,7 +325,7 @@ impl LogicalPlan { acc_str, prev_node, name, - path.as_ref(), + paths.as_ref(), options.with_columns.as_ref().map(|cols| cols.as_slice()), file_info.schema.len(), predicate, @@ -409,7 +410,7 @@ impl 
LogicalPlan { acc_str: &mut String, prev_node: DotNode, name: &str, - path: &Path, + path: &[PathBuf], with_columns: Option<&[String]>, total_columns: usize, predicate: &Option<P>
, @@ -422,13 +423,20 @@ impl LogicalPlan { n_columns_fmt = format!("{}", columns.len()); } + let fmt = if path.len() == 1 { + path[0].to_string_lossy() + } else { + Cow::Owned(format!( + "{} files: first file: {}", + path.len(), + path[0].to_string_lossy() + )) + }; + let pred = fmt_predicate(predicate.as_ref()); let fmt = format!( "{name} SCAN {};\nπ {}/{};\nσ {}", - path.to_string_lossy(), - n_columns_fmt, - total_columns, - pred, + fmt, n_columns_fmt, total_columns, pred, ); let current_node = DotNode { branch, diff --git a/crates/polars-plan/src/logical_plan/alp.rs b/crates/polars-plan/src/logical_plan/alp.rs index d6a96e2394a1..1c63851a0844 100644 --- a/crates/polars-plan/src/logical_plan/alp.rs +++ b/crates/polars-plan/src/logical_plan/alp.rs @@ -30,7 +30,7 @@ pub enum ALogicalPlan { predicate: Node, }, Scan { - path: PathBuf, + paths: Arc<[PathBuf]>, file_info: FileInfo, predicate: Option, /// schema of the projected file @@ -293,7 +293,7 @@ impl ALogicalPlan { options: *options, }, Scan { - path, + paths, file_info, output_schema, predicate, @@ -305,7 +305,7 @@ impl ALogicalPlan { new_predicate = exprs.pop() } Scan { - path: path.clone(), + paths: paths.clone(), file_info: file_info.clone(), output_schema: output_schema.clone(), file_options: options.clone(), diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 6ccfd66afb93..b736c6cc8a98 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -119,7 +119,7 @@ impl LogicalPlanBuilder { }; Ok(LogicalPlan::Scan { - path: "".into(), + paths: Arc::new([]), file_info, predicate: None, file_options, @@ -201,7 +201,7 @@ impl LogicalPlanBuilder { hive_partitioning, }; Ok(LogicalPlan::Scan { - path, + paths: Arc::new([path]), file_info, file_options: options, predicate: None, @@ -253,7 +253,7 @@ impl LogicalPlanBuilder { hive_partitioning: false, }; Ok(LogicalPlan::Scan { - path, + paths: 
Arc::new([path]), file_info, file_options, predicate: None, @@ -299,6 +299,8 @@ impl LogicalPlanBuilder { } })?; + let paths = Arc::new([path]); + let mut magic_nr = [0u8; 2]; let res = file.read_exact(&mut magic_nr); if raise_if_empty { @@ -362,7 +364,7 @@ impl LogicalPlanBuilder { hive_partitioning: false, }; Ok(LogicalPlan::Scan { - path, + paths, file_info, file_options: options, predicate: None, diff --git a/crates/polars-plan/src/logical_plan/conversion.rs b/crates/polars-plan/src/logical_plan/conversion.rs index f1910f2be2a9..2aecd7a693fc 100644 --- a/crates/polars-plan/src/logical_plan/conversion.rs +++ b/crates/polars-plan/src/logical_plan/conversion.rs @@ -168,13 +168,13 @@ pub fn to_alp( let v = match lp { LogicalPlan::Scan { file_info, - path, + paths, predicate, scan_type, file_options: options, } => ALogicalPlan::Scan { file_info, - path, + paths, output_schema: None, predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)), scan_type, @@ -597,14 +597,14 @@ impl ALogicalPlan { }; match lp { ALogicalPlan::Scan { - path, + paths, file_info, predicate, scan_type, output_schema: _, file_options: options, } => LogicalPlan::Scan { - path, + paths, file_info, predicate: predicate.map(|n| node_to_expr(n, expr_arena)), scan_type, diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index ae7e4e48efd6..3f6631163716 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::fmt; use std::fmt::{Debug, Display, Formatter, Write}; -use std::path::Path; +use std::path::PathBuf; use crate::prelude::*; @@ -9,7 +9,7 @@ use crate::prelude::*; fn write_scan( f: &mut Formatter, name: &str, - path: &Path, + path: &[PathBuf], indent: usize, n_columns: i64, total_columns: usize, @@ -19,7 +19,17 @@ fn write_scan( if indent != 0 { writeln!(f)?; } - write!(f, "{:indent$}{} SCAN {}", "", name, path.display())?; + let path_fmt 
= if path.len() == 1 { + path[0].to_string_lossy() + } else { + Cow::Owned(format!( + "{} files: first file: {}", + path.len(), + path[0].to_string_lossy() + )) + }; + + write!(f, "{:indent$}{} SCAN {}", "", name, path_fmt)?; if n_columns > 0 { write!( f, @@ -58,7 +68,7 @@ impl LogicalPlan { write_scan( f, "PYTHON", - Path::new(""), + &[], sub_indent, n_columns, total_columns, @@ -91,7 +101,7 @@ impl LogicalPlan { input._format(f, sub_indent) }, Scan { - path, + paths, file_info, predicate, scan_type, @@ -106,7 +116,7 @@ impl LogicalPlan { write_scan( f, scan_type.into(), - path, + paths, sub_indent, n_columns, file_info.schema.len(), diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index ecb6d1e8917b..d394327e41f4 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -156,7 +156,7 @@ pub enum LogicalPlan { count: usize, }, Scan { - path: PathBuf, + paths: Arc<[PathBuf]>, file_info: FileInfo, predicate: Option, file_options: FileScanOptions, diff --git a/crates/polars-plan/src/logical_plan/optimizer/cse.rs b/crates/polars-plan/src/logical_plan/optimizer/cse.rs index 11e4c45ca925..dce16147e60a 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/cse.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/cse.rs @@ -117,13 +117,13 @@ fn lp_node_equal(a: &ALogicalPlan, b: &ALogicalPlan, expr_arena: &Arena) ) => Arc::ptr_eq(left_df, right_df), ( Scan { - path: path_left, + paths: path_left, predicate: predicate_left, scan_type: scan_type_left, .. }, Scan { - path: path_right, + paths: path_right, predicate: predicate_right, scan_type: scan_type_right, .. 
diff --git a/crates/polars-plan/src/logical_plan/optimizer/file_caching.rs b/crates/polars-plan/src/logical_plan/optimizer/file_caching.rs index 92e47d12c303..23791d3dd6b0 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/file_caching.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/file_caching.rs @@ -1,4 +1,4 @@ -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; use polars_core::datatypes::PlHashMap; @@ -9,14 +9,14 @@ use crate::prelude::*; #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub struct FileFingerPrint { - pub path: PathBuf, + pub paths: Arc<[PathBuf]>, pub predicate: Option, pub slice: (usize, Option), } #[allow(clippy::type_complexity)] fn process_with_columns( - path: &Path, + paths: &Arc<[PathBuf]>, with_columns: Option<&Vec>, predicate: Option, slice: (usize, Option), @@ -25,7 +25,7 @@ fn process_with_columns( ) { let cols = file_count_and_column_union .entry(FileFingerPrint { - path: path.into(), + paths: paths.clone(), predicate, slice, }) @@ -59,7 +59,7 @@ pub fn collect_fingerprints( use ALogicalPlan::*; match lp_arena.get(root) { Scan { - path, + paths, file_options: options, predicate, scan_type, @@ -68,7 +68,7 @@ pub fn collect_fingerprints( let slice = (scan_type.skip_rows(), options.n_rows); let predicate = predicate.map(|node| node_to_expr(node, expr_arena)); let fp = FileFingerPrint { - path: path.clone(), + paths: paths.clone(), predicate, slice, }; @@ -96,7 +96,7 @@ pub fn find_column_union_and_fingerprints( use ALogicalPlan::*; match lp_arena.get(root) { Scan { - path, + paths, file_options: options, predicate, file_info, @@ -106,7 +106,7 @@ pub fn find_column_union_and_fingerprints( let slice = (scan_type.skip_rows(), options.n_rows); let predicate = predicate.map(|node| node_to_expr(node, expr_arena)); process_with_columns( - path, + paths, options.with_columns.as_deref(), predicate, slice, @@ -204,7 +204,7 @@ impl FileCacher { let lp = lp_arena.take(root); match lp { ALogicalPlan::Scan { - 
path, + paths, file_info, predicate, output_schema, @@ -213,7 +213,7 @@ impl FileCacher { } => { let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena)); let finger_print = FileFingerPrint { - path, + paths, predicate: predicate_expr, slice: (scan_type.skip_rows(), options.n_rows), }; @@ -230,7 +230,7 @@ impl FileCacher { options.with_columns = with_columns; let lp = ALogicalPlan::Scan { - path: finger_print.path.clone(), + paths: finger_print.paths.clone(), file_info, output_schema, predicate, diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index eec0ddaff940..615d3f6dcc8a 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -225,7 +225,7 @@ impl<'a> PredicatePushDown<'a> { Ok(lp) } Scan { - path, + paths, file_info, predicate, scan_type, @@ -235,12 +235,13 @@ impl<'a> PredicatePushDown<'a> { let local_predicates = partition_by_full_context(&mut acc_predicates, expr_arena); let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena); + // TODO! this still assumes a single file. Fix hive partitioning for multiple files if let (Some(hive_part_stats), Some(predicate)) = (file_info.hive_parts.as_deref(), predicate) { if let Some(io_expr) = self.hive_partition_eval.unwrap()(predicate, expr_arena) { if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { if !stats_evaluator.should_read(hive_part_stats.get_statistics())? 
{ if self.verbose { - eprintln!("hive partitioning: skipped: {}", path.display()) + eprintln!("hive partitioning: skipped: {}", paths[0].display()) } let schema = output_schema.as_ref().unwrap_or(&file_info.schema); let df = DataFrame::from(schema.as_ref()); @@ -267,7 +268,7 @@ impl<'a> PredicatePushDown<'a> { let lp = if do_optimization { Scan { - path, + paths, file_info, predicate, file_options: options, @@ -276,7 +277,7 @@ impl<'a> PredicatePushDown<'a> { } } else { let lp = Scan { - path, + paths, file_info, predicate: None, file_options: options, diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index 3ff672683211..b1ed7963aab6 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -377,7 +377,7 @@ impl ProjectionPushDown { Ok(PythonScan { options, predicate }) }, Scan { - path, + paths, file_info, scan_type, predicate, @@ -421,7 +421,7 @@ impl ProjectionPushDown { } let lp = Scan { - path, + paths, file_info, output_schema, scan_type, diff --git a/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs index 66887f25ee62..63e5a9fdd2af 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs @@ -121,7 +121,7 @@ impl SlicePushDown { } #[cfg(feature = "csv")] (Scan { - path, + paths, file_info, output_schema, file_options: mut options, @@ -132,7 +132,7 @@ impl SlicePushDown { csv_options.skip_rows += state.offset as usize; let lp = Scan { - path, + paths, file_info, output_schema, scan_type: FileScan::Csv {options: csv_options}, @@ -143,7 +143,7 @@ impl SlicePushDown { }, // TODO! we currently skip slice pushdown if there is a predicate. 
(Scan { - path, + paths, file_info, output_schema, file_options: mut options, @@ -152,7 +152,7 @@ impl SlicePushDown { }, Some(state)) if state.offset == 0 && predicate.is_none() => { options.n_rows = Some(state.len as usize); let lp = Scan { - path, + paths, file_info, output_schema, predicate,