Skip to content

Commit

Permalink
feat(rust, python): allow sequential runners in select/with_columns (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Aug 7, 2023
1 parent 8e6da21 commit 51aa46e
Show file tree
Hide file tree
Showing 26 changed files with 320 additions and 70 deletions.
47 changes: 41 additions & 6 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,23 @@ impl LazyFrame {
/// }
/// ```
pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
    // The default `select` requests parallel evaluation of the expressions.
    let options = ProjectionOptions { run_parallel: true };
    self.select_impl(exprs.as_ref().to_vec(), options)
}

/// Select (and optionally rename) columns like [`LazyFrame::select`], but
/// request sequential evaluation of the expressions (`run_parallel: false`).
pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
    let options = ProjectionOptions {
        run_parallel: false,
    };
    // Copy the borrowed expressions into an owned `Vec` for the plan builder.
    let owned_exprs = exprs.as_ref().to_vec();
    self.select_impl(owned_exprs, options)
}

/// Shared implementation behind `select` and `select_seq`.
///
/// Adds a projection of `exprs` on top of the current logical plan and
/// forwards `options` (currently only `run_parallel`) to the plan builder.
fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
    // Read the optimization state before the plan builder is requested.
    let opt_state = self.get_opt_state();
    // `exprs` is already an owned `Vec<Expr>`; the plan is built exactly once,
    // using the two-argument `project(exprs, options)` builder method.
    let lp = self.get_plan_builder().project(exprs, options).build();
    Self::from_logical_plan(lp, opt_state)
}

Expand Down Expand Up @@ -999,7 +1011,15 @@ impl LazyFrame {
/// ```
pub fn with_column(self, expr: Expr) -> LazyFrame {
    // Read the optimization state before the plan builder is requested.
    let opt_state = self.get_opt_state();
    // Only one expression is added here, so sequential evaluation is requested.
    let options = ProjectionOptions {
        run_parallel: false,
    };
    // Build the plan once, through the two-argument `with_columns(exprs, options)`.
    let lp = self
        .get_plan_builder()
        .with_columns(vec![expr], options)
        .build();
    Self::from_logical_plan(lp, opt_state)
}

Expand All @@ -1019,8 +1039,23 @@ impl LazyFrame {
/// ```
pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
    // The default `with_columns` requests parallel evaluation of the expressions.
    let options = ProjectionOptions { run_parallel: true };
    self.with_columns_impl(exprs.as_ref().to_vec(), options)
}

/// Add multiple columns to a DataFrame like `with_columns`, but request
/// sequential evaluation of the expressions (`run_parallel: false`).
pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
    let options = ProjectionOptions {
        run_parallel: false,
    };
    // Copy the borrowed expressions into an owned `Vec` for the plan builder.
    let owned_exprs = exprs.as_ref().to_vec();
    self.with_columns_impl(owned_exprs, options)
}

/// Shared implementation behind `with_columns` and `with_columns_seq`.
///
/// Stacks `exprs` onto the current logical plan and forwards `options`
/// (currently only `run_parallel`) to the plan builder.
fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
    // Read the optimization state before the plan builder is requested.
    let opt_state = self.get_opt_state();
    // Build the plan once, through the two-argument `with_columns(exprs, options)`.
    let lp = self.get_plan_builder().with_columns(exprs, options).build();
    Self::from_logical_plan(lp, opt_state)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct ProjectionExec {
pub(crate) input_schema: SchemaRef,
#[cfg(test)]
pub(crate) schema: SchemaRef,
pub(crate) options: ProjectionOptions,
}

impl ProjectionExec {
Expand All @@ -25,6 +26,7 @@ impl ProjectionExec {
&self.expr,
state,
self.has_windows,
self.options.run_parallel,
)?;
#[allow(unused_mut)]
let mut df = check_expand_literals(selected_cols, df.height() == 0)?;
Expand Down
27 changes: 23 additions & 4 deletions crates/polars-lazy/src/physical_plan/executors/projection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,40 @@ fn run_exprs_par(
})
}

/// Evaluate every physical expression against `df`, one after another, in
/// slice order.
///
/// Returns the resulting columns in the same order as `exprs`, or the first
/// error produced by an expression (later expressions are then not evaluated).
fn run_exprs_seq(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Series>> {
    let mut columns = Vec::with_capacity(exprs.len());
    for physical_expr in exprs {
        // `?` stops at the first failing expression, matching a fallible `collect`.
        columns.push(physical_expr.evaluate(df, state)?);
    }
    Ok(columns)
}

pub(super) fn evaluate_physical_expressions(
df: &mut DataFrame,
cse_exprs: &[Arc<dyn PhysicalExpr>],
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
has_windows: bool,
run_parallel: bool,
) -> PolarsResult<Vec<Series>> {
let runner = if has_windows {
let expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && exprs.len() > 1 {
run_exprs_par
} else {
run_exprs_seq
};

let cse_expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && cse_exprs.len() > 1 {
run_exprs_par
} else {
run_exprs_seq
};

let selected_columns = if !cse_exprs.is_empty() {
let tmp_cols = runner(df, cse_exprs, state)?;
let tmp_cols = cse_expr_runner(df, cse_exprs, state)?;
if has_windows {
state.clear_window_expr_cache();
}
Expand All @@ -144,7 +163,7 @@ pub(super) fn evaluate_physical_expressions(
unsafe {
df.hstack_mut_unchecked(&tmp_cols);
}
let mut result = runner(df, exprs, state)?;
let mut result = expr_runner(df, exprs, state)?;
// restore original df
unsafe {
df.get_columns_mut().truncate(width);
Expand All @@ -158,7 +177,7 @@ pub(super) fn evaluate_physical_expressions(

result
} else {
runner(df, exprs, state)?
expr_runner(df, exprs, state)?
};

if has_windows {
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct StackExec {
pub(crate) cse_exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) input_schema: SchemaRef,
pub(crate) options: ProjectionOptions,
}

impl StackExec {
Expand All @@ -20,6 +21,7 @@ impl StackExec {
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
state.clear_window_expr_cache();

Expand Down
5 changes: 5 additions & 0 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ pub fn create_physical_plan(
expr,
input,
schema: _schema,
options,
..
} => {
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
Expand Down Expand Up @@ -259,6 +260,7 @@ pub fn create_physical_plan(
input_schema,
#[cfg(test)]
schema: _schema,
options,
}))
}
LocalProjection {
Expand Down Expand Up @@ -286,6 +288,7 @@ pub fn create_physical_plan(
input_schema,
#[cfg(test)]
schema: _schema,
options: Default::default(),
}))
}
DataFrameScan {
Expand Down Expand Up @@ -519,6 +522,7 @@ pub fn create_physical_plan(
input,
exprs,
schema: _schema,
options,
} => {
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan(input, lp_arena, expr_arena)?;
Expand Down Expand Up @@ -547,6 +551,7 @@ pub fn create_physical_plan(
cse_exprs,
exprs: phys_exprs,
input_schema,
options,
}))
}
MapFunction {
Expand Down
12 changes: 10 additions & 2 deletions crates/polars-plan/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub enum ALogicalPlan {
input: Node,
expr: ProjectionExprs,
schema: SchemaRef,
options: ProjectionOptions,
},
LocalProjection {
expr: Vec<Node>,
Expand Down Expand Up @@ -95,6 +96,7 @@ pub enum ALogicalPlan {
input: Node,
exprs: ProjectionExprs,
schema: SchemaRef,
options: ProjectionOptions,
},
Distinct {
input: Node,
Expand Down Expand Up @@ -252,10 +254,13 @@ impl ALogicalPlan {
expr: exprs,
schema: schema.clone(),
},
Projection { schema, .. } => Projection {
Projection {
schema, options, ..
} => Projection {
input: inputs[0],
expr: ProjectionExprs::new(exprs),
schema: schema.clone(),
options: *options,
},
Aggregate {
keys,
Expand Down Expand Up @@ -302,10 +307,13 @@ impl ALogicalPlan {
input: inputs[0],
options: options.clone(),
},
HStack { schema, .. } => HStack {
HStack {
schema, options, ..
} => HStack {
input: inputs[0],
exprs: ProjectionExprs::new(exprs),
schema: schema.clone(),
options: *options,
},
Scan {
path,
Expand Down
13 changes: 10 additions & 3 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl LogicalPlanBuilder {
.into()
}

pub fn project(self, exprs: Vec<Expr>) -> Self {
pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
let schema = try_delayed!(self.0.schema(), &self.0, into);
let (exprs, schema) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into);

Expand All @@ -373,6 +373,7 @@ impl LogicalPlanBuilder {
expr: exprs,
input: Box::new(self.0),
schema: Arc::new(schema),
options,
}
.into()
}
Expand Down Expand Up @@ -410,10 +411,15 @@ impl LogicalPlanBuilder {
_ => None,
})
.collect();
self.with_columns(exprs)
self.with_columns(
exprs,
ProjectionOptions {
run_parallel: false,
},
)
}

pub fn with_columns(self, exprs: Vec<Expr>) -> Self {
pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
// current schema
let schema = try_delayed!(self.0.schema(), &self.0, into);
let mut new_schema = (**schema).clone();
Expand Down Expand Up @@ -441,6 +447,7 @@ impl LogicalPlanBuilder {
input: Box::new(self.0),
exprs,
schema: Arc::new(new_schema),
options,
}
.into()
}
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-plan/src/logical_plan/builder_alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
self.add_alp(lp)
}

pub fn project(self, exprs: Vec<Node>) -> Self {
pub fn project(self, exprs: Vec<Node>, options: ProjectionOptions) -> Self {
let input_schema = self.lp_arena.get(self.root).schema(self.lp_arena);
let schema = aexprs_to_schema(&exprs, &input_schema, Context::Default, self.expr_arena);

Expand All @@ -62,6 +62,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
expr: exprs.into(),
input: self.root,
schema: Arc::new(schema),
options,
};
let node = self.lp_arena.add(lp);
ALogicalPlanBuilder::new(node, self.expr_arena, self.lp_arena)
Expand All @@ -82,7 +83,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
self.lp_arena.get(self.root).schema(self.lp_arena)
}

pub(crate) fn with_columns(self, exprs: Vec<Node>) -> Self {
pub(crate) fn with_columns(self, exprs: Vec<Node>, options: ProjectionOptions) -> Self {
let schema = self.schema();
let mut new_schema = (**schema).clone();

Expand All @@ -100,6 +101,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
input: self.root,
exprs: ProjectionExprs::new(exprs),
schema: Arc::new(new_schema),
options,
};
self.add_alp(lp)
}
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-plan/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,15 @@ pub fn to_alp(
expr,
input,
schema,
options,
} => {
let expr = expr.into_iter().map(|x| to_aexpr(x, expr_arena)).collect();
let i = to_alp(*input, expr_arena, lp_arena)?;
ALogicalPlan::Projection {
expr,
input: i,
schema,
options,
}
}
LogicalPlan::LocalProjection {
Expand Down Expand Up @@ -335,13 +337,15 @@ pub fn to_alp(
input,
exprs,
schema,
options,
} => {
let exp = exprs.into_iter().map(|x| to_aexpr(x, expr_arena)).collect();
let input = to_alp(*input, expr_arena, lp_arena)?;
ALogicalPlan::HStack {
input,
exprs: exp,
schema,
options,
}
}
LogicalPlan::Distinct { input, options } => {
Expand Down Expand Up @@ -689,13 +693,15 @@ impl ALogicalPlan {
expr,
input,
schema,
options,
} => {
let i = convert_to_lp(input, lp_arena);

LogicalPlan::Projection {
expr: nodes_to_exprs(&expr, expr_arena),
input: Box::new(i),
schema,
options,
}
}
ALogicalPlan::LocalProjection {
Expand Down Expand Up @@ -772,13 +778,15 @@ impl ALogicalPlan {
input,
exprs,
schema,
options,
} => {
let i = convert_to_lp(input, lp_arena);

LogicalPlan::HStack {
input: Box::new(i),
exprs: nodes_to_exprs(&exprs, expr_arena),
schema,
options,
}
}
ALogicalPlan::Distinct { input, options } => {
Expand Down
Loading

0 comments on commit 51aa46e

Please sign in to comment.