Skip to content

Commit

Permalink
fix: clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 9, 2024
1 parent 696b954 commit a291633
Show file tree
Hide file tree
Showing 35 changed files with 706 additions and 671 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ jobs:
steps:
- uses: actions/checkout@v4

- name: Run Clippy
run: cargo +nightly clippy --fix --workspace --exclude jiaozifs_client_rs --allow-dirty

- name: Install Protoc
uses: arduino/setup-protoc@v3

- name: Run tests
run: cargo test --verbose --workspace

- name: Start minikube
uses: medyagh/setup-minikube@latest

Expand All @@ -25,5 +34,4 @@ jobs:
eval $(minikube -p minikube docker-env)
make minikube-docker
- name: Run tests
run: cargo test --verbose --workspace
9 changes: 3 additions & 6 deletions crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use tokio::{
signal,
SignalKind,
},
sync::{
Mutex,
RwLock,
},
sync::RwLock,
task::JoinSet,
};

Expand Down Expand Up @@ -112,9 +109,9 @@ async fn main() -> Result<()> {
_ = token.cancelled() => {
handler.stop(true).await;
info!("ipc server stopped");
return Ok(());
}
};
Ok::<(), anyhow::Error>(())
});
}
{
Expand All @@ -135,7 +132,7 @@ async fn main() -> Result<()> {

{
//catch signal
let _ = tokio::spawn(async move {
tokio::spawn(async move {
let mut sig_term = signal(SignalKind::terminate()).unwrap();
let mut sig_int = signal(SignalKind::interrupt()).unwrap();
select! {
Expand Down
31 changes: 11 additions & 20 deletions crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,12 @@ use actix_web::{
HttpResponse,
HttpServer,
};
use anyhow::{
anyhow,
Result,
};
use anyhow::Result;
use core::str;
use http_body_util::Collected;
use jz_action::{
core::db::{
JobDbRepo,
TrackerState,
},
utils::StdIntoAnyhowResult,
use jz_action::core::db::{
JobDbRepo,
TrackerState,
};
use serde::{
ser::SerializeStruct,
Expand Down Expand Up @@ -64,7 +58,6 @@ use serde::de::{
self,
Deserializer,
MapAccess,
SeqAccess,
Visitor,
};
use serde_repr::*;
Expand Down Expand Up @@ -535,7 +528,7 @@ impl IPCClientImpl {

impl IPCClient for IPCClientImpl {
async fn finish(&self) -> Result<(), IPCError> {
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status").into();
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status");

let req: Request<Full<Bytes>> = Request::builder()
.method(Method::POST)
Expand All @@ -558,7 +551,7 @@ impl IPCClient for IPCClientImpl {
}

async fn status(&self) -> Result<Status, IPCError> {
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status").into();
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/status");

let req: Request<Full<Bytes>> = Request::builder()
.method(Method::GET)
Expand All @@ -584,7 +577,7 @@ impl IPCClient for IPCClientImpl {

async fn submit_output(&self, req: SubmitOuputDataReq) -> Result<(), IPCError> {
let json = serde_json::to_string(&req)?;
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/submit").into();
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/submit");

let req: Request<Full<Bytes>> = Request::builder()
.method(Method::POST)
Expand All @@ -607,7 +600,7 @@ impl IPCClient for IPCClientImpl {
}

async fn request_avaiable_data(&self) -> Result<Option<AvaiableDataResponse>, IPCError> {
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data").into();
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data");
let req: Request<Full<Bytes>> = Request::builder()
.method(Method::GET)
.uri(url)
Expand Down Expand Up @@ -637,7 +630,7 @@ impl IPCClient for IPCClientImpl {
async fn complete_result(&self, id: &str) -> Result<(), IPCError> {
let req = CompleteDataReq::new(id);
let json = serde_json::to_string(&req)?;
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data").into();
let url: Uri = Uri::new(self.unix_socket_addr.clone(), "/api/v1/data");

let req: Request<Full<Bytes>> = Request::builder()
.method(Method::POST)
Expand All @@ -663,8 +656,6 @@ impl IPCClient for IPCClientImpl {
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use tracing_subscriber;

#[tokio::test]
async fn test_my_error() {
Expand All @@ -682,7 +673,7 @@ mod tests {
assert_eq!(code, ErrorNumber::AlreadyFinish);
assert_eq!(msg, "aaaa");
}
_ => Err(anyhow!("not expect type")).unwrap(),
_ => panic!("unexpected error type"),
}
}

Expand All @@ -696,7 +687,7 @@ mod tests {
IPCError::UnKnown(msg) => {
assert_eq!(msg, "bbbbbbbbbbbbbb");
}
_ => Err(anyhow!("not expect type")).unwrap(),
_ => panic!("unexpected error type"),
}
}
}
Expand Down
33 changes: 15 additions & 18 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ use tracing::{
use nodes_sdk::{
fs_cache::FileCache,
multi_sender::MultiSender,
MessageSender,
};
use tokio::{
select,
sync::{
broadcast,
mpsc,
oneshot,
RwLock,
},
task::JoinSet,
Expand Down Expand Up @@ -83,19 +83,16 @@ where
pub(crate) downstreams: Vec<String>,

// channel for processing available-data requests
pub(crate) ipc_process_data_req_tx:
Option<mpsc::Sender<((), oneshot::Sender<Result<Option<AvaiableDataResponse>>>)>>,
pub(crate) ipc_process_data_req_tx: Option<MessageSender<(), Option<AvaiableDataResponse>>>,

// channel for complete-data responses; cleanup work is done when this request is received
pub(crate) ipc_process_completed_data_tx:
Option<mpsc::Sender<(CompleteDataReq, oneshot::Sender<Result<()>>)>>,
pub(crate) ipc_process_completed_data_tx: Option<MessageSender<CompleteDataReq, ()>>,

// channel for submit output data
pub(crate) ipc_process_submit_output_tx:
Option<mpsc::Sender<(SubmitOuputDataReq, oneshot::Sender<Result<()>>)>>,
pub(crate) ipc_process_submit_output_tx: Option<MessageSender<SubmitOuputDataReq, ()>>,

// channel for receiving the finish state from the user container
pub(crate) ipc_process_finish_state_tx: Option<mpsc::Sender<((), oneshot::Sender<Result<()>>)>>,
pub(crate) ipc_process_finish_state_tx: Option<MessageSender<(), ()>>,
}
impl<R> MediaDataTracker<R>
where
Expand All @@ -114,11 +111,11 @@ where
fs_cache,
buf_size,
name: name.to_string(),
repo: repo,
repo,
local_state: Arc::new(RwLock::new(TrackerState::Init)),
up_nodes: up_nodes,
upstreams: upstreams,
downstreams: downstreams,
up_nodes,
upstreams,
downstreams,
ipc_process_submit_output_tx: None,
ipc_process_completed_data_tx: None,
ipc_process_data_req_tx: None,
Expand Down Expand Up @@ -157,7 +154,7 @@ where
.map(|count| info!("revert {count} SelectForSent data to Received"))?;

//check readiness: if all upstream nodes have finished and there is no pending data, consider this node finished
if up_nodes.len() > 0 {
if !up_nodes.is_empty() {
let is_all_success = try_join_all(up_nodes.iter().map(|node_name|db_repo.get_node_by_name(node_name))).await
.map_err(|err|anyhow!("query node data {err}"))?
.iter()
Expand Down Expand Up @@ -210,7 +207,7 @@ where
let mut join_set: JoinSet<Result<()>> = JoinSet::new();

let new_data_tx = broadcast::Sender::new(1);
if self.downstreams.len() > 0 {
if !self.downstreams.is_empty() {
//process outgoing data
{
let new_data_tx = new_data_tx.clone();
Expand Down Expand Up @@ -247,7 +244,7 @@ where
return anyhow::Ok(());
}
new_data = new_data_rx.recv() => {
if let Err(_) = new_data {
if new_data.is_err() {
//drop fast to prevent exceeding the channel capacity
continue;
}
Expand All @@ -270,7 +267,7 @@ where
};

//write outgoing
if new_batch.size >0 && downstreams.len()>0 {
if new_batch.size >0 && !downstreams.is_empty() {
info!("start to send data {} {:?}", &req.id, now.elapsed());
let sent_nodes: Vec<_>= req.sent.iter().map(|v|v.as_str()).collect();
if let Err(sent_nodes) = multi_sender.send(new_batch, &sent_nodes).await {
Expand Down Expand Up @@ -383,7 +380,7 @@ where

//TODO: this turns an async process into a sync one, which hurts performance;
//if a bottleneck appears here, we should refactor this
if self.upstreams.len() > 0 {
if !self.upstreams.is_empty() {
//process user container requests
let db_repo = self.repo.clone();
let node_name = self.name.clone();
Expand Down Expand Up @@ -524,7 +521,7 @@ where
});
}

if self.upstreams.len() == 0 {
if self.upstreams.is_empty() {
//receive events from the pod:
//input nodes need to signal when they are finished
let db_repo = self.repo.clone();
Expand Down
15 changes: 5 additions & 10 deletions crates/compute_unit_runner/src/state_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
}
_ = interval.tick() => {
match repo
.get_node_by_name(&name)
.get_node_by_name(name)
.await{
Ok(record)=> {
debug!("{} fetch state from db", record.node_name);
Expand All @@ -70,15 +70,10 @@ where
*local_state = record.state.clone();
info!("update state {:?} -> {:?}", old_local_state, local_state);
drop(local_state);
match record.state {
TrackerState::Ready => {
if old_local_state == TrackerState::Init {
//start
info!("start data processing {:?}", record.incoming_streams);
join_set = Some(program_guard.route_data(token.clone()).await?);
}
}
_ => {}
if record.state == TrackerState::Ready && old_local_state == TrackerState::Init {
//start
info!("start data processing {:?}", record.incoming_streams);
join_set = Some(program_guard.route_data(token.clone()).await?);
}
},
Err(err)=> error!("fetch node state from db {err}")
Expand Down
32 changes: 13 additions & 19 deletions crates/dp_runner/src/channel_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@ use jz_action::{
},
network::datatransfer::MediaDataBatchResponse,
};
use nodes_sdk::fs_cache::FileCache;
use nodes_sdk::{
fs_cache::FileCache,
MessageSender,
};
use std::{
sync::Arc,
time::Duration,
};
use tokio::{
select,
sync::{
mpsc::{
sync::mpsc::{
self,
Sender,
},
oneshot,
},
task::JoinSet,
time::{
self,
Expand Down Expand Up @@ -60,12 +59,9 @@ where

pub(crate) upstreams: Vec<String>,

pub(crate) downstreams: Vec<String>,
pub(crate) incoming_tx: Option<MessageSender<MediaDataBatchResponse, ()>>,

pub(crate) incoming_tx: Option<Sender<(MediaDataBatchResponse, oneshot::Sender<Result<()>>)>>,

pub(crate) request_tx:
Option<Sender<((), oneshot::Sender<Result<Option<MediaDataBatchResponse>>>)>>,
pub(crate) request_tx: Option<MessageSender<(), Option<MediaDataBatchResponse>>>,
}

impl<R> ChannelTracker<R>
Expand All @@ -79,17 +75,15 @@ where
buf_size: usize,
up_nodes: Vec<String>,
upstreams: Vec<String>,
downstreams: Vec<String>,
) -> Self {
ChannelTracker {
name: name.to_string(),
repo: repo,
repo,
fs_cache,
buf_size,
local_state: TrackerState::Init,
up_nodes: up_nodes,
upstreams: upstreams,
downstreams: downstreams,
up_nodes,
upstreams,
request_tx: None,
incoming_tx: None,
}
Expand Down Expand Up @@ -177,7 +171,7 @@ where
&mut self,
token: CancellationToken,
) -> Result<JoinSet<Result<()>>> {
if self.upstreams.len() == 0 {
if self.upstreams.is_empty() {
return Err(anyhow!("no upstream"));
}

Expand Down Expand Up @@ -292,7 +286,7 @@ where
//write batch files
if let Err(err) = fs_cache.write(data_batch).await {
error!("write files to disk fail {}", err);
resp.send(Err(err.into())).expect("request alread listen this channel");
resp.send(Err(err)).expect("request alread listen this channel");
continue;
}

Expand All @@ -301,7 +295,7 @@ where
if let Err(err) = db_repo.insert_new_path(&DataRecord{
node_name: node_name.clone(),
id:id.clone(),
size: size,
size,
state: DataState::Received,
sent: vec![],
direction:Direction::In,
Expand Down
Loading

0 comments on commit a291633

Please sign in to comment.