diff --git a/Cargo.toml b/Cargo.toml index 0ae69ce..bfee1c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,4 +8,10 @@ anyhow = "1.0.86" serde = {version ="1.0.203", features = ["derive"]} serde_json = {version = "1.0.117"} bimap = "0.6.3" -uuid = {version="1.8.0", features = ["v4","serde"]} \ No newline at end of file +uuid = {version="1.8.0", features = ["v4","serde"]} +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } +kube = { version = "0.91.0", features = ["runtime", "derive"] } +k8s-openapi = { version = "0.22.0", features = ["latest"] } + +[dev-dependencies] +arrayvec = {version="0.7.4", features= ["serde"]} \ No newline at end of file diff --git a/docs/graph.json b/docs/graph.json deleted file mode 100644 index 5253166..0000000 --- a/docs/graph.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "name":"example", - "dag": [ - { - "id":"ComputeUnit1", - "name":"ComputeUnit2", - "type": "ComputeUnit", - "dependency": [], - "shell": "", - "image":"" - }, - { - "id":"channel2", - "name":"Channel2", - "type": "Channel", - "dependency": ["ComputeUnit1"], - "image":"" - }, - { - "id":"ComputeUnit1", - "name":"ComputeUnit1", - "type": "ComputeUnit", - "dependency": ["channel2"], - "shell": "", - "image":"" - } - ] -} \ No newline at end of file diff --git "a/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" "b/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" new file mode 100644 index 0000000..169b737 --- /dev/null +++ "b/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" @@ -0,0 +1,87 @@ +# 详细设计 + + +## spec配置定义 +```json +{ + "name": "example", + "version": "v1", + "dag": [ + { + "id": "5c42b900-a87f-45e3-ba06-c40d94ad5ba2", + "name": "ComputeUnit1", + "dependency": [ + + ], + "spec": { + "cmd": [ + "ls" + ], + "image": "" + }, + "channel": { + "spec": { + "cmd": [ + "bufsize", + "1024" + ], + "image": "jiaozifs:" + } + } + }, + { + "id": "353fc5bf-697e-4221-8487-6ab91915e2a1", + "name": "ComputeUnit2", + "node_type": "ComputeUnit", + "dependency": [ + "5c42b900-a87f-45e3-ba06-c40d94ad5ba2" + ], + "spec": { + "cmd": [ + "ls" + ], + "image": "" + } + } + ] +} +``` + +## Graph + +1. 节点= 节点+数据管道(可选) + +## job 控制器 + +追踪整个job的运行状态,处理重试,日志,状态收集的问题的问题 + +## Pipe流控制器 + +负责pipeline的部署,及状态监控 + +## 节点控制 + +控制节点和数据管道的部署,操纵及状态监控 +* deploy +* status +* monitor + +# 节点接口设计 + +## 节点程序接口设计 + +* init +* start +* restart +* pause +* stop +* subscribe + +## 数据通道程序接口设计 + +* init +* start +* restart +* pause +* stop +* subscribe diff --git a/src/core/cnode.rs b/src/core/cnode.rs new file mode 100644 index 0000000..ce164a4 --- /dev/null +++ b/src/core/cnode.rs @@ -0,0 +1,66 @@ +use super::{MachineSpec, GID}; +use serde::{Deserialize, Serialize}; + +// Channel use to definite data transfer channel +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Channel { + pub spec: MachineSpec, +} + +// ComputeUnit used to define logic for data generation, transformer, ouput. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ComputeUnit +where + ID: GID, +{ + #[serde(bound(deserialize = ""))] + pub id: ID, + + pub name: String, + + pub spec: MachineSpec, + + pub channel: Option, + + #[serde(bound(deserialize = ""))] + pub(crate) dependency: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + #[test] + fn test_from_str() { + xxx(); + } + + fn xxx() -> ComputeUnit { + let json_str = r#" +{ + "id": "5c42b900-a87f-45e3-ba06-c40d94ad5ba2", + "name": "ComputeUnit1", + "dependency": [ + + ], + "spec": { + "cmd": [ + "ls" + ], + "image": "" + }, + "channel": { + "spec": { + "cmd": [ + "ls" + ], + "image": "" + } + } +} + "# + .to_owned(); + serde_json::from_str::>(&json_str).unwrap() + } +} diff --git a/src/core/id.rs b/src/core/id.rs new file mode 100644 index 0000000..be6fd9f --- /dev/null +++ b/src/core/id.rs @@ -0,0 +1,5 @@ +use serde::{Deserialize, Serialize}; +use std::hash::Hash; +use std::marker::Send; + +pub trait GID = Eq + Clone + Copy + Hash + Send + Sync + Serialize + for<'de> Deserialize<'de>; diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..66025b3 --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,7 @@ +mod cnode; +mod id; +mod spec; + +pub use cnode::*; +pub use id::*; +pub use spec::*; diff --git a/src/core/spec.rs b/src/core/spec.rs new file mode 100644 index 0000000..dd2e284 --- /dev/null +++ b/src/core/spec.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +// MachineSpec container information for deploy and running in cloud +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MachineSpec { + pub image: String, + pub cmd: Vec, +} diff --git a/src/dag/base.rs b/src/dag/base.rs deleted file mode 100644 index 2d938ed..0000000 --- a/src/dag/base.rs +++ /dev/null @@ -1,12 +0,0 @@ -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Serialize, Deserialize, Debug)] -pub enum UnitType { - ChannelUnit, - ComputeUnit, -} -pub trait BaseUnit { - fn id(&self) -> Uuid; - fn name(&self) -> String; -} diff --git a/src/dag/channel.rs b/src/dag/channel.rs deleted file mode 100644 index 006a9ff..0000000 --- a/src/dag/channel.rs +++ /dev/null @@ -1,22 +0,0 @@ -use super::base::{BaseUnit, UnitType}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Serialize, Deserialize, Debug)] -pub struct ChannelUnit { - pub id: uuid::Uuid, - pub name: String, - - pub(crate) dependency: Vec, - pub(crate) node_type: UnitType, -} - -impl BaseUnit for ChannelUnit { - fn id(&self) -> Uuid { - self.id - } - - fn name(&self) -> String { - self.name.clone() - } -} diff --git a/src/dag/cnode.rs b/src/dag/cnode.rs deleted file mode 100644 index 9b023fd..0000000 --- a/src/dag/cnode.rs +++ /dev/null @@ -1,24 +0,0 @@ -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use super::base::{BaseUnit, UnitType}; - -#[derive(Serialize, Deserialize, Debug)] -pub struct ComputeUnit { - pub id: Uuid, - pub name: String, - pub image: String, - pub cmd: Vec, - - pub(crate) dependency: Vec, - pub(crate) node_type: UnitType, -} - -impl BaseUnit for ComputeUnit { - fn id(&self) -> Uuid { - self.id - } - fn name(&self) -> String { - self.name.clone() - } -} diff --git a/src/dag/dag.rs b/src/dag/dag.rs index 3d837fb..94fca7e 100644 --- a/src/dag/dag.rs +++ b/src/dag/dag.rs @@ -1,132 +1,121 @@ -use anyhow::{Ok, Result}; -use std::collections::HashMap; -use uuid::Uuid; - -use crate::utils::IntoAnyhowResult; - -use super::base::{BaseUnit, UnitType}; -use super::channel::ChannelUnit; -use super::cnode::ComputeUnit; use super::graph::Graph; +use crate::core::{ComputeUnit, GID}; +use crate::utils::IntoAnyhowResult; +use anyhow::{anyhow, Ok, Result}; +use std::collections::HashMap; -pub enum Unit { - CNode(Box), - Channel(Box), -} - -impl BaseUnit for Unit { - fn id(&self) -> Uuid { - match self { - Unit::CNode(node) => node.id(), - Unit::Channel(channel) => channel.id(), - } - } - - fn name(&self) -> String { - match self { - Unit::CNode(node) => node.name(), - Unit::Channel(channel) => channel.name(), - } - } -} - -pub struct Dag { +pub struct Dag +where + ID: GID, +{ name: String, - nodes: HashMap, - topoed_graph: Vec, + nodes: HashMap>, /// Store dependency relations. - rely_graph: Graph, + rely_graph: Graph, } -impl Dag { +impl Dag { pub fn new() -> Self { Dag { name: String::new(), nodes: HashMap::new(), rely_graph: Graph::new(), - topoed_graph: vec![], } } - pub fn from_json(json: &str) -> Result { - // let reader = Cursor::new(json); - // let mut deserializer = Deserializer::from_reader(reader); + //add_node add a compute unit to graph, if this unit alread exit, ignore it + pub fn add_node(&mut self, node: ComputeUnit) -> &mut Self { + if self.nodes.get(&node.id).is_none() { + self.rely_graph.add_node(node.id.clone()); + self.nodes.insert(node.id.clone(), node); + } + self + } + + //add_node add a compute unit to graph, if this unit alread exit, ignore it + pub fn set_edge(&mut self, from: &ID, to: &ID) -> Result<&mut Self> { + if self.nodes.get(from).is_none() { + return Err(anyhow!("from node not exit")); + } + + if self.nodes.get(to).is_none() { + return Err(anyhow!("from node not exit")); + } + + self.rely_graph.add_edge(from, to); + Ok(self) + } + + // from_json build graph from json string + pub fn from_json<'a>(json: &'a str) -> Result { let value: serde_json::Value = serde_json::from_str(json)?; let dag_name: &str = value .get("name") .anyhow("name must exit") .map(|v| v.as_str().anyhow("name must be string"))??; - let dag = value.get("dag").anyhow("dag m ust exit")?; - let nodes = dag - .as_array() - .anyhow("dag must be a arrary")? - .iter() - .map(|node_str| { - let type_value = node_str.get("node_type").anyhow("node_type must exit")?; - let node_type: UnitType = serde_json::from_value(type_value.clone())?; - Ok(match node_type { - UnitType::ComputeUnit => { - Unit::CNode(Box::new(serde_json::from_value(node_str.clone())?)) - } - UnitType::ChannelUnit => { - Unit::Channel(Box::new(serde_json::from_value(node_str.clone())?)) - } - }) - }) - .collect::, _>>()?; - - let node_ids: Vec = nodes.iter().map(|node| node.id()).collect(); + + let dag = value.get("dag").anyhow("dag must exit")?; + + let mut nodes = vec![]; + for node in dag.as_array().anyhow("dag must be a arrary")?.iter() { + nodes.push(serde_json::from_value::>(node.clone())?); + } + + let node_ids: Vec = nodes.iter().map(|node| node.id).collect(); let mut rely_graph = Graph::with_nodes(node_ids.as_slice()); for node in nodes.iter() { - match node { - Unit::CNode(compute_unit) => { - compute_unit.dependency.iter().for_each(|v| { - rely_graph.add_edge(*v, compute_unit.id()); - }); - } - Unit::Channel(channel_unit) => { - channel_unit.dependency.iter().for_each(|v| { - rely_graph.add_edge(*v, channel_unit.id()); - }); - } - } + node.dependency.iter().for_each(|v| { + rely_graph.add_edge(v, &node.id); + }); } - let nodes_map: HashMap = - nodes.into_iter().map(|node| (node.id(), node)).collect(); + let nodes_map: HashMap> = + nodes.into_iter().map(|node| (node.id, node)).collect(); Ok(Dag { name: dag_name.to_string(), nodes: nodes_map, - topoed_graph: rely_graph.topo_sort(), rely_graph: rely_graph, }) } - pub fn iter(&self) -> GraphIter { + // is_validated do some check on the graph to ensure that all correct + pub fn is_validated(&self) -> bool { + todo!(); + //check id + + //must have channel except the end node + } + + // iter immutable iterate over graph + pub fn iter(&self) -> GraphIter { GraphIter { index: 0, data: &self.nodes, - toped_graph: &self.topoed_graph, + toped_graph: self.rely_graph.topo_sort(), } } - pub fn iter_mut(&mut self) -> GraphIterMut { + // iter_mut mutable iterate over graph + pub fn iter_mut(&mut self) -> GraphIterMut { GraphIterMut { index: 0, data: &mut self.nodes, - toped_graph: &self.topoed_graph, + toped_graph: self.rely_graph.topo_sort(), } } } -pub struct GraphIter<'a> { +pub struct GraphIter<'a, ID> +where + ID: GID, +{ index: usize, - data: &'a HashMap, - toped_graph: &'a Vec, + data: &'a HashMap>, + toped_graph: Vec, } -impl<'a> Iterator for GraphIter<'a> { - type Item = &'a Unit; +impl<'a, ID: GID> Iterator for GraphIter<'a, ID> { + type Item = &'a ComputeUnit; fn next(&mut self) -> Option { if self.index < self.data.len() { @@ -139,14 +128,17 @@ impl<'a> Iterator for GraphIter<'a> { } } -pub struct GraphIterMut<'a> { +pub struct GraphIterMut<'a, ID> +where + ID: GID, +{ index: usize, - data: &'a mut HashMap, - toped_graph: &'a Vec, + data: &'a mut HashMap>, + toped_graph: Vec, } -impl<'a> Iterator for GraphIterMut<'a> { - type Item = &'a mut Unit; +impl<'a, ID: GID> Iterator for GraphIterMut<'a, ID> { + type Item = &'a mut ComputeUnit; fn next(&mut self) -> Option { if self.index < self.data.len() { @@ -158,7 +150,7 @@ impl<'a> Iterator for GraphIterMut<'a> { .expect("node added in previous step"); unsafe { // SAFETY: We ensure no two mutable references to the same element are possible. - let node_ptr = node as *mut Unit; + let node_ptr = node as *mut ComputeUnit; Some(&mut *node_ptr) } } else { @@ -170,58 +162,63 @@ impl<'a> Iterator for GraphIterMut<'a> { #[cfg(test)] mod tests { use super::*; + use uuid::Uuid; #[test] - fn test_from_str() { + fn deserialize_from_str() { let json_str = r#" - { - "name":"example", - "dag": [ - { - "id":"5c42b900-a87f-45e3-ba06-c40d94ad5ba2", - "name":"ComputeUnit1", - "node_type": "ComputeUnit", - "dependency": [], - "cmd": ["ls"], - "image":"" - }, - { - "id":"1193c01b-9847-4660-9ea1-34b66f7847f4", - "name":"Channel2", - "node_type": "ChannelUnit", - "dependency": ["5c42b900-a87f-45e3-ba06-c40d94ad5ba2"], - "image":"" - }, - { - "id":"353fc5bf-697e-4221-8487-6ab91915e2a1", - "name":"ComputeUnit3", - "node_type": "ComputeUnit", - "dependency": ["1193c01b-9847-4660-9ea1-34b66f7847f4"], - "cmd": ["ls"], - "image":"" - } - ] +{ + "name": "example", + "version": "v1", + "dag": [ + { + "id": "5c42b900-a87f-45e3-ba06-c40d94ad5ba2", + "name": "ComputeUnit1", + "dependency": [ + + ], + "spec": { + "cmd": [ + "ls" + ], + "image": "" + }, + "channel": { + "spec": { + "cmd": [ + "bufsize", + "1024" + ], + "image": "jiaozifs:" } - "#; - let mut result = Dag::from_json(json_str).unwrap(); - let node_names: Vec<_> = result.iter().map(|node| node.name()).collect(); - assert_eq!( - ["ComputeUnit1", "Channel2", "ComputeUnit3"], - node_names.as_slice() - ); + } + }, + { + "id": "353fc5bf-697e-4221-8487-6ab91915e2a1", + "name": "ComputeUnit2", + "node_type": "ComputeUnit", + "dependency": [ + "5c42b900-a87f-45e3-ba06-c40d94ad5ba2" + ], + "spec": { + "cmd": [ + "ls" + ], + "image": "" + } + } + ] +} + "#; + let mut result = Dag::::from_json(json_str).unwrap(); + let node_names: Vec<_> = result.iter().map(|node| node.name.clone()).collect(); + assert_eq!(["ComputeUnit1", "ComputeUnit2"], node_names.as_slice()); for (index, node) in result.iter_mut().enumerate() { - match node { - Unit::CNode(v) => { - v.name = "node".to_string() + &index.to_string(); - } - Unit::Channel(v) => { - v.name = "channel".to_string() + &index.to_string(); - } - } + node.name = "node".to_string() + &index.to_string(); } - let node_names: Vec<_> = result.iter().map(|node| node.name()).collect(); - assert_eq!(["node0", "channel1", "node2"], node_names.as_slice()); + let node_names: Vec<_> = result.iter().map(|node| node.name.clone()).collect(); + assert_eq!(["node0", "node1"], node_names.as_slice()); } } diff --git a/src/dag/graph.rs b/src/dag/graph.rs index cae3d6b..e7776e4 100644 --- a/src/dag/graph.rs +++ b/src/dag/graph.rs @@ -23,6 +23,7 @@ node1->node2->node3->node4->node5->node6->node7->node8->node9 */ +use crate::core::GID; use std::{ collections::{HashMap, HashSet, VecDeque}, hash::Hash, @@ -33,7 +34,7 @@ use std::{ /// Graph Struct pub(crate) struct Graph where - ID: Eq + Clone + Copy + Hash, + ID: GID, { nodes: HashSet, nodes_count: usize, @@ -45,7 +46,7 @@ where impl Graph where - ID: Eq + Clone + Copy + Hash, + ID: GID, { /// Allocate an empty graph pub(crate) fn new() -> Self { @@ -81,26 +82,26 @@ where /// Add an edge into the graph. /// Above operation adds a arrow from node 0 to node 1, /// which means node 0 shall be executed before node 1. - pub(crate) fn add_edge(&mut self, from: ID, to: ID) { + pub(crate) fn add_edge(&mut self, from: &ID, to: &ID) { match self.adj.get_mut(&from) { Some(v) => { if !v.contains(&from) { - v.push(to); + v.push(to.clone()); } } None => { - self.adj.insert(from, vec![to]); + self.adj.insert(from.clone(), vec![to.clone()]); } } match self.in_degree.get_mut(&to) { Some(v) => { if !v.contains(&to) { - v.push(from); + v.push(from.clone()); } } None => { - self.in_degree.insert(to, vec![from]); + self.in_degree.insert(to.clone(), vec![from.clone()]); } } } @@ -202,7 +203,13 @@ where #[cfg(test)] mod tests { + use std::str::FromStr; + use super::*; + use arrayvec::ArrayString; + use serde::{Deserialize, Serialize}; + + type StringID = ArrayString<5>; #[test] fn test_simple() { @@ -210,13 +217,19 @@ mod tests { // "a" => "c" // "b" => "d" // "b" => "d" - let mut graph = Graph::with_nodes(["a", "b", "c", "d"].as_slice()); - graph.add_edge("a", "b"); - graph.add_edge("a", "c"); - graph.add_edge("b", "d"); - graph.add_edge("c", "d"); - - let sequence = graph.topo_sort(); + let a = StringID::from_str("a").unwrap(); + let b = StringID::from_str("b").unwrap(); + let c = StringID::from_str("c").unwrap(); + let d = StringID::from_str("d").unwrap(); + + let mut graph = Graph::with_nodes([a, b, c, d].as_slice()); + graph.add_edge(&a, &b); + graph.add_edge(&a, &c); + graph.add_edge(&b, &d); + graph.add_edge(&c, &d); + + let topo_ids = graph.topo_sort(); + let sequence: Vec<&str> = topo_ids.iter().map(|id| id.as_str()).collect(); assert_eq!(["a", "b", "c", "d"], sequence.as_slice()); } @@ -226,18 +239,26 @@ mod tests { // "a" => "c" // "b" => "d" // "b" => "d" - let mut graph = Graph::with_nodes(["a", "b", "c", "d", "e", "f", "g"].as_slice()); - graph.add_edge("a", "b"); - graph.add_edge("a", "c"); - graph.add_edge("b", "d"); - graph.add_edge("c", "d"); - graph.add_edge("a", "e"); - graph.add_edge("e", "f"); - graph.add_edge("f", "g"); - graph.add_edge("d", "g"); - - let sequence = graph.topo_sort(); - println!("{:?}", sequence); + let a = StringID::from_str("a").unwrap(); + let b = StringID::from_str("b").unwrap(); + let c = StringID::from_str("c").unwrap(); + let d = StringID::from_str("d").unwrap(); + let e = StringID::from_str("e").unwrap(); + let f = StringID::from_str("f").unwrap(); + let g = StringID::from_str("g").unwrap(); + + let mut graph = Graph::with_nodes([a, b, c, d, e, f, g].as_slice()); + graph.add_edge(&a, &b); + graph.add_edge(&a, &c); + graph.add_edge(&b, &d); + graph.add_edge(&c, &d); + graph.add_edge(&a, &e); + graph.add_edge(&e, &f); + graph.add_edge(&f, &g); + graph.add_edge(&d, &g); + + let topo_ids = graph.topo_sort(); + let sequence: Vec<&str> = topo_ids.iter().map(|id| id.as_str()).collect(); assert_eq!(["a", "b", "c", "e", "d", "f", "g"], sequence.as_slice()); } @@ -247,33 +268,42 @@ mod tests { // "a" => "c" // "b" => "d" // "b" => "d" - let mut graph = Graph::with_nodes(["a", "b", "c", "d", "e", "f", "g"].as_slice()); - graph.add_edge("a", "b"); - graph.add_edge("a", "c"); - graph.add_edge("b", "d"); - graph.add_edge("c", "d"); - graph.add_edge("a", "e"); - graph.add_edge("e", "f"); - graph.add_edge("f", "g"); - graph.add_edge("d", "g"); - - let sequence = graph.get_node_successors(&"c"); + let a = StringID::from_str("a").unwrap(); + let b = StringID::from_str("b").unwrap(); + let c = StringID::from_str("c").unwrap(); + let d = StringID::from_str("d").unwrap(); + let e = StringID::from_str("e").unwrap(); + let f = StringID::from_str("f").unwrap(); + let g = StringID::from_str("g").unwrap(); + + let mut graph = Graph::with_nodes([a, b, c, d, e, f, g].as_slice()); + graph.add_edge(&a, &b); + graph.add_edge(&a, &c); + graph.add_edge(&b, &d); + graph.add_edge(&c, &d); + graph.add_edge(&a, &e); + graph.add_edge(&e, &f); + graph.add_edge(&f, &g); + graph.add_edge(&d, &g); + + let sequence = graph.get_node_successors(&c); + let sequence: Vec<&str> = sequence.iter().map(|id| id.as_str()).collect(); assert_eq!(["c", "d", "g"], sequence.as_slice()); - assert_eq!(0, graph.get_in_degree(&"a")); - assert_eq!(1, graph.get_in_degree(&"b")); - assert_eq!(1, graph.get_in_degree(&"c")); - assert_eq!(2, graph.get_in_degree(&"d")); - assert_eq!(1, graph.get_in_degree(&"e")); - assert_eq!(1, graph.get_in_degree(&"f")); - assert_eq!(2, graph.get_in_degree(&"g")); - - assert_eq!(3, graph.get_out_degree(&"a")); - assert_eq!(1, graph.get_out_degree(&"b")); - assert_eq!(1, graph.get_out_degree(&"c")); - assert_eq!(1, graph.get_out_degree(&"d")); - assert_eq!(1, graph.get_out_degree(&"e")); - assert_eq!(1, graph.get_out_degree(&"f")); - assert_eq!(0, graph.get_out_degree(&"g")); + assert_eq!(0, graph.get_in_degree(&a)); + assert_eq!(1, graph.get_in_degree(&b)); + assert_eq!(1, graph.get_in_degree(&c)); + assert_eq!(2, graph.get_in_degree(&d)); + assert_eq!(1, graph.get_in_degree(&e)); + assert_eq!(1, graph.get_in_degree(&f)); + assert_eq!(2, graph.get_in_degree(&g)); + + assert_eq!(3, graph.get_out_degree(&a)); + assert_eq!(1, graph.get_out_degree(&b)); + assert_eq!(1, graph.get_out_degree(&c)); + assert_eq!(1, graph.get_out_degree(&d)); + assert_eq!(1, graph.get_out_degree(&e)); + assert_eq!(1, graph.get_out_degree(&f)); + assert_eq!(0, graph.get_out_degree(&g)); } } diff --git a/src/dag/mod.rs b/src/dag/mod.rs index 1c3ddac..120e4f5 100644 --- a/src/dag/mod.rs +++ b/src/dag/mod.rs @@ -1,10 +1,4 @@ -mod base; -mod channel; -mod cnode; mod dag; mod graph; -pub use base::*; -pub use channel::*; -pub use cnode::*; pub use dag::*; diff --git a/src/driver/driver.rs b/src/driver/driver.rs index 3cbd32e..1e07391 100644 --- a/src/driver/driver.rs +++ b/src/driver/driver.rs @@ -1,25 +1,35 @@ +use crate::core::GID; use crate::Dag; use anyhow::Result; - +use std::{ + future::Future, + sync::{Arc, Mutex}, +}; pub trait NHandler { //pause graph running for now - fn pause(&mut self) -> Result<()>; + fn pause(&mut self) -> impl Future> + Send; //restart paused graph - fn restart(&mut self) -> Result<()>; + fn restart(&mut self) -> impl Future> + Send; //stop resource about this graph - fn stop(&mut self) -> Result<()>; + fn stop(&mut self) -> impl Future> + Send; } pub trait PipelineController { - fn get_node(&self, name: &str) -> Option>; + fn get_node(&self, name: &str) -> impl Future> + Send; } -pub trait Driver { +pub trait Driver +where + ID: GID, +{ //deploy graph to cluster - fn deploy(&mut self, graph: &Dag) -> Result>; + fn deploy( + &self, + graph: &Dag, + ) -> impl Future>>> + Send; //clean all resource about this graph - fn clean(&mut self) -> Result<()>; + fn clean(&self) -> impl Future> + Send; } diff --git a/src/driver/kube.rs b/src/driver/kube.rs index a764999..84b53d0 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -1,14 +1,49 @@ -use super::Driver; +use super::{Driver, NHandler, PipelineController}; +use crate::core::GID; use crate::Dag; -use anyhow::Result; -pub struct KubeDriver {} +use anyhow::{Error, Result}; +use std::sync::{Arc, Mutex}; +pub struct KubeHandler {} -impl Driver for KubeDriver { - fn deploy(&mut self, graph: &Dag) -> Result> { +impl NHandler for KubeHandler { + async fn pause(&mut self) -> Result<()> { todo!() } - fn clean(&mut self) -> Result<()> { + async fn restart(&mut self) -> Result<()> { + todo!() + } + + async fn stop(&mut self) -> Result<()> { + todo!() + } +} +pub struct KubePipelineController {} + +impl PipelineController for KubePipelineController { + #[allow(refining_impl_trait)] + async fn get_node(&self, name: &str) -> Result { + todo!() + } +} + +pub struct KubeDriver +where + ID: GID, +{ + _id: std::marker::PhantomData, +} + +impl Driver for KubeDriver +where + ID: GID, +{ + #[allow(refining_impl_trait)] + async fn deploy(&self, graph: &Dag) -> Result>> { + todo!() + } + + async fn clean(&self) -> Result<()> { todo!() } } diff --git a/src/main.rs b/src/main.rs index f41ccb8..2d162f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,46 +1,11 @@ +#![feature(trait_alias)] + +mod core; mod dag; +pub mod driver; mod utils; -use dag::BaseUnit; use dag::Dag; use uuid::Uuid; -fn main() { - let json_str = r#" - { - "name":"example", - "dag": [ - { - "id":"5c42b900-a87f-45e3-ba06-c40d94ad5ba2", - "name":"ComputeUnit1", - "node_type": "ComputeUnit", - "dependency": [], - "cmd": ["ls"], - "image":"" - }, - { - "id":"1193c01b-9847-4660-9ea1-34b66f7847f4", - "name":"Channel2", - "node_type": "Channel", - "dependency": ["5c42b900-a87f-45e3-ba06-c40d94ad5ba2"], - "image":"" - }, - { - "id":"353fc5bf-697e-4221-8487-6ab91915e2a1", - "name":"ComputeUnit3", - "node_type": "ComputeUnit", - "dependency": ["1193c01b-9847-4660-9ea1-34b66f7847f4"], - "cmd": ["ls"], - "image":"" - } - ] - } -"#; - - println!("{}", Uuid::new_v4()); - - let result = Dag::from_json(json_str).unwrap(); - result.iter().for_each(|v| { - println!("{}", v.name()); - }); -} +fn main() {} diff --git a/src/utils/utils.rs b/src/utils/utils.rs index f4cc157..a0a5dce 100644 --- a/src/utils/utils.rs +++ b/src/utils/utils.rs @@ -1,5 +1,4 @@ use anyhow::{anyhow, Result}; -use uuid::Uuid; pub trait IntoAnyhowResult { fn anyhow(self, msg: impl ToString) -> Result;