Skip to content

Commit

Permalink
Add monitor resource implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
filmor committed Jun 11, 2024
1 parent 5744d8f commit f7aee7e
Show file tree
Hide file tree
Showing 20 changed files with 475 additions and 232 deletions.
3 changes: 2 additions & 1 deletion rustler/src/codegen_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::fmt;
use crate::{Encoder, Env, OwnedBinary, Term};

// Re-export of inventory
pub use crate::resource::ResourceRegistration;
pub use inventory;

// Names used by the `rustler::init!` macro or other generated code.
pub use crate::wrapper::exception::raise_exception;
pub use crate::wrapper::{
c_char, c_int, c_uint, c_void, get_nif_resource_type_init_size, DEF_NIF_ENTRY, DEF_NIF_FUNC,
MUTABLE_NIF_RESOURCE_HANDLE, NIF_ENV, NIF_MAJOR_VERSION, NIF_MINOR_VERSION, NIF_TERM,
NIF_ENV, NIF_MAJOR_VERSION, NIF_MINOR_VERSION, NIF_TERM,
};

#[cfg(windows)]
Expand Down
3 changes: 2 additions & 1 deletion rustler/src/env.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::thread::is_scheduler_thread;
use crate::types::LocalPid;
use crate::wrapper::{NIF_ENV, NIF_TERM};
use crate::{Encoder, Term};
Expand Down Expand Up @@ -229,7 +230,7 @@ impl OwnedEnv {
F: FnOnce(Env<'a>) -> T,
T: Encoder,
{
if unsafe { rustler_sys::enif_thread_type() } != rustler_sys::ERL_NIF_THR_UNDEFINED {
if is_scheduler_thread() {
panic!("send_and_clear: current thread is managed");
}

Expand Down
8 changes: 4 additions & 4 deletions rustler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub use crate::types::{
#[cfg(feature = "big_integer")]
pub use crate::types::BigInt;

pub mod resource;
pub use crate::resource::ResourceArc;
mod resource;
pub use crate::resource::{Monitor, MonitorResource, Resource, ResourceArc};

#[doc(hidden)]
pub mod dynamic;
Expand All @@ -72,8 +72,8 @@ pub use nif::Nif;
pub type NifResult<T> = Result<T, Error>;

pub use rustler_codegen::{
init, nif, NifException, NifMap, NifRecord, NifStruct, NifTaggedEnum, NifTuple, NifUnitEnum,
NifUntaggedEnum, Resource,
init, nif, MonitorResource, NifException, NifMap, NifRecord, NifStruct, NifTaggedEnum,
NifTuple, NifUnitEnum, NifUntaggedEnum, Resource,
};

#[cfg(feature = "serde")]
Expand Down
226 changes: 67 additions & 159 deletions rustler/src/resource.rs → rustler/src/resource/handle.rs
Original file line number Diff line number Diff line change
@@ -1,152 +1,16 @@
//! Support for storing Rust data in Erlang terms.
//!
//! A NIF resource allows you to safely store Rust structs in a term, and therefore keep it across
//! NIF calls. The struct will be automatically dropped when the BEAM GC decides that there are no
//! more references to the resource.
use std::any::TypeId;
use std::collections::HashMap;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::ptr;
use std::sync::OnceLock;
use std::{ffi::CString, mem};

use super::{Binary, Decoder, Encoder, Env, Error, NifResult, Term};
use crate::resource::resource::open_resource_type;
pub use crate::wrapper::{
c_void, resource, NifResourceFlags, MUTABLE_NIF_RESOURCE_HANDLE, NIF_ENV, NIF_RESOURCE_TYPE,
};

#[derive(Debug)]
pub struct ResourceRegistration {
name: &'static str,
get_type_id: fn() -> TypeId,
destructor: unsafe extern "C" fn(_env: NIF_ENV, handle: MUTABLE_NIF_RESOURCE_HANDLE),
}
inventory::collect!(ResourceRegistration);

static mut RESOURCE_TYPES: OnceLock<HashMap<TypeId, usize>> = OnceLock::new();

fn get_resource_type<T: 'static>() -> Option<NIF_RESOURCE_TYPE> {
let map = unsafe { RESOURCE_TYPES.get()? };
map.get(&TypeId::of::<T>())
.map(|ptr| *ptr as NIF_RESOURCE_TYPE)
}

impl ResourceRegistration {
pub const fn new<T: ResourceType>(name: &'static str) -> Self {
Self {
name,
destructor: resource_destructor::<T>,
get_type_id: TypeId::of::<T>,
}
}

pub fn initialize(env: Env) {
for reg in inventory::iter::<Self>() {
reg.register(env);
}
}

pub fn register(&self, env: Env) {
let res: Option<NIF_RESOURCE_TYPE> = unsafe {
open_resource_type(
env.as_c_arg(),
CString::new(self.name).unwrap().as_bytes_with_nul(),
Some(self.destructor),
NIF_RESOURCE_FLAGS::ERL_NIF_RT_CREATE,
)
};

let type_id = (self.get_type_id)();

unsafe {
RESOURCE_TYPES.get_or_init(Default::default);
RESOURCE_TYPES
.get_mut()
.unwrap()
.insert(type_id, res.unwrap() as usize);
}
}
}

/// Re-export a type used by the `resource!` macro.
#[doc(hidden)]
pub use crate::wrapper::NIF_RESOURCE_FLAGS;

#[doc(hidden)]
pub trait ResourceType: Sized + Send + Sync + 'static {
fn get_resource_type() -> Option<NIF_RESOURCE_TYPE> {
get_resource_type::<Self>()
}
}

impl<'a> Term<'a> {
unsafe fn get_resource_ptrs<T: ResourceType>(&self) -> Option<(*const c_void, *mut T)> {
let typ = T::get_resource_type()?;
let res = resource::get_resource(self.get_env().as_c_arg(), self.as_c_arg(), typ)?;
Some((res, align_alloced_mem_for_struct::<T>(res) as *mut T))
}

pub fn get_resource<T: ResourceType>(&self) -> Option<&'a T> {
unsafe { self.get_resource_ptrs().map(|(_, ptr)| &*ptr) }
}

pub unsafe fn get_mut_resource<T: ResourceType>(&self) -> Option<&'a mut T> {
self.get_resource_ptrs().map(|(_, ptr)| &mut *ptr)
}
}

impl<T> Encoder for ResourceArc<T>
where
T: ResourceType,
{
fn encode<'a>(&self, env: Env<'a>) -> Term<'a> {
self.as_term(env)
}
}
impl<'a, T> Decoder<'a> for ResourceArc<T>
where
T: ResourceType + 'a,
{
fn decode(term: Term<'a>) -> NifResult<Self> {
ResourceArc::from_term(term)
}
}

impl<'a, T> Decoder<'a> for &'a T
where
T: ResourceType + 'a,
{
fn decode(term: Term<'a>) -> NifResult<Self> {
term.get_resource().ok_or(Error::BadArg)
}
}

/// Drop a T that lives in an Erlang resource. (erlang_nif-sys requires us to declare this
/// function safe, but it is of course thoroughly unsafe!)
pub unsafe extern "C" fn resource_destructor<T>(
_env: NIF_ENV,
handle: MUTABLE_NIF_RESOURCE_HANDLE,
) {
unsafe {
let aligned = align_alloced_mem_for_struct::<T>(handle);
let res = aligned as *mut T;
ptr::read(res);
}
}
use rustler_sys::{c_void, ErlNifEnv};

fn get_alloc_size_struct<T>() -> usize {
mem::size_of::<T>() + mem::align_of::<T>()
}
use crate::thread::is_scheduler_thread;
use crate::{
Binary, Decoder, Encoder, Env, Error, LocalPid, Monitor, MonitorResource, NifResult, Term,
};

/// Given a pointer `ptr` to an allocation of `get_alloc_size_struct::<T>()` bytes, return the
/// first aligned pointer within the allocation where a `T` may be stored.
/// Unsafe: `ptr` must point to a large enough allocation and not be null.
unsafe fn align_alloced_mem_for_struct<T>(ptr: *const c_void) -> *const c_void {
let offset = mem::align_of::<T>() - ((ptr as usize) % mem::align_of::<T>());
ptr.add(offset)
}
use super::traits::{Resource, ResourceExt};
use super::util::{align_alloced_mem_for_struct, get_alloc_size_struct};

/// A reference to a resource of type `T`.
///
Expand All @@ -159,19 +23,19 @@ unsafe fn align_alloced_mem_for_struct<T>(ptr: *const c_void) -> *const c_void {
/// convert back and forth between the two using `Encoder` and `Decoder`.
pub struct ResourceArc<T>
where
T: ResourceType,
T: Resource,
{
raw: *const c_void,
inner: *mut T,
}

// Safe because T is `Sync` and `Send`.
unsafe impl<T> Send for ResourceArc<T> where T: ResourceType {}
unsafe impl<T> Sync for ResourceArc<T> where T: ResourceType {}
unsafe impl<T> Send for ResourceArc<T> where T: Resource {}
unsafe impl<T> Sync for ResourceArc<T> where T: Resource {}

impl<T> ResourceArc<T>
where
T: ResourceType,
T: Resource,
{
/// Makes a new ResourceArc from the given type. Note that the type must have
/// ResourceType implemented for it. See module documentation for info on this.
Expand Down Expand Up @@ -253,9 +117,32 @@ where
}
}

impl<T> ResourceArc<T>
where
T: Resource + MonitorResource,
{
pub fn monitor(&self, caller_env: Option<&Env>, pid: &LocalPid) -> Option<Monitor> {
let env = maybe_env(caller_env);
let mut mon = MaybeUninit::uninit();
let res = unsafe {
rustler_sys::enif_monitor_process(env, self.raw, pid.as_c_arg(), mon.as_mut_ptr()) == 0
};
if res {
Some(unsafe { Monitor::new(mon.assume_init()) })
} else {
None
}
}

pub fn demonitor(&self, caller_env: Option<&Env>, mon: &Monitor) -> bool {
let env = maybe_env(caller_env);
unsafe { rustler_sys::enif_demonitor_process(env, self.raw, mon.as_c_arg()) == 0 }
}
}

impl<T> Deref for ResourceArc<T>
where
T: ResourceType,
T: Resource,
{
type Target = T;

Expand All @@ -266,7 +153,7 @@ where

impl<T> Clone for ResourceArc<T>
where
T: ResourceType,
T: Resource,
{
/// Cloning a `ResourceArc` simply increments the reference count for the
/// resource. The `T` value is not cloned.
Expand All @@ -281,7 +168,7 @@ where

impl<T> Drop for ResourceArc<T>
where
T: ResourceType,
T: Resource,
{
/// When a `ResourceArc` is dropped, the reference count is decremented. If
/// there are no other references to the resource, the `T` value is dropped.
Expand All @@ -294,13 +181,34 @@ where
}
}

#[macro_export]
macro_rules! resource {
($struct_name:ty, $env: ident) => {{
impl $crate::resource::ResourceType for $struct_name {}
impl<T> Encoder for ResourceArc<T>
where
T: Resource,
{
fn encode<'a>(&self, env: Env<'a>) -> Term<'a> {
self.as_term(env)
}
}
impl<'a, T> Decoder<'a> for ResourceArc<T>
where
T: Resource + 'a,
{
fn decode(term: Term<'a>) -> NifResult<Self> {
ResourceArc::from_term(term)
}
}

let tuple = rustler::resource::ResourceRegistration::new::<$struct_name>(
stringify!(#name)
).register($env);
}};
fn maybe_env(env: Option<&Env>) -> *mut ErlNifEnv {
if is_scheduler_thread() {
let env = env.expect("Env required when calling from a scheduler thread");
// Panic if `env` is not the environment of the calling process.
env.pid();
env.as_c_arg()
} else {
assert!(
env.is_none(),
"Env provided when not calling from a scheduler thread"
);
ptr::null_mut()
}
}
Loading

0 comments on commit f7aee7e

Please sign in to comment.