Implement operator chaining #804

Merged (14 commits) on Dec 10, 2024
chaining optimizer
mwylde committed Dec 10, 2024

Verified: this commit was created on GitHub.com and signed with GitHub's verified signature (the key has expired).
commit 206ea4bc0deab99f63698fc50069a3239a390544
9 changes: 7 additions & 2 deletions crates/arroyo-controller/src/job_controller/mod.rs
@@ -656,6 +656,11 @@ impl JobController {
.map(|(id, w)| (*id, w.connect.clone()))
.collect();
let program = self.model.program.clone();
let operator_indices: Arc<HashMap<_, _>> = Arc::new(program.graph
.node_indices()
.map(|idx| (program.graph[idx].node_id, idx.index() as u32))
.collect());


self.model.metric_update_task = Some(tokio::spawn(async move {
let mut metrics: HashMap<(u32, u32), HashMap<MetricName, u64>> = HashMap::new();
@@ -682,12 +687,12 @@ impl JobController {
.into_iter()
.filter_map(|f| Some((get_metric_name(&f.name?)?, f.metric)))
.flat_map(|(metric, values)| {
let program = program.clone();
let operator_indices = operator_indices.clone();
values.into_iter().filter_map(move |m| {
let subtask_idx =
u32::from_str(find_label(&m.label, "subtask_idx")?).ok()?;
let operator_idx =
program.operator_index(u32::from_str(find_label(&m.label, "node_id")?).ok()?)?;
*operator_indices.get(&u32::from_str(find_label(&m.label, "node_id")?).ok()?)?;
let value = m
.counter
.map(|c| c.value)
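
The controller change above replaces the per-metric `program.operator_index(...)` call with a lookup in a precomputed node_id-to-graph-index map, so the metrics task only captures a small `Arc<HashMap<..>>` instead of the whole program. A minimal sketch of that lookup, assuming the map is built exactly as in the hunk above (the helper name is illustrative, not part of the commit):

use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical helper mirroring the new lookup in the metrics task: resolve a
// logical node_id (as reported in metric labels) to its index in the graph.
fn resolve_operator_index(operator_indices: &Arc<HashMap<u32, u32>>, node_id: u32) -> Option<u32> {
    operator_indices.get(&node_id).copied()
}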
1 change: 1 addition & 0 deletions crates/arroyo-datastream/src/lib.rs
@@ -2,6 +2,7 @@
#![allow(clippy::comparison_chain)]

pub mod logical;
pub mod optimizers;

use arroyo_rpc::config::{config, DefaultSink};
use arroyo_rpc::grpc::api;
37 changes: 8 additions & 29 deletions crates/arroyo-datastream/src/logical.rs
@@ -7,9 +7,8 @@ use arroyo_rpc::api_types::pipelines::{PipelineEdge, PipelineGraph, PipelineNode
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::grpc::api;
use arroyo_rpc::grpc::api::{
ArrowProgram, ArrowProgramConfig, ChainedOperator, ConnectorOp, EdgeType,
ArrowProgram, ArrowProgramConfig, ConnectorOp, EdgeType,
};
use datafusion_proto::generated::datafusion;
use petgraph::dot::Dot;
use petgraph::graph::DiGraph;
use petgraph::prelude::EdgeRef;
@@ -25,6 +24,7 @@ use std::hash::Hasher;
use std::str::FromStr;
use std::sync::Arc;
use strum::{Display, EnumString};
use crate::optimizers::Optimizer;

#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumString, Display)]
pub enum OperatorName {
@@ -136,27 +136,23 @@ impl TryFrom<LogicalProgram> for PipelineGraph {
pub struct LogicalEdge {
pub edge_type: LogicalEdgeType,
pub schema: ArroyoSchema,
pub projection: Option<Vec<usize>>,
}

impl LogicalEdge {
pub fn new(
edge_type: LogicalEdgeType,
schema: ArroyoSchema,
projection: Option<Vec<usize>>,
) -> Self {
LogicalEdge {
edge_type,
schema,
projection,
}
}

pub fn project_all(edge_type: LogicalEdgeType, schema: ArroyoSchema) -> Self {
LogicalEdge {
edge_type,
schema,
projection: None,
}
}
}
@@ -170,8 +166,8 @@ pub struct ChainedLogicalOperator {

#[derive(Clone, Debug)]
pub struct OperatorChain {
operators: Vec<ChainedLogicalOperator>,
edges: Vec<ArroyoSchema>,
pub(crate) operators: Vec<ChainedLogicalOperator>,
pub(crate) edges: Vec<ArroyoSchema>,
}

impl OperatorChain {
@@ -283,22 +279,19 @@ pub struct ProgramConfig {
pub struct LogicalProgram {
pub graph: LogicalGraph,
pub program_config: ProgramConfig,
pub operator_indices: HashMap<u32, u32>,
}

impl LogicalProgram {
pub fn new(graph: LogicalGraph, program_config: ProgramConfig) -> Self {
let operator_indices = graph
.node_indices()
.map(|idx| (graph[idx].node_id, idx.index() as u32))
.collect();

Self {
graph,
program_config,
operator_indices,
}
}

pub fn optimize(&mut self, optimizer: &dyn Optimizer) {
optimizer.optimize(&mut self.graph);
}

pub fn update_parallelism(&mut self, overrides: &HashMap<u32, usize>) {
for node in self.graph.node_weights_mut() {
@@ -341,10 +334,6 @@ impl LogicalProgram {
.collect()
}

pub fn operator_index(&self, id: u32) -> Option<u32> {
self.operator_indices.get(&id).cloned()
}

pub fn tasks_per_operator(&self) -> HashMap<String, usize> {
let mut tasks_per_operator = HashMap::new();
for node in self.graph.node_weights() {
@@ -455,11 +444,6 @@ impl TryFrom<ArrowProgram> for LogicalProgram {
LogicalEdge {
edge_type: edge.edge_type().into(),
schema: schema.clone().try_into()?,
projection: if edge.projection.is_empty() {
None
} else {
Some(edge.projection.iter().map(|p| *p as usize).collect())
},
},
);
}
@@ -644,11 +628,6 @@ impl From<LogicalProgram> for ArrowProgram {
target: target.index() as i32,
schema: Some(edge.schema.clone().into()),
edge_type: edge_type as i32,
projection: edge
.projection
.as_ref()
.map(|p| p.iter().map(|v| *v as u32).collect())
.unwrap_or_default(),
}
})
.collect();
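
With this change, LogicalProgram exposes an `optimize` entry point that delegates to the `Optimizer` trait (defined in the new optimizers.rs below). A minimal sketch of a custom pass against that trait, assuming only what this commit shows (`NoopOptimizer` is an illustrative name, not part of the change):

use arroyo_datastream::logical::LogicalGraph;
use arroyo_datastream::optimizers::Optimizer;

// Illustrative no-op pass: any graph rewrite implementing Optimizer can be
// handed to LogicalProgram::optimize.
struct NoopOptimizer;

impl Optimizer for NoopOptimizer {
    fn optimize(&self, _plan: &mut LogicalGraph) {
        // deliberately leaves the plan unchanged
    }
}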
123 changes: 123 additions & 0 deletions crates/arroyo-datastream/src/optimizers.rs
@@ -0,0 +1,123 @@
use std::collections::HashSet;
use petgraph::prelude::*;
use crate::logical::{LogicalEdgeType, LogicalGraph};

pub trait Optimizer {
fn optimize(&self, plan: &mut LogicalGraph);
}

pub struct ChainingOptimizer {
}


impl Optimizer for ChainingOptimizer {
fn optimize(&self, plan: &mut LogicalGraph) {
let node_indices: Vec<NodeIndex> = plan.node_indices().collect();
let mut removed_nodes = HashSet::new();

for &node_idx in &node_indices {
if removed_nodes.contains(&node_idx) {
continue;
}

let mut current_node = match plan.node_weight(node_idx) {
Some(node) => node.clone(),
None => continue,
};

// sources and sinks can't be chained
if current_node.operator_chain.is_source() || current_node.operator_chain.is_sink() {
continue;
}

let mut chain = vec![node_idx];
let mut next_node_idx = node_idx;

loop {
let mut successors = plan
.edges_directed(next_node_idx, Outgoing)
.collect::<Vec<_>>();

if successors.len() != 1 {
break;
}

let edge = successors.remove(0);
let edge_type = edge.weight().edge_type;

if edge_type != LogicalEdgeType::Forward {
break;
}

let successor_idx = edge.target();

if removed_nodes.contains(&successor_idx) {
break;
}

let successor_node = match plan.node_weight(successor_idx) {
Some(node) => node.clone(),
None => break,
};

// skip if parallelism doesn't match or successor is a sink
if current_node.parallelism != successor_node.parallelism || successor_node.operator_chain.is_sink()
{
break;
}

// skip successors with multiple predecessors
if plan.edges_directed(successor_idx, Incoming).count() > 1 {
break;
}

chain.push(successor_idx);
next_node_idx = successor_idx;
}

if chain.len() > 1 {
for i in 1..chain.len() {
let node_to_merge_idx = chain[i];
let node_to_merge = plan.node_weight(node_to_merge_idx).unwrap().clone();

current_node.description = format!(
"{} -> {}",
current_node.description, node_to_merge.description
);

current_node
.operator_chain
.operators
.extend(node_to_merge.operator_chain.operators.clone());

if let Some(edge_idx) = plan.find_edge(chain[i - 1], node_to_merge_idx) {
let edge = plan.edge_weight(edge_idx).unwrap();
current_node
.operator_chain
.edges
.push(edge.schema.clone());
}

removed_nodes.insert(node_to_merge_idx);
}

plan[node_idx] = current_node;

let last_node_idx = *chain.last().unwrap();
let outgoing_edges: Vec<_> = plan
.edges_directed(last_node_idx, petgraph::Outgoing)
.map(|e| (e.id(), e.target(), e.weight().clone()))
.collect();

for (edge_id, target_idx, edge_weight) in outgoing_edges {
plan.remove_edge(edge_id);
plan.add_edge(node_idx, target_idx, edge_weight);
}
}
}

for node_idx in removed_nodes {
plan.remove_node(node_idx);
}
}
}
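
The ChainingOptimizer above walks forward from each non-source, non-sink node along Forward edges, absorbing successors that have the same parallelism, a single predecessor, and are not sinks; it then rewires the last chained node's outgoing edges back to the chain head and deletes the merged nodes. A minimal driver sketch, mirroring the gating added in arroyo-planner further down (the function name is illustrative):

use arroyo_datastream::logical::LogicalProgram;
use arroyo_datastream::optimizers::ChainingOptimizer;

// Apply chaining to an already-planned program when the feature flag is on.
fn maybe_chain(program: &mut LogicalProgram, enable_chaining: bool) {
    if enable_chaining {
        program.optimize(&ChainingOptimizer {});
    }
}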
8 changes: 7 additions & 1 deletion crates/arroyo-planner/src/lib.rs
@@ -80,6 +80,8 @@ use std::{collections::HashMap, sync::Arc};
use syn::Item;
use tracing::{debug, info, warn};
use unicase::UniCase;
use arroyo_datastream::optimizers::ChainingOptimizer;
use arroyo_rpc::config::config;

const DEFAULT_IDLE_TIME: Option<Duration> = Some(Duration::from_secs(5 * 60));
pub const ASYNC_RESULT_FIELD: &str = "__async_result";
@@ -851,13 +853,17 @@ pub async fn parse_and_get_arrow_program(
}
let graph = plan_to_graph_visitor.into_graph();

let program = LogicalProgram::new(
let mut program = LogicalProgram::new(
graph,
ProgramConfig {
udf_dylibs: schema_provider.dylib_udfs.clone(),
python_udfs: schema_provider.python_udfs.clone(),
},
);

if arroyo_rpc::config::config().pipeline.enable_chaining {
program.optimize(&ChainingOptimizer{});
}

Ok(CompiledSql {
program,
1 change: 1 addition & 0 deletions crates/arroyo-rpc/default.toml
@@ -10,6 +10,7 @@ worker-heartbeat-timeout = "30s"
healthy-duration = "2m"
worker-startup-time = "10m"
task-startup-time = "2m"
enable-chaining = false

[pipeline.compaction]
enabled = false
1 change: 0 additions & 1 deletion crates/arroyo-rpc/proto/api.proto
@@ -298,5 +298,4 @@ message ArrowEdge {
int32 target = 2;
ArroyoSchema schema = 4;
EdgeType edge_type = 5;
repeated uint32 projection = 6;
}
3 changes: 3 additions & 0 deletions crates/arroyo-rpc/src/config.rs
@@ -426,6 +426,9 @@ pub struct PipelineConfig {
/// Default sink, for when none is specified
#[serde(default)]
pub default_sink: DefaultSink,

/// Whether to enable operator chaining
pub enable_chaining: bool,

pub compaction: CompactionConfig,
}
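
The new `enable_chaining` field on `PipelineConfig`, defaulted to false by default.toml above, is what gates the optimizer call in the planner. A small sketch of checking it at runtime, using the `config()` accessor already imported in the planner change (the helper function is illustrative):

use arroyo_rpc::config::config;

// Returns whether operator chaining is enabled in the running Arroyo config.
fn chaining_enabled() -> bool {
    config().pipeline.enable_chaining
}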