From 79a840bf2be4b920e0d5f10a7b704979a74287e4 Mon Sep 17 00:00:00 2001 From: hunjixin <16705420332lee@gmai.com> Date: Mon, 17 Jun 2024 14:29:47 +0000 Subject: [PATCH] feat: rough implement node controller --- jz_runner/src/main.rs | 57 +++++++++++++++++++------ jz_runner/src/unit.rs | 52 +++++++++++++++++----- src/network/protos/datatransfer.proto | 4 +- src/network/protos/nodecontroller.proto | 5 +++ src/utils/utils.rs | 36 +++++++++++++++- 5 files changed, 128 insertions(+), 26 deletions(-) diff --git a/jz_runner/src/main.rs b/jz_runner/src/main.rs index d69ec39..8997508 100644 --- a/jz_runner/src/main.rs +++ b/jz_runner/src/main.rs @@ -1,15 +1,19 @@ mod unit; -use jz_action::network::datatransfer::data_stream_server::DataStreamServer; use jz_action::network::nodecontroller::node_controller_server::NodeControllerServer; +use jz_action::utils::StdIntoAnyhowResult; use anyhow::{anyhow, Result}; use clap::Parser; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use tonic::{transport::Server, Request, Response, Status}; use tracing::{info, Level}; +use unit::DataNodeControllerServer; +use tokio::sync::mpsc; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::select; -use unit::{DataNodeControllerServer, UnitDataStrean}; #[derive(Debug, Parser)] #[command( name = "jz_runner", @@ -25,23 +29,50 @@ struct Args { host_port: String, } -#[tokio::main] +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let args = Args::parse(); tracing_subscriber::fmt() .with_max_level(Level::from_str(&args.log_level)?) .try_init() - .map_err(|e| anyhow!("{}", e))?; + .anyhow()?; let addr = args.host_port.parse()?; - let node_controller = DataNodeControllerServer::default(); - let unit_data_stream = UnitDataStrean::default(); - - Server::builder() - .add_service(NodeControllerServer::new(node_controller)) - .add_service(DataStreamServer::new(unit_data_stream)) - .serve(addr) - .await?; - info!("node listening on {}", addr); + let node_controller = DataNodeControllerServer{ + child: Arc::new(Mutex::new(None)), + }; + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>(1); + { + //listen port + let shutdown_tx_arc = shutdown_tx.clone(); + let _ = tokio::spawn(async move { + if let Err(e) = Server::builder() + .add_service(NodeControllerServer::new(node_controller)) + .serve(addr) + .await + .anyhow(){ + let _ = shutdown_tx_arc.send(Err(e)).await; + } + }); + + info!("node listening on {}", addr); + } + + { + //catch signal + let _ = tokio::spawn(async move { + let mut sig_term = signal(SignalKind::terminate()).unwrap(); + let mut sig_int = signal(SignalKind::interrupt()).unwrap(); + select! { + _ = sig_term.recv() => info!("Recieve SIGTERM"), + _ = sig_int.recv() => info!("Recieve SIGTINT"), + }; + let _ = shutdown_tx.send(Err(anyhow!("cancel by signal"))).await; + }); + } + + shutdown_rx.recv().await; + info!("gracefully shutdown"); Ok(()) } diff --git a/jz_runner/src/unit.rs b/jz_runner/src/unit.rs index 12ab475..7853fcf 100644 --- a/jz_runner/src/unit.rs +++ b/jz_runner/src/unit.rs @@ -1,23 +1,55 @@ -use jz_action::network::common::Empty; +use anyhow::anyhow; +use jz_action::{network::common::Empty, utils::StdIntoAnyhowResult}; use jz_action::network::datatransfer::data_stream_server::DataStream; -use jz_action::network::datatransfer::DataBatch; +use jz_action::network::datatransfer::DataBatchResponse; use jz_action::network::nodecontroller::node_controller_server::NodeController; - -use std::result::Result; +use jz_action::network::nodecontroller::StartRequest; +use tokio::sync::mpsc; +use std::{os::unix::process::CommandExt, result::Result, sync::{Arc, Mutex}}; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; - -#[derive(Default)] -pub(crate) struct DataNodeControllerServer {} +use tonic::{Request, Response, Status, Code}; +use std::process::Command; +use jz_action::utils::AnyhowToGrpc; +pub(crate) struct DataNodeControllerServer { + pub(crate) child: Arc>> +} #[tonic::async_trait] impl NodeController for DataNodeControllerServer { + async fn start(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + let child = Command::new("sh"). + arg("-c").arg(req.script).spawn()?; + + let mut guard = self.child.lock().anyhow().to_rpc(Code::Aborted)?; + match guard.as_mut() { + None => { + *guard = Some(child); + Ok(()) + }, + _=> Err(anyhow!("process is already running")), + }.to_rpc(Code::Internal)?; + + Ok( Response::new(Empty{})) + } async fn pause(&self, _request: Request) -> Result, Status> { - todo!() + let mut guard = self.child.lock().anyhow().to_rpc(Code::Aborted)?; + + if guard.is_none() { + return Ok( Response::new(Empty{})); + } + + if let Some(child) = guard.as_mut() { + let _ = child.wait()?; + } + *guard = None; + Ok( Response::new(Empty{})) } + async fn restart(&self, _request: Request) -> Result, Status> { todo!() } + async fn stop(&self, _request: Request) -> Result, Status> { todo!() } @@ -28,7 +60,7 @@ pub(crate) struct UnitDataStrean {} #[tonic::async_trait] impl DataStream for UnitDataStrean { - type subscribe_new_dataStream = ReceiverStream>; + type subscribe_new_dataStream = ReceiverStream>; async fn subscribe_new_data( &self, diff --git a/src/network/protos/datatransfer.proto b/src/network/protos/datatransfer.proto index 90a27b1..e57a692 100644 --- a/src/network/protos/datatransfer.proto +++ b/src/network/protos/datatransfer.proto @@ -5,10 +5,10 @@ package datatransfer; import "common.proto"; service DataStream { - rpc subscribe_new_data(common.Empty) returns (stream DataBatch) {} + rpc subscribe_new_data(common.Empty) returns (stream DataBatchResponse) {} } -message DataBatch { +message DataBatchResponse { uint32 size = 1; repeated bytes data = 2; } diff --git a/src/network/protos/nodecontroller.proto b/src/network/protos/nodecontroller.proto index 40660d7..2065f96 100644 --- a/src/network/protos/nodecontroller.proto +++ b/src/network/protos/nodecontroller.proto @@ -5,7 +5,12 @@ package nodecontroller; import "common.proto"; service NodeController { + rpc start(StartRequest) returns(common.Empty) {} rpc pause(common.Empty) returns(common.Empty) {} rpc restart(common.Empty) returns(common.Empty) {} rpc stop(common.Empty) returns(common.Empty) {} } + +message StartRequest { + string script = 1; +} \ No newline at end of file diff --git a/src/utils/utils.rs b/src/utils/utils.rs index a0a5dce..5e43dc4 100644 --- a/src/utils/utils.rs +++ b/src/utils/utils.rs @@ -1,4 +1,7 @@ -use anyhow::{anyhow, Result}; +use core::fmt; + +use anyhow::{anyhow, Error, Result}; +use tonic::{Code, Status}; pub trait IntoAnyhowResult { fn anyhow(self, msg: impl ToString) -> Result; @@ -12,3 +15,34 @@ impl IntoAnyhowResult for Option { } } } + +pub trait StdIntoAnyhowResult { + fn anyhow(self) -> Result; +} + +impl StdIntoAnyhowResult for std::result::Result +where E: fmt::Display +{ + fn anyhow(self) -> Result { + match self { + Ok(v) => Ok(v), + Err(e) => Err(anyhow!("{}", e)), + } + } +} + + + +pub trait AnyhowToGrpc { + fn to_rpc(self, code: Code) -> std::result::Result; +} + +impl AnyhowToGrpc for Result +{ + fn to_rpc(self, code:Code) -> std::result::Result { + match self { + Ok(v) => Ok(v), + Err(e) => Err( Status::new(code, e.to_string())), + } + } +}