Skip to content

Commit

Permalink
Implement operator chaining (#804)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Dec 10, 2024
1 parent 130ba60 commit cde8210
Show file tree
Hide file tree
Showing 120 changed files with 3,961 additions and 2,352 deletions.
52 changes: 27 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 50 additions & 26 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use axum::{debug_handler, Json};
use axum_extra::extract::WithRejection;
use http::StatusCode;

use petgraph::visit::NodeRef;
use petgraph::{Direction, EdgeDirection};
use std::collections::HashMap;

use petgraph::visit::NodeRef;
use std::num::ParseIntError;
use std::str::FromStr;
use std::time::{Duration, SystemTime};

use crate::{compiler_service, connection_profiles, jobs, types};
Expand All @@ -23,7 +24,9 @@ use arroyo_rpc::api_types::{JobCollection, PaginationQueryParams, PipelineCollec
use arroyo_rpc::grpc::api::{ArrowProgram, ConnectorOp};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_datastream::logical::{LogicalNode, LogicalProgram, OperatorName};
use arroyo_datastream::logical::{
ChainedLogicalOperator, LogicalNode, LogicalProgram, OperatorChain, OperatorName,
};
use arroyo_df::{ArroyoSchemaProvider, CompiledSql, SqlConfig};
use arroyo_formats::ser::ArrowSerializer;
use arroyo_rpc::formats::Format;
Expand Down Expand Up @@ -268,17 +271,19 @@ async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()>
let schema = edge.weight().schema.schema.clone();

let node = compiled_sql.program.graph.node_weight_mut(idx).unwrap();
if node.operator_name == OperatorName::ConnectorSink {
let mut op = ConnectorOp::decode(&node.operator_config[..]).map_err(|_| {
anyhow!(
"failed to decode configuration for connector node {:?}",
node
)
})?;

try_register_confluent_schema(&mut op, &schema).await?;

node.operator_config = op.encode_to_vec();
for (node, _) in node.operator_chain.iter_mut() {
if node.operator_name == OperatorName::ConnectorSink {
let mut op = ConnectorOp::decode(&node.operator_config[..]).map_err(|_| {
anyhow!(
"failed to decode configuration for connector node {:?}",
node
)
})?;

try_register_confluent_schema(&mut op, &schema).await?;

node.operator_config = op.encode_to_vec();
}
}
}

Expand Down Expand Up @@ -324,19 +329,31 @@ pub(crate) async fn create_pipeline_int<'a>(
let g = &mut compiled.program.graph;
for idx in g.node_indices() {
let should_replace = {
let node = g.node_weight(idx).unwrap();
node.operator_name == OperatorName::ConnectorSink
&& node.operator_config != default_sink().encode_to_vec()
let node = &g.node_weight(idx).unwrap().operator_chain;
node.is_sink()
&& node.iter().next().unwrap().0.operator_config
!= default_sink().encode_to_vec()
};
if should_replace {
if enable_sinks {
let new_idx = g.add_node(LogicalNode {
operator_id: format!("{}_1", g.node_weight(idx).unwrap().operator_id),
node_id: g.node_weights().map(|n| n.node_id).max().unwrap() + 1,
description: "Preview sink".to_string(),
operator_name: OperatorName::ConnectorSink,
operator_config: default_sink().encode_to_vec(),
operator_chain: OperatorChain::new(ChainedLogicalOperator {
operator_id: format!(
"{}_1",
g.node_weight(idx)
.unwrap()
.operator_chain
.first()
.operator_id
),
operator_name: OperatorName::ConnectorSink,
operator_config: default_sink().encode_to_vec(),
}),
parallelism: 1,
});

let edges: Vec<_> = g
.edges_directed(idx, Direction::Incoming)
.map(|e| (e.source(), e.weight().clone()))
Expand All @@ -345,8 +362,14 @@ pub(crate) async fn create_pipeline_int<'a>(
g.add_edge(source, new_idx, weight);
}
} else {
g.node_weight_mut(idx).unwrap().operator_config =
default_sink().encode_to_vec();
g.node_weight_mut(idx)
.unwrap()
.operator_chain
.iter_mut()
.next()
.unwrap()
.0
.operator_config = default_sink().encode_to_vec();
}
}
}
Expand Down Expand Up @@ -452,8 +475,9 @@ impl TryInto<Pipeline> for DbPipeline {
.as_object()
.unwrap()
.into_iter()
.map(|(k, v)| (k.clone(), v.as_u64().unwrap() as usize))
.collect(),
.map(|(k, v)| Ok((u32::from_str(k)?, v.as_u64().unwrap() as usize)))
.collect::<Result<HashMap<_, _>, ParseIntError>>()
.map_err(|e| bad_request(format!("invalid node_id: {}", e)))?,
);

let stop = match self.stop {
Expand Down Expand Up @@ -682,10 +706,10 @@ pub async fn patch_pipeline(
.ok_or_else(|| not_found("Job"))?;

let program = ArrowProgram::decode(&res.program[..]).map_err(log_and_map)?;
let map: HashMap<String, u32> = program
let map: HashMap<_, _> = program
.nodes
.into_iter()
.map(|node| (node.node_id, parallelism as u32))
.map(|node| (node.node_id.to_string(), parallelism as u32))
.collect();

Some(serde_json::to_value(map).map_err(log_and_map)?)
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/blackhole/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::blackhole::operator::BlackholeSinkFunc;
use anyhow::anyhow;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_operator::operator::ConstructedOperator;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
Expand Down Expand Up @@ -120,8 +120,8 @@ impl Connector for BlackholeConnector {
_: Self::ProfileT,
_: Self::TableT,
_: OperatorConfig,
) -> anyhow::Result<OperatorNode> {
Ok(OperatorNode::from_operator(Box::new(
) -> anyhow::Result<ConstructedOperator> {
Ok(ConstructedOperator::from_operator(Box::new(
BlackholeSinkFunc::new(),
)))
}
Expand Down
Loading

0 comments on commit cde8210

Please sign in to comment.