Skip to content

Commit

Permalink
[eclipse-uprotocol#79] Do not use Zenoh Queryables for RPC
Browse files Browse the repository at this point in the history
Removed code implementing RPC based on Queryables. Adapted tests and
example code.
  • Loading branch information
sophokles73 committed Oct 10, 2024
1 parent 3a97fae commit 1d15ccf
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 684 deletions.
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pedantic = "deny"
[dependencies]
anyhow = "1.0.75"
async-trait = "0.1"
bitmask-enum = "2.2.4"
bytes = "1.6.1"
lazy_static = "1.4.0"
protobuf = { version = "3.3" }
Expand Down
9 changes: 6 additions & 3 deletions examples/l2_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ mod common;

use std::{str::FromStr, sync::Arc};
use up_rust::{
communication::{CallOptions, RpcClient, UPayload},
communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload},
LocalUriProvider, UPayloadFormat, UPriority, UUri, UUID,
};
use up_transport_zenoh::{UPTransportZenoh, ZenohRpcClient};
use up_transport_zenoh::UPTransportZenoh;

#[tokio::main]
async fn main() {
Expand All @@ -30,7 +30,10 @@ async fn main() {
.await
.unwrap(),
);
let rpc_client = Arc::new(ZenohRpcClient::new(zenoh_transport.clone()));
let rpc_client = InMemoryRpcClient::new(zenoh_transport.clone(), zenoh_transport.clone())
.await
.map(Arc::new)
.expect("failed to create RpcClient for Zenoh transport");

let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap();

Expand Down
121 changes: 2 additions & 119 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
pub mod rpc;
pub mod uri_provider;
pub mod utransport;

pub use rpc::ZenohRpcClient;

use bitmask_enum::bitmask;
use protobuf::Message;
use std::{
collections::HashMap,
Expand All @@ -25,18 +21,11 @@ use std::{
};
use tokio::runtime::Runtime;
use tracing::error;
use up_rust::{
ComparableListener, LocalUriProvider, UAttributes, UCode, UListener, UPriority, UStatus, UUri,
};
use up_rust::{ComparableListener, LocalUriProvider, UAttributes, UCode, UPriority, UStatus, UUri};
// Re-export Zenoh config
pub use zenoh::config as zenoh_config;
use zenoh::{
bytes::ZBytes,
internal::runtime::Runtime as ZRuntime,
key_expr::OwnedKeyExpr,
pubsub::Subscriber,
qos::Priority,
query::{Query, Queryable},
bytes::ZBytes, internal::runtime::Runtime as ZRuntime, pubsub::Subscriber, qos::Priority,
Session,
};

Expand All @@ -52,28 +41,11 @@ lazy_static::lazy_static! {
.expect("Unable to create callback runtime");
}

#[bitmask(u8)]
enum MessageFlag {
Publish,
Notification,
Request,
Response,
}

type SubscriberMap = Arc<Mutex<HashMap<(String, ComparableListener), Subscriber<()>>>>;
type QueryableMap = Arc<Mutex<HashMap<(String, ComparableListener), Queryable<()>>>>;
type QueryMap = Arc<Mutex<HashMap<String, Query>>>;
type RpcCallbackMap = Arc<Mutex<HashMap<OwnedKeyExpr, Arc<dyn UListener>>>>;
pub struct UPTransportZenoh {
session: Arc<Session>,
// Able to unregister Subscriber
subscriber_map: SubscriberMap,
// Able to unregister Queryable
queryable_map: QueryableMap,
// Save the reqid to be able to send back response
query_map: QueryMap,
// Save the callback for RPC response
rpc_callback_map: RpcCallbackMap,
// URI
local_uri: UUri,
}
Expand Down Expand Up @@ -164,9 +136,6 @@ impl UPTransportZenoh {
Ok(UPTransportZenoh {
session: Arc::new(session),
subscriber_map: Arc::new(Mutex::new(HashMap::new())),
queryable_map: Arc::new(Mutex::new(HashMap::new())),
query_map: Arc::new(Mutex::new(HashMap::new())),
rpc_callback_map: Arc::new(Mutex::new(HashMap::new())),
local_uri,
})
}
Expand Down Expand Up @@ -279,59 +248,6 @@ impl UPTransportZenoh {
};
Ok(uattributes)
}

// You can take a look at the table in up-spec for more detail
// https://github.com/eclipse-uprotocol/up-spec/blob/ca8172a8cf17d70e4f095e6c0d57fe2ebc68c58d/up-l1/README.adoc#23-registerlistener
#[allow(clippy::nonminimal_bool)] // Don't simplify the boolean expression for better understanding
fn get_listener_message_type(
source_uuri: &UUri,
sink_uuri: Option<&UUri>,
) -> Result<MessageFlag, UStatus> {
let mut flag = MessageFlag::none();
let rpc_range = 1..0x7FFF_u32;
let nonrpc_range = 0x8000..0xFFFE_u32;

let src_resource = source_uuri.resource_id;
// Notification / Request / Response
if let Some(dst_uuri) = sink_uuri {
let dst_resource = dst_uuri.resource_id;

if (nonrpc_range.contains(&src_resource) && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0xFFFF)
{
flag |= MessageFlag::Notification;
}
if (src_resource == 0 && rpc_range.contains(&dst_resource))
|| (src_resource == 0xFFFF && rpc_range.contains(&dst_resource))
|| (src_resource == 0xFFFF && dst_resource == 0xFFFF)
{
flag |= MessageFlag::Request;
}
if (rpc_range.contains(&src_resource) && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0)
|| (src_resource == 0xFFFF && dst_resource == 0xFFFF)
{
flag |= MessageFlag::Response;
}
} else if nonrpc_range.contains(&src_resource) {
flag |= MessageFlag::Publish;
}
if flag.is_none() {
let src_resource = format!("{:X}", source_uuri.resource_id);
let dst_resource = if let Some(dst_uuri) = sink_uuri {
format!("{:X}", dst_uuri.resource_id)
} else {
String::from("None")
};
Err(UStatus::fail_with_code(
UCode::INTERNAL,
format!("Wrong combination of resource ID in source UUri ({src_resource}) and sink UUri ({dst_resource}). Please check up-spec for more details."),
))
} else {
Ok(flag)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -384,37 +300,4 @@ mod tests {
);
}
}

#[test_case("//192.168.1.100/10AB/3/80CD", None, Ok(MessageFlag::Publish); "Publish Message")]
#[test_case("//192.168.1.100/10AB/3/80CD", Some("//192.168.1.101/20EF/4/0"), Ok(MessageFlag::Notification); "Notification Message")]
#[test_case("//192.168.1.100/10AB/3/0", Some("//192.168.1.101/20EF/4/B"), Ok(MessageFlag::Request); "Request Message")]
#[test_case("//192.168.1.101/20EF/4/B", Some("//192.168.1.100/10AB/3/0"), Ok(MessageFlag::Response); "Response Message")]
#[test_case("//*/FFFF/FF/FFFF", Some("//192.168.1.101/20EF/4/B"), Ok(MessageFlag::Request); "Listen to all Request Messages")]
#[test_case("//*/FFFF/FF/FFFF", Some("//192.168.1.100/10AB/3/0"), Ok(MessageFlag::Notification | MessageFlag::Response); "Listen to Notification and Response Messages")]
#[test_case("//*/FFFF/FF/FFFF", Some("//[::1]/FFFF/FF/FFFF"), Ok(MessageFlag::Notification | MessageFlag::Request | MessageFlag::Response); "Listen to all messages to a device")]
#[test_case("//*/FFFF/FF/FFFF", None, Err(UCode::INTERNAL); "Impossible scenario: Listen to all Publish Messages")]
#[test_case("//192.168.1.100/10AB/3/0", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Request Message")]
#[test_case("//192.168.1.101/20EF/4/B", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Response Message")]
#[test_case("//192.168.1.100/10AB/3/80CD", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Notification Message")]
#[tokio::test(flavor = "multi_thread")]
async fn test_get_listener_message_type(
src_uri: &str,
sink_uri: Option<&str>,
result: Result<MessageFlag, UCode>,
) {
let src = UUri::from_str(src_uri).unwrap();
if let Some(uri) = sink_uri {
let dst = UUri::from_str(uri).unwrap();
assert_eq!(
UPTransportZenoh::get_listener_message_type(&src, Some(&dst))
.map_err(|e| e.get_code()),
result
);
} else {
assert_eq!(
UPTransportZenoh::get_listener_message_type(&src, None).map_err(|e| e.get_code()),
result
);
}
}
}
144 changes: 0 additions & 144 deletions src/rpc.rs

This file was deleted.

Loading

0 comments on commit 1d15ccf

Please sign in to comment.