diff --git a/crates/polars-core/src/frame/chunks.rs b/crates/polars-core/src/frame/chunks.rs index ef364dda8c2f..30f8cb2bbd1f 100644 --- a/crates/polars-core/src/frame/chunks.rs +++ b/crates/polars-core/src/frame/chunks.rs @@ -19,3 +19,25 @@ impl std::convert::TryFrom<(ArrowChunk, &[ArrowField])> for DataFrame { DataFrame::new(columns?) } } + +impl DataFrame { + pub fn split_chunks(mut self) -> impl Iterator { + self.align_chunks(); + + (0..self.n_chunks()).map(move |i| unsafe { + let columns = self + .get_columns() + .iter() + .map(|s| { + Series::from_chunks_and_dtype_unchecked( + s.name(), + vec![s.chunks()[i].clone()], + s.dtype(), + ) + }) + .collect::>(); + + DataFrame::new_no_checks(columns) + }) + } +} diff --git a/crates/polars-lazy/src/physical_plan/executors/filter.rs b/crates/polars-lazy/src/physical_plan/executors/filter.rs index 60316a8bd480..0274a71846f7 100644 --- a/crates/polars-lazy/src/physical_plan/executors/filter.rs +++ b/crates/polars-lazy/src/physical_plan/executors/filter.rs @@ -1,3 +1,5 @@ +use polars_core::utils::accumulate_dataframes_vertical_unchecked; + use super::*; pub struct FilterExec { @@ -5,6 +7,15 @@ pub struct FilterExec { pub(crate) input: Box, // if the predicate contains a window function has_window: bool, + streamable: bool, +} + +fn series_to_mask(s: &Series) -> PolarsResult<&BooleanChunked> { + s.bool().map_err(|_| { + polars_err!( + ComputeError: "filter predicate must be of type `Boolean`, got `{}`", s.dtype() + ) + }) } impl FilterExec { @@ -12,13 +23,43 @@ impl FilterExec { predicate: Arc, input: Box, has_window: bool, + streamable: bool, ) -> Self { Self { predicate, input, has_window, + streamable, } } + + fn execute_impl( + &mut self, + df: DataFrame, + state: &mut ExecutionState, + ) -> PolarsResult { + // Vertical parallelism. + let df = if self.streamable && df.n_chunks() > 1 && df.height() > 0 { + let chunks = df.split_chunks().collect::>(); + let iter = chunks.into_par_iter().map(|df| { + let s = self.predicate.evaluate(&df, state)?; + df.filter(series_to_mask(&s)?) + }); + + let df = POOL.install(|| iter.collect::>>())?; + accumulate_dataframes_vertical_unchecked(df) + } else { + if self.has_window { + state.insert_has_window_function_flag() + } + let s = self.predicate.evaluate(&df, state)?; + if self.has_window { + state.clear_window_expr_cache() + } + df.filter(series_to_mask(&s)?)? + }; + Ok(df) + } } impl Executor for FilterExec { @@ -32,32 +73,19 @@ impl Executor for FilterExec { } let df = self.input.execute(state)?; - if self.has_window { - state.insert_has_window_function_flag() - } - let s = self.predicate.evaluate(&df, state)?; - if self.has_window { - state.clear_window_expr_cache() - } - let mask = s.bool().map_err(|_| { - polars_err!( - ComputeError: "filter predicate must be of type `Boolean`, got `{}`", s.dtype() - ) - })?; - let profile_name = if state.has_node_timer() { Cow::Owned(format!(".filter({})", &self.predicate.as_ref())) } else { Cow::Borrowed("") }; - state.record( + state.clone().record( || { - let df = df.filter(mask)?; + let df = self.execute_impl(df, state); if state.verbose() { eprintln!("dataframe filtered"); } - Ok(df) + df }, profile_name, ) diff --git a/crates/polars-lazy/src/physical_plan/executors/projection.rs b/crates/polars-lazy/src/physical_plan/executors/projection.rs index 786d174ede1e..7147b1094c59 100644 --- a/crates/polars-lazy/src/physical_plan/executors/projection.rs +++ b/crates/polars-lazy/src/physical_plan/executors/projection.rs @@ -1,3 +1,5 @@ +use polars_core::utils::accumulate_dataframes_vertical_unchecked; + use super::*; /// Take an input Executor (creates the input DataFrame) @@ -11,6 +13,8 @@ pub struct ProjectionExec { #[cfg(test)] pub(crate) schema: SchemaRef, pub(crate) options: ProjectionOptions, + // Can run all operations elementwise + pub(crate) streamable: bool, } impl ProjectionExec { @@ -19,17 +23,47 @@ impl ProjectionExec { state: &ExecutionState, mut df: DataFrame, ) -> PolarsResult { - #[allow(clippy::let_and_return)] - let selected_cols = evaluate_physical_expressions( - &mut df, - &self.cse_exprs, - &self.expr, - state, - self.has_windows, - self.options.run_parallel, - )?; - #[allow(unused_mut)] - let mut df = check_expand_literals(selected_cols, df.height() == 0)?; + // Vertical and horizontal parallelism. + let df = + if self.streamable && df.n_chunks() > 1 && df.height() > 0 && self.options.run_parallel + { + let chunks = df.split_chunks().collect::>(); + let iter = chunks.into_par_iter().map(|mut df| { + let selected_cols = evaluate_physical_expressions( + &mut df, + &self.cse_exprs, + &self.expr, + state, + self.has_windows, + self.options.run_parallel, + )?; + check_expand_literals( + selected_cols, + df.height() == 0, + self.options.duplicate_check, + ) + }); + + let df = POOL.install(|| iter.collect::>>())?; + accumulate_dataframes_vertical_unchecked(df) + } + // Only horizontal parallelism. + else { + #[allow(clippy::let_and_return)] + let selected_cols = evaluate_physical_expressions( + &mut df, + &self.cse_exprs, + &self.expr, + state, + self.has_windows, + self.options.run_parallel, + )?; + check_expand_literals( + selected_cols, + df.height() == 0, + self.options.duplicate_check, + )? + }; // this only runs during testing and check if the runtime type matches the predicted schema #[cfg(test)] diff --git a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs index 2a4ea1cb24e7..018f8a1ed6e3 100644 --- a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs +++ b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs @@ -269,6 +269,7 @@ pub(super) fn evaluate_physical_expressions( pub(super) fn check_expand_literals( mut selected_columns: Vec, zero_length: bool, + duplicate_check: bool, ) -> PolarsResult { let Some(first_len) = selected_columns.first().map(|s| s.len()) else { return Ok(DataFrame::empty()); @@ -285,7 +286,7 @@ pub(super) fn check_expand_literals( } let name = s.name(); - if !names.insert(name) { + if duplicate_check && !names.insert(name) { let msg = format!( "the name: '{}' is duplicate\n\n\ It's possible that multiple expressions are returning the same default column \ diff --git a/crates/polars-lazy/src/physical_plan/executors/stack.rs b/crates/polars-lazy/src/physical_plan/executors/stack.rs index 9c931f018e3c..9e3dafc3feb2 100644 --- a/crates/polars-lazy/src/physical_plan/executors/stack.rs +++ b/crates/polars-lazy/src/physical_plan/executors/stack.rs @@ -1,3 +1,5 @@ +use polars_core::utils::accumulate_dataframes_vertical_unchecked; + use super::*; pub struct StackExec { @@ -7,6 +9,8 @@ pub struct StackExec { pub(crate) exprs: Vec>, pub(crate) input_schema: SchemaRef, pub(crate) options: ProjectionOptions, + // Can run all operations elementwise + pub(crate) streamable: bool, } impl StackExec { @@ -15,18 +19,44 @@ impl StackExec { state: &ExecutionState, mut df: DataFrame, ) -> PolarsResult { - let res = evaluate_physical_expressions( - &mut df, - &self.cse_exprs, - &self.exprs, - state, - self.has_windows, - self.options.run_parallel, - )?; - state.clear_window_expr_cache(); - let schema = &*self.input_schema; - df._add_columns(res, schema)?; + + // Vertical and horizontal parallelism. + let df = + if self.streamable && df.n_chunks() > 1 && df.height() > 0 && self.options.run_parallel + { + let chunks = df.split_chunks().collect::>(); + let iter = chunks.into_par_iter().map(|mut df| { + let res = evaluate_physical_expressions( + &mut df, + &self.cse_exprs, + &self.exprs, + state, + self.has_windows, + self.options.run_parallel, + )?; + df._add_columns(res, schema)?; + Ok(df) + }); + + let df = POOL.install(|| iter.collect::>>())?; + accumulate_dataframes_vertical_unchecked(df) + } + // Only horizontal parallelism + else { + let res = evaluate_physical_expressions( + &mut df, + &self.cse_exprs, + &self.exprs, + state, + self.has_windows, + self.options.run_parallel, + )?; + df._add_columns(res, schema)?; + df + }; + + state.clear_window_expr_cache(); Ok(df) } diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 2cd03493f69a..90255c6545c4 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -172,6 +172,7 @@ pub fn create_physical_plan( Ok(Box::new(executors::SliceExec { input, offset, len })) }, Filter { input, predicate } => { + let streamable = is_streamable(predicate.node(), expr_arena, Context::Default); let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); let input = create_physical_plan(input, lp_arena, expr_arena)?; let mut state = ExpressionConversionState::default(); @@ -186,6 +187,7 @@ pub fn create_physical_plan( predicate, input, state.has_windows, + streamable, ))) }, #[allow(unused_variables)] @@ -276,6 +278,12 @@ pub fn create_physical_plan( let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); let input = create_physical_plan(input, lp_arena, expr_arena)?; let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len()); + + let streamable = if expr.has_sub_exprs() { + false + } else { + all_streamable(&expr, expr_arena, Context::Default) + }; let phys_expr = create_physical_expressions_from_irs( expr.default_exprs(), Context::Default, @@ -299,6 +307,7 @@ pub fn create_physical_plan( #[cfg(test)] schema: _schema, options, + streamable, })) }, DataFrameScan { @@ -522,6 +531,12 @@ pub fn create_physical_plan( let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); let input = create_physical_plan(input, lp_arena, expr_arena)?; + let streamable = if exprs.has_sub_exprs() { + false + } else { + all_streamable(&exprs, expr_arena, Context::Default) + }; + let mut state = ExpressionConversionState::new(POOL.current_num_threads() > exprs.len()); @@ -547,6 +562,7 @@ pub fn create_physical_plan( exprs: phys_exprs, input_schema, options, + streamable, })) }, MapFunction { diff --git a/crates/polars-lazy/src/physical_plan/streaming/checks.rs b/crates/polars-lazy/src/physical_plan/streaming/checks.rs index 757149174dfb..94f1b98ff285 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/checks.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/checks.rs @@ -17,62 +17,6 @@ pub(super) fn is_streamable_sort( } } -pub(super) fn is_streamable(node: Node, expr_arena: &Arena, context: Context) -> bool { - // check whether leaf column is Col or Lit - let mut seen_column = false; - let mut seen_lit_range = false; - let all = expr_arena.iter(node).all(|(_, ae)| match ae { - AExpr::Function { - function: FunctionExpr::SetSortedFlag(_), - .. - } => true, - AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => match context - { - Context::Default => matches!( - options.collect_groups, - ApplyOptions::ElementWise | ApplyOptions::ApplyList - ), - Context::Aggregation => matches!(options.collect_groups, ApplyOptions::ElementWise), - }, - AExpr::Column(_) => { - seen_column = true; - true - }, - AExpr::Ternary { .. } - | AExpr::BinaryExpr { .. } - | AExpr::Alias(_, _) - | AExpr::Cast { .. } => true, - AExpr::Literal(lv) => match lv { - LiteralValue::Series(_) | LiteralValue::Range { .. } => { - seen_lit_range = true; - true - }, - _ => true, - }, - _ => false, - }); - - if all { - // adding a range or literal series to chunks will fail because sizes don't match - // if column is a leaf column then it is ok - // - so we want to block `with_column(lit(Series))` - // - but we want to allow `with_column(col("foo").is_in(Series))` - // that means that IFF we seen a lit_range, we only allow if we also seen a `column`. - return if seen_lit_range { seen_column } else { true }; - } - false -} - -pub(super) fn all_streamable( - exprs: &[ExprIR], - expr_arena: &Arena, - context: Context, -) -> bool { - exprs - .iter() - .all(|e| is_streamable(e.node(), expr_arena, context)) -} - /// check if all expressions are a simple column projection pub(super) fn all_column(exprs: &[ExprIR], expr_arena: &Arena) -> bool { exprs diff --git a/crates/polars-plan/src/logical_plan/aexpr/mod.rs b/crates/polars-plan/src/logical_plan/aexpr/mod.rs index ea61a8b25da7..52d132e3287c 100644 --- a/crates/polars-plan/src/logical_plan/aexpr/mod.rs +++ b/crates/polars-plan/src/logical_plan/aexpr/mod.rs @@ -1,6 +1,7 @@ #[cfg(feature = "cse")] mod hash; mod schema; +mod utils; use std::hash::{Hash, Hasher}; @@ -10,6 +11,7 @@ use polars_core::prelude::*; use polars_core::utils::{get_time_units, try_get_supertype}; use polars_utils::arena::{Arena, Node}; use strum_macros::IntoStaticStr; +pub use utils::*; use crate::constants::LEN; use crate::logical_plan::Context; diff --git a/crates/polars-plan/src/logical_plan/aexpr/utils.rs b/crates/polars-plan/src/logical_plan/aexpr/utils.rs new file mode 100644 index 000000000000..aef7cd157334 --- /dev/null +++ b/crates/polars-plan/src/logical_plan/aexpr/utils.rs @@ -0,0 +1,71 @@ +use super::*; + +fn has_series_or_range(ae: &AExpr) -> bool { + matches!( + ae, + AExpr::Literal(LiteralValue::Series(_) | LiteralValue::Range { .. }) + ) +} + +pub fn is_streamable(node: Node, expr_arena: &Arena, context: Context) -> bool { + // check whether leaf column is Col or Lit + let mut seen_column = false; + let mut seen_lit_range = false; + let all = expr_arena.iter(node).all(|(_, ae)| match ae { + AExpr::Function { + function: FunctionExpr::SetSortedFlag(_), + .. + } => true, + AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => match context + { + Context::Default => matches!( + options.collect_groups, + ApplyOptions::ElementWise | ApplyOptions::ApplyList + ), + Context::Aggregation => matches!(options.collect_groups, ApplyOptions::ElementWise), + }, + AExpr::Column(_) => { + seen_column = true; + true + }, + AExpr::BinaryExpr { left, right, .. } => { + !has_aexpr(*left, expr_arena, has_series_or_range) + && !has_aexpr(*right, expr_arena, has_series_or_range) + }, + AExpr::Ternary { + truthy, + falsy, + predicate, + } => { + !has_aexpr(*truthy, expr_arena, has_series_or_range) + && !has_aexpr(*falsy, expr_arena, has_series_or_range) + && !has_aexpr(*predicate, expr_arena, has_series_or_range) + }, + AExpr::Alias(_, _) | AExpr::Cast { .. } => true, + AExpr::Literal(lv) => match lv { + LiteralValue::Series(_) | LiteralValue::Range { .. } => { + seen_lit_range = true; + true + }, + _ => true, + }, + _ => false, + }); + + if all { + // adding a range or literal series to chunks will fail because sizes don't match + // if column is a leaf column then it is ok + // - so we want to block `with_column(lit(Series))` + // - but we want to allow `with_column(col("foo").is_in(Series))` + // that means that IFF we seen a lit_range, we only allow if we also seen a `column`. + return if seen_lit_range { seen_column } else { true }; + } + + false +} + +pub fn all_streamable(exprs: &[ExprIR], expr_arena: &Arena, context: Context) -> bool { + exprs + .iter() + .all(|e| is_streamable(e.node(), expr_arena, context)) +} diff --git a/crates/polars-plan/src/logical_plan/alp/mod.rs b/crates/polars-plan/src/logical_plan/alp/mod.rs index 0bb74eca267c..c3d948952d70 100644 --- a/crates/polars-plan/src/logical_plan/alp/mod.rs +++ b/crates/polars-plan/src/logical_plan/alp/mod.rs @@ -54,7 +54,7 @@ pub enum IR { columns: SchemaRef, duplicate_check: bool, }, - // Polars' `select` operation. This may access full data. + // Polars' `select` operation. This may access full materialized data. Select { input: Node, expr: ProjectionExprs, diff --git a/crates/polars-plan/src/logical_plan/projection_expr.rs b/crates/polars-plan/src/logical_plan/projection_expr.rs index 75768828a6f3..79974daf3ebb 100644 --- a/crates/polars-plan/src/logical_plan/projection_expr.rs +++ b/crates/polars-plan/src/logical_plan/projection_expr.rs @@ -45,7 +45,7 @@ impl ProjectionExprs { } } - pub(crate) fn has_sub_exprs(&self) -> bool { + pub fn has_sub_exprs(&self) -> bool { self.common_sub_offset != 0 } diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 16b27d0e72e7..ecc5324ae752 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -467,7 +467,7 @@ def _read_csv_impl( n_rows: int | None = None, encoding: CsvEncoding = "utf8", low_memory: bool = False, - rechunk: bool = True, + rechunk: bool = False, skip_rows_after_header: int = 0, row_index_name: str | None = None, row_index_offset: int = 0, @@ -611,7 +611,7 @@ def read_csv_batched( n_rows: int | None = None, encoding: CsvEncoding | str = "utf8", low_memory: bool = False, - rechunk: bool = True, + rechunk: bool = False, skip_rows_after_header: int = 0, row_index_name: str | None = None, row_index_offset: int = 0, @@ -906,7 +906,7 @@ def scan_csv( n_rows: int | None = None, encoding: CsvEncoding = "utf8", low_memory: bool = False, - rechunk: bool = True, + rechunk: bool = False, skip_rows_after_header: int = 0, row_index_name: str | None = None, row_index_offset: int = 0, @@ -1148,7 +1148,7 @@ def _scan_csv_impl( n_rows: int | None = None, encoding: CsvEncoding = "utf8", low_memory: bool = False, - rechunk: bool = True, + rechunk: bool = False, skip_rows_after_header: int = 0, row_index_name: str | None = None, row_index_offset: int = 0,