Skip to content

Commit

Permalink
Wrap UnboundedSender<BridgeMessage> with BridgeMessageSender
Browse files Browse the repository at this point in the history
So the BridgeHealthMonitor doesn't have to know the internal
representation of the BridgeMessage used to tee messages over the MQTT
target and the half bridge companion.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Sep 13, 2024
1 parent 0bbfd89 commit 1ee231d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 35 deletions.
15 changes: 6 additions & 9 deletions crates/extensions/tedge_mqtt_bridge/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::overall_status;
use crate::BridgeAsyncClient;
use crate::BridgeMessage;
use crate::BridgeMessageSender;
use crate::Status;
use futures::channel::mpsc;
use futures::SinkExt;
Expand All @@ -21,7 +21,7 @@ use tracing::log::info;
pub struct BridgeHealthMonitor {
topic: String,
rx_status: mpsc::Receiver<(&'static str, Status)>,
companion_bridge_half: mpsc::UnboundedSender<BridgeMessage>,
companion_bridge_half: BridgeMessageSender,
}

impl BridgeHealthMonitor {
Expand Down Expand Up @@ -55,14 +55,11 @@ impl BridgeHealthMonitor {
Publish::new(&self.topic, QoS::AtLeastOnce, status.unwrap().json());
health_msg.retain = true;

// Publish the health message over MQTT, but with no duplicate
// in order to maintain synchronisation between the two bridge halves
// Publish the health message over MQTT, but with no duplicate for the companion
// as this message doesn't have to be acknowledged
self.companion_bridge_half
.send(BridgeMessage::Pub {
publish: health_msg,
})
.await
.unwrap();
.internal_publish(health_msg)
.await;
}
}
}
Expand Down
64 changes: 38 additions & 26 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,27 +213,17 @@ struct BridgeAsyncClient {
/// Receives messages from the companion half bridge
rx: mpsc::Receiver<Option<(String, Publish)>>,

/// Sends to a background task that forwards the messages to the target and companion
///
/// (None, message) => {
/// - the message is published unchanged to the target
/// - a None sentinel value is sent to the companion
/// }
/// (Some(topic), message) => {
/// - the message is published to the target on the given topic
/// - Some(topic, message.clone()) is sent to the companion
/// }
///
unbounded_tx: mpsc::UnboundedSender<BridgeMessage>,
/// Sends messages to a background task that forwards the messages to the target and companion
sender: BridgeMessageSender,
}

impl BridgeAsyncClient {
pub fn recv(&mut self) -> futures::stream::Next<mpsc::Receiver<Option<(String, Publish)>>> {
self.rx.next()
}

pub fn clone_sender(&self) -> mpsc::UnboundedSender<BridgeMessage> {
self.unbounded_tx.clone()
pub fn clone_sender(&self) -> BridgeMessageSender {
self.sender.clone()
}

fn new(
Expand All @@ -245,27 +235,18 @@ impl BridgeAsyncClient {
let companion_bridge_half = BridgeAsyncClient {
target,
rx,
unbounded_tx,
sender: BridgeMessageSender { unbounded_tx },
};
companion_bridge_half.spawn_publisher(tx, unbounded_rx);
companion_bridge_half
}

async fn publish(&mut self, target_topic: String, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::BridgePub {
target_topic,
publish,
})
.await
.unwrap()
self.sender.publish(target_topic, publish).await
}

async fn ack(&mut self, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::BridgeAck { publish })
.await
.unwrap()
self.sender.ack(publish).await
}

fn spawn_publisher(
Expand Down Expand Up @@ -304,6 +285,37 @@ impl BridgeAsyncClient {
}
}

#[derive(Clone)]
struct BridgeMessageSender {
unbounded_tx: mpsc::UnboundedSender<BridgeMessage>,
}

impl BridgeMessageSender {
async fn internal_publish(&mut self, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::Pub { publish })
.await
.unwrap()
}

async fn publish(&mut self, target_topic: String, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::BridgePub {
target_topic,
publish,
})
.await
.unwrap()
}

async fn ack(&mut self, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::BridgeAck { publish })
.await
.unwrap()
}
}

/// Forward messages received from `recv_event_loop` to `target`
///
/// The result of running this function constitutes half the MQTT bridge, hence the name.
Expand Down

0 comments on commit 1ee231d

Please sign in to comment.