From 5764c6cf3d367ea61bbeffaab88dbd42d50838b3 Mon Sep 17 00:00:00 2001 From: Rafael Merlin Date: Sun, 7 Jul 2024 03:56:03 +1000 Subject: [PATCH] Add example for Input and output Bindings (#57) * Add example for Input and output Bindings * fix: add example tag * chore: lint and fix Signed-off-by: mikeee * feat: implement validation * chore: lint Signed-off-by: mikeee * chore: bump timeout Signed-off-by: mikeee * chore: ignore return code for kafka setup Signed-off-by: mikeee * chore: fix kafka step Signed-off-by: mikeee --------- Signed-off-by: Mike Nguyen Signed-off-by: mikeee Co-authored-by: Mike Nguyen --- .github/workflows/validate-examples.yml | 2 +- Cargo.toml | 8 +++ examples/bindings/README.md | 70 ++++++++++++++++++ examples/bindings/components/bindings.yml | 23 ++++++ examples/bindings/dapr.yaml | 16 +++++ examples/bindings/input.rs | 87 +++++++++++++++++++++++ examples/bindings/output.rs | 35 +++++++++ src/appcallback.rs | 8 +++ src/client.rs | 10 ++- src/server/actor/runtime/mod.rs | 2 +- 10 files changed, 258 insertions(+), 3 deletions(-) create mode 100644 examples/bindings/README.md create mode 100644 examples/bindings/components/bindings.yml create mode 100644 examples/bindings/dapr.yaml create mode 100644 examples/bindings/input.rs create mode 100644 examples/bindings/output.rs diff --git a/.github/workflows/validate-examples.yml b/.github/workflows/validate-examples.yml index 1e278c6a..129c2c4a 100644 --- a/.github/workflows/validate-examples.yml +++ b/.github/workflows/validate-examples.yml @@ -144,7 +144,7 @@ jobs: fail-fast: false matrix: examples: - [ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ] + [ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ] steps: - name: Check out code uses: actions/checkout@v4 diff --git a/Cargo.toml b/Cargo.toml index c12534b9..4d06b997 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,14 @@ path = "examples/pubsub/publisher.rs" name = "subscriber" path = "examples/pubsub/subscriber.rs" +[[example]] +name = "output-bindings" +path = "examples/bindings/output.rs" + +[[example]] +name = "input-bindings" +path = "examples/bindings/input.rs" + [[example]] name = "query_state_q1" path = "examples/query_state/query1.rs" diff --git a/examples/bindings/README.md b/examples/bindings/README.md new file mode 100644 index 00000000..d6b4ad79 --- /dev/null +++ b/examples/bindings/README.md @@ -0,0 +1,70 @@ +# Input and Output Bindings Example + +This is a simple example that demonstrates Dapr's binding capabilities. To implement input bindings in your rust application, you need to implement `AppCallback` server for subscribing to events. Specifically, the following two methods need to be implemented for input bindings to work: + +1. `list_input_bindings` - Dapr runtime calls this method to get list of bindings the application is subscribed to. +2. `on_binding_event` - Defines how the application handles the input binding event. + +> **Note:** Make sure to use latest version of proto bindings. + +In order to have both examples working with the same binding configuration ServiceBus was used here. If you don't have it available you can change to a binding that works for both Input and Output from [this list](https://docs.dapr.io/reference/components-reference/supported-bindings/) + + +## Running + +To run this example: + +1. Run a kafka container + + + +```bash +docker run -p 9092:9092 apache/kafka:3.7.1 +``` + + + +2. Run the multi-app run template (`dapr.yaml`) + + + +```bash +dapr run -f . +``` + + \ No newline at end of file diff --git a/examples/bindings/components/bindings.yml b/examples/bindings/components/bindings.yml new file mode 100644 index 00000000..9fa513d8 --- /dev/null +++ b/examples/bindings/components/bindings.yml @@ -0,0 +1,23 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: binding-example +spec: + type: bindings.kafka + metadata: + - name: direction + value: "input, output" + # Kafka broker connection setting + - name: brokers + value: localhost:9092 + # consumer configuration: topic and consumer group + - name: topics + value: sample + - name: consumerGroup + value: group1 + # publisher configuration: topic + - name: publishTopic + value: sample + - name: authType + value: "none" + diff --git a/examples/bindings/dapr.yaml b/examples/bindings/dapr.yaml new file mode 100644 index 00000000..aa984e7e --- /dev/null +++ b/examples/bindings/dapr.yaml @@ -0,0 +1,16 @@ +version: 1 +common: + resourcesPath: ./components/ + daprdLogDestination: console +apps: + - appID: rust-input-b + appDirPath: ./ + appProtocol: grpc + appPort: 50051 + logLevel: debug + command: ["cargo", "run", "--example", "input-bindings"] + - appID: rust-output-b + appDirPath: ./ + appProtocol: grpc + logLevel: debug + command: ["cargo", "run", "--example", "output-bindings"] \ No newline at end of file diff --git a/examples/bindings/input.rs b/examples/bindings/input.rs new file mode 100644 index 00000000..0e582000 --- /dev/null +++ b/examples/bindings/input.rs @@ -0,0 +1,87 @@ +use tonic::{transport::Server, Request, Response, Status}; + +use dapr::dapr::dapr::proto::common::v1::{InvokeRequest, InvokeResponse}; +use dapr::dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer}; +use dapr::dapr::dapr::proto::runtime::v1::{ + BindingEventRequest, BindingEventResponse, ListInputBindingsResponse, + ListTopicSubscriptionsResponse, TopicEventRequest, TopicEventResponse, +}; + +#[derive(Default)] +pub struct AppCallbackService {} + +#[tonic::async_trait] +impl AppCallback for AppCallbackService { + /// Invokes service method with InvokeRequest. + async fn on_invoke( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(InvokeResponse::default())) + } + + /// Lists all topics subscribed by this app. + async fn list_topic_subscriptions( + &self, + _request: Request<()>, + ) -> Result, Status> { + Ok(Response::new(ListTopicSubscriptionsResponse::default())) + } + + /// Subscribes events from Pubsub. + async fn on_topic_event( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(TopicEventResponse::default())) + } + + /// Lists all input bindings subscribed by this app. + /// NOTE: Dapr runtime will call this method to get + /// the list of bindings the app wants to subscribe to. + /// In this example, the app is subscribing to a local pubsub binding named "binding-example" + + async fn list_input_bindings( + &self, + _request: Request<()>, + ) -> Result, Status> { + let list_bindings = ListInputBindingsResponse { + bindings: vec![String::from("binding-example")], + }; + + Ok(Response::new(list_bindings)) + } + + /// Listens events from the input bindings. + async fn on_binding_event( + &self, + request: Request, + ) -> Result, Status> { + let r = request.into_inner(); + let name = &r.name; + let data = &r.data; + + let message = String::from_utf8_lossy(&data); + println!("Binding Name: {}", &name); + println!("Message: {}", &message); + + Ok(Response::new(BindingEventResponse::default())) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "[::]:50051".parse().unwrap(); + + let callback_service = AppCallbackService::default(); + + println!("AppCallback server listening on: {}", addr); + + // Create a gRPC server with the callback_service. + Server::builder() + .add_service(AppCallbackServer::new(callback_service)) + .serve(addr) + .await?; + + Ok(()) +} diff --git a/examples/bindings/output.rs b/examples/bindings/output.rs new file mode 100644 index 00000000..608a45cb --- /dev/null +++ b/examples/bindings/output.rs @@ -0,0 +1,35 @@ +use std::{collections::HashMap, thread, time::Duration}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // TODO: Handle this issue in the sdk + // Introduce delay so that dapr grpc port is assigned before app tries to connect + thread::sleep(Duration::from_secs(2)); + + // Get the Dapr port and create a connection + let addr = "https://127.0.0.1".to_string(); + + // Create the client + let mut client = dapr::Client::::connect(addr).await?; + + // name of the component + let binding_name = "binding-example"; + + for count in 0..10 { + // message metadata + let mut metadata = HashMap::::new(); + metadata.insert("count".to_string(), count.to_string()); + + // message + let message = format!("{} => hello from rust!", &count).into_bytes(); + + client + .invoke_binding(binding_name, message, "create", Some(metadata)) + .await?; + + // sleep for 500ms to simulate delay b/w two events + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Ok(()) +} diff --git a/src/appcallback.rs b/src/appcallback.rs index 2532a3bd..a60efcfe 100644 --- a/src/appcallback.rs +++ b/src/appcallback.rs @@ -66,6 +66,14 @@ impl TopicSubscription { } } +impl ListInputBindingsResponse { + pub fn binding(binding_name: String) -> Self { + Self { + bindings: vec![binding_name], + } + } +} + pub struct AppCallbackService { handlers: Vec, } diff --git a/src/client.rs b/src/client.rs index b58d08f9..7ba101bf 100644 --- a/src/client.rs +++ b/src/client.rs @@ -70,15 +70,23 @@ impl Client { &mut self, name: S, data: Vec, + operation: S, + metadata: Option>, ) -> Result where S: Into, { + let mut mdata = HashMap::::new(); + if let Some(m) = metadata { + mdata = m; + } + self.0 .invoke_binding(InvokeBindingRequest { name: name.into(), data, - ..Default::default() + operation: operation.into(), + metadata: mdata, }) .await } diff --git a/src/server/actor/runtime/mod.rs b/src/server/actor/runtime/mod.rs index 6a9a9e0e..56efcb2a 100644 --- a/src/server/actor/runtime/mod.rs +++ b/src/server/actor/runtime/mod.rs @@ -168,7 +168,7 @@ impl ActorTypeRegistration { pub fn register_method( mut self, method_name: &str, - handler: impl Handler + Send + Sync, + handler: impl Handler + Sync, ) -> Self where T: 'static,