Skip to content

Commit

Permalink
fix: add is_rewrited check for None NameNode of sink
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuliquan committed Sep 25, 2024
1 parent a2bf77b commit 09afad5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
8 changes: 6 additions & 2 deletions crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,12 @@ fn rewrite_sinks(extensions: Vec<LogicalPlan>) -> Result<Vec<LogicalPlan>> {
let mut sink_inputs = build_sink_inputs(&extensions);
let mut new_extensions = vec![];
for extension in extensions {
let result = extension.rewrite(&mut SinkInputRewriter::new(&mut sink_inputs))?;
if result.transformed {
let mut is_rewrited = false;
let result = extension.rewrite(&mut SinkInputRewriter::new(
&mut sink_inputs,
&mut is_rewrited,
))?;
if !(is_rewrited) {
new_extensions.push(result.data);
}
}
Expand Down
15 changes: 11 additions & 4 deletions crates/arroyo-planner/src/rewriters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,15 @@ type SinkInputs = HashMap<NamedNode, Vec<LogicalPlan>>;

pub(crate) struct SinkInputRewriter<'a> {
sink_inputs: &'a mut SinkInputs,
is_rewrited: &'a mut bool,
}

impl<'a> SinkInputRewriter<'a> {
pub(crate) fn new(sink_inputs: &'a mut SinkInputs) -> Self {
Self { sink_inputs }
pub(crate) fn new(sink_inputs: &'a mut SinkInputs, is_rewrited: &'a mut bool) -> Self {
Self {
sink_inputs,
is_rewrited,
}
}
}

Expand All @@ -678,11 +682,14 @@ impl<'a> TreeNodeRewriter for SinkInputRewriter<'a> {
if let Some(sink_node) = extension.node.as_any().downcast_ref::<SinkExtension>() {
if let Some(named_node) = sink_node.node_name() {
if let Some(inputs) = self.sink_inputs.remove(&named_node) {
return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
let extension = LogicalPlan::Extension(Extension {
// NOTE: new version from_template is replace by
// with_exprs_and_inputs(Vec<Expr>, Vec<LogicalPlan>) -> Result<Arc<dyn UserDefinedLogicalNode>>
node: sink_node.with_exprs_and_inputs(vec![], inputs.clone())?,
})));
});
return Ok(Transformed::new(extension, true, TreeNodeRecursion::Jump));
} else {
*self.is_rewrited = true;
}
}
}
Expand Down

0 comments on commit 09afad5

Please sign in to comment.