From 6f8b4785c1b373b7740d9865671d624af325f9ed Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 22 Jul 2024 20:13:35 +0200 Subject: [PATCH] perf: Ensure only nodes that are not changed are cached in collapse optimizer (#17791) --- .../plans/optimizer/collapse_and_project.rs | 10 ++++-- .../optimizer/projection_pushdown/joins.rs | 23 +++++++----- py-polars/tests/unit/test_projections.py | 36 +++++++++++++++++++ 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs b/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs index 38ed9b6eed9a..e4c0ac87151a 100644 --- a/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs +++ b/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs @@ -12,7 +12,7 @@ use super::*; /// /// The schema reported after this optimization is also pub(super) struct SimpleProjectionAndCollapse { - /// keep track of nodes that are already processed when they + /// Keep track of nodes that are already processed when they /// can be expensive. Schema materialization can be for instance. processed: BTreeSet, eager: bool, @@ -39,12 +39,14 @@ impl OptimizationRule for SimpleProjectionAndCollapse { match lp { Select { input, expr, .. } => { - if !matches!(lp_arena.get(*input), ExtContext { .. }) && self.processed.insert(node) + if !matches!(lp_arena.get(*input), ExtContext { .. }) + && !self.processed.contains(&node) { // First check if we can apply the optimization before we allocate. if !expr.iter().all(|e| { matches!(expr_arena.get(e.node()), AExpr::Column(_)) && !e.has_alias() }) { + self.processed.insert(node); return None; } @@ -59,6 +61,7 @@ impl OptimizationRule for SimpleProjectionAndCollapse { Some(alp) } else { + self.processed.insert(node); None } }, @@ -73,7 +76,7 @@ impl OptimizationRule for SimpleProjectionAndCollapse { }), // Cleanup projections set in projection pushdown just above caches // they are not needed. - cache_lp @ Cache { .. } if self.processed.insert(node) => { + cache_lp @ Cache { .. } if self.processed.contains(&node) => { let cache_schema = cache_lp.schema(lp_arena); if cache_schema.len() == columns.len() && cache_schema.iter_names().zip(columns.iter_names()).all( @@ -92,6 +95,7 @@ impl OptimizationRule for SimpleProjectionAndCollapse { if *input_schema.as_ref() == *columns { Some(other.clone()) } else { + self.processed.insert(node); None } }, diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs index 6f635f1e354e..007ca07cf206 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs @@ -181,7 +181,7 @@ pub(super) fn process_asof_join( expr_arena, )?; - Ok(resolve_join_suffixes( + resolve_join_suffixes( input_left, input_right, left_on, @@ -190,7 +190,7 @@ pub(super) fn process_asof_join( lp_arena, expr_arena, &local_projection, - )) + ) } #[allow(clippy::too_many_arguments)] @@ -365,7 +365,7 @@ pub(super) fn process_join( expr_arena, )?; - Ok(resolve_join_suffixes( + resolve_join_suffixes( input_left, input_right, left_on, @@ -374,7 +374,7 @@ pub(super) fn process_join( lp_arena, expr_arena, &local_projection, - )) + ) } fn process_projection( @@ -469,13 +469,14 @@ fn resolve_join_suffixes( lp_arena: &mut Arena, expr_arena: &mut Arena, local_projection: &[ColumnNode], -) -> IR { +) -> PolarsResult { let suffix = options.args.suffix(); let alp = IRBuilder::new(input_left, expr_arena, lp_arena) .join(input_right, left_on, right_on, options.clone()) .build(); let schema_after_join = alp.schema(lp_arena); + let mut all_columns = true; let projections = local_projection .iter() .map(|proj| { @@ -484,6 +485,7 @@ fn resolve_join_suffixes( let downstream_name = &name.as_ref()[..name.len() - suffix.len()]; let col = AExpr::Column(ColumnName::from(downstream_name)); let node = expr_arena.add(col); + all_columns = false; ExprIR::new(node, OutputName::Alias(name.clone())) } else { ExprIR::new(proj.0, OutputName::ColumnLhs(name.clone())) @@ -491,7 +493,12 @@ fn resolve_join_suffixes( }) .collect::>(); - IRBuilder::from_lp(alp, expr_arena, lp_arena) - .project(projections, Default::default()) - .build() + let builder = IRBuilder::from_lp(alp, expr_arena, lp_arena); + Ok(if all_columns { + builder + .project_simple(projections.iter().map(|e| e.output_name()))? + .build() + } else { + builder.project(projections, Default::default()).build() + }) } diff --git a/py-polars/tests/unit/test_projections.py b/py-polars/tests/unit/test_projections.py index d4176d2ad265..9586bfb0a2ae 100644 --- a/py-polars/tests/unit/test_projections.py +++ b/py-polars/tests/unit/test_projections.py @@ -529,3 +529,39 @@ def test_projection_literal_no_alias_17739() -> None: assert df.select(pl.lit(False)).select("literal").collect().to_dict( as_series=False ) == {"literal": [False]} + + +def test_projections_collapse_17781() -> None: + frame1 = pl.LazyFrame( + { + "index": [0], + "data1": [0], + "data2": [0], + } + ) + frame2 = pl.LazyFrame( + { + "index": [0], + "label1": [True], + "label2": [False], + "label3": [False], + }, + schema=[ + ("index", pl.Int64), + ("label1", pl.Boolean), + ("label2", pl.Boolean), + ("label3", pl.Boolean), + ], + ) + cols = ["index", "data1", "label1", "label2"] + + lf = None + for lfj in [frame1, frame2]: + use_columns = [c for c in cols if c in lfj.collect_schema().names()] + lfj = lfj.select(use_columns) + lfj = lfj.select(use_columns) + if lf is None: + lf = lfj + else: + lf = lf.join(lfj, on="index", how="left") + assert "SELECT " not in lf.explain() # type: ignore[union-attr]