diff --git a/.gitignore b/.gitignore index 3efd79d..284f049 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,5 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb - +.vscode dist/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index e452ae2..9c2dc35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "crates/dp_runner", "crates/compute_unit_runner", "crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/dummy_in", "nodes/dummy_out" -, "nodes/copy_in_place"] +, "nodes/copy_in_place", "crates/nodes_sdk"] [workspace.package] repository = "https://github.com/GitDataAI/jz-action" @@ -35,7 +35,7 @@ async-trait = "0.1.81" futures = "0.3.30" clap = {version="4.5.7", features=["derive"]} actix-web = "4.8.0" -awc="3.5.0" +reqwest = {version="0.12.5", features=["json"]} [package] name = "jz_action" @@ -50,6 +50,7 @@ k8s-openapi = { version = "0.22.0", features = ["latest"] } handlebars = "6.0.0" prost = "0.13.1" serde_variant = "0.1.3" +uri="0.4.0" tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] } tokio-retry = {workspace = true} @@ -57,7 +58,6 @@ tokio-stream = {workspace = true} tokio-util= {workspace = true} actix-web = {workspace = true} -awc= {workspace = true} clap = {workspace = true} uuid = {workspace = true} anyhow = {workspace = true} @@ -70,7 +70,7 @@ mongodb = {workspace = true} chrono = {workspace = true} futures = {workspace = true} async-trait = {workspace = true} - +reqwest = {workspace = true} [build-dependencies] tonic-build = "0.12.1" diff --git a/crates/compute_unit_runner/Cargo.toml b/crates/compute_unit_runner/Cargo.toml index 4f2e11d..ed48805 100644 --- a/crates/compute_unit_runner/Cargo.toml +++ b/crates/compute_unit_runner/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] jz_action = { path = "../../"} +nodes_sdk = {path = "../nodes_sdk"} tokio = { workspace = true } tokio-retry = {workspace = true} @@ -23,6 +24,7 @@ async-trait = {workspace = true} chrono = {workspace = true} clap = {workspace = true} futures = {workspace = true} +serde_repr = "0.1" actix-web = "4.8.0" hyperlocal = "0.9.1" diff --git a/crates/compute_unit_runner/dockerfile b/crates/compute_unit_runner/dockerfile index 34f024d..99c2711 100644 --- a/crates/compute_unit_runner/dockerfile +++ b/crates/compute_unit_runner/dockerfile @@ -1,4 +1,4 @@ -FROM jz-action/net-debug +FROM ubuntu:22.04 WORKDIR /app diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs index 19350b3..bd60416 100644 --- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs +++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs @@ -1,8 +1,4 @@ use compute_unit_runner::{ - fs_cache::{ - FSCache, - FileCache, - }, ipc, media_data_tracker, state_controller::StateController, @@ -12,6 +8,10 @@ use jz_action::{ dbrepo::MongoRunDbRepo, utils::StdIntoAnyhowResult, }; +use nodes_sdk::fs_cache::{ + FSCache, + FileCache, +}; use anyhow::Result; use clap::Parser; @@ -26,13 +26,16 @@ use tokio::{ signal, SignalKind, }, - sync::Mutex, + sync::{ + Mutex, + RwLock, + }, task::JoinSet, }; +use nodes_sdk::monitor_tasks; use tokio_util::sync::CancellationToken; use tracing::{ - error, info, Level, }; @@ -94,7 +97,7 @@ async fn main() -> Result<()> { ); program.run_backend(&mut join_set, token.clone())?; - let program_safe = Arc::new(Mutex::new(program)); + let program_safe = Arc::new(RwLock::new(program)); let server = ipc::start_ipc_server(&args.unix_socket_addr, program_safe.clone()).unwrap(); let handler = server.handle(); @@ -142,11 +145,5 @@ async fn main() -> Result<()> { token.cancel(); }); } - - while let Some(Err(err)) = join_set.join_next().await { - error!("exit spawn {err}"); - } - - info!("gracefully shutdown"); - Ok(()) + monitor_tasks(&mut join_set).await } diff --git a/crates/compute_unit_runner/src/ipc.rs b/crates/compute_unit_runner/src/ipc.rs index a47cd04..d0097a4 100644 --- a/crates/compute_unit_runner/src/ipc.rs +++ b/crates/compute_unit_runner/src/ipc.rs @@ -27,8 +27,10 @@ use jz_action::{ utils::StdIntoAnyhowResult, }; use serde::{ + ser::SerializeStruct, Deserialize, Serialize, + Serializer, }; use std::{ sync::Arc, @@ -37,7 +39,7 @@ use std::{ use tokio::{ sync::{ oneshot, - Mutex, + RwLock, }, time::sleep, }; @@ -58,6 +60,14 @@ use hyperlocal::{ UnixConnector, Uri, }; +use serde::de::{ + self, + Deserializer, + MapAccess, + SeqAccess, + Visitor, +}; +use serde_repr::*; #[derive(Debug, Serialize, Deserialize)] pub struct AvaiableDataResponse { @@ -76,6 +86,144 @@ impl CompleteDataReq { } } +use std::fmt; +#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)] +#[repr(u8)] +pub enum ErrorNumber { + NotReady = 1, + AlreadyFinish = 2, + DataNotFound = 3, +} + +#[derive(Debug)] +pub enum IPCError { + NodeError { code: ErrorNumber, msg: String }, + UnKnown(String), +} + +impl From for IPCError +where + T: std::error::Error, +{ + fn from(error: T) -> Self { + IPCError::UnKnown(error.to_string()) + } +} + +impl fmt::Display for IPCError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IPCError::NodeError { code, msg } => write!(f, "node error code({:?}): {msg}", code), + IPCError::UnKnown(err) => write!(f, "unknown error: {}", err), + } + } +} + +impl Serialize for IPCError { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + IPCError::NodeError { code, msg } => { + let mut my_error_se = serializer.serialize_struct("IPCError", 2)?; + my_error_se.serialize_field("code", &code)?; + my_error_se.serialize_field("msg", msg.as_str())?; + my_error_se.end() + } + IPCError::UnKnown(msg) => serializer.serialize_str(msg.as_str()), + } + } +} + +impl<'de> Deserialize<'de> for IPCError { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + enum Field { + Code, + Msg, + } + + impl<'de> Deserialize<'de> for Field { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct FieldVisitor; + + impl<'de> Visitor<'de> for FieldVisitor { + type Value = Field; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("`code` or `msg`") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + "code" => Ok(Field::Code), + "msg" => Ok(Field::Msg), + _ => Err(de::Error::unknown_field(value, &["code", "msg"])), + } + } + } + + deserializer.deserialize_identifier(FieldVisitor) + } + } + + struct DurationVisitor; + + impl<'de> Visitor<'de> for DurationVisitor { + type Value = IPCError; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct IPCError") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + Ok(IPCError::UnKnown(value.to_string())) + } + + fn visit_map(self, mut map: V) -> Result + where + V: MapAccess<'de>, + { + let mut code = None; + let mut msg = None; + while let Some(key) = map.next_key()? { + match key { + Field::Code => { + if code.is_some() { + return Err(de::Error::duplicate_field("code")); + } + code = Some(map.next_value()?); + } + Field::Msg => { + if msg.is_some() { + return Err(de::Error::duplicate_field("msg")); + } + msg = Some(map.next_value()?); + } + } + } + let code = code.ok_or_else(|| de::Error::missing_field("code"))?; + let msg = msg.ok_or_else(|| de::Error::missing_field("msg"))?; + Ok(IPCError::NodeError { code, msg }) + } + } + + deserializer.deserialize_any(DurationVisitor) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct SubmitOuputDataReq { pub id: String, @@ -96,99 +244,120 @@ pub struct Status { pub state: TrackerState, } -async fn status(program_mutex: web::Data>>>) -> HttpResponse +async fn status(program_mutex: web::Data>>>) -> HttpResponse where R: JobDbRepo + Clone, { info!("receive status request"); let status = { - let program = program_mutex.lock().await; + let program = program_mutex.read().await; + let local_state = program.local_state.read().await; Status { - state: program.local_state.clone(), + state: local_state.clone(), } }; HttpResponse::Ok().json(status) } async fn process_data_request( - program_mutex: web::Data>>>, + program_mutex: web::Data>>>, ) -> HttpResponse where R: JobDbRepo + Clone, { info!("receive avaiable data reqeust"); - let (tx, rx) = oneshot::channel::>>(); - let sender = loop { - let program = program_mutex.lock().await; - if matches!(program.local_state, TrackerState::Ready) { + let program = program_mutex.read().await; + let local_state = program.local_state.read().await; + if matches!(*local_state, TrackerState::Finish) { + return HttpResponse::BadRequest().json(IPCError::NodeError { + code: ErrorNumber::AlreadyFinish, + msg: "node is already finish".to_string(), + }); + } + + if matches!(*local_state, TrackerState::Ready) { break program.ipc_process_data_req_tx.as_ref().cloned(); } + drop(local_state); drop(program); sleep(Duration::from_secs(5)).await; }; - //read request + let (tx, rx) = oneshot::channel::>>(); match sender { Some(sender) => { if let Err(err) = sender.send(((), tx)).await { return HttpResponse::InternalServerError() - .body(format!("send to avaiable data channel {err}")); + .json(format!("send to avaiable data channel {err}")); } } None => { - return HttpResponse::InternalServerError().body("channel is not ready"); + return HttpResponse::InternalServerError().json("channel is not ready"); } } match rx.await { Ok(Ok(Some(resp))) => HttpResponse::Ok().json(resp), - Ok(Ok(None)) => HttpResponse::NotFound().body("no avaiablle data"), - Ok(Err(err)) => HttpResponse::InternalServerError().body(err.to_string()), - Err(err) => HttpResponse::ServiceUnavailable().body(err.to_string()), + Ok(Ok(None)) => HttpResponse::NotFound().json(IPCError::NodeError { + code: ErrorNumber::DataNotFound, + msg: "no avaiable data".to_string(), + }), + Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()), + Err(err) => HttpResponse::ServiceUnavailable().json(err.to_string()), } } async fn process_completed_request( - program_mutex: web::Data>>>, + program_mutex: web::Data>>>, data: web::Json, ) -> HttpResponse where R: JobDbRepo + Clone, { info!("receive data completed request"); - let (tx, rx) = oneshot::channel::>(); let sender = loop { - let program = program_mutex.lock().await; - if matches!(program.local_state, TrackerState::Ready) { + let program = program_mutex.read().await; + let local_state = program.local_state.read().await; + + if matches!(*local_state, TrackerState::Finish) { + return HttpResponse::BadRequest().json(IPCError::NodeError { + code: ErrorNumber::AlreadyFinish, + msg: "node is already finish".to_string(), + }); + } + + if matches!(*local_state, TrackerState::Ready) { break program.ipc_process_completed_data_tx.as_ref().cloned(); } + drop(local_state); drop(program); sleep(Duration::from_secs(5)).await; }; //read request + let (tx, rx) = oneshot::channel::>(); match sender { Some(sender) => { if let Err(err) = sender.send((data.0, tx)).await { return HttpResponse::InternalServerError() - .body(format!("send to avaiable data channel {err}")); + .json(format!("send to avaiable data channel {err}")); } } None => { - return HttpResponse::InternalServerError().body("channel is not ready"); + return HttpResponse::InternalServerError().json("channel is not ready"); } } match rx.await { - Ok(Ok(resp)) => HttpResponse::Ok().body(resp), - Ok(Err(err)) => HttpResponse::InternalServerError().body(err.to_string()), - Err(err) => HttpResponse::ServiceUnavailable().body(err.to_string()), + Ok(Ok(resp)) => HttpResponse::Ok().json(resp), + Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()), + Err(err) => HttpResponse::ServiceUnavailable().json(err.to_string()), } } async fn process_submit_output_request( - program_mutex: web::Data>>>, + program_mutex: web::Data>>>, data: web::Json, //body: web::Bytes, ) -> HttpResponse @@ -198,76 +367,96 @@ where // let data: SubmitOuputDataReq = serde_json::from_slice(&body).unwrap(); info!("receive submit output request {}", &data.id); - let (tx, rx) = oneshot::channel::>(); let sender = loop { - let program = program_mutex.lock().await; - if matches!(program.local_state, TrackerState::Ready) { + let program = program_mutex.read().await; + let local_state = program.local_state.read().await; + + if matches!(*local_state, TrackerState::Finish) { + return HttpResponse::BadRequest().json(IPCError::NodeError { + code: ErrorNumber::AlreadyFinish, + msg: "node is already finish".to_string(), + }); + } + + if matches!(*local_state, TrackerState::Ready) { break program.ipc_process_submit_output_tx.as_ref().cloned(); } + drop(local_state); drop(program); sleep(Duration::from_secs(5)).await; }; //read request + let (tx, rx) = oneshot::channel::>(); match sender { Some(sender) => { if let Err(err) = sender.send((data.0, tx)).await { return HttpResponse::InternalServerError() - .body(format!("send to avaiable data channel {err}")); + .json(format!("send to avaiable data channel {err}")); } } None => { - return HttpResponse::InternalServerError().body("channel is not ready"); + return HttpResponse::InternalServerError().json("channel is not ready"); } } match rx.await { - Ok(Ok(resp)) => HttpResponse::Ok().body(resp), - Ok(Err(err)) => HttpResponse::InternalServerError().body(err.to_string()), - Err(err) => HttpResponse::ServiceUnavailable().body(err.to_string()), + Ok(Ok(resp)) => HttpResponse::Ok().json(resp), + Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()), + Err(err) => HttpResponse::ServiceUnavailable().json(err.to_string()), } } async fn process_finish_state_request( - program_mutex: web::Data>>>, + program_mutex: web::Data>>>, ) -> HttpResponse where R: JobDbRepo + Clone, { info!("receive finish state request"); - let (tx, rx) = oneshot::channel::>(); let sender = loop { - let program = program_mutex.lock().await; - if matches!(program.local_state, TrackerState::Ready) { + let program = program_mutex.read().await; + let local_state = program.local_state.read().await; + + if matches!(*local_state, TrackerState::Finish) { + return HttpResponse::BadRequest().json(IPCError::NodeError { + code: ErrorNumber::AlreadyFinish, + msg: "node is already finish".to_string(), + }); + } + + if matches!(*local_state, TrackerState::Ready) { break program.ipc_process_finish_state_tx.as_ref().cloned(); } + drop(local_state); drop(program); sleep(Duration::from_secs(5)).await; }; //read request + let (tx, rx) = oneshot::channel::>(); match sender { Some(sender) => { if let Err(err) = sender.send(((), tx)).await { return HttpResponse::InternalServerError() - .body(format!("send to finish state channel {err}")); + .json(format!("send to finish state channel {err}")); } } None => { - return HttpResponse::InternalServerError().body("channel is not ready"); + return HttpResponse::InternalServerError().json("channel is not ready"); } } match rx.await { - Ok(Ok(resp)) => HttpResponse::Ok().body(resp), - Ok(Err(err)) => HttpResponse::InternalServerError().body(err.to_string()), - Err(err) => HttpResponse::ServiceUnavailable().body(err.to_string()), + Ok(Ok(resp)) => HttpResponse::Ok().json(resp), + Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()), + Err(err) => HttpResponse::ServiceUnavailable().json(err.to_string()), } } pub fn start_ipc_server( unix_socket_addr: &str, - program: Arc>>, + program: Arc>>, ) -> Result where R: JobDbRepo + Clone + Send + Sync + 'static, @@ -294,8 +483,8 @@ where web::scope("/api/v1") .service( web::resource("status") - .get(status::) - .post(process_finish_state_request::) + .get(status::) + .post(process_finish_state_request::), ) .service( web::resource("data") @@ -314,16 +503,19 @@ where } pub trait IPCClient { - fn finish(&self) -> impl std::future::Future> + Send; - fn status(&self) -> impl std::future::Future> + Send; + fn finish(&self) -> impl std::future::Future> + Send; + fn status(&self) -> impl std::future::Future> + Send; fn submit_output( &self, req: SubmitOuputDataReq, - ) -> impl std::future::Future> + Send; - fn complete_result(&self, id: &str) -> impl std::future::Future> + Send; + ) -> impl std::future::Future> + Send; + fn complete_result( + &self, + id: &str, + ) -> impl std::future::Future> + Send; fn request_avaiable_data( &self, - ) -> impl std::future::Future>> + Send; + ) -> impl std::future::Future, IPCError>> + Send; } pub struct IPCClientImpl { @@ -342,41 +534,47 @@ impl IPCClientImpl { } impl IPCClient for IPCClientImpl { - - async fn finish(&self) -> Result<()> { + async fn finish(&self) -> Result<(), IPCError> { let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status").into(); let req: Request> = Request::builder() .method(Method::POST) .uri(url) .header("Content-Type", "application/json") - .body(Full::default())?; + .body(Full::default()) + .map_err(IPCError::from)?; - let resp = self.client.request(req).await.anyhow()?; + let resp = self.client.request(req).await.map_err(IPCError::from)?; if resp.status().is_success() { return Ok(()); } - let status_code = resp.status(); - let contents = String::from_utf8(resp.collect().await.map(Collected::to_bytes)?.to_vec())?; - Err(anyhow!("submit data fail {} {}", status_code, contents)) + let resp_bytes = resp + .collect() + .await + .map(Collected::to_bytes) + .map_err(IPCError::from)?; + Err(serde_json::from_slice(&resp_bytes).map_err(IPCError::from)?) } - async fn status(&self) -> Result { + async fn status(&self) -> Result { let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status").into(); let req: Request> = Request::builder() .method(Method::GET) .uri(url) .header("Content-Type", "application/json") - .body(Full::default())?; + .body(Full::default()) + .map_err(IPCError::from)?; - let resp = self.client.request(req).await.anyhow()?; + let resp = self.client.request(req).await.map_err(IPCError::from)?; if !resp.status().is_success() { - let status_code = resp.status(); - let contents = - String::from_utf8(resp.collect().await.map(Collected::to_bytes)?.to_vec())?; - return Err(anyhow!("request status fail {} {}", status_code, contents)); + let resp_bytes = resp + .collect() + .await + .map(Collected::to_bytes) + .map_err(IPCError::from)?; + return Err(serde_json::from_slice(&resp_bytes).map_err(IPCError::from)?); } let contents = resp.collect().await.map(Collected::to_bytes)?; @@ -384,7 +582,7 @@ impl IPCClient for IPCClientImpl { Ok(status) } - async fn submit_output(&self, req: SubmitOuputDataReq) -> Result<()> { + async fn submit_output(&self, req: SubmitOuputDataReq) -> Result<(), IPCError> { let json = serde_json::to_string(&req)?; let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/submit").into(); @@ -392,39 +590,43 @@ impl IPCClient for IPCClientImpl { .method(Method::POST) .uri(url) .header("Content-Type", "application/json") - .body(Full::from(json))?; - let resp = self.client.request(req).await.anyhow()?; + .body(Full::from(json)) + .map_err(IPCError::from)?; + + let resp = self.client.request(req).await.map_err(IPCError::from)?; if resp.status().is_success() { return Ok(()); } - let status_code = resp.status(); - let contents = String::from_utf8(resp.collect().await.map(Collected::to_bytes)?.to_vec())?; - Err(anyhow!("submit data fail {} {}", status_code, contents)) + let resp_bytes = resp + .collect() + .await + .map(Collected::to_bytes) + .map_err(IPCError::from)?; + Err(serde_json::from_slice(&resp_bytes).map_err(IPCError::from)?) } - async fn request_avaiable_data(&self) -> Result> { + async fn request_avaiable_data(&self) -> Result, IPCError> { let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data").into(); let req: Request> = Request::builder() .method(Method::GET) .uri(url) .header("Content-Type", "application/json") - .body(Full::default())?; + .body(Full::default()) + .map_err(IPCError::from)?; - let resp = self.client.request(req).await.anyhow()?; + let resp = self.client.request(req).await.map_err(IPCError::from)?; if resp.status().as_u16() == StatusCode::NOT_FOUND { return Ok(None); } if !resp.status().is_success() { - let status_code = resp.status(); - let contents = - String::from_utf8(resp.collect().await.map(Collected::to_bytes)?.to_vec())?; - return Err(anyhow!( - "get avaiable data fail {} {}", - status_code, - contents - )); + let resp_bytes = resp + .collect() + .await + .map(Collected::to_bytes) + .map_err(IPCError::from)?; + return Err(serde_json::from_slice(&resp_bytes).map_err(IPCError::from)?); } let contents = resp.collect().await.map(Collected::to_bytes)?; @@ -432,7 +634,7 @@ impl IPCClient for IPCClientImpl { Ok(Some(avaiabel_data)) } - async fn complete_result(&self, id: &str) -> Result<()> { + async fn complete_result(&self, id: &str) -> Result<(), IPCError> { let req = CompleteDataReq::new(id); let json = serde_json::to_string(&req)?; let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data").into(); @@ -441,19 +643,21 @@ impl IPCClient for IPCClientImpl { .method(Method::POST) .uri(url) .header("Content-Type", "application/json") - .body(Full::from(json))?; + .body(Full::from(json)) + .map_err(IPCError::from)?; - let resp = self.client.request(req).await.anyhow()?; + let resp = self.client.request(req).await.map_err(IPCError::from)?; if resp.status().is_success() { return Ok(()); } - let status_code = resp.status(); - let contents = String::from_utf8(resp.collect().await.map(Collected::to_bytes)?.to_vec())?; - Err(anyhow!("completed data fail {} {}", status_code, contents)) + let resp_bytes = resp + .collect() + .await + .map(Collected::to_bytes) + .map_err(IPCError::from)?; + Err(serde_json::from_slice(&resp_bytes).map_err(IPCError::from)?) } - - } #[cfg(test)] @@ -470,4 +674,39 @@ mod tests { let client = IPCClientImpl::new("/home/hunjixin/code/jz-action/test.d".to_string()); client.request_avaiable_data().await.unwrap(); } + + #[tokio::test] + async fn test_my_error() { + { + let my_err = IPCError::NodeError { + code: ErrorNumber::AlreadyFinish, + msg: "aaaa".to_string(), + }; + let result = serde_json::to_string(&my_err).unwrap(); + assert_eq!(result, r#"{"code":2,"msg":"aaaa"}"#); + + let de_my_err = serde_json::from_str(&result).unwrap(); + match de_my_err { + IPCError::NodeError { code, msg } => { + assert_eq!(code, ErrorNumber::AlreadyFinish); + assert_eq!(msg, "aaaa"); + } + _ => Err(anyhow!("not expect type")).unwrap(), + } + } + + { + let my_err = IPCError::UnKnown("bbbbbbbbbbbbbb".to_string()); + let result = serde_json::to_string(&my_err).unwrap(); + assert_eq!(result, r#""bbbbbbbbbbbbbb""#); + + let de_my_err = serde_json::from_str(&result).unwrap(); + match de_my_err { + IPCError::UnKnown(msg) => { + assert_eq!(msg, "bbbbbbbbbbbbbb"); + } + _ => Err(anyhow!("not expect type")).unwrap(), + } + } + } } diff --git a/crates/compute_unit_runner/src/lib.rs b/crates/compute_unit_runner/src/lib.rs index 2e51468..43f9186 100644 --- a/crates/compute_unit_runner/src/lib.rs +++ b/crates/compute_unit_runner/src/lib.rs @@ -1,7 +1,3 @@ -pub mod fs_cache; pub mod ipc; pub mod media_data_tracker; - -mod mprc; -mod multi_sender; pub mod state_controller; diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index 909f27c..0242925 100644 --- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -1,10 +1,7 @@ -use crate::{ - fs_cache::FileCache, - ipc::{ - AvaiableDataResponse, - CompleteDataReq, - SubmitOuputDataReq, - }, +use crate::ipc::{ + AvaiableDataResponse, + CompleteDataReq, + SubmitOuputDataReq, }; use anyhow::{ anyhow, @@ -36,18 +33,23 @@ use tonic::{ Code, }; use tracing::{ + debug, error, info, warn, }; -use crate::multi_sender::MultiSender; +use nodes_sdk::{ + fs_cache::FileCache, + multi_sender::MultiSender, +}; use tokio::{ select, sync::{ broadcast, mpsc, oneshot, + RwLock, }, task::JoinSet, time::{ @@ -72,7 +74,7 @@ where pub(crate) repo: R, - pub(crate) local_state: TrackerState, + pub(crate) local_state: Arc>, pub(crate) up_nodes: Vec, @@ -113,7 +115,7 @@ where buf_size, name: name.to_string(), repo: repo, - local_state: TrackerState::Init, + local_state: Arc::new(RwLock::new(TrackerState::Init)), up_nodes: up_nodes, upstreams: upstreams, downstreams: downstreams, @@ -155,23 +157,26 @@ where .map(|count| info!("revert {count} SelectForSent data to Received"))?; //check ready if both upnodes is finish and no pending data, we think it finish - let is_all_success = try_join_all(up_nodes.iter().map(|node_name|db_repo.get_node_by_name(node_name))).await + if up_nodes.len() > 0 { + let is_all_success = try_join_all(up_nodes.iter().map(|node_name|db_repo.get_node_by_name(node_name))).await .map_err(|err|anyhow!("query node data {err}"))? .iter() .any(|node| matches!(node.state, TrackerState::Finish)); - if is_all_success { - let running_state = &[ - &DataState::Received, - &DataState::Assigned, - &DataState::SelectForSend, - &DataState::PartialSent, - &DataState::Sent - ]; - if db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await? == 0 { - db_repo.update_node_by_name(&node_name, TrackerState::Finish).await.map_err(|err|anyhow!("update node data {err}"))?; - info!("node was finished, not need to run backend"); - return anyhow::Ok(()); + if is_all_success { + let running_state = &[ + &DataState::Received, + &DataState::Assigned, + &DataState::SelectForSend, + &DataState::PartialSent + ]; + + if db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await? == 0 && + db_repo.count(&node_name, &[&DataState::Received,&DataState::Assigned], &Direction::In).await? == 0 { + db_repo.update_node_by_name(&node_name, TrackerState::Finish).await.map_err(|err|anyhow!("update node data {err}"))?; + info!("node was finished, not need to run backend"); + return anyhow::Ok(()); + } } } info!("backend thread end {:?}", now.elapsed()); @@ -315,6 +320,7 @@ where let node_name = self.name.clone(); let buf_size = self.buf_size; let token = token.clone(); + let local_state = self.local_state.clone(); join_set.spawn(async move { loop { @@ -323,6 +329,12 @@ where return anyhow::Ok(()); } Some((req, resp)) = ipc_process_submit_result_rx.recv() => { + //check finish state + if matches!(*local_state.read().await, TrackerState::Finish) { + resp.send(Err(anyhow!("node was finished"))).expect("channel only read once"); + continue; + } + loop { if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], &Direction::Out).await.and_then(|count|{ if count > buf_size { @@ -378,10 +390,10 @@ where let url = self.upstreams[0].clone(); let fs_cache = self.fs_cache.clone(); let token = token.clone(); + let local_state = self.local_state.clone(); join_set.spawn(async move { // let mut client = DataStreamClient::connect(url.clone()).await; - let mut client: Option> = None; loop { select! { @@ -389,6 +401,11 @@ where return anyhow::Ok(()); } Some((_, resp)) = ipc_process_data_req_rx.recv() => { + //check finish state + if matches!(*local_state.read().await, TrackerState::Finish) { + resp.send(Err(anyhow!("node was finished"))).expect("channel only read once"); + continue; + } //select a unassgined data info!("try to find avaiable data"); if client.is_none() { @@ -421,10 +438,9 @@ where _=>{} } - let res_data = AvaiableDataResponse{ id: data.id.clone(), - size: data.size, + size: data.size }; if let Err(err) = fs_cache.write(data).await { @@ -484,6 +500,11 @@ where } }, Some((req, resp)) = ipc_process_completed_data_rx.recv() => { + //check finish state + if matches!(*local_state.read().await, TrackerState::Finish) { + resp.send(Err(anyhow!("node was finished"))).expect("channel only read once"); + continue; + } match db_repo.update_state(&node_name, &req.id, &Direction::In, &DataState::Processed, None).await{ Ok(_) =>{ // respose with nothing @@ -498,7 +519,24 @@ where } } } - Some((_, resp)) = ipc_process_finish_state_rx.recv() => { + } + } + }); + } + + if self.upstreams.len() == 0 { + //receive event from pod + //inputs nodes need to tell its finish + let db_repo = self.repo.clone(); + let node_name = self.name.clone(); + let token = token.clone(); + join_set.spawn(async move { + loop { + select! { + _ = token.cancelled() => { + return anyhow::Ok(()); + } + Some((_, resp)) = ipc_process_finish_state_rx.recv() => { loop { //query data need to be done let running_state = &[ @@ -506,13 +544,13 @@ where &DataState::Assigned, &DataState::SelectForSend, &DataState::PartialSent, - &DataState::Sent ]; match db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await { Ok(count) => { if count ==0 { break; } + debug!("there are {count} data need to be sent"); }, Err(err) => error!("query node state {err}") } @@ -522,6 +560,7 @@ where match db_repo.update_node_by_name(&node_name, TrackerState::Finish).await{ Ok(_) =>{ // respose with nothing + resp.send(Ok(())).expect("channel only read once"); }, Err(err) => { @@ -531,9 +570,8 @@ where } } } - }); + }); } - Ok(join_set) } } diff --git a/crates/compute_unit_runner/src/state_controller.rs b/crates/compute_unit_runner/src/state_controller.rs index cc77909..aeda65b 100644 --- a/crates/compute_unit_runner/src/state_controller.rs +++ b/crates/compute_unit_runner/src/state_controller.rs @@ -7,7 +7,7 @@ use jz_action::core::db::{ use std::sync::Arc; use tokio::{ select, - sync::Mutex, + sync::RwLock, task::JoinSet, time::{ self, @@ -26,7 +26,7 @@ pub struct StateController where R: JobDbRepo, { - pub program: Arc>>, + pub program: Arc>>, pub _handler: ServerHandle, } @@ -61,20 +61,23 @@ where .await{ Ok(record)=> { debug!("{} fetch state from db", record.node_name); + let mut program_guard = program.write().await; + let mut local_state = program_guard.local_state.write().await; + if *local_state == record.state { + continue + } + let old_local_state = local_state.clone(); + *local_state = record.state.clone(); + info!("update state {:?} -> {:?}", old_local_state, local_state); + drop(local_state); match record.state { TrackerState::Ready => { - let mut program_guard = program.lock().await; - let cloned_token = token.clone(); - if matches!(program_guard.local_state, TrackerState::Init) { + if old_local_state == TrackerState::Init { //start - info!("set to ready state {:?}", record.incoming_streams); - program_guard.local_state = TrackerState::Ready; - join_set = Some(program_guard.route_data(cloned_token).await?); + info!("start data processing {:?}", record.incoming_streams); + join_set = Some(program_guard.route_data(token.clone()).await?); } } - TrackerState::Stop => { - todo!() - } _ => {} } }, diff --git a/crates/dp_runner/Cargo.toml b/crates/dp_runner/Cargo.toml index b846651..2e7b499 100644 --- a/crates/dp_runner/Cargo.toml +++ b/crates/dp_runner/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] jz_action = { path = "../../"} -compute_unit_runner = { path = "../compute_unit_runner"} +nodes_sdk = {path = "../nodes_sdk"} tokio = { workspace = true } tokio-retry = {workspace = true} diff --git a/crates/dp_runner/dockerfile b/crates/dp_runner/dockerfile index 69efb7e..6b77abe 100644 --- a/crates/dp_runner/dockerfile +++ b/crates/dp_runner/dockerfile @@ -1,4 +1,4 @@ -FROM jz-action/net-debug +FROM ubuntu:22.04 WORKDIR /app diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/dp_runner/src/channel_tracker.rs index a1138a0..6433771 100644 --- a/crates/dp_runner/src/channel_tracker.rs +++ b/crates/dp_runner/src/channel_tracker.rs @@ -3,7 +3,6 @@ use anyhow::{ Result, }; use chrono::Utc; -use compute_unit_runner::fs_cache::FileCache; use futures::future::try_join_all; use jz_action::{ core::db::{ @@ -15,6 +14,7 @@ use jz_action::{ }, network::datatransfer::MediaDataBatchResponse, }; +use nodes_sdk::fs_cache::FileCache; use std::{ sync::Arc, time::Duration, diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs index ef98139..c63c825 100644 --- a/crates/dp_runner/src/main.rs +++ b/crates/dp_runner/src/main.rs @@ -11,8 +11,8 @@ use jz_action::{ use anyhow::Result; use channel_tracker::ChannelTracker; use clap::Parser; -use compute_unit_runner::fs_cache::*; use jz_action::core::db::NodeRepo; +use nodes_sdk::fs_cache::*; use state_controller::StateController; use std::{ str::FromStr, @@ -25,7 +25,7 @@ use tokio::{ signal, SignalKind, }, - sync::Mutex, + sync::RwLock, task::JoinSet, }; use tokio_util::sync::CancellationToken; @@ -94,7 +94,7 @@ async fn main() -> Result<()> { ); program.run_backend(&mut join_set, token.clone())?; - let program_safe = Arc::new(Mutex::new(program)); + let program_safe = Arc::new(RwLock::new(program)); { let program_safe = program_safe.clone(); let node_name = args.node_name.clone(); @@ -140,10 +140,5 @@ async fn main() -> Result<()> { }); } - while let Some(Err(err)) = join_set.join_next().await { - error!("exit spawn {err}"); - } - - info!("gracefully shutdown"); - Ok(()) + nodes_sdk::monitor_tasks(&mut join_set).await } diff --git a/crates/dp_runner/src/state_controller.rs b/crates/dp_runner/src/state_controller.rs index 24070c9..b9a66e3 100644 --- a/crates/dp_runner/src/state_controller.rs +++ b/crates/dp_runner/src/state_controller.rs @@ -3,10 +3,13 @@ use jz_action::core::db::{ JobDbRepo, TrackerState, }; -use std::sync::Arc; +use std::{ + io::Write, + sync::Arc, +}; use tokio::{ select, - sync::Mutex, + sync::RwLock, task::JoinSet, time::{ self, @@ -25,7 +28,7 @@ pub struct StateController where R: JobDbRepo, { - pub program: Arc>>, + pub program: Arc>>, } impl StateController @@ -59,22 +62,21 @@ where .await{ Ok(record)=> { debug!("{} fetch state from db", record.node_name); + let mut program_guard = program.write().await; + if program_guard.local_state == record.state { + continue + } + let old_local_state = program_guard.local_state.clone(); + program_guard.local_state = record.state.clone(); + info!("update state {:?} -> {:?}", &old_local_state, &record.state); match record.state { TrackerState::Ready => { - let mut program_guard = program.lock().await; - let cloned_token = token.clone(); - if matches!(program_guard.local_state, TrackerState::Init) { + if old_local_state == TrackerState::Init { //start - info!("set to ready state {:?}", record.incoming_streams); - program_guard.local_state = TrackerState::Ready; - program_guard.upstreams = record.incoming_streams; - program_guard.downstreams = record.outgoing_streams; - join_set = Some(program_guard.route_data(cloned_token).await?); + info!("start data processing {:?}", record.incoming_streams); + join_set = Some(program_guard.route_data(token.clone()).await?); } } - TrackerState::Stop => { - todo!() - } _ => {} } }, diff --git a/crates/dp_runner/src/stream.rs b/crates/dp_runner/src/stream.rs index d6c204f..3d78a74 100644 --- a/crates/dp_runner/src/stream.rs +++ b/crates/dp_runner/src/stream.rs @@ -13,7 +13,7 @@ use jz_action::{ use std::sync::Arc; use tokio::sync::{ oneshot, - Mutex, + RwLock, }; use tonic::{ Request, @@ -27,7 +27,7 @@ pub struct ChannelDataStream where R: JobDbRepo, { - pub(crate) program: Arc>>, + pub(crate) program: Arc>>, } #[tonic::async_trait] @@ -40,7 +40,7 @@ where req: Request, ) -> Result, Status> { let send_tx = { - let program = self.program.lock().await; + let program = self.program.read().await; if program.incoming_tx.is_none() { return Err(Status::internal("not ready")); } @@ -65,7 +65,7 @@ where _: Request, ) -> Result, Status> { let send_tx = { - let program = self.program.lock().await; + let program = self.program.read().await; if program.request_tx.is_none() { return Err(Status::internal("not ready")); } diff --git a/crates/nodes_sdk/Cargo.toml b/crates/nodes_sdk/Cargo.toml new file mode 100644 index 0000000..6803c0f --- /dev/null +++ b/crates/nodes_sdk/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "nodes_sdk" +version = "0.1.0" +repository.workspace = true +license.workspace = true +edition.workspace = true +include.workspace = true + +[dependencies] +jz_action = { path = "../../"} + +tokio = { workspace = true } +tokio-retry = {workspace = true} +tokio-stream = {workspace = true} +tokio-util = {workspace = true} + +anyhow = {workspace = true} +tracing = {workspace = true} +tracing-subscriber = {workspace = true} +tonic = {workspace = true} +serde = {workspace = true} +serde_json = {workspace = true} +uuid = {workspace = true} +mongodb = {workspace = true} +async-trait = {workspace = true} +chrono = {workspace = true} +clap = {workspace = true} +futures = {workspace = true} + +actix-web = "4.8.0" +hyperlocal = "0.9.1" +hyper = "1.4.1" +hyper-util = "0.1.6" +http-body-util = "0.1.2" +walkdir = "2.5.0" +rand = "0.8.5" \ No newline at end of file diff --git a/crates/compute_unit_runner/src/fs_cache.rs b/crates/nodes_sdk/src/fs_cache.rs similarity index 100% rename from crates/compute_unit_runner/src/fs_cache.rs rename to crates/nodes_sdk/src/fs_cache.rs diff --git a/crates/nodes_sdk/src/lib.rs b/crates/nodes_sdk/src/lib.rs new file mode 100644 index 0000000..acdfdd8 --- /dev/null +++ b/crates/nodes_sdk/src/lib.rs @@ -0,0 +1,9 @@ +#![feature(duration_constructors)] + +pub mod fs_cache; +pub mod mprc; +pub mod multi_sender; + +mod sleep_program; + +pub use sleep_program::*; diff --git a/crates/compute_unit_runner/src/mprc.rs b/crates/nodes_sdk/src/mprc.rs similarity index 89% rename from crates/compute_unit_runner/src/mprc.rs rename to crates/nodes_sdk/src/mprc.rs index a205582..fdbc989 100644 --- a/crates/compute_unit_runner/src/mprc.rs +++ b/crates/nodes_sdk/src/mprc.rs @@ -5,7 +5,7 @@ use std::{ vec, }; -pub(crate) struct Mprs +pub struct Mprs where T: Clone, { @@ -16,11 +16,11 @@ impl Mprs where T: Clone, { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Mprs { entries: vec![] } } - pub(crate) fn remove(&mut self, k: &Q) -> Option + pub fn remove(&mut self, k: &Q) -> Option where K: Borrow, Q: Hash + Eq, @@ -34,15 +34,15 @@ where None } - pub(crate) fn count(&self) -> usize { + pub fn count(&self) -> usize { self.entries.len() } - pub(crate) fn iter(&mut self) -> impl Iterator { + pub fn iter(&mut self) -> impl Iterator { self.entries.iter() } - pub(crate) fn insert(&mut self, k: K, sender: T) -> Option + pub fn insert(&mut self, k: K, sender: T) -> Option where K: Hash + Eq, { @@ -52,7 +52,7 @@ where ret } - pub(crate) fn get_random(&mut self) -> Option<&(K, T)> + pub fn get_random(&mut self) -> Option<&(K, T)> where K: Hash + Eq, { diff --git a/crates/compute_unit_runner/src/multi_sender.rs b/crates/nodes_sdk/src/multi_sender.rs similarity index 100% rename from crates/compute_unit_runner/src/multi_sender.rs rename to crates/nodes_sdk/src/multi_sender.rs diff --git a/crates/nodes_sdk/src/sleep_program.rs b/crates/nodes_sdk/src/sleep_program.rs new file mode 100644 index 0000000..69b2dbf --- /dev/null +++ b/crates/nodes_sdk/src/sleep_program.rs @@ -0,0 +1,33 @@ +use std::time::Duration; + +use anyhow::{ + anyhow, + Result, +}; +use tokio::{ + task::JoinSet, + time::sleep, +}; +use tracing::{ + error, + info, +}; + +pub async fn monitor_tasks(join_set: &mut JoinSet>) -> Result<()> { + let mut has_err = false; + while let Some(result) = join_set.join_next().await { + if let Err(err) = result { + has_err = true; + error!("Task exited with error: {err}"); + } + } + + if !has_err { + info!("Gracefully shutting down..."); + // Prevent StatefulSet from restarting by sleeping for a long duration + sleep(Duration::from_days(364 * 100)).await; + Ok(()) + } else { + Err(anyhow!("One or more tasks exited with an error")) + } +} diff --git a/makefile b/makefile index 887ff83..3793b48 100644 --- a/makefile +++ b/makefile @@ -20,10 +20,10 @@ build: build-cd build-dp cargo build --release docker_cd: build-cd - docker build -f ./crates/compute_unit_runner/dockerfile -t jz-action/compute_unit_runner:latest . + docker build -f ./crates/compute_unit_runner/dockerfile -t gitdatateam/compute_unit_runner:latest . docker_dp: build-dp - docker build -f ./crates/dp_runner/dockerfile -t jz-action/dp_runner:latest . + docker build -f ./crates/dp_runner/dockerfile -t gitdatateam/dp_runner:latest . ################## build nodes build-nodes: $(OUTPUT) @@ -43,11 +43,11 @@ build-nodes: $(OUTPUT) cp target/release/copy_in_place $(OUTPUT)/copy_in_place docker_nodes: build-nodes - docker build -f ./nodes/jz_reader/dockerfile -t jz-action/jz_reader:latest . - docker build -f ./nodes/jz_writer/dockerfile -t jz-action/jz_writer:latest . - docker build -f ./nodes/dummy_in/dockerfile -t jz-action/dummy_in:latest . - docker build -f ./nodes/dummy_out/dockerfile -t jz-action/dummy_out:latest . - docker build -f ./nodes/copy_in_place/dockerfile -t jz-action/copy_in_place:latest . + docker build -f ./nodes/jz_reader/dockerfile -t gitdatateam/jz_reader:latest . + docker build -f ./nodes/jz_writer/dockerfile -t gitdatateam/jz_writer:latest . + docker build -f ./nodes/dummy_in/dockerfile -t gitdatateam/dummy_in:latest . + docker build -f ./nodes/dummy_out/dockerfile -t gitdatateam/dummy_out:latest . + docker build -f ./nodes/copy_in_place/dockerfile -t gitdatateam/copy_in_place:latest . ################## minikube docker: docker_cd docker_dp docker_nodes diff --git a/nodes/copy_in_place/Cargo.toml b/nodes/copy_in_place/Cargo.toml index 9dcb339..1fe67a8 100644 --- a/nodes/copy_in_place/Cargo.toml +++ b/nodes/copy_in_place/Cargo.toml @@ -9,6 +9,7 @@ include.workspace = true [dependencies] jz_action = { path = "../../"} compute_unit_runner = {path = "../../crates/compute_unit_runner"} +nodes_sdk = {path = "../../crates/nodes_sdk"} uuid = {workspace = true} tokio = { workspace = true} diff --git a/nodes/copy_in_place/dockerfile b/nodes/copy_in_place/dockerfile index fcadd80..001c960 100644 --- a/nodes/copy_in_place/dockerfile +++ b/nodes/copy_in_place/dockerfile @@ -1,4 +1,4 @@ -FROM jz-action/net-debug +FROM ubuntu:22.04 WORKDIR /app diff --git a/nodes/copy_in_place/src/main.rs b/nodes/copy_in_place/src/main.rs index 7a08cb7..568f08d 100644 --- a/nodes/copy_in_place/src/main.rs +++ b/nodes/copy_in_place/src/main.rs @@ -1,8 +1,13 @@ use anyhow::Result; -use clap::Parser; +use clap::{ + error, + Parser, +}; use compute_unit_runner::ipc::{ self, + ErrorNumber, IPCClient, + IPCError, SubmitOuputDataReq, }; use jz_action::utils::StdIntoAnyhowResult; @@ -26,6 +31,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ + debug, error, info, Level, @@ -78,11 +84,7 @@ async fn main() -> Result<()> { }); } - while let Some(Err(err)) = join_set.join_next().await { - error!("exit spawn {err}"); - } - info!("gracefully shutdown"); - Ok(()) + nodes_sdk::monitor_tasks(&mut join_set).await } async fn copy_in_place(token: CancellationToken, args: Args) -> Result<()> { @@ -94,29 +96,48 @@ async fn copy_in_place(token: CancellationToken, args: Args) -> Result<()> { } let instant = Instant::now(); - let req = client.request_avaiable_data().await?; - if req.is_none() { - sleep(Duration::from_secs(2)).await; - continue; + match client.request_avaiable_data().await { + Ok(Some(req)) => { + let id = req.id; + let path_str = tmp_path.join(&id); + let root_input_dir = path_str.as_path(); + + let new_id = uuid::Uuid::new_v4().to_string(); + let output_dir = tmp_path.join(&new_id); + + fs::rename(root_input_dir, output_dir).await?; + + info!("move data {:?}", instant.elapsed()); + + client.complete_result(&id).await.anyhow()?; + + //submit directory after completed a batch + client + .submit_output(SubmitOuputDataReq::new(&new_id, 30)) + .await + .anyhow()?; + info!("submit new data {:?}", instant.elapsed()); + } + Ok(None) => { + sleep(Duration::from_secs(2)).await; + continue; + } + Err(IPCError::NodeError { code, msg }) => match code { + ErrorNumber::AlreadyFinish => { + return Ok(()); + } + ErrorNumber::NotReady => { + sleep(Duration::from_secs(2)).await; + continue; + } + ErrorNumber::DataNotFound => { + sleep(Duration::from_secs(2)).await; + continue; + } + }, + Err(IPCError::UnKnown(msg)) => { + error!("got unknow error {msg}"); + } } - - let id = req.unwrap().id; - let path_str = tmp_path.join(&id); - let root_input_dir = path_str.as_path(); - - let new_id = uuid::Uuid::new_v4().to_string(); - let output_dir = tmp_path.join(&new_id); - - fs::rename(root_input_dir, output_dir).await?; - - info!("move data {:?}", instant.elapsed()); - - client.complete_result(&id).await?; - - //submit directory after completed a batch - client - .submit_output(SubmitOuputDataReq::new(&new_id, 30)) - .await?; - info!("submit new data {:?}", instant.elapsed()); } } diff --git a/nodes/dummy_in/Cargo.toml b/nodes/dummy_in/Cargo.toml index 089fe98..3ad2ca2 100644 --- a/nodes/dummy_in/Cargo.toml +++ b/nodes/dummy_in/Cargo.toml @@ -9,6 +9,7 @@ include.workspace = true [dependencies] jz_action = { path = "../../"} compute_unit_runner = {path = "../../crates/compute_unit_runner"} +nodes_sdk = {path = "../../crates/nodes_sdk"} uuid = {workspace = true} tokio = { workspace = true} diff --git a/nodes/dummy_in/dockerfile b/nodes/dummy_in/dockerfile index 657c326..f52d1a6 100644 --- a/nodes/dummy_in/dockerfile +++ b/nodes/dummy_in/dockerfile @@ -1,4 +1,4 @@ -FROM jz-action/net-debug +FROM ubuntu:22.04 WORKDIR /app diff --git a/nodes/dummy_in/src/main.rs b/nodes/dummy_in/src/main.rs index f07c64c..8f3fe97 100644 --- a/nodes/dummy_in/src/main.rs +++ b/nodes/dummy_in/src/main.rs @@ -5,7 +5,10 @@ use compute_unit_runner::ipc::{ IPCClient, SubmitOuputDataReq, }; -use jz_action::utils::StdIntoAnyhowResult; +use jz_action::{ + core::db::TrackerState, + utils::StdIntoAnyhowResult, +}; use random_word::Lang; use std::{ path::Path, @@ -24,7 +27,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ - error, info, Level, }; @@ -79,11 +81,7 @@ async fn main() -> Result<()> { }); } - while let Some(Err(err)) = join_set.join_next().await { - error!("exit spawn {err}"); - } - info!("gracefully shutdown"); - Ok(()) + nodes_sdk::monitor_tasks(&mut join_set).await } async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> { @@ -91,20 +89,24 @@ async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> { let tmp_path = Path::new(&args.tmp_path); let count_file = tmp_path.join("number.txt"); let mut count = if fs::try_exists(&count_file).await? { - let count_str = fs::read_to_string(&count_file).await?; + let count_str = fs::read_to_string(&count_file).await?; str::parse::(count_str.as_str())? } else { 0 }; loop { - if args.total_count > 0 && count > args.total_count { + if args.total_count > 0 && count > args.total_count { + info!("exit pod because work has done"); + if client.status().await.unwrap().state == TrackerState::Finish { + return Ok(()); + } + client.finish().await.unwrap(); return Ok(()); } if token.is_cancelled() { - fs::write(&count_file, count.to_string()).await.unwrap(); return Ok(()); } @@ -126,8 +128,10 @@ async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> { //submit directory after completed a batch client .submit_output(SubmitOuputDataReq::new(&id, 30)) - .await?; - info!("submit new data {:?}", instant.elapsed()); - count+=1; + .await + .anyhow()?; + info!("submit new data({count}) {:?}", instant.elapsed()); + count += 1; + fs::write(&count_file, count.to_string()).await.unwrap(); } } diff --git a/nodes/dummy_out/Cargo.toml b/nodes/dummy_out/Cargo.toml index 11aab28..6d56104 100644 --- a/nodes/dummy_out/Cargo.toml +++ b/nodes/dummy_out/Cargo.toml @@ -9,6 +9,7 @@ include.workspace = true [dependencies] jz_action = { path = "../../"} compute_unit_runner = {path = "../../crates/compute_unit_runner"} +nodes_sdk = {path = "../../crates/nodes_sdk"} uuid = {workspace = true} tokio = { workspace = true} diff --git a/nodes/dummy_out/dockerfile b/nodes/dummy_out/dockerfile index 214e8b5..d762a7c 100644 --- a/nodes/dummy_out/dockerfile +++ b/nodes/dummy_out/dockerfile @@ -1,4 +1,4 @@ -FROM jz-action/net-debug +FROM ubuntu:22.04 WORKDIR /app diff --git a/nodes/dummy_out/src/main.rs b/nodes/dummy_out/src/main.rs index fb41a5d..3c0ee35 100644 --- a/nodes/dummy_out/src/main.rs +++ b/nodes/dummy_out/src/main.rs @@ -1,11 +1,10 @@ -use anyhow::{ - Ok, - Result, -}; +use anyhow::Result; use clap::Parser; use compute_unit_runner::ipc::{ self, + ErrorNumber, IPCClient, + IPCError, }; use jz_action::utils::StdIntoAnyhowResult; use std::{ @@ -24,6 +23,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ + debug, error, info, Level, @@ -77,11 +77,7 @@ async fn main() -> Result<()> { }); } - while let Some(Err(err)) = join_set.join_next().await { - error!("exit spawn {err}"); - } - info!("gracefully shutdown"); - Ok(()) + nodes_sdk::monitor_tasks(&mut join_set).await } async fn write_dummy(token: CancellationToken, args: Args) -> Result<()> { @@ -89,26 +85,46 @@ async fn write_dummy(token: CancellationToken, args: Args) -> Result<()> { let tmp_path = Path::new(&args.tmp_path); loop { if token.is_cancelled() { - return Ok(()); + return anyhow::Ok(()); } - let req = client.request_avaiable_data().await?; - if req.is_none() { - sleep(Duration::from_secs(2)).await; - continue; - } - let id = req.unwrap().id; - let path_str = tmp_path.join(&id); - let root_input_dir = path_str.as_path(); + info!("request data"); + match client.request_avaiable_data().await { + Ok(Some(req)) => { + let id = req.id; + let path_str = tmp_path.join(&id); + let root_input_dir = path_str.as_path(); + + for entry in WalkDir::new(root_input_dir) { + let entry = entry?; + if entry.file_type().is_file() { + let path = entry.path(); + info!("read path {:?}", path); + } + } - for entry in WalkDir::new(root_input_dir) { - let entry = entry?; - if entry.file_type().is_file() { - let path = entry.path(); - info!("read path {:?}", path); + client.complete_result(&id).await.anyhow()?; + } + Ok(None) => { + sleep(Duration::from_secs(2)).await; + continue; + } + Err(IPCError::NodeError { code, msg }) => match code { + ErrorNumber::AlreadyFinish => { + return Ok(()); + } + ErrorNumber::NotReady => { + sleep(Duration::from_secs(2)).await; + continue; + } + ErrorNumber::DataNotFound => { + sleep(Duration::from_secs(2)).await; + continue; + } + }, + Err(IPCError::UnKnown(msg)) => { + error!("got unknow error {msg}"); } } - - client.complete_result(&id).await?; } } diff --git a/nodes/jz_reader/Cargo.toml b/nodes/jz_reader/Cargo.toml index a616105..a9f4a89 100644 --- a/nodes/jz_reader/Cargo.toml +++ b/nodes/jz_reader/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" jz_action = { path = "../../"} compute_unit_runner = {path = "../../crates/compute_unit_runner"} jiaozifs_client_rs = {path = "../../crates/jiaozifs_client_rs"} +nodes_sdk = {path = "../../crates/nodes_sdk"} uuid = {workspace = true} tokio = { workspace = true} diff --git a/nodes/jz_reader/src/main.rs b/nodes/jz_reader/src/main.rs index c84168b..985dc18 100644 --- a/nodes/jz_reader/src/main.rs +++ b/nodes/jz_reader/src/main.rs @@ -25,8 +25,10 @@ use tokio::{ SignalKind, }, sync::mpsc, + task::JoinSet, }; use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; use tracing::{ error, info, @@ -87,14 +89,10 @@ async fn main() -> Result<()> { .try_init() .anyhow()?; - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>(1); + let mut join_set = JoinSet::new(); + let token = CancellationToken::new(); { - let shutdown_tx = shutdown_tx.clone(); - let _ = tokio::spawn(async move { - if let Err(err) = read_jz_fs(args).await { - let _ = shutdown_tx.send(Err(anyhow!("read jz fs {err}"))).await; - } - }); + join_set.spawn(async move { read_jz_fs(args).await }); } { @@ -106,15 +104,11 @@ async fn main() -> Result<()> { _ = sig_term.recv() => info!("Recieve SIGTERM"), _ = sig_int.recv() => info!("Recieve SIGTINT"), }; - let _ = shutdown_tx.send(Err(anyhow!("cancel by signal"))).await; + token.cancel(); }); } - if let Some(Err(err)) = shutdown_rx.recv().await { - error!("program exit with error {:?}", err) - } - info!("gracefully shutdown"); - Ok(()) + nodes_sdk::monitor_tasks(&mut join_set).await } async fn read_jz_fs(args: Args) -> Result<()> { @@ -172,7 +166,8 @@ async fn read_jz_fs(args: Args) -> Result<()> { //submit directory after completed a batch client .submit_output(SubmitOuputDataReq::new(&id, batch.len() as u32)) - .await?; + .await + .anyhow()?; } // read all files client.finish().await.unwrap(); diff --git a/nodes/jz_writer/Cargo.toml b/nodes/jz_writer/Cargo.toml index a9e6ee3..5060dae 100644 --- a/nodes/jz_writer/Cargo.toml +++ b/nodes/jz_writer/Cargo.toml @@ -10,6 +10,7 @@ include.workspace = true jz_action = { path = "../../"} compute_unit_runner = {path = "../../crates/compute_unit_runner"} jiaozifs_client_rs = {path = "../../crates/jiaozifs_client_rs"} +nodes_sdk = {path = "../../crates/nodes_sdk"} uuid = {workspace = true} tokio = { workspace = true } diff --git a/nodes/jz_writer/src/main.rs b/nodes/jz_writer/src/main.rs index 1db44fa..78306cf 100644 --- a/nodes/jz_writer/src/main.rs +++ b/nodes/jz_writer/src/main.rs @@ -1,12 +1,13 @@ use anyhow::{ anyhow, - Ok, Result, }; use clap::Parser; use compute_unit_runner::ipc::{ self, + ErrorNumber, IPCClient, + IPCError, }; use jiaozifs_client_rs::{ apis::{ @@ -31,10 +32,14 @@ use tokio::{ SignalKind, }, task::JoinSet, - time::sleep, + time::{ + sleep, + Instant, + }, }; use tokio_util::sync::CancellationToken; use tracing::{ + debug, error, info, Level, @@ -118,11 +123,7 @@ async fn main() -> Result<()> { }); } - while let Some(Err(err)) = join_set.join_next().await { - error!("exit spawn {err}"); - } - info!("gracefully shutdown"); - Ok(()) + nodes_sdk::monitor_tasks(&mut join_set).await } async fn write_jz_fs(token: CancellationToken, args: Args) -> Result<()> { @@ -134,7 +135,7 @@ async fn write_jz_fs(token: CancellationToken, args: Args) -> Result<()> { "wip" => { //ensure wip exit apis::wip_api::get_wip(&configuration, &args.owner, &args.repo, &args.ref_name).await?; - Ok(RefType::Wip) + anyhow::Ok(RefType::Wip) } val => Err(anyhow!("ref type not support {}", val)), }?; @@ -143,35 +144,56 @@ async fn write_jz_fs(token: CancellationToken, args: Args) -> Result<()> { let tmp_path = Path::new(&args.tmp_path); loop { if token.is_cancelled() { - return Ok(()); - } - let req = client.request_avaiable_data().await?; - if req.is_none() { - sleep(Duration::from_secs(2)).await; - continue; + return anyhow::Ok(()); } - let id = req.unwrap().id; - - let path_str = tmp_path.join(&id); - let root_input_dir = path_str.as_path(); - for entry in WalkDir::new(root_input_dir) { - let entry = entry?; - if entry.file_type().is_file() { - let path = entry.path(); - let content = fs::read(path).await?; - let rel_path = path.strip_prefix(root_input_dir)?; - apis::objects_api::upload_object( - &configuration, - &args.owner, - &args.repo, - &args.ref_name, - rel_path.to_str().anyhow("path must be validate")?, - Some(true), - Some(content), - ) - .await?; + + let instant = Instant::now(); + match client.request_avaiable_data().await { + Ok(Some(req)) => { + let id = req.id; + let path_str = tmp_path.join(&id); + let root_input_dir = path_str.as_path(); + for entry in WalkDir::new(root_input_dir) { + let entry = entry?; + if entry.file_type().is_file() { + let path = entry.path(); + let content = fs::read(path).await?; + let rel_path = path.strip_prefix(root_input_dir)?; + apis::objects_api::upload_object( + &configuration, + &args.owner, + &args.repo, + &args.ref_name, + rel_path.to_str().anyhow("path must be validate")?, + Some(true), + Some(content), + ) + .await?; + } + } + client.complete_result(&id).await.anyhow()?; + info!("submit new data {:?}", instant.elapsed()); + } + Ok(None) => { + sleep(Duration::from_secs(2)).await; + continue; + } + Err(IPCError::NodeError { code, msg }) => match code { + ErrorNumber::AlreadyFinish => { + return Ok(()); + } + ErrorNumber::NotReady => { + sleep(Duration::from_secs(2)).await; + continue; + } + ErrorNumber::DataNotFound => { + sleep(Duration::from_secs(2)).await; + continue; + } + }, + Err(IPCError::UnKnown(msg)) => { + error!("got unknow error {msg}"); } } - client.complete_result(&id).await?; } } diff --git a/script/example_dag.json b/script/example_dag.json index ab7d523..040de95 100644 --- a/script/example_dag.json +++ b/script/example_dag.json @@ -5,7 +5,7 @@ { "name": "dummy-in", "spec": { - "image": "jz-action/dummy_in:latest", + "image": "gitdatateam/dummy_in:latest", "command": "/dummy_in", "args": [ "--log-level=debug" @@ -19,7 +19,7 @@ "dummy-in" ], "spec": { - "image": "jz-action/copy_in_place:latest", + "image": "gitdatateam/copy_in_place:latest", "command": "/copy_in_place", "replicas": 3, "args": [ @@ -37,7 +37,7 @@ "copy-in-place" ], "spec": { - "image": "jz-action/dummy_out:latest", + "image": "gitdatateam/dummy_out:latest", "command": "/dummy_out", "replicas": 3, "args": [ diff --git a/src/api/client/job.rs b/src/api/client/job.rs index 0d74ddc..7680fcb 100644 --- a/src/api/client/job.rs +++ b/src/api/client/job.rs @@ -9,37 +9,40 @@ use anyhow::{ anyhow, Result, }; -use awc::{ - http::StatusCode, + +use mongodb::bson::oid::ObjectId; +use reqwest::{ Client, + StatusCode, + Url, }; -use mongodb::bson::oid::ObjectId; - pub struct JobClient { - pub(crate) client: Client, - pub(crate) base_uri: String, + pub(crate) client: Client, + pub(crate) base_uri: Url, } impl JobClient { pub async fn create(&self, job: &Job) -> Result { - let mut resp = self + let resp = self .client - .post(self.base_uri.clone() + "/job") - .send_json(&job) + .post(self.base_uri.clone().join("job")?) + .json(&job) + .send() .await .anyhow()?; if !resp.status().is_success() { + let code = resp.status(); let err_msg = resp - .body() + .bytes() .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + return Err(anyhow!("create job {code} reason {err_msg}")); } - resp.body() + resp.bytes() .await .anyhow() .and_then(|body| serde_json::from_slice(&body).anyhow()) @@ -47,9 +50,14 @@ impl JobClient { } pub async fn get(&self, job_id: &ObjectId) -> Result> { - let mut resp = self + let resp = self .client - .get(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str()) + .get( + self.base_uri + .clone() + .join("job")? + .join(job_id.to_hex().as_str())?, + ) .send() .await .anyhow()?; @@ -59,15 +67,16 @@ impl JobClient { } if !resp.status().is_success() { + let code = resp.status(); let err_msg = resp - .body() + .bytes() .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + return Err(anyhow!("get job {code} reason {err_msg}")); } - resp.body() + resp.bytes() .await .anyhow() .and_then(|body| serde_json::from_slice(&body).anyhow()) @@ -75,40 +84,53 @@ impl JobClient { } pub async fn delete(&self, job_id: &ObjectId) -> Result<()> { - let mut resp = self + let resp = self .client - .delete(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str()) + .delete( + self.base_uri + .clone() + .join("job")? + .join(job_id.to_hex().as_str())?, + ) .send() .await .anyhow()?; if !resp.status().is_success() { + let code = resp.status(); let err_msg = resp - .body() + .bytes() .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + return Err(anyhow!("delete job {code} reason {err_msg}")); } Ok(()) } pub async fn update(&self, job_id: &ObjectId, update_info: &JobUpdateInfo) -> Result<()> { - let mut resp = self + let resp = self .client - .post(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str()) - .send_json(update_info) + .post( + self.base_uri + .clone() + .join("job")? + .join(job_id.to_hex().as_str())?, + ) + .json(update_info) + .send() .await .anyhow()?; if !resp.status().is_success() { + let code = resp.status(); let err_msg = resp - .body() + .bytes() .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + return Err(anyhow!("request job {code} reason {err_msg}")); } Ok(()) diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 6c81dc1..f024f7e 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -1,21 +1,28 @@ mod job; -use awc::Client; use anyhow::Result; use job::JobClient; - +use reqwest::{ + header, + Client, + Url, +}; #[derive(Clone)] pub struct JzFlowClient { client: Client, - base_uri: String, + base_uri: Url, } impl JzFlowClient { pub fn new(base_uri: &str) -> Result { - let client = Client::builder() - .add_default_header(("Content-Type", "application/json")) - .finish(); - let base_uri = base_uri.to_string() + "/api/v1"; + let mut headers = header::HeaderMap::new(); + headers.insert( + "Content-Type", + header::HeaderValue::from_static("application/json"), + ); + + let client = Client::builder().default_headers(headers).build()?; + let base_uri = Url::parse(base_uri)?.join("/api/v1/")?; Ok(JzFlowClient { client, base_uri }) } @@ -25,4 +32,4 @@ impl JzFlowClient { base_uri: self.base_uri.clone(), } } -} \ No newline at end of file +} diff --git a/src/api/job_api.rs b/src/api/job_api.rs index 1a03ef0..eeba30c 100644 --- a/src/api/job_api.rs +++ b/src/api/job_api.rs @@ -74,6 +74,25 @@ where } } +async fn job_details( + db_repo: web::Data, + path: web::Path, + query: web::Query, +) -> HttpResponse +where + D: Driver, + MAINR: MainDbRepo, + JOBR: JobDbRepo, +{ + match db_repo + .update(&path.into_inner(), &query.into_inner()) + .await + { + Ok(_) => HttpResponse::Ok().finish(), + Err(err) => HttpResponse::InternalServerError().body(err.to_string()), + } +} + pub(super) fn job_route_config(cfg: &mut web::ServiceConfig) where D: Driver, diff --git a/src/api/server.rs b/src/api/server.rs index 920025a..44005a2 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -1,12 +1,21 @@ - use std::fmt::format; +use crate::{ + core::db::{ + JobDbRepo, + MainDbRepo, + }, + driver::Driver, + job::job_mgr::JobManager, + utils::IntoAnyhowResult, +}; use actix_web::{ dev::Server, error, middleware, web::{ self, + Data, }, App, HttpRequest, @@ -14,16 +23,7 @@ use actix_web::{ HttpServer, }; use anyhow::Result; -use awc::http::Uri; - -use crate::{ - core::db::{ - JobDbRepo, - MainDbRepo, - }, - driver::Driver, - job::job_mgr::JobManager, utils::IntoAnyhowResult, -}; +use reqwest::Url; use super::job_api::job_route_config; @@ -33,7 +33,7 @@ where MAINR: MainDbRepo, JOBR: JobDbRepo, { - cfg.service(web::scope("/job").configure(job_route_config::)); + job_route_config::(cfg); } fn config(cfg: &mut web::ServiceConfig) @@ -47,7 +47,7 @@ where pub fn start_rpc_server( addr: &str, - db_repo: MAINR, + main_db_repo: MAINR, job_manager: JobManager, ) -> Result where @@ -55,9 +55,13 @@ where MAINR: MainDbRepo, JOBR: JobDbRepo, { - let db_repo = db_repo; - let uri = Uri::try_from(addr)?; - let host_port= format!("{}:{}", uri.host().anyhow("host not found")?, uri.port().map(|v|v.as_u16()).unwrap_or_else(||80)); + let main_db_repo = main_db_repo; + let uri = Url::parse(addr)?; + let host_port = format!( + "{}:{}", + uri.host().anyhow("host not found")?, + uri.port().unwrap_or_else(|| 80) + ); let server = HttpServer::new(move || { fn json_error_handler(err: error::JsonPayloadError, _req: &HttpRequest) -> error::Error { use actix_web::error::JsonPayloadError; @@ -75,9 +79,9 @@ where App::new() .wrap(middleware::Logger::default()) - .app_data(db_repo.clone()) - .configure(config::) + .app_data(Data::new(main_db_repo.clone())) .app_data(web::JsonConfig::default().error_handler(json_error_handler)) + .configure(config::) }) .disable_signals() .bind(host_port)? diff --git a/src/bin/jz-flow/global.rs b/src/bin/jz-flow/global.rs index 1646ed1..365d56e 100644 --- a/src/bin/jz-flow/global.rs +++ b/src/bin/jz-flow/global.rs @@ -2,9 +2,17 @@ use clap::Args; #[derive(Debug, Args)] pub(super) struct GlobalOptions { - #[arg(long, default_value = "INFO", help="set log level(TRACE, DEBUG, INFO, WARN, ERROR)")] + #[arg( + long, + default_value = "DEBUG", + help = "set log level(TRACE, DEBUG, INFO, WARN, ERROR)" + )] pub(super) log_level: String, - #[arg(long, default_value = "http://localhost:45131", help="set api address")] + #[arg( + long, + default_value = "http://localhost:45131", + help = "set api address" + )] pub(super) listen: String, } diff --git a/src/bin/jz-flow/job.rs b/src/bin/jz-flow/job.rs index 1ea319c..512f257 100644 --- a/src/bin/jz-flow/job.rs +++ b/src/bin/jz-flow/job.rs @@ -1,9 +1,17 @@ use crate::global::GlobalOptions; use anyhow::Result; use chrono::Utc; -use clap::{Args, Parser, Subcommand}; +use clap::{ + Args, + Parser, + Subcommand, +}; +use jz_action::{ + api::client::JzFlowClient, + core::db::Job, + dag::Dag, +}; use tokio::fs; -use jz_action::{api::client::JzFlowClient, core::db::Job, dag::Dag}; #[derive(Debug, Parser)] pub(super) enum JobCommands { @@ -11,16 +19,18 @@ pub(super) enum JobCommands { Create(JobCreateArgs), } -pub(super) async fn run_job_subcommand(global_opts: GlobalOptions ,command: JobCommands) ->Result<()> { +pub(super) async fn run_job_subcommand( + global_opts: GlobalOptions, + command: JobCommands, +) -> Result<()> { match command { JobCommands::Create(args) => create_job(global_opts, args).await, } } - #[derive(Debug, Args)] pub(super) struct JobCreateArgs { - #[arg(long, help="job name, must be unique")] + #[arg(long, help = "job name, must be unique")] pub(super) name: String, #[arg(long, help = "dag pipline definition")] @@ -29,20 +39,22 @@ pub(super) struct JobCreateArgs { pub(super) async fn create_job(global_opts: GlobalOptions, args: JobCreateArgs) -> Result<()> { let client = JzFlowClient::new(&global_opts.listen)?.job(); - let dag_config = fs::read_to_string(&args.path).await?; + let dag_config = fs::read_to_string(&args.path).await?; let _ = Dag::from_json(dag_config.as_str())?; let tm = Utc::now().timestamp(); - let job = Job{ - name: args.name.clone(), - graph_json: dag_config, - created_at: tm, - updated_at: tm, - ..Default::default() + let job = Job { + name: args.name.clone(), + graph_json: dag_config, + created_at: tm, + updated_at: tm, + ..Default::default() }; - println!("aaaaad"); let created_job = client.create(&job).await?; - println!("Create job successfully, job ID: {}", created_job.id.to_string()); + println!( + "Create job successfully, job ID: {}", + created_job.id.to_string() + ); Ok(()) } diff --git a/src/bin/jz-flow/main.rs b/src/bin/jz-flow/main.rs index 6928440..882ebd5 100644 --- a/src/bin/jz-flow/main.rs +++ b/src/bin/jz-flow/main.rs @@ -9,7 +9,10 @@ use clap::{ }; use global::GlobalOptions; -use job::{run_job_subcommand, JobCommands}; +use job::{ + run_job_subcommand, + JobCommands, +}; use jz_action::{ core::db::MainDbRepo, utils::StdIntoAnyhowResult, @@ -21,7 +24,6 @@ use run::{ use std::str::FromStr; use tracing::Level; - #[derive(Debug, Parser)] #[command(name = "jz-action-backend", author = "Author Name ", version, about= "jz-action backend", long_about = None, disable_version_flag = true)] struct Cli { @@ -38,7 +40,7 @@ enum Commands { Run(RunArgs), #[command(subcommand)] - Job(JobCommands) + Job(JobCommands), } #[tokio::main(flavor = "multi_thread")] diff --git a/src/bin/jz-flow/run.rs b/src/bin/jz-flow/run.rs index ad193a6..32ca22b 100644 --- a/src/bin/jz-flow/run.rs +++ b/src/bin/jz-flow/run.rs @@ -2,9 +2,7 @@ use anyhow::Result; use clap::Args; use jz_action::{ - api::{ - server::start_rpc_server, - }, + api::server::start_rpc_server, core::db::MainDbRepo, dbrepo::{ MongoMainDbRepo, @@ -33,7 +31,11 @@ use crate::global::GlobalOptions; #[derive(Debug, Args)] pub(super) struct RunArgs { - #[arg(long, default_value="mongodb://192.168.3.163:27017", help="mongo connection string")] + #[arg( + long, + default_value = "mongodb://192.168.3.163:27017", + help = "mongo connection string" + )] mongo_url: String, } @@ -45,7 +47,7 @@ pub(super) async fn run_backend(global_opts: GlobalOptions, args: RunArgs) -> Re let db_repo = MongoMainDbRepo::new(db_url.as_str()).await?; let client = Client::try_default().await.unwrap(); - let driver = KubeDriver::new(client.clone(), db_url.as_str()).await?; + let driver = KubeDriver::new(client.clone(), args.mongo_url.as_str()).await?; let job_manager = JobManager::, MongoMainDbRepo, MongoRunDbRepo>::new( client, @@ -54,10 +56,11 @@ pub(super) async fn run_backend(global_opts: GlobalOptions, args: RunArgs) -> Re &args.mongo_url, ) .await?; + + job_manager.run_backend(&mut join_set, token.clone())?; let server = start_rpc_server(&global_opts.listen, db_repo, job_manager)?; let handler = server.handle(); { - //listen unix socket let token = token.clone(); let handler = handler.clone(); join_set.spawn(async move { diff --git a/src/core/main_db_models.rs b/src/core/main_db_models.rs index 23081fe..197aa33 100644 --- a/src/core/main_db_models.rs +++ b/src/core/main_db_models.rs @@ -15,11 +15,12 @@ pub enum JobState { impl Default for JobState { fn default() -> Self { - JobState::Created + JobState::Created } } #[derive(Default, Serialize, Deserialize, Debug)] pub struct Job { + #[serde(rename = "_id")] pub id: ObjectId, pub name: String, pub graph_json: String, diff --git a/src/driver/driver.rs b/src/driver/driver.rs index debbdf9..a91efa4 100644 --- a/src/driver/driver.rs +++ b/src/driver/driver.rs @@ -7,7 +7,7 @@ use std::{ Mutex, }, }; -pub trait UnitHandler:Send { +pub trait UnitHandler: Send { //pause graph running for now fn pause(&mut self) -> impl Future> + Send; @@ -23,7 +23,7 @@ pub trait UnitHandler:Send { ) -> impl Future>>>> + Send; } -pub trait ChannelHandler:Send { +pub trait ChannelHandler: Send { //pause graph running for now fn pause(&mut self) -> impl Future> + Send; @@ -34,7 +34,7 @@ pub trait ChannelHandler:Send { fn stop(&mut self) -> impl Future> + Send; } -pub trait PipelineController:Send { +pub trait PipelineController: Send { fn get_node<'a>( &'a self, id: &'a String, diff --git a/src/driver/kube.rs b/src/driver/kube.rs index 6a9f62c..393fa64 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -129,7 +129,7 @@ pub struct KubePipelineController where R: JobDbRepo, { - pub db_repo: R, + db_repo: R, handlers: HashMap>, } @@ -287,7 +287,9 @@ where Self::ensure_namespace_exit_and_clean(&self.client, run_id).await?; let db_url = self.db_url.to_string() + "/" + run_id; - let repo = MongoRunDbRepo::new(db_url.as_str()).await?; + let repo = MongoRunDbRepo::new(db_url.as_str()) + .await + .map_err(|err| anyhow!("create database fail {err}"))?; let statefulset_api: Api = Api::namespaced(self.client.clone(), run_id); let claim_api: Api = Api::namespaced(self.client.clone(), run_id); let service_api: Api = Api::namespaced(self.client.clone(), run_id); @@ -481,8 +483,67 @@ where } #[allow(refining_impl_trait)] - async fn attach(&self, _namespace: &str, _graph: &Dag) -> Result> { - todo!() + async fn attach( + &self, + run_id: &str, + graph: &Dag, + ) -> Result> { + let db_url = self.db_url.to_string() + "/" + run_id; + let repo = MongoRunDbRepo::new(db_url.as_str()) + .await + .map_err(|err| anyhow!("create database fail {err}"))?; + let statefulset_api: Api = Api::namespaced(self.client.clone(), run_id); + let claim_api: Api = Api::namespaced(self.client.clone(), run_id); + let service_api: Api = Api::namespaced(self.client.clone(), run_id); + + let mut pipeline_ctl = KubePipelineController::new(repo.clone()); + for node in graph.iter() { + let up_nodes = graph.get_incomming_nodes(&node.name); + // query channel + let channel_handler = if up_nodes.len() > 0 { + let claim_deployment = claim_api + .get((node.name.clone() + "-channel-claim").as_str()) + .await?; + let channel_statefulset = statefulset_api + .get((node.name.clone() + "-channel-statefulset").as_str()) + .await?; + let channel_service = service_api + .get((node.name.clone() + "-channel-headless-service").as_str()) + .await?; + + Some(KubeChannelHander { + claim: claim_deployment, + statefulset: channel_statefulset, + service: channel_service, + db_repo: repo.clone(), + }) + } else { + None + }; + + // apply nodes + let claim_deployment = claim_api + .get((node.name.clone() + "-node-claim").as_str()) + .await?; + let unit_statefulset = statefulset_api + .get((node.name.clone() + "-statefulset").as_str()) + .await?; + + let unit_service = service_api + .get((node.name.clone() + "-headless-service").as_str()) + .await?; + + let handler = KubeHandler { + claim: claim_deployment, + statefulset: unit_statefulset, + service: unit_service, + channel: channel_handler, + db_repo: repo.clone(), + }; + + pipeline_ctl.handlers.insert(node.name.clone(), handler); + } + Ok(pipeline_ctl) } async fn clean(&self, ns: &str) -> Result<()> { @@ -521,9 +582,9 @@ mod tests { { "name": "dummy-in", "spec": { - "image": "jz-action/dummy_in:latest", + "image": "gitdatateam/dummy_in:latest", "command":"/dummy_in", - "args": ["--log-level=debug"] + "args": ["--log-level=debug", "--total-count=100"] } }, { "name": "copy-in-place", @@ -532,7 +593,7 @@ mod tests { "dummy-in" ], "spec": { - "image": "jz-action/copy_in_place:latest", + "image": "gitdatateam/copy_in_place:latest", "command":"/copy_in_place", "replicas": 3, "args": ["--log-level=debug"] @@ -548,7 +609,7 @@ mod tests { "copy-in-place" ], "spec": { - "image": "jz-action/dummy_out:latest", + "image": "gitdatateam/dummy_out:latest", "command":"/dummy_out", "replicas": 3, "args": ["--log-level=debug"] @@ -562,8 +623,10 @@ mod tests { "#; let dag = Dag::from_json(json_str).unwrap(); - let db_url = "mongodb://192.168.3.163:27017/ntest"; - let client = MongoClient::with_uri_str(db_url).await.unwrap(); + let db_url = "mongodb://192.168.3.163:27017"; + let client = MongoClient::with_uri_str(db_url.to_string() + "/ntest") + .await + .unwrap(); client.database("ntest").drop().await.unwrap(); let client = Client::try_default().await.unwrap(); diff --git a/src/driver/kubetpl/channel_statefulset.tpl b/src/driver/kubetpl/channel_statefulset.tpl index 6b0098a..ae52ce5 100644 --- a/src/driver/kubetpl/channel_statefulset.tpl +++ b/src/driver/kubetpl/channel_statefulset.tpl @@ -26,7 +26,7 @@ "containers": [ { "name": "channel", - "image": {{#if node.channel.image}} "{{{node.channel.image}}}"{{else}}"jz-action/dp_runner:latest"{{/if}}, + "image": {{#if node.channel.image}} "{{{node.channel.image}}}"{{else}}"gitdatateam/dp_runner:latest"{{/if}}, "imagePullPolicy": "IfNotPresent", "command": [ "/dp_runner" @@ -34,7 +34,7 @@ "args": [ "--node-name={{{node.name}}}-channel", "--log-level={{{log_level}}}", - "--mongo-url={{{db_url}}}", + "--mongo-url={{{db_url}}}" {{#if (eq node.channel.cache_type "Disk") }},"--tmp-path=/app/tmp"{{/if}} ], "ports": [ diff --git a/src/driver/kubetpl/statefulset.tpl b/src/driver/kubetpl/statefulset.tpl index abcbce2..5e74e47 100644 --- a/src/driver/kubetpl/statefulset.tpl +++ b/src/driver/kubetpl/statefulset.tpl @@ -25,7 +25,7 @@ "containers": [ { "name": "compute-data-unit", - "image": "jz-action/compute_unit_runner:latest", + "image": "gitdatateam/compute_unit_runner:latest", "command": [ "/compute_unit_runner" ], diff --git a/src/job/job_mgr.rs b/src/job/job_mgr.rs index 17fa9bf..36982fa 100644 --- a/src/job/job_mgr.rs +++ b/src/job/job_mgr.rs @@ -1,16 +1,30 @@ use crate::{ core::db::{ - JobDbRepo, JobState, JobUpdateInfo, MainDbRepo + Job, + JobDbRepo, + JobState, + JobUpdateInfo, + MainDbRepo, + Node, }, dag::Dag, driver::Driver, + utils::IntoAnyhowResult, }; use anyhow::Result; use kube::Client; +use mongodb::bson::oid::ObjectId; use std::marker::PhantomData; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::error; +use tracing::{ + error, + info, +}; +pub struct JobDetails { + job: Job, +} + pub struct JobManager where D: Driver, @@ -43,34 +57,52 @@ where JOBR: JobDbRepo, MAINR: MainDbRepo, { - pub async fn run_backend(&self, token: CancellationToken) -> Result>> { - let mut join_set = JoinSet::new(); - { - let db = self.db.clone(); - let driver = self.driver.clone(); - join_set.spawn(async move { - loop { - if token.is_cancelled() { - return Ok(()); - } + pub fn run_backend( + &self, + join_set: &mut JoinSet>, + token: CancellationToken, + ) -> Result<()> { + let db = self.db.clone(); + let driver = self.driver.clone(); + join_set.spawn(async move { + info!("backend thead is running"); + loop { + if token.is_cancelled() { + return Ok(()); + } - while let Some(job) = db.get_job_for_running().await? { - let dag = Dag::from_json(job.graph_json.as_str())?; - let namespace = format!("{}-{}", job.name, job.retry_number); - if let Err(err) = driver.deploy(namespace.as_str(), &dag).await { - error!("run job {} {err}, start cleaning", job.name); - if let Err(err) = driver.clean(namespace.as_str()).await { - error!("clean job resource {err}"); - } - if let Err(err) = db.update(&job.id, &JobUpdateInfo{state: Some(JobState::Error)}).await{ - error!("set job to error state {err}"); - } - }; - } + while let Some(job) = db.get_job_for_running().await? { + let dag = Dag::from_json(job.graph_json.as_str())?; + let namespace = format!("{}-{}", job.name, job.retry_number); + if let Err(err) = driver.deploy(namespace.as_str(), &dag).await { + error!("run job {} {err}, start cleaning", job.name); + if let Err(err) = driver.clean(namespace.as_str()).await { + error!("clean job resource {err}"); + } + if let Err(err) = db + .update( + &job.id, + &JobUpdateInfo { + state: Some(JobState::Error), + }, + ) + .await + { + error!("set job to error state {err}"); + } + }; } - }); - } + } + }); + + Ok(()) + } - Ok(join_set) + pub async fn get_job_details(&self, id: &ObjectId) -> Result { + let job = self.db.get(id).await?.anyhow("job not found")?; + let dag = Dag::from_json(job.graph_json.as_str())?; + let namespace = format!("{}-{}", job.name, job.retry_number - 1); + let controller = self.driver.attach(&namespace, &dag).await?; + Ok(JobDetails { job: job }) } } diff --git a/src/lib.rs b/src/lib.rs index cc642e5..fe91c10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ #![feature(async_closure)] #![feature(future_join)] #![feature(iterator_try_collect)] +#![feature(duration_constructors)] pub mod api; pub mod core;