Skip to content

Commit

Permalink
Address comments from Pete, clippy
Browse files Browse the repository at this point in the history
* Change subscription_cache to track based on authority instead of topic
* Remove some unnecessary commented code
* Resolve clippy issues preventing build from being successful
  • Loading branch information
matthewd0123 committed Aug 7, 2024
1 parent 37e4fe3 commit d578125
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 211 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ tarpaulin-report.html

.idea/
.vscode/launch.json
.vscode/settings.json
181 changes: 88 additions & 93 deletions example-utils/usubscription-static-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@ use up_rust::core::usubscription::{
FetchSubscriptionsResponse, NotificationsRequest, SubscriberInfo, Subscription,
SubscriptionRequest, SubscriptionResponse, USubscription, UnsubscribeRequest,
};
use up_rust::{UStatus, UUri};
use up_rust::{UCode, UStatus, UUri};

pub struct USubscriptionStaticFile {
static_file: PathBuf,
}

impl USubscriptionStaticFile {
pub fn new(static_file: Option<PathBuf>) -> Self {
// Set static_file to static_file if it has value, else set to "static-configs/testdata.json"
let static_file =
static_file.unwrap_or_else(|| PathBuf::from("static-configs/testdata.json"));
pub fn new(static_file: PathBuf) -> Self {
USubscriptionStaticFile { static_file }
}
}
Expand All @@ -43,7 +40,7 @@ impl USubscriptionStaticFile {
impl USubscription for USubscriptionStaticFile {
async fn subscribe(
&self,
subscription_request: SubscriptionRequest,
_subscription_request: SubscriptionRequest,
) -> Result<SubscriptionResponse, UStatus> {
todo!()
}
Expand All @@ -64,120 +61,118 @@ impl USubscription for USubscriptionStaticFile {

let mut subscriptions_vec = Vec::new();

let empty_fetch_response = FetchSubscriptionsResponse {
..Default::default()
};

println!("subscription_json_file: {:?}", subscription_json_file);

match canonicalize(subscription_json_file) {
Ok(subscription_json_file) => {

match fs::read_to_string(&subscription_json_file) {
Ok(data) => match serde_json::from_str::<Value>(&data) {
Ok(res) => {
if let Some(obj) = res.as_object() {
for (key, value) in obj {
println!("key: {}, value: {}", key, value);
let mut subscriber_set: HashSet<UUri> = HashSet::new();

if let Some(array) = value.as_array() {
for subscriber in array {
println!("subscriber: {}", subscriber);

if let Some(subscriber_str) = subscriber.as_str() {
match UUri::from_str(subscriber_str) {
Ok(uri) => {
println!("All good for subscriber");
subscriber_set.insert(uri);
}
Err(error) => {
println!("Error with Deserializing Subscriber: {}", error);
}
}
} else {
println!("Unable to parse subscriber");
}
}
}

println!("key: {}", key);
let topic = match UUri::from_str(&key.to_string()) {
Ok(mut uri) => {
println!("All good for key");
uri.resource_id = 0x8001;
uri
}
Err(error) => {
println!("Error with Deserializing Key: {}", error);
continue;
}
};

let details_vec = Vec::new();
for subscriber in subscriber_set {
let subscriber_info_tmp = SubscriberInfo {
uri: Some(subscriber).into(),
details: details_vec.clone(),
..Default::default()
};
let subscription_tmp = Subscription {
topic: Some(topic.clone()).into(),
subscriber: Some(subscriber_info_tmp).into(),
..Default::default()
};
subscriptions_vec.push(subscription_tmp);
}
let subscription_json_file = match canonicalize(subscription_json_file) {
Ok(path) => path,
Err(e) => {
return Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
format!("Static subscription file not found: {e:?}"),
))
}
};

let data = fs::read_to_string(subscription_json_file).map_err(|e| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
format!("Unable to read file: {e:?}"),
)
})?;

let res: Value = serde_json::from_str(&data).map_err(|e| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
format!("Unable to parse JSON: {e:?}"),
)
})?;

if let Some(obj) = res.as_object() {
for (key, value) in obj {
println!("key: {}, value: {}", key, value);
let mut subscriber_set: HashSet<UUri> = HashSet::new();

if let Some(array) = value.as_array() {
for subscriber in array {
println!("subscriber: {}", subscriber);

if let Some(subscriber_str) = subscriber.as_str() {
match UUri::from_str(subscriber_str) {
Ok(uri) => {
println!("All good for subscriber");
subscriber_set.insert(uri);
}
Err(error) => {
println!("Error with Deserializing Subscriber: {}", error);
}
}

println!("{}", res);
dbg!(&subscriptions_vec);
let fetch_response = FetchSubscriptionsResponse {
subscriptions: subscriptions_vec,
..Default::default()
};
Ok(fetch_response)
} else {
println!("Unable to parse subscriber");
}
Err(e) => {
eprintln!("Unable to parse JSON: {}", e);
Ok(empty_fetch_response)
}
},
Err(e) => {
eprintln!("Unable to read file: {}", e);
Ok(empty_fetch_response)
}
}
}
Err(e) => {
eprintln!("Unable to canonicalize path: {}", e);
Ok(empty_fetch_response)

println!("key: {}", key);
let topic = match UUri::from_str(&key.to_string()) {
Ok(mut uri) => {
println!("All good for key");
uri.resource_id = 0x8001;
uri
}
Err(error) => {
println!("Error with Deserializing Key: {}", error);
continue;
}
};

let details_vec = Vec::new();
for subscriber in subscriber_set {
let subscriber_info_tmp = SubscriberInfo {
uri: Some(subscriber).into(),
details: details_vec.clone(),
..Default::default()
};
let subscription_tmp = Subscription {
topic: Some(topic.clone()).into(),
subscriber: Some(subscriber_info_tmp).into(),
..Default::default()
};
subscriptions_vec.push(subscription_tmp);
}
}
}

println!("{}", res);
dbg!(&subscriptions_vec);
let fetch_response = FetchSubscriptionsResponse {
subscriptions: subscriptions_vec,
..Default::default()
};

Ok(fetch_response)
}

async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
async fn unsubscribe(&self, _unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
todo!()
}

async fn register_for_notifications(
&self,
notifications_register_request: NotificationsRequest,
_notifications_register_request: NotificationsRequest,
) -> Result<(), UStatus> {
todo!()
Ok(())
}

async fn unregister_for_notifications(
&self,
notifications_unregister_request: NotificationsRequest,
_notifications_unregister_request: NotificationsRequest,
) -> Result<(), UStatus> {
todo!()
}

async fn fetch_subscribers(
&self,
fetch_subscribers_request: FetchSubscribersRequest,
_fetch_subscribers_request: FetchSubscribersRequest,
) -> Result<FetchSubscribersResponse, UStatus> {
todo!();
}
Expand Down
49 changes: 28 additions & 21 deletions subscription-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
********************************************************************************/

use async_std::sync::Mutex;
use protobuf::MessageField;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use up_rust::core::usubscription::{
Expand All @@ -21,9 +20,11 @@ use up_rust::core::usubscription::{
};
use up_rust::UUri;

pub type SubscribersMap = Mutex<HashMap<UUri, HashSet<SubscriptionInformation>>>;
pub type SubscribersMap = Mutex<HashMap<String, HashSet<SubscriptionInformation>>>;

// Tracks subscription information inside the SubscriptionCache
pub struct SubscriptionInformation {
pub topic: UUri,
pub subscriber: SubscriberInfo,
pub status: SubscriptionStatus,
pub attributes: SubscribeAttributes,
Expand All @@ -47,6 +48,7 @@ impl Hash for SubscriptionInformation {
impl Clone for SubscriptionInformation {
fn clone(&self) -> Self {
Self {
topic: self.topic.clone(),
subscriber: self.subscriber.clone(),
status: self.status.clone(),
attributes: self.attributes.clone(),
Expand All @@ -66,45 +68,53 @@ impl SubscriptionCache {
pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Self {
let mut subscription_cache_hash_map = HashMap::new();
for subscription in subscription_cache_map.subscriptions {
let uri = if let Some(uri) = subscription.topic.into_option(){
uri
let topic = if let Some(topic) = subscription.topic.into_option() {
topic
} else {
println!("Unable to parse URI from subscription, skipping...");
continue;
};
let subscriber = if let Some(subscriber) = subscription.subscriber.into_option(){
let subscriber = if let Some(subscriber) = subscription.subscriber.into_option() {
subscriber
} else {
println!("Unable to parse subscriber from subscription, skipping...");
continue;
};
let status = if let Some(status) = subscription.status.into_option(){
let status = if let Some(status) = subscription.status.into_option() {
status
} else {
println!("Unable to parse status from subscription, setting as default");
SubscriptionStatus::default()
};
let attributes = if let Some(attributes) = subscription.attributes.into_option(){
let attributes = if let Some(attributes) = subscription.attributes.into_option() {
attributes
} else {
println!("Unable to parse attributes from subscription, setting as default");
SubscribeAttributes::default()
};
let config = if let Some(config) = subscription.config.into_option(){
let config = if let Some(config) = subscription.config.into_option() {
config
} else {
println!("Unable to parse config from subscription, setting as default");
EventDeliveryConfig::default()
};
// Create new hashset if the key does not exist and insert the subscription
let subscription_information = SubscriptionInformation {
topic: topic.clone(),
subscriber,
status,
attributes,
config
config,
};
let subscriber_authority_name = subscription_information
.subscriber
.uri
.as_ref()
.unwrap()
.authority_name
.clone();
subscription_cache_hash_map
.entry(uri)
.entry(subscriber_authority_name)
.or_insert_with(HashSet::new)
.insert(subscription_information);
}
Expand All @@ -113,18 +123,15 @@ impl SubscriptionCache {
}
}

pub async fn fetch_cache(
pub async fn fetch_cache_entry(
&self,
) -> HashMap<UUri, HashSet<SubscriptionInformation>> {
let cache_map = self.subscription_cache_map.lock().await;
let mut cloned_map = HashMap::new();

for (key, value) in cache_map.iter() {
let cloned_key = key.clone();
let cloned_value = value.iter().cloned().collect::<HashSet<_>>();
cloned_map.insert(cloned_key, cloned_value);
}
entry: String,
) -> Option<HashSet<SubscriptionInformation>> {
let map = self.subscription_cache_map.lock().await;
map.get(&entry).cloned()
}

cloned_map
pub async fn fetch_cache(&self) -> HashMap<String, HashSet<SubscriptionInformation>> {
self.subscription_cache_map.lock().await.clone()
}
}
1 change: 1 addition & 0 deletions up-linux-streamer/examples/mE_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn main() -> Result<(), UStatus> {
&REMOTE_AUTHORITY.to_string(),
PUB_TOPIC_UE_ID,
&vsomeip_config.unwrap(),
None,
)
.unwrap(),
);
Expand Down
1 change: 1 addition & 0 deletions up-linux-streamer/examples/mE_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ async fn main() -> Result<(), UStatus> {
&REMOTE_AUTHORITY.to_string(),
SERVICE_UE_ID,
&vsomeip_config.unwrap(),
None,
)
.unwrap(),
);
Expand Down
Loading

0 comments on commit d578125

Please sign in to comment.