Skip to content

Commit

Permalink
perf: Improve hive partition pruning with datetime predicates from SQL (
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Nov 8, 2024
1 parent 772d023 commit 235f240
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 51 deletions.
7 changes: 7 additions & 0 deletions crates/polars-expr/src/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ impl PhysicalExpr for AliasExpr {
Ok(self.finish(series))
}

fn evaluate_inline_impl(&self, depth_limit: u8) -> Option<Column> {
let depth_limit = depth_limit.checked_sub(1)?;
self.physical_expr
.evaluate_inline_impl(depth_limit)
.map(|s| self.finish(s))
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
&self,
Expand Down
63 changes: 34 additions & 29 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::Cow;
use std::sync::OnceLock;

use polars_core::chunked_array::builder::get_list_builder;
use polars_core::prelude::*;
Expand Down Expand Up @@ -28,6 +29,7 @@ pub struct ApplyExpr {
check_lengths: bool,
allow_group_aware: bool,
output_field: Field,
inlined_eval: OnceLock<Option<Column>>,
}

impl ApplyExpr {
Expand Down Expand Up @@ -63,6 +65,7 @@ impl ApplyExpr {
check_lengths: options.check_lengths(),
allow_group_aware: options.flags.contains(FunctionFlags::ALLOW_GROUP_AWARE),
output_field,
inlined_eval: Default::default(),
}
}

Expand Down Expand Up @@ -347,6 +350,24 @@ impl PhysicalExpr for ApplyExpr {
}
}

fn evaluate_inline_impl(&self, depth_limit: u8) -> Option<Column> {
// For predicate evaluation at I/O of:
// `lit("2024-01-01").str.strptime()`

self.inlined_eval
.get_or_init(|| {
let depth_limit = depth_limit.checked_sub(1)?;
let mut inputs = self
.inputs
.iter()
.map(|x| x.evaluate_inline_impl(depth_limit).filter(|s| s.len() == 1))
.collect::<Option<Vec<_>>>()?;

self.eval_and_flatten(&mut inputs).ok()
})
.clone()
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
&self,
Expand Down Expand Up @@ -576,11 +597,10 @@ impl ApplyExpr {
FunctionExpr::Boolean(BooleanFunction::IsIn) => {
let should_read = || -> Option<bool> {
let root = expr_to_leaf_column_name(&input[0]).ok()?;
let Expr::Literal(LiteralValue::Series(input)) = &input[1] else {
return None;
};
#[allow(clippy::explicit_auto_deref)]
let input: &Series = &**input;

let input = self.inputs[1].evaluate_inline()?;
let input = input.as_materialized_series();

let st = stats.get_stats(&root).ok()?;
let min = st.to_min()?;
let max = st.to_max()?;
Expand All @@ -603,35 +623,20 @@ impl ApplyExpr {
FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }) => {
let should_read = || -> Option<bool> {
let root: PlSmallStr = expr_to_leaf_column_name(&input[0]).ok()?;
let Expr::Literal(left) = &input[1] else {
return None;
};
let Expr::Literal(right) = &input[2] else {
return None;
};

let left = self.inputs[1]
.evaluate_inline()?
.as_materialized_series()
.clone();
let right = self.inputs[2]
.evaluate_inline()?
.as_materialized_series()
.clone();

let st = stats.get_stats(&root).ok()?;
let min = st.to_min()?;
let max = st.to_max()?;

let (left, left_dtype) = (left.to_any_value()?, left.get_datatype());
let (right, right_dtype) = (right.to_any_value()?, right.get_datatype());

let left = Series::from_any_values_and_dtype(
PlSmallStr::EMPTY,
&[left],
&left_dtype,
false,
)
.ok()?;
let right = Series::from_any_values_and_dtype(
PlSmallStr::EMPTY,
&[right],
&right_dtype,
false,
)
.ok()?;

// don't read the row_group anyways as
// the condition will evaluate to false.
// e.g. in_between(10, 5)
Expand Down
38 changes: 22 additions & 16 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,17 +392,26 @@ mod stats {
impl BinaryExpr {
fn impl_should_read(&self, stats: &BatchStats) -> PolarsResult<bool> {
// See: #5864 for the rationale behind this.
use Expr::*;
use Operator::*;
if !self.expr.into_iter().all(|e| match e {
BinaryExpr { op, .. } => {
!matches!(op, Multiply | Divide | TrueDivide | FloorDivide | Modulus)
},
Column(_) | Literal(_) | Alias(_, _) => true,
_ => false,
}) {
return Ok(true);
{
use Operator::*;

match self.op {
// These don't result in a boolean output
Multiply | Divide | TrueDivide | FloorDivide | Modulus => return Ok(true),
_ => {},
}

let Expr::BinaryExpr { left, right, .. } = &self.expr else {
unreachable!()
};

match (left.as_ref(), right.as_ref()) {
// The logic below assumes one side is a column
(Expr::Column(_), _) | (_, Expr::Column(_)) => {},
_ => return Ok(true),
}
}

let schema = stats.schema();
let Some(fld_l) = self.left.to_field(schema).ok() else {
return Ok(true);
Expand All @@ -423,18 +432,16 @@ mod stats {
}
}

let dummy = DataFrame::empty();
let state = ExecutionState::new();

let out = match (self.left.is_literal(), self.right.is_literal()) {
(false, true) => {
let out = match (self.left.evaluate_inline(), self.right.evaluate_inline()) {
(None, Some(lit_s)) => {
let l = stats.get_stats(fld_l.name())?;
match l.to_min_max() {
None => Ok(true),
Some(min_max_s) => {
// will be incorrect if not
debug_assert_eq!(min_max_s.null_count(), 0);
let lit_s = self.right.evaluate(&dummy, &state).unwrap();
Ok(apply_operator_stats_rhs_lit(
&min_max_s.into_column(),
&lit_s,
Expand All @@ -443,14 +450,13 @@ mod stats {
},
}
},
(true, false) => {
(Some(lit_s), None) => {
let r = stats.get_stats(fld_r.name())?;
match r.to_min_max() {
None => Ok(true),
Some(min_max_s) => {
// will be incorrect if not
debug_assert_eq!(min_max_s.null_count(), 0);
let lit_s = self.left.evaluate(&dummy, &state).unwrap();
Ok(apply_operator_stats_lhs_lit(
&lit_s,
&min_max_s.into_column(),
Expand Down
15 changes: 15 additions & 0 deletions crates/polars-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::OnceLock;

use polars_core::chunked_array::cast::CastOptions;
use polars_core::prelude::*;

Expand All @@ -9,6 +11,7 @@ pub struct CastExpr {
pub(crate) dtype: DataType,
pub(crate) expr: Expr,
pub(crate) options: CastOptions,
pub(crate) inlined_eval: OnceLock<Option<Column>>,
}

impl CastExpr {
Expand All @@ -27,6 +30,18 @@ impl PhysicalExpr for CastExpr {
self.finish(&column)
}

fn evaluate_inline_impl(&self, depth_limit: u8) -> Option<Column> {
self.inlined_eval
.get_or_init(|| {
let depth_limit = depth_limit.checked_sub(1)?;
self.input
.evaluate_inline_impl(depth_limit)
.filter(|x| x.len() == 1)
.and_then(|x| self.finish(&x).ok())
})
.clone()
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
&self,
Expand Down
24 changes: 18 additions & 6 deletions crates/polars-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ impl LiteralExpr {
pub fn new(value: LiteralValue, expr: Expr) -> Self {
Self(value, expr)
}
}

impl PhysicalExpr for LiteralExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.1)
}
fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult<Column> {
fn as_column(&self) -> PolarsResult<Column> {
use LiteralValue::*;
let s = match &self.0 {
#[cfg(feature = "dtype-i8")]
Expand Down Expand Up @@ -118,6 +113,23 @@ impl PhysicalExpr for LiteralExpr {
};
Ok(s)
}
}

impl PhysicalExpr for LiteralExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.1)
}
fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult<Column> {
self.as_column()
}

fn evaluate_inline_impl(&self, _depth_limit: u8) -> Option<Column> {
use LiteralValue::*;
match &self.0 {
Range { .. } => None,
_ => self.as_column().ok(),
}
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
Expand Down
14 changes: 14 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,20 @@ pub trait PhysicalExpr: Send + Sync {
/// Take a DataFrame and evaluate the expression.
fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> PolarsResult<Column>;

/// Attempt to cheaply evaluate this expression in-line without a DataFrame context.
/// This is used by StatsEvaluator when skipping files / row groups using a predicate.
/// TODO: Maybe in the future we can do this evaluation in-line at the optimizer stage?
///
/// Do not implement this directly - instead implement `evaluate_inline_impl`
fn evaluate_inline(&self) -> Option<Column> {
self.evaluate_inline_impl(4)
}

/// Implementation of `evaluate_inline`
fn evaluate_inline_impl(&self, _depth_limit: u8) -> Option<Column> {
None
}

/// Some expression that are not aggregations can be done per group
/// Think of sort, slice, filter, shift, etc.
/// defaults to ignoring the group
Expand Down
1 change: 1 addition & 0 deletions crates/polars-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ fn create_physical_expr_inner(
dtype: dtype.clone(),
expr: node_to_expr(expression, expr_arena),
options: *options,
inlined_eval: Default::default(),
}))
},
Ternary {
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl ColumnStats {
///
/// Returns `None` if no maximum value is available.
pub fn to_min(&self) -> Option<&Series> {
// @scalar-opt
let min_val = self.min_value.as_ref()?;
let dtype = min_val.dtype();

Expand All @@ -177,6 +178,7 @@ impl ColumnStats {
///
/// Returns `None` if no maximum value is available.
pub fn to_max(&self) -> Option<&Series> {
// @scalar-opt
let max_val = self.max_value.as_ref()?;
let dtype = max_val.dtype();

Expand Down
72 changes: 72 additions & 0 deletions py-polars/tests/unit/io/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from math import ceil
from pathlib import Path
Expand Down Expand Up @@ -835,3 +836,74 @@ def test_streaming_scan_csv_with_row_index_19172(io_files_path: Path) -> None:
schema={"calories": pl.String, "index": pl.UInt32},
),
)


@pytest.mark.write_disk
def test_predicate_hive_pruning_with_cast(tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)

df = pl.DataFrame({"x": 1})

(p := (tmp_path / "date=2024-01-01")).mkdir()

df.write_parquet(p / "1")

(p := (tmp_path / "date=2024-01-02")).mkdir()

# Write an invalid parquet file that will cause errors if polars attempts to
# read it.
# This works because `scan_parquet()` only looks at the first file during
# schema inference.
(p / "1").write_text("not a parquet file")

expect = pl.DataFrame({"x": 1, "date": datetime(2024, 1, 1).date()})

lf = pl.scan_parquet(tmp_path)

q = lf.filter(pl.col("date") < datetime(2024, 1, 2).date())

assert_frame_equal(q.collect(), expect)

# This filter expr with stprtime is effectively what LazyFrame.sql()
# generates
q = lf.filter(
pl.col("date")
< pl.lit("2024-01-02").str.strptime(
dtype=pl.Date, format="%Y-%m-%d", ambiguous="latest"
)
)

assert_frame_equal(q.collect(), expect)

q = lf.sql("select * from self where date < '2024-01-02'")
assert_frame_equal(q.collect(), expect)


def test_predicate_stats_eval_nested_binary() -> None:
bufs: list[bytes] = []

for i in range(10):
b = io.BytesIO()
pl.DataFrame({"x": i}).write_parquet(b)
b.seek(0)
bufs.append(b.read())

assert_frame_equal(
(
pl.scan_parquet(bufs)
.filter(pl.col("x") % 2 == 0)
.collect(no_optimization=True)
),
pl.DataFrame({"x": [0, 2, 4, 6, 8]}),
)

assert_frame_equal(
(
pl.scan_parquet(bufs)
# The literal eval depth limit is 4 -
# * crates/polars-expr/src/expressions/mod.rs::PhysicalExpr::evaluate_inline
.filter(pl.col("x") == pl.lit("222").str.slice(0, 1).cast(pl.Int64))
.collect()
),
pl.DataFrame({"x": [2]}),
)

0 comments on commit 235f240

Please sign in to comment.