Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust, python): allow sequential runners in select/with_columns #10322

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,23 @@ impl LazyFrame {
/// }
/// ```
pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
    // A plain `select` flags the projection with `run_parallel: true`.
    let parallel = ProjectionOptions { run_parallel: true };
    self.select_impl(exprs.as_ref().to_vec(), parallel)
}

/// Sequential counterpart of `select`: builds the same projection, but flags it with
/// `run_parallel: false` to request that the expressions are not evaluated in parallel.
pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
    let sequential = ProjectionOptions {
        run_parallel: false,
    };
    self.select_impl(exprs.as_ref().to_vec(), sequential)
}

/// Shared implementation behind `select` and `select_seq`.
///
/// Builds a projection of `exprs` on top of the current plan, passing `options` through to
/// the plan builder, and wraps the resulting logical plan in a new frame.
fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
// Read the optimization state first so it can be re-attached to the rebuilt plan.
let opt_state = self.get_opt_state();
// NOTE(review): this listing is a rendered diff without +/- markers. The chained
// `.project(exprs.as_ref().to_vec())` call below looks like the pre-change side and the
// single-line `project(exprs, options)` call the post-change side — confirm against the
// merged file before relying on either.
let lp = self
.get_plan_builder()
.project(exprs.as_ref().to_vec())
.build();
let lp = self.get_plan_builder().project(exprs, options).build();
Self::from_logical_plan(lp, opt_state)
}

Expand Down Expand Up @@ -999,7 +1011,15 @@ impl LazyFrame {
/// ```
pub fn with_column(self, expr: Expr) -> LazyFrame {
// Read the optimization state first so it can be re-attached to the rebuilt plan.
let opt_state = self.get_opt_state();
// NOTE(review): this listing is a rendered diff without +/- markers. The one-line
// `with_columns(vec![expr])` call looks like the pre-change side and the chained call
// that follows the post-change side — confirm against the merged file.
let lp = self.get_plan_builder().with_columns(vec![expr]).build();
let lp = self
.get_plan_builder()
.with_columns(
vec![expr],
// Only a single expression is added here; the operation is flagged non-parallel.
ProjectionOptions {
run_parallel: false,
},
)
.build();
Self::from_logical_plan(lp, opt_state)
}

Expand All @@ -1019,8 +1039,23 @@ impl LazyFrame {
/// ```
pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
    // The default `with_columns` flags the operation with `run_parallel: true`.
    let parallel = ProjectionOptions { run_parallel: true };
    self.with_columns_impl(exprs.as_ref().to_vec(), parallel)
}

/// Sequential counterpart of `with_columns`: adds the given expressions as columns, but
/// flags the operation with `run_parallel: false` to request sequential evaluation.
pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
    let sequential = ProjectionOptions {
        run_parallel: false,
    };
    self.with_columns_impl(exprs.as_ref().to_vec(), sequential)
}

/// Shared implementation behind `with_columns` and `with_columns_seq`.
///
/// Adds `exprs` as columns on top of the current plan, passing `options` through to the
/// plan builder, and wraps the resulting logical plan in a new `LazyFrame`.
fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
// Read the optimization state first so it can be re-attached to the rebuilt plan.
let opt_state = self.get_opt_state();
// NOTE(review): this listing is a rendered diff without +/- markers. The
// `with_columns(exprs)` call looks like the pre-change side and the
// `with_columns(exprs, options)` call the post-change side — confirm against the merged file.
let lp = self.get_plan_builder().with_columns(exprs).build();
let lp = self.get_plan_builder().with_columns(exprs, options).build();
Self::from_logical_plan(lp, opt_state)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct ProjectionExec {
pub(crate) input_schema: SchemaRef,
#[cfg(test)]
pub(crate) schema: SchemaRef,
pub(crate) options: ProjectionOptions,
}

impl ProjectionExec {
Expand All @@ -25,6 +26,7 @@ impl ProjectionExec {
&self.expr,
state,
self.has_windows,
self.options.run_parallel,
)?;
#[allow(unused_mut)]
let mut df = check_expand_literals(selected_cols, df.height() == 0)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,40 @@ fn run_exprs_par(
})
}

/// Evaluates every physical expression against `df`, one after another, in slice order.
///
/// Returns the resulting series in the same order as `exprs`; the first evaluation error
/// stops the loop and is returned to the caller.
fn run_exprs_seq(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Series>> {
    let mut columns = Vec::with_capacity(exprs.len());
    for expr in exprs {
        // `?` bails out on the first failing expression, like collecting into a `Result`.
        columns.push(expr.evaluate(df, state)?);
    }
    Ok(columns)
}

pub(super) fn evaluate_physical_expressions(
df: &mut DataFrame,
cse_exprs: &[Arc<dyn PhysicalExpr>],
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
has_windows: bool,
run_parallel: bool,
) -> PolarsResult<Vec<Series>> {
let runner = if has_windows {
let expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && exprs.len() > 1 {
run_exprs_par
} else {
run_exprs_seq
};

let cse_expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && cse_exprs.len() > 1 {
run_exprs_par
} else {
run_exprs_seq
};

let selected_columns = if !cse_exprs.is_empty() {
let tmp_cols = runner(df, cse_exprs, state)?;
let tmp_cols = cse_expr_runner(df, cse_exprs, state)?;
if has_windows {
state.clear_window_expr_cache();
}
Expand All @@ -144,7 +163,7 @@ pub(super) fn evaluate_physical_expressions(
unsafe {
df.hstack_mut_unchecked(&tmp_cols);
}
let mut result = runner(df, exprs, state)?;
let mut result = expr_runner(df, exprs, state)?;
// restore original df
unsafe {
df.get_columns_mut().truncate(width);
Expand All @@ -158,7 +177,7 @@ pub(super) fn evaluate_physical_expressions(

result
} else {
runner(df, exprs, state)?
expr_runner(df, exprs, state)?
};

if has_windows {
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct StackExec {
pub(crate) cse_exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) input_schema: SchemaRef,
pub(crate) options: ProjectionOptions,
}

impl StackExec {
Expand All @@ -20,6 +21,7 @@ impl StackExec {
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
state.clear_window_expr_cache();

Expand Down
5 changes: 5 additions & 0 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ pub fn create_physical_plan(
expr,
input,
schema: _schema,
options,
..
} => {
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
Expand Down Expand Up @@ -259,6 +260,7 @@ pub fn create_physical_plan(
input_schema,
#[cfg(test)]
schema: _schema,
options,
}))
}
LocalProjection {
Expand Down Expand Up @@ -286,6 +288,7 @@ pub fn create_physical_plan(
input_schema,
#[cfg(test)]
schema: _schema,
options: Default::default(),
}))
}
DataFrameScan {
Expand Down Expand Up @@ -519,6 +522,7 @@ pub fn create_physical_plan(
input,
exprs,
schema: _schema,
options,
} => {
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan(input, lp_arena, expr_arena)?;
Expand Down Expand Up @@ -547,6 +551,7 @@ pub fn create_physical_plan(
cse_exprs,
exprs: phys_exprs,
input_schema,
options,
}))
}
MapFunction {
Expand Down
12 changes: 10 additions & 2 deletions crates/polars-plan/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub enum ALogicalPlan {
input: Node,
expr: ProjectionExprs,
schema: SchemaRef,
options: ProjectionOptions,
},
LocalProjection {
expr: Vec<Node>,
Expand Down Expand Up @@ -95,6 +96,7 @@ pub enum ALogicalPlan {
input: Node,
exprs: ProjectionExprs,
schema: SchemaRef,
options: ProjectionOptions,
},
Distinct {
input: Node,
Expand Down Expand Up @@ -252,10 +254,13 @@ impl ALogicalPlan {
expr: exprs,
schema: schema.clone(),
},
Projection { schema, .. } => Projection {
Projection {
schema, options, ..
} => Projection {
input: inputs[0],
expr: ProjectionExprs::new(exprs),
schema: schema.clone(),
options: *options,
},
Aggregate {
keys,
Expand Down Expand Up @@ -302,10 +307,13 @@ impl ALogicalPlan {
input: inputs[0],
options: options.clone(),
},
HStack { schema, .. } => HStack {
HStack {
schema, options, ..
} => HStack {
input: inputs[0],
exprs: ProjectionExprs::new(exprs),
schema: schema.clone(),
options: *options,
},
Scan {
path,
Expand Down
13 changes: 10 additions & 3 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl LogicalPlanBuilder {
.into()
}

pub fn project(self, exprs: Vec<Expr>) -> Self {
pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
let schema = try_delayed!(self.0.schema(), &self.0, into);
let (exprs, schema) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into);

Expand All @@ -373,6 +373,7 @@ impl LogicalPlanBuilder {
expr: exprs,
input: Box::new(self.0),
schema: Arc::new(schema),
options,
}
.into()
}
Expand Down Expand Up @@ -410,10 +411,15 @@ impl LogicalPlanBuilder {
_ => None,
})
.collect();
self.with_columns(exprs)
self.with_columns(
exprs,
ProjectionOptions {
run_parallel: false,
},
)
}

pub fn with_columns(self, exprs: Vec<Expr>) -> Self {
pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
// current schema
let schema = try_delayed!(self.0.schema(), &self.0, into);
let mut new_schema = (**schema).clone();
Expand Down Expand Up @@ -441,6 +447,7 @@ impl LogicalPlanBuilder {
input: Box::new(self.0),
exprs,
schema: Arc::new(new_schema),
options,
}
.into()
}
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-plan/src/logical_plan/builder_alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
self.add_alp(lp)
}

pub fn project(self, exprs: Vec<Node>) -> Self {
pub fn project(self, exprs: Vec<Node>, options: ProjectionOptions) -> Self {
let input_schema = self.lp_arena.get(self.root).schema(self.lp_arena);
let schema = aexprs_to_schema(&exprs, &input_schema, Context::Default, self.expr_arena);

Expand All @@ -62,6 +62,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
expr: exprs.into(),
input: self.root,
schema: Arc::new(schema),
options,
};
let node = self.lp_arena.add(lp);
ALogicalPlanBuilder::new(node, self.expr_arena, self.lp_arena)
Expand All @@ -82,7 +83,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
self.lp_arena.get(self.root).schema(self.lp_arena)
}

pub(crate) fn with_columns(self, exprs: Vec<Node>) -> Self {
pub(crate) fn with_columns(self, exprs: Vec<Node>, options: ProjectionOptions) -> Self {
let schema = self.schema();
let mut new_schema = (**schema).clone();

Expand All @@ -100,6 +101,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
input: self.root,
exprs: ProjectionExprs::new(exprs),
schema: Arc::new(new_schema),
options,
};
self.add_alp(lp)
}
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-plan/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,15 @@ pub fn to_alp(
expr,
input,
schema,
options,
} => {
let expr = expr.into_iter().map(|x| to_aexpr(x, expr_arena)).collect();
let i = to_alp(*input, expr_arena, lp_arena)?;
ALogicalPlan::Projection {
expr,
input: i,
schema,
options,
}
}
LogicalPlan::LocalProjection {
Expand Down Expand Up @@ -335,13 +337,15 @@ pub fn to_alp(
input,
exprs,
schema,
options,
} => {
let exp = exprs.into_iter().map(|x| to_aexpr(x, expr_arena)).collect();
let input = to_alp(*input, expr_arena, lp_arena)?;
ALogicalPlan::HStack {
input,
exprs: exp,
schema,
options,
}
}
LogicalPlan::Distinct { input, options } => {
Expand Down Expand Up @@ -689,13 +693,15 @@ impl ALogicalPlan {
expr,
input,
schema,
options,
} => {
let i = convert_to_lp(input, lp_arena);

LogicalPlan::Projection {
expr: nodes_to_exprs(&expr, expr_arena),
input: Box::new(i),
schema,
options,
}
}
ALogicalPlan::LocalProjection {
Expand Down Expand Up @@ -772,13 +778,15 @@ impl ALogicalPlan {
input,
exprs,
schema,
options,
} => {
let i = convert_to_lp(input, lp_arena);

LogicalPlan::HStack {
input: Box::new(i),
exprs: nodes_to_exprs(&exprs, expr_arena),
schema,
options,
}
}
ALogicalPlan::Distinct { input, options } => {
Expand Down
Loading