Skip to content

Commit

Permalink
Introduce specific BidirectionalChannelHalf methods
Browse files Browse the repository at this point in the history
The generic `send` method has been replaced by explicit `publish` and `ack` methods.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Sep 13, 2024
1 parent a6282d7 commit 715e8e1
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 44 deletions.
13 changes: 8 additions & 5 deletions crates/extensions/tedge_mqtt_bridge/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::overall_status;
use crate::BidirectionalChannelHalf;
use crate::BridgeMessage;
use crate::Status;
use futures::channel::mpsc;
use futures::SinkExt;
Expand All @@ -17,16 +18,16 @@ use tracing::log::info;
///
/// When [Self::monitor] runs, this will watch the status of the bridge halves, and notify the
/// relevant MQTT topic about the overall health.
pub struct BridgeHealthMonitor<T> {
pub struct BridgeHealthMonitor {
topic: String,
rx_status: mpsc::Receiver<(&'static str, Status)>,
companion_bridge_half: mpsc::UnboundedSender<(Option<String>, T)>,
companion_bridge_half: mpsc::UnboundedSender<BridgeMessage>,
}

impl BridgeHealthMonitor<Publish> {
impl BridgeHealthMonitor {
pub(crate) fn new(
topic: String,
bridge_half: &BidirectionalChannelHalf<Publish>,
bridge_half: &BidirectionalChannelHalf,
) -> (mpsc::Sender<(&'static str, Status)>, Self) {
let (tx, rx_status) = mpsc::channel(10);
(
Expand Down Expand Up @@ -57,7 +58,9 @@ impl BridgeHealthMonitor<Publish> {
// Publish the health message over MQTT, but with no duplicate
// in order to maintain synchronisation between the two bridge halves
self.companion_bridge_half
.send((None, health_msg))
.send(BridgeMessage::Pub {
publish: health_msg,
})
.await
.unwrap();
}
Expand Down
113 changes: 74 additions & 39 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl MqttBridgeActorBuilder {
tokio::spawn(monitor.monitor());
tokio::spawn(half_bridge(
local_event_loop,
cloud_client.clone(),
local_client.clone(),
convert_local,
bidir_local,
Expand All @@ -150,7 +149,6 @@ impl MqttBridgeActorBuilder {
));
tokio::spawn(half_bridge(
cloud_event_loop,
local_client.clone(),
cloud_client.clone(),
convert_cloud,
bidir_cloud,
Expand All @@ -173,7 +171,7 @@ fn bidirectional_channel(
cloud_client: AsyncClient,
local_client: AsyncClient,
buffer: usize,
) -> [BidirectionalChannelHalf<Publish>; 2] {
) -> [BidirectionalChannelHalf; 2] {
let (tx_first, rx_first) = mpsc::channel(buffer);
let (tx_second, rx_second) = mpsc::channel(buffer);
[
Expand All @@ -182,11 +180,31 @@ fn bidirectional_channel(
]
}

struct BidirectionalChannelHalf<T> {
enum BridgeMessage {
/// A message to be published to a given target topic
///
/// This message will have to be acknowledged to its source by the companion half bridge
BridgePub {
target_topic: String,
publish: Publish,
},

/// A message to be acknowledged on the target
///
/// This message has been received by the companion half bridge
BridgeAck { publish: Publish },

/// A message *generated* by the bridge
///
/// This message has not to be acknowledged, as not received by the bridge.
Pub { publish: Publish },
}

struct BidirectionalChannelHalf {
/// MQTT target for the messages
target: AsyncClient,
/// Receives messages from the companion half bridge
rx: mpsc::Receiver<Option<(String, T)>>,
rx: mpsc::Receiver<Option<(String, Publish)>>,
/// Sends to a background task that forwards the messages to the target and companion
///
/// (None, message) => {
Expand All @@ -198,29 +216,18 @@ struct BidirectionalChannelHalf<T> {
/// - Some(topic, message.clone()) is sent to the companion
/// }
///
unbounded_tx: mpsc::UnboundedSender<(Option<String>, T)>,
unbounded_tx: mpsc::UnboundedSender<BridgeMessage>,
}

impl<T> BidirectionalChannelHalf<T> {
pub fn send<'a>(
&'a mut self,
target_topic: Option<String>,
message: T,
) -> futures::sink::Send<'a, mpsc::UnboundedSender<(Option<String>, T)>, (Option<String>, T)>
{
self.unbounded_tx.send((target_topic, message))
}

pub fn recv(&mut self) -> futures::stream::Next<mpsc::Receiver<Option<(String, T)>>> {
impl BidirectionalChannelHalf {
pub fn recv(&mut self) -> futures::stream::Next<mpsc::Receiver<Option<(String, Publish)>>> {
self.rx.next()
}

pub fn clone_sender(&self) -> mpsc::UnboundedSender<(Option<String>, T)> {
pub fn clone_sender(&self) -> mpsc::UnboundedSender<BridgeMessage> {
self.unbounded_tx.clone()
}
}

impl BidirectionalChannelHalf<Publish> {
fn new(
target: AsyncClient,
tx: mpsc::Sender<Option<(String, Publish)>>,
Expand All @@ -236,21 +243,54 @@ impl BidirectionalChannelHalf<Publish> {
companion_bridge_half
}

async fn publish(&mut self, target_topic: String, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::BridgePub {
target_topic,
publish,
})
.await
.unwrap()
}

async fn ack(&mut self, publish: Publish) {
self.unbounded_tx
.send(BridgeMessage::BridgeAck { publish })
.await
.unwrap()
}

fn spawn_publisher(
&self,
mut tx: mpsc::Sender<Option<(String, Publish)>>,
mut unbounded_rx: mpsc::UnboundedReceiver<(Option<String>, Publish)>,
mut unbounded_rx: mpsc::UnboundedReceiver<BridgeMessage>,
) {
let target = self.target.clone();
tokio::spawn(async move {
while let Some((target_topic, publish)) = unbounded_rx.next().await {
let topic = target_topic.clone().unwrap_or(publish.topic.clone());
let duplicate = target_topic.map(|topic| (topic, publish.clone()));
tx.send(duplicate).await.unwrap();
target
.publish(topic, publish.qos, publish.retain, publish.payload)
.await
.unwrap();
while let Some(message) = unbounded_rx.next().await {
match message {
BridgeMessage::BridgePub {
target_topic,
publish,
} => {
let duplicate = (target_topic.clone(), publish.clone());
tx.send(Some(duplicate)).await.unwrap();
target
.publish(target_topic, publish.qos, publish.retain, publish.payload)
.await
.unwrap();
}
BridgeMessage::Pub { publish } => {
tx.send(None).await.unwrap();
target
.publish(publish.topic, publish.qos, publish.retain, publish.payload)
.await
.unwrap();
}
BridgeMessage::BridgeAck { publish } => {
target.ack(&publish).await.unwrap();
}
}
}
});
}
Expand Down Expand Up @@ -360,11 +400,10 @@ impl BidirectionalChannelHalf<Publish> {
#[allow(clippy::too_many_arguments)]
async fn half_bridge(
mut recv_event_loop: EventLoop,
target: AsyncClient,
recv_client: AsyncClient,
transformer: TopicConverter,
bidirectional_topic_filters: Vec<Cow<'static, str>>,
mut companion_bridge_half: BidirectionalChannelHalf<Publish>,
mut companion_bridge_half: BidirectionalChannelHalf,
tx_health: mpsc::Sender<(&'static str, Status)>,
name: &'static str,
topics: Vec<SubscribeFilter>,
Expand Down Expand Up @@ -427,9 +466,8 @@ async fn half_bridge(
if let Some(publish) = loop_breaker.ensure_not_looped(publish).await {
if let Some(topic) = transformer.convert_topic(&publish.topic) {
companion_bridge_half
.send(Some(topic.to_string()), publish)
.await
.unwrap();
.publish(topic.to_string(), publish)
.await;
forwarded += 1;
}
}
Expand All @@ -442,11 +480,8 @@ async fn half_bridge(
) => {
acknowledged += 1;
if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) {
if let Err(err) = target.ack(&msg).await {
info!("Bridge {name} connection failed to ack: {err:?}");
} else {
finalized += 1;
}
companion_bridge_half.ack(msg).await;
finalized += 1;
} else {
info!("Bridge {name} connection received ack for unknown pkid={ack_pkid}");
}
Expand Down

0 comments on commit 715e8e1

Please sign in to comment.