Skip to content

Commit

Permalink
perf: Make LogicalPlan immutable (pola-rs#15416)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Apr 1, 2024
1 parent 82f717b commit 758b55a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 66 deletions.
6 changes: 3 additions & 3 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ impl LazyFrame {
) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
input: Arc::new(self.logical_plan),
payload: SinkType::Cloud {
uri: Arc::new(uri),
cloud_options,
Expand Down Expand Up @@ -806,7 +806,7 @@ impl LazyFrame {
fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
input: Arc::new(self.logical_plan),
payload,
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
Expand Down Expand Up @@ -1846,7 +1846,7 @@ impl LazyGroupBy {
let options = GroupbyOptions { slice: None };

let lp = LogicalPlan::Aggregate {
input: Box::new(self.logical_plan),
input: Arc::new(self.logical_plan),
keys: Arc::new(self.keys),
aggs: vec![],
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl PartitionGroupByExec {
}
.into();
let lp = LogicalPlan::Aggregate {
input: Box::new(original_df.lazy().logical_plan),
input: Arc::new(original_df.lazy().logical_plan),
keys: Arc::new(std::mem::take(&mut self.keys)),
aggs: std::mem::take(&mut self.aggs),
schema: self.output_schema.clone(),
Expand Down
42 changes: 21 additions & 21 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ macro_rules! raise_err {
let err = $err.wrap_msg(&format_err_outer);

LogicalPlan::Error {
input: Box::new(input),
input: Arc::new(input),
err: err.into(), // PolarsError -> ErrorState
}
},
Expand Down Expand Up @@ -424,7 +424,7 @@ impl LogicalPlanBuilder {
}

pub fn cache(self) -> Self {
let input = Box::new(self.0);
let input = Arc::new(self.0);
let id = input.as_ref() as *const LogicalPlan as usize;
LogicalPlan::Cache {
input,
Expand Down Expand Up @@ -461,7 +461,7 @@ impl LogicalPlanBuilder {
} else {
LogicalPlan::Projection {
expr: columns,
input: Box::new(self.0),
input: Arc::new(self.0),
schema: Arc::new(output_schema),
options: ProjectionOptions {
run_parallel: false,
Expand All @@ -486,7 +486,7 @@ impl LogicalPlanBuilder {
} else {
LogicalPlan::Projection {
expr: exprs,
input: Box::new(self.0),
input: Arc::new(self.0),
schema: Arc::new(schema),
options,
}
Expand Down Expand Up @@ -576,7 +576,7 @@ impl LogicalPlanBuilder {
}

LogicalPlan::HStack {
input: Box::new(self.0),
input: Arc::new(self.0),
exprs,
schema: Arc::new(new_schema),
options,
Expand All @@ -586,7 +586,7 @@ impl LogicalPlanBuilder {

pub fn add_err(self, err: PolarsError) -> Self {
LogicalPlan::Error {
input: Box::new(self.0),
input: Arc::new(self.0),
err: err.into(),
}
.into()
Expand All @@ -608,7 +608,7 @@ impl LogicalPlanBuilder {
}
}
LogicalPlan::ExtContext {
input: Box::new(self.0),
input: Arc::new(self.0),
contexts,
schema: Arc::new(schema),
}
Expand Down Expand Up @@ -692,7 +692,7 @@ impl LogicalPlanBuilder {

LogicalPlan::Selection {
predicate,
input: Box::new(self.0),
input: Arc::new(self.0),
}
.into()
}
Expand Down Expand Up @@ -777,7 +777,7 @@ impl LogicalPlanBuilder {
};

LogicalPlan::Aggregate {
input: Box::new(self.0),
input: Arc::new(self.0),
keys: Arc::new(keys),
aggs,
schema: Arc::new(schema),
Expand Down Expand Up @@ -814,7 +814,7 @@ impl LogicalPlanBuilder {
let schema = try_delayed!(self.0.schema(), &self.0, into);
let by_column = try_delayed!(rewrite_projections(by_column, &schema, &[]), &self.0, into);
LogicalPlan::Sort {
input: Box::new(self.0),
input: Arc::new(self.0),
by_column,
args: SortArguments {
descending,
Expand Down Expand Up @@ -846,7 +846,7 @@ impl LogicalPlanBuilder {
try_delayed!(explode_schema(&mut schema, &columns), &self.0, into);

LogicalPlan::MapFunction {
input: Box::new(self.0),
input: Arc::new(self.0),
function: FunctionNode::Explode {
columns,
schema: Arc::new(schema),
Expand All @@ -859,7 +859,7 @@ impl LogicalPlanBuilder {
let schema = try_delayed!(self.0.schema(), &self.0, into);
let schema = det_melt_schema(&args, &schema);
LogicalPlan::MapFunction {
input: Box::new(self.0),
input: Arc::new(self.0),
function: FunctionNode::Melt { args, schema },
}
.into()
Expand All @@ -871,7 +871,7 @@ impl LogicalPlanBuilder {
row_index_schema(schema_mut, name);

LogicalPlan::MapFunction {
input: Box::new(self.0),
input: Arc::new(self.0),
function: FunctionNode::RowIndex {
name: ColumnName::from(name),
offset,
Expand All @@ -883,15 +883,15 @@ impl LogicalPlanBuilder {

pub fn distinct(self, options: DistinctOptions) -> Self {
LogicalPlan::Distinct {
input: Box::new(self.0),
input: Arc::new(self.0),
options,
}
.into()
}

pub fn slice(self, offset: i64, len: IdxSize) -> Self {
LogicalPlan::Slice {
input: Box::new(self.0),
input: Arc::new(self.0),
offset,
len,
}
Expand All @@ -908,7 +908,7 @@ impl LogicalPlanBuilder {
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
return LogicalPlan::Error {
input: Box::new(self.0),
input: Arc::new(self.0),
err: polars_err!(
ComputeError:
"'alias' is not allowed in a join key, use 'with_columns' first",
Expand All @@ -929,8 +929,8 @@ impl LogicalPlanBuilder {
);

LogicalPlan::Join {
input_left: Box::new(self.0),
input_right: Box::new(other),
input_left: Arc::new(self.0),
input_right: Arc::new(other),
schema,
left_on,
right_on,
Expand All @@ -940,7 +940,7 @@ impl LogicalPlanBuilder {
}
pub fn map_private(self, function: FunctionNode) -> Self {
LogicalPlan::MapFunction {
input: Box::new(self.0),
input: Arc::new(self.0),
function,
}
.into()
Expand All @@ -955,7 +955,7 @@ impl LogicalPlanBuilder {
validate_output: bool,
) -> Self {
LogicalPlan::MapFunction {
input: Box::new(self.0),
input: Arc::new(self.0),
function: FunctionNode::OpaquePython {
function,
schema,
Expand All @@ -981,7 +981,7 @@ impl LogicalPlanBuilder {
let function = Arc::new(function);

LogicalPlan::MapFunction {
input: Box::new(self.0),
input: Arc::new(self.0),
function: FunctionNode::Opaque {
function,
schema,
Expand Down
Loading

0 comments on commit 758b55a

Please sign in to comment.