diff --git a/crates/arroyo-planner/src/extension/mod.rs b/crates/arroyo-planner/src/extension/mod.rs index 161a9de21..8213d0bae 100644 --- a/crates/arroyo-planner/src/extension/mod.rs +++ b/crates/arroyo-planner/src/extension/mod.rs @@ -15,15 +15,9 @@ use datafusion::logical_expr::{ }; use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; -use datafusion_proto::protobuf::ProjectionNode; use prost::Message; use watermark_node::WatermarkNode; -use crate::builder::{NamedNode, Planner}; -use crate::schemas::{add_timestamp_field, has_timestamp_field}; -use crate::{fields_with_qualifiers, schema_from_df_fields, DFField, ASYNC_RESULT_FIELD}; -use join::JoinExtension; - use self::debezium::{DebeziumUnrollingExtension, ToDebeziumExtension}; use self::updating_aggregate::UpdatingAggregateExtension; use self::{ @@ -31,6 +25,10 @@ use self::{ remote_table::RemoteTableExtension, sink::SinkExtension, table_source::TableSourceExtension, window_fn::WindowFunctionExtension, }; +use crate::builder::{NamedNode, Planner}; +use crate::schemas::{add_timestamp_field, has_timestamp_field}; +use crate::{fields_with_qualifiers, schema_from_df_fields, DFField, ASYNC_RESULT_FIELD}; +use join::JoinExtension; pub(crate) mod aggregate; pub(crate) mod debezium; @@ -207,12 +205,6 @@ impl ArroyoExtension for AsyncUDFExtension { }) .collect::>>()?; - ProjectionNode { - input: None, - expr: vec![], - optional_alias: None, - }; - let config = AsyncUdfOperator { name: self.name.clone(), udf: Some(self.udf.clone().into()), diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index fd3b47cf6..a938292f3 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -536,9 +536,25 @@ impl<'a> TreeNodeRewriter for AsyncUdfRewriter<'a> { let udf = self.provider.dylib_udfs.get(&name).unwrap().clone(); + let input = if matches!(*projection.input, LogicalPlan::Projection(..)) { + // if our input is a projection, we need to plan it separately -- this happens + // for subqueries + + Arc::new(LogicalPlan::Extension(Extension { + node: Arc::new(RemoteTableExtension { + input: (*projection.input).clone(), + name: TableReference::bare("subquery_projection"), + schema: projection.input.schema().clone(), + materialize: false, + }), + })) + } else { + projection.input + }; + Ok(Transformed::yes(LogicalPlan::Extension(Extension { node: Arc::new(AsyncUDFExtension { - input: projection.input, + input, name, udf, arg_exprs: args,