Skip to content

Commit

Permalink
feat: add jz runner and base controller and data protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jun 16, 2024
1 parent 38d1ea5 commit 8c30815
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 64 deletions.
50 changes: 43 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,57 @@
[workspace]
members = [
"jz_runner",
]

[workspace.package]
repository = "https://github.com/GitDataAI/jz-action"
license = "MIT OR Apache-2.0"
edition = "2021"
include = [
"build.rs",
"src/**/*",
"Cargo.toml",
"LICENSE*",
"README.md",
]

[workspace.dependencies]
anyhow = "1.0.86"
tracing = "0.1.40"
tracing-subscriber = "0.3"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
tokio-retry = "0.3"
tokio-stream = "0.1.15"
tonic = "0.11.0"


[package]
name = "jz-action"
name = "jz_action"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
serde = {version ="1.0.203", features = ["derive"]}
serde_json = {version = "1.0.117"}
bimap = "0.6.3"
uuid = {version="1.8.0", features = ["v4","serde"]}
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
kube = { version = "0.91.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["latest"] }
handlebars = "5.1.2"
tracing = "0.1.40"
tracing-subscriber = "0.3"
tokio-retry = "0.3"
prost = "0.12.6"

tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-retry = {workspace = true}
tokio-stream = {workspace = true}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
tonic = {workspace = true}

[dev-dependencies]
arrayvec = {version="0.7.4", features= ["serde"]}
arrayvec = {version="0.7.4", features= ["serde"]}

[build-dependencies]
tonic-build = "0.11.0"
log= "0.4.21"
env_logger="0.11.3"
14 changes: 14 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();

let protos = [
"src/network/protos/helloworld.proto",
"src/network/protos/common.proto",
"src/network/protos/datatransfer.proto",
"src/network/protos/nodecontroller.proto",
];

let proto_dir = "src/network/protos";
tonic_build::configure().compile(&protos, &[proto_dir])?;
Ok(())
}
18 changes: 18 additions & 0 deletions jz_runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "jz_runner"
version = "0.1.0"
edition = "2021"

[dependencies]
jz_action = { path = "../"}

tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-retry = {workspace = true}
tokio-stream = {workspace = true}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
tonic = {workspace = true}

clap = {version="4.5.7", features=["derive"]}

47 changes: 47 additions & 0 deletions jz_runner/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
mod unit;

use jz_action::network::datatransfer::data_stream_server::DataStreamServer;
use jz_action::network::nodecontroller::node_controller_server::NodeControllerServer;

use anyhow::{anyhow, Result};
use clap::Parser;
use std::str::FromStr;
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info, Level};

use unit::{DataNodeControllerServer, UnitDataStrean};
#[derive(Debug, Parser)]
#[command(
name = "jz_runner",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images"
)]
struct Args {
#[arg(short, long, default_value = "INFO")]
log_level: String,

#[arg(long, default_value = "[::1]:25431")]
host_port: String,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
tracing_subscriber::fmt()
.with_max_level(Level::from_str(&args.log_level)?)
.try_init()
.map_err(|e| anyhow!("{}", e))?;

let addr = args.host_port.parse()?;
let node_controller = DataNodeControllerServer::default();
let unit_data_stream = UnitDataStrean::default();

Server::builder()
.add_service(NodeControllerServer::new(node_controller))
.add_service(DataStreamServer::new(unit_data_stream))
.serve(addr)
.await?;
info!("node listening on {}", addr);
Ok(())
}
39 changes: 39 additions & 0 deletions jz_runner/src/unit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use jz_action::network::common::Empty;
use jz_action::network::datatransfer::data_stream_server::DataStream;
use jz_action::network::datatransfer::DataBatch;
use jz_action::network::nodecontroller::node_controller_server::NodeController;

use std::result::Result;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

#[derive(Default)]
pub(crate) struct DataNodeControllerServer {}

#[tonic::async_trait]
impl NodeController for DataNodeControllerServer {
async fn pause(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
todo!()
}
async fn restart(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
todo!()
}
async fn stop(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
todo!()
}
}

#[derive(Default)]
pub(crate) struct UnitDataStrean {}

#[tonic::async_trait]
impl DataStream for UnitDataStrean {
type subscribe_new_dataStream = ReceiverStream<Result<DataBatch, Status>>;

async fn subscribe_new_data(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::subscribe_new_dataStream>, Status> {
todo!()
}
}
1 change: 0 additions & 1 deletion src/dag/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ node1->node2->node3->node4->node5->node6->node7->node8->node9
use crate::core::GID;
use std::{
collections::{HashMap, HashSet, VecDeque},
hash::Hash,
vec,
};

Expand Down
2 changes: 1 addition & 1 deletion src/driver/driver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::core::GID;
use crate::Dag;
use crate::dag::Dag;
use anyhow::Result;
use std::{
future::Future,
Expand Down
83 changes: 40 additions & 43 deletions src/driver/kube.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use super::{ChannelHandler, Driver, PipelineController, UnitHandler};
use crate::core::GID;
use crate::dag::Dag;
use crate::utils::IntoAnyhowResult;
use crate::Dag;
use anyhow::{anyhow, Result};
use handlebars::Handlebars;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::{Service, Namespace};
use k8s_openapi::api::core::v1::{Namespace, Service};
use kube::api::{DeleteParams, PostParams};
use kube::{Api, Client};
use std::collections::HashMap;
use std::default::Default;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use tracing::debug;
use tokio_retry::Retry;
use tokio_retry::strategy::ExponentialBackoff;
use tokio_retry::Retry;
use tracing::debug;
pub struct KubeChannelHander<ID>
where
ID: GID,
Expand Down Expand Up @@ -158,11 +158,11 @@ where
"channel_service",
include_str!("kubetpl/channel_service.tpl"),
)?;
let client = Client::try_default().await?;
let client = Client::try_default().await?;
Ok(KubeDriver {
_id: std::marker::PhantomData,
reg: reg,
client:client
client: client,
})
}

Expand All @@ -181,26 +181,11 @@ where
Ok(KubeDriver {
_id: std::marker::PhantomData,
reg: reg,
client:client
client: client,
})
}

async fn retry_get_ns_state(namespaces: Api<Namespace>, ns: &str) -> Result<()> {
match namespaces.get(ns).await {
Ok(v)=> {
Err(anyhow!("expect deleted"))
},
Err(e) => {
if e.to_string().contains("not") {
Ok(())
} else {
Err(anyhow!("expect deleted"))
}
}
}
}

async fn ensure_namespace_exit_and_clean(client: &Client, ns: &str) -> Result<()>{
async fn ensure_namespace_exit_and_clean(client: &Client, ns: &str) -> Result<()> {
let namespace = Namespace {
metadata: kube::api::ObjectMeta {
name: Some(ns.to_string()),
Expand All @@ -212,13 +197,15 @@ where
let namespaces: Api<Namespace> = Api::all(client.clone());
// Create the namespace
if namespaces.get(ns).await.is_ok() {
let _ = namespaces.delete(ns, &DeleteParams::default()).await.map(|_|()).map_err(|e|anyhow!("{}", e.to_string()));
let retry_strategy = ExponentialBackoff::from_millis(1000).take(20);
let _ = namespaces
.delete(ns, &DeleteParams::default())
.await
.map(|_| ())
.map_err(|e| anyhow!("{}", e.to_string()));
let retry_strategy = ExponentialBackoff::from_millis(1000).take(20);
let _ = Retry::spawn(retry_strategy, || async {
match namespaces.get(ns).await {
Ok(v)=> {
Err(anyhow!("expect deleted"))
},
Ok(_) => Err(anyhow!("expect deleted")),
Err(e) => {
if e.to_string().contains("not found") {
Ok(())
Expand All @@ -227,9 +214,14 @@ where
}
}
}
}).await?;
}
namespaces.create(&PostParams::default(), &namespace).await.map(|_|()).map_err(|e|anyhow!("{}", e.to_string()))
})
.await?;
}
namespaces
.create(&PostParams::default(), &namespace)
.await
.map(|_| ())
.map_err(|e| anyhow!("{}", e.to_string()))
}
}

Expand All @@ -239,29 +231,26 @@ where
{
#[allow(refining_impl_trait)]
async fn deploy(&self, ns: &str, graph: &Dag<ID>) -> Result<KubePipelineController<ID>> {
let client: Client = Client::try_default().await?;
Self::ensure_namespace_exit_and_clean(&client, ns).await?;
Self::ensure_namespace_exit_and_clean(&self.client, ns).await?;

let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), ns);
let deployment_api: Api<Deployment> = Api::namespaced(self.client.clone(), ns);

let service_api: Api<Service> = Api::namespaced(client, ns);
let service_api: Api<Service> = Api::namespaced(self.client.clone(), ns);

let mut pipeline_ctl = KubePipelineController::<ID>::default();
for node in graph.iter() {
let deployment_string = self.reg.render("deployment", node)?;
debug!("rendered unit deploy string {}", deployment_string);

let unit_deployment: Deployment =
serde_json::from_str(&deployment_string)?;
let unit_deployment: Deployment = serde_json::from_str(&deployment_string)?;
let unit_deployment = deployment_api
.create(&PostParams::default(), &unit_deployment)
.await?;

let service_string = self.reg.render("service", node)?;
let service_string = self.reg.render("service", node)?;
debug!("rendered unit service config {}", service_string);

let unit_service: Service =
serde_json::from_str(service_string.as_str())?;
let unit_service: Service = serde_json::from_str(service_string.as_str())?;
let unit_service = service_api
.create(&PostParams::default(), &unit_service)
.await?;
Expand Down Expand Up @@ -306,15 +295,23 @@ where
}

#[allow(refining_impl_trait)]
async fn attach(&self, namespace: &str, graph: &Dag<ID>) -> Result<KubePipelineController<ID>> {
async fn attach(
&self,
_namespace: &str,
_graph: &Dag<ID>,
) -> Result<KubePipelineController<ID>> {
todo!()
}

async fn clean(&self, ns: &str) -> Result<()> {
let client: Client = Client::try_default().await?;
let namespaces: Api<Namespace> = Api::all(client.clone());
if namespaces.get(ns).await.is_ok() {
let _ = namespaces.delete(ns, &DeleteParams::default()).await.map(|_|()).map_err(|e|anyhow!("{}", e.to_string()))?;
let _ = namespaces
.delete(ns, &DeleteParams::default())
.await
.map(|_| ())
.map_err(|e| anyhow!("{}", e.to_string()))?;
}
Ok(())
}
Expand All @@ -325,8 +322,8 @@ mod tests {
use std::env;

use super::*;
use uuid::Uuid;
use tracing_subscriber;
use uuid::Uuid;

#[tokio::test]
async fn test_render() {
Expand Down
Loading

0 comments on commit 8c30815

Please sign in to comment.