From 49ef5192c2e67e375ee711197f6477e1c58eb809 Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Mon, 7 Oct 2024 17:54:03 +0200 Subject: [PATCH] [#79] Do not use Zenoh Queryables for RPC Removed code implementing RPC based on Queryables. Adapted tests and example code. --- Cargo.lock | 11 - Cargo.toml | 1 - examples/l2_rpc_client.rs | 9 +- src/lib.rs | 121 +---------- src/rpc.rs | 144 ------------- src/utransport.rs | 442 ++++---------------------------------- tests/l2_rpc.rs | 8 +- tests/rpc.rs | 10 +- 8 files changed, 62 insertions(+), 684 deletions(-) delete mode 100644 src/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index 489a32c..9bfdbc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -230,16 +230,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bitmask-enum" -version = "2.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb15541e888071f64592c0b4364fdff21b7cb0a247f984296699351963a8721" -dependencies = [ - "quote", - "syn 2.0.72", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -2637,7 +2627,6 @@ version = "0.2.0" dependencies = [ "anyhow", "async-trait", - "bitmask-enum", "bytes", "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index 0885ebf..2700aec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ pedantic = "deny" [dependencies] anyhow = "1.0.75" async-trait = "0.1" -bitmask-enum = "2.2.4" bytes = "1.6.1" lazy_static = "1.4.0" protobuf = { version = "3.3" } diff --git a/examples/l2_rpc_client.rs b/examples/l2_rpc_client.rs index fec8932..497c09c 100644 --- a/examples/l2_rpc_client.rs +++ b/examples/l2_rpc_client.rs @@ -14,10 +14,10 @@ mod common; use std::{str::FromStr, sync::Arc}; use up_rust::{ - communication::{CallOptions, RpcClient, UPayload}, + communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload}, LocalUriProvider, UPayloadFormat, UPriority, UUri, UUID, }; -use up_transport_zenoh::{UPTransportZenoh, ZenohRpcClient}; +use up_transport_zenoh::UPTransportZenoh; #[tokio::main] async fn main() { @@ -30,7 +30,10 @@ async fn main() { .await .unwrap(), ); - let rpc_client = Arc::new(ZenohRpcClient::new(zenoh_transport.clone())); + let rpc_client = InMemoryRpcClient::new(zenoh_transport.clone(), zenoh_transport.clone()) + .await + .map(Arc::new) + .expect("failed to create RpcClient for Zenoh transport"); let sink_uuri = UUri::from_str("//rpc_server/1/1/1").unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 46c292c..5bf9a53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,13 +10,9 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -pub mod rpc; pub mod uri_provider; pub mod utransport; -pub use rpc::ZenohRpcClient; - -use bitmask_enum::bitmask; use protobuf::Message; use std::{ collections::HashMap, @@ -25,18 +21,11 @@ use std::{ }; use tokio::runtime::Runtime; use tracing::error; -use up_rust::{ - ComparableListener, LocalUriProvider, UAttributes, UCode, UListener, UPriority, UStatus, UUri, -}; +use up_rust::{ComparableListener, LocalUriProvider, UAttributes, UCode, UPriority, UStatus, UUri}; // Re-export Zenoh config pub use zenoh::config as zenoh_config; use zenoh::{ - bytes::ZBytes, - internal::runtime::Runtime as ZRuntime, - key_expr::OwnedKeyExpr, - pubsub::Subscriber, - qos::Priority, - query::{Query, Queryable}, + bytes::ZBytes, internal::runtime::Runtime as ZRuntime, pubsub::Subscriber, qos::Priority, Session, }; @@ -52,28 +41,11 @@ lazy_static::lazy_static! { .expect("Unable to create callback runtime"); } -#[bitmask(u8)] -enum MessageFlag { - Publish, - Notification, - Request, - Response, -} - type SubscriberMap = Arc>>>; -type QueryableMap = Arc>>>; -type QueryMap = Arc>>; -type RpcCallbackMap = Arc>>>; pub struct UPTransportZenoh { session: Arc, // Able to unregister Subscriber subscriber_map: SubscriberMap, - // Able to unregister Queryable - queryable_map: QueryableMap, - // Save the reqid to be able to send back response - query_map: QueryMap, - // Save the callback for RPC response - rpc_callback_map: RpcCallbackMap, // URI uri: UUri, } @@ -161,9 +133,6 @@ impl UPTransportZenoh { Ok(UPTransportZenoh { session: Arc::new(session), subscriber_map: Arc::new(Mutex::new(HashMap::new())), - queryable_map: Arc::new(Mutex::new(HashMap::new())), - query_map: Arc::new(Mutex::new(HashMap::new())), - rpc_callback_map: Arc::new(Mutex::new(HashMap::new())), uri, }) } @@ -276,59 +245,6 @@ impl UPTransportZenoh { }; Ok(uattributes) } - - // You can take a look at the table in up-spec for more detail - // https://github.com/eclipse-uprotocol/up-spec/blob/ca8172a8cf17d70e4f095e6c0d57fe2ebc68c58d/up-l1/README.adoc#23-registerlistener - #[allow(clippy::nonminimal_bool)] // Don't simplify the boolean expression for better understanding - fn get_listener_message_type( - source_uuri: &UUri, - sink_uuri: Option<&UUri>, - ) -> Result { - let mut flag = MessageFlag::none(); - let rpc_range = 1..0x7FFF_u32; - let nonrpc_range = 0x8000..0xFFFE_u32; - - let src_resource = source_uuri.resource_id; - // Notification / Request / Response - if let Some(dst_uuri) = sink_uuri { - let dst_resource = dst_uuri.resource_id; - - if (nonrpc_range.contains(&src_resource) && dst_resource == 0) - || (src_resource == 0xFFFF && dst_resource == 0) - || (src_resource == 0xFFFF && dst_resource == 0xFFFF) - { - flag |= MessageFlag::Notification; - } - if (src_resource == 0 && rpc_range.contains(&dst_resource)) - || (src_resource == 0xFFFF && rpc_range.contains(&dst_resource)) - || (src_resource == 0xFFFF && dst_resource == 0xFFFF) - { - flag |= MessageFlag::Request; - } - if (rpc_range.contains(&src_resource) && dst_resource == 0) - || (src_resource == 0xFFFF && dst_resource == 0) - || (src_resource == 0xFFFF && dst_resource == 0xFFFF) - { - flag |= MessageFlag::Response; - } - } else if nonrpc_range.contains(&src_resource) { - flag |= MessageFlag::Publish; - } - if flag.is_none() { - let src_resource = format!("{:X}", source_uuri.resource_id); - let dst_resource = if let Some(dst_uuri) = sink_uuri { - format!("{:X}", dst_uuri.resource_id) - } else { - String::from("None") - }; - Err(UStatus::fail_with_code( - UCode::INTERNAL, - format!("Wrong combination of resource ID in source UUri ({src_resource}) and sink UUri ({dst_resource}). Please check up-spec for more details."), - )) - } else { - Ok(flag) - } - } } #[cfg(test)] @@ -376,37 +292,4 @@ mod tests { ); } } - - #[test_case("//192.168.1.100/10AB/3/80CD", None, Ok(MessageFlag::Publish); "Publish Message")] - #[test_case("//192.168.1.100/10AB/3/80CD", Some("//192.168.1.101/20EF/4/0"), Ok(MessageFlag::Notification); "Notification Message")] - #[test_case("//192.168.1.100/10AB/3/0", Some("//192.168.1.101/20EF/4/B"), Ok(MessageFlag::Request); "Request Message")] - #[test_case("//192.168.1.101/20EF/4/B", Some("//192.168.1.100/10AB/3/0"), Ok(MessageFlag::Response); "Response Message")] - #[test_case("//*/FFFF/FF/FFFF", Some("//192.168.1.101/20EF/4/B"), Ok(MessageFlag::Request); "Listen to all Request Messages")] - #[test_case("//*/FFFF/FF/FFFF", Some("//192.168.1.100/10AB/3/0"), Ok(MessageFlag::Notification | MessageFlag::Response); "Listen to Notification and Response Messages")] - #[test_case("//*/FFFF/FF/FFFF", Some("//[::1]/FFFF/FF/FFFF"), Ok(MessageFlag::Notification | MessageFlag::Request | MessageFlag::Response); "Listen to all messages to a device")] - #[test_case("//*/FFFF/FF/FFFF", None, Err(UCode::INTERNAL); "Impossible scenario: Listen to all Publish Messages")] - #[test_case("//192.168.1.100/10AB/3/0", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Request Message")] - #[test_case("//192.168.1.101/20EF/4/B", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Response Message")] - #[test_case("//192.168.1.100/10AB/3/80CD", Some("//*/FFFF/FF/FFFF"), Err(UCode::INTERNAL); "Impossible scenario: Broadcast Notification Message")] - #[tokio::test(flavor = "multi_thread")] - async fn test_get_listener_message_type( - src_uri: &str, - sink_uri: Option<&str>, - result: Result, - ) { - let src = UUri::from_str(src_uri).unwrap(); - if let Some(uri) = sink_uri { - let dst = UUri::from_str(uri).unwrap(); - assert_eq!( - UPTransportZenoh::get_listener_message_type(&src, Some(&dst)) - .map_err(|e| e.get_code()), - result - ); - } else { - assert_eq!( - UPTransportZenoh::get_listener_message_type(&src, None).map_err(|e| e.get_code()), - result - ); - } - } } diff --git a/src/rpc.rs b/src/rpc.rs deleted file mode 100644 index 5732a4f..0000000 --- a/src/rpc.rs +++ /dev/null @@ -1,144 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ -use crate::UPTransportZenoh; -use async_trait::async_trait; -use std::{string::ToString, sync::Arc, time::Duration}; -use tracing::error; -use up_rust::{ - communication::{CallOptions, RpcClient, ServiceInvocationError, UPayload}, - LocalUriProvider, UAttributes, UCode, UMessageType, UPayloadFormat, UPriority, UStatus, UUri, - UUID, -}; -use zenoh::query::QueryTarget; - -pub struct ZenohRpcClient { - transport: Arc, -} -impl ZenohRpcClient { - /// Creates a new RPC client for the Zenoh transport. - /// - /// # Arguments - /// - /// * `transport` - The Zenoh uProtocol Transport Layer. - pub fn new(transport: Arc) -> Self { - ZenohRpcClient { transport } - } -} - -#[async_trait] -impl RpcClient for ZenohRpcClient { - async fn invoke_method( - &self, - method: UUri, - call_options: CallOptions, - payload: Option, - ) -> Result, ServiceInvocationError> { - // Get data and format from UPayload - let mut payload_data = None; - let mut payload_format = UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED; - if let Some(payload) = payload { - payload_format = payload.payload_format(); - payload_data = Some(payload.payload()); - } - - // Get source UUri - let source_uri = self.transport.get_source_uri(); - - let attributes = UAttributes { - type_: UMessageType::UMESSAGE_TYPE_REQUEST.into(), - id: Some(call_options.message_id().unwrap_or_else(UUID::build)).into(), - priority: call_options - .priority() - .unwrap_or(UPriority::UPRIORITY_CS4) - .into(), - source: Some(source_uri.clone()).into(), - sink: Some(method.clone()).into(), - ttl: Some(call_options.ttl()), - token: call_options.token(), - payload_format: payload_format.into(), - ..Default::default() - }; - - // Get Zenoh key - let zenoh_key = self - .transport - .to_zenoh_key_string(&source_uri, Some(&method)); - - // Put UAttributes into Zenoh user attachment - let Ok(attachment) = UPTransportZenoh::uattributes_to_attachment(&attributes) else { - let msg = "Unable to transform UAttributes to user attachment in Zenoh".to_string(); - error!("{msg}"); - return Err(ServiceInvocationError::Internal(msg)); - }; - - // Send the query - let mut getbuilder = self.transport.session.get(&zenoh_key); - getbuilder = match payload_data { - Some(data) => getbuilder.payload(data), - None => getbuilder, - } - .attachment(attachment) - .target(QueryTarget::BestMatching) - .timeout(Duration::from_millis(u64::from(call_options.ttl()))); - let Ok(replies) = getbuilder.await else { - let msg = "Error while sending Zenoh query".to_string(); - error!("{msg}"); - return Err(ServiceInvocationError::RpcError(UStatus { - code: UCode::INTERNAL.into(), - message: Some(msg), - ..Default::default() - })); - }; - - // Receive the reply - let Ok(reply) = replies.recv_async().await else { - let msg = "Error while receiving Zenoh reply".to_string(); - error!("{msg}"); - return Err(ServiceInvocationError::RpcError(UStatus { - code: UCode::INTERNAL.into(), - message: Some(msg), - ..Default::default() - })); - }; - match reply.result() { - Ok(sample) => { - let Some(payload_format) = sample - .attachment() - .and_then(|attach| UPTransportZenoh::attachment_to_uattributes(attach).ok()) - .map(|attr| attr.payload_format.enum_value_or_default()) - else { - let msg = "Unable to get the UPayloadFormat from the attachment".to_string(); - error!("{msg}"); - return Err(ServiceInvocationError::RpcError(UStatus { - code: UCode::INTERNAL.into(), - message: Some(msg), - ..Default::default() - })); - }; - Ok(Some(UPayload::new( - sample.payload().into::>(), - payload_format, - ))) - } - Err(e) => { - let msg = format!("Error while parsing Zenoh reply: {e:?}"); - error!("{msg}"); - return Err(ServiceInvocationError::RpcError(UStatus { - code: UCode::INTERNAL.into(), - message: Some(msg), - ..Default::default() - })); - } - } - } -} diff --git a/src/utransport.rs b/src/utransport.rs index f8ad3a5..63e9507 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -10,50 +10,23 @@ * * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use crate::{MessageFlag, UPTransportZenoh, CB_RUNTIME}; +use crate::{UPTransportZenoh, CB_RUNTIME}; use async_trait::async_trait; use bytes::Bytes; use lazy_static::lazy_static; -use std::{ - sync::{Arc, Mutex}, - time::Duration, -}; -use tokio::{ - runtime::{Handle, Runtime}, - task, -}; +use std::sync::{Arc, Mutex}; +use tokio::runtime::Runtime; use tracing::{error, warn}; use up_rust::{ - ComparableListener, UAttributes, UAttributesValidators, UCode, UListener, UMessage, - UMessageType, UStatus, UTransport, UUri, -}; -use zenoh::{ - key_expr::keyexpr, - query::{Query, QueryTarget, Reply}, - sample::Sample, + ComparableListener, UAttributes, UAttributesValidators, UCode, UListener, UMessage, UStatus, + UTransport, UUri, }; +use zenoh::sample::Sample; lazy_static! { static ref TOKIO_RUNTIME: Mutex = Mutex::new(Runtime::new().unwrap()); } -#[inline] -fn invoke_block_callback(listener: &Arc, resp_msg: UMessage) { - match Handle::try_current() { - Ok(handle) => { - task::block_in_place(|| { - handle.block_on(listener.on_receive(resp_msg)); - }); - } - Err(_) => { - TOKIO_RUNTIME - .lock() - .unwrap() - .block_on(listener.on_receive(resp_msg)); - } - }; -} - #[inline] fn spawn_nonblock_callback(listener: &Arc, listener_msg: UMessage) { let listener = listener.clone(); @@ -63,158 +36,35 @@ fn spawn_nonblock_callback(listener: &Arc, listener_msg: UMessage } impl UPTransportZenoh { - async fn send_publish_notification( + async fn put_message( &self, zenoh_key: &str, payload: Bytes, - attributes: UAttributes, + attributes: &UAttributes, ) -> Result<(), UStatus> { // Transform UAttributes to user attachment in Zenoh - let Ok(attachment) = UPTransportZenoh::uattributes_to_attachment(&attributes) else { - let msg = "Unable to transform UAttributes to attachment".to_string(); + let attachment = UPTransportZenoh::uattributes_to_attachment(attributes).map_err(|e| { + let msg = format!("Unable to transform UAttributes to attachment: {e}"); error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); - }; + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) + })?; // Map the priority to Zenoh - let priority = UPTransportZenoh::map_zenoh_priority( - attributes.priority.enum_value().map_err(|_| { - let msg = "Unable to map to Zenoh priority".to_string(); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?, - ); + let priority = + UPTransportZenoh::map_zenoh_priority(attributes.priority.enum_value_or_default()); // Send data - let putbuilder = self - .session + self.session .put(zenoh_key, payload) .priority(priority) - .attachment(attachment); - putbuilder - .await - .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to send with Zenoh"))?; - - Ok(()) - } - - async fn send_request( - &self, - zenoh_key: &str, - payload: Bytes, - attributes: UAttributes, - ) -> Result<(), UStatus> { - // Transform UAttributes to user attachment in Zenoh - let Ok(attachment) = UPTransportZenoh::uattributes_to_attachment(&attributes) else { - let msg = "Unable to transform UAttributes to attachment".to_string(); - error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); - }; - - // Retrieve the callback - let zenoh_key = keyexpr::new(zenoh_key).unwrap(); - let mut resp_callback = None; - // Iterate all the saved callback and find the correct one. - for (saved_key, callback) in self.rpc_callback_map.lock().unwrap().iter() { - if zenoh_key.intersects(saved_key) { - resp_callback = Some(callback.clone()); - break; - } - } - let Some(resp_callback) = resp_callback else { - let msg = "Unable to get callback".to_string(); - error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INTERNAL, msg)); - }; - let zenoh_callback = move |reply: Reply| { - match reply.result() { - Ok(sample) => { - // Get UAttribute from the attachment - let Some(attachment) = sample.attachment() else { - warn!("Unable to get the attachment"); - return; - }; - let u_attribute = match UPTransportZenoh::attachment_to_uattributes(attachment) - { - Ok(uattr) => uattr, - Err(e) => { - warn!("Unable to transform attachment to UAttributes: {e:?}"); - return; - } - }; - // Create UMessage - invoke_block_callback( - &resp_callback, - UMessage { - attributes: Some(u_attribute).into(), - payload: Some(sample.payload().into()), - ..Default::default() - }, - ); - } - Err(e) => { - warn!("Unable to parse Zenoh reply: {e:?}"); - } - } - }; - - // Send query - let getbuilder = self - .session - .get(zenoh_key) - .payload(payload) - .attachment(attachment) - .target(QueryTarget::BestMatching) - .timeout(Duration::from_millis(u64::from( - attributes.ttl.unwrap_or(1000), - ))) - .callback(zenoh_callback); - getbuilder.await.map_err(|e| { - let msg = format!("Unable to send get with Zenoh: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INTERNAL, msg) - })?; - - Ok(()) - } - - async fn send_response(&self, payload: Bytes, attributes: UAttributes) -> Result<(), UStatus> { - // Transform UAttributes to user attachment in Zenoh - let Ok(attachment) = UPTransportZenoh::uattributes_to_attachment(&attributes) else { - let msg = "Unable to transform UAttributes to attachment".to_string(); - error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); - }; - - // Find out the corresponding query from HashMap - let reqid = attributes.reqid.to_string(); - let query = self - .query_map - .lock() - .unwrap() - .remove(&reqid) - .ok_or_else(|| { - let msg = "query doesn't exist".to_string(); - error!("{msg}"); - UStatus::fail_with_code(UCode::INTERNAL, msg) - })? - .clone(); - - // Send back the query - query - .reply(query.key_expr().clone(), payload) .attachment(attachment) .await - .map_err(|e| { - let msg = format!("Unable to reply with Zenoh: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INTERNAL, msg) - })?; + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to send with Zenoh"))?; Ok(()) } - async fn register_publish_notification_listener( + async fn register_subscriber( &self, zenoh_key: &String, listener: Arc, @@ -262,91 +112,28 @@ impl UPTransportZenoh { Ok(()) } - - async fn register_request_listener( - &self, - zenoh_key: &String, - listener: Arc, - ) -> Result<(), UStatus> { - // Setup callback - let listener_cloned = listener.clone(); - let query_map = self.query_map.clone(); - let callback = move |query: Query| { - // Create UAttribute from Zenoh user attachment - let Some(attachment) = query.attachment() else { - warn!("Unable to get attachment"); - return; - }; - let u_attribute = match UPTransportZenoh::attachment_to_uattributes(attachment) { - Ok(uattributes) => uattributes, - Err(e) => { - warn!("Unable to transform user attachment to UAttributes: {e:?}"); - return; - } - }; - // Create UMessage and store the query into HashMap (Will be used in send_response) - let msg = UMessage { - attributes: Some(u_attribute.clone()).into(), - payload: query.payload().map(zenoh::bytes::ZBytes::into), - ..Default::default() - }; - query_map - .lock() - .unwrap() - .insert(u_attribute.id.to_string(), query); - spawn_nonblock_callback(&listener_cloned, msg); - }; - - // Create Zenoh queryable - if let Ok(queryable) = self - .session - .declare_queryable(zenoh_key) - .callback_mut(callback) - .await - { - self.queryable_map.lock().unwrap().insert( - (zenoh_key.clone(), ComparableListener::new(listener)), - queryable, - ); - } else { - let msg = "Unable to register callback with Zenoh".to_string(); - error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INTERNAL, msg)); - } - - Ok(()) - } - - fn register_response_listener(&self, zenoh_key: &str, listener: Arc) { - // Store the response callback (Will be used in send_request) - let zenoh_key = keyexpr::new(zenoh_key).unwrap(); - self.rpc_callback_map - .lock() - .unwrap() - .insert(zenoh_key.to_owned(), listener); - } } #[async_trait] impl UTransport for UPTransportZenoh { async fn send(&self, message: UMessage) -> Result<(), UStatus> { - let attributes = *message.attributes.0.ok_or_else(|| { + let attribs = message.attributes.as_ref().ok_or_else(|| { let msg = "Invalid UAttributes".to_string(); error!("{msg}"); UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) })?; + UAttributesValidators::get_validator_for_attributes(attribs) + .validate(attribs) + .map_err(|e| { + let msg = e.to_string(); + error!("{msg}"); + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) + })?; + // Get Zenoh key - let source = *attributes.clone().source.0.ok_or_else(|| { - let msg = "attributes.source should not be empty".to_string(); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?; - let zenoh_key = if let Some(sink) = attributes.sink.clone().0 { - self.to_zenoh_key_string(&source, Some(&sink)) - } else { - self.to_zenoh_key_string(&source, None) - }; + let zenoh_key = + self.to_zenoh_key_string(attribs.source.get_or_default(), attribs.sink.as_ref()); // Get payload let payload = if let Some(payload) = message.payload { @@ -355,78 +142,7 @@ impl UTransport for UPTransportZenoh { Bytes::new() }; - // Check the type of UAttributes (Publish / Notification / Request / Response) - match attributes - .type_ - .enum_value() - .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to parse type"))? - { - UMessageType::UMESSAGE_TYPE_PUBLISH => { - UAttributesValidators::Publish - .validator() - .validate(&attributes) - .map_err(|e| { - let msg = format!("Wrong Publish UAttributes: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?; - // Send Publish - self.send_publish_notification(&zenoh_key, payload, attributes) - .await - } - UMessageType::UMESSAGE_TYPE_NOTIFICATION => { - UAttributesValidators::Notification - .validator() - .validate(&attributes) - .map_err(|e| { - let msg = format!("Wrong Notification UAttributes: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?; - // Send Publish - self.send_publish_notification(&zenoh_key, payload, attributes) - .await - } - UMessageType::UMESSAGE_TYPE_REQUEST => { - UAttributesValidators::Request - .validator() - .validate(&attributes) - .map_err(|e| { - let msg = format!("Wrong Request UAttributes: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?; - // Send Request - self.send_request(&zenoh_key, payload, attributes).await - } - UMessageType::UMESSAGE_TYPE_RESPONSE => { - UAttributesValidators::Response - .validator() - .validate(&attributes) - .map_err(|e| { - let msg = format!("Wrong Response UAttributes: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?; - // Send Response - self.send_response(payload, attributes).await - } - UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { - let msg = "Wrong Message type in UAttributes".to_string(); - error!("{msg}"); - Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)) - } - } - } - - async fn receive( - &self, - _source_filter: &UUri, - _sink_filter: Option<&UUri>, - ) -> Result { - let msg = "Not implemented".to_string(); - error!("{msg}"); - Err(UStatus::fail_with_code(UCode::UNIMPLEMENTED, msg)) + self.put_message(&zenoh_key, payload, attribs).await } async fn register_listener( @@ -435,36 +151,8 @@ impl UTransport for UPTransportZenoh { sink_filter: Option<&UUri>, listener: Arc, ) -> Result<(), UStatus> { - let flag = UPTransportZenoh::get_listener_message_type(source_filter, sink_filter)?; - // Publish & Notification - if flag.contains(MessageFlag::Publish) || flag.contains(MessageFlag::Notification) { - // Get Zenoh key - let zenoh_key = self.to_zenoh_key_string(source_filter, sink_filter); - self.register_publish_notification_listener(&zenoh_key, listener.clone()) - .await?; - } - // RPC request - if flag.contains(MessageFlag::Request) { - // Get Zenoh key - let zenoh_key = self.to_zenoh_key_string(source_filter, sink_filter); - self.register_request_listener(&zenoh_key, listener.clone()) - .await?; - } - // RPC response - if flag.contains(MessageFlag::Response) { - if let Some(sink_filter) = sink_filter { - // Get Zenoh key - let zenoh_key = self.to_zenoh_key_string(sink_filter, Some(source_filter)); - self.register_response_listener(&zenoh_key, listener.clone()); - } else { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Sink should not be None in Response", - )); - } - } - - Ok(()) + let zenoh_key = self.to_zenoh_key_string(source_filter, sink_filter); + self.register_subscriber(&zenoh_key, listener.clone()).await } async fn unregister_listener( @@ -473,64 +161,18 @@ impl UTransport for UPTransportZenoh { sink_filter: Option<&UUri>, listener: Arc, ) -> Result<(), UStatus> { - let flag = UPTransportZenoh::get_listener_message_type(source_filter, sink_filter)?; - // Publish & Notification - if flag.contains(MessageFlag::Publish) || flag.contains(MessageFlag::Notification) { - // Get Zenoh key - let zenoh_key = self.to_zenoh_key_string(source_filter, sink_filter); - if self - .subscriber_map - .lock() - .unwrap() - .remove(&(zenoh_key.clone(), ComparableListener::new(listener.clone()))) - .is_none() - { - let msg = "Publish / Notifcation listener doesn't exist".to_string(); - warn!("{msg}"); - return Err(UStatus::fail_with_code(UCode::NOT_FOUND, msg)); - } - } - // RPC request - if flag.contains(MessageFlag::Request) { - // Get Zenoh key - let zenoh_key = self.to_zenoh_key_string(source_filter, sink_filter); - if self - .queryable_map - .lock() - .unwrap() - .remove(&(zenoh_key.clone(), ComparableListener::new(listener.clone()))) - .is_none() - { - let msg = "RPC request listener doesn't exist".to_string(); - warn!("{msg}"); - return Err(UStatus::fail_with_code(UCode::NOT_FOUND, msg)); - } - } - // RPC response - if flag.contains(MessageFlag::Response) { - if let Some(sink_filter) = sink_filter { - // Get Zenoh key - let zenoh_key = self.to_zenoh_key_string(sink_filter, Some(source_filter)); - let zenoh_key = keyexpr::new(&zenoh_key).unwrap(); - if self - .rpc_callback_map - .lock() - .unwrap() - .remove(zenoh_key) - .is_none() - { - let msg = "RPC response callback doesn't exist".to_string(); - warn!("{msg}"); - return Err(UStatus::fail_with_code(UCode::NOT_FOUND, msg)); - } - } else { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Sink should not be None in Response", - )); - } + let zenoh_key = self.to_zenoh_key_string(source_filter, sink_filter); + if self + .subscriber_map + .lock() + .unwrap() + .remove(&(zenoh_key.clone(), ComparableListener::new(listener.clone()))) + .is_none() + { + let msg = "Publish / Notifcation listener doesn't exist".to_string(); + warn!("{msg}"); + return Err(UStatus::fail_with_code(UCode::NOT_FOUND, msg)); } - Ok(()) } } diff --git a/tests/l2_rpc.rs b/tests/l2_rpc.rs index f83daab..8eba6ce 100644 --- a/tests/l2_rpc.rs +++ b/tests/l2_rpc.rs @@ -16,12 +16,11 @@ use std::sync::Arc; use tokio::time::{sleep, Duration}; use up_rust::{ communication::{ - CallOptions, InMemoryRpcServer, RequestHandler, RpcClient, RpcServer, + CallOptions, InMemoryRpcClient, InMemoryRpcServer, RequestHandler, RpcClient, RpcServer, ServiceInvocationError, UPayload, }, LocalUriProvider, UPayloadFormat, }; -use up_transport_zenoh::ZenohRpcClient; const METHOD_RESOURCE_ID: u16 = 0x00a0; @@ -89,7 +88,10 @@ async fn test_l2_rpc() { sleep(Duration::from_millis(1000)).await; // Create L2 RPC client - let rpc_client = Arc::new(ZenohRpcClient::new(uptransport_client.clone())); + let rpc_client = InMemoryRpcClient::new(uptransport_client.clone(), uptransport_client.clone()) + .await + .map(Arc::new) + .expect("failed to create RpcClient for Zenoh transport"); let payload = UPayload::new(request_data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT); let call_options = CallOptions::for_rpc_request(5_000, None, None, None); diff --git a/tests/rpc.rs b/tests/rpc.rs index 9c0868f..c89ee27 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -24,11 +24,11 @@ use tokio::{ time::{sleep, Duration}, }; use up_rust::{ - communication::{CallOptions, RpcClient, UPayload}, + communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload}, LocalUriProvider, UListener, UMessage, UMessageBuilder, UPayloadFormat, UPriority, UTransport, UUri, UUID, }; -use up_transport_zenoh::{UPTransportZenoh, ZenohRpcClient}; +use up_transport_zenoh::UPTransportZenoh; // RequestListener struct RequestListener { @@ -149,7 +149,11 @@ async fn test_rpc_server_client( // Send Request with ZenohRpcClient (L2 API) { - let rpc_client = Arc::new(ZenohRpcClient::new(uptransport_client.clone())); + let rpc_client = + InMemoryRpcClient::new(uptransport_client.clone(), uptransport_client.clone()) + .await + .map(Arc::new) + .expect("failed to create RpcClient for Zenoh transport"); let payload = UPayload::new(request_data.clone(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT); let call_options = CallOptions::for_rpc_request(