From 1d6e514403de42a3c8525ca83131f2a67100603f Mon Sep 17 00:00:00 2001 From: Viktor Charypar Date: Mon, 3 Feb 2025 11:23:11 +0000 Subject: [PATCH] Remove deep recursion in Command::and, allow spawn on a command instance --- crux_core/src/command/mod.rs | 41 +++++++++++--------- crux_core/src/command/tests/async_effects.rs | 16 ++++++++ crux_core/src/command/tests/combinators.rs | 21 ++++++++++ 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/crux_core/src/command/mod.rs b/crux_core/src/command/mod.rs index 7ea8eeaf..fe800e6a 100644 --- a/crux_core/src/command/mod.rs +++ b/crux_core/src/command/mod.rs @@ -62,6 +62,7 @@ use crate::Request; pub struct Command { effects: Receiver, events: Receiver, + context: CommandContext, // Executor internals // TODO: should this be a separate type? @@ -115,7 +116,7 @@ where let task = Task { finished: Default::default(), aborted: aborted.clone(), - future: create_task(context).boxed(), + future: create_task(context.clone()).boxed(), join_handle_wakers: waker_receiver, }; @@ -129,6 +130,7 @@ where Command { effects: effect_receiver, events: event_receiver, + context, ready_queue: ready_receiver, spawn_queue: spawn_receiver, ready_sender, @@ -141,21 +143,7 @@ where /// Create an empty, completed Command. This is useful as a return value from `update` if /// there are no side-effects to perform. pub fn done() -> Self { - let (_, effects) = crossbeam_channel::bounded(0); - let (_, events) = crossbeam_channel::bounded(0); - let (_, spawn_queue) = crossbeam_channel::bounded(0); - let (ready_sender, ready_queue) = crossbeam_channel::bounded(0); - - Command { - effects, - events, - ready_queue, - spawn_queue, - tasks: Slab::with_capacity(0), - ready_sender, - waker: Default::default(), - aborted: Default::default(), - } + Command::new(|_ctx| futures::future::ready(())) } /// Create a command from another command with compatible `Effect` and `Event` types @@ -279,12 +267,16 @@ where } /// Convenience for [`Command::all`] which runs another command concurrently with this one - pub fn and(self, other: Self) -> Self + pub fn and(mut self, other: Self) -> Self where Effect: Unpin, Event: Unpin, { - Command::all([self, other]) + self.spawn(|ctx| async move { + other.host(ctx.effects, ctx.events).await; + }); + + self } /// Create a command running a number of commands concurrently @@ -345,6 +337,19 @@ where }) } + /// Spawn an additional task on the command. The task will execute concurrently with + /// existing tasks + /// + /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests, + /// events back to the app, and to spawn additional tasks. The closure is expected to return a future. + pub fn spawn(&mut self, create_task: F) + where + F: FnOnce(CommandContext) -> Fut, + Fut: Future + Send + 'static, + { + self.context.spawn(create_task); + } + /// Returns an abort handle which can be used to remotely terminate a running Command /// and all its subtask. /// diff --git a/crux_core/src/command/tests/async_effects.rs b/crux_core/src/command/tests/async_effects.rs index 3d265a6e..4c6ae55e 100644 --- a/crux_core/src/command/tests/async_effects.rs +++ b/crux_core/src/command/tests/async_effects.rs @@ -235,3 +235,19 @@ fn effects_can_spawn_communicating_tasks() { assert!(cmd.is_done()); } + +#[test] +fn tasks_can_be_spawned_on_existing_effects() { + let mut cmd: Command = Command::done(); + + assert!(cmd.is_done()); + assert!(cmd.effects().next().is_none()); + + cmd.spawn(|ctx| async move { + ctx.request_from_shell(AnOperation::One).await; + }); + + // Command is not done any more + assert!(!cmd.is_done()); + assert!(cmd.effects().next().is_some()); +} diff --git a/crux_core/src/command/tests/combinators.rs b/crux_core/src/command/tests/combinators.rs index 94987416..6bd7974b 100644 --- a/crux_core/src/command/tests/combinators.rs +++ b/crux_core/src/command/tests/combinators.rs @@ -224,6 +224,27 @@ fn and() { assert!(cmd.is_done()); } +#[test] +fn and_doesnt_blow_the_stack() { + let mut cmd: Command = Command::done(); + + for _ in 1..2000 { + cmd = cmd.and(Command::done()); + } + + // Polling the task should work + let _ = cmd.effects(); +} + +#[test] +fn all_doesnt_blow_the_stack() { + let commands: Vec> = (1..2000).map(|_| Command::done()).collect(); + let mut cmd = Command::all(commands); + + // Polling the task should work + let _ = cmd.effects(); +} + #[test] fn all() { let cmd_one = Command::request_from_shell(AnOperation::One).then_send(Event::Completed);