Skip to content

Commit

Permalink
feat: impl pubsub in metasrv (#2045)
Browse files Browse the repository at this point in the history
* feat: impl pubsub

* add test_subscriber_disconnect unit test

* chore: cr

* cr

* cr
  • Loading branch information
fengys1996 authored Aug 3, 2023
1 parent fdd4929 commit dda9225
Show file tree
Hide file tree
Showing 13 changed files with 591 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ uuid.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure-test = { path = "../common/procedure-test" }
session = { path = "../session", features = ["testing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
9 changes: 9 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use tonic::Code;

use crate::pubsub::Message;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -461,6 +463,12 @@ pub enum Error {

#[snafu(display("Invalid heartbeat request: {}", err_msg))]
InvalidHeartbeatRequest { err_msg: String, location: Location },

#[snafu(display("Failed to publish message: {:?}", source))]
PublishMessage {
source: SendError<Message>,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -506,6 +514,7 @@ impl ErrorExt for Error {
| Error::UpdateTableMetadata { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::ConvertGrpcExpr { .. }
| Error::PublishMessage { .. }
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub mod mailbox_handler;
pub mod node_stat;
mod on_leader_start_handler;
mod persist_stats_handler;
pub mod publish_heartbeat_handler;
pub(crate) mod region_lease_handler;
mod response_header_handler;

Expand Down
50 changes: 50 additions & 0 deletions src/meta-srv/src/handler/publish_heartbeat_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::{Message, PublishRef};

pub struct PublishHeartbeatHandler {
publish: PublishRef,
}

impl PublishHeartbeatHandler {
pub fn new(publish: PublishRef) -> PublishHeartbeatHandler {
PublishHeartbeatHandler { publish }
}
}

#[async_trait]
impl HeartbeatHandler for PublishHeartbeatHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
_: &mut Context,
_: &mut HeartbeatAccumulator,
) -> Result<()> {
let msg = Message::Heartbeat(Box::new(req.clone()));
self.publish.send_msg(msg).await;

Ok(())
}
}
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod selector;
mod sequence;
pub mod service;
Expand Down
18 changes: 18 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -177,6 +178,7 @@ pub struct MetaSrv {
ddl_manager: DdlManagerRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}

impl MetaSrv {
Expand All @@ -196,6 +198,7 @@ impl MetaSrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let subscribe_manager = self.subscribe_manager().cloned();
let mut rx = election.subscribe_leader_change();
let task_handler = self.greptimedb_telemerty_task.clone();
let _handle = common_runtime::spawn_bg(async move {
Expand All @@ -219,6 +222,12 @@ impl MetaSrv {
});
}
LeaderChangeMessage::StepDown(leader) => {
if let Some(sub_manager) = subscribe_manager.clone() {
info!("Leader changed, un_subscribe all");
if let Err(e) = sub_manager.un_subscribe_all() {
error!("Failed to un_subscribe all, error: {}", e);
}
}
error!("Leader :{:?} step down", leader);
let _ = task_handler.stop().await.map_err(|e| {
debug!(
Expand Down Expand Up @@ -329,6 +338,14 @@ impl MetaSrv {
&self.table_metadata_manager
}

pub fn publish(&self) -> Option<&PublishRef> {
self.pubsub.as_ref().map(|suite| &suite.0)
}

pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> {
self.pubsub.as_ref().map(|suite| &suite.1)
}

#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().server_addr.clone();
Expand All @@ -339,6 +356,7 @@ impl MetaSrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let skip_all = Arc::new(AtomicBool::new(false));

Context {
server_addr,
in_memory,
Expand Down
16 changes: 16 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
Expand All @@ -40,6 +41,7 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::mailbox::MailboxRef;
Expand All @@ -60,6 +62,7 @@ pub struct MetaSrvBuilder {
lock: Option<DistLockRef>,
metadata_service: Option<MetadataServiceRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}

impl MetaSrvBuilder {
Expand All @@ -75,6 +78,7 @@ impl MetaSrvBuilder {
lock: None,
metadata_service: None,
datanode_clients: None,
pubsub: None,
}
}

Expand Down Expand Up @@ -128,6 +132,11 @@ impl MetaSrvBuilder {
self
}

pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self {
self.pubsub = Some((publish, subscribe_manager));
self
}

pub async fn build(self) -> Result<MetaSrv> {
let started = Arc::new(AtomicBool::new(false));

Expand All @@ -142,6 +151,7 @@ impl MetaSrvBuilder {
lock,
metadata_service,
datanode_clients,
pubsub,
} = self;

let options = options.unwrap_or_default();
Expand Down Expand Up @@ -215,6 +225,11 @@ impl MetaSrvBuilder {
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
if let Some((publish, _)) = pubsub.as_ref() {
group
.add_handler(PublishHeartbeatHandler::new(publish.clone()))
.await;
}
group
}
};
Expand All @@ -237,6 +252,7 @@ impl MetaSrvBuilder {
ddl_manager,
table_metadata_manager,
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
pubsub,
})
}
}
Expand Down
47 changes: 47 additions & 0 deletions src/meta-srv/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::HeartbeatRequest;

mod publish;
mod subscribe_manager;
mod subscriber;
#[cfg(test)]
mod tests;

pub use publish::{DefaultPublish, Publish, PublishRef};
pub use subscribe_manager::{
AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery,
UnSubRequest,
};
pub use subscriber::{Subscriber, SubscriberRef, Transport};

/// Subscribed topic type, determined by the ability of meta.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Topic {
Heartbeat,
}

#[derive(Clone, Debug)]
pub enum Message {
Heartbeat(Box<HeartbeatRequest>),
}

impl Message {
pub fn topic(&self) -> Topic {
match self {
Message::Heartbeat(_) => Topic::Heartbeat,
}
}
}
72 changes: 72 additions & 0 deletions src/meta-srv/src/pubsub/publish.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use common_telemetry::error;

use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest};

/// This trait provides a `send_msg` method that can be used by other modules
/// of meta to publish [Message].
#[async_trait::async_trait]
pub trait Publish: Send + Sync {
async fn send_msg(&self, message: Message);
}

pub type PublishRef = Arc<dyn Publish>;

/// The default implementation of [Publish]
pub struct DefaultPublish<M, T> {
subscribe_manager: Arc<M>,
_transport: PhantomData<T>,
}

impl<M, T> DefaultPublish<M, T> {
pub fn new(subscribe_manager: Arc<M>) -> Self {
Self {
subscribe_manager,
_transport: PhantomData,
}
}
}

#[async_trait::async_trait]
impl<M, T> Publish for DefaultPublish<M, T>
where
M: SubscribeManager<T>,
T: Transport + Debug,
{
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
.subscribers_by_topic(&message.topic());

for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
// If an error occurs, we consider the subscriber offline,
// so un_subscribe here.
let req = UnSubRequest {
subscriber_id: sub.id(),
};

if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) {
error!(e; "failed to un_subscribe, req: {:?}", req);
}
}
}
}
}
Loading

0 comments on commit dda9225

Please sign in to comment.