diff --git a/hydroflow/tests/surface_multiset_delta.rs b/hydroflow/tests/surface_multiset_delta.rs index 7a66861393c6..23f9d77398d1 100644 --- a/hydroflow/tests/surface_multiset_delta.rs +++ b/hydroflow/tests/surface_multiset_delta.rs @@ -4,8 +4,8 @@ use multiplatform_test::multiplatform_test; #[multiplatform_test] pub fn test_multiset_delta() { - let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); - let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); + let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); + let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut flow = hydroflow_syntax! { source_stream(input_recv) @@ -14,19 +14,31 @@ pub fn test_multiset_delta() { }; assert_graphvis_snapshots!(flow); - input_send.send(3).unwrap(); - input_send.send(4).unwrap(); - input_send.send(3).unwrap(); + input_send.send('a').unwrap(); + input_send.send('b').unwrap(); + input_send.send('a').unwrap(); flow.run_tick(); - assert_eq!(&[3, 4, 3], &*collect_ready::, _>(&mut result_recv)); + // 'a', 'b', 'a' + assert_eq!(&['a', 'b', 'a'], &*collect_ready::, _>(&mut result_recv)); - input_send.send(3).unwrap(); - input_send.send(5).unwrap(); - input_send.send(3).unwrap(); - input_send.send(3).unwrap(); + input_send.send('a').unwrap(); + input_send.send('c').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); + flow.run_tick(); + // 'c', 'a' + // First two 'a's are removed due to previous tick. + assert_eq!(&['c', 'a'], &*collect_ready::, _>(&mut result_recv)); + + input_send.send('b').unwrap(); + input_send.send('c').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); flow.run_tick(); - // First two "3"s are removed due to previous tick. - assert_eq!(&[5, 3], &*collect_ready::, _>(&mut result_recv)); + // 3 'a's and the 'c' are removed due to previous tick. + assert_eq!(&['b', 'a'], &*collect_ready::, _>(&mut result_recv)); } #[multiplatform_test] diff --git a/hydroflow_lang/src/graph/ops/multiset_delta.rs b/hydroflow_lang/src/graph/ops/multiset_delta.rs index dd39882ef4d3..a9bc0b5e4f87 100644 --- a/hydroflow_lang/src/graph/ops/multiset_delta.rs +++ b/hydroflow_lang/src/graph/ops/multiset_delta.rs @@ -5,30 +5,54 @@ use super::{ WriteContextArgs, RANGE_0, RANGE_1, }; -// TODO(mingwei): more doc -/// Multiset delta from the previous tick. +/// The multiset inverse of [`persist()`](#persist). +/// +/// > 1 input stream of `T`, 1 output stream of `T`, where `T: Eq + Hash` +/// +/// For set semantics, [`unique()`](#unique) can be thought of as a "delta" operator, the inverse +/// of [`persist()`](#persist). In `persist`, new items come in, and all items are repeatedly +/// released out. Conversely, `unique` take repeated items in, and only releases the new ones out. +/// +/// This operator does a similar inversion but for multiset semantics, with some caveats. When it +/// receives duplicate items, instead of ignoring them, it "subtracts" them from the items received +/// in the previous tick: i.e. if we received `k` copies of an item in the previous tick, and we +/// receive `l > k` copies in the current tick, we output `l - k` copies of the item. +/// However unlike `unique`, this count is only maintained for the previous tick, not over all time. +/// +/// In the example below, in the second tick two 'a's are removed because two 'a's were received in +/// the previous tick. The third 'a' is released though. /// /// ```rustbook -/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); +/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); /// let mut flow = hydroflow::hydroflow_syntax! { /// source_stream(input_recv) /// -> multiset_delta() /// -> for_each(|n| println!("{}", n)); /// }; /// -/// input_send.send(3).unwrap(); -/// input_send.send(4).unwrap(); -/// input_send.send(3).unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('b').unwrap(); +/// input_send.send('a').unwrap(); +/// flow.run_tick(); +/// // 'a', 'b', 'a' +/// +/// input_send.send('a').unwrap(); +/// input_send.send('c').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); /// flow.run_tick(); -/// // 3, 4, +/// // 'c', 'a' +/// // First two 'a's are removed due to previous tick. /// -/// input_send.send(3).unwrap(); -/// input_send.send(5).unwrap(); -/// input_send.send(3).unwrap(); -/// input_send.send(3).unwrap(); +/// input_send.send('b').unwrap(); +/// input_send.send('c').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); /// flow.run_tick(); -/// // 5, 3 -/// // First two "3"s are removed due to previous tick. +/// // 'b', 'a' +/// // 3 'a's and the 'c' are removed due to previous tick. /// ``` pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { name: "multiset_delta", diff --git a/hydroflow_lang/src/graph/ops/persist.rs b/hydroflow_lang/src/graph/ops/persist.rs index d5447642d60d..5652d2fc50a5 100644 --- a/hydroflow_lang/src/graph/ops/persist.rs +++ b/hydroflow_lang/src/graph/ops/persist.rs @@ -7,7 +7,9 @@ use super::{ use crate::diagnostic::{Diagnostic, Level}; use crate::graph::{FlowProps, LatticeFlowType}; -/// Stores each item as it passes through, and replays all item every tick. +/// Stores each item as it passes through, and replays all items every tick. +/// +/// > 1 input stream, 1 output stream /// /// ```hydroflow /// // Normally `source_iter(...)` only emits once, but `persist()` will replay the `"hello"`