From 8c30815021e60ec4336df4a3ae2c5420fdfa16c4 Mon Sep 17 00:00:00 2001 From: hunjixin <16705420332lee@gmai.com> Date: Sun, 16 Jun 2024 12:59:51 +0000 Subject: [PATCH] feat: add jz runner and base controller and data protocol --- Cargo.toml | 50 ++++++++++++--- build.rs | 14 +++++ jz_runner/Cargo.toml | 18 ++++++ jz_runner/src/main.rs | 47 ++++++++++++++ jz_runner/src/unit.rs | 39 ++++++++++++ src/dag/graph.rs | 1 - src/driver/driver.rs | 2 +- src/driver/kube.rs | 83 ++++++++++++------------- src/lib.rs | 49 +++++++++++++++ src/main.rs | 12 ---- src/network/mod.rs | 15 +++++ src/network/protos/common.proto | 6 ++ src/network/protos/datatransfer.proto | 15 +++++ src/network/protos/helloworld.proto | 19 ++++++ src/network/protos/nodecontroller.proto | 11 ++++ 15 files changed, 317 insertions(+), 64 deletions(-) create mode 100644 build.rs create mode 100644 jz_runner/Cargo.toml create mode 100644 jz_runner/src/main.rs create mode 100644 jz_runner/src/unit.rs create mode 100644 src/lib.rs delete mode 100644 src/main.rs create mode 100644 src/network/mod.rs create mode 100644 src/network/protos/common.proto create mode 100644 src/network/protos/datatransfer.proto create mode 100644 src/network/protos/helloworld.proto create mode 100644 src/network/protos/nodecontroller.proto diff --git a/Cargo.toml b/Cargo.toml index 6132725..996d6fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,21 +1,57 @@ +[workspace] +members = [ + "jz_runner", +] + +[workspace.package] +repository = "https://github.com/GitDataAI/jz-action" +license = "MIT OR Apache-2.0" +edition = "2021" +include = [ + "build.rs", + "src/**/*", + "Cargo.toml", + "LICENSE*", + "README.md", +] + +[workspace.dependencies] +anyhow = "1.0.86" +tracing = "0.1.40" +tracing-subscriber = "0.3" +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } +tokio-retry = "0.3" +tokio-stream = "0.1.15" +tonic = "0.11.0" + + [package] -name = "jz-action" +name = "jz_action" version = "0.1.0" edition = "2021" [dependencies] -anyhow = "1.0.86" serde = {version ="1.0.203", features = ["derive"]} serde_json = {version = "1.0.117"} bimap = "0.6.3" uuid = {version="1.8.0", features = ["v4","serde"]} -tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } kube = { version = "0.91.0", features = ["runtime", "derive"] } k8s-openapi = { version = "0.22.0", features = ["latest"] } handlebars = "5.1.2" -tracing = "0.1.40" -tracing-subscriber = "0.3" -tokio-retry = "0.3" +prost = "0.12.6" + +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio-retry = {workspace = true} +tokio-stream = {workspace = true} +anyhow = {workspace = true} +tracing = {workspace = true} +tracing-subscriber = {workspace = true} +tonic = {workspace = true} [dev-dependencies] -arrayvec = {version="0.7.4", features= ["serde"]} \ No newline at end of file +arrayvec = {version="0.7.4", features= ["serde"]} + +[build-dependencies] +tonic-build = "0.11.0" +log= "0.4.21" +env_logger="0.11.3" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..cee742e --- /dev/null +++ b/build.rs @@ -0,0 +1,14 @@ +fn main() -> Result<(), Box> { + env_logger::init(); + + let protos = [ + "src/network/protos/helloworld.proto", + "src/network/protos/common.proto", + "src/network/protos/datatransfer.proto", + "src/network/protos/nodecontroller.proto", + ]; + + let proto_dir = "src/network/protos"; + tonic_build::configure().compile(&protos, &[proto_dir])?; + Ok(()) +} diff --git a/jz_runner/Cargo.toml b/jz_runner/Cargo.toml new file mode 100644 index 0000000..e6f328d --- /dev/null +++ b/jz_runner/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "jz_runner" +version = "0.1.0" +edition = "2021" + +[dependencies] +jz_action = { path = "../"} + +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio-retry = {workspace = true} +tokio-stream = {workspace = true} +anyhow = {workspace = true} +tracing = {workspace = true} +tracing-subscriber = {workspace = true} +tonic = {workspace = true} + +clap = {version="4.5.7", features=["derive"]} + diff --git a/jz_runner/src/main.rs b/jz_runner/src/main.rs new file mode 100644 index 0000000..d69ec39 --- /dev/null +++ b/jz_runner/src/main.rs @@ -0,0 +1,47 @@ +mod unit; + +use jz_action::network::datatransfer::data_stream_server::DataStreamServer; +use jz_action::network::nodecontroller::node_controller_server::NodeControllerServer; + +use anyhow::{anyhow, Result}; +use clap::Parser; +use std::str::FromStr; +use tonic::{transport::Server, Request, Response, Status}; +use tracing::{info, Level}; + +use unit::{DataNodeControllerServer, UnitDataStrean}; +#[derive(Debug, Parser)] +#[command( + name = "jz_runner", + version = "0.0.1", + author = "Author Name ", + about = "embed in k8s images" +)] +struct Args { + #[arg(short, long, default_value = "INFO")] + log_level: String, + + #[arg(long, default_value = "[::1]:25431")] + host_port: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + tracing_subscriber::fmt() + .with_max_level(Level::from_str(&args.log_level)?) + .try_init() + .map_err(|e| anyhow!("{}", e))?; + + let addr = args.host_port.parse()?; + let node_controller = DataNodeControllerServer::default(); + let unit_data_stream = UnitDataStrean::default(); + + Server::builder() + .add_service(NodeControllerServer::new(node_controller)) + .add_service(DataStreamServer::new(unit_data_stream)) + .serve(addr) + .await?; + info!("node listening on {}", addr); + Ok(()) +} diff --git a/jz_runner/src/unit.rs b/jz_runner/src/unit.rs new file mode 100644 index 0000000..12ab475 --- /dev/null +++ b/jz_runner/src/unit.rs @@ -0,0 +1,39 @@ +use jz_action::network::common::Empty; +use jz_action::network::datatransfer::data_stream_server::DataStream; +use jz_action::network::datatransfer::DataBatch; +use jz_action::network::nodecontroller::node_controller_server::NodeController; + +use std::result::Result; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; + +#[derive(Default)] +pub(crate) struct DataNodeControllerServer {} + +#[tonic::async_trait] +impl NodeController for DataNodeControllerServer { + async fn pause(&self, _request: Request) -> Result, Status> { + todo!() + } + async fn restart(&self, _request: Request) -> Result, Status> { + todo!() + } + async fn stop(&self, _request: Request) -> Result, Status> { + todo!() + } +} + +#[derive(Default)] +pub(crate) struct UnitDataStrean {} + +#[tonic::async_trait] +impl DataStream for UnitDataStrean { + type subscribe_new_dataStream = ReceiverStream>; + + async fn subscribe_new_data( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } +} diff --git a/src/dag/graph.rs b/src/dag/graph.rs index 3dbeb49..0f1d601 100644 --- a/src/dag/graph.rs +++ b/src/dag/graph.rs @@ -26,7 +26,6 @@ node1->node2->node3->node4->node5->node6->node7->node8->node9 use crate::core::GID; use std::{ collections::{HashMap, HashSet, VecDeque}, - hash::Hash, vec, }; diff --git a/src/driver/driver.rs b/src/driver/driver.rs index bc86442..21a20ce 100644 --- a/src/driver/driver.rs +++ b/src/driver/driver.rs @@ -1,5 +1,5 @@ use crate::core::GID; -use crate::Dag; +use crate::dag::Dag; use anyhow::Result; use std::{ future::Future, diff --git a/src/driver/kube.rs b/src/driver/kube.rs index af8d9c0..8c7f285 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -1,20 +1,20 @@ use super::{ChannelHandler, Driver, PipelineController, UnitHandler}; use crate::core::GID; +use crate::dag::Dag; use crate::utils::IntoAnyhowResult; -use crate::Dag; use anyhow::{anyhow, Result}; use handlebars::Handlebars; use k8s_openapi::api::apps::v1::Deployment; -use k8s_openapi::api::core::v1::{Service, Namespace}; +use k8s_openapi::api::core::v1::{Namespace, Service}; use kube::api::{DeleteParams, PostParams}; use kube::{Api, Client}; use std::collections::HashMap; use std::default::Default; use std::marker::PhantomData; use std::sync::{Arc, Mutex}; -use tracing::debug; -use tokio_retry::Retry; use tokio_retry::strategy::ExponentialBackoff; +use tokio_retry::Retry; +use tracing::debug; pub struct KubeChannelHander where ID: GID, @@ -158,11 +158,11 @@ where "channel_service", include_str!("kubetpl/channel_service.tpl"), )?; - let client = Client::try_default().await?; + let client = Client::try_default().await?; Ok(KubeDriver { _id: std::marker::PhantomData, reg: reg, - client:client + client: client, }) } @@ -181,26 +181,11 @@ where Ok(KubeDriver { _id: std::marker::PhantomData, reg: reg, - client:client + client: client, }) } - async fn retry_get_ns_state(namespaces: Api, ns: &str) -> Result<()> { - match namespaces.get(ns).await { - Ok(v)=> { - Err(anyhow!("expect deleted")) - }, - Err(e) => { - if e.to_string().contains("not") { - Ok(()) - } else { - Err(anyhow!("expect deleted")) - } - } - } - } - - async fn ensure_namespace_exit_and_clean(client: &Client, ns: &str) -> Result<()>{ + async fn ensure_namespace_exit_and_clean(client: &Client, ns: &str) -> Result<()> { let namespace = Namespace { metadata: kube::api::ObjectMeta { name: Some(ns.to_string()), @@ -212,13 +197,15 @@ where let namespaces: Api = Api::all(client.clone()); // Create the namespace if namespaces.get(ns).await.is_ok() { - let _ = namespaces.delete(ns, &DeleteParams::default()).await.map(|_|()).map_err(|e|anyhow!("{}", e.to_string())); - let retry_strategy = ExponentialBackoff::from_millis(1000).take(20); + let _ = namespaces + .delete(ns, &DeleteParams::default()) + .await + .map(|_| ()) + .map_err(|e| anyhow!("{}", e.to_string())); + let retry_strategy = ExponentialBackoff::from_millis(1000).take(20); let _ = Retry::spawn(retry_strategy, || async { match namespaces.get(ns).await { - Ok(v)=> { - Err(anyhow!("expect deleted")) - }, + Ok(_) => Err(anyhow!("expect deleted")), Err(e) => { if e.to_string().contains("not found") { Ok(()) @@ -227,9 +214,14 @@ where } } } - }).await?; - } - namespaces.create(&PostParams::default(), &namespace).await.map(|_|()).map_err(|e|anyhow!("{}", e.to_string())) + }) + .await?; + } + namespaces + .create(&PostParams::default(), &namespace) + .await + .map(|_| ()) + .map_err(|e| anyhow!("{}", e.to_string())) } } @@ -239,29 +231,26 @@ where { #[allow(refining_impl_trait)] async fn deploy(&self, ns: &str, graph: &Dag) -> Result> { - let client: Client = Client::try_default().await?; - Self::ensure_namespace_exit_and_clean(&client, ns).await?; + Self::ensure_namespace_exit_and_clean(&self.client, ns).await?; - let deployment_api: Api = Api::namespaced(client.clone(), ns); + let deployment_api: Api = Api::namespaced(self.client.clone(), ns); - let service_api: Api = Api::namespaced(client, ns); + let service_api: Api = Api::namespaced(self.client.clone(), ns); let mut pipeline_ctl = KubePipelineController::::default(); for node in graph.iter() { let deployment_string = self.reg.render("deployment", node)?; debug!("rendered unit deploy string {}", deployment_string); - let unit_deployment: Deployment = - serde_json::from_str(&deployment_string)?; + let unit_deployment: Deployment = serde_json::from_str(&deployment_string)?; let unit_deployment = deployment_api .create(&PostParams::default(), &unit_deployment) .await?; - let service_string = self.reg.render("service", node)?; + let service_string = self.reg.render("service", node)?; debug!("rendered unit service config {}", service_string); - let unit_service: Service = - serde_json::from_str(service_string.as_str())?; + let unit_service: Service = serde_json::from_str(service_string.as_str())?; let unit_service = service_api .create(&PostParams::default(), &unit_service) .await?; @@ -306,7 +295,11 @@ where } #[allow(refining_impl_trait)] - async fn attach(&self, namespace: &str, graph: &Dag) -> Result> { + async fn attach( + &self, + _namespace: &str, + _graph: &Dag, + ) -> Result> { todo!() } @@ -314,7 +307,11 @@ where let client: Client = Client::try_default().await?; let namespaces: Api = Api::all(client.clone()); if namespaces.get(ns).await.is_ok() { - let _ = namespaces.delete(ns, &DeleteParams::default()).await.map(|_|()).map_err(|e|anyhow!("{}", e.to_string()))?; + let _ = namespaces + .delete(ns, &DeleteParams::default()) + .await + .map(|_| ()) + .map_err(|e| anyhow!("{}", e.to_string()))?; } Ok(()) } @@ -325,8 +322,8 @@ mod tests { use std::env; use super::*; - use uuid::Uuid; use tracing_subscriber; + use uuid::Uuid; #[tokio::test] async fn test_render() { diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..d7d2c6f --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,49 @@ +#![feature(trait_alias)] +#![feature(async_closure)] + +pub mod core; +pub mod dag; +pub mod driver; +pub mod network; +pub mod utils; + +/* +use dag::Dag; + +use network::helloworld::greeter_server::{Greeter, GreeterServer}; +use network::helloworld::{HelloReply, HelloRequest}; +use tonic::{transport::Server, Request, Response, Status}; + +#[derive(Default)] +pub struct MyGreeter {} + +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + println!("Got a request from {:?}", request.remote_addr()); + + let reply = network::helloworld::HelloReply { + message: format!("Hello {}!", request.into_inner().name), + }; + Ok(Response::new(reply)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "[::1]:50051".parse().unwrap(); + let greeter = MyGreeter::default(); + + println!("GreeterServer listening on {}", addr); + + Server::builder() + .add_service(GreeterServer::new(greeter)) + .serve(addr) + .await?; + + Ok(()) +} + */ diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index e9a5cdb..0000000 --- a/src/main.rs +++ /dev/null @@ -1,12 +0,0 @@ -#![feature(trait_alias)] -#![feature(async_closure)] - -mod core; -mod dag; -pub mod driver; -mod utils; - -use dag::Dag; -use uuid::Uuid; - -fn main() {} diff --git a/src/network/mod.rs b/src/network/mod.rs new file mode 100644 index 0000000..f671180 --- /dev/null +++ b/src/network/mod.rs @@ -0,0 +1,15 @@ +pub mod helloworld { + tonic::include_proto!("helloworld"); +} + +pub mod common { + tonic::include_proto!("common"); +} + +pub mod nodecontroller { + tonic::include_proto!("nodecontroller"); +} + +pub mod datatransfer { + tonic::include_proto!("datatransfer"); +} diff --git a/src/network/protos/common.proto b/src/network/protos/common.proto new file mode 100644 index 0000000..e988f57 --- /dev/null +++ b/src/network/protos/common.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +package common; + +message Empty { +} diff --git a/src/network/protos/datatransfer.proto b/src/network/protos/datatransfer.proto new file mode 100644 index 0000000..90a27b1 --- /dev/null +++ b/src/network/protos/datatransfer.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package datatransfer; + +import "common.proto"; + +service DataStream { + rpc subscribe_new_data(common.Empty) returns (stream DataBatch) {} +} + +message DataBatch { + uint32 size = 1; + repeated bytes data = 2; +} + \ No newline at end of file diff --git a/src/network/protos/helloworld.proto b/src/network/protos/helloworld.proto new file mode 100644 index 0000000..13e203e --- /dev/null +++ b/src/network/protos/helloworld.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/src/network/protos/nodecontroller.proto b/src/network/protos/nodecontroller.proto new file mode 100644 index 0000000..40660d7 --- /dev/null +++ b/src/network/protos/nodecontroller.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package nodecontroller; + +import "common.proto"; + +service NodeController { + rpc pause(common.Empty) returns(common.Empty) {} + rpc restart(common.Empty) returns(common.Empty) {} + rpc stop(common.Empty) returns(common.Empty) {} +}