From 4e74bde683001839bdc9071246dcbc93daed7c6b Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 May 2024 16:33:25 +0800 Subject: [PATCH] feat: Plan with types --- Cargo.lock | 17 +++ src/flow/Cargo.toml | 1 + src/flow/src/compute/render.rs | 10 +- src/flow/src/compute/render/map.rs | 19 ++- src/flow/src/compute/render/reduce.rs | 50 ++++++-- src/flow/src/expr/scalar.rs | 36 +++++- src/flow/src/plan.rs | 48 +++---- src/flow/src/repr/relation.rs | 8 ++ src/flow/src/transform/aggr.rs | 173 ++++++++++++++++---------- src/flow/src/transform/expr.rs | 49 ++++++-- src/flow/src/transform/plan.rs | 14 ++- 11 files changed, 304 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcb3b9de8bbd..fc7c994908f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3824,6 +3824,7 @@ dependencies = [ "minstant", "nom", "num-traits", + "pretty_assertions", "prost 0.12.4", "query", "serde", @@ -7364,6 +7365,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "pretty_assertions" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettydiff" version = "0.6.4" @@ -12603,6 +12614,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yansi" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" + [[package]] name = "zerocopy" version = "0.7.34" diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 510c92d6e8cf..270e2edbf80c 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -50,6 +50,7 @@ tonic.workspace = true [dev-dependencies] catalog.workspace = true common-catalog.workspace = true +pretty_assertions = "1.4.0" prost.workspace = true query.workspace = true serde_json = "1.0" diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index db479eadb352..c1c4b37cbb4e 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -39,7 +39,7 @@ use crate::expr::error::{DataTypeSnafu, InternalSnafu}; use crate::expr::{ self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr, }; -use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan}; +use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement}; @@ -101,8 +101,8 @@ impl<'referred, 'df> Context<'referred, 'df> { /// Interpret and execute plan /// /// return the output of this plan - pub fn render_plan(&mut self, plan: Plan) -> Result { - match plan { + pub fn render_plan(&mut self, plan: TypedPlan) -> Result { + match plan.plan { Plan::Constant { rows } => Ok(self.render_constant(rows)), Plan::Get { id } => self.get_by_id(id), Plan::Let { id, value, body } => self.eval_let(id, value, body), @@ -193,8 +193,8 @@ impl<'referred, 'df> Context<'referred, 'df> { pub fn eval_let( &mut self, id: LocalId, - value: Box, - body: Box, + value: Box, + body: Box, ) -> Result { let value = self.render_plan(*value)?; diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 4c4b41953cb1..2261f4de14f7 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -24,7 +24,7 @@ use crate::compute::render::Context; use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr}; -use crate::plan::Plan; +use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; use crate::utils::ArrangeHandler; @@ -38,7 +38,7 @@ impl<'referred, 'df> Context<'referred, 'df> { #[allow(clippy::mutable_key_type)] pub fn render_mfp( &mut self, - input: Box, + input: Box, mfp: MapFilterProject, ) -> Result { let input = self.render_plan(*input)?; @@ -184,6 +184,7 @@ mod test { use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check}; use crate::compute::state::DataflowState; use crate::expr::{self, BinaryFunc, GlobalId}; + use crate::repr::{ColumnType, RelationType}; /// test if temporal filter works properly /// namely: if mfp operator can schedule a delete at the correct time @@ -203,6 +204,9 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); // temporal filter: now <= col(0) < now + 4 let mfp = MapFilterProject::new(1) .filter(vec![ @@ -225,7 +229,9 @@ mod test { ]) .unwrap(); - let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap(); + let bundle = ctx + .render_mfp(Box::new(input_plan.with_types(typ)), mfp) + .unwrap(); let output = get_output_handle(&mut ctx, bundle); // drop ctx here to simulate actual process of compile first, run later scenario drop(ctx); @@ -273,6 +279,9 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); // filter: col(0)>1 let mfp = MapFilterProject::new(1) .filter(vec![ScalarExpr::Column(0).call_binary( @@ -280,7 +289,9 @@ mod test { BinaryFunc::Gt, )]) .unwrap(); - let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap(); + let bundle = ctx + .render_mfp(Box::new(input_plan.with_types(typ)), mfp) + .unwrap(); let output = get_output_handle(&mut ctx, bundle); drop(ctx); diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index c8ebefb7fb04..46b2dc196f00 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -28,7 +28,7 @@ use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; use crate::expr::error::{DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; -use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan}; +use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter}; @@ -39,7 +39,7 @@ impl<'referred, 'df> Context<'referred, 'df> { #[allow(clippy::mutable_key_type)] pub fn render_reduce( &mut self, - input: Box, + input: Box, key_val_plan: KeyValPlan, reduce_plan: ReducePlan, ) -> Result { @@ -736,6 +736,7 @@ mod test { use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check}; use crate::compute::state::DataflowState; use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject}; + use crate::repr::{ColumnType, RelationType}; /// SELECT DISTINCT col FROM table /// @@ -762,13 +763,20 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); let key_val_plan = KeyValPlan { key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(), val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(), }; let reduce_plan = ReducePlan::Distinct; let bundle = ctx - .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan) + .render_reduce( + Box::new(input_plan.with_types(typ)), + key_val_plan, + reduce_plan, + ) .unwrap(); let output = get_output_handle(&mut ctx, bundle); @@ -809,6 +817,9 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); let key_val_plan = KeyValPlan { key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(), val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(), @@ -835,7 +846,11 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx - .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan) + .render_reduce( + Box::new(input_plan.with_types(typ)), + key_val_plan, + reduce_plan, + ) .unwrap(); let output = get_output_handle(&mut ctx, bundle); @@ -882,6 +897,9 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); let key_val_plan = KeyValPlan { key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(), val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(), @@ -908,7 +926,11 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx - .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan) + .render_reduce( + Box::new(input_plan.with_types(typ)), + key_val_plan, + reduce_plan, + ) .unwrap(); let output = get_output_handle(&mut ctx, bundle); @@ -951,6 +973,9 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); let key_val_plan = KeyValPlan { key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(), val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(), @@ -977,7 +1002,11 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx - .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan) + .render_reduce( + Box::new(input_plan.with_types(typ)), + key_val_plan, + reduce_plan, + ) .unwrap(); let output = get_output_handle(&mut ctx, bundle); @@ -1020,6 +1049,9 @@ mod test { let input_plan = Plan::Get { id: expr::Id::Global(GlobalId::User(1)), }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); let key_val_plan = KeyValPlan { key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(), val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(), @@ -1061,7 +1093,11 @@ mod test { let reduce_plan = ReducePlan::Accumulable(accum_plan); let bundle = ctx - .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan) + .render_reduce( + Box::new(input_plan.with_types(typ)), + key_val_plan, + reduce_plan, + ) .unwrap(); let output = get_output_handle(&mut ctx, bundle); diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 0f979f65edb7..c217e323dbc4 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -21,7 +21,9 @@ use datatypes::value::Value; use serde::{Deserialize, Serialize}; use snafu::ensure; -use crate::adapter::error::{Error, InvalidQuerySnafu, UnsupportedTemporalFilterSnafu}; +use crate::adapter::error::{ + Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu, +}; use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu}; use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; use crate::repr::ColumnType; @@ -80,6 +82,38 @@ pub enum ScalarExpr { }, } +impl ScalarExpr { + /// try to determine the type of the expression + pub fn typ(&self, input_types: &[ColumnType]) -> Result { + match self { + ScalarExpr::Column(i) => input_types.get(*i).cloned().ok_or_else(|| { + UnexpectedSnafu { + reason: format!( + "column index {} out of range of len={}", + i, + input_types.len() + ), + } + .build() + }), + ScalarExpr::Literal(_, typ) => Ok(ColumnType::new_nullable(typ.clone())), + ScalarExpr::CallUnmaterializable(func) => { + Ok(ColumnType::new_nullable(func.signature().output)) + } + ScalarExpr::CallUnary { func, .. } => { + Ok(ColumnType::new_nullable(func.signature().output)) + } + ScalarExpr::CallBinary { func, .. } => { + Ok(ColumnType::new_nullable(func.signature().output)) + } + ScalarExpr::CallVariadic { func, .. } => { + Ok(ColumnType::new_nullable(func.signature().output)) + } + ScalarExpr::If { then, .. } => then.typ(input_types), + } + } +} + impl ScalarExpr { /// apply optimization to the expression, like flatten variadic function pub fn optimize(&mut self) { diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 2ff4ebc67a5c..5b28d8c7d55e 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -44,6 +44,7 @@ pub struct TypedPlan { impl TypedPlan { /// directly apply a mfp to the plan pub fn mfp(self, mfp: MapFilterProject) -> Result { + let new_type = self.typ.apply_mfp(&mfp, &[])?; let plan = match self.plan { Plan::Mfp { input, @@ -53,12 +54,12 @@ impl TypedPlan { mfp: MapFilterProject::compose(old_mfp, mfp)?, }, _ => Plan::Mfp { - input: Box::new(self.plan), + input: Box::new(self), mfp, }, }; Ok(TypedPlan { - typ: self.typ, + typ: new_type, plan, }) } @@ -85,7 +86,7 @@ impl TypedPlan { mfp: MapFilterProject::compose(old_mfp, mfp)?, }, _ => Plan::Mfp { - input: Box::new(self.plan), + input: Box::new(self), mfp, }, }; @@ -94,6 +95,7 @@ impl TypedPlan { /// Add a new filter to the plan, will filter out the records that do not satisfy the filter pub fn filter(self, filter: TypedExpr) -> Result { + let typ = self.typ.clone(); let plan = match self.plan { Plan::Mfp { input, @@ -103,15 +105,11 @@ impl TypedPlan { mfp: old_mfp.filter(vec![filter.expr])?, }, _ => Plan::Mfp { - input: Box::new(self.plan), - mfp: MapFilterProject::new(self.typ.column_types.len()) - .filter(vec![filter.expr])?, + input: Box::new(self), + mfp: MapFilterProject::new(typ.column_types.len()).filter(vec![filter.expr])?, }, }; - Ok(TypedPlan { - typ: self.typ, - plan, - }) + Ok(TypedPlan { typ, plan }) } } @@ -135,20 +133,20 @@ pub enum Plan { /// } Let { id: LocalId, - value: Box, - body: Box, + value: Box, + body: Box, }, /// Map, Filter, and Project operators. Chained together. Mfp { /// The input collection. - input: Box, + input: Box, /// Linear operator to apply to each record. mfp: MapFilterProject, }, /// Reduce operator, aggregation by key assembled from KeyValPlan Reduce { /// The input collection. - input: Box, + input: Box, /// A plan for changing input records into key, value pairs. key_val_plan: KeyValPlan, /// A plan for performing the reduce. @@ -164,7 +162,7 @@ pub enum Plan { /// strategy we will use, and any pushed down per-record work. Join { /// An ordered list of inputs that will be joined. - inputs: Vec, + inputs: Vec, /// Detailed information about the implementation of the join. /// /// This includes information about the implementation strategy, but also @@ -180,7 +178,7 @@ pub enum Plan { /// implementing the "distinct" operator. Union { /// The input collections - inputs: Vec, + inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, }, @@ -200,23 +198,23 @@ impl Plan { }; } Plan::Let { value, body, .. } => { - recur_find_use(value, used); - recur_find_use(body, used); + recur_find_use(&value.plan, used); + recur_find_use(&body.plan, used); } Plan::Mfp { input, .. } => { - recur_find_use(input, used); + recur_find_use(&input.plan, used); } Plan::Reduce { input, .. } => { - recur_find_use(input, used); + recur_find_use(&input.plan, used); } Plan::Join { inputs, .. } => { for input in inputs { - recur_find_use(input, used); + recur_find_use(&input.plan, used); } } Plan::Union { inputs, .. } => { for input in inputs { - recur_find_use(input, used); + recur_find_use(&input.plan, used); } } _ => {} @@ -227,3 +225,9 @@ impl Plan { ret } } + +impl Plan { + pub fn with_types(self, typ: RelationType) -> TypedPlan { + TypedPlan { typ, plan: self } + } +} diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index b10881d2c966..f7e028339bee 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -181,7 +181,12 @@ impl RelationType { } /// Adds a new key for the relation. Also sorts the key indices. + /// + /// will ignore empty key pub fn with_key(mut self, mut indices: Vec) -> Self { + if indices.is_empty() { + return self; + } indices.sort_unstable(); let key = Key::from(indices); if !self.keys.contains(&key) { @@ -191,6 +196,9 @@ impl RelationType { } /// Adds new keys for the relation. Also sorts the key indices. + /// + /// + /// will ignore empty keys pub fn with_keys(mut self, keys: Vec>) -> Self { for key in keys { self = self.with_key(key) diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index abe3aef17776..c287e984595d 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -285,7 +285,7 @@ impl TypedPlan { distinct_aggrs, }; let plan = Plan::Reduce { - input: Box::new(input.plan), + input: Box::new(input), key_val_plan, reduce_plan: ReducePlan::Accumulable(accum_plan), }; @@ -298,6 +298,9 @@ impl TypedPlan { #[cfg(test)] mod test { + use datatypes::prelude::ConcreteDataType; + use pretty_assertions::{assert_eq, assert_ne}; + use super::*; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; @@ -311,7 +314,10 @@ mod test { let mut ctx = create_test_ctx(); let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan); - + let typ = RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + true, + )]); let aggr_expr = AggregateExpr { func: AggregateFunc::SumUInt32, expr: ScalarExpr::Column(0), @@ -320,26 +326,34 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]), plan: Plan::Mfp { - input: Box::new(Plan::Reduce { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(1) - .project(vec![]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(1) - .project(vec![0]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - }), + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(1) + .project(vec![]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(1) + .project(vec![0]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types(typ), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0)]) .unwrap() @@ -370,28 +384,42 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), // col number ]), plan: Plan::Mfp { - input: Box::new(Plan::Reduce { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(1) - .map(vec![ScalarExpr::Column(0)]) - .unwrap() - .project(vec![1]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(1) - .project(vec![0]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - }), + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(1) + .map(vec![ScalarExpr::Column(0)]) + .unwrap() + .project(vec![1]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(1) + .project(vec![0]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), // col number + ColumnType::new(CDT::uint32_datatype(), true), // col sum(number) + ]) + .with_key(vec![0]), + ), + ), mfp: MapFilterProject::new(2) .map(vec![ScalarExpr::Column(1), ScalarExpr::Column(0)]) .unwrap() @@ -420,29 +448,40 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]), plan: Plan::Mfp { - input: Box::new(Plan::Reduce { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(1) - .project(vec![]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(1) - .map(vec![ScalarExpr::Column(0) - .call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)]) - .unwrap() - .project(vec![1]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - }), + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(1) + .project(vec![]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(1) + .map(vec![ScalarExpr::Column(0) + .call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)]) + .unwrap() + .project(vec![1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types(RelationType::new(vec![ColumnType::new( + CDT::uint32_datatype(), + true, + )])), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0)]) .unwrap() diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index f6eda483b226..72d5da620d40 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -321,6 +321,7 @@ impl TypedExpr { #[cfg(test)] mod test { + use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use super::*; @@ -359,9 +360,15 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]), plan: Plan::Mfp { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )])), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0)]) .unwrap() @@ -411,9 +418,15 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]), plan: Plan::Mfp { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )])), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0).call_binary( ScalarExpr::Literal(Value::from(1u32), CDT::uint32_datatype()), @@ -439,9 +452,15 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)]), plan: Plan::Mfp { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )])), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Literal( Value::Int64(1), @@ -468,9 +487,15 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)]), plan: Plan::Mfp { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )])), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0) .call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)]) diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index d2c196fa72ae..083b4a7a1f05 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -180,6 +180,8 @@ impl TypedPlan { #[cfg(test)] mod test { + use datatypes::prelude::ConcreteDataType; + use super::*; use crate::expr::{GlobalId, ScalarExpr}; use crate::plan::{Plan, TypedPlan}; @@ -199,9 +201,15 @@ mod test { let expected = TypedPlan { typ: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]), plan: Plan::Mfp { - input: Box::new(Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - }), + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(0)), + } + .with_types(RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )])), + ), mfp: MapFilterProject::new(1) .map(vec![ScalarExpr::Column(0)]) .unwrap()