Skip to content

Commit

Permalink
feat: add copy in place nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 5, 2024
1 parent 3a70675 commit c4d9878
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"crates/dp_runner",
"crates/compute_unit_runner",
"crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/dummy_in", "nodes/dummy_out"
]
, "nodes/copy_in_palce"]

[workspace.package]
repository = "https://github.com/GitDataAI/jz-action"
Expand Down
4 changes: 4 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ build-nodes: $(OUTPUT)
cargo build -p dummy_out --release
cp target/release/dummy_out $(OUTPUT)/dummy_out

cargo build -p copy_in_place --release
cp target/release/copy_in_place $(OUTPUT)/copy_in_place

docker_nodes: build-nodes
docker build -f ./nodes/jz_reader/dockerfile -t jz-action/jz_reader:latest .
docker build -f ./nodes/jz_writer/dockerfile -t jz-action/jz_writer:latest .
docker build -f ./nodes/dummy_in/dockerfile -t jz-action/dummy_in:latest .
docker build -f ./nodes/dummy_out/dockerfile -t jz-action/dummy_out:latest .
docker build -f ./nodes/copy_in_place/dockerfile -t jz-action/copy_in_place:latest .

################## minikube
docker: docker_cd docker_dp docker_nodes
Expand Down
21 changes: 21 additions & 0 deletions nodes/copy_in_palce/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "copy_in_palce"
version = "0.1.0"
repository.workspace = true
license.workspace = true
edition.workspace = true
include.workspace = true

[dependencies]
jz_action = { path = "../../"}
compute_unit_runner = {path = "../../crates/compute_unit_runner"}

uuid = {workspace = true}
tokio = { workspace = true}
tokio-retry = {workspace = true}
tokio-stream = {workspace = true}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
clap = {version="4.5.7", features=["derive"]}
random_word = { version = "0.4.3", features = ["en"] }
122 changes: 122 additions & 0 deletions nodes/copy_in_palce/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use anyhow::{
anyhow,
Result,
};
use clap::Parser;
use compute_unit_runner::ipc::{
self,
IPCClient,
SubmitOuputDataReq,
};
use jz_action::utils::StdIntoAnyhowResult;
use random_word::Lang;
use std::{
path::Path,
str::FromStr, time::Duration,
};
use tokio::{
fs,
io::AsyncWriteExt,
select,
signal::unix::{
signal,
SignalKind,
},
sync::mpsc,
time::{sleep, Instant},
};
use tracing::{
error,
info,
Level,
};

#[derive(Debug, Parser)]
#[command(
name = "copy_in_place",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images. move input directory to dest directory."
)]

struct Args {
#[arg(short, long, default_value = "INFO")]
log_level: String,

#[arg(short, long, default_value = "/unix_socket/compute_unit_runner_d")]
unix_socket_addr: String,

#[arg(short, long, default_value = "/app/tmp")]
tmp_path: String,
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
let args = Args::parse();
tracing_subscriber::fmt()
.with_max_level(Level::from_str(&args.log_level)?)
.try_init()
.anyhow()?;

let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<Result<()>>(1);
{
let shutdown_tx = shutdown_tx.clone();
let _ = tokio::spawn(async move {
if let Err(e) = copy_in_place(args).await {
let _ = shutdown_tx.send(Err(anyhow!("dummy read {e}"))).await;
}
});
}

{
//catch signal
let _ = tokio::spawn(async move {
let mut sig_term = signal(SignalKind::terminate()).unwrap();
let mut sig_int = signal(SignalKind::interrupt()).unwrap();
select! {
_ = sig_term.recv() => info!("Recieve SIGTERM"),
_ = sig_int.recv() => info!("Recieve SIGTINT"),
};
let _ = shutdown_tx.send(Err(anyhow!("cancel by signal"))).await;
});
}

if let Some(Err(err)) = shutdown_rx.recv().await {
error!("program exit with error {:?}", err)
}
info!("gracefully shutdown");
Ok(())
}

async fn copy_in_place(args: Args) -> Result<()> {
let client = ipc::IPCClientImpl::new(args.unix_socket_addr);
let tmp_path = Path::new(&args.tmp_path);
loop {
let instant = Instant::now();

let req = client.request_avaiable_data().await?;
if req.is_none() {
sleep(Duration::from_secs(2)).await;
continue;
}
let id = req.unwrap().id;
let path_str = tmp_path.join(&id);
let root_input_dir = path_str.as_path();


let id = uuid::Uuid::new_v4().to_string();
let output_dir = tmp_path.join(&id);

fs::rename(root_input_dir, output_dir).await?;

info!("generate new data spent {:?}", instant.elapsed());

client.complete_result(&id).await?;

//submit directory after completed a batch
client
.submit_output(SubmitOuputDataReq::new(&id, 30))
.await?;
info!("submit new data {:?}", instant.elapsed());
}
}

0 comments on commit c4d9878

Please sign in to comment.