Skip to content

Commit

Permalink
refactor(rust): prepare for multiple files in a node (#11918)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 21, 2023
1 parent 6e8ce9c commit 3251703
Show file tree
Hide file tree
Showing 17 changed files with 122 additions and 85 deletions.
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl CsvExec {
impl Executor for CsvExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
paths: Arc::new([self.path.clone()]),
predicate: self
.predicate
.as_ref()
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl IpcExec {
impl Executor for IpcExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
paths: Arc::new([self.path.clone()]),
predicate: self
.predicate
.as_ref()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ParquetExec {
impl Executor for ParquetExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
paths: Arc::new([self.path.clone()]),
predicate: self
.predicate
.as_ref()
Expand Down
60 changes: 36 additions & 24 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub fn create_physical_plan(
)))
},
Scan {
path,
paths,
file_info,
output_schema,
scan_type,
Expand All @@ -213,35 +213,47 @@ pub fn create_physical_plan(
#[cfg(feature = "csv")]
FileScan::Csv {
options: csv_options,
} => Ok(Box::new(executors::CsvExec {
path,
schema: file_info.schema,
options: csv_options,
predicate,
file_options,
})),
} => {
assert_eq!(paths.len(), 1);
let path = paths[0].clone();
Ok(Box::new(executors::CsvExec {
path,
schema: file_info.schema,
options: csv_options,
predicate,
file_options,
}))
},
#[cfg(feature = "ipc")]
FileScan::Ipc { options } => Ok(Box::new(executors::IpcExec {
path,
schema: file_info.schema,
predicate,
options,
file_options,
})),
FileScan::Ipc { options } => {
assert_eq!(paths.len(), 1);
let path = paths[0].clone();
Ok(Box::new(executors::IpcExec {
path,
schema: file_info.schema,
predicate,
options,
file_options,
}))
},
#[cfg(feature = "parquet")]
FileScan::Parquet {
options,
cloud_options,
metadata
} => Ok(Box::new(executors::ParquetExec::new(
path,
file_info,
predicate,
options,
cloud_options,
file_options,
metadata
))),
} => {
assert_eq!(paths.len(), 1);
let path = paths[0].clone();
Ok(Box::new(executors::ParquetExec::new(
path,
file_info,
predicate,
options,
cloud_options,
file_options,
metadata
)))
},
FileScan::Anonymous {
function,
..
Expand Down
8 changes: 5 additions & 3 deletions crates/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use polars_plan::prelude::*;
/// Get a set of the data source paths in this LogicalPlan
pub(crate) fn agg_source_paths(
root_lp: Node,
paths: &mut PlHashSet<PathBuf>,
acc_paths: &mut PlHashSet<PathBuf>,
lp_arena: &Arena<ALogicalPlan>,
) {
lp_arena.iter(root_lp).for_each(|(_, lp)| {
use ALogicalPlan::*;
if let Scan { path, .. } = lp {
paths.insert(path.clone());
if let Scan { paths, .. } = lp {
for path in paths.as_ref() {
acc_paths.insert(path.clone());
}
}
})
}
8 changes: 5 additions & 3 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where
Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box<dyn Source>)
},
Scan {
path,
paths,
file_info,
file_options,
predicate,
Expand All @@ -87,8 +87,9 @@ where
FileScan::Csv {
options: csv_options,
} => {
assert_eq!(paths.len(), 1);
let src = sources::CsvSource::new(
path,
paths[0].clone(),
file_info.schema,
csv_options,
file_options,
Expand All @@ -102,8 +103,9 @@ where
cloud_options,
metadata,
} => {
assert_eq!(paths.len(), 1);
let src = sources::ParquetSource::new(
path,
paths[0].clone(),
parquet_options,
cloud_options,
metadata,
Expand Down
30 changes: 19 additions & 11 deletions crates/polars-plan/src/dot.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::fmt::{Display, Write};
use std::path::Path;
use std::path::PathBuf;

use polars_core::prelude::*;

Expand Down Expand Up @@ -150,9 +151,9 @@ impl LogicalPlan {
count,
} => {
let fmt = if *count == usize::MAX {
"CACHE".to_string()
Cow::Borrowed("CACHE")
} else {
format!("CACHE: {}times", *count)
Cow::Owned(format!("CACHE: {}times", *count))
};
let current_node = DotNode {
branch: *cache_id,
Expand Down Expand Up @@ -181,7 +182,7 @@ impl LogicalPlan {
acc_str,
prev_node,
"PYTHON",
Path::new(""),
&[],
options.with_columns.as_ref().map(|s| s.as_slice()),
options.schema.len(),
&options.predicate,
Expand Down Expand Up @@ -312,7 +313,7 @@ impl LogicalPlan {
}
},
Scan {
path,
paths,
file_info,
predicate,
scan_type,
Expand All @@ -324,7 +325,7 @@ impl LogicalPlan {
acc_str,
prev_node,
name,
path.as_ref(),
paths.as_ref(),
options.with_columns.as_ref().map(|cols| cols.as_slice()),
file_info.schema.len(),
predicate,
Expand Down Expand Up @@ -409,7 +410,7 @@ impl LogicalPlan {
acc_str: &mut String,
prev_node: DotNode,
name: &str,
path: &Path,
path: &[PathBuf],
with_columns: Option<&[String]>,
total_columns: usize,
predicate: &Option<P>,
Expand All @@ -422,13 +423,20 @@ impl LogicalPlan {
n_columns_fmt = format!("{}", columns.len());
}

let fmt = if path.len() == 1 {
path[0].to_string_lossy()
} else {
Cow::Owned(format!(
"{} files: first file: {}",
path.len(),
path[0].to_string_lossy()
))
};

let pred = fmt_predicate(predicate.as_ref());
let fmt = format!(
"{name} SCAN {};\nπ {}/{};\nσ {}",
path.to_string_lossy(),
n_columns_fmt,
total_columns,
pred,
fmt, n_columns_fmt, total_columns, pred,
);
let current_node = DotNode {
branch,
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum ALogicalPlan {
predicate: Node,
},
Scan {
path: PathBuf,
paths: Arc<[PathBuf]>,
file_info: FileInfo,
predicate: Option<Node>,
/// schema of the projected file
Expand Down Expand Up @@ -293,7 +293,7 @@ impl ALogicalPlan {
options: *options,
},
Scan {
path,
paths,
file_info,
output_schema,
predicate,
Expand All @@ -305,7 +305,7 @@ impl ALogicalPlan {
new_predicate = exprs.pop()
}
Scan {
path: path.clone(),
paths: paths.clone(),
file_info: file_info.clone(),
output_schema: output_schema.clone(),
file_options: options.clone(),
Expand Down
10 changes: 6 additions & 4 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl LogicalPlanBuilder {
};

Ok(LogicalPlan::Scan {
path: "".into(),
paths: Arc::new([]),
file_info,
predicate: None,
file_options,
Expand Down Expand Up @@ -201,7 +201,7 @@ impl LogicalPlanBuilder {
hive_partitioning,
};
Ok(LogicalPlan::Scan {
path,
paths: Arc::new([path]),
file_info,
file_options: options,
predicate: None,
Expand Down Expand Up @@ -253,7 +253,7 @@ impl LogicalPlanBuilder {
hive_partitioning: false,
};
Ok(LogicalPlan::Scan {
path,
paths: Arc::new([path]),
file_info,
file_options,
predicate: None,
Expand Down Expand Up @@ -299,6 +299,8 @@ impl LogicalPlanBuilder {
}
})?;

let paths = Arc::new([path]);

let mut magic_nr = [0u8; 2];
let res = file.read_exact(&mut magic_nr);
if raise_if_empty {
Expand Down Expand Up @@ -362,7 +364,7 @@ impl LogicalPlanBuilder {
hive_partitioning: false,
};
Ok(LogicalPlan::Scan {
path,
paths,
file_info,
file_options: options,
predicate: None,
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-plan/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ pub fn to_alp(
let v = match lp {
LogicalPlan::Scan {
file_info,
path,
paths,
predicate,
scan_type,
file_options: options,
} => ALogicalPlan::Scan {
file_info,
path,
paths,
output_schema: None,
predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
scan_type,
Expand Down Expand Up @@ -597,14 +597,14 @@ impl ALogicalPlan {
};
match lp {
ALogicalPlan::Scan {
path,
paths,
file_info,
predicate,
scan_type,
output_schema: _,
file_options: options,
} => LogicalPlan::Scan {
path,
paths,
file_info,
predicate: predicate.map(|n| node_to_expr(n, expr_arena)),
scan_type,
Expand Down
Loading

0 comments on commit 3251703

Please sign in to comment.