Skip to content

Commit

Permalink
Rename BidirectionalChannelHalf to BridgeAsyncClient
Browse files Browse the repository at this point in the history
This struct is now used as an AsyncClient, the fact there is a
bidirectional channel being more an internal details.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Sep 13, 2024
1 parent 715e8e1 commit 0bbfd89
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
4 changes: 2 additions & 2 deletions crates/extensions/tedge_mqtt_bridge/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::overall_status;
use crate::BidirectionalChannelHalf;
use crate::BridgeAsyncClient;
use crate::BridgeMessage;
use crate::Status;
use futures::channel::mpsc;
Expand Down Expand Up @@ -27,7 +27,7 @@ pub struct BridgeHealthMonitor {
impl BridgeHealthMonitor {
pub(crate) fn new(
topic: String,
bridge_half: &BidirectionalChannelHalf,
bridge_half: &BridgeAsyncClient,
) -> (mpsc::Sender<(&'static str, Status)>, Self) {
let (tx, rx_status) = mpsc::channel(10);
(
Expand Down
38 changes: 22 additions & 16 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,19 @@ impl MqttBridgeActorBuilder {
.map(|t| SubscribeFilter::new(t.to_owned(), QoS::AtLeastOnce))
.collect();

let [msgs_local, msgs_cloud] =
let [cloud_target, local_target] =
bidirectional_channel(cloud_client.clone(), local_client.clone(), in_flight.into());
let [(convert_local, bidir_local), (convert_cloud, bidir_cloud)] =
rules.converters_and_bidirectional_topic_filters();
let (tx_status, monitor) = BridgeHealthMonitor::new(health_topic.name.clone(), &msgs_cloud);
let (tx_status, monitor) =
BridgeHealthMonitor::new(health_topic.name.clone(), &local_target);
tokio::spawn(monitor.monitor());
tokio::spawn(half_bridge(
local_event_loop,
local_client.clone(),
cloud_target,
convert_local,
bidir_local,
msgs_local,
tx_status.clone(),
"local",
local_topics,
Expand All @@ -150,9 +151,9 @@ impl MqttBridgeActorBuilder {
tokio::spawn(half_bridge(
cloud_event_loop,
cloud_client.clone(),
local_target,
convert_cloud,
bidir_cloud,
msgs_cloud,
tx_status.clone(),
"cloud",
cloud_topics,
Expand All @@ -171,12 +172,12 @@ fn bidirectional_channel(
cloud_client: AsyncClient,
local_client: AsyncClient,
buffer: usize,
) -> [BidirectionalChannelHalf; 2] {
) -> [BridgeAsyncClient; 2] {
let (tx_first, rx_first) = mpsc::channel(buffer);
let (tx_second, rx_second) = mpsc::channel(buffer);
[
BidirectionalChannelHalf::new(cloud_client, tx_first, rx_second),
BidirectionalChannelHalf::new(local_client, tx_second, rx_first),
BridgeAsyncClient::new(cloud_client, tx_first, rx_second),
BridgeAsyncClient::new(local_client, tx_second, rx_first),
]
}

Expand All @@ -200,11 +201,18 @@ enum BridgeMessage {
Pub { publish: Publish },
}

struct BidirectionalChannelHalf {
/// Wraps the target of an half bridge with a channel to its half bridge companion.
///
/// So when a message is received and published by this half,
/// the companion will await for that message to be acknowledged by the target
/// before acknowledging to the source.
struct BridgeAsyncClient {
/// MQTT target for the messages
target: AsyncClient,

/// Receives messages from the companion half bridge
rx: mpsc::Receiver<Option<(String, Publish)>>,

/// Sends to a background task that forwards the messages to the target and companion
///
/// (None, message) => {
Expand All @@ -219,7 +227,7 @@ struct BidirectionalChannelHalf {
unbounded_tx: mpsc::UnboundedSender<BridgeMessage>,
}

impl BidirectionalChannelHalf {
impl BridgeAsyncClient {
pub fn recv(&mut self) -> futures::stream::Next<mpsc::Receiver<Option<(String, Publish)>>> {
self.rx.next()
}
Expand All @@ -234,7 +242,7 @@ impl BidirectionalChannelHalf {
rx: mpsc::Receiver<Option<(String, Publish)>>,
) -> Self {
let (unbounded_tx, unbounded_rx) = mpsc::unbounded();
let companion_bridge_half = BidirectionalChannelHalf {
let companion_bridge_half = BridgeAsyncClient {
target,
rx,
unbounded_tx,
Expand Down Expand Up @@ -401,9 +409,9 @@ impl BidirectionalChannelHalf {
async fn half_bridge(
mut recv_event_loop: EventLoop,
recv_client: AsyncClient,
mut target: BridgeAsyncClient,
transformer: TopicConverter,
bidirectional_topic_filters: Vec<Cow<'static, str>>,
mut companion_bridge_half: BidirectionalChannelHalf,
tx_health: mpsc::Sender<(&'static str, Status)>,
name: &'static str,
topics: Vec<SubscribeFilter>,
Expand Down Expand Up @@ -465,9 +473,7 @@ async fn half_bridge(
received += 1;
if let Some(publish) = loop_breaker.ensure_not_looped(publish).await {
if let Some(topic) = transformer.convert_topic(&publish.topic) {
companion_bridge_half
.publish(topic.to_string(), publish)
.await;
target.publish(topic.to_string(), publish).await;
forwarded += 1;
}
}
Expand All @@ -480,7 +486,7 @@ async fn half_bridge(
) => {
acknowledged += 1;
if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) {
companion_bridge_half.ack(msg).await;
target.ack(msg).await;
finalized += 1;
} else {
info!("Bridge {name} connection received ack for unknown pkid={ack_pkid}");
Expand All @@ -491,7 +497,7 @@ async fn half_bridge(
Event::Outgoing(Outgoing::Publish(pkid)) => {
published += 1;
if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) {
match companion_bridge_half.recv().await {
match target.recv().await {
// A message was forwarded by the other bridge half, note the packet id
Some(Some((topic, msg))) => {
loop_breaker.forward_on_topic(topic, &msg);
Expand Down

0 comments on commit 0bbfd89

Please sign in to comment.