diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index cc96ce2173d3..f08646959e3f 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -751,7 +751,7 @@ impl LazyFrame { ) -> PolarsResult<()> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), + input: Arc::new(self.logical_plan), payload: SinkType::Cloud { uri: Arc::new(uri), cloud_options, @@ -806,7 +806,7 @@ impl LazyFrame { fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), + input: Arc::new(self.logical_plan), payload, }; let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; @@ -1846,7 +1846,7 @@ impl LazyGroupBy { let options = GroupbyOptions { slice: None }; let lp = LogicalPlan::Aggregate { - input: Box::new(self.logical_plan), + input: Arc::new(self.logical_plan), keys: Arc::new(self.keys), aggs: vec![], schema, diff --git a/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs b/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs index d10a09dffcbb..0f5f1e50901b 100644 --- a/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs +++ b/crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs @@ -257,7 +257,7 @@ impl PartitionGroupByExec { } .into(); let lp = LogicalPlan::Aggregate { - input: Box::new(original_df.lazy().logical_plan), + input: Arc::new(original_df.lazy().logical_plan), keys: Arc::new(std::mem::take(&mut self.keys)), aggs: std::mem::take(&mut self.aggs), schema: self.output_schema.clone(), diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 3f1fba60c172..9a257eb95b9f 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -68,7 +68,7 @@ macro_rules! raise_err { let err = $err.wrap_msg(&format_err_outer); LogicalPlan::Error { - input: Box::new(input), + input: Arc::new(input), err: err.into(), // PolarsError -> ErrorState } }, @@ -424,7 +424,7 @@ impl LogicalPlanBuilder { } pub fn cache(self) -> Self { - let input = Box::new(self.0); + let input = Arc::new(self.0); let id = input.as_ref() as *const LogicalPlan as usize; LogicalPlan::Cache { input, @@ -461,7 +461,7 @@ impl LogicalPlanBuilder { } else { LogicalPlan::Projection { expr: columns, - input: Box::new(self.0), + input: Arc::new(self.0), schema: Arc::new(output_schema), options: ProjectionOptions { run_parallel: false, @@ -486,7 +486,7 @@ impl LogicalPlanBuilder { } else { LogicalPlan::Projection { expr: exprs, - input: Box::new(self.0), + input: Arc::new(self.0), schema: Arc::new(schema), options, } @@ -576,7 +576,7 @@ impl LogicalPlanBuilder { } LogicalPlan::HStack { - input: Box::new(self.0), + input: Arc::new(self.0), exprs, schema: Arc::new(new_schema), options, @@ -586,7 +586,7 @@ impl LogicalPlanBuilder { pub fn add_err(self, err: PolarsError) -> Self { LogicalPlan::Error { - input: Box::new(self.0), + input: Arc::new(self.0), err: err.into(), } .into() @@ -608,7 +608,7 @@ impl LogicalPlanBuilder { } } LogicalPlan::ExtContext { - input: Box::new(self.0), + input: Arc::new(self.0), contexts, schema: Arc::new(schema), } @@ -692,7 +692,7 @@ impl LogicalPlanBuilder { LogicalPlan::Selection { predicate, - input: Box::new(self.0), + input: Arc::new(self.0), } .into() } @@ -777,7 +777,7 @@ impl LogicalPlanBuilder { }; LogicalPlan::Aggregate { - input: Box::new(self.0), + input: Arc::new(self.0), keys: Arc::new(keys), aggs, schema: Arc::new(schema), @@ -814,7 +814,7 @@ impl LogicalPlanBuilder { let schema = try_delayed!(self.0.schema(), &self.0, into); let by_column = try_delayed!(rewrite_projections(by_column, &schema, &[]), &self.0, into); LogicalPlan::Sort { - input: Box::new(self.0), + input: Arc::new(self.0), by_column, args: SortArguments { descending, @@ -846,7 +846,7 @@ impl LogicalPlanBuilder { try_delayed!(explode_schema(&mut schema, &columns), &self.0, into); LogicalPlan::MapFunction { - input: Box::new(self.0), + input: Arc::new(self.0), function: FunctionNode::Explode { columns, schema: Arc::new(schema), @@ -859,7 +859,7 @@ impl LogicalPlanBuilder { let schema = try_delayed!(self.0.schema(), &self.0, into); let schema = det_melt_schema(&args, &schema); LogicalPlan::MapFunction { - input: Box::new(self.0), + input: Arc::new(self.0), function: FunctionNode::Melt { args, schema }, } .into() @@ -871,7 +871,7 @@ impl LogicalPlanBuilder { row_index_schema(schema_mut, name); LogicalPlan::MapFunction { - input: Box::new(self.0), + input: Arc::new(self.0), function: FunctionNode::RowIndex { name: ColumnName::from(name), offset, @@ -883,7 +883,7 @@ impl LogicalPlanBuilder { pub fn distinct(self, options: DistinctOptions) -> Self { LogicalPlan::Distinct { - input: Box::new(self.0), + input: Arc::new(self.0), options, } .into() @@ -891,7 +891,7 @@ impl LogicalPlanBuilder { pub fn slice(self, offset: i64, len: IdxSize) -> Self { LogicalPlan::Slice { - input: Box::new(self.0), + input: Arc::new(self.0), offset, len, } @@ -908,7 +908,7 @@ impl LogicalPlanBuilder { for e in left_on.iter().chain(right_on.iter()) { if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) { return LogicalPlan::Error { - input: Box::new(self.0), + input: Arc::new(self.0), err: polars_err!( ComputeError: "'alias' is not allowed in a join key, use 'with_columns' first", @@ -929,8 +929,8 @@ impl LogicalPlanBuilder { ); LogicalPlan::Join { - input_left: Box::new(self.0), - input_right: Box::new(other), + input_left: Arc::new(self.0), + input_right: Arc::new(other), schema, left_on, right_on, @@ -940,7 +940,7 @@ impl LogicalPlanBuilder { } pub fn map_private(self, function: FunctionNode) -> Self { LogicalPlan::MapFunction { - input: Box::new(self.0), + input: Arc::new(self.0), function, } .into() @@ -955,7 +955,7 @@ impl LogicalPlanBuilder { validate_output: bool, ) -> Self { LogicalPlan::MapFunction { - input: Box::new(self.0), + input: Arc::new(self.0), function: FunctionNode::OpaquePython { function, schema, @@ -981,7 +981,7 @@ impl LogicalPlanBuilder { let function = Arc::new(function); LogicalPlan::MapFunction { - input: Box::new(self.0), + input: Arc::new(self.0), function: FunctionNode::Opaque { function, schema, diff --git a/crates/polars-plan/src/logical_plan/conversion.rs b/crates/polars-plan/src/logical_plan/conversion.rs index c78e8c5b56ea..d3fd74f6830a 100644 --- a/crates/polars-plan/src/logical_plan/conversion.rs +++ b/crates/polars-plan/src/logical_plan/conversion.rs @@ -274,6 +274,7 @@ pub fn to_alp( expr_arena: &mut Arena, lp_arena: &mut Arena, ) -> PolarsResult { + let owned = Arc::unwrap_or_clone; let v = match lp { LogicalPlan::Scan { file_info, @@ -317,7 +318,7 @@ pub fn to_alp( } }, LogicalPlan::Selection { input, predicate } => { - let i = to_alp(*input, expr_arena, lp_arena)?; + let i = to_alp(owned(input), expr_arena, lp_arena)?; let p = to_expr_ir(predicate, expr_arena); ALogicalPlan::Selection { input: i, @@ -325,7 +326,7 @@ pub fn to_alp( } }, LogicalPlan::Slice { input, offset, len } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::Slice { input, offset, len } }, LogicalPlan::DataFrameScan { @@ -349,7 +350,7 @@ pub fn to_alp( } => { let eirs = to_expr_irs(expr, expr_arena); let expr = eirs.into(); - let i = to_alp(*input, expr_arena, lp_arena)?; + let i = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::Projection { expr, input: i, @@ -362,7 +363,7 @@ pub fn to_alp( by_column, args, } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; let by_column = to_expr_irs(by_column, expr_arena); ALogicalPlan::Sort { input, @@ -375,7 +376,7 @@ pub fn to_alp( id, cache_hits, } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::Cache { input, id, @@ -391,7 +392,7 @@ pub fn to_alp( maintain_order, options, } => { - let i = to_alp(*input, expr_arena, lp_arena)?; + let i = to_alp(owned(input), expr_arena, lp_arena)?; let aggs = to_expr_irs(aggs, expr_arena); let keys = keys.convert(|e| to_expr_ir(e.clone(), expr_arena)); @@ -413,8 +414,8 @@ pub fn to_alp( right_on, options, } => { - let input_left = to_alp(*input_left, expr_arena, lp_arena)?; - let input_right = to_alp(*input_right, expr_arena, lp_arena)?; + let input_left = to_alp(owned(input_left), expr_arena, lp_arena)?; + let input_right = to_alp(owned(input_right), expr_arena, lp_arena)?; let left_on = to_expr_irs_ignore_alias(left_on, expr_arena); let right_on = to_expr_irs_ignore_alias(right_on, expr_arena); @@ -436,7 +437,7 @@ pub fn to_alp( } => { let eirs = to_expr_irs(exprs, expr_arena); let exprs = eirs.into(); - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::HStack { input, exprs, @@ -445,11 +446,11 @@ pub fn to_alp( } }, LogicalPlan::Distinct { input, options } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::Distinct { input, options } }, LogicalPlan::MapFunction { input, function } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::MapFunction { input, function } }, LogicalPlan::Error { err, .. } => { @@ -462,7 +463,7 @@ pub fn to_alp( contexts, schema, } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; let contexts = contexts .into_iter() .map(|lp| to_alp(lp, expr_arena, lp_arena)) @@ -474,7 +475,7 @@ pub fn to_alp( } }, LogicalPlan::Sink { input, payload } => { - let input = to_alp(*input, expr_arena, lp_arena)?; + let input = to_alp(owned(input), expr_arena, lp_arena)?; ALogicalPlan::Sink { input, payload } }, }; @@ -770,7 +771,7 @@ impl ALogicalPlan { ALogicalPlan::Slice { input, offset, len } => { let lp = convert_to_lp(input, lp_arena); LogicalPlan::Slice { - input: Box::new(lp), + input: Arc::new(lp), offset, len, } @@ -779,7 +780,7 @@ impl ALogicalPlan { let lp = convert_to_lp(input, lp_arena); let predicate = predicate.to_expr(expr_arena); LogicalPlan::Selection { - input: Box::new(lp), + input: Arc::new(lp), predicate, } }, @@ -806,7 +807,7 @@ impl ALogicalPlan { let expr = expr_irs_to_exprs(expr.all_exprs(), expr_arena); LogicalPlan::Projection { expr, - input: Box::new(i), + input: Arc::new(i), schema, options, } @@ -819,7 +820,7 @@ impl ALogicalPlan { .collect::>(); LogicalPlan::Projection { expr, - input: Box::new(input), + input: Arc::new(input), schema: columns.clone(), options: Default::default(), } @@ -829,7 +830,7 @@ impl ALogicalPlan { by_column, args, } => { - let input = Box::new(convert_to_lp(input, lp_arena)); + let input = Arc::new(convert_to_lp(input, lp_arena)); let by_column = expr_irs_to_exprs(by_column, expr_arena); LogicalPlan::Sort { input, @@ -842,7 +843,7 @@ impl ALogicalPlan { id, cache_hits, } => { - let input = Box::new(convert_to_lp(input, lp_arena)); + let input = Arc::new(convert_to_lp(input, lp_arena)); LogicalPlan::Cache { input, id, @@ -863,7 +864,7 @@ impl ALogicalPlan { let aggs = expr_irs_to_exprs(aggs, expr_arena); LogicalPlan::Aggregate { - input: Box::new(i), + input: Arc::new(i), keys, aggs, schema, @@ -887,8 +888,8 @@ impl ALogicalPlan { let right_on = expr_irs_to_exprs(right_on, expr_arena); LogicalPlan::Join { - input_left: Box::new(i_l), - input_right: Box::new(i_r), + input_left: Arc::new(i_l), + input_right: Arc::new(i_r), schema, left_on, right_on, @@ -905,7 +906,7 @@ impl ALogicalPlan { let exprs = expr_irs_to_exprs(exprs.all_exprs(), expr_arena); LogicalPlan::HStack { - input: Box::new(i), + input: Arc::new(i), exprs, schema, options, @@ -914,12 +915,12 @@ impl ALogicalPlan { ALogicalPlan::Distinct { input, options } => { let i = convert_to_lp(input, lp_arena); LogicalPlan::Distinct { - input: Box::new(i), + input: Arc::new(i), options, } }, ALogicalPlan::MapFunction { input, function } => { - let input = Box::new(convert_to_lp(input, lp_arena)); + let input = Arc::new(convert_to_lp(input, lp_arena)); LogicalPlan::MapFunction { input, function } }, ALogicalPlan::ExtContext { @@ -927,7 +928,7 @@ impl ALogicalPlan { contexts, schema, } => { - let input = Box::new(convert_to_lp(input, lp_arena)); + let input = Arc::new(convert_to_lp(input, lp_arena)); let contexts = contexts .into_iter() .map(|node| convert_to_lp(node, lp_arena)) @@ -939,7 +940,7 @@ impl ALogicalPlan { } }, ALogicalPlan::Sink { input, payload } => { - let input = Box::new(convert_to_lp(input, lp_arena)); + let input = Arc::new(convert_to_lp(input, lp_arena)); LogicalPlan::Sink { input, payload } }, ALogicalPlan::Invalid => unreachable!(), diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index 1b832b03ec57..4a6a910a9d07 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -135,12 +135,12 @@ pub enum LogicalPlan { PythonScan { options: PythonOptions }, /// Filter on a boolean mask Selection { - input: Box, + input: Arc, predicate: Expr, }, /// Cache the input at this point in the LP Cache { - input: Box, + input: Arc, id: usize, cache_hits: u32, }, @@ -164,13 +164,13 @@ pub enum LogicalPlan { /// Column selection Projection { expr: Vec, - input: Box, + input: Arc, schema: SchemaRef, options: ProjectionOptions, }, /// Groupby aggregation Aggregate { - input: Box, + input: Arc, keys: Arc>, aggs: Vec, schema: SchemaRef, @@ -181,8 +181,8 @@ pub enum LogicalPlan { }, /// Join operation Join { - input_left: Box, - input_right: Box, + input_left: Arc, + input_right: Arc, schema: SchemaRef, left_on: Vec, right_on: Vec, @@ -190,31 +190,31 @@ pub enum LogicalPlan { }, /// Adding columns to the table without a Join HStack { - input: Box, + input: Arc, exprs: Vec, schema: SchemaRef, options: ProjectionOptions, }, /// Remove duplicates from the table Distinct { - input: Box, + input: Arc, options: DistinctOptions, }, /// Sort the table Sort { - input: Box, + input: Arc, by_column: Vec, args: SortArguments, }, /// Slice the table Slice { - input: Box, + input: Arc, offset: i64, len: IdxSize, }, /// A (User Defined) Function MapFunction { - input: Box, + input: Arc, function: FunctionNode, }, Union { @@ -230,17 +230,17 @@ pub enum LogicalPlan { /// Catches errors and throws them later #[cfg_attr(feature = "serde", serde(skip))] Error { - input: Box, + input: Arc, err: ErrorState, }, /// This allows expressions to access other tables ExtContext { - input: Box, + input: Arc, contexts: Vec, schema: SchemaRef, }, Sink { - input: Box, + input: Arc, payload: SinkType, }, }