From c3af7707ec68c11b8f1e770cfeef7ee5059294d4 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 16 Sep 2024 15:55:59 -0400 Subject: [PATCH] docs: minor updates on docs and examples (#87) Signed-off-by: Keran Yang --- README.md | 8 ++++---- examples/batchmap-cat/Dockerfile | 2 +- .../manifests/simple-batchmap-cat.yaml | 8 +++----- examples/batchmap-cat/src/main.rs | 6 +----- .../manifests/simple-batchmap-flatmap.yaml | 8 +++----- examples/batchmap-flatmap/src/main.rs | 6 +----- examples/map-cat/manifests/simple-map-cat.yaml | 8 +++----- examples/map-cat/src/main.rs | 3 +-- .../manifests/simple-map-udf.yaml | 6 +++--- examples/map-tickgen-serde/src/main.rs | 5 ++--- examples/mapt-event-time-filter/src/main.rs | 18 +++++++++++------- .../manifests/simple-reduce.yaml | 7 +++---- examples/reduce-counter/src/main.rs | 5 +---- .../sideinput/manifests/simple-sideinput.yaml | 4 ++-- .../simple-source/manifests/simple-source.yaml | 4 ++-- .../{simple-sink.yaml => simple-log-sink.yaml} | 4 ++-- .../manifests/simple-source-transformer.yaml | 8 +++----- examples/source-transformer-now/src/main.rs | 5 +---- src/lib.rs | 6 ++++-- 19 files changed, 51 insertions(+), 70 deletions(-) rename examples/sink-log/manifests/{simple-sink.yaml => simple-log-sink.yaml} (90%) diff --git a/README.md b/README.md index ef848bf..f324d9a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # Rust SDK for Numaflow -This SDK provides the interface for writing [User Defined Sources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/), [UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/) +This SDK provides the interface for writing [User Defined Sources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/), +[User Defined Source Tranformers](https://numaflow.numaproj.io/user-guide/sources/transformer/), +[UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/) and [User Defined Sinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) in [Rust](https://www.rust-lang.org/). > This rust crate is being actively developed and it supports @@ -9,6 +11,4 @@ and [User Defined Sinks](https://numaflow.numaproj.io/user-guide/sinks/user-defi ## Examples -You may find examples in the [examples folder](./examples). - - +You may find examples in the [examples folder](./examples). \ No newline at end of file diff --git a/examples/batchmap-cat/Dockerfile b/examples/batchmap-cat/Dockerfile index 85ea85f..41a64f1 100644 --- a/examples/batchmap-cat/Dockerfile +++ b/examples/batchmap-cat/Dockerfile @@ -11,7 +11,7 @@ WORKDIR /numaflow-rs/examples/batchmap-cat RUN cargo build --release # our final base -FROM debian:bookworm AS map-cat +FROM debian:bookworm AS batchmap-cat # copy the build artifact from the build stage COPY --from=build /numaflow-rs/examples/batchmap-cat/target/release/server . diff --git a/examples/batchmap-cat/manifests/simple-batchmap-cat.yaml b/examples/batchmap-cat/manifests/simple-batchmap-cat.yaml index 60fc23c..8ffae92 100644 --- a/examples/batchmap-cat/manifests/simple-batchmap-cat.yaml +++ b/examples/batchmap-cat/manifests/simple-batchmap-cat.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: rust-batchmap-cat + name: simple-batchmap-cat spec: vertices: - name: in @@ -21,11 +21,9 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in to: cat - from: cat - to: out - - + to: out \ No newline at end of file diff --git a/examples/batchmap-cat/src/main.rs b/examples/batchmap-cat/src/main.rs index 9bf4c73..b54bfa0 100644 --- a/examples/batchmap-cat/src/main.rs +++ b/examples/batchmap-cat/src/main.rs @@ -14,11 +14,7 @@ impl batchmap::BatchMapper for Cat { let mut responses: Vec = Vec::new(); while let Some(datum) = input.recv().await { let mut response = BatchResponse::from_id(datum.id); - response.append(Message { - keys: Some(datum.keys), - value: datum.value, - tags: None, - }); + response.append(Message::new(datum.value).keys(datum.keys.clone())); responses.push(response); } responses diff --git a/examples/batchmap-flatmap/manifests/simple-batchmap-flatmap.yaml b/examples/batchmap-flatmap/manifests/simple-batchmap-flatmap.yaml index 5a006fd..72a8add 100644 --- a/examples/batchmap-flatmap/manifests/simple-batchmap-flatmap.yaml +++ b/examples/batchmap-flatmap/manifests/simple-batchmap-flatmap.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: rust-batchmap-batchmap + name: simple-batchmap-flatmap spec: vertices: - name: in @@ -21,11 +21,9 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in to: cat - from: cat - to: out - - + to: out \ No newline at end of file diff --git a/examples/batchmap-flatmap/src/main.rs b/examples/batchmap-flatmap/src/main.rs index f9a84ae..75ac063 100644 --- a/examples/batchmap-flatmap/src/main.rs +++ b/examples/batchmap-flatmap/src/main.rs @@ -23,11 +23,7 @@ impl batchmap::BatchMapper for Flatmap { // return the resulting parts for part in parts { - response.append(Message { - keys: Some(datum.keys.clone()), - value: Vec::from(part), - tags: None, - }); + response.append(Message::new(Vec::from(part)).keys(datum.keys.clone())); } responses.push(response); } diff --git a/examples/map-cat/manifests/simple-map-cat.yaml b/examples/map-cat/manifests/simple-map-cat.yaml index 941bafb..d3fde68 100644 --- a/examples/map-cat/manifests/simple-map-cat.yaml +++ b/examples/map-cat/manifests/simple-map-cat.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-rust-map-cat + name: simple-map-cat spec: vertices: - name: in @@ -21,11 +21,9 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in to: cat - from: cat - to: out - - + to: out \ No newline at end of file diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index cb6467d..389de43 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -10,7 +10,6 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - let message = map::Message::new(input.value).keys(input.keys).tags(vec![]); - vec![message] + vec![map::Message::new(input.value).keys(input.keys.clone())] } } diff --git a/examples/map-tickgen-serde/manifests/simple-map-udf.yaml b/examples/map-tickgen-serde/manifests/simple-map-udf.yaml index c0b1c36..8503a4c 100644 --- a/examples/map-tickgen-serde/manifests/simple-map-udf.yaml +++ b/examples/map-tickgen-serde/manifests/simple-map-udf.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-rust-tickgen-serde + name: simple-tickgen-serde spec: vertices: - name: in @@ -21,9 +21,9 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in to: tickgen-value - from: tickgen-value - to: out + to: out \ No newline at end of file diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index fecb8cf..ccc9f2a 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -31,7 +31,7 @@ struct ResultPayload { #[tonic::async_trait] impl map::Mapper for TickGen { - async fn map(&self, input: map::MapRequest) -> Vec { + async fn map(&self, input: map::MapRequest) -> Vec { let Ok(payload) = serde_json::from_slice::(&input.value) else { return vec![]; }; @@ -45,8 +45,7 @@ impl map::Mapper for TickGen { }) .unwrap_or_default(), ) - .keys(input.keys) - .tags(vec![]); + .keys(input.keys.clone()); vec![message] } } diff --git a/examples/mapt-event-time-filter/src/main.rs b/examples/mapt-event-time-filter/src/main.rs index 8b07484..1144472 100644 --- a/examples/mapt-event-time-filter/src/main.rs +++ b/examples/mapt-event-time-filter/src/main.rs @@ -2,11 +2,14 @@ use filter_impl::filter_event_time; use numaflow::sourcetransform; use numaflow::sourcetransform::{Message, SourceTransformRequest}; use std::error::Error; + #[tokio::main] async fn main() -> Result<(), Box> { sourcetransform::Server::new(EventTimeFilter).start().await } + struct EventTimeFilter; + #[tonic::async_trait] impl sourcetransform::SourceTransformer for EventTimeFilter { /// Asynchronously transforms input messages based on their event time. @@ -43,9 +46,8 @@ mod tests { use numaflow::sourcetransform::{Message, SourceTransformRequest}; /// Tests that events from 2022 are tagged as within the year 2022. #[test] - fn test_filter_event_time_should_return_after_year_2022() { + fn test_filter_event_time_should_return_within_year_2022() { let time = Utc.with_ymd_and_hms(2022, 7, 2, 2, 0, 0).unwrap(); - let source_request = SourceTransformRequest { keys: vec![], value: vec![], @@ -53,17 +55,17 @@ mod tests { eventtime: time, headers: Default::default(), }; + let messages = filter_event_time(source_request); - assert_eq!((&messages).len(), 1); + assert_eq!((&messages).len(), 1); assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "within_year_2022") } /// Tests that events from 2023 are tagged as after the year 2022. #[test] - fn test_filter_event_time_should_return_within_year_2022() { + fn test_filter_event_time_should_return_after_year_2022() { let time = Utc.with_ymd_and_hms(2023, 7, 2, 2, 0, 0).unwrap(); - let source_request = SourceTransformRequest { keys: vec![], value: vec![], @@ -71,9 +73,10 @@ mod tests { eventtime: time, headers: Default::default(), }; + let messages = filter_event_time(source_request); - assert_eq!((&messages).len(), 1); + assert_eq!((&messages).len(), 1); assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "after_year_2022") } @@ -81,7 +84,6 @@ mod tests { #[test] fn test_filter_event_time_should_drop() { let time = Utc.with_ymd_and_hms(2021, 7, 2, 2, 0, 0).unwrap(); - let source_request = SourceTransformRequest { keys: vec![], value: vec![], @@ -89,7 +91,9 @@ mod tests { eventtime: time, headers: Default::default(), }; + let messages = filter_event_time(source_request); + assert_eq!((&messages).len(), 1); assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "U+005C__DROP__") } diff --git a/examples/reduce-counter/manifests/simple-reduce.yaml b/examples/reduce-counter/manifests/simple-reduce.yaml index 20a77dd..3f0448f 100644 --- a/examples/reduce-counter/manifests/simple-reduce.yaml +++ b/examples/reduce-counter/manifests/simple-reduce.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-rust-reduce + name: simple-reduce spec: vertices: - name: in @@ -33,10 +33,9 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in to: counter - from: counter - to: out - + to: out \ No newline at end of file diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index 83146ab..8f8b17e 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -44,10 +44,7 @@ mod counter { while input.recv().await.is_some() { counter += 1; } - let message = Message::new(counter.to_string().into_bytes()) - .tags(vec![]) - .keys(keys.clone()); - vec![message] + vec![Message::new(counter.to_string().into_bytes()).keys(keys.clone())] } } } diff --git a/examples/sideinput/manifests/simple-sideinput.yaml b/examples/sideinput/manifests/simple-sideinput.yaml index 34bb09c..b80aa82 100644 --- a/examples/sideinput/manifests/simple-sideinput.yaml +++ b/examples/sideinput/manifests/simple-sideinput.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: my-pipeline + name: simple-sideinput spec: sideInputs: - name: myticker @@ -32,7 +32,7 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in to: si-log diff --git a/examples/simple-source/manifests/simple-source.yaml b/examples/simple-source/manifests/simple-source.yaml index 1084d1a..0af46a2 100644 --- a/examples/simple-source/manifests/simple-source.yaml +++ b/examples/simple-source/manifests/simple-source.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: kafka-pl + name: simple-source spec: vertices: - name: in @@ -16,7 +16,7 @@ spec: scale: min: 1 sink: - log: { } + log: {} edges: - from: in to: out \ No newline at end of file diff --git a/examples/sink-log/manifests/simple-sink.yaml b/examples/sink-log/manifests/simple-log-sink.yaml similarity index 90% rename from examples/sink-log/manifests/simple-sink.yaml rename to examples/sink-log/manifests/simple-log-sink.yaml index 419dfa1..ab94c52 100644 --- a/examples/sink-log/manifests/simple-sink.yaml +++ b/examples/sink-log/manifests/simple-log-sink.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-rust-log-sink + name: simple-log-sink spec: vertices: - name: in @@ -19,4 +19,4 @@ spec: image: quay.io/numaio/numaflow-rs/sink-log:stable edges: - from: in - to: out + to: out \ No newline at end of file diff --git a/examples/source-transformer-now/manifests/simple-source-transformer.yaml b/examples/source-transformer-now/manifests/simple-source-transformer.yaml index 7f7349f..e6262fb 100644 --- a/examples/source-transformer-now/manifests/simple-source-transformer.yaml +++ b/examples/source-transformer-now/manifests/simple-source-transformer.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-rust-transformer + name: simple-source-transformer spec: vertices: - name: in @@ -19,9 +19,7 @@ spec: - name: out sink: # A simple log printing sink - log: { } + log: {} edges: - from: in - to: out - - + to: out \ No newline at end of file diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index 42f3273..cdfc023 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -15,9 +15,6 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now()) - .keys(input.keys) - .tags(vec![]); - vec![message] + vec![sourcetransform::Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys.clone())] } } diff --git a/src/lib.rs b/src/lib.rs index 0ad867d..db9482e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ //! A Rust SDK for [Numaflow]. The Rust SDK is experimental has only implemented the most important -//! features. It will support all the core features eventually. It supports [Map], [Reduce], and -//! [User Defined Sinks]. +//! features. +//! It will support all the core features eventually. +//! It supports [Map], [Reduce], [User Defined Sources], [User Defined Source Transformer] and [User Defined Sinks]. //! //! Please note that the Rust SDK is experimental and will be refactored in the future to make it more //! idiomatic. @@ -9,6 +10,7 @@ //! [Map]: https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/ //! [Reduce]: https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/ //! [User Defined Sources]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/ +//! [User Defined Source Transformer]: https://numaflow.numaproj.io/user-guide/sources/transformer/ //! [User Defined Sinks]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/ /// start up code