Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve message dispatching & small refactorings #8

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ impl UPClientMqtt {
tokio::spawn(async move {
while let Some(msg_opt) = message_stream.next().await {
let Some(msg) = msg_opt else {
//TODO: None means that the connection is dropped. This should be handled correctly.
trace!("Received empty message from stream.");
continue;
};
Expand All @@ -371,13 +372,10 @@ impl UPClientMqtt {
}
};

let payload = msg.payload();
let upayload = payload.to_vec();

// Create UMessage from UAttributes and UPayload.
let umessage = UMessage {
attributes: Some(uattributes).into(),
payload: Some(upayload.into()),
payload: Some(Bytes::copy_from_slice(msg.payload())),
..Default::default()
};

Expand All @@ -398,20 +396,33 @@ impl UPClientMqtt {
trace!("No listeners registered for topic: {}", sub_topic);
continue;
};
let owned_listeners = listeners.clone();

// Release the locks early, before message dispatch
drop(topic_map_read);
drop(subscription_map_read);

for listener in listeners.iter() {
listener.on_receive(umessage.clone()).await;
for listener in owned_listeners {
let msg = umessage.clone();
tokio::spawn(async move {
listener.on_receive(msg.clone()).await;
});
}
} else {
// Filter the topic map for topics that match the received topic, including wildcards.
let topics_iter = topic_map_read
let listeners: Vec<ComparableListener> = topic_map_read
.iter()
.filter(|(key, _)| UPClientMqtt::compare_topic(topic, key));

for (_topic, listeners) in topics_iter {
for listener in listeners.iter() {
listener.on_receive(umessage.clone()).await;
}
.filter(|(key, _)| UPClientMqtt::compare_topic(topic, key))
.flat_map(|(_topic, listener)| listener.to_owned())
.collect();
// Drop lock before message dispatch
drop(topic_map_read);

for listener in listeners {
let msg = umessage.clone();
tokio::spawn(async move {
listener.on_receive(msg.clone()).await;
});
}
}
}
Expand Down