Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeisen committed Nov 10, 2023
1 parent 9eecd5a commit f7f01f6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
6 changes: 3 additions & 3 deletions arroyo-sql/src/optimizations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use crate::plan_graph::{
};

pub fn optimize(graph: &mut DiGraph<PlanNode, PlanEdge>) {
WindowTopNOptimization::default().optimize(graph);
ExpressionFusionOptimizer::default().optimize(graph);
TwoPhaseOptimization {}.optimize(graph);
// WindowTopNOptimization::default().optimize(graph);
// ExpressionFusionOptimizer::default().optimize(graph);
// TwoPhaseOptimization {}.optimize(graph);
}

pub trait Optimizer {
Expand Down
7 changes: 4 additions & 3 deletions arroyo-sql/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,9 +805,10 @@ impl<'a> SqlPipelineBuilder<'a> {
fn insert_join(&mut self, join: &datafusion_expr::logical_plan::Join) -> Result<SqlOperator> {
let left_input = self.insert_sql_plan(&join.left)?;
let right_input = self.insert_sql_plan(&join.right)?;
if left_input.is_updating() || right_input.is_updating() {
bail!("don't support joins with updating inputs");
}
// if left_input.is_updating() || right_input.is_updating() {
// bail!("don't support joins with updating inputs");
// }

match join.join_constraint {
JoinConstraint::On => {}
JoinConstraint::Using => bail!("don't support 'using' in joins"),
Expand Down
26 changes: 14 additions & 12 deletions arroyo-sql/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,8 +1140,8 @@ impl PlanType {
#[derive(Debug)]
pub struct PlanGraph {
pub graph: DiGraph<PlanNode, PlanEdge>,
pub types: HashSet<StructDef>,
pub key_structs: HashSet<String>,
// pub types: HashSet<StructDef>,
// pub key_structs: HashSet<String>,
pub sources: HashMap<String, NodeIndex>,
pub named_tables: HashMap<String, NodeIndex>,
pub sql_config: SqlConfig,
Expand All @@ -1152,8 +1152,8 @@ impl PlanGraph {
pub fn new(sql_config: SqlConfig) -> Self {
Self {
graph: DiGraph::new(),
types: HashSet::new(),
key_structs: HashSet::new(),
// types: HashSet::new(),
// key_structs: HashSet::new(),
sources: HashMap::new(),
named_tables: HashMap::new(),
sql_config,
Expand Down Expand Up @@ -1485,7 +1485,8 @@ impl PlanGraph {
let left_index = self.add_sql_operator(*left);
let right_index = self.add_sql_operator(*right);

let key_struct = join_operator.left_key.output_struct();
let left_key_struct = join_operator.left_key.output_struct();
let right_key_struct = join_operator.right_key.output_struct();

let left_key_operator =
PlanOperator::RecordTransform(RecordTransform::KeyProjection(join_operator.left_key));
Expand All @@ -1495,14 +1496,14 @@ impl PlanGraph {
let left_key_index = self.insert_operator(
left_key_operator,
PlanType::Keyed {
key: key_struct.clone(),
key: left_key_struct.clone(),
value: left_type.clone(),
},
);
let right_key_index = self.insert_operator(
right_key_operator,
PlanType::Keyed {
key: key_struct.clone(),
key: right_key_struct.clone(),
value: right_type.clone(),
},
);
Expand All @@ -1522,7 +1523,7 @@ impl PlanGraph {
self.add_post_window_join(
left_key_index,
right_key_index,
key_struct,
left_key_struct,
left_type,
right_type,
join_type,
Expand All @@ -1531,7 +1532,7 @@ impl PlanGraph {
self.add_join_with_expiration(
left_key_index,
right_key_index,
key_struct,
left_key_struct,
left_type,
right_type,
join_type,
Expand Down Expand Up @@ -1595,11 +1596,12 @@ impl PlanGraph {

flatten_index
}

fn add_join_with_expiration(
&mut self,
left_index: NodeIndex,
right_index: NodeIndex,
key_struct: StructDef,
left_key_struct: StructDef,
left_struct: StructDef,
right_struct: StructDef,
join_type: JoinType,
Expand All @@ -1610,7 +1612,7 @@ impl PlanGraph {
join_type: join_type.clone(),
};
let join_node_output_type = PlanType::KeyedPair {
key: key_struct.clone(),
key: left_key_struct.clone(),
left_value: left_struct.clone(),
right_value: right_struct.clone(),
join_type: join_type.clone(),
Expand Down Expand Up @@ -1640,7 +1642,7 @@ impl PlanGraph {
JoinType::Inner => PlanType::Unkeyed(merge_type),
JoinType::Left | JoinType::Right | JoinType::Full => {
PlanType::Updating(Box::new(PlanType::Keyed {
key: key_struct,
key: left_key_struct,
value: merge_type,
}))
}
Expand Down

0 comments on commit f7f01f6

Please sign in to comment.