Skip to content

Commit

Permalink
feat: CSE don't scan share if predicate pushdown predicates don't mat…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 27, 2024
1 parent 67faaed commit af81de0
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 71 deletions.
42 changes: 42 additions & 0 deletions crates/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ fn cached_before_root(q: LazyFrame) {
}
}

fn count_caches(q: LazyFrame) -> usize {
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena)
.iter(node)
.filter(|(_node, lp)| matches!(lp, ALogicalPlan::Cache { .. }))
.count()
}

#[test]
fn test_cse_self_joins() -> PolarsResult<()> {
let lf = scan_foods_ipc();
Expand Down Expand Up @@ -312,3 +320,37 @@ fn test_cse_columns_projections() -> PolarsResult<()> {

Ok(())
}

#[test]
fn test_cse_prune_scan_filter_difference() -> PolarsResult<()> {
let lf = scan_foods_ipc();
let lf = lf.with_column(col("category").str().to_uppercase());

let pred = col("fats_g").gt(2.0);

// If filter are the same, we can cache
let q = lf
.clone()
.filter(pred.clone())
.left_join(lf.clone().filter(pred), col("fats_g"), col("fats_g"))
.with_comm_subplan_elim(true);
cached_before_root(q);

// If the filters are different the caches are removed.
let q = lf
.clone()
.filter(col("fats_g").gt(2.0))
.clone()
.left_join(
lf.filter(col("fats_g").gt(1.0)),
col("fats_g"),
col("fats_g"),
)
.with_comm_subplan_elim(true);

// Check that the caches are removed and that both predicates have been pushed down instead.
assert_eq!(count_caches(q.clone()), 0);
assert!(predicate_at_scan(q));

Ok(())
}
67 changes: 53 additions & 14 deletions crates/polars-plan/src/dsl/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
pub use super::expr_dyn_fn::*;
use crate::prelude::*;

#[derive(PartialEq, Clone)]
#[derive(PartialEq, Clone, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AggExpr {
Min {
Expand Down Expand Up @@ -220,22 +220,61 @@ impl Hash for Expr {
std::mem::discriminant(function).hash(state);
options.hash(state);
},
Expr::Gather {
expr,
idx,
returns_scalar,
} => {
expr.hash(state);
idx.hash(state);
returns_scalar.hash(state);
},
// already hashed by discriminant
Expr::Wildcard | Expr::Len => {},
#[allow(unreachable_code)]
_ => {
// the panic checks if we hit this
#[cfg(debug_assertions)]
{
todo!("IMPLEMENT")
}
// TODO! derive. This is only a temporary fix
// Because PartialEq will have a lot of `false`, e.g. on Function
// Types, this may lead to many file reads, as we use predicate comparison
// to check if we can cache a file
let s = format!("{self:?}");
s.hash(state)
Expr::SortBy {
expr,
by,
descending,
} => {
expr.hash(state);
by.hash(state);
descending.hash(state);
},
Expr::Agg(input) => input.hash(state),
Expr::Explode(input) => input.hash(state),
Expr::Window {
function,
partition_by,
options,
} => {
function.hash(state);
partition_by.hash(state);
options.hash(state);
},
Expr::Slice {
input,
offset,
length,
} => {
input.hash(state);
offset.hash(state);
length.hash(state);
},
Expr::Exclude(input, excl) => {
input.hash(state);
excl.hash(state);
},
Expr::RenameAlias { function: _, expr } => expr.hash(state),
Expr::AnonymousFunction {
input,
function: _,
output_type: _,
options,
} => {
input.hash(state);
options.hash(state);
},
Expr::SubPlan(_, names) => names.hash(state),
}
}
}
Expand Down
Loading

0 comments on commit af81de0

Please sign in to comment.