diff --git a/src/network/network_handler.rs b/src/network/network_handler.rs index a66b7b9..07dc976 100644 --- a/src/network/network_handler.rs +++ b/src/network/network_handler.rs @@ -412,7 +412,11 @@ impl NetworkHandler { Status::Subscribed => { if let Some(resp_buf) = self.try_match_pubsub_message(result).await { self.receive_result(resp_buf); - if self.subscriptions.is_empty() { + if self.subscriptions.is_empty() && self.pending_subscriptions.is_empty() { + debug!( + "[{}] goint out from Pub/Sub state to connected state", + self.tag + ); self.status = Status::Connected; } } @@ -591,24 +595,36 @@ impl NetworkHandler { | RefPubSubMessage::SSubscribe(channel_or_pattern) => { if let Some(pending_sub) = self.pending_subscriptions.pop_front() { if pending_sub.channel_or_pattern == channel_or_pattern { - self.subscriptions.insert( - channel_or_pattern.to_vec(), - (pending_sub.subscription_type, pending_sub.sender), - ); + if self + .subscriptions + .insert( + channel_or_pattern.to_vec(), + (pending_sub.subscription_type, pending_sub.sender), + ) + .is_some() + { + return Some(Err(Error::Client( + format!( + "There is already a subscription on channel '{}'", + String::from_utf8_lossy(channel_or_pattern) + ) + .to_string(), + ))); + } if pending_sub.more_to_come { return None; } } else { error!( - "[{}] Unexpected subscription confirmation on channel '{:?}'", + "[{}] Unexpected subscription confirmation on channel '{}'", self.tag, String::from_utf8_lossy(channel_or_pattern) ); } } else { error!( - "[{}] Cannot find pending subscription for channel '{:?}'", + "[{}] Cannot find pending subscription for channel '{}'", self.tag, String::from_utf8_lossy(channel_or_pattern) ); @@ -666,7 +682,7 @@ impl NetworkHandler { } None => { error!( - "[{}] Unexpected message on channel '{:?}' for pattern '{:?}'", + "[{}] Unexpected message on channel '{}' for pattern '{}'", self.tag, String::from_utf8_lossy(channel), String::from_utf8_lossy(pattern) diff --git a/src/tests/pub_sub_commands.rs b/src/tests/pub_sub_commands.rs index a1106e0..e826259 100644 --- a/src/tests/pub_sub_commands.rs +++ b/src/tests/pub_sub_commands.rs @@ -769,11 +769,10 @@ async fn split() -> Result<()> { Ok(()) } - #[cfg_attr(feature = "tokio-runtime", tokio::test)] #[cfg_attr(feature = "async-std-runtime", async_std::test)] #[serial] -async fn subscribe_twice() -> Result<()> { +async fn subscribe_multiple_times_to_the_same_channel() -> Result<()> { let pub_sub_client = get_test_client().await?; let regular_client = get_test_client().await?; @@ -782,12 +781,15 @@ async fn subscribe_twice() -> Result<()> { let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?; assert!(pub_sub_stream.subscribe("mychannel").await.is_err()); + assert!(pub_sub_client.subscribe("mychannel").await.is_err()); pub_sub_stream.psubscribe("pattern").await?; assert!(pub_sub_stream.psubscribe("pattern").await.is_err()); + assert!(pub_sub_client.psubscribe("pattern").await.is_err()); - pub_sub_stream.ssubscribe("mychannel").await?; - assert!(pub_sub_stream.ssubscribe("mychannel").await.is_err()); + pub_sub_stream.ssubscribe("myshardchannel").await?; + assert!(pub_sub_stream.ssubscribe("myshardchannel").await.is_err()); + assert!(pub_sub_client.ssubscribe("myshardchannel").await.is_err()); Ok(()) -} \ No newline at end of file +}