-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add example for Input and output Bindings (#57)
* Add example for Input and output Bindings * fix: add example tag * chore: lint and fix Signed-off-by: mikeee <[email protected]> * feat: implement validation * chore: lint Signed-off-by: mikeee <[email protected]> * chore: bump timeout Signed-off-by: mikeee <[email protected]> * chore: ignore return code for kafka setup Signed-off-by: mikeee <[email protected]> * chore: fix kafka step Signed-off-by: mikeee <[email protected]> --------- Signed-off-by: Mike Nguyen <[email protected]> Signed-off-by: mikeee <[email protected]> Co-authored-by: Mike Nguyen <[email protected]>
- Loading branch information
1 parent
ea64454
commit 5764c6c
Showing
10 changed files
with
258 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# Input and Output Bindings Example | ||
|
||
This is a simple example that demonstrates Dapr's binding capabilities. To implement input bindings in your rust application, you need to implement `AppCallback` server for subscribing to events. Specifically, the following two methods need to be implemented for input bindings to work: | ||
|
||
1. `list_input_bindings` - Dapr runtime calls this method to get list of bindings the application is subscribed to. | ||
2. `on_binding_event` - Defines how the application handles the input binding event. | ||
|
||
> **Note:** Make sure to use latest version of proto bindings. | ||
In order to have both examples working with the same binding configuration ServiceBus was used here. If you don't have it available you can change to a binding that works for both Input and Output from [this list](https://docs.dapr.io/reference/components-reference/supported-bindings/) | ||
|
||
|
||
## Running | ||
|
||
To run this example: | ||
|
||
1. Run a kafka container | ||
|
||
<!-- STEP | ||
name: Run kafka instance | ||
background: true | ||
sleep: 60 | ||
timeout_seconds: 120 | ||
expected_return_code: | ||
expected_stderr_lines: | ||
--> | ||
|
||
```bash | ||
docker run -p 9092:9092 apache/kafka:3.7.1 | ||
``` | ||
|
||
<!-- END_STEP --> | ||
|
||
2. Run the multi-app run template (`dapr.yaml`) | ||
|
||
<!-- STEP | ||
name: Run Multi-app Run | ||
output_match_mode: substring | ||
match_order: sequential | ||
expected_stdout_lines: | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 0 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 1 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 2 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 3 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 4 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 5 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 6 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 7 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 8 => hello from rust!' | ||
- '== APP - rust-input-b == Binding Name: binding-example' | ||
- '== APP - rust-input-b == Message: 9 => hello from rust!' | ||
background: true | ||
sleep: 30 | ||
timeout_seconds: 90 | ||
--> | ||
|
||
```bash | ||
dapr run -f . | ||
``` | ||
|
||
<!-- END_STEP --> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
apiVersion: dapr.io/v1alpha1 | ||
kind: Component | ||
metadata: | ||
name: binding-example | ||
spec: | ||
type: bindings.kafka | ||
metadata: | ||
- name: direction | ||
value: "input, output" | ||
# Kafka broker connection setting | ||
- name: brokers | ||
value: localhost:9092 | ||
# consumer configuration: topic and consumer group | ||
- name: topics | ||
value: sample | ||
- name: consumerGroup | ||
value: group1 | ||
# publisher configuration: topic | ||
- name: publishTopic | ||
value: sample | ||
- name: authType | ||
value: "none" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
version: 1 | ||
common: | ||
resourcesPath: ./components/ | ||
daprdLogDestination: console | ||
apps: | ||
- appID: rust-input-b | ||
appDirPath: ./ | ||
appProtocol: grpc | ||
appPort: 50051 | ||
logLevel: debug | ||
command: ["cargo", "run", "--example", "input-bindings"] | ||
- appID: rust-output-b | ||
appDirPath: ./ | ||
appProtocol: grpc | ||
logLevel: debug | ||
command: ["cargo", "run", "--example", "output-bindings"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
use tonic::{transport::Server, Request, Response, Status}; | ||
|
||
use dapr::dapr::dapr::proto::common::v1::{InvokeRequest, InvokeResponse}; | ||
use dapr::dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer}; | ||
use dapr::dapr::dapr::proto::runtime::v1::{ | ||
BindingEventRequest, BindingEventResponse, ListInputBindingsResponse, | ||
ListTopicSubscriptionsResponse, TopicEventRequest, TopicEventResponse, | ||
}; | ||
|
||
#[derive(Default)] | ||
pub struct AppCallbackService {} | ||
|
||
#[tonic::async_trait] | ||
impl AppCallback for AppCallbackService { | ||
/// Invokes service method with InvokeRequest. | ||
async fn on_invoke( | ||
&self, | ||
_request: Request<InvokeRequest>, | ||
) -> Result<Response<InvokeResponse>, Status> { | ||
Ok(Response::new(InvokeResponse::default())) | ||
} | ||
|
||
/// Lists all topics subscribed by this app. | ||
async fn list_topic_subscriptions( | ||
&self, | ||
_request: Request<()>, | ||
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> { | ||
Ok(Response::new(ListTopicSubscriptionsResponse::default())) | ||
} | ||
|
||
/// Subscribes events from Pubsub. | ||
async fn on_topic_event( | ||
&self, | ||
_request: Request<TopicEventRequest>, | ||
) -> Result<Response<TopicEventResponse>, Status> { | ||
Ok(Response::new(TopicEventResponse::default())) | ||
} | ||
|
||
/// Lists all input bindings subscribed by this app. | ||
/// NOTE: Dapr runtime will call this method to get | ||
/// the list of bindings the app wants to subscribe to. | ||
/// In this example, the app is subscribing to a local pubsub binding named "binding-example" | ||
async fn list_input_bindings( | ||
&self, | ||
_request: Request<()>, | ||
) -> Result<Response<ListInputBindingsResponse>, Status> { | ||
let list_bindings = ListInputBindingsResponse { | ||
bindings: vec![String::from("binding-example")], | ||
}; | ||
|
||
Ok(Response::new(list_bindings)) | ||
} | ||
|
||
/// Listens events from the input bindings. | ||
async fn on_binding_event( | ||
&self, | ||
request: Request<BindingEventRequest>, | ||
) -> Result<Response<BindingEventResponse>, Status> { | ||
let r = request.into_inner(); | ||
let name = &r.name; | ||
let data = &r.data; | ||
|
||
let message = String::from_utf8_lossy(&data); | ||
println!("Binding Name: {}", &name); | ||
println!("Message: {}", &message); | ||
|
||
Ok(Response::new(BindingEventResponse::default())) | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let addr = "[::]:50051".parse().unwrap(); | ||
|
||
let callback_service = AppCallbackService::default(); | ||
|
||
println!("AppCallback server listening on: {}", addr); | ||
|
||
// Create a gRPC server with the callback_service. | ||
Server::builder() | ||
.add_service(AppCallbackServer::new(callback_service)) | ||
.serve(addr) | ||
.await?; | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
use std::{collections::HashMap, thread, time::Duration}; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
// TODO: Handle this issue in the sdk | ||
// Introduce delay so that dapr grpc port is assigned before app tries to connect | ||
thread::sleep(Duration::from_secs(2)); | ||
|
||
// Get the Dapr port and create a connection | ||
let addr = "https://127.0.0.1".to_string(); | ||
|
||
// Create the client | ||
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?; | ||
|
||
// name of the component | ||
let binding_name = "binding-example"; | ||
|
||
for count in 0..10 { | ||
// message metadata | ||
let mut metadata = HashMap::<String, String>::new(); | ||
metadata.insert("count".to_string(), count.to_string()); | ||
|
||
// message | ||
let message = format!("{} => hello from rust!", &count).into_bytes(); | ||
|
||
client | ||
.invoke_binding(binding_name, message, "create", Some(metadata)) | ||
.await?; | ||
|
||
// sleep for 500ms to simulate delay b/w two events | ||
tokio::time::sleep(Duration::from_millis(500)).await; | ||
} | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters