Skip to content

Commit

Permalink
feat: rough implement node controller
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jun 17, 2024
1 parent 8c30815 commit 79a840b
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 26 deletions.
57 changes: 44 additions & 13 deletions jz_runner/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
mod unit;

use jz_action::network::datatransfer::data_stream_server::DataStreamServer;
use jz_action::network::nodecontroller::node_controller_server::NodeControllerServer;
use jz_action::utils::StdIntoAnyhowResult;

use anyhow::{anyhow, Result};
use clap::Parser;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info, Level};
use unit::DataNodeControllerServer;
use tokio::sync::mpsc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::select;

use unit::{DataNodeControllerServer, UnitDataStrean};
#[derive(Debug, Parser)]
#[command(
name = "jz_runner",
Expand All @@ -25,23 +29,50 @@ struct Args {
host_port: String,
}

#[tokio::main]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
let args = Args::parse();
tracing_subscriber::fmt()
.with_max_level(Level::from_str(&args.log_level)?)
.try_init()
.map_err(|e| anyhow!("{}", e))?;
.anyhow()?;

let addr = args.host_port.parse()?;
let node_controller = DataNodeControllerServer::default();
let unit_data_stream = UnitDataStrean::default();

Server::builder()
.add_service(NodeControllerServer::new(node_controller))
.add_service(DataStreamServer::new(unit_data_stream))
.serve(addr)
.await?;
info!("node listening on {}", addr);
let node_controller = DataNodeControllerServer{
child: Arc::new(Mutex::new(None)),
};

let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<Result<()>>(1);
{
//listen port
let shutdown_tx_arc = shutdown_tx.clone();
let _ = tokio::spawn(async move {
if let Err(e) = Server::builder()
.add_service(NodeControllerServer::new(node_controller))
.serve(addr)
.await
.anyhow(){
let _ = shutdown_tx_arc.send(Err(e)).await;
}
});

info!("node listening on {}", addr);
}

{
//catch signal
let _ = tokio::spawn(async move {
let mut sig_term = signal(SignalKind::terminate()).unwrap();
let mut sig_int = signal(SignalKind::interrupt()).unwrap();
select! {
_ = sig_term.recv() => info!("Recieve SIGTERM"),
_ = sig_int.recv() => info!("Recieve SIGTINT"),
};
let _ = shutdown_tx.send(Err(anyhow!("cancel by signal"))).await;
});
}

shutdown_rx.recv().await;
info!("gracefully shutdown");
Ok(())
}
52 changes: 42 additions & 10 deletions jz_runner/src/unit.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,55 @@
use jz_action::network::common::Empty;
use anyhow::anyhow;
use jz_action::{network::common::Empty, utils::StdIntoAnyhowResult};
use jz_action::network::datatransfer::data_stream_server::DataStream;
use jz_action::network::datatransfer::DataBatch;
use jz_action::network::datatransfer::DataBatchResponse;
use jz_action::network::nodecontroller::node_controller_server::NodeController;

use std::result::Result;
use jz_action::network::nodecontroller::StartRequest;
use tokio::sync::mpsc;
use std::{os::unix::process::CommandExt, result::Result, sync::{Arc, Mutex}};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

#[derive(Default)]
pub(crate) struct DataNodeControllerServer {}
use tonic::{Request, Response, Status, Code};
use std::process::Command;
use jz_action::utils::AnyhowToGrpc;
pub(crate) struct DataNodeControllerServer {
pub(crate) child: Arc<Mutex<Option<std::process::Child>>>
}

#[tonic::async_trait]
impl NodeController for DataNodeControllerServer {
async fn start(&self, request: Request<StartRequest>) -> Result<Response<Empty>, Status> {
let req = request.into_inner();
let child = Command::new("sh").
arg("-c").arg(req.script).spawn()?;

let mut guard = self.child.lock().anyhow().to_rpc(Code::Aborted)?;
match guard.as_mut() {
None => {
*guard = Some(child);
Ok(())
},
_=> Err(anyhow!("process is already running")),
}.to_rpc(Code::Internal)?;

Ok( Response::new(Empty{}))
}
async fn pause(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
todo!()
let mut guard = self.child.lock().anyhow().to_rpc(Code::Aborted)?;

if guard.is_none() {
return Ok( Response::new(Empty{}));
}

if let Some(child) = guard.as_mut() {
let _ = child.wait()?;
}
*guard = None;
Ok( Response::new(Empty{}))
}

async fn restart(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
todo!()
}

async fn stop(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
todo!()
}
Expand All @@ -28,7 +60,7 @@ pub(crate) struct UnitDataStrean {}

#[tonic::async_trait]
impl DataStream for UnitDataStrean {
type subscribe_new_dataStream = ReceiverStream<Result<DataBatch, Status>>;
type subscribe_new_dataStream = ReceiverStream<Result<DataBatchResponse, Status>>;

async fn subscribe_new_data(
&self,
Expand Down
4 changes: 2 additions & 2 deletions src/network/protos/datatransfer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package datatransfer;
import "common.proto";

service DataStream {
rpc subscribe_new_data(common.Empty) returns (stream DataBatch) {}
rpc subscribe_new_data(common.Empty) returns (stream DataBatchResponse) {}
}

message DataBatch {
message DataBatchResponse {
uint32 size = 1;
repeated bytes data = 2;
}
Expand Down
5 changes: 5 additions & 0 deletions src/network/protos/nodecontroller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ package nodecontroller;
import "common.proto";

service NodeController {
rpc start(StartRequest) returns(common.Empty) {}
rpc pause(common.Empty) returns(common.Empty) {}
rpc restart(common.Empty) returns(common.Empty) {}
rpc stop(common.Empty) returns(common.Empty) {}
}

message StartRequest {
string script = 1;
}
36 changes: 35 additions & 1 deletion src/utils/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use anyhow::{anyhow, Result};
use core::fmt;

use anyhow::{anyhow, Error, Result};
use tonic::{Code, Status};

pub trait IntoAnyhowResult<T> {
fn anyhow(self, msg: impl ToString) -> Result<T>;
Expand All @@ -12,3 +15,34 @@ impl<T> IntoAnyhowResult<T> for Option<T> {
}
}
}

pub trait StdIntoAnyhowResult<T> {
fn anyhow(self) -> Result<T>;
}

impl<R,E> StdIntoAnyhowResult<R> for std::result::Result<R,E>
where E: fmt::Display
{
fn anyhow(self) -> Result<R> {
match self {
Ok(v) => Ok(v),
Err(e) => Err(anyhow!("{}", e)),
}
}
}



pub trait AnyhowToGrpc<R> {
fn to_rpc(self, code: Code) -> std::result::Result<R, Status>;
}

impl<R> AnyhowToGrpc<R> for Result<R>
{
fn to_rpc(self, code:Code) -> std::result::Result<R, Status> {
match self {
Ok(v) => Ok(v),
Err(e) => Err( Status::new(code, e.to_string())),
}
}
}

0 comments on commit 79a840b

Please sign in to comment.