From dbd2b3f6c9214492d5e866a226d8beb343bf60cf Mon Sep 17 00:00:00 2001 From: Viktor Charypar Date: Wed, 22 Jan 2025 16:29:36 +0000 Subject: [PATCH 1/3] Add command API to crux_kv --- crux_core/src/command/mod.rs | 1 + crux_core/src/lib.rs | 2 +- crux_kv/src/command.rs | 57 ++++++++++++++++++++++++++++++++++++ crux_kv/src/lib.rs | 2 ++ 4 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 crux_kv/src/command.rs diff --git a/crux_core/src/command/mod.rs b/crux_core/src/command/mod.rs index f9703cae7..7b2b4b19e 100644 --- a/crux_core/src/command/mod.rs +++ b/crux_core/src/command/mod.rs @@ -51,6 +51,7 @@ use futures::{FutureExt as _, Stream, StreamExt as _}; use slab::Slab; use stream::CommandStreamExt as _; +pub use builder::{RequestBuilder, StreamBuilder}; pub use stream::CommandOutput; use crate::capability::Operation; diff --git a/crux_core/src/lib.rs b/crux_core/src/lib.rs index 83ab3f9df..67082b350 100644 --- a/crux_core/src/lib.rs +++ b/crux_core/src/lib.rs @@ -153,12 +153,12 @@ pub mod bridge; pub mod capability; +pub mod command; pub mod testing; #[cfg(feature = "typegen")] pub mod typegen; mod capabilities; -mod command; mod core; use serde::Serialize; diff --git a/crux_kv/src/command.rs b/crux_kv/src/command.rs new file mode 100644 index 000000000..954898bf8 --- /dev/null +++ b/crux_kv/src/command.rs @@ -0,0 +1,57 @@ +//! The Command based API for crux_kv + +use std::{future::Future, marker::PhantomData}; + +use crux_core::{command::RequestBuilder, Command, Request}; + +use super::{KeyValueOperation, KeyValueResult}; + +pub struct KeyValue { + // Allow the impl to declare trait bounds once. Thanks rustc + effect: PhantomData, + event: PhantomData, +} + +impl KeyValue +where + Effect: Send + From> + 'static, + Event: Send + 'static, +{ + pub fn get( + key: impl Into, + ) -> RequestBuilder> { + Command::request_from_shell(KeyValueOperation::Get { key: key.into() }) + } + + pub fn set( + key: impl Into, + value: Vec, + ) -> RequestBuilder> { + Command::request_from_shell(KeyValueOperation::Set { + key: key.into(), + value, + }) + } + + pub fn delete( + key: impl Into, + ) -> RequestBuilder> { + Command::request_from_shell(KeyValueOperation::Delete { key: key.into() }) + } + + pub fn exists( + key: impl Into, + ) -> RequestBuilder> { + Command::request_from_shell(KeyValueOperation::Exists { key: key.into() }) + } + + pub fn list_keys( + prefix: impl Into, + cursor: u64, + ) -> RequestBuilder> { + Command::request_from_shell(KeyValueOperation::ListKeys { + prefix: prefix.into(), + cursor, + }) + } +} diff --git a/crux_kv/src/lib.rs b/crux_kv/src/lib.rs index 0cd92cb35..6a198eb94 100644 --- a/crux_kv/src/lib.rs +++ b/crux_kv/src/lib.rs @@ -420,5 +420,7 @@ impl KeyValueResult { } } +pub mod command; + #[cfg(test)] mod tests; From ced89276455343eebae3a6b38ec522666ee69d54 Mon Sep 17 00:00:00 2001 From: Viktor Charypar Date: Wed, 22 Jan 2025 17:03:49 +0000 Subject: [PATCH 2/3] Allow .map on Request and Stream builders --- crux_core/src/command/builder.rs | 14 ++++ crux_core/src/command/tests/combinators.rs | 98 ++++++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/crux_core/src/command/builder.rs b/crux_core/src/command/builder.rs index e06783dda..aac0076f5 100644 --- a/crux_core/src/command/builder.rs +++ b/crux_core/src/command/builder.rs @@ -33,6 +33,13 @@ where RequestBuilder { make_task } } + pub fn map(self, map: F) -> RequestBuilder> + where + F: FnOnce(T) -> U + Send + 'static, + { + RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map)) + } + /// Chain another [`RequestBuilder`] to run after completion of this one, /// passing the result to the provided closure `make_next_builder`. /// @@ -253,6 +260,13 @@ where } } + pub fn map(self, map: F) -> StreamBuilder> + where + F: FnMut(T) -> U + Send + 'static, + { + StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map)) + } + /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`], /// passing the result to the provided closure `make_next_builder`. /// diff --git a/crux_core/src/command/tests/combinators.rs b/crux_core/src/command/tests/combinators.rs index f531cc0b5..949874167 100644 --- a/crux_core/src/command/tests/combinators.rs +++ b/crux_core/src/command/tests/combinators.rs @@ -537,3 +537,101 @@ fn stream_followed_by_a_stream() { assert!(cmd.events().next().is_none()) } + +#[test] +fn chaining_with_mapping() { + let mut cmd: Command = Command::request_from_shell(AnOperation::More([3, 4])) + .map(|first| { + let AnOperationOutput::Other(first) = first else { + // TODO: how do I bail quietly here? + panic!("Invalid output!") + }; + + first + }) + .then_request(|first| { + let second = [first[0] + 1, first[1] + 1]; + + Command::request_from_shell(AnOperation::More(second)) + }) + .then_send(Event::Completed); + + let effect = cmd.effects().next().unwrap(); + assert!(cmd.events().next().is_none()); + + let Effect::AnEffect(mut request) = effect; + + assert_eq!(request.operation, AnOperation::More([3, 4])); + request + .resolve(AnOperationOutput::Other([1, 2])) + .expect("to resolve"); + + let effect = cmd.effects().next().unwrap(); + assert!(cmd.events().next().is_none()); + + let Effect::AnEffect(mut request) = effect; + assert_eq!(request.operation, AnOperation::More([2, 3])); + + request + .resolve(AnOperationOutput::Other([1, 2])) + .expect("to resolve"); + + let event = cmd.events().next().unwrap(); + assert!(cmd.effects().next().is_none()); + + assert_eq!(event, Event::Completed(AnOperationOutput::Other([1, 2]))); + + assert!(cmd.is_done()); +} + +#[test] +fn stream_mapping_and_chaining() { + let mut cmd: Command = Command::stream_from_shell(AnOperation::One) + .map(|out| { + let AnOperationOutput::Other([a, b]) = out else { + panic!("Bad output"); + }; + + (a, b) + }) + .then_request(|(a, b)| Command::request_from_shell(AnOperation::More([a + 1, b + 1]))) + .then_send(Event::Completed); + + assert!(cmd.events().next().is_none()); + let mut effects: Vec<_> = cmd.effects().collect(); + + assert_eq!(effects.len(), 1); + + let Effect::AnEffect(mut stream_request) = effects.remove(0); + + assert_eq!(stream_request.operation, AnOperation::One); + + stream_request + .resolve(AnOperationOutput::Other([1, 2])) + .expect("should resolve"); + + let mut effects: Vec<_> = cmd.effects().collect(); + + let Effect::AnEffect(mut plus_one_request) = effects.remove(0); + assert_eq!(plus_one_request.operation, AnOperation::More([2, 3])); + + plus_one_request + .resolve(AnOperationOutput::One) + .expect("should resolve"); + + let events: Vec<_> = cmd.events().collect(); + assert_eq!(events[0], Event::Completed(AnOperationOutput::One)); + + // Can't request the plus one request again + assert!(plus_one_request.resolve(AnOperationOutput::One).is_err()); + + // but can get a new one by resolving stream request again + stream_request + .resolve(AnOperationOutput::Other([2, 3])) + .expect("should resolve"); + + let effect = cmd.effects().next().unwrap(); + + let Effect::AnEffect(plus_one_request) = effect; + assert_eq!(plus_one_request.operation, AnOperation::More([3, 4])); +} From 830160d16b92144a90d1d61fbe300669c7b55305 Mon Sep 17 00:00:00 2001 From: Viktor Charypar Date: Wed, 22 Jan 2025 17:03:49 +0000 Subject: [PATCH 3/3] Return nicer types from crux_kv command builders --- crux_kv/src/command.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/crux_kv/src/command.rs b/crux_kv/src/command.rs index 954898bf8..fcdffef4b 100644 --- a/crux_kv/src/command.rs +++ b/crux_kv/src/command.rs @@ -4,7 +4,9 @@ use std::{future::Future, marker::PhantomData}; use crux_core::{command::RequestBuilder, Command, Request}; -use super::{KeyValueOperation, KeyValueResult}; +use crate::error::KeyValueError; + +use super::KeyValueOperation; pub struct KeyValue { // Allow the impl to declare trait bounds once. Thanks rustc @@ -12,6 +14,10 @@ pub struct KeyValue { event: PhantomData, } +type StatusResult = Result; +type DataResult = Result>, KeyValueError>; +type ListResult = Result<(Vec, u64), KeyValueError>; + impl KeyValue where Effect: Send + From> + 'static, @@ -19,39 +25,44 @@ where { pub fn get( key: impl Into, - ) -> RequestBuilder> { + ) -> RequestBuilder> { Command::request_from_shell(KeyValueOperation::Get { key: key.into() }) + .map(|kv_result| kv_result.unwrap_get()) } pub fn set( key: impl Into, value: Vec, - ) -> RequestBuilder> { + ) -> RequestBuilder> { Command::request_from_shell(KeyValueOperation::Set { key: key.into(), value, }) + .map(|kv_result| kv_result.unwrap_set()) } pub fn delete( key: impl Into, - ) -> RequestBuilder> { + ) -> RequestBuilder> { Command::request_from_shell(KeyValueOperation::Delete { key: key.into() }) + .map(|kv_result| kv_result.unwrap_delete()) } pub fn exists( key: impl Into, - ) -> RequestBuilder> { + ) -> RequestBuilder> { Command::request_from_shell(KeyValueOperation::Exists { key: key.into() }) + .map(|kv_result| kv_result.unwrap_exists()) } pub fn list_keys( prefix: impl Into, cursor: u64, - ) -> RequestBuilder> { + ) -> RequestBuilder> { Command::request_from_shell(KeyValueOperation::ListKeys { prefix: prefix.into(), cursor, }) + .map(|kv_result| kv_result.unwrap_list_keys()) } }