From f7f01f6a730ddd7ccd1fcb4d983bfd9e590c0036 Mon Sep 17 00:00:00 2001 From: Jonah Eisen Date: Fri, 10 Nov 2023 12:35:27 -0800 Subject: [PATCH] wip --- arroyo-sql/src/optimizations.rs | 6 +++--- arroyo-sql/src/pipeline.rs | 7 ++++--- arroyo-sql/src/plan_graph.rs | 26 ++++++++++++++------------ 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/arroyo-sql/src/optimizations.rs b/arroyo-sql/src/optimizations.rs index 930797c62..1fcf150f3 100644 --- a/arroyo-sql/src/optimizations.rs +++ b/arroyo-sql/src/optimizations.rs @@ -21,9 +21,9 @@ use crate::plan_graph::{ }; pub fn optimize(graph: &mut DiGraph) { - WindowTopNOptimization::default().optimize(graph); - ExpressionFusionOptimizer::default().optimize(graph); - TwoPhaseOptimization {}.optimize(graph); + // WindowTopNOptimization::default().optimize(graph); + // ExpressionFusionOptimizer::default().optimize(graph); + // TwoPhaseOptimization {}.optimize(graph); } pub trait Optimizer { diff --git a/arroyo-sql/src/pipeline.rs b/arroyo-sql/src/pipeline.rs index e95e3c324..10b45c234 100644 --- a/arroyo-sql/src/pipeline.rs +++ b/arroyo-sql/src/pipeline.rs @@ -805,9 +805,10 @@ impl<'a> SqlPipelineBuilder<'a> { fn insert_join(&mut self, join: &datafusion_expr::logical_plan::Join) -> Result { let left_input = self.insert_sql_plan(&join.left)?; let right_input = self.insert_sql_plan(&join.right)?; - if left_input.is_updating() || right_input.is_updating() { - bail!("don't support joins with updating inputs"); - } + // if left_input.is_updating() || right_input.is_updating() { + // bail!("don't support joins with updating inputs"); + // } + match join.join_constraint { JoinConstraint::On => {} JoinConstraint::Using => bail!("don't support 'using' in joins"), diff --git a/arroyo-sql/src/plan_graph.rs b/arroyo-sql/src/plan_graph.rs index 3c85b0d42..b8073cc7d 100644 --- a/arroyo-sql/src/plan_graph.rs +++ b/arroyo-sql/src/plan_graph.rs @@ -1140,8 +1140,8 @@ impl PlanType { #[derive(Debug)] pub struct PlanGraph { pub graph: DiGraph, - pub types: HashSet, - pub key_structs: HashSet, + // pub types: HashSet, + // pub key_structs: HashSet, pub sources: HashMap, pub named_tables: HashMap, pub sql_config: SqlConfig, @@ -1152,8 +1152,8 @@ impl PlanGraph { pub fn new(sql_config: SqlConfig) -> Self { Self { graph: DiGraph::new(), - types: HashSet::new(), - key_structs: HashSet::new(), + // types: HashSet::new(), + // key_structs: HashSet::new(), sources: HashMap::new(), named_tables: HashMap::new(), sql_config, @@ -1485,7 +1485,8 @@ impl PlanGraph { let left_index = self.add_sql_operator(*left); let right_index = self.add_sql_operator(*right); - let key_struct = join_operator.left_key.output_struct(); + let left_key_struct = join_operator.left_key.output_struct(); + let right_key_struct = join_operator.right_key.output_struct(); let left_key_operator = PlanOperator::RecordTransform(RecordTransform::KeyProjection(join_operator.left_key)); @@ -1495,14 +1496,14 @@ impl PlanGraph { let left_key_index = self.insert_operator( left_key_operator, PlanType::Keyed { - key: key_struct.clone(), + key: left_key_struct.clone(), value: left_type.clone(), }, ); let right_key_index = self.insert_operator( right_key_operator, PlanType::Keyed { - key: key_struct.clone(), + key: right_key_struct.clone(), value: right_type.clone(), }, ); @@ -1522,7 +1523,7 @@ impl PlanGraph { self.add_post_window_join( left_key_index, right_key_index, - key_struct, + left_key_struct, left_type, right_type, join_type, @@ -1531,7 +1532,7 @@ impl PlanGraph { self.add_join_with_expiration( left_key_index, right_key_index, - key_struct, + left_key_struct, left_type, right_type, join_type, @@ -1595,11 +1596,12 @@ impl PlanGraph { flatten_index } + fn add_join_with_expiration( &mut self, left_index: NodeIndex, right_index: NodeIndex, - key_struct: StructDef, + left_key_struct: StructDef, left_struct: StructDef, right_struct: StructDef, join_type: JoinType, @@ -1610,7 +1612,7 @@ impl PlanGraph { join_type: join_type.clone(), }; let join_node_output_type = PlanType::KeyedPair { - key: key_struct.clone(), + key: left_key_struct.clone(), left_value: left_struct.clone(), right_value: right_struct.clone(), join_type: join_type.clone(), @@ -1640,7 +1642,7 @@ impl PlanGraph { JoinType::Inner => PlanType::Unkeyed(merge_type), JoinType::Left | JoinType::Right | JoinType::Full => { PlanType::Updating(Box::new(PlanType::Keyed { - key: key_struct, + key: left_key_struct, value: merge_type, })) }