Skip to content

Commit

Permalink
Address further comments
Browse files Browse the repository at this point in the history
* Add comment for future work
* Change scope of ?
* Change some names
* Clean up backpedalling method
  • Loading branch information
matthewd0123 committed Aug 9, 2024
1 parent 7aefa30 commit b236907
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 58 deletions.
10 changes: 6 additions & 4 deletions subscription-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct SubscriptionInformation {
pub config: EventDeliveryConfig,
}

// Will be moving this to up-rust
// Issue: https://github.com/eclipse-uprotocol/up-rust/issues/178
impl Eq for SubscriptionInformation {}

impl PartialEq for SubscriptionInformation {
Expand Down Expand Up @@ -82,13 +84,13 @@ impl SubscriptionCache {
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
});
})?;
let subscriber = subscription.subscriber.into_option().ok_or_else(|| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
});
})?;
let status = if let Some(status) = subscription.status.into_option() {
status
} else {
Expand All @@ -109,8 +111,8 @@ impl SubscriptionCache {
};
// Create new hashset if the key does not exist and insert the subscription
let subscription_information = SubscriptionInformation {
topic: topic.clone()?,
subscriber: subscriber.clone()?,
topic: topic.clone(),
subscriber: subscriber.clone(),
status,
attributes,
config,
Expand Down
2 changes: 1 addition & 1 deletion up-linux-streamer/DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
zenoh_transport_config: {
// Endpoint to be used when starting Zenoh session
endpoint: "tcp/0.0.0.0:7447"
config_file: "./up-linux-streamer/ZENOH_CONFIG.json5"
},
someip_config: {
// Determines the authority_name of the mechatronics network
Expand Down
419 changes: 419 additions & 0 deletions up-linux-streamer/ZENOH_CONFIG.json5

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions up-linux-streamer/examples/mE_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;

#[allow(dead_code)]
struct PublishReceiver {}
impl PublishReceiver {
pub fn new() -> Self {
Self {}
}
}
struct PublishReceiver;

#[async_trait]
impl UListener for PublishReceiver {
async fn on_receive(&self, msg: UMessage) {
Expand All @@ -47,7 +43,7 @@ impl UListener for PublishReceiver {
let Some(payload_bytes) = msg.payload else {
panic!("No bytes available");
};
let _ = match Timer::parse_from_bytes(&payload_bytes) {
match Timer::parse_from_bytes(&payload_bytes) {
Ok(timer_message) => {
println!("timer: {timer_message:?}");
timer_message
Expand Down Expand Up @@ -97,7 +93,7 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};

let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver);
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
subscriber
.register_listener(&source_filter, None, publish_receiver.clone())
Expand Down
9 changes: 2 additions & 7 deletions up-linux-streamer/examples/uE_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;

#[allow(dead_code)]
struct PublishReceiver {}
impl PublishReceiver {
pub fn new() -> Self {
Self {}
}
}
struct PublishReceiver;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -105,7 +100,7 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};

let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver);
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
subscriber
.register_listener(&source_filter, None, publish_receiver.clone())
Expand Down
2 changes: 1 addition & 1 deletion up-linux-streamer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct USubscriptionConfig {
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct ZenohTransportConfig {
pub(crate) endpoint: String,
pub(crate) config_file: String,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
Expand Down
24 changes: 11 additions & 13 deletions up-linux-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ mod config;

use crate::config::{Config, HostTransport};
use clap::Parser;
use log::trace;
use log::{warn, trace};
use std::fs::File;
use std::io::Read;
use std::str::FromStr;
use std::sync::Arc;
use std::{env, thread};
use up_rust::{UCode, UStatus, UTransport};
Expand All @@ -14,7 +13,6 @@ use up_transport_vsomeip::UPTransportVsomeip;
use up_transport_zenoh::UPClientZenoh;
use usubscription_static_file::USubscriptionStaticFile;
use zenoh::config::Config as ZenohConfig;
use zenoh::config::EndPoint as ZenohEndpoint;

#[derive(Parser)]
#[command()]
Expand Down Expand Up @@ -55,16 +53,16 @@ async fn main() -> Result<(), UStatus> {
usubscription,
);

let mut zenoh_config = ZenohConfig::default();

// Specify the address to listen on using IPv4
let ipv4_endpoint = ZenohEndpoint::from_str(config.zenoh_transport_config.endpoint.as_str());

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.endpoints
.push(ipv4_endpoint.expect("FAIL"));
let zenoh_config = match ZenohConfig::from_file(config.zenoh_transport_config.config_file) {
Ok(config) => {
trace!("Able to read zenoh config from file");
config
}
Err(error) => {
warn!("Unable to read zenoh config from file, using default: {}", error);
ZenohConfig::default()
}
};

let host_transport: Arc<dyn UTransport> = Arc::new(match config.host_config.transport {
HostTransport::Zenoh => {
Expand Down
74 changes: 50 additions & 24 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

#![allow(clippy::mutable_key_type)]

use crate::endpoint::Endpoint;
use async_std::channel::{Receiver, Sender};
use async_std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -209,12 +207,13 @@ impl ForwardingListeners {
let forwarding_listener =
Arc::new(ForwardingListener::new(forwarding_id, out_sender.clone()));

type UUriPair = Vec<(UUri, Option<UUri>)>;
let mut uuri_to_backpedal: HashSet<UUriPair> = HashSet::new();
type SourceSinkFilterPair = (UUri, Option<UUri>);
#[allow(clippy::mutable_key_type)]
let mut uuris_to_backpedal: HashSet<SourceSinkFilterPair> = HashSet::new();

// Perform async registration and fetching

uuri_to_backpedal.insert(vec![(any_uuri(), Some(uauthority_to_uuri(out_authority)))]);
uuris_to_backpedal.insert((any_uuri(), Some(uauthority_to_uuri(out_authority))));
if let Err(err) = in_transport
.register_listener(
&any_uuri(),
Expand All @@ -223,47 +222,74 @@ impl ForwardingListeners {
)
.await
{
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}");
for uuri_pair in &uuri_to_backpedal {
for (uuri1, uuri2) in uuri_pair {
// Do something with uuri1 and uuri2
let _ = in_transport
.unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone())
.await;
}
warn!(
"{}:{} unable to register request listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
for uuri_pair in &uuris_to_backpedal {
if let Err(err) = in_transport
.unregister_listener(
&uuri_pair.0,
uuri_pair.1.as_ref(),
forwarding_listener.clone(),
)
.await
{
warn!(
"{}:{} unable to unregister listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
};
}
return Err(ForwardingListenerError::FailToRegisterNotificationRequestResponseListener);
} else {
debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener");
debug!(
"{}:{} able to register request listener",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG
);
}

#[allow(clippy::mutable_key_type)]
let subscribers = match subscription_cache
.lock()
.await
.fetch_cache_entry(out_authority.into())
{
Some(subscribers) => subscribers,
None => {
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} no subscribers found for out_authority: {out_authority:?}");
warn!(
"{}:{} no subscribers found for out_authority: {:?}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, out_authority
);
HashSet::new()
}
};

for subscriber in subscribers {
uuri_to_backpedal.insert(vec![(subscriber.topic.clone(), None)]);
uuris_to_backpedal.insert((subscriber.topic.clone(), None));
if let Err(err) = in_transport
.register_listener(&subscriber.topic, None, forwarding_listener.clone())
.await
{
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}");
warn!(
"{}:{} unable to register listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
// Perform async unregister_listener
for uuri_pair in &uuri_to_backpedal {
for (uuri1, uuri2) in uuri_pair {
// Do something with uuri1 and uuri2
let _ = in_transport
.unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone())
.await;
}
for uuri_pair in &uuris_to_backpedal {
if let Err(err) = in_transport
.unregister_listener(
&uuri_pair.0,
uuri_pair.1.as_ref(),
forwarding_listener.clone(),
)
.await
{
warn!(
"{}:{} unable to unregister listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
};
}
return Err(ForwardingListenerError::FailToRegisterPublishListener(
subscriber.topic,
Expand Down

0 comments on commit b236907

Please sign in to comment.