Skip to content

Commit

Permalink
Add publish to up-streamer rust
Browse files Browse the repository at this point in the history
* Add support for tracking subscriptions and forwarding published messages in ustreamer.rs
* Add usubscription-static-file to stub out subscriptions before usubscription is written
* Add tests for mE (SOME/IP) subscribe/publish and uE (zenoh) subscribe/publish

NOTE: This PR also contains changes that @PLeVasseur has done for up-streamer-rust. His will be merged first and then I will rebase based on that merge.
  • Loading branch information
matthewd0123 committed Jun 27, 2024
1 parent 5589247 commit 785ff9e
Show file tree
Hide file tree
Showing 24 changed files with 942 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ lcov.info
tarpaulin-report.html

.idea/
.vscode/launch.json
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ resolver = "2"
members = [
"example-utils/hello-world-protos",
"up-linux-streamer",
"up-streamer",
]
"up-streamer", "subscription-cache", "example-utils/usubscription-static-file"]

[workspace.package]
rust-version = "1.76.0"
rust-version = "1.72.1"
version = "0.1.5-dev" # uProtocol version
repository = "https://github.com/eclipse-uprotocol/up-streamer-rust"
homepage = "https://github.com/eclipse-uprotocol"
Expand All @@ -34,6 +33,7 @@ futures = { version = "0.3.30" }
log = { version = "0.4.20" }
prost = { version = "0.12" }
prost-types = { version = "0.12" }
serde = { version = "1.0.111" }
serde_json = { version = "1.0.111" }
uuid = { version = "1.7.0" }
up-rust = { default-features = false, git = "https://github.com/eclipse-uprotocol/up-rust", rev = "3a50104421a801d52e1d9c68979db54c013ce43d" }
Expand Down
19 changes: 19 additions & 0 deletions example-utils/usubscription-static-file/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "usubscription-static-file"
rust-version.workspace = true
version.workspace = true
repository.workspace = true
homepage.workspace = true
edition.workspace = true
keywords.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { workspace = true, features = ["unstable"] }
async-trait = "0.1.80"
serde_json = { version = "1.0.111" }
subscription-cache = {path="../../subscription-cache"}
up-rust = { workspace = true, features = ["usubscription"] }
uriparse = { version = "0.6" }
183 changes: 183 additions & 0 deletions example-utils/usubscription-static-file/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

#![allow(clippy::mutable_key_type)]

use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashSet;
use std::fs::{self, canonicalize};
use std::path::PathBuf;
use std::str::FromStr;
use up_rust::core::usubscription::{
FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest,
FetchSubscriptionsResponse, NotificationsRequest, SubscriberInfo, Subscription,
SubscriptionRequest, SubscriptionResponse, USubscription, UnsubscribeRequest,
};
use up_rust::{UStatus, UUri};

pub struct USubscriptionStaticFile {
static_file: PathBuf,
}

impl USubscriptionStaticFile {
pub fn new(static_file: Option<PathBuf>) -> Self {
// Set static_file to static_file if it has value, else set to "static-configs/testdata.json"
let static_file =
static_file.unwrap_or_else(|| PathBuf::from("static-configs/testdata.json"));
USubscriptionStaticFile { static_file }
}
}

#[async_trait]
impl USubscription for USubscriptionStaticFile {
async fn subscribe(
&self,
subscription_request: SubscriptionRequest,
) -> Result<SubscriptionResponse, UStatus> {
todo!()
}

async fn fetch_subscriptions(
&self,
fetch_subscriptions_request: FetchSubscriptionsRequest,
) -> Result<FetchSubscriptionsResponse, UStatus> {
// Reads in a file and builds it into a subscription_cache data type
// This is a static file, so we will just return the same set of subscribers
// for all URIs
println!(
"fetch_subscriptions for topic: {}",
fetch_subscriptions_request.subscriber()
);

let subscription_json_file = self.static_file.clone();

let mut subscriptions_vec = Vec::new();

let empty_fetch_response = FetchSubscriptionsResponse {
..Default::default()
};

match canonicalize(subscription_json_file) {
Ok(subscription_json_file) => {
println!("subscription_json_file: {:?}", subscription_json_file);

match fs::read_to_string(&subscription_json_file) {
Ok(data) => match serde_json::from_str::<Value>(&data) {
Ok(res) => {
if let Some(obj) = res.as_object() {
for (key, value) in obj {
println!("key: {}, value: {}", key, value);
let mut subscriber_set: HashSet<UUri> = HashSet::new();

if let Some(array) = value.as_array() {
for subscriber in array {
println!("subscriber: {}", subscriber);

if let Some(subscriber_str) = subscriber.as_str() {
match UUri::from_str(subscriber_str) {
Ok(uri) => {
println!("All good for subscriber");
subscriber_set.insert(uri);
}
Err(error) => {
println!("Error with Deserializing Subscriber: {}", error);
}
}
} else {
println!("Unable to parse subscriber");
}
}
}

println!("key: {}", key);
let topic = match UUri::from_str(&key.to_string()) {
Ok(mut uri) => {
println!("All good for key");
uri.resource_id = 0x8001;
uri
}
Err(error) => {
println!("Error with Deserializing Key: {}", error);
continue;
}
};

let details_vec = Vec::new();
for subscriber in subscriber_set {
let subscriber_info_tmp = SubscriberInfo {
uri: Some(subscriber).into(),
details: details_vec.clone(),
..Default::default()
};
let subscription_tmp = Subscription {
topic: Some(topic.clone()).into(),
subscriber: Some(subscriber_info_tmp).into(),
..Default::default()
};
subscriptions_vec.push(subscription_tmp);
}
}
}

println!("{}", res);
dbg!(&subscriptions_vec);
let fetch_response = FetchSubscriptionsResponse {
subscriptions: subscriptions_vec,
..Default::default()
};
Ok(fetch_response)
}
Err(e) => {
eprintln!("Unable to parse JSON: {}", e);
Ok(empty_fetch_response)
}
},
Err(e) => {
eprintln!("Unable to read file: {}", e);
Ok(empty_fetch_response)
}
}
}
Err(e) => {
eprintln!("Unable to canonicalize path: {}", e);
Ok(empty_fetch_response)
}
}
}

async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
todo!()
}

async fn register_for_notifications(
&self,
notifications_register_request: NotificationsRequest,
) -> Result<(), UStatus> {
todo!()
}

async fn unregister_for_notifications(
&self,
notifications_unregister_request: NotificationsRequest,
) -> Result<(), UStatus> {
todo!()
}

async fn fetch_subscribers(
&self,
fetch_subscribers_request: FetchSubscribersRequest,
) -> Result<FetchSubscribersResponse, UStatus> {
todo!();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"//pub_topic/1236/1/327": ["//me_authority/17185/1/1057", "//VIN.phones/8000/2/160", "//VIN.apps/8000/2/170"],
"//linux/1237/1/327": ["//linux/17185/1/1057", "//VIN.phones/8000/2/160", "//VIN.apps/8000/2/170"],
"//VIN.topic/1000/1/3": ["//VIN.vehicles/8000/2/150"],
"//VIN.topic/1000/1/4": ["//VIN.phones/8000/2/160"]
}
24 changes: 24 additions & 0 deletions subscription-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "subscription-cache"
rust-version.workspace = true
version.workspace = true
repository.workspace = true
homepage.workspace = true
edition.workspace = true
keywords.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { workspace = true, features = ["unstable"] }
async-trait = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
uuid = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
up-rust = { workspace = true, features = ["usubscription"] }
protobuf = { version = "3.3", features = ["with-bytes"] }
Loading

0 comments on commit 785ff9e

Please sign in to comment.