Skip to content

Commit

Permalink
Address further comments from Pete
Browse files Browse the repository at this point in the history
* Add support for error handling in case topic registration fails
* Remove some duplicate comments
* Clean up some if-else blocks
* Switch async mutex to sync mutex in the subscription cache
* Remove fetch_cache method
* Remove a naked unwrap
* Add some places where explicit errors are returned
  • Loading branch information
matthewd0123 committed Aug 8, 2024
1 parent 8001b84 commit fd26e3d
Show file tree
Hide file tree
Showing 15 changed files with 268 additions and 133 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ members = [
"up-streamer", "subscription-cache", "utils/usubscription-static-file"]

[workspace.package]
rust-version = "1.72.1"
rust-version = "1.76.0"
version = "0.1.5-dev" # uProtocol version
repository = "https://github.com/eclipse-uprotocol/up-streamer-rust"
homepage = "https://github.com/eclipse-uprotocol"
Expand All @@ -34,7 +34,6 @@ env_logger = { version = "0.10.1" }
futures = { version = "0.3.30" }
log = { version = "0.4.20" }
json5 = { version = "0.4.1" }
prost = { version = "0.12" }
serde = { version = "1.0.154", features = ["derive"] }
serde_json = { version = "1.0.94" }
uuid = { version = "1.7.0" }
Expand Down
1 change: 0 additions & 1 deletion subscription-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ async-trait = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
uuid = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
Expand Down
75 changes: 41 additions & 34 deletions subscription-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use async_std::sync::Mutex;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Mutex;
use up_rust::core::usubscription::{
EventDeliveryConfig, FetchSubscriptionsResponse, SubscribeAttributes, SubscriberInfo,
SubscriptionStatus,
};
use up_rust::UUri;
use up_rust::{UCode, UStatus};

pub type SubscribersMap = Mutex<HashMap<String, HashSet<SubscriptionInformation>>>;

Expand Down Expand Up @@ -61,25 +62,33 @@ pub struct SubscriptionCache {
subscription_cache_map: SubscribersMap,
}

impl Default for SubscriptionCache {
fn default() -> Self {
Self {
subscription_cache_map: Mutex::new(HashMap::new()),
}
}
}

/// A [`SubscriptionCache`] is used to store and manage subscriptions to
/// topics. It is kept local to the streamer. The streamer will receive updates
/// from the subscription service, and update the SubscriptionCache accordingly.
impl SubscriptionCache {
pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Self {
pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Result<Self, UStatus> {
let mut subscription_cache_hash_map = HashMap::new();
for subscription in subscription_cache_map.subscriptions {
let topic = if let Some(topic) = subscription.topic.into_option() {
topic
} else {
println!("Unable to parse URI from subscription, skipping...");
continue;
};
let subscriber = if let Some(subscriber) = subscription.subscriber.into_option() {
subscriber
} else {
println!("Unable to parse subscriber from subscription, skipping...");
continue;
};
let topic = subscription.topic.into_option().ok_or_else(|| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
});
let subscriber = subscription.subscriber.into_option().ok_or_else(|| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
});
let status = if let Some(status) = subscription.status.into_option() {
status
} else {
Expand All @@ -100,38 +109,36 @@ impl SubscriptionCache {
};
// Create new hashset if the key does not exist and insert the subscription
let subscription_information = SubscriptionInformation {
topic: topic.clone(),
subscriber,
topic: topic.clone()?,
subscriber: subscriber.clone()?,
status,
attributes,
config,
};
let subscriber_authority_name = subscription_information
.subscriber
.uri
.as_ref()
.unwrap()
.authority_name
.clone();
let subscriber_authority_name = match subscription_information.subscriber.uri.as_ref() {
Some(uri) => uri.authority_name.clone(),
None => {
return Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve authority name",
))
}
};
subscription_cache_hash_map
.entry(subscriber_authority_name)
.or_insert_with(HashSet::new)
.insert(subscription_information);
}
Self {
Ok(Self {
subscription_cache_map: Mutex::new(subscription_cache_hash_map),
}
})
}

pub async fn fetch_cache_entry(
&self,
entry: String,
) -> Option<HashSet<SubscriptionInformation>> {
let map = self.subscription_cache_map.lock().await;
pub fn fetch_cache_entry(&self, entry: String) -> Option<HashSet<SubscriptionInformation>> {
let map = match self.subscription_cache_map.lock() {
Ok(map) => map,
Err(_) => return None,
};
map.get(&entry).cloned()
}

pub async fn fetch_cache(&self) -> HashMap<String, HashSet<SubscriptionInformation>> {
self.subscription_cache_map.lock().await.clone()
}
}
8 changes: 8 additions & 0 deletions up-linux-streamer/DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
// Used when initializing host transport
authority: "linux"
},
usubscription_config: {
// Lists the path to the subscription file when using static file
file_path: "./utils/usubscription-static-file/static-configs/testdata.json"
},
zenoh_transport_config: {
// Endpoint to be used when starting Zenoh session
endpoint: "tcp/0.0.0.0:7447"
},
someip_config: {
// Determines the authority_name of the mechatronics network
// Used when initializing SOME/IP transport
Expand Down
4 changes: 2 additions & 2 deletions up-linux-streamer/examples/mE_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<(), UStatus> {

// There will be a single vsomeip_transport, as there is a connection into device and a streamer
// TODO: Add error handling if we fail to create a UPTransportVsomeip
let client: Arc<dyn UTransport> = Arc::new(
let publisher: Arc<dyn UTransport> = Arc::new(
UPTransportVsomeip::new_with_config(
&PUB_TOPIC_AUTHORITY.to_string(),
&REMOTE_AUTHORITY.to_string(),
Expand Down Expand Up @@ -86,6 +86,6 @@ async fn main() -> Result<(), UStatus> {
.unwrap();
println!("Sending Publish message:\n{publish_msg:?}");

client.send(publish_msg).await?;
publisher.send(publish_msg).await?;
}
}
14 changes: 6 additions & 8 deletions up-linux-streamer/examples/mE_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;

#[allow(dead_code)]
struct PublishReceiver {
client: Arc<dyn UTransport>,
}
struct PublishReceiver {}
impl PublishReceiver {
pub fn new(client: Arc<dyn UTransport>) -> Self {
Self { client }
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
Expand Down Expand Up @@ -80,7 +78,7 @@ async fn main() -> Result<(), UStatus> {

// There will be a single vsomeip_transport, as there is a connection into device and a streamer
// TODO: Add error handling if we fail to create a UPTransportVsomeip
let service: Arc<dyn UTransport> = Arc::new(
let subscriber: Arc<dyn UTransport> = Arc::new(
UPTransportVsomeip::new_with_config(
&SERVICE_AUTHORITY.to_string(),
&REMOTE_AUTHORITY.to_string(),
Expand All @@ -99,9 +97,9 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};

let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new(service.clone()));
let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
service
subscriber
.register_listener(&source_filter, None, publish_receiver.clone())
.await?;

Expand Down
29 changes: 21 additions & 8 deletions up-linux-streamer/examples/uE_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
********************************************************************************/

use async_trait::async_trait;
use clap::Parser;
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use protobuf::Message;
use std::str::FromStr;
Expand All @@ -33,6 +34,14 @@ const CLIENT_RESOURCE_ID: u16 = 0;

const REQUEST_TTL: u32 = 1000;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// The endpoint for Zenoh client to connect to
#[arg(short, long, default_value = "tcp/0.0.0.0:7445")]
endpoint: String,
}

struct ServiceResponseListener;

#[async_trait]
Expand Down Expand Up @@ -60,21 +69,25 @@ impl UListener for ServiceResponseListener {
async fn main() -> Result<(), UStatus> {
env_logger::init();

let args = Args::parse();

println!("uE_client");

// TODO: Probably make somewhat configurable?
// Create a configuration object
let mut zenoh_config = Config::default();

// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445");
if !args.endpoint.is_empty() {
// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str());

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));
}

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));
// TODO: Add error handling if we fail to create a UPClientZenoh
let client: Arc<dyn UTransport> = Arc::new(
UPClientZenoh::new(zenoh_config, "linux".to_string())
.await
Expand Down
28 changes: 21 additions & 7 deletions up-linux-streamer/examples/uE_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use clap::Parser;
use chrono::Local;
use chrono::Timelike;
use hello_world_protos::hello_world_topics::Timer;
Expand All @@ -27,24 +28,37 @@ const PUB_TOPIC_UE_ID: u16 = 0x3039;
const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// The endpoint for Zenoh client to connect to
#[arg(short, long, default_value = "tcp/0.0.0.0:7444")]
endpoint: String,
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
env_logger::init();

let args = Args::parse();

println!("uE_publisher");

// TODO: Probably make somewhat configurable?
// Create a configuration object
let mut zenoh_config = Config::default();

// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445");
if !args.endpoint.is_empty() {
// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str());

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));
}

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));
// TODO: Add error handling if we fail to create a UPClientZenoh
let client: Arc<dyn UTransport> = Arc::new(
UPClientZenoh::new(zenoh_config, "linux".to_string())
Expand Down
32 changes: 24 additions & 8 deletions up-linux-streamer/examples/uE_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use clap::Parser;
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::error;
use protobuf::Message;
Expand All @@ -22,6 +23,15 @@ impl ServiceRequestResponder {
Self { client }
}
}

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// The endpoint for Zenoh client to connect to
#[arg(short, long, default_value = "tcp/0.0.0.0:7443")]
endpoint: String,
}

#[async_trait]
impl UListener for ServiceRequestResponder {
async fn on_receive(&self, msg: UMessage) {
Expand Down Expand Up @@ -61,18 +71,24 @@ impl UListener for ServiceRequestResponder {
async fn main() -> Result<(), UStatus> {
env_logger::init();

let args = Args::parse();

println!("uE_service");

// TODO: Probably make somewhat configurable?
let mut zenoh_config = Config::default();
// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445");

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));

if !args.endpoint.is_empty() {
// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str());

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));
}

// TODO: Add error handling if we fail to create a UPClientZenoh
let service: Arc<dyn UTransport> = Arc::new(
UPClientZenoh::new(zenoh_config, "linux".to_string())
Expand Down
Loading

0 comments on commit fd26e3d

Please sign in to comment.