Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retained Messages #41

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions mqtt-v5-broker/src/broker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{client::ClientMessage, tree::SubscriptionTree};
use crate::{client::ClientMessage, retained::RetainedMessageTree, tree::SubscriptionTree};
use bytes::Bytes;
use mqtt_v5::{
topic::TopicFilter,
types::{
Expand Down Expand Up @@ -193,13 +194,20 @@ pub struct Broker {
sender: Sender<BrokerMessage>,
receiver: Receiver<BrokerMessage>,
subscriptions: SubscriptionTree<SessionSubscription>,
retained_messages: RetainedMessageTree<Bytes>,
}

impl Broker {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(100);

Self { sessions: HashMap::new(), sender, receiver, subscriptions: SubscriptionTree::new() }
Self {
sessions: HashMap::new(),
sender,
receiver,
subscriptions: SubscriptionTree::new(),
retained_messages: RetainedMessageTree::new(),
}
}

pub fn sender(&self) -> Sender<BrokerMessage> {
Expand Down Expand Up @@ -346,6 +354,7 @@ impl Broker {

async fn handle_subscribe(&mut self, client_id: String, packet: SubscribePacket) {
let subscriptions = &mut self.subscriptions;
let retained_messages = &self.retained_messages;

if let Some(session) = self.sessions.get_mut(&client_id) {
// If a Server receives a SUBSCRIBE packet containing a Topic Filter that
Expand All @@ -368,7 +377,7 @@ impl Broker {
// and return the QoS that was granted.
let granted_qos_values = packet
.subscription_topics
.into_iter()
.iter()
.map(|topic| {
let session_subscription = SessionSubscription {
client_id: client_id.clone(),
Expand All @@ -394,6 +403,35 @@ impl Broker {
};

session.send(ClientMessage::Packet(Packet::SubscribeAck(subscribe_ack))).await;

// Send all retained messages which match the new subscriptions
let mut publish_packets = vec![];
for topic in packet.subscription_topics {
for (topic, retained_message) in
retained_messages.retained_messages(&topic.topic_filter)
{
let publish = PublishPacket {
is_duplicate: false,
qos: QoS::AtMostOnce, // TODO(bschwind)
retain: true,
topic,
payload: retained_message.clone(),

packet_id: None, // TODO(bschwind)
payload_format_indicator: None,
message_expiry_interval: None,
topic_alias: None,
response_topic: None,
correlation_data: None,
user_properties: vec![],
subscription_identifier: None,
content_type: None,
};
publish_packets.push(Packet::Publish(publish));
}
}

session.send(ClientMessage::Packets(publish_packets)).await;
}
}

Expand Down Expand Up @@ -531,6 +569,16 @@ impl Broker {
async fn handle_publish(&mut self, client_id: String, packet: PublishPacket) {
let mut is_dup = false;

if packet.retain {
if packet.payload.len() > 0 {
println!("Storing retained message for topic {:?}", packet.topic);
self.retained_messages.insert(&packet.topic, packet.payload.clone());
} else {
println!("Deleting retained message for topic {:?}", packet.topic);
self.retained_messages.remove(&packet.topic);
}
}

// For QoS2, ensure this packet isn't delivered twice. So if we have an outgoing
// publish receive with the same ID, just send the publish receive again but don't forward
// the message.
Expand Down
1 change: 1 addition & 0 deletions mqtt-v5-broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio_util::codec::Framed;

mod broker;
mod client;
mod retained;
mod tree;

async fn client_handler(stream: TcpStream, broker_tx: Sender<BrokerMessage>) {
Expand Down
Loading