From 4d0750c185fe215056a98c3f67ae83f97fdad721 Mon Sep 17 00:00:00 2001 From: leoshimo <56844000+leoshimo@users.noreply.github.com> Date: Sat, 23 Dec 2023 10:19:04 -0700 Subject: [PATCH] feat: ps binding as NativeAsyncFn --- libvrs/src/rt/bindings/proc.rs | 54 ++++++++++++++++++++++++------ libvrs/src/rt/kernel.rs | 6 ++++ libvrs/src/rt/proc.rs | 19 ++++++----- libvrs/src/rt/proc_io.rs | 20 ++--------- libvrs/src/rt/program.rs | 61 +++++++++++++++++++++++++++------- libvrs/src/rt/pubsub.rs | 6 ++++ libvrs/src/rt/registry.rs | 6 ++++ lyric/src/error.rs | 3 ++ 8 files changed, 127 insertions(+), 48 deletions(-) diff --git a/libvrs/src/rt/bindings/proc.rs b/libvrs/src/rt/bindings/proc.rs index 45f105c..6011778 100644 --- a/libvrs/src/rt/bindings/proc.rs +++ b/libvrs/src/rt/bindings/proc.rs @@ -1,8 +1,8 @@ //! Process Management Bindings use crate::rt::proc_io::IOCmd; -use crate::rt::program::{Extern, NativeAsyncFn, NativeFn, NativeFnOp, Program, Val}; +use crate::rt::program::{Extern, Fiber, NativeAsyncFn, NativeFn, NativeFnOp, Program, Val}; use crate::rt::ProcessId; -use lyric::Error; +use lyric::{Error, Result}; use std::time::Duration; use tokio::time; use tracing::debug; @@ -37,13 +37,9 @@ pub(crate) fn pid_fn() -> NativeFn { } /// Binding to list processes -pub(crate) fn ps_fn() -> NativeFn { - NativeFn { - func: |_, _| { - Ok(NativeFnOp::Yield(Val::Extern(Extern::IOCmd(Box::new( - IOCmd::ListProcesses, - ))))) - }, +pub(crate) fn ps_fn() -> NativeAsyncFn { + NativeAsyncFn { + func: |f, _| Box::new(ps_impl(f)), } } @@ -112,13 +108,34 @@ pub(crate) fn spawn_fn() -> NativeFn { } } +/// Implementation for ps +async fn ps_impl(fiber: &mut Fiber) -> Result { + let kernel = fiber + .locals() + .kernel + .as_ref() + .and_then(|k| k.upgrade()) + .ok_or(Error::Runtime( + "Kernel is missing for process".into(), + ))?; + let procs = kernel + .procs() + .await + .map_err(|e| Error::Runtime(format!("{e}")))? + .into_iter() + .map(|pid| Val::Extern(Extern::ProcessId(pid))) + .collect::>(); + Ok(Val::List(procs)) +} + #[cfg(test)] mod tests { use super::*; use crate::rt::{kernel, ProcessResult}; + use assert_matches::assert_matches; #[tokio::test] - async fn self_pid() { + async fn binding_self() { let k = kernel::start(); let hdl = k .spawn_prog(Program::from_expr("(self)").unwrap()) @@ -149,4 +166,21 @@ mod tests { ProcessResult::Done(Val::keyword("ok")) ); } + + #[tokio::test] + async fn ps() { + let k = kernel::start(); + let hdl = k + .spawn_prog(Program::from_expr("(ps)").unwrap()) + .await + .unwrap(); + + let pid = Val::Extern(Extern::ProcessId(hdl.id())); + let exit = hdl.join().await.unwrap(); + assert_matches!( + exit.status.unwrap(), + ProcessResult::Done(Val::List(pids)) if + pids.contains(&pid) + ); + } } diff --git a/libvrs/src/rt/kernel.rs b/libvrs/src/rt/kernel.rs index 0e05663..374d3e2 100644 --- a/libvrs/src/rt/kernel.rs +++ b/libvrs/src/rt/kernel.rs @@ -121,6 +121,12 @@ impl WeakKernelHandle { } } +impl std::cmp::PartialEq for WeakKernelHandle { + fn eq(&self, other: &Self) -> bool { + std::ptr::eq(&self.ev_tx, &other.ev_tx) + } +} + /// Messages for [Kernel] #[derive(Debug)] pub enum Event { diff --git a/libvrs/src/rt/proc.rs b/libvrs/src/rt/proc.rs index 0ad8760..92154c1 100644 --- a/libvrs/src/rt/proc.rs +++ b/libvrs/src/rt/proc.rs @@ -56,29 +56,28 @@ pub struct ProcessExit { impl Process { /// Create a new process for program pub(crate) fn from_prog(id: ProcessId, prog: Program) -> Self { - let io = ProcIO::new(id); Self { id, prog, - locals: Locals { pid: id, io }, + locals: Locals::new(id), } } /// Set kernel handle for process pub(crate) fn kernel(mut self, k: WeakKernelHandle) -> Self { - self.locals.io.kernel(k); + self.locals.kernel(k); self } /// Set registry handle for process pub(crate) fn registry(mut self, r: Registry) -> Self { - self.locals.io.registry(r); + self.locals.registry(r); self } /// Set pubsub handle for process pub(crate) fn pubsub(mut self, pubsub: PubSubHandle) -> Self { - self.locals.io.pubsub(pubsub); + self.locals.pubsub(pubsub); self } @@ -90,15 +89,13 @@ impl Process { let (msg_tx, mut msg_rx) = mpsc::channel(32); let mailbox: MailboxHandle = Mailbox::spawn(self.id); - self.locals.io.mailbox(mailbox.clone()); - let proc_hdl = ProcessHandle { id: self.id, hdl_tx: msg_tx, exit_rx: exit_rx.shared(), mailbox, }; - self.locals.io.handle(proc_hdl.clone()); + self.locals.handle(proc_hdl.clone()); let mut fiber = self.prog.into_fiber(self.locals); @@ -166,6 +163,12 @@ impl ProcessHandle { } } +impl std::cmp::PartialEq for ProcessHandle { + fn eq(&self, other: &Self) -> bool { + self.id() == other.id() + } +} + #[derive(Debug)] enum Event { Kill, diff --git a/libvrs/src/rt/proc_io.rs b/libvrs/src/rt/proc_io.rs index 6a70357..9683d2b 100644 --- a/libvrs/src/rt/proc_io.rs +++ b/libvrs/src/rt/proc_io.rs @@ -31,7 +31,8 @@ pub(crate) struct ProcIO { pub enum IOCmd { // RecvRequest, // SendResponse(Val), - ListProcesses, + + // ListProcesses, KillProcess(ProcessId), Spawn(Program), @@ -101,7 +102,6 @@ impl ProcIO { match cmd { // IOCmd::RecvRequest => self.recv_request().await, // IOCmd::SendResponse(v) => self.send_response(v).await, - IOCmd::ListProcesses => self.list_processes().await, IOCmd::KillProcess(pid) => self.kill_process(pid).await, IOCmd::SendMessage(dst, val) => self.send_message(dst, val).await, IOCmd::ListMessages => self.list_message().await, @@ -116,22 +116,6 @@ impl ProcIO { } } - /// List Processes - async fn list_processes(&self) -> Result { - let kernel = self - .kernel - .as_ref() - .and_then(|k| k.upgrade()) - .ok_or(Error::NoKernel)?; - let procs = kernel - .procs() - .await? - .into_iter() - .map(|pid| Val::Extern(Extern::ProcessId(pid))) - .collect::>(); - Ok(Val::List(procs)) - } - /// Kill process async fn kill_process(&self, pid: ProcessId) -> Result { let kernel = self diff --git a/libvrs/src/rt/program.rs b/libvrs/src/rt/program.rs index 9edfdb9..8a423ed 100644 --- a/libvrs/src/rt/program.rs +++ b/libvrs/src/rt/program.rs @@ -3,9 +3,14 @@ use lyric::{Error, Result, SymbolId}; +use crate::ProcessHandle; + use super::bindings; +use super::kernel::WeakKernelHandle; use super::proc::ProcessId; -use super::proc_io::{IOCmd, ProcIO}; +use super::proc_io::IOCmd; +use super::pubsub::PubSubHandle; +use super::registry::Registry; /// Program used to spawn new processes #[derive(Debug, Clone)] @@ -55,19 +60,18 @@ pub enum Extern { } /// Locals for Program Fiber -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Locals { /// Id of process owning fiber pub(crate) pid: ProcessId, - /// IO sources - pub(crate) io: ProcIO, -} - -impl std::cmp::PartialEq for Locals { - fn eq(&self, other: &Self) -> bool { - use std::ptr; - self.pid == other.pid && ptr::eq(&self.io, &other.io) - } + /// Handle to kernel process + pub(crate) kernel: Option, + /// Handle to process registry + pub(crate) registry: Option, + /// Handle to pubsub + pub(crate) pubsub: Option, + /// Handle to current process + pub(crate) self_handle: Option, } impl Program { @@ -115,6 +119,39 @@ pub fn connection_program() -> Program { .expect("Connection program should compile") } +impl Locals { + pub(crate) fn new(pid: ProcessId) -> Self { + Self { + pid, + kernel: None, + registry: None, + pubsub: None, + self_handle: None, + } + } + + pub(crate) fn kernel(&mut self, kernel: WeakKernelHandle) -> &mut Self { + self.kernel = Some(kernel); + self + } + + pub(crate) fn registry(&mut self, registry: Registry) -> &mut Self { + self.registry = Some(registry); + self + } + + pub(crate) fn pubsub(&mut self, pubsub: PubSubHandle) -> &mut Self { + self.pubsub = Some(pubsub); + self + } + + pub(crate) fn handle(&mut self, handle: ProcessHandle) -> &mut Self { + // + self.self_handle = Some(handle); + self + } +} + impl PartialEq for Program { fn eq(&self, _other: &Self) -> bool { false @@ -150,7 +187,7 @@ fn program_env() -> Env { { e.bind_native(SymbolId::from("kill"), bindings::kill_fn()) .bind_native(SymbolId::from("pid"), bindings::pid_fn()) - .bind_native(SymbolId::from("ps"), bindings::ps_fn()) + .bind_native_async(SymbolId::from("ps"), bindings::ps_fn()) .bind_native(SymbolId::from("self"), bindings::self_fn()) .bind_native_async(SymbolId::from("sleep"), bindings::sleep_fn()) .bind_native(SymbolId::from("spawn"), bindings::spawn_fn()); diff --git a/libvrs/src/rt/pubsub.rs b/libvrs/src/rt/pubsub.rs index 7d2b54a..4b8ca0d 100644 --- a/libvrs/src/rt/pubsub.rs +++ b/libvrs/src/rt/pubsub.rs @@ -104,6 +104,12 @@ impl PubSubHandle { } } +impl std::cmp::PartialEq for PubSubHandle { + fn eq(&self, other: &Self) -> bool { + std::ptr::eq(&self.tx, &other.tx) + } +} + impl PubSub { /// Spawn a new global pubsub task pub(crate) fn spawn() -> PubSubHandle { diff --git a/libvrs/src/rt/registry.rs b/libvrs/src/rt/registry.rs index e373c29..bc1a2c6 100644 --- a/libvrs/src/rt/registry.rs +++ b/libvrs/src/rt/registry.rs @@ -88,6 +88,12 @@ impl Registry { } } +impl std::cmp::PartialEq for Registry { + fn eq(&self, other: &Self) -> bool { + std::ptr::eq(&self.tx, &other.tx) + } +} + impl RegistryTask { fn new(weak_tx: mpsc::WeakSender) -> Self { Self { diff --git a/lyric/src/error.rs b/lyric/src/error.rs index 15eee67..9d2139c 100644 --- a/lyric/src/error.rs +++ b/lyric/src/error.rs @@ -30,4 +30,7 @@ pub enum Error { #[error("Unexpected top-level fiber yield")] UnexpectedTopLevelYield, + + #[error("Runtime error - {0}")] + Runtime(String), }