Skip to content

Commit

Permalink
XMessage::Over for client to ask to stop communication
Browse files Browse the repository at this point in the history
The number of task can be set and pass to add tasks in batch.
  • Loading branch information
unkcpz committed Nov 13, 2024
1 parent 811e8d0 commit cff6d89
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 122 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@ For testing purpose, I recommend to use in memory DB by running:
surreal start --user root --pass root memory
```

Add task to table and run it
Add task to table and run it.
The tasks can have different scale that run with different range of snooze time, and have different block type that required to be launched in async time or in threads.

```bash
./actionwurm task add -h
./actionwurm task play <id>
./actionwurm task play -a
```

The tasks can have different scale that run with different range of snooze time, and have different block type that required to be launched in async time or in threads.

To check the task list and filtering on specific state of tasks

```bash
Expand Down Expand Up @@ -128,6 +127,9 @@ I should polish and clear about design note and make an AEP for it first.

At the current stage, the code base is small and every part is clear defined without too much abstractions.

Since the task pool is added by using mocked surrealdb, which requires huge amount of crates dependencies.
The worker and actioner binaries should be moved to crates that has independent `Cargo.toml`, to make the compile of server crate fast.

---------------------

- [ ] benchmark throughput, not too much to bench, the bottleneck is in DB access.
Expand Down
173 changes: 92 additions & 81 deletions src/actioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,24 @@ pub async fn handle(
// - should reported from worker when the mission is finished
// - should also get information from worker complain about the long running
// block process if it runs on non-block worker.
if let Some(Ok(msg)) = framed_reader.next().await {
match msg {
XMessage::WorkerTablePrint => {
let resp_msg = XMessage::BulkMessage(format!("{}\n", worker_table.render().await,));
framed_writer.send(resp_msg).await?;
}
loop {
if let Some(Ok(msg)) = framed_reader.next().await {
match msg {
// You say over, I say over
XMessage::Over => {
framed_writer.send(XMessage::Over).await?;
break;
}

XMessage::WorkerTablePrint => {
let rtable = worker_table.render().await.to_string();
let resp_msg = XMessage::BulkMessage(rtable);
framed_writer.send(resp_msg).await?;
}

XMessage::TaskTablePrint { states } => {
let count_info = task_table.count().await;
let count_info = format!("created: {}, ready: {}, submit: {}, pause: {}, run: {}, complete: {}, killed: {}.",
XMessage::TaskTablePrint { states } => {
let count_info = task_table.count().await;
let count_info = format!("created: {}, ready: {}, submit: {}, pause: {}, run: {}, complete: {}, killed: {}.",
count_info.get(&State::Created).unwrap_or(&0),
count_info.get(&State::Ready).unwrap_or(&0),
count_info.get(&State::Submit).unwrap_or(&0),
Expand All @@ -50,87 +58,90 @@ pub async fn handle(
count_info.get(&State::Terminated(0)).unwrap_or(&0),
count_info.get(&State::Terminated(-1)).unwrap_or(&0),
);
let tasks = task_table.filter_by_states(states).await;
let task_table = task::Table::from_mapping(tasks);
let resp_msg = XMessage::BulkMessage(format!(
"{}\n\n{}\n",
task_table.render().await,
count_info,
));
framed_writer.send(resp_msg).await?;
}
let tasks = task_table.filter_by_states(states).await;
let task_table = task::Table::from_mapping(tasks);
let resp_msg = XMessage::BulkMessage(format!(
"{}\n\n{}",
task_table.render().await,
count_info,
));
framed_writer.send(resp_msg).await?;
}

// Signal direction - src: actioner, dst: coordinator
// Handle signal n/a -> Created
XMessage::ActionerOp(Operation::AddTask(record_id)) => {
// TODO: need to check if the task exist
// TODO: priority passed from operation
let task_ = Task::new(0, &record_id);
let id = task_table.create(task_.clone()).await;

// send resp to actioner
let resp_msg = XMessage::BulkMessage(format!(
"Add task id={id}, map to task record_id={record_id} to run.\n"
));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Ready
XMessage::ActionerOp(Operation::PlayTask(id)) => {
// TODO: need to check init state is able to be played
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Ready;
task_table.update(&id, task_).await?;
// Signal direction - src: actioner, dst: coordinator
// Handle signal n/a -> Created
XMessage::ActionerOp(Operation::AddTask(record_id)) => {
// TODO: need to check if the task exist
// TODO: priority passed from operation
let task_ = Task::new(0, &record_id);
let id = task_table.create(task_.clone()).await;

// send resp to actioner
let resp_msg = XMessage::BulkMessage(format!(
"Add task id={id}, map to task record_id={record_id} to run."
));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Ready
XMessage::ActionerOp(Operation::PlayTask(id)) => {
// TODO: need to check init state is able to be played
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Ready;
task_table.update(&id, task_).await?;
}

let resp_msg = XMessage::BulkMessage(format!("Launching task uuid={id}.\n",));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal all pause/created x -> Ready
XMessage::ActionerOp(Operation::PlayAllTask) => {
// TODO: also include pause state to resume
let resumable_tasks = task_table
.filter_by_states(vec![task::State::Created])
.await;

for (task_id, _) in resumable_tasks {
let Some(mut task_) = task_table.read(&task_id).await else {
continue;
};
// XXX: check, is cloned?? so the old_state is different from after changed
let old_state = task_.state;

task_.state = task::State::Ready;
task_table.update(&task_id, task_).await?;
println!(
"Play task {task_id}: {} -> {}",
old_state,
task::State::Ready
);
let resp_msg = XMessage::BulkMessage(format!("Launching task uuid={id}.",));
framed_writer.send(resp_msg).await?;
}
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Terminated(-1)
XMessage::ActionerOp(Operation::KillTask(id)) => {
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Terminated(-1);
task_table.update(&id, task_).await?;
// Signal direction - src: actioner, dst: coordinator
// Handle signal all pause/created x -> Ready
XMessage::ActionerOp(Operation::PlayAllTask) => {
// TODO: also include pause state to resume
let resumable_tasks = task_table
.filter_by_states(vec![task::State::Created])
.await;

for (task_id, _) in resumable_tasks {
let Some(mut task_) = task_table.read(&task_id).await else {
continue;
};
// XXX: check, is cloned?? so the old_state is different from after changed
let old_state = task_.state;

// TODO: also sending a cancelling signal to the runnning task on worker
task_.state = task::State::Ready;
task_table.update(&task_id, task_).await?;
println!(
"Play task {task_id}: {} -> {}",
old_state,
task::State::Ready
);
}
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Terminated(-1)
XMessage::ActionerOp(Operation::KillTask(id)) => {
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Terminated(-1);
task_table.update(&id, task_).await?;

// TODO: also sending a cancelling signal to the runnning task on worker

let resp_msg = XMessage::BulkMessage(format!("Kill task uuid={id}.\n",));
let resp_msg = XMessage::BulkMessage(format!("Kill task uuid={id}.\n",));
framed_writer.send(resp_msg).await?;
}
}

// boss is asking nonsense
_ => {
let resp_msg = XMessage::BulkMessage(format!(
"Shutup, I try to ignore you, since you say '{msg:#?}'"
));
framed_writer.send(resp_msg).await?;
}
}
_ => {
let resp_msg = XMessage::BulkMessage(format!(
"Shutup, I try to ignore you, since you say '{msg:#?}'"
));
framed_writer.send(resp_msg).await?;
}
}
}

Expand Down
106 changes: 68 additions & 38 deletions src/bin/actioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use clap::{Parser, Subcommand, ValueEnum};
use futures::SinkExt;
use rand::{self, Rng};
use surrealdb::sql::Datetime;
use tokio::time;
use uuid::Uuid;

use tokio::net::TcpStream;
Expand Down Expand Up @@ -103,6 +104,10 @@ enum BlockType {
enum TaskCommand {
/// Add a new task
Add {
// number of tasks to add
#[arg(short, long)]
number: u64,

// Scale of task, small, medium, large
#[arg(short, long, value_enum)]
scale: TaskScale,
Expand Down Expand Up @@ -205,38 +210,44 @@ async fn main() -> anyhow::Result<()> {
}
},
Commands::Task { command } => match command {
TaskCommand::Add { scale, block_type } => {
let isblock = match block_type {
BlockType::Sync => true,
BlockType::Async => false,
};

// small scale for 100 ~ 1000 millis
// medium for 1000 ~ 10_000 millis
// large for 10_000 ~ 100_000
let x = {
let mut rng = rand::thread_rng();
rng.gen_range(1..10)
};

let st = match scale {
TaskScale::Small => x * 100,
TaskScale::Medium => x * 1000,
TaskScale::Large => x * 10_000,
};

let task = MockTask::new(st, isblock, Utc::now().into());

let created: Option<Record> = db.create("task").content(task).await?;

if let Some(created) = created {
let record_id = created.id.to_string();

framed_writer
.send(XMessage::ActionerOp(Operation::AddTask(record_id)))
.await?;
} else {
eprintln!("not able to create task to pool.");
TaskCommand::Add {
number,
scale,
block_type,
} => {
for _ in 0..number {
let isblock = match block_type {
BlockType::Sync => true,
BlockType::Async => false,
};

// small scale for 100 ~ 1000 millis
// medium for 1000 ~ 10_000 millis
// large for 10_000 ~ 100_000
let x = {
let mut rng = rand::thread_rng();
rng.gen_range(1..10)
};

let st = match scale {
TaskScale::Small => x * 100,
TaskScale::Medium => x * 1000,
TaskScale::Large => x * 10_000,
};

let task = MockTask::new(st, isblock, Utc::now().into());

let created: Option<Record> = db.create("task").content(task).await?;

if let Some(created) = created {
let record_id = created.id.to_string();

framed_writer
.send(XMessage::ActionerOp(Operation::AddTask(record_id)))
.await?;
} else {
eprintln!("not able to create task to pool.");
}
}
}
TaskCommand::Play { all: true, .. } => {
Expand Down Expand Up @@ -286,13 +297,32 @@ async fn main() -> anyhow::Result<()> {
},
}

if let Some(Ok(msg)) = framed_reader.next().await {
match msg {
XMessage::BulkMessage(s) => {
println!("{s}");
// After a command bundle, send over to finish the comm and listen to echo
framed_writer.send(XMessage::Over).await?;

let timeout = time::Duration::from_millis(500);
loop {
tokio::select! {
Some(Ok(msg)) = framed_reader.next() => {
match msg {
XMessage::BulkMessage(s) => {
println!("{s}");
}

XMessage::Over => {
break;
}

_ => {
dbg!(msg);
}
}
}
_ => {
dbg!(msg);

() = time::sleep(timeout) => {
println!("Timeout reached: No message received.");
// Handle timeout, e.g., log, retry, or terminate loop
break;
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ pub enum XMessage {
from: task::State,
to: task::State,
},

// over
Over,
}

#[derive(Debug)]
Expand Down

0 comments on commit cff6d89

Please sign in to comment.