Skip to content

Commit

Permalink
feat: ps binding as NativeAsyncFn
Browse files Browse the repository at this point in the history
  • Loading branch information
leoshimo committed Dec 23, 2023
1 parent 7e9e2ed commit 4d0750c
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 48 deletions.
54 changes: 44 additions & 10 deletions libvrs/src/rt/bindings/proc.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Process Management Bindings
use crate::rt::proc_io::IOCmd;
use crate::rt::program::{Extern, NativeAsyncFn, NativeFn, NativeFnOp, Program, Val};
use crate::rt::program::{Extern, Fiber, NativeAsyncFn, NativeFn, NativeFnOp, Program, Val};
use crate::rt::ProcessId;
use lyric::Error;
use lyric::{Error, Result};
use std::time::Duration;
use tokio::time;
use tracing::debug;
Expand Down Expand Up @@ -37,13 +37,9 @@ pub(crate) fn pid_fn() -> NativeFn {
}

/// Binding to list processes
pub(crate) fn ps_fn() -> NativeFn {
NativeFn {
func: |_, _| {
Ok(NativeFnOp::Yield(Val::Extern(Extern::IOCmd(Box::new(
IOCmd::ListProcesses,
)))))
},
pub(crate) fn ps_fn() -> NativeAsyncFn {
NativeAsyncFn {
func: |f, _| Box::new(ps_impl(f)),
}
}

Expand Down Expand Up @@ -112,13 +108,34 @@ pub(crate) fn spawn_fn() -> NativeFn {
}
}

/// Implementation for ps
async fn ps_impl(fiber: &mut Fiber) -> Result<Val> {
let kernel = fiber
.locals()
.kernel
.as_ref()
.and_then(|k| k.upgrade())
.ok_or(Error::Runtime(
"Kernel is missing for process".into(),
))?;
let procs = kernel
.procs()
.await
.map_err(|e| Error::Runtime(format!("{e}")))?
.into_iter()
.map(|pid| Val::Extern(Extern::ProcessId(pid)))
.collect::<Vec<_>>();
Ok(Val::List(procs))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::rt::{kernel, ProcessResult};
use assert_matches::assert_matches;

#[tokio::test]
async fn self_pid() {
async fn binding_self() {
let k = kernel::start();
let hdl = k
.spawn_prog(Program::from_expr("(self)").unwrap())
Expand Down Expand Up @@ -149,4 +166,21 @@ mod tests {
ProcessResult::Done(Val::keyword("ok"))
);
}

#[tokio::test]
async fn ps() {
let k = kernel::start();
let hdl = k
.spawn_prog(Program::from_expr("(ps)").unwrap())
.await
.unwrap();

let pid = Val::Extern(Extern::ProcessId(hdl.id()));
let exit = hdl.join().await.unwrap();
assert_matches!(
exit.status.unwrap(),
ProcessResult::Done(Val::List(pids)) if
pids.contains(&pid)
);
}
}
6 changes: 6 additions & 0 deletions libvrs/src/rt/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ impl WeakKernelHandle {
}
}

impl std::cmp::PartialEq for WeakKernelHandle {
fn eq(&self, other: &Self) -> bool {
std::ptr::eq(&self.ev_tx, &other.ev_tx)
}
}

/// Messages for [Kernel]
#[derive(Debug)]
pub enum Event {
Expand Down
19 changes: 11 additions & 8 deletions libvrs/src/rt/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,28 @@ pub struct ProcessExit {
impl Process {
/// Create a new process for program
pub(crate) fn from_prog(id: ProcessId, prog: Program) -> Self {
let io = ProcIO::new(id);
Self {
id,
prog,
locals: Locals { pid: id, io },
locals: Locals::new(id),
}
}

/// Set kernel handle for process
pub(crate) fn kernel(mut self, k: WeakKernelHandle) -> Self {
self.locals.io.kernel(k);
self.locals.kernel(k);
self
}

/// Set registry handle for process
pub(crate) fn registry(mut self, r: Registry) -> Self {
self.locals.io.registry(r);
self.locals.registry(r);
self
}

/// Set pubsub handle for process
pub(crate) fn pubsub(mut self, pubsub: PubSubHandle) -> Self {
self.locals.io.pubsub(pubsub);
self.locals.pubsub(pubsub);
self
}

Expand All @@ -90,15 +89,13 @@ impl Process {
let (msg_tx, mut msg_rx) = mpsc::channel(32);

let mailbox: MailboxHandle = Mailbox::spawn(self.id);
self.locals.io.mailbox(mailbox.clone());

let proc_hdl = ProcessHandle {
id: self.id,
hdl_tx: msg_tx,
exit_rx: exit_rx.shared(),
mailbox,
};
self.locals.io.handle(proc_hdl.clone());
self.locals.handle(proc_hdl.clone());

let mut fiber = self.prog.into_fiber(self.locals);

Expand Down Expand Up @@ -166,6 +163,12 @@ impl ProcessHandle {
}
}

impl std::cmp::PartialEq for ProcessHandle {
fn eq(&self, other: &Self) -> bool {
self.id() == other.id()
}
}

#[derive(Debug)]
enum Event {
Kill,
Expand Down
20 changes: 2 additions & 18 deletions libvrs/src/rt/proc_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub(crate) struct ProcIO {
pub enum IOCmd {
// RecvRequest,
// SendResponse(Val),
ListProcesses,

// ListProcesses,
KillProcess(ProcessId),
Spawn(Program),

Expand Down Expand Up @@ -101,7 +102,6 @@ impl ProcIO {
match cmd {
// IOCmd::RecvRequest => self.recv_request().await,
// IOCmd::SendResponse(v) => self.send_response(v).await,
IOCmd::ListProcesses => self.list_processes().await,
IOCmd::KillProcess(pid) => self.kill_process(pid).await,
IOCmd::SendMessage(dst, val) => self.send_message(dst, val).await,
IOCmd::ListMessages => self.list_message().await,
Expand All @@ -116,22 +116,6 @@ impl ProcIO {
}
}

/// List Processes
async fn list_processes(&self) -> Result<Val> {
let kernel = self
.kernel
.as_ref()
.and_then(|k| k.upgrade())
.ok_or(Error::NoKernel)?;
let procs = kernel
.procs()
.await?
.into_iter()
.map(|pid| Val::Extern(Extern::ProcessId(pid)))
.collect::<Vec<_>>();
Ok(Val::List(procs))
}

/// Kill process
async fn kill_process(&self, pid: ProcessId) -> Result<Val> {
let kernel = self
Expand Down
61 changes: 49 additions & 12 deletions libvrs/src/rt/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
use lyric::{Error, Result, SymbolId};

use crate::ProcessHandle;

use super::bindings;
use super::kernel::WeakKernelHandle;
use super::proc::ProcessId;
use super::proc_io::{IOCmd, ProcIO};
use super::proc_io::IOCmd;
use super::pubsub::PubSubHandle;
use super::registry::Registry;

/// Program used to spawn new processes
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -55,19 +60,18 @@ pub enum Extern {
}

/// Locals for Program Fiber
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct Locals {
/// Id of process owning fiber
pub(crate) pid: ProcessId,
/// IO sources
pub(crate) io: ProcIO,
}

impl std::cmp::PartialEq for Locals {
fn eq(&self, other: &Self) -> bool {
use std::ptr;
self.pid == other.pid && ptr::eq(&self.io, &other.io)
}
/// Handle to kernel process
pub(crate) kernel: Option<WeakKernelHandle>,
/// Handle to process registry
pub(crate) registry: Option<Registry>,
/// Handle to pubsub
pub(crate) pubsub: Option<PubSubHandle>,
/// Handle to current process
pub(crate) self_handle: Option<ProcessHandle>,
}

impl Program {
Expand Down Expand Up @@ -115,6 +119,39 @@ pub fn connection_program() -> Program {
.expect("Connection program should compile")
}

impl Locals {
pub(crate) fn new(pid: ProcessId) -> Self {
Self {
pid,
kernel: None,
registry: None,
pubsub: None,
self_handle: None,
}
}

pub(crate) fn kernel(&mut self, kernel: WeakKernelHandle) -> &mut Self {
self.kernel = Some(kernel);
self
}

pub(crate) fn registry(&mut self, registry: Registry) -> &mut Self {
self.registry = Some(registry);
self
}

pub(crate) fn pubsub(&mut self, pubsub: PubSubHandle) -> &mut Self {
self.pubsub = Some(pubsub);
self
}

pub(crate) fn handle(&mut self, handle: ProcessHandle) -> &mut Self {
//
self.self_handle = Some(handle);
self
}
}

impl PartialEq for Program {
fn eq(&self, _other: &Self) -> bool {
false
Expand Down Expand Up @@ -150,7 +187,7 @@ fn program_env() -> Env {
{
e.bind_native(SymbolId::from("kill"), bindings::kill_fn())
.bind_native(SymbolId::from("pid"), bindings::pid_fn())
.bind_native(SymbolId::from("ps"), bindings::ps_fn())
.bind_native_async(SymbolId::from("ps"), bindings::ps_fn())
.bind_native(SymbolId::from("self"), bindings::self_fn())
.bind_native_async(SymbolId::from("sleep"), bindings::sleep_fn())
.bind_native(SymbolId::from("spawn"), bindings::spawn_fn());
Expand Down
6 changes: 6 additions & 0 deletions libvrs/src/rt/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ impl PubSubHandle {
}
}

impl std::cmp::PartialEq for PubSubHandle {
fn eq(&self, other: &Self) -> bool {
std::ptr::eq(&self.tx, &other.tx)
}
}

impl PubSub {
/// Spawn a new global pubsub task
pub(crate) fn spawn() -> PubSubHandle {
Expand Down
6 changes: 6 additions & 0 deletions libvrs/src/rt/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ impl Registry {
}
}

impl std::cmp::PartialEq for Registry {
fn eq(&self, other: &Self) -> bool {
std::ptr::eq(&self.tx, &other.tx)
}
}

impl RegistryTask {
fn new(weak_tx: mpsc::WeakSender<Cmd>) -> Self {
Self {
Expand Down
3 changes: 3 additions & 0 deletions lyric/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ pub enum Error {

#[error("Unexpected top-level fiber yield")]
UnexpectedTopLevelYield,

#[error("Runtime error - {0}")]
Runtime(String),
}

0 comments on commit 4d0750c

Please sign in to comment.