Skip to content

Commit

Permalink
docs: minor updates on docs and examples (#87)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Sep 16, 2024
1 parent 5c37e7c commit c3af770
Show file tree
Hide file tree
Showing 19 changed files with 51 additions and 70 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Rust SDK for Numaflow

This SDK provides the interface for writing [User Defined Sources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/), [UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/)
This SDK provides the interface for writing [User Defined Sources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/),
[User Defined Source Tranformers](https://numaflow.numaproj.io/user-guide/sources/transformer/),
[UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/)
and [User Defined Sinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) in [Rust](https://www.rust-lang.org/).

> This rust crate is being actively developed and it supports
Expand All @@ -9,6 +11,4 @@ and [User Defined Sinks](https://numaflow.numaproj.io/user-guide/sinks/user-defi
## Examples

You may find examples in the [examples folder](./examples).


You may find examples in the [examples folder](./examples).
2 changes: 1 addition & 1 deletion examples/batchmap-cat/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WORKDIR /numaflow-rs/examples/batchmap-cat
RUN cargo build --release

# our final base
FROM debian:bookworm AS map-cat
FROM debian:bookworm AS batchmap-cat

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/examples/batchmap-cat/target/release/server .
Expand Down
8 changes: 3 additions & 5 deletions examples/batchmap-cat/manifests/simple-batchmap-cat.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: rust-batchmap-cat
name: simple-batchmap-cat
spec:
vertices:
- name: in
Expand All @@ -21,11 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: cat
- from: cat
to: out


to: out
6 changes: 1 addition & 5 deletions examples/batchmap-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ impl batchmap::BatchMapper for Cat {
let mut responses: Vec<BatchResponse> = Vec::new();
while let Some(datum) = input.recv().await {
let mut response = BatchResponse::from_id(datum.id);
response.append(Message {
keys: Some(datum.keys),
value: datum.value,
tags: None,
});
response.append(Message::new(datum.value).keys(datum.keys.clone()));
responses.push(response);
}
responses
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: rust-batchmap-batchmap
name: simple-batchmap-flatmap
spec:
vertices:
- name: in
Expand All @@ -21,11 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: cat
- from: cat
to: out


to: out
6 changes: 1 addition & 5 deletions examples/batchmap-flatmap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ impl batchmap::BatchMapper for Flatmap {

// return the resulting parts
for part in parts {
response.append(Message {
keys: Some(datum.keys.clone()),
value: Vec::from(part),
tags: None,
});
response.append(Message::new(Vec::from(part)).keys(datum.keys.clone()));
}
responses.push(response);
}
Expand Down
8 changes: 3 additions & 5 deletions examples/map-cat/manifests/simple-map-cat.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-map-cat
name: simple-map-cat
spec:
vertices:
- name: in
Expand All @@ -21,11 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: cat
- from: cat
to: out


to: out
3 changes: 1 addition & 2 deletions examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ struct Cat;
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
let message = map::Message::new(input.value).keys(input.keys).tags(vec![]);
vec![message]
vec![map::Message::new(input.value).keys(input.keys.clone())]
}
}
6 changes: 3 additions & 3 deletions examples/map-tickgen-serde/manifests/simple-map-udf.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-tickgen-serde
name: simple-tickgen-serde
spec:
vertices:
- name: in
Expand All @@ -21,9 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: tickgen-value
- from: tickgen-value
to: out
to: out
5 changes: 2 additions & 3 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct ResultPayload {

#[tonic::async_trait]
impl map::Mapper for TickGen {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
async fn map(&self, input: map::MapRequest) -> Vec<Message> {
let Ok(payload) = serde_json::from_slice::<Payload>(&input.value) else {
return vec![];
};
Expand All @@ -45,8 +45,7 @@ impl map::Mapper for TickGen {
})
.unwrap_or_default(),
)
.keys(input.keys)
.tags(vec![]);
.keys(input.keys.clone());
vec![message]
}
}
18 changes: 11 additions & 7 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
sourcetransform::Server::new(EventTimeFilter).start().await
}

struct EventTimeFilter;

#[tonic::async_trait]
impl sourcetransform::SourceTransformer for EventTimeFilter {
/// Asynchronously transforms input messages based on their event time.
Expand Down Expand Up @@ -43,53 +46,54 @@ mod tests {
use numaflow::sourcetransform::{Message, SourceTransformRequest};
/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_after_year_2022() {
fn test_filter_event_time_should_return_within_year_2022() {
let time = Utc.with_ymd_and_hms(2022, 7, 2, 2, 0, 0).unwrap();

let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};

let messages = filter_event_time(source_request);
assert_eq!((&messages).len(), 1);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "within_year_2022")
}

/// Tests that events from 2023 are tagged as after the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
fn test_filter_event_time_should_return_after_year_2022() {
let time = Utc.with_ymd_and_hms(2023, 7, 2, 2, 0, 0).unwrap();

let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};

let messages = filter_event_time(source_request);
assert_eq!((&messages).len(), 1);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "after_year_2022")
}

/// Tests that events before 2022 are dropped.
#[test]
fn test_filter_event_time_should_drop() {
let time = Utc.with_ymd_and_hms(2021, 7, 2, 2, 0, 0).unwrap();

let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "U+005C__DROP__")
}
Expand Down
7 changes: 3 additions & 4 deletions examples/reduce-counter/manifests/simple-reduce.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-reduce
name: simple-reduce
spec:
vertices:
- name: in
Expand Down Expand Up @@ -33,10 +33,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: counter
- from: counter
to: out

to: out
5 changes: 1 addition & 4 deletions examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ mod counter {
while input.recv().await.is_some() {
counter += 1;
}
let message = Message::new(counter.to_string().into_bytes())
.tags(vec![])
.keys(keys.clone());
vec![message]
vec![Message::new(counter.to_string().into_bytes()).keys(keys.clone())]
}
}
}
4 changes: 2 additions & 2 deletions examples/sideinput/manifests/simple-sideinput.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: my-pipeline
name: simple-sideinput
spec:
sideInputs:
- name: myticker
Expand Down Expand Up @@ -32,7 +32,7 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: si-log
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-source/manifests/simple-source.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: kafka-pl
name: simple-source
spec:
vertices:
- name: in
Expand All @@ -16,7 +16,7 @@ spec:
scale:
min: 1
sink:
log: { }
log: {}
edges:
- from: in
to: out
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-log-sink
name: simple-log-sink
spec:
vertices:
- name: in
Expand All @@ -19,4 +19,4 @@ spec:
image: quay.io/numaio/numaflow-rs/sink-log:stable
edges:
- from: in
to: out
to: out
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-transformer
name: simple-source-transformer
spec:
vertices:
- name: in
Expand All @@ -19,9 +19,7 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: out


to: out
5 changes: 1 addition & 4 deletions examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ impl sourcetransform::SourceTransformer for NowCat {
&self,
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys)
.tags(vec![]);
vec![message]
vec![sourcetransform::Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys.clone())]
}
}
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A Rust SDK for [Numaflow]. The Rust SDK is experimental has only implemented the most important
//! features. It will support all the core features eventually. It supports [Map], [Reduce], and
//! [User Defined Sinks].
//! features.
//! It will support all the core features eventually.
//! It supports [Map], [Reduce], [User Defined Sources], [User Defined Source Transformer] and [User Defined Sinks].
//!
//! Please note that the Rust SDK is experimental and will be refactored in the future to make it more
//! idiomatic.
Expand All @@ -9,6 +10,7 @@
//! [Map]: https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/
//! [Reduce]: https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/
//! [User Defined Sources]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/
//! [User Defined Source Transformer]: https://numaflow.numaproj.io/user-guide/sources/transformer/
//! [User Defined Sinks]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/

/// start up code
Expand Down

0 comments on commit c3af770

Please sign in to comment.