Skip to content

Commit

Permalink
Added support for embedding connections between actors in the scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerBloom committed Feb 21, 2024
1 parent 6065e27 commit bae95fc
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures = "0.3.19"
instant = { version = "0.1" }
async-trait = { version = "0.1" }
pin-project = { version = "1.1" }
anymap2 = "0.13"

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4.37" }
Expand Down
3 changes: 3 additions & 0 deletions src/compat/native.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anymap2::any::Any;
use futures::Stream;
use pin_project::pin_project;
use std::{
Expand Down Expand Up @@ -83,6 +84,8 @@ impl<T: SendableStream> Stream for SendableWrapper<T> {
}
}

pub(crate) type SendableAnyMap = anymap2::Map<dyn 'static + Send + Any>;

/* ------ General Utils ------ */

#[cfg(all(feature = "tokio", feature = "async-std"))]
Expand Down
3 changes: 3 additions & 0 deletions src/compat/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anymap2::any::Any;
use futures::FutureExt;
use gloo_timers::future::{sleep, TimeoutFuture};
use instant::{Duration, Instant};
Expand Down Expand Up @@ -34,6 +35,8 @@ impl<T> Sendable for T where T: 'static {}
/// that `Send` futures.
pub type SendableWrapper<T> = SendWrapper<T>;

pub(crate) type SendableAnyMap = anymap2::Map<dyn 'static + Any>;

/* ------ General Utils ------ */

/// A wrapper around `wasm-bindgen-future`'s `spawn_local` function, which spawns a future tha
Expand Down
31 changes: 26 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub mod sink;
pub mod stream;

pub use async_trait::async_trait;
use compat::{Sendable, SendableFusedStream, SendableWrapper};
use compat::{Sendable, SendableAnyMap, SendableFusedStream, SendableWrapper};
use joint::{JointActor, JointClient};
pub use scheduler::Scheduler;
use scheduler::{ActorRunner, ActorStream};
Expand Down Expand Up @@ -144,6 +144,7 @@ pub struct ActorBuilder<T, A: ActorState> {
/// specialization is not yet supported.
ty: PhantomData<T>,
send: UnboundedSender<A::Message>,
edges: SendableAnyMap,
#[allow(clippy::type_complexity)]
broadcast: Option<(
broadcast::Sender<SendableWrapper<A::Output>>,
Expand All @@ -168,6 +169,7 @@ where
recv,
broadcast: None,
ty: PhantomData,
edges: SendableAnyMap::new(),
}
}

Expand All @@ -181,6 +183,19 @@ where
self.recv
.push(ActorStream::Secondary(Box::new(stream.map(|m| m.into()))));
}

/// Adds a client to the builder, which the state can access later.
pub fn add_edge<P: 'static + Send, M: 'static + Send>(&mut self, client: SinkClient<P, M>) {
_ = self.edges.insert(client);
}

/// Adds an arbitrary data to the builder, which the state can access later. This method is
/// intended to be used with containers hold that multiple clients of the same type.
///
/// For example, you can attach a series of actor clients that are indexed using a hashmap.
pub fn add_multi_edge<C: 'static + Send>(&mut self, container: C) {
_ = self.edges.insert(container);
}
}

/* --------- Sink actors --------- */
Expand All @@ -198,9 +213,13 @@ where
/// Launches an actor that uses the given state and returns a client to the actor.
pub fn launch(self) -> SinkClient<A::Permanence, A::Message> {
let Self {
send, recv, state, ..
send,
recv,
state,
edges,
..
} = self;
let mut runner = ActorRunner::new(state);
let mut runner = ActorRunner::new(state, edges);
recv.into_iter().for_each(|r| runner.add_stream(r));
runner.launch();
SinkClient::new(send)
Expand Down Expand Up @@ -230,11 +249,12 @@ where
mut recv,
state,
broadcast,
edges,
..
} = self;
let (broad, sub) = broadcast.unwrap_or_else(|| broadcast::channel(100));
recv.push(ActorStream::Secondary(Box::new(stream)));
let mut runner = ActorRunner::new(state);
let mut runner = ActorRunner::new(state, edges);
runner.add_broadcaster(broad);
recv.into_iter().for_each(|r| runner.add_stream(r));
runner.launch();
Expand Down Expand Up @@ -287,10 +307,11 @@ where
recv,
state,
broadcast,
edges,
..
} = self;
let (broad, sub) = broadcast.unwrap_or_else(|| broadcast::channel(100));
let mut runner = ActorRunner::new(state);
let mut runner = ActorRunner::new(state, edges);
recv.into_iter().for_each(|r| runner.add_stream(r));
runner.add_broadcaster(broad);
runner.launch();
Expand Down
71 changes: 65 additions & 6 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use std::{

use crate::{
compat::{
sleep_until, spawn_task, Sendable, SendableFusedStream, SendableFuture, SendableStream,
SendableWrapper, Sleep,
sleep_until, spawn_task, Sendable, SendableAnyMap, SendableFusedStream, SendableFuture,
SendableStream, SendableWrapper, Sleep,
},
sink::SinkClient,
ActorState, Transient,
};

Expand Down Expand Up @@ -50,6 +51,9 @@ pub struct Scheduler<A: ActorState> {
tasks: SendableWrapper<FuturesCollection<()>>,
/// The manager for outbound messages that will be broadcast from the actor.
outbound: Option<SendableWrapper<OutboundQueue<A::Output>>>,
/// Stores edges in the form of `EdgeType`s. This is used to access connections to other actors
/// at runtime without needing to embed them into the actor state directly.
edges: SendableAnyMap,
/// The number of stream that could yield a message for the actor to process. Once this and the
/// `future_count` hit both reach zero, the actor is dead as it can no longer process any
/// messages.
Expand Down Expand Up @@ -84,9 +88,9 @@ pub(crate) struct ActorRunner<A: ActorState> {
}

impl<A: ActorState> ActorRunner<A> {
pub(crate) fn new(state: A) -> Self {
let scheduler = Scheduler::new();
Self { state, scheduler }
pub(crate) fn new(state: A, edges: SendableAnyMap) -> Self {
let scheduler = Scheduler::new(edges);
Self { scheduler, state }
}

pub(crate) fn add_broadcaster(&mut self, broad: broadcast::Sender<SendableWrapper<A::Output>>) {
Expand Down Expand Up @@ -116,6 +120,7 @@ impl<A: ActorState> ActorRunner<A> {
let Self {
state,
mut scheduler,
..
} = self;
state.finalize(&mut scheduler).await;
scheduler.finalize();
Expand All @@ -124,14 +129,15 @@ impl<A: ActorState> ActorRunner<A> {

impl<A: ActorState> Scheduler<A> {
/// The constructor for the scheduler.
fn new() -> Self {
fn new(edges: SendableAnyMap) -> Self {
let recv = SendableWrapper::new(select_all([]));
let queue = SendableWrapper::new(FuturesCollection::new());
let tasks = SendableWrapper::new(FuturesCollection::new());
Self {
recv,
queue,
tasks,
edges,
outbound: None,
stream_count: 0,
future_count: 0,
Expand Down Expand Up @@ -263,6 +269,59 @@ impl<A: ActorState> Scheduler<A> {
out.send(msg.into())
}
}

/// Adds a client to the set of connections to other actors, which the state can access later.
pub fn add_edge<P: 'static + Send, M: 'static + Send>(&mut self, client: SinkClient<P, M>) {
_ = self.edges.insert(client);
}

/// Gets a reference to a sink client to another actor.
pub fn get_edge<P: 'static + Send, M: 'static + Send>(&self) -> Option<&SinkClient<P, M>> {
self.edges.get::<SinkClient<P, M>>()
}

/// Gets mutable reference to a sink client to another actor.
pub fn get_edge_mut<P: 'static + Send, M: 'static + Send>(
&mut self,
) -> Option<&mut SinkClient<P, M>> {
self.edges.get_mut::<SinkClient<P, M>>()
}

/// Removes a client to the set of connections to other actors.
pub fn remove_edge<P: 'static + Send, M: 'static + Send>(&mut self) {
_ = self.edges.remove::<SinkClient<P, M>>();
}

/// Adds an arbitrary data to the set of connections to other actors, which the state can
/// access later. This method is intended to be used with containers hold that multiple clients
/// of the same type.
///
/// For example, you can attach a series of actor clients that are indexed using a hashmap.
pub fn add_multi_edge<C: 'static + Send>(&mut self, container: C) {
_ = self.edges.insert(container);
}

/// Gets a reference to an arbitary type held in the container that holds connections to other
/// actors. This method is intended to be used with containers that hold multiple clients of
/// the same type.
///
/// For example, you can store and access a series of actor clients that are indexed using a
/// hashmap.
pub fn get_multi_edge<C: 'static + Send>(&self) -> Option<&C> {
self.edges.get::<C>()
}

/// Gets a mutable reference to an arbitary type held in the container that holds connections to other
/// actors. This method is intended to be used with containers that hold multiple clients of
/// the same type.
pub fn get_multi_edge_mut<C: 'static + Send>(&mut self) -> Option<&mut C> {
self.edges.get_mut::<C>()
}

/// Adds a piece of arbitrary data from the set of connections to other actors.
pub fn remove_multi_edge<C: 'static + Send>(&mut self) {
_ = self.edges.remove::<C>();
}
}

impl<A> Scheduler<A>
Expand Down

0 comments on commit bae95fc

Please sign in to comment.