Skip to content

Commit

Permalink
Make builtin bridge statistics more precise.
Browse files Browse the repository at this point in the history
The counts of messages forwarded to the target
and of those properly acknowledged to the source (i.e. finalized),
where optimistic: i.e. counting messages pushed in the processing queue
and not those actually processed. This is now fixed.

Has also been fixed, the received, published and acknowledged counters
excluding bridge unrelated messages (such as health messages).

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Sep 13, 2024
1 parent 1ee231d commit 028ea2f
Showing 1 changed file with 30 additions and 11 deletions.
41 changes: 30 additions & 11 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use std::collections::hash_map;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tedge_actors::futures::channel::mpsc;
Expand Down Expand Up @@ -215,6 +218,12 @@ struct BridgeAsyncClient {

/// Sends messages to a background task that forwards the messages to the target and companion
sender: BridgeMessageSender,

/// Count of messages that have been published (excluding health messages)
published: Arc<AtomicUsize>,

/// Count of messages that have been acknowledged
acknowledged: Arc<AtomicUsize>,
}

impl BridgeAsyncClient {
Expand All @@ -236,6 +245,8 @@ impl BridgeAsyncClient {
target,
rx,
sender: BridgeMessageSender { unbounded_tx },
published: Arc::new(AtomicUsize::new(0)),
acknowledged: Arc::new(AtomicUsize::new(0)),
};
companion_bridge_half.spawn_publisher(tx, unbounded_rx);
companion_bridge_half
Expand All @@ -249,12 +260,22 @@ impl BridgeAsyncClient {
self.sender.ack(publish).await
}

fn published(&self) -> usize {
self.published.load(Ordering::Relaxed)
}

fn acknowledged(&self) -> usize {
self.acknowledged.load(Ordering::Relaxed)
}

fn spawn_publisher(
&self,
mut tx: mpsc::Sender<Option<(String, Publish)>>,
mut unbounded_rx: mpsc::UnboundedReceiver<BridgeMessage>,
) {
let target = self.target.clone();
let published = self.published.clone();
let acknowledged = self.acknowledged.clone();
tokio::spawn(async move {
while let Some(message) = unbounded_rx.next().await {
match message {
Expand All @@ -268,6 +289,7 @@ impl BridgeAsyncClient {
.publish(target_topic, publish.qos, publish.retain, publish.payload)
.await
.unwrap();
published.fetch_add(1, Ordering::Relaxed);
}
BridgeMessage::Pub { publish } => {
tx.send(None).await.unwrap();
Expand All @@ -278,6 +300,7 @@ impl BridgeAsyncClient {
}
BridgeMessage::BridgeAck { publish } => {
target.ack(&publish).await.unwrap();
acknowledged.fetch_add(1, Ordering::Relaxed);
}
}
}
Expand Down Expand Up @@ -441,11 +464,8 @@ async fn half_bridge(
MessageLoopBreaker::new(recv_client.clone(), bidirectional_topic_filters);

let mut received = 0; // Count of messages received by this half-bridge
let mut forwarded = 0; // Count of messages forwarded to the companion half-bridge
let mut published = 0; // Count of messages published (by the companion)
let mut acknowledged = 0; // Count of messages acknowledged (by the MQTT end-point of the companion)
let mut finalized = 0; // Count of messages fully processed by this half-bridge
let mut ignored = 0; // Count of messages published to soon by the companion (AwaitAck)

loop {
let res = recv_event_loop.poll().await;
Expand All @@ -466,8 +486,10 @@ async fn half_bridge(
}
};
debug!("Received notification ({name}) {notification:?}");
debug!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={} acknowledged={acknowledged} finalized={finalized} ignored={ignored}",
forward_pkid_to_received_msg.len()
debug!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={waiting} acknowledged={acknowledged} finalized={finalized}",
forwarded = target.published(),
waiting = forward_pkid_to_received_msg.len(),
finalized = target.acknowledged(),
);

match notification {
Expand All @@ -482,11 +504,10 @@ async fn half_bridge(

// Forward messages from event loop to target
Event::Incoming(Incoming::Publish(publish)) => {
received += 1;
if let Some(publish) = loop_breaker.ensure_not_looped(publish).await {
if let Some(topic) = transformer.convert_topic(&publish.topic) {
received += 1;
target.publish(topic.to_string(), publish).await;
forwarded += 1;
}
}
}
Expand All @@ -496,22 +517,21 @@ async fn half_bridge(
Incoming::PubAck(PubAck { pkid: ack_pkid })
| Incoming::PubRec(PubRec { pkid: ack_pkid }),
) => {
acknowledged += 1;
if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) {
acknowledged += 1;
target.ack(msg).await;
finalized += 1;
} else {
info!("Bridge {name} connection received ack for unknown pkid={ack_pkid}");
}
}

// Keep track of packet IDs so we can acknowledge messages
Event::Outgoing(Outgoing::Publish(pkid)) => {
published += 1;
if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) {
match target.recv().await {
// A message was forwarded by the other bridge half, note the packet id
Some(Some((topic, msg))) => {
published += 1;
loop_breaker.forward_on_topic(topic, &msg);
if pkid != 0 {
// Messages with pkid 0 (meaning QoS=0) should not be added to the hashmap
Expand All @@ -528,7 +548,6 @@ async fn half_bridge(
}
} else {
info!("Bridge {name} connection ignoring already known pkid={pkid}");
ignored += 1;
}
}

Expand Down

0 comments on commit 028ea2f

Please sign in to comment.