Skip to content

Commit

Permalink
perf(rust, python): don't parallelize literal expressions (#10321)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Aug 6, 2023
1 parent 19e622d commit c3c1f85
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl PhysicalExpr for ApplyExpr {

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
let f = |e: &Arc<dyn PhysicalExpr>| e.evaluate(df, state);
let mut inputs = if self.allow_threading {
let mut inputs = if self.allow_threading && self.inputs.len() > 1 {
POOL.install(|| {
self.inputs
.par_iter()
Expand Down
13 changes: 9 additions & 4 deletions crates/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

pub struct BinaryExpr {
pub(crate) left: Arc<dyn PhysicalExpr>,
pub(crate) op: Operator,
pub(crate) right: Arc<dyn PhysicalExpr>,
left: Arc<dyn PhysicalExpr>,
op: Operator,
right: Arc<dyn PhysicalExpr>,
expr: Expr,
has_literal: bool,
}

impl BinaryExpr {
Expand All @@ -20,12 +21,14 @@ impl BinaryExpr {
op: Operator,
right: Arc<dyn PhysicalExpr>,
expr: Expr,
has_literal: bool,
) -> Self {
Self {
left,
op,
right,
expr,
has_literal,
}
}
}
Expand Down Expand Up @@ -172,7 +175,9 @@ impl PhysicalExpr for BinaryExpr {
self.left.evaluate(df, &state),
self.right.evaluate(df, &state),
)
} else if in_streaming {
}
// literals are free, don't pay par cost
else if in_streaming || self.has_literal {
(
self.left.evaluate(df, state),
self.right.evaluate(df, state),
Expand Down
17 changes: 14 additions & 3 deletions crates/polars-lazy/src/physical_plan/expressions/ternary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub struct TernaryExpr {
truthy: Arc<dyn PhysicalExpr>,
falsy: Arc<dyn PhysicalExpr>,
expr: Expr,
// it can be expensive on small data to run literals in parallel
run_par: bool,
}

impl TernaryExpr {
Expand All @@ -21,12 +23,14 @@ impl TernaryExpr {
truthy: Arc<dyn PhysicalExpr>,
falsy: Arc<dyn PhysicalExpr>,
expr: Expr,
run_par: bool,
) -> Self {
Self {
predicate,
truthy,
falsy,
expr,
run_par,
}
}
}
Expand Down Expand Up @@ -99,7 +103,11 @@ impl PhysicalExpr for TernaryExpr {
let op_truthy = || self.truthy.evaluate(df, &state);
let op_falsy = || self.falsy.evaluate(df, &state);

let (truthy, falsy) = POOL.install(|| rayon::join(op_truthy, op_falsy));
let (truthy, falsy) = if self.run_par {
POOL.install(|| rayon::join(op_truthy, op_falsy))
} else {
(op_truthy(), op_falsy())
};
let mut truthy = truthy?;
let mut falsy = falsy?;

Expand Down Expand Up @@ -139,8 +147,11 @@ impl PhysicalExpr for TernaryExpr {
let op_truthy = || self.truthy.evaluate_on_groups(df, groups, state);
let op_falsy = || self.falsy.evaluate_on_groups(df, groups, state);

let (ac_mask, (ac_truthy, ac_falsy)) =
POOL.install(|| rayon::join(op_mask, || rayon::join(op_truthy, op_falsy)));
let (ac_mask, (ac_truthy, ac_falsy)) = if self.run_par {
POOL.install(|| rayon::join(op_mask, || rayon::join(op_truthy, op_falsy)))
} else {
(op_mask(), (op_truthy(), op_falsy()))
};

let ac_mask = ac_mask?;
let mut ac_truthy = ac_truthy?;
Expand Down
21 changes: 17 additions & 4 deletions crates/polars-lazy/src/physical_plan/planner/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub(crate) struct ExpressionConversionState {
/// Flags recorded while a logical expression is converted into a physical
/// expression; `create_physical_expr` accesses them through `state.local`.
struct LocalConversionState {
has_implode: bool,
has_window: bool,
// Set to `true` when a `Literal` node is converted. Read afterwards when
// building `BinaryExpr` (passed as `has_literal`) and when counting the
// literal branches of a `Ternary` node to decide whether to run in parallel.
// NOTE(review): presumably cleared by `state.reset()` — confirm, since the
// ternary branch relies on that between its three sub-expressions.
has_lit: bool,
}

impl ExpressionConversionState {
Expand Down Expand Up @@ -148,10 +149,13 @@ pub(crate) fn create_physical_expr(
expr: node_to_expr(expression, expr_arena),
}))
}
Literal(value) => Ok(Arc::new(LiteralExpr::new(
value,
node_to_expr(expression, expr_arena),
))),
Literal(value) => {
state.local.has_lit = true;
Ok(Arc::new(LiteralExpr::new(
value,
node_to_expr(expression, expr_arena),
)))
}
BinaryExpr { left, op, right } => {
let lhs = create_physical_expr(left, ctxt, expr_arena, schema, state)?;
let rhs = create_physical_expr(right, ctxt, expr_arena, schema, state)?;
Expand All @@ -160,6 +164,7 @@ pub(crate) fn create_physical_expr(
op,
rhs,
node_to_expr(expression, expr_arena),
state.local.has_lit,
)))
}
Column(column) => Ok(Arc::new(ColumnExpr::new(
Expand Down Expand Up @@ -421,14 +426,22 @@ pub(crate) fn create_physical_expr(
truthy,
falsy,
} => {
let mut lit_count = 0u8;
state.reset();
let predicate = create_physical_expr(predicate, ctxt, expr_arena, schema, state)?;
lit_count += state.local.has_lit as u8;
state.reset();
let truthy = create_physical_expr(truthy, ctxt, expr_arena, schema, state)?;
lit_count += state.local.has_lit as u8;
state.reset();
let falsy = create_physical_expr(falsy, ctxt, expr_arena, schema, state)?;
lit_count += state.local.has_lit as u8;
Ok(Arc::new(TernaryExpr::new(
predicate,
truthy,
falsy,
node_to_expr(expression, expr_arena),
lit_count < 2,
)))
}
AnonymousFunction {
Expand Down

0 comments on commit c3c1f85

Please sign in to comment.