Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command support in crux_kv and tweaks and fixes to the Command API to enable the capability migration #301

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions crux_core/src/command/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ where
RequestBuilder { make_task }
}

pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
where
F: FnOnce(T) -> U + Send + 'static,
{
RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
}

/// Chain another [`RequestBuilder`] to run after completion of this one,
/// passing the result to the provided closure `make_next_builder`.
///
Expand Down Expand Up @@ -253,6 +260,13 @@ where
}
}

pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
where
F: FnMut(T) -> U + Send + 'static,
{
StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
}

/// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
/// passing the result to the provided closure `make_next_builder`.
///
Expand Down
1 change: 1 addition & 0 deletions crux_core/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use futures::{FutureExt as _, Stream, StreamExt as _};
use slab::Slab;
use stream::CommandStreamExt as _;

pub use builder::{RequestBuilder, StreamBuilder};
pub use stream::CommandOutput;

use crate::capability::Operation;
Expand Down
98 changes: 98 additions & 0 deletions crux_core/src/command/tests/combinators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,101 @@ fn stream_followed_by_a_stream() {

assert!(cmd.events().next().is_none())
}

#[test]
fn chaining_with_mapping() {
let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More([3, 4]))
.map(|first| {
let AnOperationOutput::Other(first) = first else {
// TODO: how do I bail quietly here?
panic!("Invalid output!")
};

first
})
.then_request(|first| {
let second = [first[0] + 1, first[1] + 1];

Command::request_from_shell(AnOperation::More(second))
})
.then_send(Event::Completed);

let effect = cmd.effects().next().unwrap();
assert!(cmd.events().next().is_none());

let Effect::AnEffect(mut request) = effect;

assert_eq!(request.operation, AnOperation::More([3, 4]));
request
.resolve(AnOperationOutput::Other([1, 2]))
.expect("to resolve");

let effect = cmd.effects().next().unwrap();
assert!(cmd.events().next().is_none());

let Effect::AnEffect(mut request) = effect;
assert_eq!(request.operation, AnOperation::More([2, 3]));

request
.resolve(AnOperationOutput::Other([1, 2]))
.expect("to resolve");

let event = cmd.events().next().unwrap();
assert!(cmd.effects().next().is_none());

assert_eq!(event, Event::Completed(AnOperationOutput::Other([1, 2])));

assert!(cmd.is_done());
}

#[test]
fn stream_mapping_and_chaining() {
let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::One)
.map(|out| {
let AnOperationOutput::Other([a, b]) = out else {
panic!("Bad output");
};

(a, b)
})
.then_request(|(a, b)| Command::request_from_shell(AnOperation::More([a + 1, b + 1])))
.then_send(Event::Completed);

assert!(cmd.events().next().is_none());
let mut effects: Vec<_> = cmd.effects().collect();

assert_eq!(effects.len(), 1);

let Effect::AnEffect(mut stream_request) = effects.remove(0);

assert_eq!(stream_request.operation, AnOperation::One);

stream_request
.resolve(AnOperationOutput::Other([1, 2]))
.expect("should resolve");

let mut effects: Vec<_> = cmd.effects().collect();

let Effect::AnEffect(mut plus_one_request) = effects.remove(0);
assert_eq!(plus_one_request.operation, AnOperation::More([2, 3]));

plus_one_request
.resolve(AnOperationOutput::One)
.expect("should resolve");

let events: Vec<_> = cmd.events().collect();
assert_eq!(events[0], Event::Completed(AnOperationOutput::One));

// Can't request the plus one request again
assert!(plus_one_request.resolve(AnOperationOutput::One).is_err());

// but can get a new one by resolving stream request again
stream_request
.resolve(AnOperationOutput::Other([2, 3]))
.expect("should resolve");

let effect = cmd.effects().next().unwrap();

let Effect::AnEffect(plus_one_request) = effect;
assert_eq!(plus_one_request.operation, AnOperation::More([3, 4]));
}
2 changes: 1 addition & 1 deletion crux_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@

pub mod bridge;
pub mod capability;
pub mod command;
pub mod testing;
#[cfg(feature = "typegen")]
pub mod typegen;

mod capabilities;
mod command;
mod core;

use serde::Serialize;
Expand Down
68 changes: 68 additions & 0 deletions crux_kv/src/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! The Command based API for crux_kv

use std::{future::Future, marker::PhantomData};

use crux_core::{command::RequestBuilder, Command, Request};

use crate::error::KeyValueError;

use super::KeyValueOperation;

pub struct KeyValue<Effect, Event> {
// Allow the impl to declare trait bounds once. Thanks rustc
effect: PhantomData<Effect>,
event: PhantomData<Event>,
}

type StatusResult = Result<bool, KeyValueError>;
type DataResult = Result<Option<Vec<u8>>, KeyValueError>;
type ListResult = Result<(Vec<String>, u64), KeyValueError>;

impl<Effect, Event> KeyValue<Effect, Event>
where
Effect: Send + From<Request<KeyValueOperation>> + 'static,
Event: Send + 'static,
{
pub fn get(
key: impl Into<String>,
) -> RequestBuilder<Effect, Event, impl Future<Output = DataResult>> {
Command::request_from_shell(KeyValueOperation::Get { key: key.into() })
.map(|kv_result| kv_result.unwrap_get())
}

pub fn set(
key: impl Into<String>,
value: Vec<u8>,
) -> RequestBuilder<Effect, Event, impl Future<Output = DataResult>> {
Command::request_from_shell(KeyValueOperation::Set {
key: key.into(),
value,
})
.map(|kv_result| kv_result.unwrap_set())
}

pub fn delete(
key: impl Into<String>,
) -> RequestBuilder<Effect, Event, impl Future<Output = DataResult>> {
Command::request_from_shell(KeyValueOperation::Delete { key: key.into() })
.map(|kv_result| kv_result.unwrap_delete())
}

pub fn exists(
key: impl Into<String>,
) -> RequestBuilder<Effect, Event, impl Future<Output = StatusResult>> {
Command::request_from_shell(KeyValueOperation::Exists { key: key.into() })
.map(|kv_result| kv_result.unwrap_exists())
}

pub fn list_keys(
prefix: impl Into<String>,
cursor: u64,
) -> RequestBuilder<Effect, Event, impl Future<Output = ListResult>> {
Command::request_from_shell(KeyValueOperation::ListKeys {
prefix: prefix.into(),
cursor,
})
.map(|kv_result| kv_result.unwrap_list_keys())
}
}
2 changes: 2 additions & 0 deletions crux_kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,5 +420,7 @@ impl KeyValueResult {
}
}

pub mod command;

#[cfg(test)]
mod tests;
Loading