From d7d720ad3634d64094c9101de72be083baaede5d Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Wed, 15 May 2024 16:48:26 +0200 Subject: [PATCH] refactor: move all describe, describe_tree and dot-viz code to IR instead of DslPlan (#16237) --- crates/polars-lazy/src/dot.rs | 56 +- crates/polars-lazy/src/frame/mod.rs | 43 +- .../src/physical_plan/expressions/apply.rs | 2 +- .../src/physical_plan/expressions/mod.rs | 4 +- .../src/physical_plan/streaming/tree.rs | 11 +- crates/polars-lazy/src/tests/cse.rs | 6 +- crates/polars-lazy/src/tests/io.rs | 6 +- crates/polars-lazy/src/tests/logical.rs | 2 +- crates/polars-lazy/src/tests/schema.rs | 8 +- crates/polars-lazy/src/tests/tpch.rs | 6 +- crates/polars-plan/src/dot.rs | 477 ------------------ crates/polars-plan/src/dsl/meta.rs | 8 +- crates/polars-plan/src/lib.rs | 1 - .../polars-plan/src/logical_plan/alp/dot.rs | 376 ++++++++++++++ .../src/logical_plan/alp/format.rs | 172 +++++-- .../polars-plan/src/logical_plan/alp/mod.rs | 77 ++- .../src/logical_plan/{ => alp}/tree_format.rs | 438 +++++++++------- .../src/logical_plan/conversion/dsl_to_ir.rs | 2 +- crates/polars-plan/src/logical_plan/debug.rs | 2 +- .../polars-plan/src/logical_plan/expr_ir.rs | 9 + crates/polars-plan/src/logical_plan/format.rs | 301 +---------- .../src/logical_plan/functions/mod.rs | 9 +- crates/polars-plan/src/logical_plan/mod.rs | 29 +- py-polars/src/lazyframe/mod.rs | 28 +- .../tests/unit/streaming/test_streaming.py | 4 +- .../unit/streaming/test_streaming_cse.py | 2 +- .../unit/streaming/test_streaming_group_by.py | 2 +- 27 files changed, 918 insertions(+), 1163 deletions(-) delete mode 100644 crates/polars-plan/src/dot.rs create mode 100644 crates/polars-plan/src/logical_plan/alp/dot.rs rename crates/polars-plan/src/logical_plan/{ => alp}/tree_format.rs (67%) diff --git a/crates/polars-lazy/src/dot.rs b/crates/polars-lazy/src/dot.rs index 9c7f53f821a3..f8facf074838 100644 --- a/crates/polars-lazy/src/dot.rs +++ b/crates/polars-lazy/src/dot.rs @@ -1,60 +1,16 @@ -use std::fmt::Write; - use polars_core::prelude::*; -use polars_plan::dot::*; -use polars_plan::prelude::*; use crate::prelude::*; impl LazyFrame { /// Get a dot language representation of the LogicalPlan. pub fn to_dot(&self, optimized: bool) -> PolarsResult { - let mut s = String::with_capacity(512); - - let mut logical_plan = self.clone().get_plan_builder().build(); - if optimized { - // initialize arena's - let mut expr_arena = Arena::with_capacity(64); - let mut lp_arena = Arena::with_capacity(32); - - let lp_top = self.clone().optimize_with_scratch( - &mut lp_arena, - &mut expr_arena, - &mut vec![], - true, - )?; - logical_plan = node_to_lp(lp_top, &expr_arena, &mut lp_arena); - } - - let prev_node = DotNode { - branch: 0, - id: 0, - fmt: "", - }; - - // maps graphviz id to label - // we use this to create this graph - // first we create nodes including ids to make sure they are unique - // A [id] -- B [id] - // B [id] -- C [id] - // - // then later we hide the [id] by adding this to the graph - // A [id] [label="A"] - // B [id] [label="B"] - // C [id] [label="C"] - - let mut id_map = PlHashMap::with_capacity(8); - logical_plan - .dot(&mut s, (0, 0), prev_node, &mut id_map) - .expect("io error"); - s.push('\n'); + let lp = if optimized { + self.clone().to_alp_optimized() + } else { + self.clone().to_alp() + }?; - for (id, label) in id_map { - // the label is wrapped in double quotes - // the id already is wrapped in double quotes - writeln!(s, "{id}[label=\"{label}\"]").unwrap(); - } - s.push_str("\n}"); - Ok(s) + Ok(lp.display_dot().to_string()) } } diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index ab4eca5f8fee..4f09aa0ac48c 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -208,49 +208,27 @@ impl LazyFrame { } /// Return a String describing the naive (un-optimized) logical plan. - pub fn describe_plan(&self) -> String { - self.logical_plan.describe() + pub fn describe_plan(&self) -> PolarsResult { + Ok(self.clone().to_alp()?.describe()) } /// Return a String describing the naive (un-optimized) logical plan in tree format. - pub fn describe_plan_tree(&self) -> String { - self.logical_plan.describe_tree_format() - } - - fn optimized_plan_ir(&self) -> PolarsResult { - let mut expr_arena = Arena::with_capacity(64); - let mut lp_arena = Arena::with_capacity(64); - let lp_top = self.clone().optimize_with_scratch( - &mut lp_arena, - &mut expr_arena, - &mut vec![], - true, - )?; - - Ok(IRPlan::new(lp_top, lp_arena, expr_arena)) - } - - fn optimized_plan(&self) -> PolarsResult { - let IRPlan { - lp_top, - mut lp_arena, - expr_arena, - } = self.optimized_plan_ir()?; - Ok(node_to_lp(lp_top, &expr_arena, &mut lp_arena)) + pub fn describe_plan_tree(&self) -> PolarsResult { + Ok(self.clone().to_alp()?.describe_tree_format()) } /// Return a String describing the optimized logical plan. /// /// Returns `Err` if optimizing the logical plan fails. pub fn describe_optimized_plan(&self) -> PolarsResult { - Ok(self.optimized_plan_ir()?.describe()) + Ok(self.clone().to_alp_optimized()?.describe()) } /// Return a String describing the optimized logical plan in tree format. /// /// Returns `Err` if optimizing the logical plan fails. pub fn describe_optimized_plan_tree(&self) -> PolarsResult { - Ok(self.optimized_plan()?.describe_tree_format()) + Ok(self.clone().to_alp_optimized()?.describe_tree_format()) } /// Return a String describing the logical plan. @@ -261,7 +239,7 @@ impl LazyFrame { if optimized { self.describe_optimized_plan() } else { - Ok(self.describe_plan()) + self.describe_plan() } } @@ -532,15 +510,16 @@ impl LazyFrame { self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false) } - pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena, Arena)> { + pub fn to_alp_optimized(self) -> PolarsResult { let mut lp_arena = Arena::with_capacity(16); let mut expr_arena = Arena::with_capacity(16); let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?; - Ok((node, lp_arena, expr_arena)) + + Ok(IRPlan::new(node, lp_arena, expr_arena)) } - pub fn to_alp(self) -> PolarsResult<(Node, Arena, Arena)> { + pub fn to_alp(self) -> PolarsResult { self.logical_plan.to_alp() } diff --git a/crates/polars-lazy/src/physical_plan/expressions/apply.rs b/crates/polars-lazy/src/physical_plan/expressions/apply.rs index 0b75510b6ac6..b510f8471370 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/apply.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/apply.rs @@ -38,7 +38,7 @@ impl ApplyExpr { ) -> Self { #[cfg(debug_assertions)] if matches!(options.collect_groups, ApplyOptions::ElementWise) && options.returns_scalar { - panic!("expr {} is not implemented correctly. 'returns_scalar' and 'elementwise' are mutually exclusive", expr) + panic!("expr {:?} is not implemented correctly. 'returns_scalar' and 'elementwise' are mutually exclusive", expr) } Self { diff --git a/crates/polars-lazy/src/physical_plan/expressions/mod.rs b/crates/polars-lazy/src/physical_plan/expressions/mod.rs index 6d496e82b716..1158268c7da6 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/mod.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/mod.rs @@ -341,7 +341,7 @@ impl<'a> AggregationContext<'a> { (true, &DataType::List(_)) => { if series.len() != self.groups.len() { let fmt_expr = if let Some(e) = expr { - format!("'{e}' ") + format!("'{e:?}' ") } else { String::new() }; @@ -589,7 +589,7 @@ impl Display for &dyn PhysicalExpr { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.as_expression() { None => Ok(()), - Some(e) => write!(f, "{e}"), + Some(e) => write!(f, "{e:?}"), } } } diff --git a/crates/polars-lazy/src/physical_plan/streaming/tree.rs b/crates/polars-lazy/src/physical_plan/streaming/tree.rs index e10643d20d70..db25b429bfe3 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/tree.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/tree.rs @@ -177,8 +177,15 @@ pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena, expr_arena: &Arena usize { - let (node, lp_arena, _) = q.to_alp_optimized().unwrap(); + let IRPlan { + lp_top, lp_arena, .. + } = q.to_alp_optimized().unwrap(); (&lp_arena) - .iter(node) + .iter(lp_top) .filter(|(_node, lp)| matches!(lp, IR::Cache { .. })) .count() } diff --git a/crates/polars-lazy/src/tests/io.rs b/crates/polars-lazy/src/tests/io.rs index db92d90c5417..4afc3feada86 100644 --- a/crates/polars-lazy/src/tests/io.rs +++ b/crates/polars-lazy/src/tests/io.rs @@ -391,8 +391,10 @@ fn test_scan_parquet_limit_9001() { ..Default::default() }; let q = LazyFrame::scan_parquet(path, args).unwrap().limit(3); - let (node, lp_arena, _) = q.to_alp_optimized().unwrap(); - (&lp_arena).iter(node).all(|(_, lp)| match lp { + let IRPlan { + lp_top, lp_arena, .. + } = q.to_alp_optimized().unwrap(); + (&lp_arena).iter(lp_top).all(|(_, lp)| match lp { IR::Union { options, .. } => { let sliced = options.slice.unwrap(); sliced.1 == 3 diff --git a/crates/polars-lazy/src/tests/logical.rs b/crates/polars-lazy/src/tests/logical.rs index 674e6ecd793b..ca9906d55fd7 100644 --- a/crates/polars-lazy/src/tests/logical.rs +++ b/crates/polars-lazy/src/tests/logical.rs @@ -52,7 +52,7 @@ fn test_duration() -> PolarsResult<()> { } fn print_plans(lf: &LazyFrame) { - println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan()); + println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan().unwrap()); println!( "OPTIMIZED LOGICAL PLAN\n\n{}\n", lf.describe_optimized_plan().unwrap() diff --git a/crates/polars-lazy/src/tests/schema.rs b/crates/polars-lazy/src/tests/schema.rs index 5bd17a58f697..c51f15d4b4b7 100644 --- a/crates/polars-lazy/src/tests/schema.rs +++ b/crates/polars-lazy/src/tests/schema.rs @@ -16,10 +16,14 @@ fn test_schema_update_after_projection_pd() -> PolarsResult<()> { // run optimizations // Get the explode node - let (input, lp_arena, _expr_arena) = q.to_alp_optimized()?; + let IRPlan { + lp_top, + lp_arena, + expr_arena: _, + } = q.to_alp_optimized()?; // assert the schema has been corrected with the projection pushdown run - let lp = lp_arena.get(input); + let lp = lp_arena.get(lp_top); assert!(matches!( lp, IR::MapFunction { diff --git a/crates/polars-lazy/src/tests/tpch.rs b/crates/polars-lazy/src/tests/tpch.rs index 34447a0ef4f7..0a647615d0ea 100644 --- a/crates/polars-lazy/src/tests/tpch.rs +++ b/crates/polars-lazy/src/tests/tpch.rs @@ -85,10 +85,12 @@ fn test_q2() -> PolarsResult<()> { .limit(100) .with_comm_subplan_elim(true); - let (node, lp_arena, _) = q.clone().to_alp_optimized().unwrap(); + let IRPlan { + lp_top, lp_arena, .. + } = q.clone().to_alp_optimized().unwrap(); assert_eq!( (&lp_arena) - .iter(node) + .iter(lp_top) .filter(|(_, alp)| matches!(alp, IR::Cache { .. })) .count(), 2 diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs deleted file mode 100644 index 174331019342..000000000000 --- a/crates/polars-plan/src/dot.rs +++ /dev/null @@ -1,477 +0,0 @@ -use std::borrow::Cow; -use std::fmt::{Display, Write}; -use std::path::PathBuf; - -use polars_core::prelude::*; - -use crate::constants::UNLIMITED_CACHE; -use crate::prelude::*; - -impl Expr { - /// Get a dot language representation of the Expression. - pub fn to_dot(&self) -> PolarsResult { - let mut s = String::with_capacity(512); - self.dot_viz(&mut s, (0, 0), "").expect("io error"); - s.push_str("\n}"); - Ok(s) - } - - fn write_dot( - &self, - acc_str: &mut String, - prev_node: &str, - current_node: &str, - id: usize, - ) -> std::fmt::Result { - if id == 0 { - writeln!(acc_str, "graph expr {{") - } else { - writeln!( - acc_str, - "\"{}\" -- \"{}\"", - prev_node.replace('"', r#"\""#), - current_node.replace('"', r#"\""#) - ) - } - } - - fn dot_viz( - &self, - acc_str: &mut String, - id: (usize, usize), - prev_node: &str, - ) -> std::fmt::Result { - let (mut branch, id) = id; - - match self { - Expr::BinaryExpr { left, op, right } => { - let current_node = format!( - r#"BINARY - left _; - op {op:?}; - right: _ [{branch},{id}]"#, - ); - - self.write_dot(acc_str, prev_node, ¤t_node, id)?; - for input in [left, right] { - input.dot_viz(acc_str, (branch, id + 1), ¤t_node)?; - branch += 1; - } - Ok(()) - }, - _ => self.write_dot(acc_str, prev_node, &format!("{branch}{id}"), id), - } - } -} - -#[derive(Copy, Clone)] -pub struct DotNode<'a> { - pub branch: usize, - pub id: usize, - pub fmt: &'a str, -} - -impl DslPlan { - fn write_single_node(&self, acc_str: &mut String, node: DotNode) -> std::fmt::Result { - let fmt_node = node.fmt.replace('"', r#"\""#); - writeln!(acc_str, "graph polars_query {{\n\"[{fmt_node}]\"")?; - Ok(()) - } - - fn write_dot( - &self, - acc_str: &mut String, - prev_node: DotNode, - current_node: DotNode, - id_map: &mut PlHashMap, - ) -> std::fmt::Result { - if current_node.id == 0 && current_node.branch == 0 { - writeln!(acc_str, "graph polars_query {{") - } else { - let fmt_prev_node = prev_node.fmt.replace('"', r#"\""#); - let fmt_current_node = current_node.fmt.replace('"', r#"\""#); - - let id_prev_node = format!( - "\"{} [{:?}]\"", - &fmt_prev_node, - (prev_node.branch, prev_node.id) - ); - let id_current_node = format!( - "\"{} [{:?}]\"", - &fmt_current_node, - (current_node.branch, current_node.id) - ); - - writeln!(acc_str, "{} -- {}", &id_prev_node, &id_current_node)?; - - id_map.insert(id_current_node, fmt_current_node); - id_map.insert(id_prev_node, fmt_prev_node); - - Ok(()) - } - } - - fn is_single(&self, branch: usize, id: usize) -> bool { - id == 0 && branch == 0 - } - - /// - /// # Arguments - /// `id` - (branch, id) - /// Used to make sure that the dot boxes are distinct. - /// branch is an id per join/union branch - /// id is incremented by the depth traversal of the tree. - pub fn dot( - &self, - acc_str: &mut String, - id: (usize, usize), - prev_node: DotNode, - id_map: &mut PlHashMap, - ) -> std::fmt::Result { - use DslPlan::*; - let (mut branch, id) = id; - - match self { - Union { inputs, .. } => { - let current_node = DotNode { - branch, - id, - fmt: "UNION", - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - for input in inputs { - input.dot(acc_str, (branch, id + 1), current_node, id_map)?; - branch += 1; - } - Ok(()) - }, - HConcat { inputs, .. } => { - let current_node = DotNode { - branch, - id, - fmt: "HCONCAT", - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - for input in inputs { - input.dot(acc_str, (branch, id + 1), current_node, id_map)?; - branch += 1; - } - Ok(()) - }, - Cache { - input, - id: cache_id, - cache_hits, - } => { - // Always increment cache ids as the `DotNode[0, 0]` will insert a new graph, which we don't want. - let cache_id = cache_id.saturating_add(1); - let fmt = if *cache_hits == UNLIMITED_CACHE { - Cow::Borrowed("CACHE") - } else { - Cow::Owned(format!("CACHE: {} times", *cache_hits)) - }; - let current_node = DotNode { - branch: cache_id, - id: cache_id, - fmt: &fmt, - }; - // here we take the cache id, to ensure the same cached subplans get the same ids - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (cache_id, cache_id + 1), current_node, id_map) - }, - Filter { predicate, input } => { - let pred = fmt_predicate(Some(predicate)); - let fmt = format!("FILTER BY {pred}"); - - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - #[cfg(feature = "python")] - PythonScan { options } => self.write_scan( - acc_str, - prev_node, - "PYTHON", - &[], - options.with_columns.as_ref().map(|s| s.as_slice()), - Some(options.schema.len()), - &options.predicate, - branch, - id, - id_map, - ), - Select { expr, input, .. } => { - let schema = input.compute_schema().map_err(|_| { - eprintln!("could not determine schema"); - std::fmt::Error - })?; - - let fmt = format!("π {}/{}", expr.len(), schema.len()); - - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - Sort { - input, by_column, .. - } => { - let fmt = format!("SORT BY {by_column:?}"); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - GroupBy { - input, keys, aggs, .. - } => { - let mut s_keys = String::with_capacity(128); - s_keys.push('['); - for key in keys.iter() { - write!(s_keys, "{key:?},")? - } - s_keys.pop(); - s_keys.push(']'); - let fmt = format!("AGG {:?}\nBY\n{} [{:?}]", aggs, s_keys, (branch, id)); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - HStack { input, exprs, .. } => { - let mut fmt = String::with_capacity(128); - fmt.push_str("WITH COLUMNS ["); - for e in exprs { - if let Expr::Alias(_, name) = e { - write!(fmt, "\"{name}\",")? - } else { - for name in expr_to_leaf_column_names(e).iter().take(1) { - write!(fmt, "\"{name}\",")? - } - } - } - fmt.pop(); - fmt.push(']'); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - Slice { input, offset, len } => { - let fmt = format!("SLICE offset: {offset}; len: {len}"); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - Distinct { input, options, .. } => { - let mut fmt = String::with_capacity(128); - fmt.push_str("DISTINCT"); - if let Some(subset) = &options.subset { - fmt.push_str(" BY "); - for name in subset.iter() { - write!(fmt, "{name}")? - } - } - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - DataFrameScan { - schema, - projection, - selection, - .. - } => { - let total_columns = schema.len(); - let mut n_columns = "*".to_string(); - if let Some(columns) = projection { - n_columns = format!("{}", columns.len()); - } - - let pred = fmt_predicate(selection.as_ref()); - let fmt = format!("TABLE\nπ {n_columns}/{total_columns};\nσ {pred}"); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - if self.is_single(branch, id) { - self.write_single_node(acc_str, current_node) - } else { - self.write_dot(acc_str, prev_node, current_node, id_map) - } - }, - Scan { - paths, - file_info, - predicate, - scan_type, - file_options: options, - } => { - let name: &str = scan_type.into(); - - self.write_scan( - acc_str, - prev_node, - name, - paths.as_ref(), - options.with_columns.as_ref().map(|cols| cols.as_slice()), - file_info.as_ref().map(|fi| fi.schema.len()), - predicate, - branch, - id, - id_map, - ) - }, - Join { - input_left, - input_right, - left_on, - right_on, - options, - .. - } => { - let fmt = format!( - r#"JOIN {} - left: {:?}; - right: {:?}"#, - options.args.how, left_on, right_on - ); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input_left.dot(acc_str, (branch + 100, id + 1), current_node, id_map)?; - input_right.dot(acc_str, (branch + 200, id + 1), current_node, id_map) - }, - MapFunction { - input, function, .. - } => { - let fmt = format!("{function}"); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - ExtContext { input, .. } => { - let current_node = DotNode { - branch, - id, - fmt: "EXTERNAL_CONTEXT", - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - Sink { input, payload, .. } => { - let current_node = DotNode { - branch, - id, - fmt: match payload { - SinkType::Memory => "SINK (MEMORY)", - SinkType::File { .. } => "SINK (FILE)", - #[cfg(feature = "cloud")] - SinkType::Cloud { .. } => "SINK (CLOUD)", - }, - }; - self.write_dot(acc_str, prev_node, current_node, id_map)?; - input.dot(acc_str, (branch, id + 1), current_node, id_map) - }, - } - } - - #[allow(clippy::too_many_arguments)] - fn write_scan( - &self, - acc_str: &mut String, - prev_node: DotNode, - name: &str, - path: &[PathBuf], - with_columns: Option<&[String]>, - total_columns: Option, - predicate: &Option

, - branch: usize, - id: usize, - id_map: &mut PlHashMap, - ) -> std::fmt::Result { - let mut n_columns_fmt = "*".to_string(); - if let Some(columns) = with_columns { - n_columns_fmt = format!("{}", columns.len()); - } - - let path_fmt = match path.len() { - 1 => path[0].to_string_lossy(), - 0 => "".into(), - _ => Cow::Owned(format!( - "{} files: first file: {}", - path.len(), - path[0].to_string_lossy() - )), - }; - - let pred = fmt_predicate(predicate.as_ref()); - let total_columns = total_columns - .map(|v| format!("{v}")) - .unwrap_or_else(|| "?".to_string()); - let fmt = format!( - "{name} SCAN {};\nπ {}/{};\nσ {}", - path_fmt, n_columns_fmt, total_columns, pred, - ); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - if self.is_single(branch, id) { - self.write_single_node(acc_str, current_node) - } else { - self.write_dot(acc_str, prev_node, current_node, id_map) - } - } -} - -fn fmt_predicate(predicate: Option<&P>) -> String { - if let Some(predicate) = predicate { - let n = 25; - let mut pred_fmt = format!("{predicate}"); - pred_fmt = pred_fmt.replace('[', ""); - pred_fmt = pred_fmt.replace(']', ""); - if pred_fmt.len() > n { - pred_fmt.truncate(n); - pred_fmt.push_str("...") - } - pred_fmt - } else { - "-".to_string() - } -} diff --git a/crates/polars-plan/src/dsl/meta.rs b/crates/polars-plan/src/dsl/meta.rs index 5cab9a3f36df..940a0689c6f6 100644 --- a/crates/polars-plan/src/dsl/meta.rs +++ b/crates/polars-plan/src/dsl/meta.rs @@ -2,8 +2,8 @@ use std::fmt::Display; use std::ops::BitAnd; use super::*; +use crate::logical_plan::alp::tree_format::TreeFmtVisitor; use crate::logical_plan::expr_expansion::is_regex_projection; -use crate::logical_plan::tree_format::TreeFmtVisitor; use crate::logical_plan::visitor::{AexprNode, TreeWalker}; /// Specialized expressions for Categorical dtypes. @@ -85,7 +85,7 @@ impl MetaNameSpace { } Ok(Expr::Selector(s)) } else { - polars_bail!(ComputeError: "expected selector, got {}", self.0) + polars_bail!(ComputeError: "expected selector, got {:?}", self.0) } } @@ -98,7 +98,7 @@ impl MetaNameSpace { } Ok(Expr::Selector(s)) } else { - polars_bail!(ComputeError: "expected selector, got {}", self.0) + polars_bail!(ComputeError: "expected selector, got {:?}", self.0) } } @@ -111,7 +111,7 @@ impl MetaNameSpace { } Ok(Expr::Selector(s)) } else { - polars_bail!(ComputeError: "expected selector, got {}", self.0) + polars_bail!(ComputeError: "expected selector, got {:?}", self.0) } } diff --git a/crates/polars-plan/src/lib.rs b/crates/polars-plan/src/lib.rs index 071cca71e247..5cad9759d823 100644 --- a/crates/polars-plan/src/lib.rs +++ b/crates/polars-plan/src/lib.rs @@ -5,7 +5,6 @@ extern crate core; pub mod constants; -pub mod dot; pub mod dsl; pub mod frame; pub mod global; diff --git a/crates/polars-plan/src/logical_plan/alp/dot.rs b/crates/polars-plan/src/logical_plan/alp/dot.rs new file mode 100644 index 000000000000..62b162349c1a --- /dev/null +++ b/crates/polars-plan/src/logical_plan/alp/dot.rs @@ -0,0 +1,376 @@ +use std::fmt; +use std::path::PathBuf; + +use super::format::ExprIRSliceDisplay; +use crate::constants::UNLIMITED_CACHE; +use crate::prelude::alp::format::ColumnsDisplay; +use crate::prelude::*; + +pub struct IRDotDisplay<'a>(pub(crate) IRPlanRef<'a>); + +const INDENT: &str = " "; + +#[derive(Clone, Copy)] +enum DotNode { + Plain(usize), + Cache(usize), +} + +impl fmt::Display for DotNode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DotNode::Plain(n) => write!(f, "p{n}"), + DotNode::Cache(n) => write!(f, "c{n}"), + } + } +} + +#[inline(always)] +fn write_label<'a, 'b>( + f: &'b mut fmt::Formatter<'a>, + id: DotNode, + mut w: impl FnMut(&mut EscapeLabel<'a, 'b>) -> fmt::Result, +) -> fmt::Result { + write!(f, "{INDENT}{id}[label=\"")?; + + let mut escaped = EscapeLabel(f); + w(&mut escaped)?; + let EscapeLabel(f) = escaped; + + writeln!(f, "\"]")?; + + Ok(()) +} + +impl<'a> IRDotDisplay<'a> { + fn with_root(&self, root: Node) -> Self { + Self(self.0.with_root(root)) + } + + fn display_expr(&self, expr: &'a ExprIR) -> ExprIRDisplay<'a> { + expr.display(self.0.expr_arena) + } + + fn display_exprs(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { + ExprIRSliceDisplay { + exprs, + expr_arena: self.0.expr_arena, + } + } + + fn _format( + &self, + f: &mut fmt::Formatter<'_>, + parent: Option, + last: &mut usize, + ) -> std::fmt::Result { + use fmt::Write; + + let root = self.0.root(); + + let id = if let IR::Cache { id, .. } = root { + DotNode::Cache(*id) + } else { + *last += 1; + DotNode::Plain(*last) + }; + + if let Some(parent) = parent { + writeln!(f, "{INDENT}{parent} -- {id}")?; + } + + use IR::*; + match root { + Union { inputs, .. } => { + for input in inputs { + self.with_root(*input)._format(f, Some(id), last)?; + } + + write_label(f, id, |f| f.write_str("UNION"))?; + }, + HConcat { inputs, .. } => { + for input in inputs { + self.with_root(*input)._format(f, Some(id), last)?; + } + + write_label(f, id, |f| f.write_str("HCONCAT"))?; + }, + Cache { + input, cache_hits, .. + } => { + self.with_root(*input)._format(f, Some(id), last)?; + + if *cache_hits == UNLIMITED_CACHE { + write_label(f, id, |f| f.write_str("CACHE"))?; + } else { + write_label(f, id, |f| write!(f, "CACHE: {cache_hits} times"))?; + }; + }, + Filter { predicate, input } => { + self.with_root(*input)._format(f, Some(id), last)?; + + let pred = self.display_expr(predicate); + write_label(f, id, |f| write!(f, "FILTER BY {pred}"))?; + }, + #[cfg(feature = "python")] + PythonScan { predicate, options } => { + let predicate = predicate.as_ref().map(|e| self.display_expr(e)); + let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_slice())); + let total_columns = options.schema.len(); + let predicate = OptionExprIRDisplay(predicate); + + write_label(f, id, |f| { + write!( + f, + "PYTHON SCAN\nπ {with_columns}/{total_columns};\nσ {predicate}" + ) + })? + }, + Select { + expr, + input, + schema, + .. + } => { + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "π {}/{}", expr.len(), schema.len()))?; + }, + Sort { + input, by_column, .. + } => { + let by_column = self.display_exprs(by_column); + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "SORT BY {by_column}"))?; + }, + GroupBy { + input, keys, aggs, .. + } => { + let keys = self.display_exprs(keys); + let aggs = self.display_exprs(aggs); + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "AGG {aggs}\nBY\n{keys}"))?; + }, + HStack { input, exprs, .. } => { + let exprs = self.display_exprs(exprs); + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "WITH COLUMNS {exprs}"))?; + }, + Slice { input, offset, len } => { + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "SLICE offset: {offset}; len: {len}"))?; + }, + Distinct { input, options, .. } => { + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| { + f.write_str("DISTINCT")?; + + if let Some(subset) = &options.subset { + f.write_str(" BY ")?; + + let mut subset = subset.iter(); + + if let Some(fst) = subset.next() { + f.write_str(fst)?; + for name in subset { + write!(f, ", \"{name}\"")?; + } + } else { + f.write_str("None")?; + } + } + + Ok(()) + })?; + }, + DataFrameScan { + schema, + projection, + selection, + .. + } => { + let num_columns = NumColumns(projection.as_ref().map(|p| p.as_ref().as_ref())); + let selection = selection.as_ref().map(|e| self.display_expr(e)); + let selection = OptionExprIRDisplay(selection); + let total_columns = schema.len(); + + write_label(f, id, |f| { + write!(f, "TABLE\nπ {num_columns}/{total_columns};\nσ {selection}") + })?; + }, + Scan { + paths, + file_info, + predicate, + scan_type, + file_options: options, + output_schema: _, + } => { + let name: &str = scan_type.into(); + let path = PathsDisplay(paths.as_ref()); + let with_columns = options.with_columns.as_ref().map(|cols| cols.as_slice()); + let with_columns = NumColumns(with_columns); + let total_columns = file_info.schema.len(); + let predicate = predicate.as_ref().map(|e| self.display_expr(e)); + let predicate = OptionExprIRDisplay(predicate); + + write_label(f, id, |f| { + write!( + f, + "{name} SCAN {path}\nπ {with_columns}/{total_columns};\nσ {predicate}", + ) + })?; + }, + Join { + input_left, + input_right, + left_on, + right_on, + options, + .. + } => { + self.with_root(*input_left)._format(f, Some(id), last)?; + self.with_root(*input_right)._format(f, Some(id), last)?; + + let left_on = self.display_exprs(left_on); + let right_on = self.display_exprs(right_on); + + write_label(f, id, |f| { + write!( + f, + "JOIN {}\nleft: {left_on};\nright: {right_on}", + options.args.how + ) + })?; + }, + MapFunction { + input, function, .. + } => { + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "{function}"))?; + }, + ExtContext { input, .. } => { + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| f.write_str("EXTERNAL_CONTEXT"))?; + }, + Sink { input, payload, .. } => { + self.with_root(*input)._format(f, Some(id), last)?; + + write_label(f, id, |f| { + f.write_str(match payload { + SinkType::Memory => "SINK (MEMORY)", + SinkType::File { .. } => "SINK (FILE)", + #[cfg(feature = "cloud")] + SinkType::Cloud { .. } => "SINK (CLOUD)", + }) + })?; + }, + SimpleProjection { input, columns } => { + let num_columns = columns.as_ref().len(); + let total_columns = self.0.lp_arena.get(*input).schema(self.0.lp_arena).len(); + + let columns = ColumnsDisplay(columns.as_ref()); + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| { + write!(f, "simple π {num_columns}/{total_columns}\n[{columns}]") + })?; + }, + Invalid => write_label(f, id, |f| f.write_str("INVALID"))?, + } + + Ok(()) + } +} + +// A few utility structures for formatting +struct PathsDisplay<'a>(&'a [PathBuf]); +struct NumColumns<'a>(Option<&'a [String]>); +struct OptionExprIRDisplay<'a>(Option>); + +impl fmt::Display for PathsDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.len() { + 0 => Ok(()), + 1 => self.0[0].display().fmt(f), + _ => write!( + f, + "{} files: first file: {}", + self.0.len(), + self.0[0].display() + ), + } + } +} + +impl fmt::Display for NumColumns<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0 { + None => f.write_str("*"), + Some(columns) => columns.len().fmt(f), + } + } +} + +impl fmt::Display for OptionExprIRDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0 { + None => f.write_str("None"), + Some(expr) => expr.fmt(f), + } + } +} + +/// Utility structure to write to a [`fmt::Formatter`] whilst escaping the output as a label name +struct EscapeLabel<'a, 'b>(&'b mut fmt::Formatter<'a>); + +impl<'a, 'b> fmt::Write for EscapeLabel<'a, 'b> { + fn write_str(&mut self, mut s: &str) -> fmt::Result { + loop { + let mut char_indices = s.char_indices(); + + // This escapes quotes and new lines + // @NOTE: I am aware this does not work for \" and such. I am ignoring that fact as we + // are not really using such strings. + let f = char_indices + .find_map(|(i, c)| { + (|| match c { + '"' => { + self.0.write_str(&s[..i])?; + self.0.write_str(r#"\""#)?; + Ok(Some(i + 1)) + }, + '\n' => { + self.0.write_str(&s[..i])?; + self.0.write_str(r#"\n"#)?; + Ok(Some(i + 1)) + }, + _ => Ok(None), + })() + .transpose() + }) + .transpose()?; + + let Some(at) = f else { + break; + }; + + s = &s[at..]; + } + + self.0.write_str(s)?; + + Ok(()) + } +} + +impl fmt::Display for IRDotDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "graph polars_query {{")?; + + let mut last = 0; + self._format(f, None, &mut last)?; + + writeln!(f, "}}")?; + + Ok(()) + } +} diff --git a/crates/polars-plan/src/logical_plan/alp/format.rs b/crates/polars-plan/src/logical_plan/alp/format.rs index 87cfdc093ab5..95bfbc92f3ef 100644 --- a/crates/polars-plan/src/logical_plan/alp/format.rs +++ b/crates/polars-plan/src/logical_plan/alp/format.rs @@ -3,26 +3,28 @@ use std::fmt; use std::fmt::{Display, Formatter}; use std::path::PathBuf; +use polars_core::datatypes::AnyValue; +use polars_core::schema::Schema; +use recursive::recursive; + use crate::prelude::*; -struct ExprDisplay<'a> { - node: Node, - output_name: &'a OutputName, - expr_arena: &'a Arena, -} +pub struct IRDisplay<'a>(pub(crate) IRPlanRef<'a>); -struct ExprVecDisplay<'a, T: AsExpr> { - exprs: &'a [T], - expr_arena: &'a Arena, +#[derive(Clone, Copy)] +pub struct ExprIRDisplay<'a> { + pub(crate) node: Node, + pub(crate) output_name: &'a OutputName, + pub(crate) expr_arena: &'a Arena, } -pub struct IRDisplay<'a> { - pub root: Node, - pub ir_arena: &'a Arena, - pub expr_arena: &'a Arena, +/// Utility structure to display several [`ExprIR`]'s in a nice way +pub(crate) struct ExprIRSliceDisplay<'a, T: AsExpr> { + pub(crate) exprs: &'a [T], + pub(crate) expr_arena: &'a Arena, } -trait AsExpr { +pub(crate) trait AsExpr { fn node(&self) -> Node; fn output_name(&self) -> &OutputName; } @@ -53,7 +55,7 @@ fn write_scan( indent: usize, n_columns: i64, total_columns: usize, - predicate: &Option>, + predicate: &Option>, n_rows: Option, ) -> fmt::Result { if indent != 0 { @@ -89,6 +91,7 @@ fn write_scan( } impl<'a> IRDisplay<'a> { + #[recursive] fn _format(&self, f: &mut Formatter, indent: usize) -> fmt::Result { if indent != 0 { writeln!(f)?; @@ -219,8 +222,8 @@ impl<'a> IRDisplay<'a> { }, Select { expr, input, .. } => { // @NOTE: Maybe there should be a clear delimiter here? - let default_exprs = self.display_expr_vec(expr.default_exprs()); - let cse_exprs = self.display_expr_vec(expr.cse_exprs()); + let default_exprs = self.display_expr_slice(expr.default_exprs()); + let cse_exprs = self.display_expr_slice(expr.cse_exprs()); write!(f, "{:indent$} SELECT {default_exprs}, {cse_exprs} FROM", "")?; self.with_root(*input)._format(f, sub_indent) @@ -228,15 +231,15 @@ impl<'a> IRDisplay<'a> { Sort { input, by_column, .. } => { - let by_column = self.display_expr_vec(by_column); + let by_column = self.display_expr_slice(by_column); write!(f, "{:indent$}SORT BY {by_column}", "")?; self.with_root(*input)._format(f, sub_indent) }, GroupBy { input, keys, aggs, .. } => { - let aggs = self.display_expr_vec(aggs); - let keys = self.display_expr_vec(keys); + let aggs = self.display_expr_slice(aggs); + let keys = self.display_expr_slice(keys); write!(f, "{:indent$}AGGREGATE", "")?; write!(f, "\n{:indent$}\t{aggs} BY {keys} FROM", "")?; @@ -250,8 +253,8 @@ impl<'a> IRDisplay<'a> { options, .. } => { - let left_on = self.display_expr_vec(left_on); - let right_on = self.display_expr_vec(right_on); + let left_on = self.display_expr_slice(left_on); + let right_on = self.display_expr_slice(right_on); let how = &options.args.how; write!(f, "{:indent$}{how} JOIN:", "")?; @@ -263,8 +266,8 @@ impl<'a> IRDisplay<'a> { }, HStack { input, exprs, .. } => { // @NOTE: Maybe there should be a clear delimiter here? - let default_exprs = self.display_expr_vec(exprs.default_exprs()); - let cse_exprs = self.display_expr_vec(exprs.cse_exprs()); + let default_exprs = self.display_expr_slice(exprs.default_exprs()); + let cse_exprs = self.display_expr_slice(exprs.cse_exprs()); write!(f, "{:indent$} WITH_COLUMNS:", "",)?; write!(f, "\n{:indent$} {default_exprs}, {cse_exprs} ", "")?; @@ -303,8 +306,17 @@ impl<'a> IRDisplay<'a> { write!(f, "{:indent$}{name}", "")?; self.with_root(*input)._format(f, sub_indent) }, - SimpleProjection { input, .. } => { - write!(f, "{:indent$}SIMPLE_PROJECTION ", "")?; + SimpleProjection { input, columns } => { + let num_columns = columns.as_ref().len(); + let total_columns = self.0.lp_arena.get(*input).schema(self.0.lp_arena).len(); + + let columns = ColumnsDisplay(columns.as_ref()); + write!( + f, + "{:indent$}simple π {num_columns}/{total_columns} [{columns}]", + "" + )?; + self.with_root(*input)._format(f, sub_indent) }, Invalid => write!(f, "{:indent$}INVALID", ""), @@ -313,43 +325,39 @@ impl<'a> IRDisplay<'a> { } impl<'a> IRDisplay<'a> { - fn display_expr(&self, root: &'a ExprIR) -> ExprDisplay<'a> { - ExprDisplay { - node: root.node(), - output_name: root.output_name_inner(), - expr_arena: self.expr_arena, - } + fn root(&self) -> &IR { + self.0.root() } - fn display_expr_vec(&self, exprs: &'a [ExprIR]) -> ExprVecDisplay<'a, ExprIR> { - ExprVecDisplay { - exprs, - expr_arena: self.expr_arena, - } + fn with_root(&self, root: Node) -> Self { + Self(self.0.with_root(root)) } - fn root(&self) -> &IR { - self.ir_arena.get(self.root) + fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> { + ExprIRDisplay { + node: root.node(), + output_name: root.output_name_inner(), + expr_arena: self.0.expr_arena, + } } - fn with_root(&self, root: Node) -> Self { - Self { - root, - ir_arena: self.ir_arena, - expr_arena: self.expr_arena, + fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { + ExprIRSliceDisplay { + exprs, + expr_arena: self.0.expr_arena, } } } -impl<'a> ExprDisplay<'a> { - pub(crate) fn with_vec(&self, exprs: &'a [T]) -> ExprVecDisplay<'a, T> { - ExprVecDisplay { +impl<'a> ExprIRDisplay<'a> { + fn with_slice(&self, exprs: &'a [T]) -> ExprIRSliceDisplay<'a, T> { + ExprIRSliceDisplay { exprs, expr_arena: self.expr_arena, } } - pub(crate) fn with_root(&self, root: &'a T) -> Self { + fn with_root(&self, root: &'a T) -> Self { Self { node: root.node(), output_name: root.output_name(), @@ -364,7 +372,7 @@ impl<'a> Display for IRDisplay<'a> { } } -impl<'a, T: AsExpr> Display for ExprVecDisplay<'a, T> { +impl<'a, T: AsExpr> Display for ExprIRSliceDisplay<'a, T> { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { // Display items in slice delimited by a comma @@ -374,7 +382,7 @@ impl<'a, T: AsExpr> Display for ExprVecDisplay<'a, T> { f.write_char('[')?; if let Some(fst) = iter.next() { - let fst = ExprDisplay { + let fst = ExprIRDisplay { node: fst.node(), output_name: fst.output_name(), expr_arena: self.expr_arena, @@ -383,7 +391,7 @@ impl<'a, T: AsExpr> Display for ExprVecDisplay<'a, T> { } for expr in iter { - let expr = ExprDisplay { + let expr = ExprIRDisplay { node: expr.node(), output_name: expr.output_name(), expr_arena: self.expr_arena, @@ -397,7 +405,8 @@ impl<'a, T: AsExpr> Display for ExprVecDisplay<'a, T> { } } -impl<'a> Display for ExprDisplay<'a> { +impl<'a> Display for ExprIRDisplay<'a> { + #[recursive] fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let root = self.expr_arena.get(self.node); @@ -409,7 +418,7 @@ impl<'a> Display for ExprDisplay<'a> { options, } => { let function = self.with_root(function); - let partition_by = self.with_vec(partition_by); + let partition_by = self.with_slice(partition_by); match options { #[cfg(feature = "dynamic_group_by")] WindowType::Rolling(options) => { @@ -465,7 +474,7 @@ impl<'a> Display for ExprDisplay<'a> { sort_options, } => { let expr = self.with_root(expr); - let by = self.with_vec(by); + let by = self.with_slice(by); write!(f, "{expr}.sort_by(by={by}, sort_option={sort_options:?})",) }, Filter { input, by } => { @@ -556,7 +565,7 @@ impl<'a> Display for ExprDisplay<'a> { let fst = self.with_root(&input[0]); fst.fmt(f)?; if input.len() >= 2 { - write!(f, ".{function}({})", self.with_vec(&input[1..])) + write!(f, ".{function}({})", self.with_slice(&input[1..])) } else { write!(f, ".{function}()") } @@ -565,7 +574,7 @@ impl<'a> Display for ExprDisplay<'a> { let fst = self.with_root(&input[0]); fst.fmt(f)?; if input.len() >= 2 { - write!(f, ".{}({})", options.fmt_str, self.with_vec(&input[1..])) + write!(f, ".{}({})", options.fmt_str, self.with_slice(&input[1..])) } else { write!(f, ".{}()", options.fmt_str) } @@ -594,3 +603,56 @@ impl<'a> Display for ExprDisplay<'a> { Ok(()) } } + +pub(crate) struct ColumnsDisplay<'a>(pub(crate) &'a Schema); + +impl fmt::Display for ColumnsDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let len = self.0.len(); + let mut iter_names = self.0.iter_names(); + + if let Some(fst) = iter_names.next() { + write!(f, "\"{fst}\"")?; + + if len > 0 { + write!(f, ", ... {len} other columns")?; + } + } + + Ok(()) + } +} + +impl fmt::Debug for Operator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} + +impl fmt::Debug for LiteralValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use LiteralValue::*; + + match self { + Binary(_) => write!(f, "[binary value]"), + Range { low, high, .. } => write!(f, "range({low}, {high})"), + Series(s) => { + let name = s.name(); + if name.is_empty() { + write!(f, "Series") + } else { + write!(f, "Series[{name}]") + } + }, + Float(v) => { + let av = AnyValue::Float64(*v); + write!(f, "dyn float: {}", av) + }, + Int(v) => write!(f, "dyn int: {}", v), + _ => { + let av = self.to_any_value().unwrap(); + write!(f, "{av}") + }, + } + } +} diff --git a/crates/polars-plan/src/logical_plan/alp/mod.rs b/crates/polars-plan/src/logical_plan/alp/mod.rs index 5d61cfb5593d..dd55f598a944 100644 --- a/crates/polars-plan/src/logical_plan/alp/mod.rs +++ b/crates/polars-plan/src/logical_plan/alp/mod.rs @@ -1,11 +1,15 @@ +mod dot; mod format; mod inputs; mod schema; +pub(crate) mod tree_format; use std::borrow::Cow; use std::fmt; use std::path::PathBuf; +pub use dot::IRDotDisplay; +pub use format::{ExprIRDisplay, IRDisplay}; use polars_core::prelude::*; use polars_utils::idx_vec::UnitVec; use polars_utils::unitvec; @@ -19,6 +23,13 @@ pub struct IRPlan { pub expr_arena: Arena, } +#[derive(Clone, Copy)] +pub struct IRPlanRef<'a> { + pub lp_top: Node, + pub lp_arena: &'a Arena, + pub expr_arena: &'a Arena, +} + /// [`IR`] is a representation of [`DslPlan`] with [`Node`]s which are allocated in an [`Arena`] /// In this IR the logical plan has access to the full dataset. #[derive(Clone, Debug, Default)] @@ -143,17 +154,65 @@ impl IRPlan { } } - pub fn describe(&self) -> String { - self.display().to_string() + pub fn root(&self) -> &IR { + self.lp_arena.get(self.lp_top) } - fn display(&self) -> format::IRDisplay { - format::IRDisplay { - root: self.lp_top, - ir_arena: &self.lp_arena, + pub fn as_ref(&self) -> IRPlanRef { + IRPlanRef { + lp_top: self.lp_top, + lp_arena: &self.lp_arena, expr_arena: &self.expr_arena, } } + + pub fn describe(&self) -> String { + self.as_ref().describe() + } + + pub fn describe_tree_format(&self) -> String { + self.as_ref().describe_tree_format() + } + + pub fn display(&self) -> format::IRDisplay { + format::IRDisplay(self.as_ref()) + } + + pub fn display_dot(&self) -> dot::IRDotDisplay { + dot::IRDotDisplay(self.as_ref()) + } +} + +impl<'a> IRPlanRef<'a> { + pub fn root(self) -> &'a IR { + self.lp_arena.get(self.lp_top) + } + + pub fn with_root(self, root: Node) -> Self { + Self { + lp_top: root, + lp_arena: self.lp_arena, + expr_arena: self.expr_arena, + } + } + + pub fn display(self) -> format::IRDisplay<'a> { + format::IRDisplay(self) + } + + pub fn display_dot(self) -> dot::IRDotDisplay<'a> { + dot::IRDotDisplay(self) + } + + pub fn describe(self) -> String { + self.display().to_string() + } + + pub fn describe_tree_format(self) -> String { + let mut visitor = tree_format::TreeFmtVisitor::default(); + tree_format::TreeFmtNode::root_logical_plan(self).traverse(&mut visitor); + format!("{visitor:#?}") + } } impl fmt::Debug for IRPlan { @@ -162,6 +221,12 @@ impl fmt::Debug for IRPlan { } } +impl fmt::Debug for IRPlanRef<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + ::fmt(&self.display(), f) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/polars-plan/src/logical_plan/tree_format.rs b/crates/polars-plan/src/logical_plan/alp/tree_format.rs similarity index 67% rename from crates/polars-plan/src/logical_plan/tree_format.rs rename to crates/polars-plan/src/logical_plan/alp/tree_format.rs index 5a227a5660db..244280b24eb2 100644 --- a/crates/polars-plan/src/logical_plan/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/alp/tree_format.rs @@ -1,20 +1,30 @@ -use std::borrow::Cow; -use std::fmt::{Debug, Display, Formatter, UpperExp}; +use std::fmt; use polars_core::error::*; #[cfg(feature = "regex")] use regex::Regex; -use crate::constants::LEN; +use crate::constants; +use crate::logical_plan::alp::IRPlanRef; use crate::logical_plan::visitor::{VisitRecursion, Visitor}; +use crate::prelude::alp::format::ColumnsDisplay; use crate::prelude::visitor::AexprNode; use crate::prelude::*; +pub struct TreeFmtNode<'a> { + h: Option, + content: TreeFmtNodeContent<'a>, + + lp: IRPlanRef<'a>, +} + +pub struct TreeFmtAExpr<'a>(&'a AExpr); + /// Hack UpperExpr trait to get a kind of formatting that doesn't traverse the nodes. /// So we can format with {foo:E} -impl UpperExp for AExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let s = match self { +impl fmt::Display for TreeFmtAExpr<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self.0 { AExpr::Explode(_) => "explode", AExpr::Alias(_, name) => return write!(f, "alias({})", name.as_ref()), AExpr::Column(name) => return write!(f, "col({})", name.as_ref()), @@ -62,7 +72,7 @@ impl UpperExp for AExpr { AExpr::Window { .. } => "window", AExpr::Wildcard => "*", AExpr::Slice { .. } => "slice", - AExpr::Len => LEN, + AExpr::Len => constants::LEN, AExpr::Nth(v) => return write!(f, "nth({})", v), }; @@ -70,9 +80,9 @@ impl UpperExp for AExpr { } } -pub enum TreeFmtNode<'a> { - Expression(Option, &'a Expr), - LogicalPlan(Option, &'a DslPlan), +pub enum TreeFmtNodeContent<'a> { + Expression(&'a ExprIR), + LogicalPlan(Node), } struct TreeFmtNodeData<'a>(String, Vec>); @@ -86,14 +96,37 @@ fn with_header(header: &Option, text: &str) -> String { } #[cfg(feature = "regex")] -fn multiline_expression(expr: &str) -> Cow<'_, str> { +fn multiline_expression(expr: &str) -> std::borrow::Cow<'_, str> { let re = Regex::new(r"([\)\]])(\.[a-z0-9]+\()").unwrap(); re.replace_all(expr, "$1\n $2") } impl<'a> TreeFmtNode<'a> { - pub fn root_logical_plan(lp: &'a DslPlan) -> Self { - Self::LogicalPlan(None, lp) + pub fn root_logical_plan(lp: IRPlanRef<'a>) -> Self { + Self { + h: None, + content: TreeFmtNodeContent::LogicalPlan(lp.lp_top), + + lp, + } + } + + pub fn lp_node(&self, h: Option, root: Node) -> Self { + Self { + h, + content: TreeFmtNodeContent::LogicalPlan(root), + + lp: self.lp, + } + } + + pub fn expr_node(&self, h: Option, expr: &'a ExprIR) -> Self { + Self { + h, + content: TreeFmtNodeContent::Expression(expr), + + lp: self.lp, + } } pub fn traverse(&self, visitor: &mut TreeFmtVisitor) { @@ -123,190 +156,203 @@ impl<'a> TreeFmtNode<'a> { } fn node_data(&self) -> TreeFmtNodeData<'_> { - use DslPlan::*; - use TreeFmtNode::{Expression as NE, LogicalPlan as NL}; - use {with_header as wh, TreeFmtNodeData as ND}; + use {with_header as wh, TreeFmtNodeContent as C, TreeFmtNodeData as ND}; - match self { + let lp = &self.lp; + let h = &self.h; + + use IR::*; + match self.content { #[cfg(feature = "regex")] - NE(h, expr) => ND(wh(h, &multiline_expression(&format!("{expr:?}"))), vec![]), - #[cfg(not(feature = "regex"))] - NE(h, expr) => ND(wh(h, &format!("{expr:?}")), vec![]), - #[cfg(feature = "python")] - NL(h, lp @ PythonScan { .. }) => ND(wh(h, &format!("{lp:?}",)), vec![]), - NL(h, lp @ Scan { .. }) => ND(wh(h, &format!("{lp:?}",)), vec![]), - NL( - h, - DataFrameScan { - schema, - projection, - selection, - .. - }, - ) => ND( + C::Expression(expr) => ND( wh( h, - &format!( - "DF {:?}\nPROJECT {}/{} COLUMNS", - schema.iter_names().take(4).collect::>(), - if let Some(columns) = projection { - format!("{}", columns.len()) + &multiline_expression(&expr.display(self.lp.expr_arena).to_string()), + ), + vec![], + ), + #[cfg(not(feature = "regex"))] + C::Expression(expr) => ND(wh(h, &expr.display(self.lp.expr_arena).to_string()), vec![]), + C::LogicalPlan(lp_top) => { + match self.lp.with_root(lp_top).root() { + #[cfg(feature = "python")] + PythonScan { .. } => ND(wh(h, &lp.describe()), vec![]), + Scan { .. } => ND(wh(h, &lp.describe()), vec![]), + DataFrameScan { + schema, + projection, + selection, + .. + } => ND( + wh( + h, + &format!( + "DF {:?}\nPROJECT {}/{} COLUMNS", + schema.iter_names().take(4).collect::>(), + if let Some(columns) = projection { + format!("{}", columns.len()) + } else { + "*".to_string() + }, + schema.len() + ), + ), + if let Some(expr) = selection { + vec![self.expr_node(Some("SELECTION:".to_string()), expr)] } else { - "*".to_string() + vec![] }, - schema.len() ), - ), - if let Some(expr) = selection { - vec![NE(Some("SELECTION:".to_string()), expr)] - } else { - vec![] - }, - ), - NL(h, Union { inputs, .. }) => ND( - wh( - h, - // THis is commented out, but must be restored when we convert to IR's. - // &(if let Some(slice) = options.slice { - // format!("SLICED UNION: {slice:?}") - // } else { - // "UNION".to_string() - // }), - "UNION", - ), - inputs - .iter() - .enumerate() - .map(|(i, lp)| NL(Some(format!("PLAN {i}:")), lp)) - .collect(), - ), - NL(h, HConcat { inputs, .. }) => ND( - wh(h, "HCONCAT"), - inputs - .iter() - .enumerate() - .map(|(i, lp)| NL(Some(format!("PLAN {i}:")), lp)) - .collect(), - ), - NL( - h, - Cache { - input, - id, - cache_hits, - }, - ) => ND( - wh( - h, - &format!("CACHE[id: {:x}, cache_hits: {}]", *id, *cache_hits), - ), - vec![NL(None, input)], - ), - NL(h, Filter { input, predicate }) => ND( - wh(h, "FILTER"), - vec![ - NE(Some("predicate:".to_string()), predicate), - NL(Some("FROM:".to_string()), input), - ], - ), - NL(h, Select { expr, input, .. }) => ND( - wh(h, "SELECT"), - expr.iter() - .map(|expr| NE(Some("expression:".to_string()), expr)) - .chain([NL(Some("FROM:".to_string()), input)]) - .collect(), - ), - NL( - h, - DslPlan::Sort { - input, by_column, .. - }, - ) => ND( - wh(h, "SORT BY"), - by_column - .iter() - .map(|expr| NE(Some("expression:".to_string()), expr)) - .chain([NL(None, input)]) - .collect(), - ), - NL( - h, - GroupBy { - input, keys, aggs, .. - }, - ) => ND( - wh(h, "AGGREGATE"), - aggs.iter() - .map(|expr| NE(Some("expression:".to_string()), expr)) - .chain( - keys.iter() - .map(|expr| NE(Some("aggregate by:".to_string()), expr)), - ) - .chain([NL(Some("FROM:".to_string()), input)]) - .collect(), - ), - NL( - h, - Join { - input_left, - input_right, - left_on, - right_on, - options, - .. - }, - ) => ND( - wh(h, &format!("{} JOIN", options.args.how)), - left_on - .iter() - .map(|expr| NE(Some("left on:".to_string()), expr)) - .chain([NL(Some("LEFT PLAN:".to_string()), input_left)]) - .chain( - right_on + Union { inputs, .. } => ND( + wh( + h, + // THis is commented out, but must be restored when we convert to IR's. + // &(if let Some(slice) = options.slice { + // format!("SLICED UNION: {slice:?}") + // } else { + // "UNION".to_string() + // }), + "UNION", + ), + inputs .iter() - .map(|expr| NE(Some("right on:".to_string()), expr)), - ) - .chain([NL(Some("RIGHT PLAN:".to_string()), input_right)]) - .collect(), - ), - NL(h, HStack { input, exprs, .. }) => ND( - wh(h, "WITH_COLUMNS"), - exprs - .iter() - .map(|expr| NE(Some("expression:".to_string()), expr)) - .chain([NL(None, input)]) - .collect(), - ), - NL(h, Distinct { input, options }) => ND( - wh( - h, - &format!( - "UNIQUE[maintain_order: {:?}, keep_strategy: {:?}] BY {:?}", - options.maintain_order, options.keep_strategy, options.subset + .enumerate() + .map(|(i, lp_root)| self.lp_node(Some(format!("PLAN {i}:")), *lp_root)) + .collect(), ), - ), - vec![NL(None, input)], - ), - NL(h, DslPlan::Slice { input, offset, len }) => ND( - wh(h, &format!("SLICE[offset: {offset}, len: {len}]")), - vec![NL(None, input)], - ), - NL(h, MapFunction { input, function }) => { - ND(wh(h, &format!("{function}")), vec![NL(None, input)]) - }, - NL(h, ExtContext { input, .. }) => ND(wh(h, "EXTERNAL_CONTEXT"), vec![NL(None, input)]), - NL(h, Sink { input, payload }) => ND( - wh( - h, - match payload { - SinkType::Memory => "SINK (memory)", - SinkType::File { .. } => "SINK (file)", - #[cfg(feature = "cloud")] - SinkType::Cloud { .. } => "SINK (cloud)", + HConcat { inputs, .. } => ND( + wh(h, "HCONCAT"), + inputs + .iter() + .enumerate() + .map(|(i, lp_root)| self.lp_node(Some(format!("PLAN {i}:")), *lp_root)) + .collect(), + ), + Cache { + input, + id, + cache_hits, + } => ND( + wh( + h, + &format!("CACHE[id: {:x}, cache_hits: {}]", *id, *cache_hits), + ), + vec![self.lp_node(None, *input)], + ), + Filter { input, predicate } => ND( + wh(h, "FILTER"), + vec![ + self.expr_node(Some("predicate:".to_string()), predicate), + self.lp_node(Some("FROM:".to_string()), *input), + ], + ), + Select { expr, input, .. } => ND( + wh(h, "SELECT"), + expr.iter() + .map(|expr| self.expr_node(Some("expression:".to_string()), expr)) + .chain([self.lp_node(Some("FROM:".to_string()), *input)]) + .collect(), + ), + Sort { + input, by_column, .. + } => ND( + wh(h, "SORT BY"), + by_column + .iter() + .map(|expr| self.expr_node(Some("expression:".to_string()), expr)) + .chain([self.lp_node(None, *input)]) + .collect(), + ), + GroupBy { + input, keys, aggs, .. + } => ND( + wh(h, "AGGREGATE"), + aggs.iter() + .map(|expr| self.expr_node(Some("expression:".to_string()), expr)) + .chain(keys.iter().map(|expr| { + self.expr_node(Some("aggregate by:".to_string()), expr) + })) + .chain([self.lp_node(Some("FROM:".to_string()), *input)]) + .collect(), + ), + Join { + input_left, + input_right, + left_on, + right_on, + options, + .. + } => ND( + wh(h, &format!("{} JOIN", options.args.how)), + left_on + .iter() + .map(|expr| self.expr_node(Some("left on:".to_string()), expr)) + .chain([self.lp_node(Some("LEFT PLAN:".to_string()), *input_left)]) + .chain( + right_on.iter().map(|expr| { + self.expr_node(Some("right on:".to_string()), expr) + }), + ) + .chain([self.lp_node(Some("RIGHT PLAN:".to_string()), *input_right)]) + .collect(), + ), + HStack { input, exprs, .. } => ND( + wh(h, "WITH_COLUMNS"), + exprs + .iter() + .map(|expr| self.expr_node(Some("expression:".to_string()), expr)) + .chain([self.lp_node(None, *input)]) + .collect(), + ), + Distinct { input, options } => ND( + wh( + h, + &format!( + "UNIQUE[maintain_order: {:?}, keep_strategy: {:?}] BY {:?}", + options.maintain_order, options.keep_strategy, options.subset + ), + ), + vec![self.lp_node(None, *input)], + ), + Slice { input, offset, len } => ND( + wh(h, &format!("SLICE[offset: {offset}, len: {len}]")), + vec![self.lp_node(None, *input)], + ), + MapFunction { input, function } => ND( + wh(h, &format!("{function}")), + vec![self.lp_node(None, *input)], + ), + ExtContext { input, .. } => { + ND(wh(h, "EXTERNAL_CONTEXT"), vec![self.lp_node(None, *input)]) }, - ), - vec![NL(None, input)], - ), + Sink { input, payload } => ND( + wh( + h, + match payload { + SinkType::Memory => "SINK (memory)", + SinkType::File { .. } => "SINK (file)", + #[cfg(feature = "cloud")] + SinkType::Cloud { .. } => "SINK (cloud)", + }, + ), + vec![self.lp_node(None, *input)], + ), + SimpleProjection { input, columns } => { + let num_columns = columns.as_ref().len(); + let total_columns = lp.lp_arena.get(*input).schema(lp.lp_arena).len(); + + let columns = ColumnsDisplay(columns.as_ref()); + ND( + wh( + h, + &format!("simple π {num_columns}/{total_columns} [{columns}]"), + ), + vec![self.lp_node(None, *input)], + ) + }, + Invalid => ND(wh(h, "INVALID"), vec![]), + } + }, } } } @@ -329,8 +375,8 @@ impl Visitor for TreeFmtVisitor { node: &Self::Node, arena: &Self::Arena, ) -> PolarsResult { - let ae = node.to_aexpr(arena); - let repr = format!("{:E}", ae); + let repr = TreeFmtAExpr(arena.get(node.node())); + let repr = repr.to_string(); if self.levels.len() <= self.depth { self.levels.push(vec![]) @@ -787,8 +833,8 @@ impl From> for Canvas { } } -impl Display for Canvas { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for Canvas { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { for row in &self.canvas { writeln!(f, "{}", row.iter().collect::().trim_end())?; } @@ -797,14 +843,14 @@ impl Display for Canvas { } } -impl Display for TreeFmtVisitor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Debug::fmt(self, f) +impl fmt::Display for TreeFmtVisitor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + fmt::Debug::fmt(self, f) } } -impl Debug for TreeFmtVisitor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for TreeFmtVisitor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { let tree_view: TreeView<'_> = self.levels.as_slice().into(); let canvas: Canvas = tree_view.into(); write!(f, "{canvas}")?; diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index 2bb58dc83127..b5ba674f45d7 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -579,7 +579,7 @@ fn expand_filter(predicate: Expr, input: Node, lp_arena: &Arena) -> PolarsRe _ => { let mut expanded = String::new(); for e in rewritten.iter().take(5) { - expanded.push_str(&format!("\t{e},\n")) + expanded.push_str(&format!("\t{e:?},\n")) } // pop latest comma expanded.pop(); diff --git a/crates/polars-plan/src/logical_plan/debug.rs b/crates/polars-plan/src/logical_plan/debug.rs index fac0e7c75600..c4f4690b86b4 100644 --- a/crates/polars-plan/src/logical_plan/debug.rs +++ b/crates/polars-plan/src/logical_plan/debug.rs @@ -7,7 +7,7 @@ pub fn dbg_nodes(nodes: &[Node], arena: &Arena) { println!("["); for node in nodes { let e = node_to_expr(*node, arena); - println!("{e}") + println!("{e:?}") } println!("]"); } diff --git a/crates/polars-plan/src/logical_plan/expr_ir.rs b/crates/polars-plan/src/logical_plan/expr_ir.rs index b8281aa8bf04..7929d7b43a8d 100644 --- a/crates/polars-plan/src/logical_plan/expr_ir.rs +++ b/crates/polars-plan/src/logical_plan/expr_ir.rs @@ -111,6 +111,15 @@ impl ExprIR { self.node } + /// Create a `ExprIR` structure that implements display + pub fn display<'a>(&'a self, expr_arena: &'a Arena) -> ExprIRDisplay<'a> { + ExprIRDisplay { + node: self.node(), + output_name: self.output_name_inner(), + expr_arena, + } + } + pub(crate) fn set_node(&mut self, node: Node) { self.node = node; } diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index 53f04f36b9e9..841e6f0f8b13 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -1,272 +1,15 @@ -use std::borrow::Cow; use std::fmt; -use std::fmt::{Debug, Display, Formatter}; -use std::path::PathBuf; - -use polars_core::prelude::AnyValue; use crate::prelude::*; -#[allow(clippy::too_many_arguments)] -fn write_scan( - f: &mut Formatter, - name: &str, - path: &[PathBuf], - indent: usize, - n_columns: i64, - total_columns: Option, - predicate: &Option

, - n_rows: Option, -) -> fmt::Result { - if indent != 0 { - writeln!(f)?; - } - let path_fmt = match path.len() { - 1 => path[0].to_string_lossy(), - 0 => "".into(), - _ => Cow::Owned(format!( - "{} files: first file: {}", - path.len(), - path[0].to_string_lossy() - )), - }; - let total_columns = total_columns - .map(|v| format!("{v}")) - .unwrap_or_else(|| "?".to_string()); - - write!(f, "{:indent$}{name} SCAN {path_fmt}", "")?; - if n_columns > 0 { - write!( - f, - "\n{:indent$}PROJECT {n_columns}/{total_columns} COLUMNS", - "", - )?; - } else { - write!(f, "\n{:indent$}PROJECT */{total_columns} COLUMNS", "")?; - } - if let Some(predicate) = predicate { - write!(f, "\n{:indent$}SELECTION: {predicate}", "")?; - } - if let Some(n_rows) = n_rows { - write!(f, "\n{:indent$}N_ROWS: {n_rows}", "")?; - } - Ok(()) -} - -impl DslPlan { - fn _format(&self, f: &mut Formatter, indent: usize) -> fmt::Result { - if indent != 0 { - writeln!(f)?; - } - let sub_indent = indent + 2; - use DslPlan::*; - match self { - #[cfg(feature = "python")] - PythonScan { options } => { - let total_columns = Some(options.schema.len()); - let n_columns = options - .with_columns - .as_ref() - .map(|s| s.len() as i64) - .unwrap_or(-1); - - write_scan( - f, - "PYTHON", - &[], - sub_indent, - n_columns, - total_columns, - &options.predicate, - options.n_rows, - ) - }, - Union { inputs, .. } => { - // let mut name = String::new(); - // THIS is commented out, but must be restored once we format IR's - // let name = if let Some(slice) = options.slice { - // write!(name, "SLICED UNION: {slice:?}")?; - // name.as_str() - // } else { - // "UNION" - // }; - let name = "UNION"; - // 3 levels of indentation - // - 0 => UNION ... END UNION - // - 1 => PLAN 0, PLAN 1, ... PLAN N - // - 2 => actual formatting of plans - let sub_sub_indent = sub_indent + 2; - write!(f, "{:indent$}{name}", "")?; - for (i, plan) in inputs.iter().enumerate() { - write!(f, "\n{:sub_indent$}PLAN {i}:", "")?; - plan._format(f, sub_sub_indent)?; - } - write!(f, "\n{:indent$}END {name}", "") - }, - HConcat { inputs, .. } => { - let sub_sub_indent = sub_indent + 2; - write!(f, "{:indent$}HCONCAT", "")?; - for (i, plan) in inputs.iter().enumerate() { - write!(f, "\n{:sub_indent$}PLAN {i}:", "")?; - plan._format(f, sub_sub_indent)?; - } - write!(f, "\n{:indent$}END HCONCAT", "") - }, - Cache { - input, - id, - cache_hits, - } => { - write!( - f, - "{:indent$}CACHE[id: {:x}, cache_hits: {}]", - "", *id, *cache_hits - )?; - input._format(f, sub_indent) - }, - Scan { - paths, - file_info, - predicate, - scan_type, - file_options, - .. - } => { - let n_columns = file_options - .with_columns - .as_ref() - .map(|columns| columns.len() as i64) - .unwrap_or(-1); - write_scan( - f, - scan_type.into(), - paths, - sub_indent, - n_columns, - file_info.as_ref().map(|fi| fi.schema.len()), - predicate, - file_options.n_rows, - ) - }, - Filter { predicate, input } => { - // this one is writeln because we don't increase indent (which inserts a line) - writeln!(f, "{:indent$}FILTER {predicate:?} FROM", "")?; - input._format(f, indent) - }, - DataFrameScan { - schema, - projection, - selection, - .. - } => { - let total_columns = schema.len(); - let mut n_columns = "*".to_string(); - if let Some(columns) = projection { - n_columns = format!("{}", columns.len()); - } - let selection = match selection { - Some(s) => Cow::Owned(format!("{s:?}")), - None => Cow::Borrowed("None"), - }; - write!( - f, - "{:indent$}DF {:?}; PROJECT {}/{} COLUMNS; SELECTION: {:?}", - "", - schema.iter_names().take(4).collect::>(), - n_columns, - total_columns, - selection, - ) - }, - Select { expr, input, .. } => { - write!(f, "{:indent$} SELECT {expr:?} FROM", "")?; - input._format(f, sub_indent) - }, - Sort { - input, by_column, .. - } => { - write!(f, "{:indent$}SORT BY {by_column:?}", "")?; - input._format(f, sub_indent) - }, - GroupBy { - input, keys, aggs, .. - } => { - write!(f, "{:indent$}AGGREGATE", "")?; - write!(f, "\n{:indent$}\t{aggs:?} BY {keys:?} FROM", "")?; - input._format(f, sub_indent) - }, - Join { - input_left, - input_right, - left_on, - right_on, - options, - .. - } => { - let how = &options.args.how; - write!(f, "{:indent$}{how} JOIN:", "")?; - write!(f, "\n{:indent$}LEFT PLAN ON: {left_on:?}", "")?; - input_left._format(f, sub_indent)?; - write!(f, "\n{:indent$}RIGHT PLAN ON: {right_on:?}", "")?; - input_right._format(f, sub_indent)?; - write!(f, "\n{:indent$}END {how} JOIN", "") - }, - HStack { input, exprs, .. } => { - write!(f, "{:indent$} WITH_COLUMNS:", "",)?; - write!(f, "\n{:indent$} {exprs:?}", "")?; - input._format(f, sub_indent) - }, - Distinct { input, options } => { - write!( - f, - "{:indent$}UNIQUE[maintain_order: {:?}, keep_strategy: {:?}] BY {:?}", - "", options.maintain_order, options.keep_strategy, options.subset - )?; - input._format(f, sub_indent) - }, - Slice { input, offset, len } => { - write!(f, "{:indent$}SLICE[offset: {offset}, len: {len}]", "")?; - input._format(f, sub_indent) - }, - MapFunction { - input, function, .. - } => { - let function_fmt = format!("{function}"); - write!(f, "{:indent$}{function_fmt}", "")?; - input._format(f, sub_indent) - }, - ExtContext { input, .. } => { - write!(f, "{:indent$}EXTERNAL_CONTEXT", "")?; - input._format(f, sub_indent) - }, - Sink { input, payload, .. } => { - let name = match payload { - SinkType::Memory => "SINK (memory)", - SinkType::File { .. } => "SINK (file)", - #[cfg(feature = "cloud")] - SinkType::Cloud { .. } => "SINK (cloud)", - }; - write!(f, "{:indent$}{name}", "")?; - input._format(f, sub_indent) - }, - } - } -} - -impl Debug for DslPlan { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - self._format(f, 0) +impl fmt::Display for Expr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self, f) } } -impl Display for Expr { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - Debug::fmt(self, f) - } -} - -impl Debug for Expr { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { +impl fmt::Debug for Expr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use Expr::*; match self { Window { @@ -425,37 +168,3 @@ impl Debug for Expr { } } } - -impl Debug for Operator { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - Display::fmt(self, f) - } -} - -impl Debug for LiteralValue { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - use LiteralValue::*; - - match self { - Binary(_) => write!(f, "[binary value]"), - Range { low, high, .. } => write!(f, "range({low}, {high})"), - Series(s) => { - let name = s.name(); - if name.is_empty() { - write!(f, "Series") - } else { - write!(f, "Series[{name}]") - } - }, - Float(v) => { - let av = AnyValue::Float64(*v); - write!(f, "dyn float: {}", av) - }, - Int(v) => write!(f, "dyn int: {}", v), - _ => { - let av = self.to_any_value().unwrap(); - write!(f, "{av}") - }, - } - } -} diff --git a/crates/polars-plan/src/logical_plan/functions/mod.rs b/crates/polars-plan/src/logical_plan/functions/mod.rs index 7de90431ab17..8c12cf8eae8b 100644 --- a/crates/polars-plan/src/logical_plan/functions/mod.rs +++ b/crates/polars-plan/src/logical_plan/functions/mod.rs @@ -327,12 +327,15 @@ impl Display for FunctionNode { MergeSorted { .. } => write!(f, "MERGE SORTED"), Pipeline { original, .. } => { if let Some(original) = original { + let ir_plan = original.as_ref().clone().to_alp().unwrap(); + let ir_display = ir_plan.display(); + writeln!(f, "--- STREAMING")?; - write!(f, "{:?}", original.as_ref())?; + write!(f, "{ir_display}")?; let indent = 2; - writeln!(f, "{:indent$}--- END STREAMING", "") + write!(f, "{:indent$}--- END STREAMING", "") } else { - writeln!(f, "STREAMING") + write!(f, "STREAMING") } }, Rename { .. } => write!(f, "RENAME"), diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index e403c6dc84ab..6d83c1bf139d 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; @@ -33,7 +34,6 @@ mod projection_expr; #[cfg(feature = "python")] mod pyarrow; mod schema; -pub(crate) mod tree_format; pub mod visitor; pub use aexpr::*; @@ -54,8 +54,6 @@ pub use schema::*; use serde::{Deserialize, Serialize}; use strum_macros::IntoStaticStr; -use self::tree_format::{TreeFmtNode, TreeFmtVisitor}; - pub type ColumnName = Arc; #[derive(Clone, Copy, Debug)] @@ -219,22 +217,31 @@ impl Default for DslPlan { } impl DslPlan { - pub fn describe(&self) -> String { - format!("{self:#?}") + pub fn describe(&self) -> PolarsResult { + Ok(self.clone().to_alp()?.describe()) + } + + pub fn describe_tree_format(&self) -> PolarsResult { + Ok(self.clone().to_alp()?.describe_tree_format()) } - pub fn describe_tree_format(&self) -> String { - let mut visitor = TreeFmtVisitor::default(); - TreeFmtNode::root_logical_plan(self).traverse(&mut visitor); - format!("{visitor:#?}") + pub fn display(&self) -> PolarsResult { + struct DslPlanDisplay(IRPlan); + impl fmt::Display for DslPlanDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.as_ref().display().fmt(f) + } + } + Ok(DslPlanDisplay(self.clone().to_alp()?)) } - pub fn to_alp(self) -> PolarsResult<(Node, Arena, Arena)> { + pub fn to_alp(self) -> PolarsResult { let mut lp_arena = Arena::with_capacity(16); let mut expr_arena = Arena::with_capacity(16); let node = to_alp(self, &mut expr_arena, &mut lp_arena, true, true)?; + let plan = IRPlan::new(node, lp_arena, expr_arena); - Ok((node, lp_arena, expr_arena)) + Ok(plan) } } diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 91e457a59abb..77c2eb2209c5 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -415,28 +415,32 @@ impl PyLazyFrame { Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into()) } - fn describe_plan(&self) -> String { - self.ldf.describe_plan() + fn describe_plan(&self) -> PyResult { + self.ldf + .describe_plan() + .map_err(PyPolarsErr::from) + .map_err(Into::into) } fn describe_optimized_plan(&self) -> PyResult { - let result = self - .ldf + self.ldf .describe_optimized_plan() - .map_err(PyPolarsErr::from)?; - Ok(result) + .map_err(PyPolarsErr::from) + .map_err(Into::into) } - fn describe_plan_tree(&self) -> String { - self.ldf.describe_plan_tree() + fn describe_plan_tree(&self) -> PyResult { + self.ldf + .describe_plan_tree() + .map_err(PyPolarsErr::from) + .map_err(Into::into) } fn describe_optimized_plan_tree(&self) -> PyResult { - let result = self - .ldf + self.ldf .describe_optimized_plan_tree() - .map_err(PyPolarsErr::from)?; - Ok(result) + .map_err(PyPolarsErr::from) + .map_err(Into::into) } fn to_dot(&self, optimized: bool) -> PyResult { diff --git a/py-polars/tests/unit/streaming/test_streaming.py b/py-polars/tests/unit/streaming/test_streaming.py index 34a985ab1735..17303c6bc5e3 100644 --- a/py-polars/tests/unit/streaming/test_streaming.py +++ b/py-polars/tests/unit/streaming/test_streaming.py @@ -138,7 +138,7 @@ def test_streaming_ternary() -> None: pl.when(pl.col("a") >= 2).then(pl.col("a")).otherwise(None).alias("b"), ) .explain(streaming=True) - .startswith("--- STREAMING") + .startswith("STREAMING") ) @@ -357,7 +357,7 @@ def test_streaming_with_hconcat(tmp_path: Path) -> None: for i, line in enumerate(plan_lines): if line.startswith("PLAN"): assert plan_lines[i + 1].startswith( - "--- STREAMING" + "STREAMING" ), f"{line} does not contain a streaming section" result = query.collect(streaming=True) diff --git a/py-polars/tests/unit/streaming/test_streaming_cse.py b/py-polars/tests/unit/streaming/test_streaming_cse.py index 909bbc272f3a..d0691e042d56 100644 --- a/py-polars/tests/unit/streaming/test_streaming_cse.py +++ b/py-polars/tests/unit/streaming/test_streaming_cse.py @@ -79,7 +79,7 @@ def test_cse_expr_group_by() -> None: # check if it uses CSE_expr # and is a complete pipeline assert "__POLARS_CSER" in s - assert s.startswith("--- STREAMING") + assert s.startswith("STREAMING") expected = pl.DataFrame( {"a": [1, 2, 3, 4], "sum": [1, 4, 9, 16], "min": [1, 4, 9, 16]} diff --git a/py-polars/tests/unit/streaming/test_streaming_group_by.py b/py-polars/tests/unit/streaming/test_streaming_group_by.py index 133c9755cafb..c7f22f357eea 100644 --- a/py-polars/tests/unit/streaming/test_streaming_group_by.py +++ b/py-polars/tests/unit/streaming/test_streaming_group_by.py @@ -396,7 +396,7 @@ def test_streaming_restart_non_streamable_group_by() -> None: ) # non-streamable UDF + nested_agg ) - assert """--- STREAMING""" in res.explain(streaming=True) + assert "STREAMING" in res.explain(streaming=True) def test_group_by_min_max_string_type() -> None: