feat: support data flags & add example to read label
hunjixin committed Aug 12, 2024
1 parent 54696fe commit 437fbc8
Showing 24 changed files with 626 additions and 236 deletions.
10 changes: 6 additions & 4 deletions Cargo.toml
@@ -8,8 +8,7 @@ members = [
"nodes/make_article",
"nodes/list_files",
"nodes/copy_in_place",
"crates/nodes_sdk"
]
"crates/nodes_sdk"]

[workspace.package]
repository = "https://github.com/GitDataAI/jiaoziflow"
@@ -42,6 +41,7 @@ futures = "0.3.30"
clap = {version="4.5.7", features=["derive"]}
actix-web = "4.8.0"
reqwest = {version="0.12.5", features=["json"]}
+itertools = "0.13.0"

[package]
name = "jiaoziflow"
@@ -57,6 +57,7 @@ serde_variant = "0.1.3"
uri="0.4.0"
prettytable-rs = "^0.10"
k8s-metrics = "0.16.0"
+hostname = "^0.4"

tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
tokio-retry = {workspace = true}
@@ -77,12 +78,13 @@ chrono = {workspace = true}
futures = {workspace = true}
async-trait = {workspace = true}
reqwest = {workspace = true}
+ratatui = "0.28.0"
+itertools = {workspace = true}


[build-dependencies]
tonic-build = "0.12.1"
log= "0.4.21"
env_logger="0.11.3"

[dev-dependencies]
-local-ip-address = "0.6.1"
+local-ip-address = "0.6.1"
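The new `hostname` dependency presumably backs the `get_machine_name` helper that `channel_tracker.rs` imports from `jiaoziflow::utils::k8s_helper` below; the helper's body is not part of this diff. A minimal sketch of such a helper, assuming it falls back to a placeholder when the hostname cannot be read:

```rust
// Hypothetical sketch of jiaoziflow::utils::k8s_helper::get_machine_name;
// the real implementation is not shown in this commit.
pub fn get_machine_name() -> String {
    hostname::get()
        .ok()
        .and_then(|name| name.into_string().ok())
        .unwrap_or_else(|| "unknown".to_string())
}
```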
59 changes: 32 additions & 27 deletions crates/channel_runner/src/channel_tracker.rs
@@ -6,17 +6,18 @@ use chrono::Utc;
use futures::future::try_join_all;
use jiaoziflow::{
core::db::{
+DataFlag,
DataRecord,
DataState,
Direction,
JobDbRepo,
TrackerState,
},
network::datatransfer::MediaDataBatchResponse,
+utils::k8s_helper::get_machine_name,
};
use nodes_sdk::{
fs_cache::FileCache,
-metadata::is_metadata,
MessageSender,
};
use std::{
@@ -117,17 +118,17 @@ where
match db_repo.list_by_node_name_and_state(&node_name, &DataState::EndRecieved).await {
Ok(datas) => {
for data in datas {
-if data.is_metadata {
-if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::KeeptForMetadata, None).await{
-error!("mark metadata data as client receive {err}");
+if data.flag.is_keep_data {
+if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::CleanButKeepData, None).await{
+error!("mark data as clean but keep data {err}");
continue;
}
-info!("mark metadata as cleint received")
+info!("mark keep data as clean")
}else {
match data_cache.remove(&data.id).await {
Ok(_)=>{
if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::Clean, None).await{
error!("mark data as client receive {err}");
error!("mark data as clean {err}");
continue;
}
debug!("remove data {}", &data.id);
@@ -183,6 +184,7 @@ where
return Err(anyhow!("no upstream"));
}

+let machine_name = get_machine_name();
let (incoming_tx, mut incoming_rx) = mpsc::channel(1);
self.incoming_tx = Some(incoming_tx);

@@ -196,21 +198,25 @@
let node_name = self.name.clone();
let data_cache = self.data_cache.clone();
let token = token.clone();
+let machine_name = machine_name.clone();

join_set.spawn(async move {
loop {
select! {
_ = token.cancelled() => {
return Ok(());
}
-Some((_, resp)) = request_rx.recv() => { //make this params
+Some((_, resp)) = request_rx.recv() => {
loop {
-match db_repo.find_data_and_mark_state(&node_name, &Direction::In, &DataState::SelectForSend).await {
+match db_repo.find_data_and_mark_state(&node_name, &Direction::In, true,&DataState::SelectForSend, Some(machine_name.clone())).await {
std::result::Result::Ok(Some(record)) =>{
info!("return downstream's datarequest and start response data {}", &record.id);
match data_cache.read(&record.id).await {
-Ok(databatch)=>{
+Ok(mut databatch)=>{
//response this data's position
+//TODO keep this data
+databatch.data_flag = record.flag.to_bit();
+databatch.priority = record.priority as u32;
resp.send(Ok(Some(databatch))).expect("channel only read once");
break;
},
@@ -248,7 +254,6 @@ where
let buf_size = self.buf_size;
let data_cache = self.data_cache.clone();
let token = token.clone();

join_set.spawn(async move {
loop {
select!{
@@ -261,8 +266,10 @@
let now = Instant::now();
let id = data_batch.id.clone();
let size = data_batch.size;
+let data_flag = data_batch.data_flag;
+let priority = data_batch.priority;

-// processed before
+// is processed before
match db_repo.find_by_node_id(&node_name,&id, &Direction::In).await {
Ok(Some(_))=>{
warn!("data {} processed before", &id);
@@ -276,22 +283,18 @@
}
_=>{}
}
-let is_data_metadata = if is_metadata(&id) {
-//check limit for plain data
-if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
-if count > buf_size {
-Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
-} else {
-Ok(())
-}
-}){
-resp.send(Err(anyhow!("cannt query limit from mongo {err}"))).expect("request alread listen this channel");
-continue;

+//check limit for plain data
+if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
+if count > buf_size {
+Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
+} else {
+Ok(())
+}
-true
-}else {
-false
-};
+}){
+resp.send(Err(anyhow!("cannt query limit from mongo {err}"))).expect("request alread listen this channel");
+continue;
+}

//write batch files
if let Err(err) = data_cache.write(data_batch).await {
@@ -306,7 +309,9 @@
node_name: node_name.clone(),
id:id.clone(),
size,
-is_metadata:is_data_metadata,
+machine: "".to_string(),
+flag: DataFlag::new_from_bit(data_flag),
+priority: priority as u8,
state: DataState::Received,
sent: vec![],
direction:Direction::In,
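`DataFlag` itself is not defined in this diff; only `flag.is_keep_data`, `to_bit`, and `new_from_bit` appear above. A minimal sketch consistent with those call sites, assuming `is_keep_data` is the only flag and occupies the lowest bit of the wire-format field:

```rust
// Hypothetical sketch of jiaoziflow::core::db::DataFlag; the field set and
// bit layout are assumptions based on the call sites in channel_tracker.rs.
#[derive(Clone, Debug, Default)]
pub struct DataFlag {
    pub is_keep_data: bool,
}

impl DataFlag {
    // Pack the flags into the u32 carried on the wire by MediaDataBatchResponse.
    pub fn to_bit(&self) -> u32 {
        if self.is_keep_data {
            1
        } else {
            0
        }
    }

    // Rebuild the flags from a received bit field.
    pub fn new_from_bit(bits: u32) -> Self {
        DataFlag {
            is_keep_data: bits & 1 != 0,
        }
    }
}
```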
3 changes: 2 additions & 1 deletion crates/compute_unit_runner/Cargo.toml
@@ -32,4 +32,5 @@ hyper = "1.4.1"
hyper-util = "0.1.6"
http-body-util = "0.1.2"
walkdir = "2.5.0"
rand = "0.8.5"
rand = "0.8.5"
query-string-builder = "=0.2.0"
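The pinned `query-string-builder` crate is used in `ipc.rs` below to append an optional `id` parameter to the data-request URL. A small illustration of the pattern (the rendered strings are the crate's expected behavior, not output verified here):

```rust
use query_string_builder::QueryString;

fn main() {
    // When a value is present, the rendered string carries a leading "?".
    let qs = QueryString::new().with_opt_value("id", Some("batch-0"));
    println!("/api/v1/data{qs}"); // expected: /api/v1/data?id=batch-0

    // When the value is None, nothing is appended.
    let qs = QueryString::new().with_opt_value("id", None::<&str>);
    println!("/api/v1/data{qs}"); // expected: /api/v1/data
}
```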
62 changes: 32 additions & 30 deletions crates/compute_unit_runner/src/ipc.rs
@@ -17,6 +17,7 @@ use anyhow::Result;
use core::str;
use http_body_util::Collected;
use jiaoziflow::core::db::{
+DataFlag,
JobDbRepo,
TrackerState,
};
@@ -57,6 +58,7 @@ use hyperlocal::{
UnixConnector,
Uri,
};
+use query_string_builder::QueryString;
use serde::de::{
self,
Deserializer,
@@ -87,9 +89,10 @@ use std::fmt;
#[repr(u8)]
pub enum ErrorNumber {
NotReady = 1,
-InComingFinish = 2,
-AlreadyFinish = 3,
-DataNotFound = 4,
+DataMissing = 2,
+InComingFinish = 3,
+AlreadyFinish = 4,
+NoAvaiableData = 5,
}

#[derive(Debug)]
@@ -225,32 +228,37 @@ impl<'de> Deserialize<'de> for IPCError {
pub struct SubmitOuputDataReq {
pub id: String,
pub size: u32,
+pub data_flag: DataFlag,
+pub priority: u8,
}

impl SubmitOuputDataReq {
-pub fn new(id: &str, size: u32) -> Self {
+pub fn new(id: &str, size: u32, data_flag: DataFlag, priority: u8) -> Self {
SubmitOuputDataReq {
id: id.to_string(),
size,
+data_flag,
+priority,
}
}
}

#[derive(Serialize, Deserialize)]
pub struct RequetDataReq {
-pub metadata_id: Option<String>,
+pub id: Option<String>,
}

impl RequetDataReq {
-pub fn new(metadata_id: &str) -> Self {
+pub fn new(id: &str) -> Self {
RequetDataReq {
-metadata_id: Some(metadata_id.to_string()),
+id: Some(id.to_string()),
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Status {
+pub node_name: String,
pub state: TrackerState,
}

@@ -263,6 +271,7 @@
let program = program_mutex.read().await;
let local_state = program.local_state.read().await;
Status {
+node_name: program.name.clone(),
state: local_state.clone(),
}
};
@@ -276,24 +285,12 @@ async fn process_data_request<R>(
where
R: JobDbRepo + Clone,
{
info!("receive avaiable data reqeust");
let req = req.into_inner();
info!("receive avaiable data reqeust {:?}", req.id.as_ref());
let sender = loop {
let program = program_mutex.read().await;
let local_state = program.local_state.read().await;
debug!("local state {:?}", *local_state);
if *local_state == TrackerState::Finish {
return HttpResponse::BadRequest().json(IPCError::NodeError {
code: ErrorNumber::AlreadyFinish,
msg: "node is already finish".to_string(),
});
}

-if *local_state == TrackerState::InComingFinish {
-return HttpResponse::BadRequest().json(IPCError::NodeError {
-code: ErrorNumber::InComingFinish,
-msg: "incoming is already finish".to_string(),
-});
-}

if *local_state != TrackerState::Init {
break program.ipc_process_data_req_tx.as_ref().cloned();
@@ -304,10 +301,10 @@ where
};

//read request
-let (tx, rx) = oneshot::channel::<Result<Option<AvaiableDataResponse>>>();
+let (tx, rx) = oneshot::channel::<Result<Option<AvaiableDataResponse>, IPCError>>();
match sender {
Some(sender) => {
-if let Err(err) = sender.send((req.into_inner(), tx)).await {
+if let Err(err) = sender.send((req, tx)).await {
return HttpResponse::InternalServerError()
.json(format!("send to avaiable data channel {err}"));
}
@@ -321,10 +318,10 @@ where
match rx.await {
Ok(Ok(Some(resp))) => HttpResponse::Ok().json(resp),
Ok(Ok(None)) => HttpResponse::NotFound().json(IPCError::NodeError {
-code: ErrorNumber::DataNotFound,
+code: ErrorNumber::NoAvaiableData,
msg: "no avaiable data".to_string(),
}),
-Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()),
+Ok(Err(err)) => HttpResponse::InternalServerError().json(err),
Err(err) => HttpResponse::ServiceUnavailable().json(err.to_string()),
}
}
@@ -538,7 +535,7 @@ pub trait IPCClient {
) -> impl std::future::Future<Output = Result<(), IPCError>> + Send;
fn request_avaiable_data(
&self,
-metadata_id: Option<String>,
+metadata_id: Option<&str>,
) -> impl std::future::Future<Output = Result<Option<AvaiableDataResponse>, IPCError>> + Send;
}

@@ -632,15 +629,20 @@ impl IPCClient for IPCClientImpl {

async fn request_avaiable_data(
&self,
-metadata_id: Option<String>,
+id: Option<&str>,
) -> Result<Option<AvaiableDataResponse>, IPCError> {
-let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data");
-let json = serde_json::to_string(&RequetDataReq { metadata_id })?;
+let qs = QueryString::new().with_opt_value("id", id);
+let url: hyper::Uri = Uri::new(
+self.unix_socket_addr.clone(),
+format!("/api/v1/data{qs}").as_str(),
+)
+.into();

let req: Request<Full<Bytes>> = Request::builder()
.method(Method::GET)
.uri(url)
.header("Content-Type", "application/json")
.body(Full::from(json))
.body(Full::default())
.map_err(IPCError::from)?;

let resp = self.client.request(req).await.map_err(IPCError::from)?;
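Putting the pieces together, a hypothetical caller-side sketch of the reworked IPC API; the response handling, the submit endpoint, and the bit value for `is_keep_data` (taken from the `DataFlag` sketch earlier) are all assumptions, not code from this commit:

```rust
use compute_unit_runner::ipc::{IPCClient, IPCError, SubmitOuputDataReq};
use jiaoziflow::core::db::DataFlag;

// Hypothetical node-side usage; client construction and the compute loop
// around it are omitted.
async fn read_label_example(client: &impl IPCClient) -> Result<(), IPCError> {
    // Ask for one specific record by id; pass None to take any available data.
    if let Some(_batch) = client.request_avaiable_data(Some("label-batch-0")).await? {
        // ... process the batch, then describe an output of 1024 bytes with
        // the keep-data flag set (bit 0 per the DataFlag sketch), priority 0.
        let flag = DataFlag::new_from_bit(1);
        let _output = SubmitOuputDataReq::new("label-batch-0-out", 1024, flag, 0);
        // _output would then be posted to the runner's submit endpoint,
        // which is not part of this excerpt.
    }
    Ok(())
}
```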