diff --git a/polars/polars-core/src/frame/explode.rs b/polars/polars-core/src/frame/explode.rs index 0d8a46fd2d77..29e940021c68 100644 --- a/polars/polars-core/src/frame/explode.rs +++ b/polars/polars-core/src/frame/explode.rs @@ -37,6 +37,7 @@ pub struct MeltArgs { impl DataFrame { pub fn explode_impl(&self, mut columns: Vec) -> PolarsResult { + polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode"); let mut df = self.clone(); if self.height() == 0 { for s in &columns { diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/join.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/join.rs new file mode 100644 index 000000000000..b9e932673b85 --- /dev/null +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/join.rs @@ -0,0 +1,128 @@ +use super::*; + +fn should_block_join_specific(ae: &AExpr, how: &JoinType) -> bool { + use AExpr::*; + match ae { + // joins can produce null values + Function { + function: + FunctionExpr::Boolean(BooleanFunction::IsNotNull) + | FunctionExpr::Boolean(BooleanFunction::IsNull) + | FunctionExpr::FillNull { .. }, + .. + } => join_produces_null(how), + // joins can produce duplicates + #[cfg(feature = "is_unique")] + Function { + function: + FunctionExpr::Boolean(BooleanFunction::IsUnique) + | FunctionExpr::Boolean(BooleanFunction::IsDuplicated), + .. + } => true, + #[cfg(feature = "is_first")] + Function { + function: FunctionExpr::Boolean(BooleanFunction::IsFirst), + .. + } => true, + // any operation that checks for equality or ordering can be wrong because + // the join can produce null values + // TODO! check if we can be less conservative here + BinaryExpr { op, .. } => !matches!(op, Operator::NotEq) && join_produces_null(how), + _ => false, + } +} + +fn join_produces_null(how: &JoinType) -> bool { + #[cfg(feature = "asof_join")] + { + matches!( + how, + JoinType::Left | JoinType::Outer | JoinType::Cross | JoinType::AsOf(_) + ) + } + #[cfg(not(feature = "asof_join"))] + { + matches!(how, JoinType::Left | JoinType::Outer | JoinType::Cross) + } +} + +#[allow(clippy::too_many_arguments)] +pub(super) fn process_join( + opt: &PredicatePushDown, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + input_left: Node, + input_right: Node, + left_on: Vec, + right_on: Vec, + schema: SchemaRef, + options: Arc, + acc_predicates: PlHashMap, Node>, +) -> PolarsResult { + use ALogicalPlan::*; + let schema_left = lp_arena.get(input_left).schema(lp_arena); + let schema_right = lp_arena.get(input_right).schema(lp_arena); + + let mut pushdown_left = init_hashmap(Some(acc_predicates.len())); + let mut pushdown_right = init_hashmap(Some(acc_predicates.len())); + let mut local_predicates = Vec::with_capacity(acc_predicates.len()); + + for (_, predicate) in acc_predicates { + // check if predicate can pass the joins node + if has_aexpr(predicate, expr_arena, |ae| { + should_block_join_specific(ae, &options.args.how) + }) { + local_predicates.push(predicate); + continue; + } + // these indicate to which tables we are going to push down the predicate + let mut filter_left = false; + let mut filter_right = false; + + // predicate should not have an aggregation or window function as that would + // be influenced by join + #[allow(clippy::suspicious_else_formatting)] + if !predicate_is_pushdown_boundary(predicate, expr_arena) { + if check_input_node(predicate, &schema_left, expr_arena) { + insert_and_combine_predicate(&mut pushdown_left, predicate, expr_arena); + filter_left = true; + } + // this is `else if` because if the predicate is in the left hand side + // the right hand side should be renamed with the suffix. + // in that case we should not push down as the user wants to filter on `x` + // not on `x_rhs`. + else if check_input_node(predicate, &schema_right, expr_arena) { + insert_and_combine_predicate(&mut pushdown_right, predicate, expr_arena); + filter_right = true; + } + } + match (filter_left, filter_right, &options.args.how) { + // if not pushed down on one of the tables we have to do it locally. + (false, false, _) | + // if left join and predicate only available in right table, + // 'we should not filter right, because that would lead to + // invalid results. + // see: #2057 + (false, true, JoinType::Left) + => { + local_predicates.push(predicate); + continue; + }, + // business as usual + _ => {} + } + } + + opt.pushdown_and_assign(input_left, pushdown_left, lp_arena, expr_arena)?; + opt.pushdown_and_assign(input_right, pushdown_right, lp_arena, expr_arena)?; + + let lp = Join { + input_left, + input_right, + left_on, + right_on, + schema, + options, + }; + Ok(opt.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) +} diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index 1ef1eb3cb370..134787fa78a9 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -1,3 +1,4 @@ +mod join; mod keys; mod rename; mod utils; @@ -9,26 +10,13 @@ use utils::*; use super::*; use crate::dsl::function_expr::FunctionExpr; use crate::logical_plan::{optimizer, Context}; +use crate::prelude::optimizer::predicate_pushdown::join::process_join; use crate::prelude::optimizer::predicate_pushdown::rename::process_rename; use crate::utils::{aexprs_to_schema, check_input_node, has_aexpr}; #[derive(Default)] pub struct PredicatePushDown {} -fn join_produces_null(how: &JoinType) -> bool { - #[cfg(feature = "asof_join")] - { - matches!( - how, - JoinType::Left | JoinType::Outer | JoinType::Cross | JoinType::AsOf(_) - ) - } - #[cfg(not(feature = "asof_join"))] - { - matches!(how, JoinType::Left | JoinType::Outer | JoinType::Cross) - } -} - impl PredicatePushDown { fn optional_apply_predicate( &self, @@ -361,104 +349,16 @@ impl PredicatePushDown { schema, options, } => { - let schema_left = lp_arena.get(input_left).schema(lp_arena); - let schema_right = lp_arena.get(input_right).schema(lp_arena); - - let mut pushdown_left = optimizer::init_hashmap(Some(acc_predicates.len())); - let mut pushdown_right = optimizer::init_hashmap(Some(acc_predicates.len())); - let mut local_predicates = Vec::with_capacity(acc_predicates.len()); - - for (_, predicate) in acc_predicates { - // unique and duplicated can be caused by joins - #[cfg(feature = "is_unique")] - let matches = { - |e: &AExpr| matches!(e, AExpr::Function{ - function: FunctionExpr::Boolean(BooleanFunction::IsDuplicated) - | FunctionExpr::Boolean(BooleanFunction::IsUnique), - .. - }) - }; - #[cfg(not(feature = "is_unique"))] - let matches = { - |_e: &AExpr| false - }; - - - let checks_nulls = - |e: &AExpr| matches!(e, AExpr::Function{ - function: FunctionExpr::Boolean(BooleanFunction::IsNotNull) - | FunctionExpr::Boolean(BooleanFunction::IsNull), - .. - }) || - // any operation that checks for equality or ordering can be wrong because - // the join can produce null values - matches!(e, AExpr::BinaryExpr {op, ..} if !matches!(op, Operator::NotEq)); - if has_aexpr(predicate, expr_arena, matches) - // join might create null values. - || has_aexpr(predicate, expr_arena, checks_nulls) - // only these join types produce null values - && join_produces_null(&options.args.how) { - local_predicates.push(predicate); - continue; - } - // these indicate to which tables we are going to push down the predicate - let mut filter_left = false; - let mut filter_right = false; - - // predicate should not have an aggregation or window function as that would - // be influenced by join - #[allow(clippy::suspicious_else_formatting)] - if !predicate_is_pushdown_boundary(predicate, expr_arena) { - if check_input_node(predicate, &schema_left, expr_arena) { - insert_and_combine_predicate( - &mut pushdown_left, - predicate, - expr_arena, - ); - filter_left = true; - } - // this is `else if` because if the predicate is in the left hand side - // the right hand side should be renamed with the suffix. - // in that case we should not push down as the user wants to filter on `x` - // not on `x_rhs`. - else if check_input_node(predicate, &schema_right, expr_arena) { - insert_and_combine_predicate( - &mut pushdown_right, - predicate, - expr_arena, - ); - filter_right = true; - } - } - match (filter_left, filter_right, &options.args.how) { - // if not pushed down on one of the tables we have to do it locally. - (false, false, _) | - // if left join and predicate only available in right table, - // 'we should not filter right, because that would lead to - // invalid results. - // see: #2057 - (false, true, JoinType::Left) - => { - local_predicates.push(predicate); - continue; - }, - // business as usual - _ => {} - } - } - - self.pushdown_and_assign(input_left, pushdown_left, lp_arena, expr_arena)?; - self.pushdown_and_assign(input_right, pushdown_right, lp_arena, expr_arena)?; - - let lp = Join { - input_left, - input_right, - left_on, - right_on, - schema, - options, - }; - Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) + process_join(self, lp_arena, + expr_arena, + input_left, + input_right, + left_on, + right_on, + schema, + options, + acc_predicates + ) } MapFunction { ref function, .. } => { if function.allow_predicate_pd() diff --git a/py-polars/tests/unit/test_predicates.py b/py-polars/tests/unit/test_predicates.py index 7a26c3d82ea7..9ea0ef9163be 100644 --- a/py-polars/tests/unit/test_predicates.py +++ b/py-polars/tests/unit/test_predicates.py @@ -169,3 +169,15 @@ def test_predicate_pushdown_cumsum_9566() -> None: q = df.lazy().sort(["B", "A"]).filter(pl.col("A").is_in([8, 2]).cumsum() == 1) assert q.collect()["A"].to_list() == [8, 9, 0, 1] + + +def test_predicate_pushdown_join_fill_null_10058() -> None: + ids = pl.LazyFrame({"id": [0, 1, 2]}) + filters = pl.LazyFrame({"id": [0, 1], "filter": [True, False]}) + + assert ( + ids.join(filters, how="left", on="id") + .filter(pl.col("filter").fill_null(True)) + .collect() + .to_dict(False)["id"] + ) == [0, 2]