From dda922507f5b4092e593c66ae33f54837973e5be Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Thu, 3 Aug 2023 11:56:43 +0800 Subject: [PATCH] feat: impl pubsub in metasrv (#2045) * feat: impl pubsub * add test_subscriber_disconnect unit test * chore: cr * cr * cr --- Cargo.lock | 1 + src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/error.rs | 9 + src/meta-srv/src/handler.rs | 1 + .../src/handler/publish_heartbeat_handler.rs | 50 +++++ src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 18 ++ src/meta-srv/src/metasrv/builder.rs | 16 ++ src/meta-srv/src/pubsub.rs | 47 +++++ src/meta-srv/src/pubsub/publish.rs | 72 +++++++ src/meta-srv/src/pubsub/subscribe_manager.rs | 115 +++++++++++ src/meta-srv/src/pubsub/subscriber.rs | 75 +++++++ src/meta-srv/src/pubsub/tests.rs | 185 ++++++++++++++++++ 13 files changed, 591 insertions(+) create mode 100644 src/meta-srv/src/handler/publish_heartbeat_handler.rs create mode 100644 src/meta-srv/src/pubsub.rs create mode 100644 src/meta-srv/src/pubsub/publish.rs create mode 100644 src/meta-srv/src/pubsub/subscribe_manager.rs create mode 100644 src/meta-srv/src/pubsub/subscriber.rs create mode 100644 src/meta-srv/src/pubsub/tests.rs diff --git a/Cargo.lock b/Cargo.lock index b08d71984c8d..0074c227fe9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5283,6 +5283,7 @@ dependencies = [ "serde", "serde_json", "servers", + "session", "snafu", "store-api", "table", diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index faaab3224ae4..aca607a2aa61 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -58,5 +58,6 @@ uuid.workspace = true [dev-dependencies] chrono.workspace = true common-procedure-test = { path = "../common/procedure-test" } +session = { path = "../session", features = ["testing"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 3ec1a699acdd..d2d0be501720 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -21,6 +21,8 @@ use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; use tonic::Code; +use crate::pubsub::Message; + #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { @@ -461,6 +463,12 @@ pub enum Error { #[snafu(display("Invalid heartbeat request: {}", err_msg))] InvalidHeartbeatRequest { err_msg: String, location: Location }, + + #[snafu(display("Failed to publish message: {:?}", source))] + PublishMessage { + source: SendError, + location: Location, + }, } pub type Result = std::result::Result; @@ -506,6 +514,7 @@ impl ErrorExt for Error { | Error::UpdateTableMetadata { .. } | Error::NoEnoughAvailableDatanode { .. } | Error::ConvertGrpcExpr { .. } + | Error::PublishMessage { .. } | Error::Join { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index f675bc5fedf0..611a0d5b18a3 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -54,6 +54,7 @@ pub mod mailbox_handler; pub mod node_stat; mod on_leader_start_handler; mod persist_stats_handler; +pub mod publish_heartbeat_handler; pub(crate) mod region_lease_handler; mod response_header_handler; diff --git a/src/meta-srv/src/handler/publish_heartbeat_handler.rs b/src/meta-srv/src/handler/publish_heartbeat_handler.rs new file mode 100644 index 000000000000..beceb4fe9af3 --- /dev/null +++ b/src/meta-srv/src/handler/publish_heartbeat_handler.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; +use async_trait::async_trait; + +use crate::error::Result; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; +use crate::pubsub::{Message, PublishRef}; + +pub struct PublishHeartbeatHandler { + publish: PublishRef, +} + +impl PublishHeartbeatHandler { + pub fn new(publish: PublishRef) -> PublishHeartbeatHandler { + PublishHeartbeatHandler { publish } + } +} + +#[async_trait] +impl HeartbeatHandler for PublishHeartbeatHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + _: &mut Context, + _: &mut HeartbeatAccumulator, + ) -> Result<()> { + let msg = Message::Heartbeat(Box::new(req.clone())); + self.publish.send_msg(msg).await; + + Ok(()) + } +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 2058de1fa50d..3b23152a67d9 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -32,6 +32,7 @@ mod metrics; #[cfg(feature = "mock")] pub mod mocks; pub mod procedure; +pub mod pubsub; pub mod selector; mod sequence; pub mod service; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 94fb59325882..0bf5427ff11d 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -38,6 +38,7 @@ use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::metadata_service::MetadataServiceRef; +use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::sequence::SequenceRef; use crate::service::mailbox::MailboxRef; @@ -177,6 +178,7 @@ pub struct MetaSrv { ddl_manager: DdlManagerRef, table_metadata_manager: TableMetadataManagerRef, greptimedb_telemerty_task: Arc, + pubsub: Option<(PublishRef, SubscribeManagerRef)>, } impl MetaSrv { @@ -196,6 +198,7 @@ impl MetaSrv { let procedure_manager = self.procedure_manager.clone(); let in_memory = self.in_memory.clone(); let leader_cached_kv_store = self.leader_cached_kv_store.clone(); + let subscribe_manager = self.subscribe_manager().cloned(); let mut rx = election.subscribe_leader_change(); let task_handler = self.greptimedb_telemerty_task.clone(); let _handle = common_runtime::spawn_bg(async move { @@ -219,6 +222,12 @@ impl MetaSrv { }); } LeaderChangeMessage::StepDown(leader) => { + if let Some(sub_manager) = subscribe_manager.clone() { + info!("Leader changed, un_subscribe all"); + if let Err(e) = sub_manager.un_subscribe_all() { + error!("Failed to un_subscribe all, error: {}", e); + } + } error!("Leader :{:?} step down", leader); let _ = task_handler.stop().await.map_err(|e| { debug!( @@ -329,6 +338,14 @@ impl MetaSrv { &self.table_metadata_manager } + pub fn publish(&self) -> Option<&PublishRef> { + self.pubsub.as_ref().map(|suite| &suite.0) + } + + pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> { + self.pubsub.as_ref().map(|suite| &suite.1) + } + #[inline] pub fn new_ctx(&self) -> Context { let server_addr = self.options().server_addr.clone(); @@ -339,6 +356,7 @@ impl MetaSrv { let mailbox = self.mailbox.clone(); let election = self.election.clone(); let skip_all = Arc::new(AtomicBool::new(false)); + Context { server_addr, in_memory, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index fe562cc0ae17..de041ab70039 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -26,6 +26,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; +use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, @@ -40,6 +41,7 @@ use crate::metasrv::{ }; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; +use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; use crate::service::mailbox::MailboxRef; @@ -60,6 +62,7 @@ pub struct MetaSrvBuilder { lock: Option, metadata_service: Option, datanode_clients: Option>, + pubsub: Option<(PublishRef, SubscribeManagerRef)>, } impl MetaSrvBuilder { @@ -75,6 +78,7 @@ impl MetaSrvBuilder { lock: None, metadata_service: None, datanode_clients: None, + pubsub: None, } } @@ -128,6 +132,11 @@ impl MetaSrvBuilder { self } + pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self { + self.pubsub = Some((publish, subscribe_manager)); + self + } + pub async fn build(self) -> Result { let started = Arc::new(AtomicBool::new(false)); @@ -142,6 +151,7 @@ impl MetaSrvBuilder { lock, metadata_service, datanode_clients, + pubsub, } = self; let options = options.unwrap_or_default(); @@ -215,6 +225,11 @@ impl MetaSrvBuilder { } group.add_handler(RegionLeaseHandler::default()).await; group.add_handler(PersistStatsHandler::default()).await; + if let Some((publish, _)) = pubsub.as_ref() { + group + .add_handler(PublishHeartbeatHandler::new(publish.clone())) + .await; + } group } }; @@ -237,6 +252,7 @@ impl MetaSrvBuilder { ddl_manager, table_metadata_manager, greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await, + pubsub, }) } } diff --git a/src/meta-srv/src/pubsub.rs b/src/meta-srv/src/pubsub.rs new file mode 100644 index 000000000000..0560861ebc9d --- /dev/null +++ b/src/meta-srv/src/pubsub.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::HeartbeatRequest; + +mod publish; +mod subscribe_manager; +mod subscriber; +#[cfg(test)] +mod tests; + +pub use publish::{DefaultPublish, Publish, PublishRef}; +pub use subscribe_manager::{ + AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery, + UnSubRequest, +}; +pub use subscriber::{Subscriber, SubscriberRef, Transport}; + +/// Subscribed topic type, determined by the ability of meta. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum Topic { + Heartbeat, +} + +#[derive(Clone, Debug)] +pub enum Message { + Heartbeat(Box), +} + +impl Message { + pub fn topic(&self) -> Topic { + match self { + Message::Heartbeat(_) => Topic::Heartbeat, + } + } +} diff --git a/src/meta-srv/src/pubsub/publish.rs b/src/meta-srv/src/pubsub/publish.rs new file mode 100644 index 000000000000..8657b376c690 --- /dev/null +++ b/src/meta-srv/src/pubsub/publish.rs @@ -0,0 +1,72 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; + +use common_telemetry::error; + +use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest}; + +/// This trait provides a `send_msg` method that can be used by other modules +/// of meta to publish [Message]. +#[async_trait::async_trait] +pub trait Publish: Send + Sync { + async fn send_msg(&self, message: Message); +} + +pub type PublishRef = Arc; + +/// The default implementation of [Publish] +pub struct DefaultPublish { + subscribe_manager: Arc, + _transport: PhantomData, +} + +impl DefaultPublish { + pub fn new(subscribe_manager: Arc) -> Self { + Self { + subscribe_manager, + _transport: PhantomData, + } + } +} + +#[async_trait::async_trait] +impl Publish for DefaultPublish +where + M: SubscribeManager, + T: Transport + Debug, +{ + async fn send_msg(&self, message: Message) { + let sub_list = self + .subscribe_manager + .subscribers_by_topic(&message.topic()); + + for sub in sub_list { + if sub.transport_msg(message.clone()).await.is_err() { + // If an error occurs, we consider the subscriber offline, + // so un_subscribe here. + let req = UnSubRequest { + subscriber_id: sub.id(), + }; + + if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) { + error!(e; "failed to un_subscribe, req: {:?}", req); + } + } + } + } +} diff --git a/src/meta-srv/src/pubsub/subscribe_manager.rs b/src/meta-srv/src/pubsub/subscribe_manager.rs new file mode 100644 index 000000000000..743562e8aaa2 --- /dev/null +++ b/src/meta-srv/src/pubsub/subscribe_manager.rs @@ -0,0 +1,115 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_telemetry::info; +use dashmap::DashMap; +use tokio::sync::mpsc::Sender; + +use crate::error::Result; +use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport}; + +pub trait SubscribeQuery: Send + Sync { + fn subscribers_by_topic(&self, topic: &Topic) -> Vec>; +} + +pub trait SubscribeManager: SubscribeQuery { + fn subscribe(&self, req: AddSubRequest) -> Result<()>; + + fn un_subscribe(&self, req: UnSubRequest) -> Result<()>; + + fn un_subscribe_all(&self) -> Result<()>; +} + +pub type SubscribeManagerRef = Arc>>; + +pub struct AddSubRequest { + pub topic_list: Vec, + pub subscriber: Subscriber, +} + +#[derive(Debug, Clone)] +pub struct UnSubRequest { + pub subscriber_id: u32, +} +pub struct DefaultSubscribeManager { + topic2sub: DashMap>>>, +} + +impl Default for DefaultSubscribeManager { + fn default() -> Self { + Self { + topic2sub: DashMap::new(), + } + } +} + +impl SubscribeQuery for DefaultSubscribeManager +where + T: Transport, +{ + fn subscribers_by_topic(&self, topic: &Topic) -> Vec> { + self.topic2sub + .get(topic) + .map(|list_ref| list_ref.clone()) + .unwrap_or_else(Vec::new) + } +} + +impl SubscribeManager for DefaultSubscribeManager +where + T: Transport, +{ + fn subscribe(&self, req: AddSubRequest) -> Result<()> { + let AddSubRequest { + topic_list, + subscriber, + } = req; + + info!( + "Add a subscription, subscriber_id: {}, subscriber_name: {}, topic list: {:?}", + subscriber.id(), + subscriber.name(), + topic_list + ); + + let subscriber = Arc::new(subscriber); + + for topic in topic_list { + let mut entry = self.topic2sub.entry(topic).or_insert_with(Vec::new); + entry.push(subscriber.clone()); + } + + Ok(()) + } + + fn un_subscribe(&self, req: UnSubRequest) -> Result<()> { + let UnSubRequest { subscriber_id } = req; + + info!("Add a un_subscription, subscriber_id: {}", subscriber_id); + + for mut sub_list in self.topic2sub.iter_mut() { + sub_list.retain(|subscriber| subscriber.id() != subscriber_id) + } + + Ok(()) + } + + fn un_subscribe_all(&self) -> Result<()> { + self.topic2sub.clear(); + + Ok(()) + } +} diff --git a/src/meta-srv/src/pubsub/subscriber.rs b/src/meta-srv/src/pubsub/subscriber.rs new file mode 100644 index 000000000000..6b4eaa4357fa --- /dev/null +++ b/src/meta-srv/src/pubsub/subscriber.rs @@ -0,0 +1,75 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use snafu::ResultExt; +use tokio::sync::mpsc::Sender; + +use crate::error::{self, Result}; +use crate::pubsub::Message; + +#[derive(Debug)] +pub struct Subscriber { + /// Subscriber's id, globally unique, assigned by leader meta. + id: u32, + /// Subscriber's name, passed in by subscriber. + name: String, + /// Transport channel from meta to subscriber. + transporter: T, +} + +pub type SubscriberRef = Arc>; + +impl Subscriber { + pub fn new(id: u32, name: impl Into, transporter: T) -> Self { + let name = name.into(); + + Self { + id, + name, + transporter, + } + } + + pub fn id(&self) -> u32 { + self.id + } + + pub fn name(&self) -> &str { + &self.name + } +} + +impl Subscriber +where + T: Transport, +{ + pub async fn transport_msg(&self, message: Message) -> Result<()> { + self.transporter.transport_msg(message).await + } +} + +/// This trait defines how messages are delivered from meta to the subscriber. +#[async_trait::async_trait] +pub trait Transport: Send + Sync { + async fn transport_msg(&self, msg: Message) -> Result<()>; +} + +#[async_trait::async_trait] +impl Transport for Sender { + async fn transport_msg(&self, msg: Message) -> Result<()> { + self.send(msg).await.context(error::PublishMessageSnafu) + } +} diff --git a/src/meta-srv/src/pubsub/tests.rs b/src/meta-srv/src/pubsub/tests.rs new file mode 100644 index 000000000000..41f1e3e95d89 --- /dev/null +++ b/src/meta-srv/src/pubsub/tests.rs @@ -0,0 +1,185 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::HeartbeatRequest; +use tokio::sync::mpsc::{Receiver, Sender}; + +use super::DefaultSubscribeManager; +use crate::pubsub::{ + AddSubRequest, DefaultPublish, Message, Publish, SubscribeManager, SubscribeQuery, Subscriber, + Topic, UnSubRequest, +}; + +#[tokio::test] +async fn test_pubsub() { + let manager = Arc::new(DefaultSubscribeManager::default()); + + let (subscriber1, mut rx1) = mock_subscriber(1, "tidigong"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber1, + }; + manager.subscribe(req).unwrap(); + + let (subscriber2, mut rx2) = mock_subscriber(2, "gcrm"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber2, + }; + manager.subscribe(req).unwrap(); + + let manager_clone = manager.clone(); + let message_number: usize = 5; + tokio::spawn(async move { + let publisher: DefaultPublish>, Sender> = + DefaultPublish::new(manager_clone); + for _ in 0..message_number { + publisher.send_msg(mock_message()).await; + } + }); + + for _ in 0..message_number { + let msg = rx1.recv().await.unwrap(); + check_message(msg); + let msg = rx2.recv().await.unwrap(); + check_message(msg); + } + + manager + .un_subscribe(UnSubRequest { subscriber_id: 1 }) + .unwrap(); + let may_msg = rx1.recv().await; + assert!(may_msg.is_none()); + + manager.un_subscribe_all().unwrap(); + let may_msg = rx2.recv().await; + assert!(may_msg.is_none()); +} + +#[tokio::test] +async fn test_subscriber_disconnect() { + let manager = Arc::new(DefaultSubscribeManager::default()); + + let (subscriber1, rx1) = mock_subscriber(1, "tidigong"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber1, + }; + manager.subscribe(req).unwrap(); + + let (subscriber2, rx2) = mock_subscriber(2, "gcrm"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber2, + }; + manager.subscribe(req).unwrap(); + + let manager_clone = manager.clone(); + let message_number: usize = 5; + let join = tokio::spawn(async move { + let publisher: DefaultPublish>, Sender> = + DefaultPublish::new(manager_clone); + for _ in 0..message_number { + publisher.send_msg(mock_message()).await; + } + }); + + // Simulate subscriber disconnection. + std::mem::drop(rx1); + std::mem::drop(rx2); + + join.await.unwrap(); + + let subscriber_list = manager.subscribers_by_topic(&Topic::Heartbeat); + assert!(subscriber_list.is_empty()); +} + +#[test] +fn test_message() { + let msg = Message::Heartbeat(Box::default()); + assert_eq!(Topic::Heartbeat, msg.topic()); +} + +#[test] +fn test_sub_manager() { + let manager = DefaultSubscribeManager::default(); + + let subscriber = mock_subscriber(1, "tidigong").0; + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber, + }; + manager.subscribe(req).unwrap(); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); + assert_eq!(1, ret.len()); + + let subscriber = mock_subscriber(2, "gcrm").0; + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber, + }; + manager.subscribe(req).unwrap(); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); + assert_eq!(2, ret.len()); + + let req = UnSubRequest { subscriber_id: 1 }; + manager.un_subscribe(req).unwrap(); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); + assert_eq!(1, ret.len()); + + let req = UnSubRequest { subscriber_id: 2 }; + manager.un_subscribe(req).unwrap(); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); + assert_eq!(0, ret.len()); +} + +#[tokio::test] +async fn test_subscriber() { + let (subscriber, mut rx) = mock_subscriber(1, "tudigong"); + assert_eq!(1, subscriber.id()); + assert_eq!("tudigong", subscriber.name()); + + subscriber.transport_msg(mock_message()).await.unwrap(); + + let may_msg = rx.recv().await; + assert!(may_msg.is_some()); + match may_msg.unwrap() { + Message::Heartbeat(hb) => { + assert_eq!(123, hb.duration_since_epoch); + } + } +} + +fn mock_subscriber(id: u32, name: &str) -> (Subscriber>, Receiver) { + let (tx, rx) = tokio::sync::mpsc::channel(1024); + let sub = Subscriber::new(id, name, tx); + (sub, rx) +} + +fn mock_message() -> Message { + Message::Heartbeat(Box::new(HeartbeatRequest { + duration_since_epoch: 123, + ..Default::default() + })) +} + +fn check_message(message: Message) { + match message { + Message::Heartbeat(hb) => { + assert_eq!(123, hb.duration_since_epoch); + } + } +}