From 715e8e1b939fcfc60c34e76123e0bacf93c2dab5 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 13 Sep 2024 15:12:10 +0200 Subject: [PATCH] Introduce specific BidirectionalChannelHalf methods The generic `send` method has been replaced by explicit `publish` and `ack` methods. Signed-off-by: Didier Wenzek --- .../tedge_mqtt_bridge/src/health.rs | 13 +- .../extensions/tedge_mqtt_bridge/src/lib.rs | 113 ++++++++++++------ 2 files changed, 82 insertions(+), 44 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/src/health.rs b/crates/extensions/tedge_mqtt_bridge/src/health.rs index b7cc439079..c9ed72bb89 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/health.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/health.rs @@ -1,5 +1,6 @@ use crate::overall_status; use crate::BidirectionalChannelHalf; +use crate::BridgeMessage; use crate::Status; use futures::channel::mpsc; use futures::SinkExt; @@ -17,16 +18,16 @@ use tracing::log::info; /// /// When [Self::monitor] runs, this will watch the status of the bridge halves, and notify the /// relevant MQTT topic about the overall health. -pub struct BridgeHealthMonitor { +pub struct BridgeHealthMonitor { topic: String, rx_status: mpsc::Receiver<(&'static str, Status)>, - companion_bridge_half: mpsc::UnboundedSender<(Option, T)>, + companion_bridge_half: mpsc::UnboundedSender, } -impl BridgeHealthMonitor { +impl BridgeHealthMonitor { pub(crate) fn new( topic: String, - bridge_half: &BidirectionalChannelHalf, + bridge_half: &BidirectionalChannelHalf, ) -> (mpsc::Sender<(&'static str, Status)>, Self) { let (tx, rx_status) = mpsc::channel(10); ( @@ -57,7 +58,9 @@ impl BridgeHealthMonitor { // Publish the health message over MQTT, but with no duplicate // in order to maintain synchronisation between the two bridge halves self.companion_bridge_half - .send((None, health_msg)) + .send(BridgeMessage::Pub { + publish: health_msg, + }) .await .unwrap(); } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index 5ec7eeb803..6b3e3bbb32 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -138,7 +138,6 @@ impl MqttBridgeActorBuilder { tokio::spawn(monitor.monitor()); tokio::spawn(half_bridge( local_event_loop, - cloud_client.clone(), local_client.clone(), convert_local, bidir_local, @@ -150,7 +149,6 @@ impl MqttBridgeActorBuilder { )); tokio::spawn(half_bridge( cloud_event_loop, - local_client.clone(), cloud_client.clone(), convert_cloud, bidir_cloud, @@ -173,7 +171,7 @@ fn bidirectional_channel( cloud_client: AsyncClient, local_client: AsyncClient, buffer: usize, -) -> [BidirectionalChannelHalf; 2] { +) -> [BidirectionalChannelHalf; 2] { let (tx_first, rx_first) = mpsc::channel(buffer); let (tx_second, rx_second) = mpsc::channel(buffer); [ @@ -182,11 +180,31 @@ fn bidirectional_channel( ] } -struct BidirectionalChannelHalf { +enum BridgeMessage { + /// A message to be published to a given target topic + /// + /// This message will have to be acknowledged to its source by the companion half bridge + BridgePub { + target_topic: String, + publish: Publish, + }, + + /// A message to be acknowledged on the target + /// + /// This message has been received by the companion half bridge + BridgeAck { publish: Publish }, + + /// A message *generated* by the bridge + /// + /// This message has not to be acknowledged, as not received by the bridge. + Pub { publish: Publish }, +} + +struct BidirectionalChannelHalf { /// MQTT target for the messages target: AsyncClient, /// Receives messages from the companion half bridge - rx: mpsc::Receiver>, + rx: mpsc::Receiver>, /// Sends to a background task that forwards the messages to the target and companion /// /// (None, message) => { @@ -198,29 +216,18 @@ struct BidirectionalChannelHalf { /// - Some(topic, message.clone()) is sent to the companion /// } /// - unbounded_tx: mpsc::UnboundedSender<(Option, T)>, + unbounded_tx: mpsc::UnboundedSender, } -impl BidirectionalChannelHalf { - pub fn send<'a>( - &'a mut self, - target_topic: Option, - message: T, - ) -> futures::sink::Send<'a, mpsc::UnboundedSender<(Option, T)>, (Option, T)> - { - self.unbounded_tx.send((target_topic, message)) - } - - pub fn recv(&mut self) -> futures::stream::Next>> { +impl BidirectionalChannelHalf { + pub fn recv(&mut self) -> futures::stream::Next>> { self.rx.next() } - pub fn clone_sender(&self) -> mpsc::UnboundedSender<(Option, T)> { + pub fn clone_sender(&self) -> mpsc::UnboundedSender { self.unbounded_tx.clone() } -} -impl BidirectionalChannelHalf { fn new( target: AsyncClient, tx: mpsc::Sender>, @@ -236,21 +243,54 @@ impl BidirectionalChannelHalf { companion_bridge_half } + async fn publish(&mut self, target_topic: String, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::BridgePub { + target_topic, + publish, + }) + .await + .unwrap() + } + + async fn ack(&mut self, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::BridgeAck { publish }) + .await + .unwrap() + } + fn spawn_publisher( &self, mut tx: mpsc::Sender>, - mut unbounded_rx: mpsc::UnboundedReceiver<(Option, Publish)>, + mut unbounded_rx: mpsc::UnboundedReceiver, ) { let target = self.target.clone(); tokio::spawn(async move { - while let Some((target_topic, publish)) = unbounded_rx.next().await { - let topic = target_topic.clone().unwrap_or(publish.topic.clone()); - let duplicate = target_topic.map(|topic| (topic, publish.clone())); - tx.send(duplicate).await.unwrap(); - target - .publish(topic, publish.qos, publish.retain, publish.payload) - .await - .unwrap(); + while let Some(message) = unbounded_rx.next().await { + match message { + BridgeMessage::BridgePub { + target_topic, + publish, + } => { + let duplicate = (target_topic.clone(), publish.clone()); + tx.send(Some(duplicate)).await.unwrap(); + target + .publish(target_topic, publish.qos, publish.retain, publish.payload) + .await + .unwrap(); + } + BridgeMessage::Pub { publish } => { + tx.send(None).await.unwrap(); + target + .publish(publish.topic, publish.qos, publish.retain, publish.payload) + .await + .unwrap(); + } + BridgeMessage::BridgeAck { publish } => { + target.ack(&publish).await.unwrap(); + } + } } }); } @@ -360,11 +400,10 @@ impl BidirectionalChannelHalf { #[allow(clippy::too_many_arguments)] async fn half_bridge( mut recv_event_loop: EventLoop, - target: AsyncClient, recv_client: AsyncClient, transformer: TopicConverter, bidirectional_topic_filters: Vec>, - mut companion_bridge_half: BidirectionalChannelHalf, + mut companion_bridge_half: BidirectionalChannelHalf, tx_health: mpsc::Sender<(&'static str, Status)>, name: &'static str, topics: Vec, @@ -427,9 +466,8 @@ async fn half_bridge( if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { if let Some(topic) = transformer.convert_topic(&publish.topic) { companion_bridge_half - .send(Some(topic.to_string()), publish) - .await - .unwrap(); + .publish(topic.to_string(), publish) + .await; forwarded += 1; } } @@ -442,11 +480,8 @@ async fn half_bridge( ) => { acknowledged += 1; if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) { - if let Err(err) = target.ack(&msg).await { - info!("Bridge {name} connection failed to ack: {err:?}"); - } else { - finalized += 1; - } + companion_bridge_half.ack(msg).await; + finalized += 1; } else { info!("Bridge {name} connection received ack for unknown pkid={ack_pkid}"); }