Skip to content

Commit

Permalink
fix subscribe multiple times to the same channel
Browse files Browse the repository at this point in the history
when subscribing to the same channel on the same client, an error will be returned
  • Loading branch information
mcatanzariti committed Apr 18, 2024
1 parent 95f26fd commit b168d8e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
32 changes: 24 additions & 8 deletions src/network/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,11 @@ impl NetworkHandler {
Status::Subscribed => {
if let Some(resp_buf) = self.try_match_pubsub_message(result).await {
self.receive_result(resp_buf);
if self.subscriptions.is_empty() {
if self.subscriptions.is_empty() && self.pending_subscriptions.is_empty() {
debug!(
"[{}] goint out from Pub/Sub state to connected state",
self.tag
);
self.status = Status::Connected;
}
}
Expand Down Expand Up @@ -591,24 +595,36 @@ impl NetworkHandler {
| RefPubSubMessage::SSubscribe(channel_or_pattern) => {
if let Some(pending_sub) = self.pending_subscriptions.pop_front() {
if pending_sub.channel_or_pattern == channel_or_pattern {
self.subscriptions.insert(
channel_or_pattern.to_vec(),
(pending_sub.subscription_type, pending_sub.sender),
);
if self
.subscriptions
.insert(
channel_or_pattern.to_vec(),
(pending_sub.subscription_type, pending_sub.sender),
)
.is_some()
{
return Some(Err(Error::Client(
format!(
"There is already a subscription on channel '{}'",
String::from_utf8_lossy(channel_or_pattern)
)
.to_string(),
)));
}

if pending_sub.more_to_come {
return None;
}
} else {
error!(
"[{}] Unexpected subscription confirmation on channel '{:?}'",
"[{}] Unexpected subscription confirmation on channel '{}'",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
}
} else {
error!(
"[{}] Cannot find pending subscription for channel '{:?}'",
"[{}] Cannot find pending subscription for channel '{}'",
self.tag,
String::from_utf8_lossy(channel_or_pattern)
);
Expand Down Expand Up @@ -666,7 +682,7 @@ impl NetworkHandler {
}
None => {
error!(
"[{}] Unexpected message on channel '{:?}' for pattern '{:?}'",
"[{}] Unexpected message on channel '{}' for pattern '{}'",
self.tag,
String::from_utf8_lossy(channel),
String::from_utf8_lossy(pattern)
Expand Down
12 changes: 7 additions & 5 deletions src/tests/pub_sub_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,11 +769,10 @@ async fn split() -> Result<()> {
Ok(())
}


#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
async fn subscribe_twice() -> Result<()> {
async fn subscribe_multiple_times_to_the_same_channel() -> Result<()> {
let pub_sub_client = get_test_client().await?;
let regular_client = get_test_client().await?;

Expand All @@ -782,12 +781,15 @@ async fn subscribe_twice() -> Result<()> {

let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
assert!(pub_sub_stream.subscribe("mychannel").await.is_err());
assert!(pub_sub_client.subscribe("mychannel").await.is_err());

pub_sub_stream.psubscribe("pattern").await?;
assert!(pub_sub_stream.psubscribe("pattern").await.is_err());
assert!(pub_sub_client.psubscribe("pattern").await.is_err());

pub_sub_stream.ssubscribe("mychannel").await?;
assert!(pub_sub_stream.ssubscribe("mychannel").await.is_err());
pub_sub_stream.ssubscribe("myshardchannel").await?;
assert!(pub_sub_stream.ssubscribe("myshardchannel").await.is_err());
assert!(pub_sub_client.ssubscribe("myshardchannel").await.is_err());

Ok(())
}
}

0 comments on commit b168d8e

Please sign in to comment.