Skip to content

Commit

Permalink
Merge pull request #2817 from didier-wenzek/refactor/rename-mqtt-mess…
Browse files Browse the repository at this point in the history
…age-type

Rename mqtt_channel::Message -> MqttMessage
  • Loading branch information
didier-wenzek authored Apr 9, 2024
2 parents 41b80d3 + 1b569f8 commit 8910ea5
Show file tree
Hide file tree
Showing 40 changed files with 480 additions and 448 deletions.
12 changes: 6 additions & 6 deletions crates/common/mqtt_channel/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use crate::Message;
use crate::MqttError;
use crate::MqttMessage;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::SinkExt;
use futures::StreamExt;

#[async_trait]
pub trait SubChannel: StreamExt<Item = Message> + Unpin + Send {}
pub trait SubChannel: StreamExt<Item = MqttMessage> + Unpin + Send {}

#[async_trait]
pub trait ErrChannel: StreamExt<Item = MqttError> + Unpin + Send {}

#[async_trait]
pub trait PubChannel: SinkExt<Message> + Unpin + Send {
pub trait PubChannel: SinkExt<MqttMessage> + Unpin + Send {
/// Publish a message - unless the pub channel has been closed.
async fn publish(&mut self, message: Message) -> Result<(), MqttError> {
async fn publish(&mut self, message: MqttMessage) -> Result<(), MqttError> {
Ok(self
.send(message)
.await
Expand All @@ -23,10 +23,10 @@ pub trait PubChannel: SinkExt<Message> + Unpin + Send {
}

#[async_trait]
impl SubChannel for mpsc::UnboundedReceiver<Message> {}
impl SubChannel for mpsc::UnboundedReceiver<MqttMessage> {}

#[async_trait]
impl ErrChannel for mpsc::UnboundedReceiver<MqttError> {}

#[async_trait]
impl PubChannel for mpsc::UnboundedSender<Message> {}
impl PubChannel for mpsc::UnboundedSender<MqttMessage> {}
14 changes: 7 additions & 7 deletions crates/common/mqtt_channel/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::Message;
use crate::MqttMessage;
use crate::TopicFilter;
use certificate::parse_root_certificate;
use certificate::CertificateError;
Expand Down Expand Up @@ -51,7 +51,7 @@ pub struct Config {
/// LastWill message for a mqtt client
///
/// Default: None
pub last_will_message: Option<Message>,
pub last_will_message: Option<MqttMessage>,

/// With first message on connection
///
Expand Down Expand Up @@ -130,17 +130,17 @@ impl zeroize::Zeroize for PrivateKey {

#[derive(Clone)]
pub struct InitMessageFn {
initfn: Arc<Box<dyn Fn() -> Message + Send + Sync>>,
initfn: Arc<Box<dyn Fn() -> MqttMessage + Send + Sync>>,
}

impl InitMessageFn {
pub fn new(call_back: impl Fn() -> Message + Sync + Send + 'static) -> InitMessageFn {
pub fn new(call_back: impl Fn() -> MqttMessage + Sync + Send + 'static) -> InitMessageFn {
InitMessageFn {
initfn: Arc::new(Box::new(call_back)),
}
}

pub fn new_init_message(&self) -> Message {
pub fn new_init_message(&self) -> MqttMessage {
(*self.initfn)()
}
}
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Config {
}

/// Set the last will message, this will be published when the mqtt connection gets closed.
pub fn with_last_will_message(self, lwm: Message) -> Self {
pub fn with_last_will_message(self, lwm: MqttMessage) -> Self {
Self {
last_will_message: Some(lwm),
..self
Expand All @@ -251,7 +251,7 @@ impl Config {
/// Set the initial message
pub fn with_initial_message(
self,
initial_message: impl Fn() -> Message + Send + Sync + 'static,
initial_message: impl Fn() -> MqttMessage + Send + Sync + 'static,
) -> Self {
Self {
initial_message: Some(InitMessageFn::new(initial_message)),
Expand Down
14 changes: 7 additions & 7 deletions crates/common/mqtt_channel/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Config;
use crate::ErrChannel;
use crate::Message;
use crate::MqttError;
use crate::MqttMessage;
use crate::PubChannel;
use crate::SubChannel;
use futures::channel::mpsc;
Expand All @@ -24,10 +24,10 @@ use tokio::time::sleep;
/// A connection to some MQTT server
pub struct Connection {
/// The channel of the input messages received by this connection.
pub received: mpsc::UnboundedReceiver<Message>,
pub received: mpsc::UnboundedReceiver<MqttMessage>,

/// The channel of the output messages to be published on this connection.
pub published: mpsc::UnboundedSender<Message>,
pub published: mpsc::UnboundedSender<MqttMessage>,

/// The channel of the error messages received by this connection.
pub errors: mpsc::UnboundedReceiver<MqttError>,
Expand Down Expand Up @@ -120,7 +120,7 @@ impl Connection {

async fn open(
config: &Config,
mut message_sender: mpsc::UnboundedSender<Message>,
mut message_sender: mpsc::UnboundedSender<MqttMessage>,
mut error_sender: mpsc::UnboundedSender<MqttError>,
) -> Result<(AsyncClient, EventLoop), MqttError> {
const INSECURE_MQTT_PORT: u16 = 1883;
Expand Down Expand Up @@ -198,7 +198,7 @@ impl Connection {
mqtt_client: AsyncClient,
config: Config,
mut event_loop: EventLoop,
mut message_sender: mpsc::UnboundedSender<Message>,
mut message_sender: mpsc::UnboundedSender<MqttMessage>,
mut error_sender: mpsc::UnboundedSender<MqttError>,
) -> Result<(), MqttError> {
loop {
Expand Down Expand Up @@ -269,9 +269,9 @@ impl Connection {

async fn sender_loop(
mqtt_client: AsyncClient,
mut messages_receiver: mpsc::UnboundedReceiver<Message>,
mut messages_receiver: mpsc::UnboundedReceiver<MqttMessage>,
mut error_sender: mpsc::UnboundedSender<MqttError>,
last_will: Option<Message>,
last_will: Option<MqttMessage>,
done: oneshot::Sender<()>,
) {
loop {
Expand Down
4 changes: 2 additions & 2 deletions crates/common/mqtt_channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A library to connect the local MQTT bus, publish messages and subscribe topics.
//!
//! ```no_run
//! use mqtt_channel::{Config, Connection, Message, Topic, MqttError, StreamExt, SinkExt};
//! use mqtt_channel::{Config, Connection, MqttMessage, Topic, MqttError, StreamExt, SinkExt};
//! use std::convert::TryInto;
//!
//! #[tokio::main]
Expand All @@ -17,7 +17,7 @@
//!
//! // Messages are published by sending them on the published channel
//! let output_topic = "test/output/topic".try_into()?;
//! published_messages.send(Message::new(&output_topic, "hello mqtt")).await?;
//! published_messages.send(MqttMessage::new(&output_topic, "hello mqtt")).await?;
//!
//! // Messages are received from the subscriptions on the received channel
//! let message = received_messages.next().await.ok_or(MqttError::ReadOnClosedConnection)?;
Expand Down
37 changes: 19 additions & 18 deletions crates/common/mqtt_channel/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::fmt::Write;

/// A message to be sent to or received from MQTT.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Message {
pub struct MqttMessage {
pub topic: Topic,
pub payload: DebugPayload,
#[serde(serialize_with = "serialize_qos", deserialize_with = "deserialize_qos")]
Expand Down Expand Up @@ -135,12 +135,12 @@ impl DebugPayload {
/// A message payload
pub type Payload = Vec<u8>;

impl Message {
pub fn new<B>(topic: &Topic, payload: B) -> Message
impl MqttMessage {
pub fn new<B>(topic: &Topic, payload: B) -> MqttMessage
where
B: Into<Payload>,
{
Message {
MqttMessage {
topic: topic.clone(),
payload: DebugPayload(payload.into()),
qos: QoS::AtLeastOnce,
Expand Down Expand Up @@ -175,15 +175,15 @@ impl Message {
}
}

impl From<Message> for Publish {
fn from(val: Message) -> Self {
impl From<MqttMessage> for Publish {
fn from(val: MqttMessage) -> Self {
let mut publish = Publish::new(&val.topic.name, val.qos, val.payload.0);
publish.retain = val.retain;
publish
}
}

impl From<Publish> for Message {
impl From<Publish> for MqttMessage {
fn from(msg: Publish) -> Self {
let Publish {
topic,
Expand All @@ -193,7 +193,7 @@ impl From<Publish> for Message {
..
} = msg;

Message {
MqttMessage {
topic: Topic::new_unchecked(&topic),
payload: DebugPayload(payload.to_vec()),
qos,
Expand All @@ -202,13 +202,13 @@ impl From<Publish> for Message {
}
}

impl<T, U> From<(T, U)> for Message
impl<T, U> From<(T, U)> for MqttMessage
where
T: AsRef<str>,
U: AsRef<str>,
{
fn from(value: (T, U)) -> Self {
Message::new(&Topic::new_unchecked(value.0.as_ref()), value.1.as_ref())
MqttMessage::new(&Topic::new_unchecked(value.0.as_ref()), value.1.as_ref())
}
}

Expand All @@ -221,37 +221,37 @@ mod tests {
#[test]
fn check_null_terminated_messages() {
let topic = Topic::new("trimmed").unwrap();
let message = Message::new(&topic, &b"123\0"[..]);
let message = MqttMessage::new(&topic, &b"123\0"[..]);

assert_eq!(message.payload_bytes(), b"123");
}

#[test]
fn payload_bytes_removes_only_last_null_char() {
let topic = Topic::new("trimmed").unwrap();
let message = Message::new(&topic, &b"123\0\0"[..]);
let message = MqttMessage::new(&topic, &b"123\0\0"[..]);

assert_eq!(message.payload_bytes(), b"123\0");
}

#[test]
fn check_empty_messages() {
let topic = Topic::new("trimmed").unwrap();
let message = Message::new(&topic, &b""[..]);
let message = MqttMessage::new(&topic, &b""[..]);

assert_eq!(message.payload_bytes(), b"");
}
#[test]
fn check_non_null_terminated_messages() {
let topic = Topic::new("trimmed").unwrap();
let message = Message::new(&topic, &b"123"[..]);
let message = MqttMessage::new(&topic, &b"123"[..]);

assert_eq!(message.payload_bytes(), b"123");
}
#[test]
fn payload_str_with_invalid_utf8_char_in_the_middle() {
let topic = Topic::new("trimmed").unwrap();
let message = Message::new(&topic, &b"temperature\xc3\x28"[..]);
let message = MqttMessage::new(&topic, &b"temperature\xc3\x28"[..]);
assert_eq!(
message.payload_str().unwrap_err().to_string(),
"Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 11: temperature..."
Expand All @@ -260,7 +260,7 @@ mod tests {
#[test]
fn payload_str_with_invalid_utf8_char_in_the_beginning() {
let topic = Topic::new("trimmed").unwrap();
let message = Message::new(&topic, &b"\xc3\x28"[..]);
let message = MqttMessage::new(&topic, &b"\xc3\x28"[..]);
assert_eq!(
message.payload_str().unwrap_err().to_string(),
"Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 0: ..."
Expand All @@ -269,7 +269,7 @@ mod tests {

#[test]
fn message_serialize_deserialize() {
let message = Message {
let message = MqttMessage {
topic: Topic::new("test").unwrap(),
payload: DebugPayload("test-payload".as_bytes().to_vec()),
qos: QoS::AtMostOnce,
Expand All @@ -278,7 +278,8 @@ mod tests {

let json = serde_json::to_value(&message).expect("Serialization failed");
assert_eq!(json.get("payload").unwrap(), &json!("test-payload"));
let deserialized: Message = serde_json::from_value(json).expect("Deserialization failed");
let deserialized: MqttMessage =
serde_json::from_value(json).expect("Deserialization failed");
assert_eq!(deserialized, message);
}
}
26 changes: 13 additions & 13 deletions crates/common/mqtt_channel/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ async fn subscribing_to_messages() -> Result<(), anyhow::Error> {

#[derive(Debug, Clone, Eq, PartialEq)]
enum MaybeMessage {
Next(Message),
Next(MqttMessage),
Eos,
Timeout,
}

fn message(t: &str, p: &str) -> Message {
fn message(t: &str, p: &str) -> MqttMessage {
let topic = Topic::new(t).expect("a valid topic");
let payload = p.as_bytes();
Message::new(&topic, payload)
MqttMessage::new(&topic, payload)
}

async fn next_message(received: &mut (impl StreamExt<Item = Message> + Unpin)) -> MaybeMessage {
async fn next_message(received: &mut (impl StreamExt<Item = MqttMessage> + Unpin)) -> MaybeMessage {
match tokio::time::timeout(TIMEOUT, received.next()).await {
Ok(Some(msg)) => MaybeMessage::Next(msg),
Ok(None) => MaybeMessage::Eos,
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> {
while let MaybeMessage::Next(msg) = next_message(&mut input).await {
let req = msg.payload_str().expect("utf8 payload");
let res = req.to_uppercase();
let msg = Message::new(&out_topic, res.as_bytes());
let msg = MqttMessage::new(&out_topic, res.as_bytes());
if output.send(msg).await.is_err() {
// the connection has been closed
break;
Expand Down Expand Up @@ -253,7 +253,7 @@ async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> {
while let Some(msg) = input.next().await {
let req = msg.payload_str().expect("utf8 payload");
let res = req.to_uppercase();
let msg = Message::new(&out_topic, res.as_bytes());
let msg = MqttMessage::new(&out_topic, res.as_bytes());
if output.send(msg).await.is_err() {
break;
}
Expand Down Expand Up @@ -456,15 +456,15 @@ async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), any
let mut con = Connection::new(&mqtt_config).await.expect("a connection");

con.published
.send(Message::new(&topic, "datum 1"))
.send(MqttMessage::new(&topic, "datum 1"))
.await
.expect("message sent");
con.published
.send(Message::new(&topic, "datum 2"))
.send(MqttMessage::new(&topic, "datum 2"))
.await
.expect("message sent");
con.published
.send(Message::new(&topic, "datum 3"))
.send(MqttMessage::new(&topic, "datum 3"))
.await
.expect("message sent");

Expand Down Expand Up @@ -498,7 +498,7 @@ async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Erro
let topic = Topic::new_unchecked(topic);
let mqtt_config = Config::default()
.with_port(broker.port)
.with_last_will_message(Message {
.with_last_will_message(MqttMessage {
topic: topic.clone(),
payload: "good bye".to_string().into(),
qos: QoS::AtLeastOnce,
Expand All @@ -507,17 +507,17 @@ async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Erro
let mut con = Connection::new(&mqtt_config).await.expect("a connection");

con.published
.send(Message::new(&topic, "hello 1"))
.send(MqttMessage::new(&topic, "hello 1"))
.await
.expect("message sent");

con.published
.send(Message::new(&topic, "hello 2"))
.send(MqttMessage::new(&topic, "hello 2"))
.await
.expect("message sent");

con.published
.send(Message::new(&topic, "hello 3"))
.send(MqttMessage::new(&topic, "hello 3"))
.await
.expect("message sent");

Expand Down
Loading

0 comments on commit 8910ea5

Please sign in to comment.