diff --git a/subscription-cache/src/lib.rs b/subscription-cache/src/lib.rs index 2723b721..a3d5dfa2 100644 --- a/subscription-cache/src/lib.rs +++ b/subscription-cache/src/lib.rs @@ -32,6 +32,8 @@ pub struct SubscriptionInformation { pub config: EventDeliveryConfig, } +// Will be moving this to up-rust +// Issue: https://github.com/eclipse-uprotocol/up-rust/issues/178 impl Eq for SubscriptionInformation {} impl PartialEq for SubscriptionInformation { @@ -82,13 +84,13 @@ impl SubscriptionCache { UCode::INVALID_ARGUMENT, "Unable to retrieve topic".to_string(), ) - }); + })?; let subscriber = subscription.subscriber.into_option().ok_or_else(|| { UStatus::fail_with_code( UCode::INVALID_ARGUMENT, "Unable to retrieve topic".to_string(), ) - }); + })?; let status = if let Some(status) = subscription.status.into_option() { status } else { @@ -109,8 +111,8 @@ impl SubscriptionCache { }; // Create new hashset if the key does not exist and insert the subscription let subscription_information = SubscriptionInformation { - topic: topic.clone()?, - subscriber: subscriber.clone()?, + topic: topic.clone(), + subscriber: subscriber.clone(), status, attributes, config, diff --git a/up-linux-streamer/DEFAULT_CONFIG.json5 b/up-linux-streamer/DEFAULT_CONFIG.json5 index bbc92c18..c1930387 100644 --- a/up-linux-streamer/DEFAULT_CONFIG.json5 +++ b/up-linux-streamer/DEFAULT_CONFIG.json5 @@ -22,7 +22,7 @@ }, zenoh_transport_config: { // Endpoint to be used when starting Zenoh session - endpoint: "tcp/0.0.0.0:7447" + config_file: "./up-linux-streamer/ZENOH_CONFIG.json5" }, someip_config: { // Determines the authority_name of the mechatronics network diff --git a/up-linux-streamer/ZENOH_CONFIG.json5 b/up-linux-streamer/ZENOH_CONFIG.json5 new file mode 100644 index 00000000..5cbcc0e1 --- /dev/null +++ b/up-linux-streamer/ZENOH_CONFIG.json5 @@ -0,0 +1,419 @@ +/// This file attempts to list and document available configuration elements. +/// For a more complete view of the configuration's structure, check out `zenoh/src/config.rs`'s `Config` structure. +/// Note that the values here are correctly typed, but may not be sensible, so copying this file to change only the parts that matter to you is not good practice. +{ + /// The identifier (as unsigned 128bit integer in hexadecimal lowercase - leading zeros are not accepted) + /// that zenoh runtime will use. + /// If not set, a random unsigned 128bit integer will be used. + /// WARNING: this id must be unique in your zenoh network. + // id: "1234567890abcdef", + + /// The node's mode (router, peer or client) +// mode: "peer", + +// /// The node's metadata (name, location, DNS name, etc.) Arbitrary JSON data not interpreted by zenohd and available in admin space @/router/ +// metadata: { +// name: "strawberry", +// location: "Penny Lane" +// }, + +// /// Which endpoints to connect to. E.g. tcp/localhost:7447. +// /// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup. +// connect: { +// endpoints: [ +// // "/
" +// ], +// }, + + /// Which endpoints to listen on. E.g. tcp/localhost:7447. + /// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers, + /// peers, or client can use to establish a zenoh session. + listen: { + endpoints: [ + "tcp/0.0.0.0:7447" + ], + }, + /// Configure the scouting mechanisms and their behaviours +// scouting: { +// /// In client mode, the period dedicated to scouting for a router before failing +// timeout: 3000, +// /// In peer mode, the period dedicated to scouting remote peers before attempting other operations +// delay: 200, +// /// The multicast scouting configuration. +// multicast: { +// /// Whether multicast scouting is enabled or not +// enabled: true, +// /// The socket which should be used for multicast scouting +// address: "224.0.0.224:7446", +// /// The network interface which should be used for multicast scouting +// interface: "auto", // If not set or set to "auto" the interface if picked automatically +// /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast. +// /// Accepts a single value or different values for router, peer and client. +// /// Each value is bit-or-like combinations of "peer", "router" and "client". +// autoconnect: { router: "", peer: "router|peer" }, +// /// Whether or not to listen for scout messages on UDP multicast and reply to them. +// listen: true, +// }, +// /// The gossip scouting configuration. +// gossip: { +// /// Whether gossip scouting is enabled or not +// enabled: true, +// /// When true, gossip scouting informations are propagated multiple hops to all nodes in the local network. +// /// When false, gossip scouting informations are only propagated to the next hop. +// /// Activating multihop gossip implies more scouting traffic and a lower scalability. +// /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have +// /// direct connectivity with each other. +// multihop: false, +// /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip. +// /// Accepts a single value or different values for router, peer and client. +// /// Each value is bit-or-like combinations of "peer", "router" and "client". +// autoconnect: { router: "", peer: "router|peer" }, +// }, +// }, + +// /// Configuration of data messages timestamps management. +// timestamping: { +// /// Whether data messages should be timestamped if not already. +// /// Accepts a single boolean value or different values for router, peer and client. +// enabled: { router: true, peer: false, client: false }, +// /// Whether data messages with timestamps in the future should be dropped or not. +// /// If set to false (default), messages with timestamps in the future are retimestamped. +// /// Timestamps are ignored if timestamping is disabled. +// drop_future_timestamp: false, +// }, + +// /// The default timeout to apply to queries in milliseconds. +// queries_default_timeout: 10000, + +// /// The routing strategy to use and it's configuration. +// routing: { +// /// The routing strategy to use in routers and it's configuration. +// router: { +// /// When set to true a router will forward data between two peers +// /// directly connected to it if it detects that those peers are not +// /// connected to each other. +// /// The failover brokering only works if gossip discovery is enabled. 
+// peers_failover_brokering: true, +// }, +// /// The routing strategy to use in peers and it's configuration. +// peer: { +// /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate"). +// mode: "peer_to_peer", +// }, +// }, + + // /// The declarations aggregation strategy. + // aggregation: { + // /// A list of key-expressions for which all included subscribers will be aggregated into. + // subscribers: [ + // // key_expression + // ], + // /// A list of key-expressions for which all included publishers will be aggregated into. + // publishers: [ + // // key_expression + // ], + // }, + +// /// Configure internal transport parameters +// transport: { +// unicast: { +// /// Timeout in milliseconds when opening a link +// accept_timeout: 10000, +// /// Maximum number of zenoh session in pending state while accepting +// accept_pending: 100, +// /// Maximum number of sessions that can be simultaneously alive +// max_sessions: 1000, +// /// Maximum number of incoming links that are admitted per session +// max_links: 1, +// /// Enables the LowLatency transport +// /// This option does not make LowLatency transport mandatory, the actual implementation of transport +// /// used will depend on Establish procedure and other party's settings +// /// +// /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. +// /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to +// /// enable 'lowlatency' you need to explicitly disable 'qos'. +// lowlatency: false, +// /// Enables QoS on unicast communications. +// qos: { +// enabled: true, +// }, +// /// Enables compression on unicast communications. +// /// Compression capabilities are negotiated during session establishment. +// /// If both Zenoh nodes support compression, then compression is activated. +// compression: { +// enabled: false, +// }, +// }, +// multicast: { +// /// Enables QoS on multicast communication. +// /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. +// qos: { +// enabled: false, +// }, +// /// Enables compression on multicast communication. +// /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. +// compression: { +// enabled: false, +// }, +// }, +// link: { +// /// An optional whitelist of protocols to be used for accepting and opening sessions. +// /// If not configured, all the supported protocols are automatically whitelisted. +// /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"] +// /// For example, to only enable "tls" and "quic": +// // protocols: ["tls", "quic"], +// /// Configure the zenoh TX parameters of a link +// tx: { +// /// The resolution in bits to be used for the message sequence numbers. +// /// When establishing a session with another Zenoh instance, the lowest value of the two instances will be used. +// /// Accepted values: 8bit, 16bit, 32bit, 64bit. +// sequence_number_resolution: "32bit", +// /// Link lease duration in milliseconds to announce to other zenoh nodes +// lease: 10000, +// /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive +// /// messages will be sent at the configured time interval. +// /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, +// /// set the actual keep_alive timeout to one fourth of the lease time. 
+// /// This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity +// /// check which considers a link as failed when no messages are received in 3.5 times the +// /// target interval. +// keep_alive: 4, +// /// Batch size in bytes is expressed as a 16bit unsigned integer. +// /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535). +// /// The default batch size value is the maximum batch size: 65535. +// batch_size: 65535, +// /// Each zenoh link has a transmission queue that can be configured +// queue: { +// /// The size of each priority queue indicates the number of batches a given queue can contain. +// /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE. +// /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE, +// /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU. +// /// If qos is false, then only the DATA priority will be allocated. +// size: { +// control: 1, +// real_time: 1, +// interactive_high: 1, +// interactive_low: 1, +// data_high: 2, +// data: 4, +// data_low: 4, +// background: 4, +// }, +// /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. +// /// Higher values lead to a more aggressive batching but it will introduce additional latency. +// backoff: 100, +// // Number of threads dedicated to transmission +// // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4) +// // threads: 4, +// }, +// }, +// /// Configure the zenoh RX parameters of a link +// rx: { +// /// Receiving buffer size in bytes for each link +// /// The default the rx_buffer_size value is the same as the default batch size: 65335. +// /// For very high throughput scenarios, the rx_buffer_size can be increased to accomodate +// /// more in-flight data. This is particularly relevant when dealing with large messages. +// /// E.g. for 16MiB rx_buffer_size set the value to: 16777216. +// buffer_size: 65535, +// /// Maximum size of the defragmentation buffer at receiver end. +// /// Fragmented messages that are larger than the configured size will be dropped. +// /// The default value is 1GiB. This would work in most scenarios. +// /// NOTE: reduce the value if you are operating on a memory constrained device. +// max_message_size: 1073741824, +// }, +// /// Configure TLS specific parameters +// tls: { +// /// Path to the certificate of the certificate authority used to validate either the server +// /// or the client's keys and certificates, depending on the node's mode. If not specified +// /// on router mode then the default WebPKI certificates are used instead. +// root_ca_certificate: null, +// /// Path to the TLS server private key +// server_private_key: null, +// /// Path to the TLS server public certificate +// server_certificate: null, +// /// Client authentication, if true enables mTLS (mutual authentication) +// client_auth: false, +// /// Path to the TLS client private key +// client_private_key: null, +// /// Path to the TLS client public certificate +// client_certificate: null, +// // Whether or not to use server name verification, if set to false zenoh will disregard the common names of the certificates when verifying servers. +// // This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your +// // ca to verify that the server at baz.com is actually baz.com, let this be true (default). 
+// server_name_verification: null, +// }, +// }, +// /// Shared memory configuration +// shared_memory: { +// enabled: false, +// }, +// /// Access control configuration +// auth: { +// /// The configuration of authentification. +// /// A password implies a username is required. +// usrpwd: { +// user: null, +// password: null, +// /// The path to a file containing the user password dictionary +// dictionary_file: null, +// }, +// pubkey: { +// public_key_pem: null, +// private_key_pem: null, +// public_key_file: null, +// private_key_file: null, +// key_size: null, +// known_keys_file: null, +// }, +// }, +// }, + + /// Configure the Admin Space + /// Unstable: this configuration part works as advertised, but may change in a future release + // adminspace: { + // // read and/or write permissions on the admin space + // permissions: { + // read: true, + // write: false, + // }, + // }, + + /// + /// Plugins configurations + /// + // /// Directories where plugins configured by name should be looked for. Plugins configured by __path__ are not subject to lookup + // plugins_search_dirs: [], + // /// Plugins are only loaded if present in the configuration. When starting + // /// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace. + // plugins: { + // /// If no `__path__` is given to a plugin, zenohd will automatically search for a shared library matching the plugin's name (here, `libzenoh_plugin_rest.so` would be searched for on linux) + // + // /// Plugin settings may contain field `__config__` + // /// - If `__config__` is specified, it's content is merged into plugin configuration + // /// - Properties loaded from `__config__` file overrides existing properties + // /// - If json objects in loaded file contains `__config__` properties, they are processed recursively + // /// This is used in the 'storcge_manager' which supports subplugins, each with it's own config + // /// + // /// See below exapmle of plugin configuration using `__config__` property + // + // /// Configure the REST API plugin + // rest: { + // /// Setting this option to true allows zenohd to panic should it detect issues with this plugin. Setting it to false politely asks the plugin not to panic. + // __required__: true, // defaults to false + // /// load configuration from the file + // __config__: "./plugins/zenoh-plugin-rest/config.json5", + // /// http port to answer to rest requests + // http_port: 8000, + // }, + // + // /// Configure the storage manager plugin + // storage_manager: { + // /// When a path is present, automatic search is disabled, and zenohd will instead select the first path which manages to load. + // __path__: [ + // "./target/release/libzenoh_plugin_storage_manager.so", + // "./target/release/libzenoh_plugin_storage_manager.dylib", + // ], + // /// Directories where plugins configured by name should be looked for. Plugins configured by __path__ are not subject to lookup + // backend_search_dirs: [], + // /// The "memory" volume is always available, but you may create other volumes here, with various backends to support the actual storing. + // volumes: { + // /// An influxdb backend is also available at https://github.com/eclipse-zenoh/zenoh-backend-influxdb + // influxdb: { + // url: "https://myinfluxdb.example", + // /// Some plugins may need passwords in their configuration. + // /// To avoid leaking them through the adminspace, they may be masked behind a privacy barrier. 
+ // /// any value held at the key "private" will not be shown in the adminspace. + // private: { + // username: "user1", + // password: "pw1", + // }, + // }, + // influxdb2: { + // /// A second backend of the same type can be spawned using `__path__`, for examples when different DBs are needed. + // backend: "influxdb", + // private: { + // username: "user2", + // password: "pw2", + // }, + // url: "https://localhost:8086", + // }, + // }, + // + // /// Configure the storages supported by the volumes + // storages: { + // demo: { + // /// Storages always need to know what set of keys they must work with. These sets are defined by a key expression. + // key_expr: "demo/memory/**", + // /// Storages also need to know which volume will be used to actually store their key-value pairs. + // /// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient. + // volume: "memory", + // }, + // demo2: { + // key_expr: "demo/memory2/**", + // volume: "memory", + // /// Storage manager plugin handles metadata in order to ensure convergence of distributed storages configured in Zenoh. + // /// Metadata includes the set of wild card updates and deletions (tombstones). + // /// Once the samples are guaranteed to be delivered, the metadata can be garbage collected. + // garbage_collection: { + // /// The garbage collection event will be periodic with this duration. + // /// The duration is specified in seconds. + // period: 30, + // /// Metadata older than this parameter will be garbage collected. + // /// The duration is specified in seconds. + // lifespan: 86400, + // }, + // /// If multiple storages subscribing to the same key_expr should be synchronized, declare them as replicas. + // /// In the absence of this configuration, a normal storage is initialized + // /// Note: all the samples to be stored in replicas should be timestamped + // replica_config: { + // /// Specifying the parameters is optional, by default the values provided will be used. + // /// Time interval between different synchronization attempts in seconds + // publication_interval: 5, + // /// Expected propagation delay of the network in milliseconds + // propagation_delay: 200, + // /// This is the chunk that you would like your data to be divide into in time, in milliseconds. + // /// Higher the frequency of updates, lower the delta should be chosen + // /// To be efficient, delta should be the time containing no more than 100,000 samples + // delta: 1000, + // } + // }, + // demo3: { + // key_expr: "demo/memory3/**", + // volume: "memory", + // /// A complete storage advertises itself as containing all the known keys matching the configured key expression. + // /// If not configured, complete defaults to false. + // complete: "true", + // }, + // influx_demo: { + // key_expr: "demo/influxdb/**", + // /// This prefix will be stripped of the received keys when storing. 
+  //         strip_prefix: "demo/influxdb",
+  //         /// influxdb-backed volumes need a bit more configuration, which is passed like-so:
+  //         volume: {
+  //           id: "influxdb",
+  //           db: "example",
+  //         },
+  //       },
+  //       influx_demo2: {
+  //         key_expr: "demo/influxdb2/**",
+  //         strip_prefix: "demo/influxdb2",
+  //         volume: {
+  //           id: "influxdb2",
+  //           db: "example",
+  //         },
+  //       },
+  //     },
+  //   },
+  // },
+
+  // /// Plugin configuration example using `__config__` property
+  // plugins: {
+  //   rest: {
+  //     __config__: "./plugins/zenoh-plugin-rest/config.json5",
+  //   },
+  //   storage_manager: {
+  //     __config__: "./plugins/zenoh-plugin-storage-manager/config.json5",
+  //   }
+  // },
+
+}
\ No newline at end of file
diff --git a/up-linux-streamer/examples/mE_subscriber.rs b/up-linux-streamer/examples/mE_subscriber.rs
index f02d9888..2f4eefcd 100644
--- a/up-linux-streamer/examples/mE_subscriber.rs
+++ b/up-linux-streamer/examples/mE_subscriber.rs
@@ -33,12 +33,8 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
 const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;
 
 #[allow(dead_code)]
-struct PublishReceiver {}
-impl PublishReceiver {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
+struct PublishReceiver;
+
 #[async_trait]
 impl UListener for PublishReceiver {
     async fn on_receive(&self, msg: UMessage) {
@@ -47,7 +43,7 @@ impl UListener for PublishReceiver {
         let Some(payload_bytes) = msg.payload else {
             panic!("No bytes available");
         };
-        let _ = match Timer::parse_from_bytes(&payload_bytes) {
+        match Timer::parse_from_bytes(&payload_bytes) {
             Ok(timer_message) => {
                 println!("timer: {timer_message:?}");
                 timer_message
@@ -97,7 +93,7 @@ async fn main() -> Result<(), UStatus> {
         ..Default::default()
     };
 
-    let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
+    let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver);
     // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
     subscriber
         .register_listener(&source_filter, None, publish_receiver.clone())
diff --git a/up-linux-streamer/examples/uE_subscriber.rs b/up-linux-streamer/examples/uE_subscriber.rs
index 4bdf6f0c..2222f968 100644
--- a/up-linux-streamer/examples/uE_subscriber.rs
+++ b/up-linux-streamer/examples/uE_subscriber.rs
@@ -29,12 +29,7 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
 const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;
 
 #[allow(dead_code)]
-struct PublishReceiver {}
-impl PublishReceiver {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
+struct PublishReceiver;
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -105,7 +100,7 @@ async fn main() -> Result<(), UStatus> {
         ..Default::default()
     };
 
-    let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
+    let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver);
     // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
     subscriber
         .register_listener(&source_filter, None, publish_receiver.clone())
diff --git a/up-linux-streamer/src/config.rs b/up-linux-streamer/src/config.rs
index 81f5d614..b61fbd37 100644
--- a/up-linux-streamer/src/config.rs
+++ b/up-linux-streamer/src/config.rs
@@ -33,7 +33,7 @@ pub struct USubscriptionConfig {
 #[derive(Deserialize, Serialize, Debug, Clone)]
 #[serde(deny_unknown_fields)]
 pub struct ZenohTransportConfig {
-    pub(crate) endpoint: String,
+    pub(crate) config_file: String,
 }
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
diff --git a/up-linux-streamer/src/main.rs b/up-linux-streamer/src/main.rs
index 1d32bbd4..b81999bb 100644
--- a/up-linux-streamer/src/main.rs
+++ b/up-linux-streamer/src/main.rs
@@ -2,10 +2,9 @@ mod config;
 
 use crate::config::{Config, HostTransport};
 use clap::Parser;
-use log::trace;
+use log::{warn, trace};
 use std::fs::File;
 use std::io::Read;
-use std::str::FromStr;
 use std::sync::Arc;
 use std::{env, thread};
 use up_rust::{UCode, UStatus, UTransport};
@@ -14,7 +13,6 @@ use up_transport_vsomeip::UPTransportVsomeip;
 use up_transport_zenoh::UPClientZenoh;
 use usubscription_static_file::USubscriptionStaticFile;
 use zenoh::config::Config as ZenohConfig;
-use zenoh::config::EndPoint as ZenohEndpoint;
 
 #[derive(Parser)]
 #[command()]
@@ -55,16 +53,16 @@ async fn main() -> Result<(), UStatus> {
         usubscription,
     );
 
-    let mut zenoh_config = ZenohConfig::default();
-
-    // Specify the address to listen on using IPv4
-    let ipv4_endpoint = ZenohEndpoint::from_str(config.zenoh_transport_config.endpoint.as_str());
-
-    // Add the IPv4 endpoint to the Zenoh configuration
-    zenoh_config
-        .listen
-        .endpoints
-        .push(ipv4_endpoint.expect("FAIL"));
+    let zenoh_config = match ZenohConfig::from_file(config.zenoh_transport_config.config_file) {
+        Ok(config) => {
+            trace!("Able to read zenoh config from file");
+            config
+        }
+        Err(error) => {
+            warn!("Unable to read zenoh config from file, using default: {}", error);
+            ZenohConfig::default()
+        }
+    };
 
     let host_transport: Arc<dyn UTransport> = Arc::new(match config.host_config.transport {
         HostTransport::Zenoh => {
diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs
index ba3758f2..040316b3 100644
--- a/up-streamer/src/ustreamer.rs
+++ b/up-streamer/src/ustreamer.rs
@@ -11,8 +11,6 @@
  * SPDX-License-Identifier: Apache-2.0
  ********************************************************************************/
 
-#![allow(clippy::mutable_key_type)]
-
 use crate::endpoint::Endpoint;
 use async_std::channel::{Receiver, Sender};
 use async_std::sync::{Arc, Mutex};
@@ -209,12 +207,13 @@ impl ForwardingListeners {
         let forwarding_listener =
             Arc::new(ForwardingListener::new(forwarding_id, out_sender.clone()));
 
-        type UUriPair = Vec<(UUri, Option<UUri>)>;
-        let mut uuri_to_backpedal: HashSet<UUriPair> = HashSet::new();
+        type SourceSinkFilterPair = (UUri, Option<UUri>);
+        #[allow(clippy::mutable_key_type)]
+        let mut uuris_to_backpedal: HashSet<SourceSinkFilterPair> = HashSet::new();
 
         // Perform async registration and fetching
-        uuri_to_backpedal.insert(vec![(any_uuri(), Some(uauthority_to_uuri(out_authority)))]);
+        uuris_to_backpedal.insert((any_uuri(), Some(uauthority_to_uuri(out_authority))));
 
         if let Err(err) = in_transport
             .register_listener(
                 &any_uuri(),
@@ -223,20 +222,34 @@
             )
             .await
         {
-            warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}");
-            for uuri_pair in &uuri_to_backpedal {
-                for (uuri1, uuri2) in uuri_pair {
-                    // Do something with uuri1 and uuri2
-                    let _ = in_transport
-                        .unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone())
-                        .await;
-                }
+            warn!(
+                "{}:{} unable to register request listener, error: {}",
+                FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
+            );
+            for uuri_pair in &uuris_to_backpedal {
+                if let Err(err) = in_transport
+                    .unregister_listener(
+                        &uuri_pair.0,
+                        uuri_pair.1.as_ref(),
+                        forwarding_listener.clone(),
+                    )
+                    .await
+                {
+                    warn!(
+                        "{}:{} unable to unregister listener, error: {}",
+                        FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
+                    );
+                };
             }
             return Err(ForwardingListenerError::FailToRegisterNotificationRequestResponseListener);
         } else {
-            debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener");
+            debug!(
+                "{}:{} able to register request listener",
+                FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG
+            );
         }
 
+        #[allow(clippy::mutable_key_type)]
         let subscribers = match subscription_cache
             .lock()
             .await
@@ -244,26 +257,39 @@ impl ForwardingListeners {
         {
             Some(subscribers) => subscribers,
             None => {
-                warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} no subscribers found for out_authority: {out_authority:?}");
+                warn!(
+                    "{}:{} no subscribers found for out_authority: {:?}",
+                    FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, out_authority
+                );
                 HashSet::new()
             }
         };
 
         for subscriber in subscribers {
-            uuri_to_backpedal.insert(vec![(subscriber.topic.clone(), None)]);
+            uuris_to_backpedal.insert((subscriber.topic.clone(), None));
             if let Err(err) = in_transport
                 .register_listener(&subscriber.topic, None, forwarding_listener.clone())
                 .await
             {
-                warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}");
+                warn!(
+                    "{}:{} unable to register listener, error: {}",
+                    FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
+                );
                 // Perform async unregister_listener
-                for uuri_pair in &uuri_to_backpedal {
-                    for (uuri1, uuri2) in uuri_pair {
-                        // Do something with uuri1 and uuri2
-                        let _ = in_transport
-                            .unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone())
-                            .await;
-                    }
+                for uuri_pair in &uuris_to_backpedal {
+                    if let Err(err) = in_transport
+                        .unregister_listener(
+                            &uuri_pair.0,
+                            uuri_pair.1.as_ref(),
+                            forwarding_listener.clone(),
+                        )
+                        .await
+                    {
+                        warn!(
+                            "{}:{} unable to unregister listener, error: {}",
+                            FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
+                        );
+                    };
                 }
                 return Err(ForwardingListenerError::FailToRegisterPublishListener(
                     subscriber.topic,