diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index 7ff480733..ffd225e60 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -622,8 +622,12 @@ fn rewrite_sinks(extensions: Vec) -> Result> { let mut sink_inputs = build_sink_inputs(&extensions); let mut new_extensions = vec![]; for extension in extensions { - let result = extension.rewrite(&mut SinkInputRewriter::new(&mut sink_inputs))?; - if result.transformed { + let mut is_rewrited = false; + let result = extension.rewrite(&mut SinkInputRewriter::new( + &mut sink_inputs, + &mut is_rewrited, + ))?; + if !(is_rewrited) { new_extensions.push(result.data); } } diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index ae50d7ebb..19f5dd808 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -662,11 +662,15 @@ type SinkInputs = HashMap>; pub(crate) struct SinkInputRewriter<'a> { sink_inputs: &'a mut SinkInputs, + is_rewrited: &'a mut bool, } impl<'a> SinkInputRewriter<'a> { - pub(crate) fn new(sink_inputs: &'a mut SinkInputs) -> Self { - Self { sink_inputs } + pub(crate) fn new(sink_inputs: &'a mut SinkInputs, is_rewrited: &'a mut bool) -> Self { + Self { + sink_inputs, + is_rewrited, + } } } @@ -678,11 +682,14 @@ impl<'a> TreeNodeRewriter for SinkInputRewriter<'a> { if let Some(sink_node) = extension.node.as_any().downcast_ref::() { if let Some(named_node) = sink_node.node_name() { if let Some(inputs) = self.sink_inputs.remove(&named_node) { - return Ok(Transformed::yes(LogicalPlan::Extension(Extension { + let extension = LogicalPlan::Extension(Extension { // NOTE: new version from_template is replace by // with_exprs_and_inputs(Vec, Vec) -> Result> node: sink_node.with_exprs_and_inputs(vec![], inputs.clone())?, - }))); + }); + return Ok(Transformed::new(extension, true, TreeNodeRecursion::Jump)); + } else { + *self.is_rewrited = true; } } }