Feedback on Parallel DAG Task Executor #6287
Hey! 👋 I am new to Rust and tokio. I wanted to write a parallel task executor. I would be happy if someone could give me feedback. If this is not the right place to ask, I would be happy if someone could tell me where I can ask this kind of question. My goal is that this program:
The code can also be found here:
If there are ways to write this more idiomatically, I would also be happy to learn. I know that there are crates that do similar things, but this is more of a learning exercise for me.
Some explanation for the code:
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
type TaskId = usize;
enum Resource {
    X(&'static str),
    Y(&'static str),
}

impl Resource {
    pub async fn make(&self) {
        match self {
            Resource::X(x) => {
                println!("MAKE({})", x);
                sleep(Duration::from_secs(1)).await;
                println!("FINISHED({})", x);
            }
            Resource::Y(y) => {
                println!("MAKE({})", y);
                sleep(Duration::from_secs(2)).await;
                println!("FINISHED({})", y);
            }
        }
    }

    pub fn name(&self) -> &'static str {
        match self {
            Resource::X(x) => x,
            Resource::Y(y) => y,
        }
    }
}
struct Job {
    deps: Vec<TaskId>,
    res: Resource,
    finished: bool,
}

impl Job {
    pub fn new(deps: Vec<TaskId>, res: Resource) -> Self {
        Job {
            deps,
            res,
            finished: false,
        }
    }

    fn ready(&self, e: &Executor) -> bool {
        if self.finished {
            return false;
        }
        for u in &self.deps {
            match e.jobs[*u].try_lock() {
                Ok(u) => {
                    if !u.finished {
                        return false;
                    }
                }
                Err(_) => {
                    println!("ready check: try lock failed");
                    return false;
                }
            }
        }
        true
    }
}
struct Executor {
    jobs: Vec<Arc<Mutex<Job>>>,
    notify_finished: Sender<()>,
}

impl Executor {
    pub fn new(notify_finished: Sender<()>) -> Self {
        Executor {
            jobs: Vec::new(),
            notify_finished,
        }
    }

    pub async fn run(this: Arc<Self>, mut rx: Receiver<()>) {
        while let Some(()) = rx.recv().await {
            println!("-- Got notified! Running ready tasks!");
            Self::run_ready(Arc::clone(&this)).await;
            let done = this
                .jobs
                .iter()
                .all(|j| j.try_lock().is_ok_and(|x| x.finished));
            if done {
                return;
            }
        }
    }
    pub async fn run_ready(this: Arc<Self>) {
        println!("notify..");
        for j in &this.jobs {
            match j.try_lock() {
                Ok(x) => {
                    // Lock worked. Let's see if we can do something.
                    if x.ready(&this) {
                        println!(" {} spawned", x.res.name());
                        let j_copy = Arc::clone(j);
                        let this_copy = Arc::clone(&this);
                        tokio::spawn(async move {
                            let mut j = j_copy.lock().await;
                            // Job is still ready if not already finished.
                            if j.finished {
                                // We're late! Another thread already did the job.
                                println!(" {} already finished, skip", j.res.name());
                                return;
                            }
                            // Since we're the only one holding the lock now,
                            // we can be sure that the job will only be done exactly once
                            // and there's no parallel Job::make() call for this job.
                            j.res.make().await;
                            j.finished = true;
                            // Release the lock before notifying; otherwise the executor
                            // may wake up, fail try_lock on this job, and miss that it is done.
                            drop(j);
                            let _ = this_copy.notify_finished.send(()).await;
                        });
                    } else {
                        println!(" {} not ready", x.res.name());
                    }
                }
                Err(_) => {
                    // Nothing to do, if it's locked then another thread is already handling the job.
                    println!(" try lock didn't work");
                }
            }
        }
    }

    pub fn add(&mut self, up: Vec<TaskId>, exec: Resource) -> TaskId {
        self.jobs.push(Arc::new(Mutex::new(Job::new(up, exec))));
        (self.jobs.len() - 1) as TaskId
    }
}
#[tokio::main]
async fn main() {
    // example:
    // A -->
    //        C
    // B -->
    let (tx, rx) = channel(4096);
    let _ = tx.send(()).await;
    let mut e = Executor::new(tx);
    let a = e.add(Vec::new(), Resource::Y("A"));
    let b = e.add(Vec::new(), Resource::Y("B"));
    let _c = e.add(vec![a, b], Resource::X("C"));
    let e = Arc::new(e);
    Executor::run(Arc::clone(&e), rx).await;
}
Replies: 1 comment 2 replies
That seems really complicated to me. I would probably just tokio::spawn each task in your DAG and use a watch channel to wait for previous tasks to finish. (example)
A watch channel is suggested here because it supports multiple tasks waiting for one task, but a oneshot could be used if the output is used in only one place.