From a291633bfab2efb1e59220feef512e4f475b97c7 Mon Sep 17 00:00:00 2001
From: hunjixin <1084400399@qq.com>
Date: Fri, 9 Aug 2024 03:12:01 +0000
Subject: [PATCH] fix: clippy

---
 .github/workflows/rust.yml                 |  12 +-
 .../src/bin/compute_unit_runner.rs         |   9 +-
 crates/compute_unit_runner/src/ipc.rs      |  31 +--
 .../src/media_data_tracker.rs              |  33 ++-
 .../src/state_controller.rs                |  15 +-
 crates/dp_runner/src/channel_tracker.rs    |  32 +--
 crates/dp_runner/src/main.rs               |   4 +-
 crates/dp_runner/src/state_controller.rs   |  20 +-
 .../src/models/archive_type.rs             |   2 +-
 crates/nodes_sdk/src/fs_cache.rs           |   6 +
 crates/nodes_sdk/src/lib.rs                |   8 +
 crates/nodes_sdk/src/mprc.rs               |  21 +-
 crates/nodes_sdk/src/multi_sender.rs       |   5 +-
 nodes/copy_in_place/src/main.rs            |  10 +-
 nodes/dummy_in/src/main.rs                 |   4 +-
 nodes/dummy_out/src/main.rs                |   5 +-
 nodes/jz_reader/src/main.rs                |  20 +-
 nodes/jz_writer/src/main.rs                |  14 +-
 rust-toolchain                             |   2 +-
 src/api/client/job.rs                      |  18 +-
 src/api/job_api.rs                         |  49 ++--
 src/api/server.rs                          |   1 -
 src/bin/jz-flow/job.rs                     |  33 ++-
 src/bin/jz-flow/run.rs                     |   4 +-
 src/core/main_db_models.rs                 |   3 +-
 src/core/spec.rs                           |   4 +-
 src/dag/dag.rs                             | 242 -----------------
 src/dag/graph.rs                           |   3 +
 src/dag/mod.rs                             | 244 +++++++++++++++++-
 src/driver/driver.rs                       |  93 -------
 src/driver/kube.rs                         | 184 ++++++++-----
 src/driver/mod.rs                          |  98 ++++++-
 src/job/job_mgr.rs                         |  43 +--
 src/utils/mod.rs                           |  53 +++-
 src/utils/utils.rs                         |  52 ----
 35 files changed, 706 insertions(+), 671 deletions(-)
 delete mode 100644 src/dag/dag.rs
 delete mode 100644 src/driver/driver.rs
 delete mode 100644 src/utils/utils.rs

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index be21065..2306614 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -17,6 +17,15 @@ jobs:
     steps:
     - uses: actions/checkout@v4

+    - name: Install Protoc
+      uses: arduino/setup-protoc@v3
+
+    - name: Run Clippy
+      run: cargo +nightly clippy --fix --workspace --exclude jiaozifs_client_rs --allow-dirty
+
+    - name: Run tests
+      run: cargo test --verbose --workspace
+
     - name: Start minikube
       uses: medyagh/setup-minikube@latest

@@ -25,5 +34,4 @@ jobs:
       run: |
         eval $(minikube -p minikube docker-env)
         make minikube-docker
-
-    - name: Run tests
-      run: cargo test --verbose --workspace
+
diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs
index bd60416..768079c 100644
--- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs
+++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs
@@ -26,10 +26,7 @@ use tokio::{
         signal,
         SignalKind,
     },
-    sync::{
-        Mutex,
-        RwLock,
-    },
+    sync::RwLock,
     task::JoinSet,
 };
@@ -112,9 +109,9 @@ async fn main() -> Result<()> {
             _ = token.cancelled() => {
                 handler.stop(true).await;
                 info!("ipc server stopped");
-                return Ok(());
             }
         };
+        Ok::<(), anyhow::Error>(())
     });
     }
     {
@@ -135,7 +132,7 @@ async fn main() -> Result<()> {

     {
         //catch signal
-        let _ = tokio::spawn(async move {
+        tokio::spawn(async move {
            let mut sig_term = signal(SignalKind::terminate()).unwrap();
            let mut sig_int = signal(SignalKind::interrupt()).unwrap();
            select!
{ diff --git a/crates/compute_unit_runner/src/ipc.rs b/crates/compute_unit_runner/src/ipc.rs index 63277c1..7ee62e9 100644 --- a/crates/compute_unit_runner/src/ipc.rs +++ b/crates/compute_unit_runner/src/ipc.rs @@ -13,18 +13,12 @@ use actix_web::{ HttpResponse, HttpServer, }; -use anyhow::{ - anyhow, - Result, -}; +use anyhow::Result; use core::str; use http_body_util::Collected; -use jz_action::{ - core::db::{ - JobDbRepo, - TrackerState, - }, - utils::StdIntoAnyhowResult, +use jz_action::core::db::{ + JobDbRepo, + TrackerState, }; use serde::{ ser::SerializeStruct, @@ -64,7 +58,6 @@ use serde::de::{ self, Deserializer, MapAccess, - SeqAccess, Visitor, }; use serde_repr::*; @@ -535,7 +528,7 @@ impl IPCClientImpl { impl IPCClient for IPCClientImpl { async fn finish(&self) -> Result<(), IPCError> { - let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status").into(); + let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status"); let req: Request> = Request::builder() .method(Method::POST) @@ -558,7 +551,7 @@ impl IPCClient for IPCClientImpl { } async fn status(&self) -> Result { - let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status").into(); + let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status"); let req: Request> = Request::builder() .method(Method::GET) @@ -584,7 +577,7 @@ impl IPCClient for IPCClientImpl { async fn submit_output(&self, req: SubmitOuputDataReq) -> Result<(), IPCError> { let json = serde_json::to_string(&req)?; - let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/submit").into(); + let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/submit"); let req: Request> = Request::builder() .method(Method::POST) @@ -607,7 +600,7 @@ impl IPCClient for IPCClientImpl { } async fn request_avaiable_data(&self) -> Result, IPCError> { - let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data").into(); + let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data"); let req: Request> = Request::builder() .method(Method::GET) .uri(url) @@ -637,7 +630,7 @@ impl IPCClient for IPCClientImpl { async fn complete_result(&self, id: &str) -> Result<(), IPCError> { let req = CompleteDataReq::new(id); let json = serde_json::to_string(&req)?; - let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data").into(); + let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data"); let req: Request> = Request::builder() .method(Method::POST) @@ -663,8 +656,6 @@ impl IPCClient for IPCClientImpl { #[cfg(test)] mod tests { use super::*; - use std::env; - use tracing_subscriber; #[tokio::test] async fn test_my_error() { @@ -682,7 +673,7 @@ mod tests { assert_eq!(code, ErrorNumber::AlreadyFinish); assert_eq!(msg, "aaaa"); } - _ => Err(anyhow!("not expect type")).unwrap(), + _ => panic!("unexpected error type"), } } @@ -696,7 +687,7 @@ mod tests { IPCError::UnKnown(msg) => { assert_eq!(msg, "bbbbbbbbbbbbbb"); } - _ => Err(anyhow!("not expect type")).unwrap(), + _ => panic!("unexpected error type"), } } } diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index 0242925..371180b 100644 --- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -42,13 +42,13 @@ use tracing::{ use nodes_sdk::{ fs_cache::FileCache, multi_sender::MultiSender, + MessageSender, }; use tokio::{ select, sync::{ broadcast, mpsc, - oneshot, RwLock, }, task::JoinSet, @@ -83,19 +83,16 
@@ where pub(crate) downstreams: Vec, // channel for process avaiable data request - pub(crate) ipc_process_data_req_tx: - Option>>)>>, + pub(crate) ipc_process_data_req_tx: Option>>, // channel for response complete data. do clean work when receive this request - pub(crate) ipc_process_completed_data_tx: - Option>)>>, + pub(crate) ipc_process_completed_data_tx: Option>, // channel for submit output data - pub(crate) ipc_process_submit_output_tx: - Option>)>>, + pub(crate) ipc_process_submit_output_tx: Option>, // channel for receive finish state from user container - pub(crate) ipc_process_finish_state_tx: Option>)>>, + pub(crate) ipc_process_finish_state_tx: Option>, } impl MediaDataTracker where @@ -114,11 +111,11 @@ where fs_cache, buf_size, name: name.to_string(), - repo: repo, + repo, local_state: Arc::new(RwLock::new(TrackerState::Init)), - up_nodes: up_nodes, - upstreams: upstreams, - downstreams: downstreams, + up_nodes, + upstreams, + downstreams, ipc_process_submit_output_tx: None, ipc_process_completed_data_tx: None, ipc_process_data_req_tx: None, @@ -157,7 +154,7 @@ where .map(|count| info!("revert {count} SelectForSent data to Received"))?; //check ready if both upnodes is finish and no pending data, we think it finish - if up_nodes.len() > 0 { + if !up_nodes.is_empty() { let is_all_success = try_join_all(up_nodes.iter().map(|node_name|db_repo.get_node_by_name(node_name))).await .map_err(|err|anyhow!("query node data {err}"))? .iter() @@ -210,7 +207,7 @@ where let mut join_set: JoinSet> = JoinSet::new(); let new_data_tx = broadcast::Sender::new(1); - if self.downstreams.len() > 0 { + if !self.downstreams.is_empty() { //process outgoing data { let new_data_tx = new_data_tx.clone(); @@ -247,7 +244,7 @@ where return anyhow::Ok(()); } new_data = new_data_rx.recv() => { - if let Err(_) = new_data { + if new_data.is_err() { //drop fast to provent exceed channel capacity continue; } @@ -270,7 +267,7 @@ where }; //write outgoing - if new_batch.size >0 && downstreams.len()>0 { + if new_batch.size >0 && !downstreams.is_empty() { info!("start to send data {} {:?}", &req.id, now.elapsed()); let sent_nodes: Vec<_>= req.sent.iter().map(|v|v.as_str()).collect(); if let Err(sent_nodes) = multi_sender.send(new_batch, &sent_nodes).await { @@ -383,7 +380,7 @@ where //TODO this make a async process to be sync process. 
got a low performance, //if have any bottleneck here, we should refrator this one - if self.upstreams.len() > 0 { + if !self.upstreams.is_empty() { //process user contaienr request let db_repo = self.repo.clone(); let node_name = self.name.clone(); @@ -524,7 +521,7 @@ where }); } - if self.upstreams.len() == 0 { + if self.upstreams.is_empty() { //receive event from pod //inputs nodes need to tell its finish let db_repo = self.repo.clone(); diff --git a/crates/compute_unit_runner/src/state_controller.rs b/crates/compute_unit_runner/src/state_controller.rs index aeda65b..b50de51 100644 --- a/crates/compute_unit_runner/src/state_controller.rs +++ b/crates/compute_unit_runner/src/state_controller.rs @@ -57,7 +57,7 @@ where } _ = interval.tick() => { match repo - .get_node_by_name(&name) + .get_node_by_name(name) .await{ Ok(record)=> { debug!("{} fetch state from db", record.node_name); @@ -70,15 +70,10 @@ where *local_state = record.state.clone(); info!("update state {:?} -> {:?}", old_local_state, local_state); drop(local_state); - match record.state { - TrackerState::Ready => { - if old_local_state == TrackerState::Init { - //start - info!("start data processing {:?}", record.incoming_streams); - join_set = Some(program_guard.route_data(token.clone()).await?); - } - } - _ => {} + if record.state == TrackerState::Ready && old_local_state == TrackerState::Init { + //start + info!("start data processing {:?}", record.incoming_streams); + join_set = Some(program_guard.route_data(token.clone()).await?); } }, Err(err)=> error!("fetch node state from db {err}") diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/dp_runner/src/channel_tracker.rs index 6433771..329c3d1 100644 --- a/crates/dp_runner/src/channel_tracker.rs +++ b/crates/dp_runner/src/channel_tracker.rs @@ -14,20 +14,19 @@ use jz_action::{ }, network::datatransfer::MediaDataBatchResponse, }; -use nodes_sdk::fs_cache::FileCache; +use nodes_sdk::{ + fs_cache::FileCache, + MessageSender, +}; use std::{ sync::Arc, time::Duration, }; use tokio::{ select, - sync::{ - mpsc::{ + sync::mpsc::{ self, - Sender, }, - oneshot, - }, task::JoinSet, time::{ self, @@ -60,12 +59,9 @@ where pub(crate) upstreams: Vec, - pub(crate) downstreams: Vec, + pub(crate) incoming_tx: Option>, - pub(crate) incoming_tx: Option>)>>, - - pub(crate) request_tx: - Option>>)>>, + pub(crate) request_tx: Option>>, } impl ChannelTracker @@ -79,17 +75,15 @@ where buf_size: usize, up_nodes: Vec, upstreams: Vec, - downstreams: Vec, ) -> Self { ChannelTracker { name: name.to_string(), - repo: repo, + repo, fs_cache, buf_size, local_state: TrackerState::Init, - up_nodes: up_nodes, - upstreams: upstreams, - downstreams: downstreams, + up_nodes, + upstreams, request_tx: None, incoming_tx: None, } @@ -177,7 +171,7 @@ where &mut self, token: CancellationToken, ) -> Result>> { - if self.upstreams.len() == 0 { + if self.upstreams.is_empty() { return Err(anyhow!("no upstream")); } @@ -292,7 +286,7 @@ where //write batch files if let Err(err) = fs_cache.write(data_batch).await { error!("write files to disk fail {}", err); - resp.send(Err(err.into())).expect("request alread listen this channel"); + resp.send(Err(err)).expect("request alread listen this channel"); continue; } @@ -301,7 +295,7 @@ where if let Err(err) = db_repo.insert_new_path(&DataRecord{ node_name: node_name.clone(), id:id.clone(), - size: size, + size, state: DataState::Received, sent: vec![], direction:Direction::In, diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs index c63c825..7ab9f82 
100644
--- a/crates/dp_runner/src/main.rs
+++ b/crates/dp_runner/src/main.rs
@@ -31,7 +31,6 @@ use tokio::{
 use tokio_util::sync::CancellationToken;
 use tonic::transport::Server;
 use tracing::{
-    error,
     info,
     Level,
 };
@@ -90,7 +89,6 @@ async fn main() -> Result<()> {
         args.buf_size,
         node.up_nodes,
         node.incoming_streams,
-        node.outgoing_streams,
     );

     program.run_backend(&mut join_set, token.clone())?;
@@ -129,7 +127,7 @@ async fn main() -> Result<()> {

     {
         //catch signal
-        let _ = tokio::spawn(async move {
+        tokio::spawn(async move {
            let mut sig_term = signal(SignalKind::terminate()).unwrap();
            let mut sig_int = signal(SignalKind::interrupt()).unwrap();
            select! {
diff --git a/crates/dp_runner/src/state_controller.rs b/crates/dp_runner/src/state_controller.rs
index b9a66e3..dad72fb 100644
--- a/crates/dp_runner/src/state_controller.rs
+++ b/crates/dp_runner/src/state_controller.rs
@@ -3,10 +3,7 @@ use jz_action::core::db::{
     JobDbRepo,
     TrackerState,
 };
-use std::{
-    io::Write,
-    sync::Arc,
-};
+use std::sync::Arc;
 use tokio::{
     select,
     sync::RwLock,
@@ -58,7 +55,7 @@ where
                }
                _ = interval.tick() => {
                    match repo
-                       .get_node_by_name(&name)
+                       .get_node_by_name(name)
                       .await{
                           Ok(record)=> {
                               debug!("{} fetch state from db", record.node_name);
@@ -69,15 +66,10 @@ where
                               let old_local_state = program_guard.local_state.clone();
                               program_guard.local_state = record.state.clone();
                               info!("update state {:?} -> {:?}", &old_local_state, &record.state);
-                              match record.state {
-                                  TrackerState::Ready => {
-                                      if old_local_state == TrackerState::Init {
-                                          //start
-                                          info!("start data processing {:?}", record.incoming_streams);
-                                          join_set = Some(program_guard.route_data(token.clone()).await?);
-                                      }
-                                  }
-                                  _ => {}
+                              if record.state == TrackerState::Ready && old_local_state == TrackerState::Init {
+                                  //start
+                                  info!("start data processing {:?}", record.incoming_streams);
+                                  join_set = Some(program_guard.route_data(token.clone()).await?);
                               }
                           },
                           Err(err)=> error!("fetch node state from db {err}")
diff --git a/crates/jiaozifs_client_rs/src/models/archive_type.rs b/crates/jiaozifs_client_rs/src/models/archive_type.rs
index 12aa9fd..9f5bacf 100755
--- a/crates/jiaozifs_client_rs/src/models/archive_type.rs
+++ b/crates/jiaozifs_client_rs/src/models/archive_type.rs
@@ -11,7 +11,7 @@
 use crate::models;
 use serde::{Deserialize, Serialize};

-///
+
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
 pub enum ArchiveType {
     #[serde(rename = "zip")]
diff --git a/crates/nodes_sdk/src/fs_cache.rs b/crates/nodes_sdk/src/fs_cache.rs
index b80ad1c..5de189b 100644
--- a/crates/nodes_sdk/src/fs_cache.rs
+++ b/crates/nodes_sdk/src/fs_cache.rs
@@ -127,6 +127,12 @@ impl FileCache for FSCache {
 #[derive(Clone)]
 pub struct MemCache(Arc>>);

+impl Default for MemCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl MemCache {
     pub fn new() -> Self {
         Self(Arc::new(Mutex::new(HashMap::new())))
diff --git a/crates/nodes_sdk/src/lib.rs b/crates/nodes_sdk/src/lib.rs
index acdfdd8..ff90760 100644
--- a/crates/nodes_sdk/src/lib.rs
+++ b/crates/nodes_sdk/src/lib.rs
@@ -7,3 +7,11 @@ pub mod multi_sender;

 mod sleep_program;
 pub use sleep_program::*;
+
+use anyhow::Result;
+use tokio::sync::{
+    mpsc::Sender,
+    oneshot,
+};
+
+pub type MessageSender<REQ, RESP = ()> = Sender<(REQ, oneshot::Sender<Result<RESP>>)>;
diff --git a/crates/nodes_sdk/src/mprc.rs b/crates/nodes_sdk/src/mprc.rs
index fdbc989..8d44413 100644
--- a/crates/nodes_sdk/src/mprc.rs
+++ b/crates/nodes_sdk/src/mprc.rs
@@ -12,6 +12,15 @@ where
     entries: Vec<(K, T)>,
 }

+impl<K, T> Default for Mprs<K, T>
+where + T: Clone, +{ + fn default() -> Self { + Self::new() + } +} + impl Mprs where T: Clone, @@ -20,10 +29,10 @@ where Mprs { entries: vec![] } } - pub fn remove(&mut self, k: &Q) -> Option + pub fn remove(&mut self, k: &Q) -> Option where K: Borrow, - Q: Hash + Eq, + Q: Hash + Eq + ?Sized, { for i in 0..self.entries.len() { if self.entries[i].0.borrow() == k { @@ -108,8 +117,8 @@ mod tests { let (tx, mut rx) = mpsc::channel(10); let counter = counter.clone(); mprs.insert("A".to_string(), tx); - let _ = tokio::spawn(async move { - while let Some(_) = rx.recv().await { + tokio::spawn(async move { + while (rx.recv().await).is_some() { counter.fetch_add(1, Ordering::SeqCst); println!("A receive"); } @@ -120,8 +129,8 @@ mod tests { let (tx, mut rx) = mpsc::channel(10); mprs.insert("B".to_string(), tx); let counter = counter.clone(); - let _ = tokio::spawn(async move { - while let Some(_) = rx.recv().await { + tokio::spawn(async move { + while (rx.recv().await).is_some() { counter.fetch_add(1, Ordering::SeqCst); println!("B receive"); } diff --git a/crates/nodes_sdk/src/multi_sender.rs b/crates/nodes_sdk/src/multi_sender.rs index 01411b0..8f47856 100644 --- a/crates/nodes_sdk/src/multi_sender.rs +++ b/crates/nodes_sdk/src/multi_sender.rs @@ -18,10 +18,7 @@ pub struct MultiSender { impl MultiSender { pub fn new(streams: Vec) -> Self { let connects = streams.iter().map(|_| None).collect(); - MultiSender { - streams, - connects: connects, - } + MultiSender { streams, connects } } } diff --git a/nodes/copy_in_place/src/main.rs b/nodes/copy_in_place/src/main.rs index 568f08d..ca7fa55 100644 --- a/nodes/copy_in_place/src/main.rs +++ b/nodes/copy_in_place/src/main.rs @@ -1,8 +1,5 @@ use anyhow::Result; -use clap::{ - error, - Parser, -}; +use clap::Parser; use compute_unit_runner::ipc::{ self, ErrorNumber, @@ -31,7 +28,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ - debug, error, info, Level, @@ -73,7 +69,7 @@ async fn main() -> Result<()> { { //catch signal - let _ = tokio::spawn(async move { + tokio::spawn(async move { let mut sig_term = signal(SignalKind::terminate()).unwrap(); let mut sig_int = signal(SignalKind::interrupt()).unwrap(); select! { @@ -122,7 +118,7 @@ async fn copy_in_place(token: CancellationToken, args: Args) -> Result<()> { sleep(Duration::from_secs(2)).await; continue; } - Err(IPCError::NodeError { code, msg }) => match code { + Err(IPCError::NodeError { code, msg: _ }) => match code { ErrorNumber::AlreadyFinish => { return Ok(()); } diff --git a/nodes/dummy_in/src/main.rs b/nodes/dummy_in/src/main.rs index 8f3fe97..8f8539c 100644 --- a/nodes/dummy_in/src/main.rs +++ b/nodes/dummy_in/src/main.rs @@ -70,7 +70,7 @@ async fn main() -> Result<()> { { //catch signal - let _ = tokio::spawn(async move { + tokio::spawn(async move { let mut sig_term = signal(SignalKind::terminate()).unwrap(); let mut sig_int = signal(SignalKind::interrupt()).unwrap(); select! 
{ @@ -120,7 +120,7 @@ async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> { let mut tmp_file = fs::File::create(file_path).await?; for _ in 0..100 { let word = random_word::gen(Lang::En).to_string() + "\n"; - tmp_file.write(word.as_bytes()).await.unwrap(); + let _ = tmp_file.write(word.as_bytes()).await.unwrap(); } } diff --git a/nodes/dummy_out/src/main.rs b/nodes/dummy_out/src/main.rs index 3c0ee35..2fd477f 100644 --- a/nodes/dummy_out/src/main.rs +++ b/nodes/dummy_out/src/main.rs @@ -23,7 +23,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ - debug, error, info, Level, @@ -66,7 +65,7 @@ async fn main() -> Result<()> { { //catch signal - let _ = tokio::spawn(async move { + tokio::spawn(async move { let mut sig_term = signal(SignalKind::terminate()).unwrap(); let mut sig_int = signal(SignalKind::interrupt()).unwrap(); select! { @@ -109,7 +108,7 @@ async fn write_dummy(token: CancellationToken, args: Args) -> Result<()> { sleep(Duration::from_secs(2)).await; continue; } - Err(IPCError::NodeError { code, msg }) => match code { + Err(IPCError::NodeError { code, msg: _ }) => match code { ErrorNumber::AlreadyFinish => { return Ok(()); } diff --git a/nodes/jz_reader/src/main.rs b/nodes/jz_reader/src/main.rs index 985dc18..e40ee3e 100644 --- a/nodes/jz_reader/src/main.rs +++ b/nodes/jz_reader/src/main.rs @@ -9,7 +9,10 @@ use compute_unit_runner::ipc::{ SubmitOuputDataReq, }; use jiaozifs_client_rs::{ - apis, + apis::{ + self, + configuration::Configuration, + }, models::RefType, }; use jz_action::utils::StdIntoAnyhowResult; @@ -24,13 +27,11 @@ use tokio::{ signal, SignalKind, }, - sync::mpsc, task::JoinSet, }; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::{ - error, info, Level, }; @@ -97,7 +98,7 @@ async fn main() -> Result<()> { { //catch signal - let _ = tokio::spawn(async move { + tokio::spawn(async move { let mut sig_term = signal(SignalKind::terminate()).unwrap(); let mut sig_int = signal(SignalKind::interrupt()).unwrap(); select! 
{ @@ -120,9 +121,12 @@ async fn read_jz_fs(args: Args) -> Result<()> { val => Err(anyhow!("ref type not correct {}", val)), }?; - let mut configuration = apis::configuration::Configuration::default(); - configuration.base_path = args.jiaozifs_url; - configuration.basic_auth = Some((args.username, Some(args.password))); + let configuration = Configuration { + base_path: args.jiaozifs_url, + basic_auth: Some((args.username, Some(args.password))), + ..Default::default() + }; + let file_paths = apis::objects_api::get_files( &configuration, &args.owner, @@ -154,7 +158,7 @@ async fn read_jz_fs(args: Args) -> Result<()> { ) .await?; - let file_path = output_dir.as_path().join(&path); + let file_path = output_dir.as_path().join(path); let mut tmp_file = fs::File::create(file_path).await?; while let Some(item) = byte_stream.next().await { tokio::io::copy(&mut item.unwrap().as_ref(), &mut tmp_file) diff --git a/nodes/jz_writer/src/main.rs b/nodes/jz_writer/src/main.rs index 78306cf..49733c3 100644 --- a/nodes/jz_writer/src/main.rs +++ b/nodes/jz_writer/src/main.rs @@ -12,6 +12,7 @@ use compute_unit_runner::ipc::{ use jiaozifs_client_rs::{ apis::{ self, + configuration::Configuration, }, models::RefType, }; @@ -39,7 +40,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ - debug, error, info, Level, @@ -112,7 +112,7 @@ async fn main() -> Result<()> { { //catch signal - let _ = tokio::spawn(async move { + tokio::spawn(async move { let mut sig_term = signal(SignalKind::terminate()).unwrap(); let mut sig_int = signal(SignalKind::interrupt()).unwrap(); select! { @@ -127,9 +127,11 @@ async fn main() -> Result<()> { } async fn write_jz_fs(token: CancellationToken, args: Args) -> Result<()> { - let mut configuration = apis::configuration::Configuration::default(); - configuration.base_path = args.jiaozifs_url; - configuration.basic_auth = Some((args.username, Some(args.password))); + let configuration = Configuration { + base_path: args.jiaozifs_url, + basic_auth: Some((args.username, Some(args.password))), + ..Default::default() + }; match args.ref_type.as_str() { "wip" => { @@ -178,7 +180,7 @@ async fn write_jz_fs(token: CancellationToken, args: Args) -> Result<()> { sleep(Duration::from_secs(2)).await; continue; } - Err(IPCError::NodeError { code, msg }) => match code { + Err(IPCError::NodeError { code, msg: _ }) => match code { ErrorNumber::AlreadyFinish => { return Ok(()); } diff --git a/rust-toolchain b/rust-toolchain index 523d088..f56f1f1 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2024-05-12" \ No newline at end of file +channel = "nightly-2024-07-19" \ No newline at end of file diff --git a/src/api/client/job.rs b/src/api/client/job.rs index 3295b09..aa67d59 100644 --- a/src/api/client/job.rs +++ b/src/api/client/job.rs @@ -2,7 +2,9 @@ use crate::{ core::db::{ Job, JobUpdateInfo, - }, job::job_mgr::JobDetails, utils::StdIntoAnyhowResult + }, + job::job_mgr::JobDetails, + utils::StdIntoAnyhowResult, }; use anyhow::{ anyhow, @@ -85,11 +87,7 @@ impl JobClient { pub async fn list(&self) -> Result> { let resp = self .client - .get( - self.base_uri - .clone() - .join("jobs")? - ) + .get(self.base_uri.clone().join("jobs")?) 
.send() .await .anyhow()?; @@ -189,9 +187,9 @@ impl JobClient { } resp.bytes() - .await - .anyhow() - .and_then(|body| serde_json::from_slice(&body).anyhow()) - .anyhow() + .await + .anyhow() + .and_then(|body| serde_json::from_slice(&body).anyhow()) + .anyhow() } } diff --git a/src/api/job_api.rs b/src/api/job_api.rs index a71933d..ff84787 100644 --- a/src/api/job_api.rs +++ b/src/api/job_api.rs @@ -4,11 +4,11 @@ use crate::{ core::db::{ Job, JobDbRepo, - JobRepo, JobUpdateInfo, MainDbRepo, }, - driver::Driver, job::job_mgr::JobManager, + driver::Driver, + job::job_mgr::JobManager, }; use actix_web::{ web, @@ -17,11 +17,9 @@ use actix_web::{ use mongodb::bson::oid::ObjectId; //TODO change to use route macro after https://github.com/actix/actix-web/issues/2866 resolved -async fn create(db_repo: web::Data, data: web::Json) -> HttpResponse +async fn create(db_repo: web::Data, data: web::Json) -> HttpResponse where - D: Driver, MAINR: MainDbRepo, - JOBR: JobDbRepo, { match db_repo.insert(&data.0).await { Ok(inserted_result) => HttpResponse::Ok().json(&inserted_result), @@ -29,11 +27,9 @@ where } } -async fn get(db_repo: web::Data, path: web::Path) -> HttpResponse +async fn get(db_repo: web::Data, path: web::Path) -> HttpResponse where - D: Driver, MAINR: MainDbRepo, - JOBR: JobDbRepo, { match db_repo.get(&path.into_inner()).await { Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result), @@ -42,11 +38,9 @@ where } } -async fn list(db_repo: web::Data) -> HttpResponse +async fn list(db_repo: web::Data) -> HttpResponse where - D: Driver, MAINR: MainDbRepo, - JOBR: JobDbRepo, { match db_repo.list_jobs().await { Ok(jobs) => HttpResponse::Ok().json(&jobs), @@ -54,14 +48,9 @@ where } } -async fn delete( - db_repo: web::Data, - path: web::Path, -) -> HttpResponse +async fn delete(db_repo: web::Data, path: web::Path) -> HttpResponse where - D: Driver, MAINR: MainDbRepo, - JOBR: JobDbRepo, { match db_repo.delete(&path.into_inner()).await { Ok(_) => HttpResponse::Ok().finish(), @@ -69,15 +58,13 @@ where } } -async fn update( +async fn update( db_repo: web::Data, path: web::Path, query: web::Query, ) -> HttpResponse where - D: Driver, MAINR: MainDbRepo, - JOBR: JobDbRepo, { match db_repo .update(&path.into_inner(), &query.into_inner()) @@ -98,9 +85,7 @@ where JOBR: JobDbRepo, { let id = ObjectId::from_str(&path.into_inner()).unwrap(); - match job_manager.get_job_details(&id) - .await - { + match job_manager.get_job_details(&id).await { Ok(detail) => HttpResponse::Ok().json(detail), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } @@ -114,18 +99,14 @@ where { cfg.service( web::resource("/job") - .route(web::post().to(create::)) - .route(web::delete().to(delete::)), + .route(web::post().to(create::)) + .route(web::delete().to(delete::)), ) .service( web::resource("/job/{id}") - .route(web::get().to(get::)) - .route(web::post().to(update::)) - ).service( - web::resource("/jobs") - .route(web::get().to(list::)) - ).service( - web::resource("/job/detail/{id}") - .route(web::get().to(job_details::)) - ); + .route(web::get().to(get::)) + .route(web::post().to(update::)), + ) + .service(web::resource("/jobs").route(web::get().to(list::))) + .service(web::resource("/job/detail/{id}").route(web::get().to(job_details::))); } diff --git a/src/api/server.rs b/src/api/server.rs index ab8fd97..cb2bc71 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -1,4 +1,3 @@ - use crate::{ core::db::{ JobDbRepo, diff --git a/src/bin/jz-flow/job.rs b/src/bin/jz-flow/job.rs index 
a4d8903..f383237 100644 --- a/src/bin/jz-flow/job.rs +++ b/src/bin/jz-flow/job.rs @@ -2,10 +2,15 @@ use std::str::FromStr; use crate::global::GlobalOptions; use anyhow::Result; +use chrono::{ + DateTime, + Utc, +}; use clap::{ Args, Parser, }; +use comfy_table::Table; use jz_action::{ api::client::JzFlowClient, core::db::Job, @@ -14,15 +19,13 @@ use jz_action::{ use mongodb::bson::oid::ObjectId; use serde_variant::to_variant_name; use tokio::fs; -use comfy_table::Table; -use chrono::{Utc, DateTime}; #[derive(Debug, Parser)] pub(super) enum JobCommands { /// Adds files to myapp Create(JobCreateArgs), List, - Detail(JobDetailArgs) + Detail(JobDetailArgs), } pub(super) async fn run_job_subcommand( @@ -60,10 +63,7 @@ pub(super) async fn create_job(global_opts: GlobalOptions, args: JobCreateArgs) let created_job = client.create(&job).await?; - println!( - "Create job successfully, job ID: {}", - created_job.id.to_string() - ); + println!("Create job successfully, job ID: {}", created_job.id); Ok(()) } @@ -72,16 +72,27 @@ pub(super) async fn list_job(global_opts: GlobalOptions) -> Result<()> { let jobs = client.list().await?; let mut table = Table::new(); - table.set_header(vec!["ID", "Name", "State", "TryNumber", "CreatedAt", "UpdatedAt"]); + table.set_header(vec![ + "ID", + "Name", + "State", + "TryNumber", + "CreatedAt", + "UpdatedAt", + ]); - jobs.iter().for_each(|job|{ + jobs.iter().for_each(|job| { table.add_row(vec![ job.id.to_string(), job.name.to_string(), to_variant_name(&job.state).unwrap().to_string(), job.retry_number.to_string(), - DateTime::from_timestamp(job.created_at, 0).unwrap().to_string(), - DateTime::from_timestamp(job.updated_at, 0).unwrap().to_string(), + DateTime::from_timestamp(job.created_at, 0) + .unwrap() + .to_string(), + DateTime::from_timestamp(job.updated_at, 0) + .unwrap() + .to_string(), ]); }); println!("{table}"); diff --git a/src/bin/jz-flow/run.rs b/src/bin/jz-flow/run.rs index 32ca22b..9a63361 100644 --- a/src/bin/jz-flow/run.rs +++ b/src/bin/jz-flow/run.rs @@ -70,15 +70,15 @@ pub(super) async fn run_backend(global_opts: GlobalOptions, args: RunArgs) -> Re _ = token.cancelled() => { handler.stop(true).await; info!("rpc server stopped"); - return Ok(()); } }; + anyhow::Ok(()) }); } { //catch signal - let _ = tokio::spawn(async move { + tokio::spawn(async move { let mut sig_term = signal(SignalKind::terminate()).unwrap(); let mut sig_int = signal(SignalKind::interrupt()).unwrap(); select! 
{ diff --git a/src/core/main_db_models.rs b/src/core/main_db_models.rs index fe1bce9..0d3d52c 100644 --- a/src/core/main_db_models.rs +++ b/src/core/main_db_models.rs @@ -5,8 +5,7 @@ use serde::{ Serialize, }; -#[derive(Serialize, Deserialize, Debug)] -#[derive(Default)] +#[derive(Serialize, Deserialize, Debug, Default)] pub enum JobState { #[default] Created, diff --git a/src/core/spec.rs b/src/core/spec.rs index f830f77..9e10c67 100644 --- a/src/core/spec.rs +++ b/src/core/spec.rs @@ -3,15 +3,13 @@ use serde::{ Serialize, }; -#[derive(Serialize, Deserialize, Debug, Clone)] -#[derive(Default)] +#[derive(Serialize, Deserialize, Debug, Clone, Default)] pub enum CacheType { #[default] Disk, Memory, } - // MachineSpec container information for deploy and running in cloud #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct MachineSpec { diff --git a/src/dag/dag.rs b/src/dag/dag.rs deleted file mode 100644 index 959a554..0000000 --- a/src/dag/dag.rs +++ /dev/null @@ -1,242 +0,0 @@ -use super::graph::Graph; -use crate::{ - core::ComputeUnit, - utils::IntoAnyhowResult, -}; -use anyhow::{ - anyhow, - Ok, - Result, -}; -use std::collections::HashMap; - -pub struct Dag { - pub raw: String, - pub name: String, - nodes: HashMap, - /// Store dependency relations. - rely_graph: Graph, -} - -impl Default for Dag { - fn default() -> Self { - Self::new() - } -} - -impl Dag { - pub fn new() -> Self { - Dag { - raw: String::new(), - name: String::new(), - nodes: HashMap::new(), - rely_graph: Graph::new(), - } - } - - //add_node add a compute unit to graph, if this unit alread exit, ignore it - pub fn add_node(&mut self, node: ComputeUnit) -> &mut Self { - if self.nodes.get(&node.name).is_none() { - self.rely_graph.add_node(node.name.clone()); - self.nodes.insert(node.name.clone(), node); - } - self - } - - //add_node add a compute unit to graph, if this unit alread exit, ignore it - pub fn set_edge(&mut self, from: &str, to: &str) -> Result<&mut Self> { - if self.nodes.get(from).is_none() { - return Err(anyhow!("from node not exit")); - } - - if self.nodes.get(to).is_none() { - return Err(anyhow!("from node not exit")); - } - - self.rely_graph.add_edge(from, to); - Ok(self) - } - - pub fn get_node(&self, node_id: &str) -> Option<&ComputeUnit> { - self.nodes.get(node_id) - } - - pub fn get_incomming_nodes(&self, node_id: &str) -> Vec<&str> { - self.rely_graph.get_incoming_nodes(node_id) - } - - pub fn get_outgoing_nodes(&self, node_id: &str) -> Vec<&str> { - self.rely_graph.get_outgoing_nodes(node_id) - } - - // from_json build graph from json string - pub fn from_json(json: &str) -> Result { - let value: serde_json::Value = serde_json::from_str(json)?; - - let dag_name: &str = value - .get("name") - .anyhow("name must exit") - .map(|v| v.as_str().anyhow("name must be string"))??; - - let dag = value.get("dag").anyhow("dag must exit")?; - - let mut nodes = vec![]; - for node in dag.as_array().anyhow("dag must be a arrary")?.iter() { - nodes.push(serde_json::from_value::(node.clone())?); - } - - let node_ids: Vec = nodes.iter().map(|node| node.name.clone()).collect(); - let mut rely_graph = Graph::with_nodes(&node_ids); - for node in nodes.iter() { - node.dependency.iter().for_each(|v| { - rely_graph.add_edge(v, &node.name); - }); - } - - let nodes_map: HashMap = nodes - .into_iter() - .map(|node| (node.name.clone(), node)) - .collect(); - Ok(Dag { - name: dag_name.to_string(), - raw: json.to_string(), - nodes: nodes_map, - rely_graph, - }) - } - - // is_validated do some check on the graph to 
ensure that all correct - pub fn is_validated(&self) -> bool { - todo!(); - //check id - - //must have channel except the end node - } - - // iter immutable iterate over graph - pub fn iter(&self) -> GraphIter { - GraphIter { - index: 0, - data: &self.nodes, - toped_graph: self.rely_graph.topo_sort(), - } - } - - // iter_mut mutable iterate over graph - pub fn iter_mut(&mut self) -> GraphIterMut { - GraphIterMut { - index: 0, - data: &mut self.nodes, - toped_graph: self.rely_graph.topo_sort(), - } - } -} - -pub struct GraphIter<'a> { - index: usize, - data: &'a HashMap, - toped_graph: Vec, -} - -impl<'a> Iterator for GraphIter<'a> { - type Item = &'a ComputeUnit; - - fn next(&mut self) -> Option { - if self.index < self.data.len() { - let item = self.toped_graph[self.index].clone(); - self.index += 1; - Some(self.data.get(&item).expect("node added in previous step")) - } else { - None - } - } -} - -pub struct GraphIterMut<'a> { - index: usize, - data: &'a mut HashMap, - toped_graph: Vec, -} - -impl<'a> Iterator for GraphIterMut<'a> { - type Item = &'a mut ComputeUnit; - - fn next(&mut self) -> Option { - if self.index < self.data.len() { - let item = self.toped_graph[self.index].clone(); - self.index += 1; - let node = self - .data - .get_mut(&item) - .expect("node added in previous step"); - unsafe { - // SAFETY: We ensure no two mutable references to the same element are possible. - let node_ptr = node as *mut ComputeUnit; - Some(&mut *node_ptr) - } - } else { - None - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn deserialize_from_str() { - let json_str = r#" -{ - "name": "example", - "version": "v1", - "dag": [ - { - "name": "ComputeUnit1", - "dependency": [ - - ], - "spec": { - "cmd": [ - "ls" - ], - "image": "" - }, - "channel": { - "spec": { - "cmd": [ - "bufsize", - "1024" - ], - "image": "jiaozifs:" - } - } - }, - { - "name": "ComputeUnit2", - "node_type": "ComputeUnit", - "dependency": [ - "ComputeUnit1" - ], - "spec": { - "cmd": [ - "ls" - ], - "image": "" - } - } - ] -} - "#; - let mut result = Dag::from_json(json_str).unwrap(); - let node_names: Vec<_> = result.iter().map(|node| node.name.clone()).collect(); - assert_eq!(["ComputeUnit1", "ComputeUnit2"], node_names.as_slice()); - - for (index, node) in result.iter_mut().enumerate() { - node.name = "node".to_string() + &index.to_string(); - } - - let node_names: Vec<_> = result.iter().map(|node| node.name.clone()).collect(); - assert_eq!(["node0", "node1"], node_names.as_slice()); - } -} diff --git a/src/dag/graph.rs b/src/dag/graph.rs index 5ddf317..2161d0d 100644 --- a/src/dag/graph.rs +++ b/src/dag/graph.rs @@ -150,6 +150,7 @@ impl Graph { } /// Get the out degree of a node. + #[allow(dead_code)] pub(crate) fn get_out_degree(&self, id: &str) -> usize { match self.adj.get(id) { Some(id) => id.len(), @@ -158,6 +159,7 @@ impl Graph { } /// Get the in_comming nodes. + #[allow(dead_code)] pub(crate) fn get_in_degree(&self, id: &str) -> usize { match self.in_degree.get(id) { Some(id) => id.len(), @@ -183,6 +185,7 @@ impl Graph { /// Get all the successors of a node (direct or indirect). /// This function will return a vector of indices of successors (including itself). 
+    #[allow(dead_code)]
     pub(crate) fn get_node_successors(&self, id: &str) -> Vec<String> {
         match self.adj.get(id) {
             Some(outs) => {
diff --git a/src/dag/mod.rs b/src/dag/mod.rs
index 120e4f5..fb00bea 100644
--- a/src/dag/mod.rs
+++ b/src/dag/mod.rs
@@ -1,4 +1,244 @@
-mod dag;
 mod graph;
-pub use dag::*;
+use graph::Graph;
+use crate::{
+    core::ComputeUnit,
+    utils::IntoAnyhowResult,
+};
+use anyhow::{
+    anyhow,
+    Ok,
+    Result,
+};
+use std::collections::HashMap;
+
+pub struct Dag {
+    pub raw: String,
+    pub name: String,
+    nodes: HashMap<String, ComputeUnit>,
+    /// Store dependency relations.
+    rely_graph: Graph,
+}
+
+impl Default for Dag {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Dag {
+    pub fn new() -> Self {
+        Dag {
+            raw: String::new(),
+            name: String::new(),
+            nodes: HashMap::new(),
+            rely_graph: Graph::new(),
+        }
+    }
+
+    //add_node adds a compute unit to the graph; if the unit already exists, it is ignored
+    pub fn add_node(&mut self, node: ComputeUnit) -> &mut Self {
+        if !self.nodes.contains_key(&node.name) {
+            self.rely_graph.add_node(node.name.clone());
+            self.nodes.insert(node.name.clone(), node);
+        }
+        self
+    }
+
+    //set_edge adds a dependency edge between two nodes; both nodes must already exist
+    pub fn set_edge(&mut self, from: &str, to: &str) -> Result<&mut Self> {
+        if !self.nodes.contains_key(from) {
+            return Err(anyhow!("from node not exist"));
+        }
+
+        if !self.nodes.contains_key(to) {
+            return Err(anyhow!("to node not exist"));
+        }
+
+        self.rely_graph.add_edge(from, to);
+        Ok(self)
+    }
+
+    pub fn get_node(&self, node_id: &str) -> Option<&ComputeUnit> {
+        self.nodes.get(node_id)
+    }
+
+    pub fn get_incomming_nodes(&self, node_id: &str) -> Vec<&str> {
+        self.rely_graph.get_incoming_nodes(node_id)
+    }
+
+    pub fn get_outgoing_nodes(&self, node_id: &str) -> Vec<&str> {
+        self.rely_graph.get_outgoing_nodes(node_id)
+    }
+
+    // from_json builds a graph from a json string
+    pub fn from_json(json: &str) -> Result<Self> {
+        let value: serde_json::Value = serde_json::from_str(json)?;
+
+        let dag_name: &str = value
+            .get("name")
+            .anyhow("name must exist")
+            .map(|v| v.as_str().anyhow("name must be string"))??;
+
+        let dag = value.get("dag").anyhow("dag must exist")?;
+
+        let mut nodes = vec![];
+        for node in dag.as_array().anyhow("dag must be an array")?.iter() {
+            nodes.push(serde_json::from_value::<ComputeUnit>(node.clone())?);
+        }
+
+        let node_ids: Vec<String> = nodes.iter().map(|node| node.name.clone()).collect();
+        let mut rely_graph = Graph::with_nodes(&node_ids);
+        for node in nodes.iter() {
+            node.dependency.iter().for_each(|v| {
+                rely_graph.add_edge(v, &node.name);
+            });
+        }
+
+        let nodes_map: HashMap<String, ComputeUnit> = nodes
+            .into_iter()
+            .map(|node| (node.name.clone(), node))
+            .collect();
+        Ok(Dag {
+            name: dag_name.to_string(),
+            raw: json.to_string(),
+            nodes: nodes_map,
+            rely_graph,
+        })
+    }
+
+    // is_validated does some checks on the graph to ensure everything is correct
+    pub fn is_validated(&self) -> bool {
+        todo!();
+        //check id
+
+        //must have channel except the end node
+    }
+
+    // iter returns an immutable iterator over the graph
+    pub fn iter(&self) -> GraphIter {
+        GraphIter {
+            index: 0,
+            data: &self.nodes,
+            toped_graph: self.rely_graph.topo_sort(),
+        }
+    }
+
+    // iter_mut returns a mutable iterator over the graph
+    pub fn iter_mut(&mut self) -> GraphIterMut {
+        GraphIterMut {
+            index: 0,
+            data: &mut self.nodes,
+            toped_graph: self.rely_graph.topo_sort(),
+        }
+    }
+}
+
+pub struct GraphIter<'a> {
+    index: usize,
+    data: &'a HashMap<String, ComputeUnit>,
+    toped_graph: Vec<String>,
+}
+
+impl<'a> Iterator for GraphIter<'a> {
+    type Item = &'a ComputeUnit;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index < self.data.len() {
+            let item = self.toped_graph[self.index].clone();
+            self.index += 1;
+            Some(self.data.get(&item).expect("node added in previous step"))
+        } else {
+            None
+        }
+    }
+}
+
+pub struct GraphIterMut<'a> {
+    index: usize,
+    data: &'a mut HashMap<String, ComputeUnit>,
+    toped_graph: Vec<String>,
+}
+
+impl<'a> Iterator for GraphIterMut<'a> {
+    type Item = &'a mut ComputeUnit;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index < self.data.len() {
+            let item = self.toped_graph[self.index].clone();
+            self.index += 1;
+            let node = self
+                .data
+                .get_mut(&item)
+                .expect("node added in previous step");
+            unsafe {
+                // SAFETY: We ensure no two mutable references to the same element are possible.
+                let node_ptr = node as *mut ComputeUnit;
+                Some(&mut *node_ptr)
+            }
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn deserialize_from_str() {
+        let json_str = r#"
+{
+    "name": "example",
+    "version": "v1",
+    "dag": [
+        {
+            "name": "ComputeUnit1",
+            "dependency": [
+
+            ],
+            "spec": {
+                "cmd": [
+                    "ls"
+                ],
+                "image": ""
+            },
+            "channel": {
+                "spec": {
+                    "cmd": [
+                        "bufsize",
+                        "1024"
+                    ],
+                    "image": "jiaozifs:"
+                }
+            }
+        },
+        {
+            "name": "ComputeUnit2",
+            "node_type": "ComputeUnit",
+            "dependency": [
+                "ComputeUnit1"
+            ],
+            "spec": {
+                "cmd": [
+                    "ls"
+                ],
+                "image": ""
+            }
+        }
+    ]
+}
+        "#;
+        let mut result = Dag::from_json(json_str).unwrap();
+        let node_names: Vec<_> = result.iter().map(|node| node.name.clone()).collect();
+        assert_eq!(["ComputeUnit1", "ComputeUnit2"], node_names.as_slice());
+
+        for (index, node) in result.iter_mut().enumerate() {
+            node.name = "node".to_string() + &index.to_string();
+        }
+
+        let node_names: Vec<_> = result.iter().map(|node| node.name.clone()).collect();
+        assert_eq!(["node0", "node1"], node_names.as_slice());
+    }
+}
diff --git a/src/driver/driver.rs b/src/driver/driver.rs
deleted file mode 100644
index d652d5a..0000000
--- a/src/driver/driver.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-use crate::dag::Dag;
-use anyhow::Result;
-use serde::{Deserialize, Serialize};
-use std::{
-    collections::HashMap, future::Future, sync::{
-        Arc,
-        Mutex,
-    }
-};
-
-#[derive(Debug, Default, Serialize, Deserialize)]
-pub struct PodStauts {
-    pub state: String,
-    pub disk_usage: u32,
-    pub cpu_usage: u32,
-    pub memory_usage: u32,
-}
-
-#[derive(Debug, Default, Serialize,Deserialize)]
-pub struct NodeStatus {
-    pub name: String,
-    pub replicas: u32,
-    pub storage: String,
-    pub pods: HashMap<String, PodStauts>,
-}
-
-pub trait UnitHandler: Send {
-    fn name(&self) -> &str;
-    //pause graph running for now
-    fn status(&self) -> impl Future<Output = Result<NodeStatus>> + Send;
-
-    //pause graph running for now
-    fn pause(&mut self) -> impl Future<Output = Result<()>> + Send;
-
-    //restart paused graph
-    fn restart(&mut self) -> impl Future<Output = Result<()>> + Send;
-
-    //stop resource about this graph
-    fn stop(&mut self) -> impl Future<Output = Result<()>> + Send;
-
-    //return a channel handler
-    fn channel_handler(
-        &self,
-    ) -> impl Future>>>> + Send;
-}
-
-pub trait ChannelHandler: Send {
-    fn name(&self) -> &str;
-    //pause graph running for now
-    fn status(&self) -> impl Future<Output = Result<NodeStatus>> + Send;
-
-    //pause graph running for now
-    fn pause(&mut self) -> impl Future<Output = Result<()>> + Send;
-
-    //restart paused graph
-    fn restart(&mut self) -> impl Future<Output = Result<()>> + Send;
-
-    //stop resource about this graph
-    fn stop(&mut self) -> impl Future<Output = Result<()>> + Send;
-}
-
-pub trait PipelineController: Send {
-    type Output: UnitHandler;
-
-    fn nodes(&self) -> Result>;
-
-    fn get_node(&self, id: &str)
-        -> impl std::future::Future<Output = Result<&Self::Output>> + Send;
-
-    fn get_node_mut(
-        &mut self,
-        id: &str,
-    ) -> impl
std::future::Future> + Send; -} - -pub trait Driver: 'static + Clone + Send + Sync { - //deploy graph to cluster - fn deploy( - &self, - namespace: &str, - graph: &Dag, - ) -> impl Future> + Send; - - //attach cluster in cloud with graph - fn attach( - &self, - namespace: &str, - graph: &Dag, - ) -> impl Future> + Send; - - //clean all resource about this graph - fn clean(&self, namespace: &str) -> impl Future> + Send; -} diff --git a/src/driver/kube.rs b/src/driver/kube.rs index a86e4c3..2ad27b0 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -1,10 +1,10 @@ use super::{ ChannelHandler, Driver, - PipelineController, - UnitHandler, NodeStatus, + PipelineController, PodStauts, + UnitHandler, }; use crate::{ core::{ @@ -20,7 +20,10 @@ use crate::{ }, dag::Dag, dbrepo::MongoRunDbRepo, - utils::{IntoAnyhowResult, StdIntoAnyhowResult}, + utils::{ + IntoAnyhowResult, + StdIntoAnyhowResult, + }, }; use anyhow::{ anyhow, @@ -36,13 +39,19 @@ use handlebars::{ RenderError, }; use k8s_openapi::api::{ - apps::v1::StatefulSet, core::v1::{ - Namespace, PersistentVolumeClaim, Pod, Service - } + apps::v1::StatefulSet, + core::v1::{ + Namespace, + PersistentVolumeClaim, + Pod, + Service, + }, }; use kube::{ api::{ - DeleteParams, ListParams, PostParams + DeleteParams, + ListParams, + PostParams, }, runtime::reflector::Lookup, Api, @@ -53,10 +62,6 @@ use std::{ collections::HashMap, default::Default, marker::PhantomData, - sync::{ - Arc, - Mutex, - }, }; use tokio_retry::{ strategy::ExponentialBackoff, @@ -69,12 +74,12 @@ where R: JobDbRepo, { pub(crate) client: Client, - pub(crate) node_name:String, + pub(crate) node_name: String, pub(crate) namespace: String, pub(crate) stateset_name: String, pub(crate) claim_name: String, - pub(crate) service_name: String, - pub(crate) db_repo: R, + pub(crate) _service_name: String, + pub(crate) _db_repo: R, } impl ChannelHandler for KubeChannelHander @@ -84,43 +89,64 @@ where fn name(&self) -> &str { &self.node_name } - + async fn status(&self) -> Result { - let statefulset_api: Api = Api::namespaced(self.client.clone(), &self.namespace); - let claim_api: Api = Api::namespaced(self.client.clone(), &self.namespace); - let service_api: Api = Api::namespaced(self.client.clone(), &self.namespace); - let pods_api:Api = Api::namespaced(self.client.clone(), &self.namespace); + let statefulset_api: Api = + Api::namespaced(self.client.clone(), &self.namespace); + let claim_api: Api = + Api::namespaced(self.client.clone(), &self.namespace); + let pods_api: Api = Api::namespaced(self.client.clone(), &self.namespace); let statefulset = statefulset_api.get(&self.stateset_name).await.anyhow()?; - let selector = statefulset.spec.as_ref().unwrap().selector.match_labels.as_ref().expect("set in template") - .iter() - .map(|(key, value)| format!("{}={}", key, value)) - .collect::>() - .join(","); + let selector = statefulset + .spec + .as_ref() + .unwrap() + .selector + .match_labels + .as_ref() + .expect("set in template") + .iter() + .map(|(key, value)| format!("{}={}", key, value)) + .collect::>() + .join(","); let list_params = ListParams::default().labels(&selector); let pods = pods_api.list(&list_params).await.anyhow()?; let pvc = claim_api.get(&self.claim_name).await.anyhow()?; - let cap = pvc.status.unwrap().capacity.unwrap().get("storage").map(|cap|cap.0.clone()).unwrap_or_default(); + let cap = pvc + .status + .unwrap() + .capacity + .unwrap() + .get("storage") + .map(|cap| cap.0.clone()) + .unwrap_or_default(); let mut node_status = NodeStatus { name: 
self.node_name.clone(), - replicas: statefulset.spec.as_ref().and_then(|spec|spec.replicas).unwrap_or_default() as u32, + replicas: statefulset + .spec + .as_ref() + .and_then(|spec| spec.replicas) + .unwrap_or_default() as u32, storage: cap, pods: HashMap::new(), }; - for pod in pods { let pod_name = pod.metadata.name.expect("set in template"); - let phase = pod.status.and_then(|status|status.phase).unwrap_or_default(); + let phase = pod + .status + .and_then(|status| status.phase) + .unwrap_or_default(); - let pod_status = PodStauts{ + let pod_status = PodStauts { state: phase, - disk_usage:0, - cpu_usage:0, - memory_usage:0 + disk_usage: 0, + cpu_usage: 0, + memory_usage: 0, }; node_status.pods.insert(pod_name, pod_status); } @@ -144,12 +170,12 @@ where R: JobDbRepo, { pub(crate) client: Client, - pub(crate) node_name:String, + pub(crate) node_name: String, pub(crate) namespace: String, pub(crate) stateset_name: String, pub(crate) claim_name: String, - pub(crate) service_name: String, - pub(crate) db_repo: R, + pub(crate) _service_name: String, + pub(crate) _db_repo: R, pub(crate) channel: Option>, } @@ -157,45 +183,69 @@ impl UnitHandler for KubeHandler where R: JobDbRepo, { + type Output = KubeChannelHander; + fn name(&self) -> &str { &self.node_name } async fn status(&self) -> Result { - let statefulset_api: Api = Api::namespaced(self.client.clone(), &self.namespace); - let claim_api: Api = Api::namespaced(self.client.clone(), &self.namespace); - let service_api: Api = Api::namespaced(self.client.clone(), &self.namespace); - let pods_api:Api = Api::namespaced(self.client.clone(), &self.namespace); + let statefulset_api: Api = + Api::namespaced(self.client.clone(), &self.namespace); + let claim_api: Api = + Api::namespaced(self.client.clone(), &self.namespace); + let pods_api: Api = Api::namespaced(self.client.clone(), &self.namespace); let statefulset = statefulset_api.get(&self.stateset_name).await.anyhow()?; - let selector = statefulset.spec.as_ref().unwrap().selector.match_labels.as_ref().expect("set in template") - .iter() - .map(|(key, value)| format!("{}={}", key, value)) - .collect::>() - .join(","); + let selector = statefulset + .spec + .as_ref() + .unwrap() + .selector + .match_labels + .as_ref() + .expect("set in template") + .iter() + .map(|(key, value)| format!("{}={}", key, value)) + .collect::>() + .join(","); let list_params = ListParams::default().labels(&selector); let pods = pods_api.list(&list_params).await.anyhow()?; let pvc = claim_api.get(&self.claim_name).await.anyhow()?; - let cap = pvc.status.unwrap().capacity.unwrap().get("storage").map(|cap|cap.0.clone()).unwrap_or_default(); + let cap = pvc + .status + .unwrap() + .capacity + .unwrap() + .get("storage") + .map(|cap| cap.0.clone()) + .unwrap_or_default(); let mut node_status = NodeStatus { name: self.node_name.clone(), - replicas: statefulset.spec.as_ref().and_then(|spec|spec.replicas).unwrap_or_default() as u32, + replicas: statefulset + .spec + .as_ref() + .and_then(|spec| spec.replicas) + .unwrap_or_default() as u32, storage: cap, pods: HashMap::new(), }; for pod in pods { let pod_name = pod.metadata.name.expect("set in template"); - let phase = pod.status.and_then(|status|status.phase).unwrap_or_default(); + let phase = pod + .status + .and_then(|status| status.phase) + .unwrap_or_default(); - let pod_status = PodStauts{ + let pod_status = PodStauts { state: phase, - disk_usage:0, - cpu_usage:0, - memory_usage:0 + disk_usage: 0, + cpu_usage: 0, + memory_usage: 0, }; node_status.pods.insert(pod_name, 
pod_status); } @@ -215,8 +265,8 @@ where } #[allow(refining_impl_trait)] - async fn channel_handler(&self) -> Result>>>> { - todo!() + async fn channel_handler(&self) -> Result>> { + Ok(self.channel.as_ref()) } } @@ -224,8 +274,8 @@ pub struct KubePipelineController where R: JobDbRepo, { - client: Client, - db_repo: R, + _client: Client, + _db_repo: R, handlers: HashMap>, } @@ -235,8 +285,8 @@ where { fn new(repo: R, client: Client) -> Self { Self { - db_repo: repo, - client, + _db_repo: repo, + _client: client, handlers: Default::default(), } } @@ -509,11 +559,11 @@ where .name() .expect("set name in template") .to_string(), - service_name: channel_service + _service_name: channel_service .name() .expect("set name in template") .to_string(), - db_repo: repo.clone(), + _db_repo: repo.clone(), }), vec![channel_node_name], ) @@ -596,12 +646,12 @@ where .name() .expect("set name in template") .to_string(), - service_name: unit_service + _service_name: unit_service .name() .expect("set name in template") .to_string(), channel: channel_handler, - db_repo: repo.clone(), + _db_repo: repo.clone(), }; pipeline_ctl.handlers.insert(node.name.clone(), handler); @@ -639,7 +689,7 @@ where .await?; Some(KubeChannelHander { - node_name:node.name.clone() + "-channel", + node_name: node.name.clone() + "-channel", client: self.client.clone(), namespace: run_id.to_string().clone(), stateset_name: channel_statefulset @@ -650,11 +700,11 @@ where .name() .expect("set name in template") .to_string(), - service_name: channel_service + _service_name: channel_service .name() .expect("set name in template") .to_string(), - db_repo: repo.clone(), + _db_repo: repo.clone(), }) } else { None @@ -684,12 +734,12 @@ where .name() .expect("set name in template") .to_string(), - service_name: unit_service + _service_name: unit_service .name() .expect("set name in template") .to_string(), channel: channel_handler, - db_repo: repo.clone(), + _db_repo: repo.clone(), }; pipeline_ctl.handlers.insert(node.name.clone(), handler); @@ -787,6 +837,4 @@ mod tests { kube_driver.deploy("ntest", &dag).await.unwrap(); // kube_driver.clean("ntest").await.unwrap(); } - - fn is_send_sync(v: T) {} } diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 5a0a622..0b36277 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -1,4 +1,98 @@ -mod driver; pub mod kube; -pub use driver::*; +use crate::dag::Dag; +use anyhow::Result; +use serde::{ + Deserialize, + Serialize, +}; +use std::{ + collections::HashMap, + future::Future, +}; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct PodStauts { + pub state: String, + pub disk_usage: u32, + pub cpu_usage: u32, + pub memory_usage: u32, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct NodeStatus { + pub name: String, + pub replicas: u32, + pub storage: String, + pub pods: HashMap, +} + +pub trait UnitHandler: Send { + type Output: ChannelHandler; + + fn name(&self) -> &str; + //pause graph running for now + fn status(&self) -> impl Future> + Send; + + //pause graph running for now + fn pause(&mut self) -> impl Future> + Send; + + //restart paused graph + fn restart(&mut self) -> impl Future> + Send; + + //stop resource about this graph + fn stop(&mut self) -> impl Future> + Send; + + //return a channel handler + fn channel_handler( + &self, + ) -> impl Future>> + Send; +} + +pub trait ChannelHandler: Send { + fn name(&self) -> &str; + //pause graph running for now + fn status(&self) -> impl Future> + Send; + + //pause graph running for now + fn pause(&mut self) -> 
impl Future> + Send; + + //restart paused graph + fn restart(&mut self) -> impl Future> + Send; + + //stop resource about this graph + fn stop(&mut self) -> impl Future> + Send; +} + +pub trait PipelineController: Send { + type Output: UnitHandler; + + fn nodes(&self) -> Result>; + + fn get_node(&self, id: &str) + -> impl std::future::Future> + Send; + + fn get_node_mut( + &mut self, + id: &str, + ) -> impl std::future::Future> + Send; +} + +pub trait Driver: 'static + Clone + Send + Sync { + //deploy graph to cluster + fn deploy( + &self, + namespace: &str, + graph: &Dag, + ) -> impl Future> + Send; + + //attach cluster in cloud with graph + fn attach( + &self, + namespace: &str, + graph: &Dag, + ) -> impl Future> + Send; + + //clean all resource about this graph + fn clean(&self, namespace: &str) -> impl Future> + Send; +} diff --git a/src/job/job_mgr.rs b/src/job/job_mgr.rs index e39edf3..53529dc 100644 --- a/src/job/job_mgr.rs +++ b/src/job/job_mgr.rs @@ -7,29 +7,42 @@ use crate::{ MainDbRepo, }, dag::Dag, - driver::{NodeStatus, Driver, PipelineController, UnitHandler}, - utils::{IntoAnyhowResult, StdIntoAnyhowResult}, + driver::{ + Driver, + NodeStatus, + PipelineController, + UnitHandler, + }, + utils::{ + IntoAnyhowResult, + StdIntoAnyhowResult, + }, }; use anyhow::Result; +use futures::future::try_join_all; use kube::Client; use mongodb::bson::oid::ObjectId; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, marker::PhantomData}; +use serde::{ + Deserialize, + Serialize, +}; +use std::{ + collections::HashMap, + marker::PhantomData, +}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{ error, info, }; -use futures::future::try_join_all; -#[derive(Serialize,Deserialize)] +#[derive(Serialize, Deserialize)] pub struct JobDetails { pub job: Job, - pub node_status: HashMap + pub node_status: HashMap, } - #[derive(Clone)] pub struct JobManager where @@ -48,7 +61,7 @@ where JOBR: JobDbRepo, MAINR: MainDbRepo, { - pub async fn new(client: Client, driver: D, db: MAINR, db_url: &str) -> Result { + pub async fn new(_client: Client, driver: D, db: MAINR, _db_url: &str) -> Result { Ok(JobManager { db, driver, @@ -110,18 +123,14 @@ where let namespace = format!("{}-{}", job.name, job.retry_number - 1); let controller = self.driver.attach(&namespace, &dag).await?; let nodes = controller.nodes().anyhow()?; - let nodes_controller = try_join_all(nodes.iter().map(|node_name|{ - controller.get_node(node_name) - })).await?; - + let nodes_controller = + try_join_all(nodes.iter().map(|node_name| controller.get_node(node_name))).await?; + let mut node_status = HashMap::new(); for node_ctl in nodes_controller { let status = node_ctl.status().await?; node_status.insert(node_ctl.name().to_string(), status); } - Ok(JobDetails { - job, - node_status, - }) + Ok(JobDetails { job, node_status }) } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 4b074c3..15a8429 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,52 @@ -mod utils; +use core::fmt; -pub use utils::*; +use anyhow::{ + anyhow, + Result, +}; +use tonic::{ + Code, + Status, +}; + +pub trait IntoAnyhowResult { + fn anyhow(self, msg: impl ToString) -> Result; +} + +impl IntoAnyhowResult for Option { + fn anyhow(self, msg: impl ToString) -> Result { + match self { + Some(v) => Ok(v), + None => Err(anyhow!(msg.to_string())), + } + } +} + +pub trait StdIntoAnyhowResult { + fn anyhow(self) -> Result; +} + +impl StdIntoAnyhowResult for std::result::Result +where + E: fmt::Display, +{ + fn 
anyhow(self) -> Result { + match self { + Ok(v) => Ok(v), + Err(err) => Err(anyhow!("{err}")), + } + } +} + +pub trait AnyhowToGrpc { + fn to_rpc(self, code: Code) -> std::result::Result; +} + +impl AnyhowToGrpc for Result { + fn to_rpc(self, code: Code) -> std::result::Result { + match self { + Ok(v) => Ok(v), + Err(err) => Err(Status::new(code, err.to_string())), + } + } +} diff --git a/src/utils/utils.rs b/src/utils/utils.rs deleted file mode 100644 index 15a8429..0000000 --- a/src/utils/utils.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::fmt; - -use anyhow::{ - anyhow, - Result, -}; -use tonic::{ - Code, - Status, -}; - -pub trait IntoAnyhowResult { - fn anyhow(self, msg: impl ToString) -> Result; -} - -impl IntoAnyhowResult for Option { - fn anyhow(self, msg: impl ToString) -> Result { - match self { - Some(v) => Ok(v), - None => Err(anyhow!(msg.to_string())), - } - } -} - -pub trait StdIntoAnyhowResult { - fn anyhow(self) -> Result; -} - -impl StdIntoAnyhowResult for std::result::Result -where - E: fmt::Display, -{ - fn anyhow(self) -> Result { - match self { - Ok(v) => Ok(v), - Err(err) => Err(anyhow!("{err}")), - } - } -} - -pub trait AnyhowToGrpc { - fn to_rpc(self, code: Code) -> std::result::Result; -} - -impl AnyhowToGrpc for Result { - fn to_rpc(self, code: Code) -> std::result::Result { - match self { - Ok(v) => Ok(v), - Err(err) => Err(Status::new(code, err.to_string())), - } - } -}
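
As a minimal, self-contained sketch of the request/reply pattern behind the `MessageSender` alias added in crates/nodes_sdk/src/lib.rs: every message pairs a request with a `oneshot` sender, so the receiver can answer each request individually over a single `mpsc` channel. The `String`/`usize` payload types, the channel capacity, and the length-reply behaviour are illustrative assumptions, not part of the patch.

use anyhow::Result;
use tokio::sync::{
    mpsc,
    oneshot,
};

// Same shape as the alias added in crates/nodes_sdk/src/lib.rs.
type MessageSender<REQ, RESP = ()> = mpsc::Sender<(REQ, oneshot::Sender<Result<RESP>>)>;

#[tokio::main]
async fn main() -> Result<()> {
    // Hypothetical payloads: a String request answered with its length.
    let (tx, mut rx): (MessageSender<String, usize>, _) = mpsc::channel(16);

    // Responder: answer each request through the oneshot sender paired with it.
    tokio::spawn(async move {
        while let Some((req, reply)) = rx.recv().await {
            let _ = reply.send(Ok(req.len()));
        }
    });

    // Requester: create a fresh oneshot channel per request and await the reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(("hello".to_string(), reply_tx)).await?;
    assert_eq!(reply_rx.await??, 5);
    Ok(())
}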
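Likewise, a small sketch of the borrowed-key lookup enabled by the relaxed bound the patch puts on `Mprs::remove` (`Q: Hash + Eq + ?Sized` with `K: Borrow<Q>`): callers can pass a `&str` against `String` keys without allocating. `position_of` and its data are hypothetical stand-ins for the `entries` scan in crates/nodes_sdk/src/mprc.rs.

use std::borrow::Borrow;
use std::hash::Hash;

// Mirrors the bound set on Mprs::remove; ?Sized admits unsized query types like str.
fn position_of<K, T, Q>(entries: &[(K, T)], k: &Q) -> Option<usize>
where
    K: Borrow<Q>,
    Q: Hash + Eq + ?Sized,
{
    entries.iter().position(|(key, _)| key.borrow() == k)
}

fn main() {
    let entries = vec![("A".to_string(), 1), ("B".to_string(), 2)];
    // A &str query finds a String key: no String allocation at the call site.
    assert_eq!(position_of(&entries, "B"), Some(1));
}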