Skip to content

Commit

Permalink
fix(rust, python): predicate pushdown #10058 (#10071)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jul 25, 2023
1 parent d872844 commit f8adbb7
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 112 deletions.
1 change: 1 addition & 0 deletions polars/polars-core/src/frame/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct MeltArgs {

impl DataFrame {
pub fn explode_impl(&self, mut columns: Vec<Series>) -> PolarsResult<DataFrame> {
polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
let mut df = self.clone();
if self.height() == 0 {
for s in &columns {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use super::*;

fn should_block_join_specific(ae: &AExpr, how: &JoinType) -> bool {
use AExpr::*;
match ae {
// joins can produce null values
Function {
function:
FunctionExpr::Boolean(BooleanFunction::IsNotNull)
| FunctionExpr::Boolean(BooleanFunction::IsNull)
| FunctionExpr::FillNull { .. },
..
} => join_produces_null(how),
// joins can produce duplicates
#[cfg(feature = "is_unique")]
Function {
function:
FunctionExpr::Boolean(BooleanFunction::IsUnique)
| FunctionExpr::Boolean(BooleanFunction::IsDuplicated),
..
} => true,
#[cfg(feature = "is_first")]
Function {
function: FunctionExpr::Boolean(BooleanFunction::IsFirst),
..
} => true,
// any operation that checks for equality or ordering can be wrong because
// the join can produce null values
// TODO! check if we can be less conservative here
BinaryExpr { op, .. } => !matches!(op, Operator::NotEq) && join_produces_null(how),
_ => false,
}
}

fn join_produces_null(how: &JoinType) -> bool {
#[cfg(feature = "asof_join")]
{
matches!(
how,
JoinType::Left | JoinType::Outer | JoinType::Cross | JoinType::AsOf(_)
)
}
#[cfg(not(feature = "asof_join"))]
{
matches!(how, JoinType::Left | JoinType::Outer | JoinType::Cross)
}
}

#[allow(clippy::too_many_arguments)]
pub(super) fn process_join(
opt: &PredicatePushDown,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
input_left: Node,
input_right: Node,
left_on: Vec<Node>,
right_on: Vec<Node>,
schema: SchemaRef,
options: Arc<JoinOptions>,
acc_predicates: PlHashMap<Arc<str>, Node>,
) -> PolarsResult<ALogicalPlan> {
use ALogicalPlan::*;
let schema_left = lp_arena.get(input_left).schema(lp_arena);
let schema_right = lp_arena.get(input_right).schema(lp_arena);

let mut pushdown_left = init_hashmap(Some(acc_predicates.len()));
let mut pushdown_right = init_hashmap(Some(acc_predicates.len()));
let mut local_predicates = Vec::with_capacity(acc_predicates.len());

for (_, predicate) in acc_predicates {
// check if predicate can pass the joins node
if has_aexpr(predicate, expr_arena, |ae| {
should_block_join_specific(ae, &options.args.how)
}) {
local_predicates.push(predicate);
continue;
}
// these indicate to which tables we are going to push down the predicate
let mut filter_left = false;
let mut filter_right = false;

// predicate should not have an aggregation or window function as that would
// be influenced by join
#[allow(clippy::suspicious_else_formatting)]
if !predicate_is_pushdown_boundary(predicate, expr_arena) {
if check_input_node(predicate, &schema_left, expr_arena) {
insert_and_combine_predicate(&mut pushdown_left, predicate, expr_arena);
filter_left = true;
}
// this is `else if` because if the predicate is in the left hand side
// the right hand side should be renamed with the suffix.
// in that case we should not push down as the user wants to filter on `x`
// not on `x_rhs`.
else if check_input_node(predicate, &schema_right, expr_arena) {
insert_and_combine_predicate(&mut pushdown_right, predicate, expr_arena);
filter_right = true;
}
}
match (filter_left, filter_right, &options.args.how) {
// if not pushed down on one of the tables we have to do it locally.
(false, false, _) |
// if left join and predicate only available in right table,
// 'we should not filter right, because that would lead to
// invalid results.
// see: #2057
(false, true, JoinType::Left)
=> {
local_predicates.push(predicate);
continue;
},
// business as usual
_ => {}
}
}

opt.pushdown_and_assign(input_left, pushdown_left, lp_arena, expr_arena)?;
opt.pushdown_and_assign(input_right, pushdown_right, lp_arena, expr_arena)?;

let lp = Join {
input_left,
input_right,
left_on,
right_on,
schema,
options,
};
Ok(opt.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod join;
mod keys;
mod rename;
mod utils;
Expand All @@ -9,26 +10,13 @@ use utils::*;
use super::*;
use crate::dsl::function_expr::FunctionExpr;
use crate::logical_plan::{optimizer, Context};
use crate::prelude::optimizer::predicate_pushdown::join::process_join;
use crate::prelude::optimizer::predicate_pushdown::rename::process_rename;
use crate::utils::{aexprs_to_schema, check_input_node, has_aexpr};

#[derive(Default)]
pub struct PredicatePushDown {}

fn join_produces_null(how: &JoinType) -> bool {
#[cfg(feature = "asof_join")]
{
matches!(
how,
JoinType::Left | JoinType::Outer | JoinType::Cross | JoinType::AsOf(_)
)
}
#[cfg(not(feature = "asof_join"))]
{
matches!(how, JoinType::Left | JoinType::Outer | JoinType::Cross)
}
}

impl PredicatePushDown {
fn optional_apply_predicate(
&self,
Expand Down Expand Up @@ -361,104 +349,16 @@ impl PredicatePushDown {
schema,
options,
} => {
let schema_left = lp_arena.get(input_left).schema(lp_arena);
let schema_right = lp_arena.get(input_right).schema(lp_arena);

let mut pushdown_left = optimizer::init_hashmap(Some(acc_predicates.len()));
let mut pushdown_right = optimizer::init_hashmap(Some(acc_predicates.len()));
let mut local_predicates = Vec::with_capacity(acc_predicates.len());

for (_, predicate) in acc_predicates {
// unique and duplicated can be caused by joins
#[cfg(feature = "is_unique")]
let matches = {
|e: &AExpr| matches!(e, AExpr::Function{
function: FunctionExpr::Boolean(BooleanFunction::IsDuplicated)
| FunctionExpr::Boolean(BooleanFunction::IsUnique),
..
})
};
#[cfg(not(feature = "is_unique"))]
let matches = {
|_e: &AExpr| false
};


let checks_nulls =
|e: &AExpr| matches!(e, AExpr::Function{
function: FunctionExpr::Boolean(BooleanFunction::IsNotNull)
| FunctionExpr::Boolean(BooleanFunction::IsNull),
..
}) ||
// any operation that checks for equality or ordering can be wrong because
// the join can produce null values
matches!(e, AExpr::BinaryExpr {op, ..} if !matches!(op, Operator::NotEq));
if has_aexpr(predicate, expr_arena, matches)
// join might create null values.
|| has_aexpr(predicate, expr_arena, checks_nulls)
// only these join types produce null values
&& join_produces_null(&options.args.how) {
local_predicates.push(predicate);
continue;
}
// these indicate to which tables we are going to push down the predicate
let mut filter_left = false;
let mut filter_right = false;

// predicate should not have an aggregation or window function as that would
// be influenced by join
#[allow(clippy::suspicious_else_formatting)]
if !predicate_is_pushdown_boundary(predicate, expr_arena) {
if check_input_node(predicate, &schema_left, expr_arena) {
insert_and_combine_predicate(
&mut pushdown_left,
predicate,
expr_arena,
);
filter_left = true;
}
// this is `else if` because if the predicate is in the left hand side
// the right hand side should be renamed with the suffix.
// in that case we should not push down as the user wants to filter on `x`
// not on `x_rhs`.
else if check_input_node(predicate, &schema_right, expr_arena) {
insert_and_combine_predicate(
&mut pushdown_right,
predicate,
expr_arena,
);
filter_right = true;
}
}
match (filter_left, filter_right, &options.args.how) {
// if not pushed down on one of the tables we have to do it locally.
(false, false, _) |
// if left join and predicate only available in right table,
// 'we should not filter right, because that would lead to
// invalid results.
// see: #2057
(false, true, JoinType::Left)
=> {
local_predicates.push(predicate);
continue;
},
// business as usual
_ => {}
}
}

self.pushdown_and_assign(input_left, pushdown_left, lp_arena, expr_arena)?;
self.pushdown_and_assign(input_right, pushdown_right, lp_arena, expr_arena)?;

let lp = Join {
input_left,
input_right,
left_on,
right_on,
schema,
options,
};
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
process_join(self, lp_arena,
expr_arena,
input_left,
input_right,
left_on,
right_on,
schema,
options,
acc_predicates
)
}
MapFunction { ref function, .. } => {
if function.allow_predicate_pd()
Expand Down
12 changes: 12 additions & 0 deletions py-polars/tests/unit/test_predicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,15 @@ def test_predicate_pushdown_cumsum_9566() -> None:
q = df.lazy().sort(["B", "A"]).filter(pl.col("A").is_in([8, 2]).cumsum() == 1)

assert q.collect()["A"].to_list() == [8, 9, 0, 1]


def test_predicate_pushdown_join_fill_null_10058() -> None:
ids = pl.LazyFrame({"id": [0, 1, 2]})
filters = pl.LazyFrame({"id": [0, 1], "filter": [True, False]})

assert (
ids.join(filters, how="left", on="id")
.filter(pl.col("filter").fill_null(True))
.collect()
.to_dict(False)["id"]
) == [0, 2]

0 comments on commit f8adbb7

Please sign in to comment.