diff --git a/crates/extensions/tedge_mqtt_bridge/src/health.rs b/crates/extensions/tedge_mqtt_bridge/src/health.rs index 6b4134bf3f..456cc6f4b4 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/health.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/health.rs @@ -1,6 +1,6 @@ use crate::overall_status; use crate::BridgeAsyncClient; -use crate::BridgeMessage; +use crate::BridgeMessageSender; use crate::Status; use futures::channel::mpsc; use futures::SinkExt; @@ -21,7 +21,7 @@ use tracing::log::info; pub struct BridgeHealthMonitor { topic: String, rx_status: mpsc::Receiver<(&'static str, Status)>, - companion_bridge_half: mpsc::UnboundedSender, + companion_bridge_half: BridgeMessageSender, } impl BridgeHealthMonitor { @@ -55,14 +55,11 @@ impl BridgeHealthMonitor { Publish::new(&self.topic, QoS::AtLeastOnce, status.unwrap().json()); health_msg.retain = true; - // Publish the health message over MQTT, but with no duplicate - // in order to maintain synchronisation between the two bridge halves + // Publish the health message over MQTT, but with no duplicate for the companion + // as this message doesn't have to be acknowledged self.companion_bridge_half - .send(BridgeMessage::Pub { - publish: health_msg, - }) - .await - .unwrap(); + .internal_publish(health_msg) + .await; } } } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index c8661852b1..0e1ed945bc 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -213,18 +213,8 @@ struct BridgeAsyncClient { /// Receives messages from the companion half bridge rx: mpsc::Receiver>, - /// Sends to a background task that forwards the messages to the target and companion - /// - /// (None, message) => { - /// - the message is published unchanged to the target - /// - a None sentinel value is sent to the companion - /// } - /// (Some(topic), message) => { - /// - the message is published to the target on the given topic - /// - Some(topic, message.clone()) is sent to the companion - /// } - /// - unbounded_tx: mpsc::UnboundedSender, + /// Sends messages to a background task that forwards the messages to the target and companion + sender: BridgeMessageSender, } impl BridgeAsyncClient { @@ -232,8 +222,8 @@ impl BridgeAsyncClient { self.rx.next() } - pub fn clone_sender(&self) -> mpsc::UnboundedSender { - self.unbounded_tx.clone() + pub fn clone_sender(&self) -> BridgeMessageSender { + self.sender.clone() } fn new( @@ -245,27 +235,18 @@ impl BridgeAsyncClient { let companion_bridge_half = BridgeAsyncClient { target, rx, - unbounded_tx, + sender: BridgeMessageSender { unbounded_tx }, }; companion_bridge_half.spawn_publisher(tx, unbounded_rx); companion_bridge_half } async fn publish(&mut self, target_topic: String, publish: Publish) { - self.unbounded_tx - .send(BridgeMessage::BridgePub { - target_topic, - publish, - }) - .await - .unwrap() + self.sender.publish(target_topic, publish).await } async fn ack(&mut self, publish: Publish) { - self.unbounded_tx - .send(BridgeMessage::BridgeAck { publish }) - .await - .unwrap() + self.sender.ack(publish).await } fn spawn_publisher( @@ -304,6 +285,37 @@ impl BridgeAsyncClient { } } +#[derive(Clone)] +struct BridgeMessageSender { + unbounded_tx: mpsc::UnboundedSender, +} + +impl BridgeMessageSender { + async fn internal_publish(&mut self, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::Pub { publish }) + .await + .unwrap() + } + + async fn publish(&mut self, target_topic: String, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::BridgePub { + target_topic, + publish, + }) + .await + .unwrap() + } + + async fn ack(&mut self, publish: Publish) { + self.unbounded_tx + .send(BridgeMessage::BridgeAck { publish }) + .await + .unwrap() + } +} + /// Forward messages received from `recv_event_loop` to `target` /// /// The result of running this function constitutes half the MQTT bridge, hence the name.