From 9bde4333c6c1bd11a68c1a7d4b5e91aef4695b72 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 2 Aug 2023 09:48:10 +0200 Subject: [PATCH] fix(rust, python): fix cse profile (#10239) --- .../src/physical_plan/executors/mod.rs | 202 +--------------- .../src/physical_plan/executors/projection.rs | 16 +- .../executors/projection_utils.rs | 218 ++++++++++++++++++ .../src/physical_plan/executors/stack.rs | 8 +- .../src/physical_plan/planner/lp.rs | 4 +- crates/polars-plan/src/utils.rs | 13 +- .../tests/unit/operations/test_profile.py | 32 +++ py-polars/tests/unit/test_lazy.py | 14 -- 8 files changed, 281 insertions(+), 226 deletions(-) create mode 100644 crates/polars-lazy/src/physical_plan/executors/projection_utils.rs create mode 100644 py-polars/tests/unit/operations/test_profile.py diff --git a/crates/polars-lazy/src/physical_plan/executors/mod.rs b/crates/polars-lazy/src/physical_plan/executors/mod.rs index 307e856d084a..bf95a06ed6f7 100644 --- a/crates/polars-lazy/src/physical_plan/executors/mod.rs +++ b/crates/polars-lazy/src/physical_plan/executors/mod.rs @@ -8,6 +8,7 @@ mod groupby_partitioned; mod groupby_rolling; mod join; mod projection; +mod projection_utils; #[cfg(feature = "python")] mod python_scan; mod scan; @@ -24,6 +25,7 @@ pub use executor::*; use polars_core::POOL; use polars_plan::global::FETCH_ROWS; use polars_plan::utils::*; +use projection_utils::*; use rayon::prelude::*; pub(super) use self::cache::*; @@ -47,203 +49,3 @@ pub(super) use self::udf::*; pub(super) use self::union::*; pub(super) use self::unique::*; use super::*; - -fn execute_projection_cached_window_fns( - df: &DataFrame, - exprs: &[Arc], - state: &ExecutionState, -) -> PolarsResult> { - // We partition by normal expression and window expression - // - the normal expressions can run in parallel - // - the window expression take more memory and often use the same groupby keys and join tuples - // so they are cached and run sequential - - // the partitioning messes with column order, so we also store the idx - // and use those to restore the original projection order - #[allow(clippy::type_complexity)] - // String: partition_name, - // u32: index, - let mut windows: Vec<(String, Vec<(u32, Arc)>)> = vec![]; - let mut other = Vec::with_capacity(exprs.len()); - - // first we partition the window function by the values they group over. - // the groupby values should be cached - let mut index = 0u32; - exprs.iter().for_each(|phys| { - index += 1; - let e = phys.as_expression().unwrap(); - - let mut is_window = false; - for e in e.into_iter() { - if let Expr::Window { partition_by, .. } = e { - let groupby = format!("{:?}", partition_by.as_slice()); - if let Some(tpl) = windows.iter_mut().find(|tpl| tpl.0 == groupby) { - tpl.1.push((index, phys.clone())) - } else { - windows.push((groupby, vec![(index, phys.clone())])) - } - is_window = true; - break; - } - } - if !is_window { - other.push((index, phys)) - } - }); - - let mut selected_columns = POOL.install(|| { - other - .par_iter() - .map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s))) - .collect::>>() - })?; - - for partition in windows { - // clear the cache for every partitioned group - let mut state = state.split(); - // inform the expression it has window functions. - state.insert_has_window_function_flag(); - - // don't bother caching if we only have a single window function in this partition - if partition.1.len() == 1 { - state.remove_cache_window_flag(); - } else { - state.insert_cache_window_flag(); - } - - for (index, e) in partition.1 { - if e.as_expression() - .unwrap() - .into_iter() - .filter(|e| matches!(e, Expr::Window { .. })) - .count() - == 1 - { - state.insert_cache_window_flag(); - } - // caching more than one window expression is a complicated topic for another day - // see issue #2523 - else { - state.remove_cache_window_flag(); - } - - let s = e.evaluate(df, &state)?; - selected_columns.push((index, s)); - } - } - - selected_columns.sort_unstable_by_key(|tpl| tpl.0); - let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect(); - Ok(selected_columns) -} - -fn run_exprs_par( - df: &DataFrame, - exprs: &[Arc], - state: &ExecutionState, -) -> PolarsResult> { - POOL.install(|| { - exprs - .par_iter() - .map(|expr| expr.evaluate(df, state)) - .collect() - }) -} - -pub(super) fn evaluate_physical_expressions( - df: &mut DataFrame, - cse_exprs: &[Arc], - exprs: &[Arc], - state: &ExecutionState, - has_windows: bool, -) -> PolarsResult> { - let runner = if has_windows { - execute_projection_cached_window_fns - } else { - run_exprs_par - }; - - let selected_columns = if !cse_exprs.is_empty() { - let tmp_cols = runner(df, cse_exprs, state)?; - let width = df.width(); - - // put the cse expressions at the end - unsafe { - df.hstack_mut_unchecked(&tmp_cols); - } - let mut result = run_exprs_par(df, exprs, state)?; - // restore original df - unsafe { - df.get_columns_mut().truncate(width); - } - - // the replace CSE has a temporary name - // we don't want this name in the result - for s in result.iter_mut() { - rename_cse_tmp_series(s); - } - - result - } else { - runner(df, exprs, state)? - }; - - state.clear_window_expr_cache(); - - Ok(selected_columns) -} - -pub(super) fn check_expand_literals( - mut selected_columns: Vec, - zero_length: bool, -) -> PolarsResult { - let first_len = selected_columns[0].len(); - let mut df_height = 0; - let mut all_equal_len = true; - { - let mut names = PlHashSet::with_capacity(selected_columns.len()); - for s in &selected_columns { - let len = s.len(); - df_height = std::cmp::max(df_height, len); - if len != first_len { - all_equal_len = false; - } - let name = s.name(); - polars_ensure!(names.insert(name), duplicate = name); - } - } - // If all series are the same length it is ok. If not we can broadcast Series of length one. - if !all_equal_len { - selected_columns = selected_columns - .into_iter() - .map(|series| { - Ok(if series.len() == 1 && df_height > 1 { - series.new_from_index(0, df_height) - } else if series.len() == df_height || series.len() == 0 { - series - } else { - polars_bail!( - ComputeError: "series length {} doesn't match the dataframe height of {}", - series.len(), df_height - ); - }) - }) - .collect::>()? - } - - let df = DataFrame::new_no_checks(selected_columns); - - // a literal could be projected to a zero length dataframe. - // This prevents a panic. - let df = if zero_length { - let min = df.get_columns().iter().map(|s| s.len()).min(); - if min.is_some() { - df.head(min) - } else { - df - } - } else { - df - }; - Ok(df) -} diff --git a/crates/polars-lazy/src/physical_plan/executors/projection.rs b/crates/polars-lazy/src/physical_plan/executors/projection.rs index b4e5a11ded9c..83b3799c29ce 100644 --- a/crates/polars-lazy/src/physical_plan/executors/projection.rs +++ b/crates/polars-lazy/src/physical_plan/executors/projection.rs @@ -4,7 +4,7 @@ use super::*; /// and a multiple PhysicalExpressions (create the output Series) pub struct ProjectionExec { pub(crate) input: Box, - pub(crate) cse_expr: Vec>, + pub(crate) cse_exprs: Vec>, pub(crate) expr: Vec>, pub(crate) has_windows: bool, pub(crate) input_schema: SchemaRef, @@ -21,7 +21,7 @@ impl ProjectionExec { #[allow(clippy::let_and_return)] let selected_cols = evaluate_physical_expressions( &mut df, - &self.cse_expr, + &self.cse_exprs, &self.expr, state, self.has_windows, @@ -48,10 +48,10 @@ impl Executor for ProjectionExec { #[cfg(debug_assertions)] { if state.verbose() { - if self.cse_expr.is_empty() { + if self.cse_exprs.is_empty() { println!("run ProjectionExec"); } else { - println!("run ProjectionExec with {} CSE", self.cse_expr.len()) + println!("run ProjectionExec with {} CSE", self.cse_exprs.len()) }; } } @@ -61,7 +61,13 @@ impl Executor for ProjectionExec { let by = self .expr .iter() - .map(|s| Ok(s.to_field(&self.input_schema)?.name)) + .map(|s| { + profile_name( + s.as_ref(), + self.input_schema.as_ref(), + !self.cse_exprs.is_empty(), + ) + }) .collect::>>()?; let name = comma_delimited("projection".to_string(), &by); Cow::Owned(name) diff --git a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs new file mode 100644 index 000000000000..b6ba1017c32b --- /dev/null +++ b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs @@ -0,0 +1,218 @@ +use smartstring::alias::String as SmartString; + +use super::*; + +pub(super) fn profile_name( + s: &dyn PhysicalExpr, + input_schema: &Schema, + has_cse: bool, +) -> PolarsResult { + match (has_cse, s.to_field(input_schema)) { + (false, Err(e)) => Err(e), + (true, Err(_)) => Ok(expr_to_leaf_column_names_iter(s.as_expression().unwrap()) + .map(|n| n.as_ref().into()) + .next() + .unwrap()), + (_, Ok(fld)) => Ok(fld.name), + } +} + +fn execute_projection_cached_window_fns( + df: &DataFrame, + exprs: &[Arc], + state: &ExecutionState, +) -> PolarsResult> { + // We partition by normal expression and window expression + // - the normal expressions can run in parallel + // - the window expression take more memory and often use the same groupby keys and join tuples + // so they are cached and run sequential + + // the partitioning messes with column order, so we also store the idx + // and use those to restore the original projection order + #[allow(clippy::type_complexity)] + // String: partition_name, + // u32: index, + let mut windows: Vec<(String, Vec<(u32, Arc)>)> = vec![]; + let mut other = Vec::with_capacity(exprs.len()); + + // first we partition the window function by the values they group over. + // the groupby values should be cached + let mut index = 0u32; + exprs.iter().for_each(|phys| { + index += 1; + let e = phys.as_expression().unwrap(); + + let mut is_window = false; + for e in e.into_iter() { + if let Expr::Window { partition_by, .. } = e { + let groupby = format!("{:?}", partition_by.as_slice()); + if let Some(tpl) = windows.iter_mut().find(|tpl| tpl.0 == groupby) { + tpl.1.push((index, phys.clone())) + } else { + windows.push((groupby, vec![(index, phys.clone())])) + } + is_window = true; + break; + } + } + if !is_window { + other.push((index, phys)) + } + }); + + let mut selected_columns = POOL.install(|| { + other + .par_iter() + .map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s))) + .collect::>>() + })?; + + for partition in windows { + // clear the cache for every partitioned group + let mut state = state.split(); + // inform the expression it has window functions. + state.insert_has_window_function_flag(); + + // don't bother caching if we only have a single window function in this partition + if partition.1.len() == 1 { + state.remove_cache_window_flag(); + } else { + state.insert_cache_window_flag(); + } + + for (index, e) in partition.1 { + if e.as_expression() + .unwrap() + .into_iter() + .filter(|e| matches!(e, Expr::Window { .. })) + .count() + == 1 + { + state.insert_cache_window_flag(); + } + // caching more than one window expression is a complicated topic for another day + // see issue #2523 + else { + state.remove_cache_window_flag(); + } + + let s = e.evaluate(df, &state)?; + selected_columns.push((index, s)); + } + } + + selected_columns.sort_unstable_by_key(|tpl| tpl.0); + let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect(); + Ok(selected_columns) +} + +fn run_exprs_par( + df: &DataFrame, + exprs: &[Arc], + state: &ExecutionState, +) -> PolarsResult> { + POOL.install(|| { + exprs + .par_iter() + .map(|expr| expr.evaluate(df, state)) + .collect() + }) +} + +pub(super) fn evaluate_physical_expressions( + df: &mut DataFrame, + cse_exprs: &[Arc], + exprs: &[Arc], + state: &ExecutionState, + has_windows: bool, +) -> PolarsResult> { + let runner = if has_windows { + execute_projection_cached_window_fns + } else { + run_exprs_par + }; + + let selected_columns = if !cse_exprs.is_empty() { + let tmp_cols = runner(df, cse_exprs, state)?; + let width = df.width(); + + // put the cse expressions at the end + unsafe { + df.hstack_mut_unchecked(&tmp_cols); + } + let mut result = run_exprs_par(df, exprs, state)?; + // restore original df + unsafe { + df.get_columns_mut().truncate(width); + } + + // the replace CSE has a temporary name + // we don't want this name in the result + for s in result.iter_mut() { + rename_cse_tmp_series(s); + } + + result + } else { + runner(df, exprs, state)? + }; + + state.clear_window_expr_cache(); + + Ok(selected_columns) +} + +pub(super) fn check_expand_literals( + mut selected_columns: Vec, + zero_length: bool, +) -> PolarsResult { + let first_len = selected_columns[0].len(); + let mut df_height = 0; + let mut all_equal_len = true; + { + let mut names = PlHashSet::with_capacity(selected_columns.len()); + for s in &selected_columns { + let len = s.len(); + df_height = std::cmp::max(df_height, len); + if len != first_len { + all_equal_len = false; + } + let name = s.name(); + polars_ensure!(names.insert(name), duplicate = name); + } + } + // If all series are the same length it is ok. If not we can broadcast Series of length one. + if !all_equal_len { + selected_columns = selected_columns + .into_iter() + .map(|series| { + Ok(if series.len() == 1 && df_height > 1 { + series.new_from_index(0, df_height) + } else if series.len() == df_height || series.len() == 0 { + series + } else { + polars_bail!( + ComputeError: "series length {} doesn't match the dataframe height of {}", + series.len(), df_height + ); + }) + }) + .collect::>()? + } + + let df = DataFrame::new_no_checks(selected_columns); + + // a literal could be projected to a zero length dataframe. + // This prevents a panic. + let df = if zero_length { + let min = df.get_columns().iter().map(|s| s.len()).min(); + if min.is_some() { + df.head(min) + } else { + df + } + } else { + df + }; + Ok(df) +} diff --git a/crates/polars-lazy/src/physical_plan/executors/stack.rs b/crates/polars-lazy/src/physical_plan/executors/stack.rs index 18f4572cc526..c3857aef0c31 100644 --- a/crates/polars-lazy/src/physical_plan/executors/stack.rs +++ b/crates/polars-lazy/src/physical_plan/executors/stack.rs @@ -48,7 +48,13 @@ impl Executor for StackExec { let by = self .exprs .iter() - .map(|s| Ok(s.to_field(&self.input_schema)?.name)) + .map(|s| { + profile_name( + s.as_ref(), + self.input_schema.as_ref(), + !self.cse_exprs.is_empty(), + ) + }) .collect::>>()?; let name = comma_delimited("with_column".to_string(), &by); Cow::Owned(name) diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index d79a6a8c156f..0f08d5718cee 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -253,7 +253,7 @@ pub fn create_physical_plan( )?; Ok(Box::new(executors::ProjectionExec { input, - cse_expr, + cse_exprs: cse_expr, expr: phys_expr, has_windows: state.has_windows, input_schema, @@ -280,7 +280,7 @@ pub fn create_physical_plan( )?; Ok(Box::new(executors::ProjectionExec { input, - cse_expr: vec![], + cse_exprs: vec![], expr: phys_expr, has_windows: state.has_windows, input_schema, diff --git a/crates/polars-plan/src/utils.rs b/crates/polars-plan/src/utils.rs index 4facd5aa58a6..212c8fd92a7d 100644 --- a/crates/polars-plan/src/utils.rs +++ b/crates/polars-plan/src/utils.rs @@ -1,6 +1,7 @@ use std::fmt::Formatter; -use std::iter::FlatMap; +use std::iter::{FlatMap, Map}; use std::sync::Arc; +use std::vec::IntoIter; use polars_core::prelude::*; use smartstring::alias::String as SmartString; @@ -200,12 +201,16 @@ pub(crate) fn get_single_leaf(expr: &Expr) -> PolarsResult> { ); } -/// This should gradually replace expr_to_root_column as this will get all names in the tree. -pub fn expr_to_leaf_column_names(expr: &Expr) -> Vec> { +#[allow(clippy::type_complexity)] +pub fn expr_to_leaf_column_names_iter(expr: &Expr) -> Map, fn(Expr) -> Arc> { expr_to_root_column_exprs(expr) .into_iter() .map(|e| expr_to_leaf_column_name(&e).unwrap()) - .collect() +} + +/// This should gradually replace expr_to_root_column as this will get all names in the tree. +pub fn expr_to_leaf_column_names(expr: &Expr) -> Vec> { + expr_to_leaf_column_names_iter(expr).collect() } /// unpack alias(col) to name of the root column name diff --git a/py-polars/tests/unit/operations/test_profile.py b/py-polars/tests/unit/operations/test_profile.py new file mode 100644 index 000000000000..ef7e8b1fd170 --- /dev/null +++ b/py-polars/tests/unit/operations/test_profile.py @@ -0,0 +1,32 @@ +import polars as pl + + +def test_profile_columns() -> None: + ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]}) + + # profile lazyframe operation/plan + lazy = ldf.groupby("a").agg(pl.implode("b")) + profiling_info = lazy.profile() + # ┌──────────────┬───────┬─────┐ + # │ node ┆ start ┆ end │ + # │ --- ┆ --- ┆ --- │ + # │ str ┆ u64 ┆ u64 │ + # ╞══════════════╪═══════╪═════╡ + # │ optimization ┆ 0 ┆ 69 │ + # │ groupby(a) ┆ 69 ┆ 342 │ + # └──────────────┴───────┴─────┘ + assert len(profiling_info) == 2 + assert profiling_info[1].columns == ["node", "start", "end"] + + +def test_profile_with_cse() -> None: + df = pl.DataFrame({"x": [], "y": []}) + + x = pl.col("x") + y = pl.col("y") + + assert df.lazy().with_columns( + pl.when(x.is_null()) + .then(None) + .otherwise(pl.when(y == 0).then(None).otherwise(x + y)) + ).profile(comm_subexpr_elim=True)[1].shape == (2, 3) diff --git a/py-polars/tests/unit/test_lazy.py b/py-polars/tests/unit/test_lazy.py index 849fc555c592..00a42ccd293a 100644 --- a/py-polars/tests/unit/test_lazy.py +++ b/py-polars/tests/unit/test_lazy.py @@ -41,20 +41,6 @@ def test_lazy() -> None: eager = ldf.groupby("a").agg(pl.implode("b")).collect() assert sorted(eager.rows()) == [(1, [[1.0]]), (2, [[2.0]]), (3, [[3.0]])] - # profile lazyframe operation/plan - lazy = ldf.groupby("a").agg(pl.implode("b")) - profiling_info = lazy.profile() - # ┌──────────────┬───────┬─────┐ - # │ node ┆ start ┆ end │ - # │ --- ┆ --- ┆ --- │ - # │ str ┆ u64 ┆ u64 │ - # ╞══════════════╪═══════╪═════╡ - # │ optimization ┆ 0 ┆ 69 │ - # │ groupby(a) ┆ 69 ┆ 342 │ - # └──────────────┴───────┴─────┘ - assert len(profiling_info) == 2 - assert profiling_info[1].columns == ["node", "start", "end"] - @pytest.mark.parametrize( ("data", "repr_"),