diff --git a/Cargo.lock b/Cargo.lock index 242fb17f4565..284dcaa4c10e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1245,6 +1245,7 @@ dependencies = [ "catalog", "chrono", "common-catalog", + "common-config", "common-error", "common-macro", "common-meta", @@ -1259,6 +1260,7 @@ dependencies = [ "datatypes", "futures", "futures-util", + "humantime", "itertools 0.10.5", "lazy_static", "log-store", @@ -2861,6 +2863,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-time", + "common-version", "common-wal", "dashmap", "datafusion", @@ -3584,6 +3587,8 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "common-time", + "common-version", "datanode", "futures", "humantime-serde", @@ -3886,7 +3891,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b5412f72257c18410fdccbb893fa5d245b846141#b5412f72257c18410fdccbb893fa5d245b846141" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f699e240f7a6c83f139dabac8669714f08513120#f699e240f7a6c83f139dabac8669714f08513120" dependencies = [ "prost 0.12.4", "serde", @@ -9141,6 +9146,7 @@ dependencies = [ "client", "common-base", "common-catalog", + "common-config", "common-error", "common-grpc", "common-macro", diff --git a/Cargo.toml b/Cargo.toml index c67aad930d01..28c1c4973f4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f699e240f7a6c83f139dabac8669714f08513120" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index ff57e0cb0add..ddda28ba8864 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -17,6 +17,7 @@ arrow-schema.workspace = true async-stream.workspace = true async-trait = "0.1" common-catalog.workspace = true +common-config.workspace = true common-error.workspace = true common-macro.workspace = true common-meta.workspace = true @@ -30,6 +31,7 @@ datafusion.workspace = true datatypes.workspace = true futures = "0.3" futures-util.workspace = true +humantime.workspace = true itertools.workspace = true lazy_static.workspace = true meta-client.workspace = true diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 8391dab045c8..20cac754b9f0 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -49,6 +49,12 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to list nodes in cluster: {source}"))] + ListNodes { + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to re-compile script due to internal error"))] CompileScriptInternal { location: Location, @@ -294,6 +300,7 @@ impl ErrorExt for Error { } Error::ListCatalogs { source, .. } + | Error::ListNodes { source, .. } | Error::ListSchemas { source, .. } | Error::ListTables { source, .. } => source.status_code(), diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 8d488212fc45..5a812ff6c1ea 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod cluster_info; pub mod columns; pub mod key_column_usage; mod memory_table; @@ -23,6 +24,7 @@ pub mod schemata; mod table_constraints; mod table_names; pub mod tables; +pub(crate) mod utils; use std::collections::HashMap; use std::sync::{Arc, Weak}; @@ -47,6 +49,7 @@ pub use table_names::*; use self::columns::InformationSchemaColumns; use crate::error::Result; +use crate::information_schema::cluster_info::InformationSchemaClusterInfo; use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; use crate::information_schema::memory_table::{get_schema_columns, MemoryTable}; use crate::information_schema::partitions::InformationSchemaPartitions; @@ -150,6 +153,7 @@ impl InformationSchemaProvider { fn build_tables(&mut self) { let mut tables = HashMap::new(); + // SECURITY NOTE: // Carefully consider the tables that may expose sensitive cluster configurations, // authentication details, and other critical information. // Only put these tables under `greptime` catalog to prevent info leak. @@ -166,6 +170,10 @@ impl InformationSchemaProvider { REGION_PEERS.to_string(), self.build_table(REGION_PEERS).unwrap(), ); + tables.insert( + CLUSTER_INFO.to_string(), + self.build_table(CLUSTER_INFO).unwrap(), + ); } tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); @@ -251,6 +259,9 @@ impl InformationSchemaProvider { self.catalog_name.clone(), self.catalog_manager.clone(), )) as _), + CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new( + self.catalog_manager.clone(), + )) as _), _ => None, } } diff --git a/src/catalog/src/information_schema/cluster_info.rs b/src/catalog/src/information_schema/cluster_info.rs new file mode 100644 index 000000000000..0f01852bb541 --- /dev/null +++ b/src/catalog/src/information_schema/cluster_info.rs @@ -0,0 +1,317 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; +use std::time::Duration; + +use arrow_schema::SchemaRef as ArrowSchemaRef; +use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID; +use common_config::Mode; +use common_error::ext::BoxedError; +use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus}; +use common_meta::peer::Peer; +use common_query::physical_plan::TaskContext; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_telemetry::logging::warn; +use common_time::timestamp::Timestamp; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; +use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::timestamp::TimestampMillisecond; +use datatypes::value::Value; +use datatypes::vectors::{ + Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder, +}; +use snafu::ResultExt; +use store_api::storage::{ScanRequest, TableId}; + +use super::CLUSTER_INFO; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result}; +use crate::information_schema::{utils, InformationTable, Predicates}; +use crate::CatalogManager; + +const PEER_ID: &str = "peer_id"; +const PEER_TYPE: &str = "peer_type"; +const PEER_ADDR: &str = "peer_addr"; +const VERSION: &str = "version"; +const GIT_COMMIT: &str = "git_commit"; +const START_TIME: &str = "start_time"; +const UPTIME: &str = "uptime"; +const ACTIVE_TIME: &str = "active_time"; + +const INIT_CAPACITY: usize = 42; + +/// The `CLUSTER_INFO` table provides information about the current topology information of the cluster. +/// +/// - `peer_id`: the peer server id. +/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc. +/// - `peer_addr`: the peer gRPC address. +/// - `version`: the build package version of the peer. +/// - `git_commit`: the build git commit hash of the peer. +/// - `start_time`: the starting time of the peer. +/// - `uptime`: the uptime of the peer. +/// - `active_time`: the time since the last activity of the peer. +/// +pub(super) struct InformationSchemaClusterInfo { + schema: SchemaRef, + catalog_manager: Weak, + start_time_ms: u64, +} + +impl InformationSchemaClusterInfo { + pub(super) fn new(catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_manager, + start_time_ms: common_time::util::current_time_millis() as u64, + } + } + + pub(crate) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false), + ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false), + ColumnSchema::new( + START_TIME, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true), + ])) + } + + fn builder(&self) -> InformationSchemaClusterInfoBuilder { + InformationSchemaClusterInfoBuilder::new( + self.schema.clone(), + self.catalog_manager.clone(), + self.start_time_ms, + ) + } +} + +impl InformationTable for InformationSchemaClusterInfo { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID + } + + fn table_name(&self) -> &'static str { + CLUSTER_INFO + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_cluster_info(Some(request)) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct InformationSchemaClusterInfoBuilder { + schema: SchemaRef, + start_time_ms: u64, + catalog_manager: Weak, + + peer_ids: Int64VectorBuilder, + peer_types: StringVectorBuilder, + peer_addrs: StringVectorBuilder, + versions: StringVectorBuilder, + git_commits: StringVectorBuilder, + start_times: TimestampMillisecondVectorBuilder, + uptimes: StringVectorBuilder, + active_times: StringVectorBuilder, +} + +impl InformationSchemaClusterInfoBuilder { + fn new( + schema: SchemaRef, + catalog_manager: Weak, + start_time_ms: u64, + ) -> Self { + Self { + schema, + catalog_manager, + peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY), + peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY), + versions: StringVectorBuilder::with_capacity(INIT_CAPACITY), + git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY), + start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY), + uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY), + active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY), + start_time_ms, + } + } + + /// Construct the `information_schema.cluster_info` virtual table + async fn make_cluster_info(&mut self, request: Option) -> Result { + let predicates = Predicates::from_scan_request(&request); + let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone); + + match mode { + Mode::Standalone => { + let build_info = common_version::build_info(); + + self.add_node_info( + &predicates, + NodeInfo { + // For the standalone: + // - id always 0 + // - empty string for peer_addr + peer: Peer { + id: 0, + addr: "".to_string(), + }, + last_activity_ts: -1, + status: NodeStatus::Standalone, + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + // Use `self.start_time_ms` instead. + // It's not precise but enough. + start_time_ms: self.start_time_ms, + }, + ); + } + Mode::Distributed => { + if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? { + let node_infos = meta_client + .list_nodes(None) + .await + .map_err(BoxedError::new) + .context(ListNodesSnafu)?; + + for node_info in node_infos { + self.add_node_info(&predicates, node_info); + } + } else { + warn!("Could not find meta client in distributed mode."); + } + } + } + + self.finish() + } + + fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) { + let peer_type = node_info.status.role_name(); + + let row = [ + (PEER_ID, &Value::from(node_info.peer.id)), + (PEER_TYPE, &Value::from(peer_type)), + (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())), + (VERSION, &Value::from(node_info.version.as_str())), + (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())), + ]; + + if !predicates.eval(&row) { + return; + } + + if peer_type == "FRONTEND" { + // Always set peer_id to be -1 for frontends + self.peer_ids.push(Some(-1)); + } else { + self.peer_ids.push(Some(node_info.peer.id as i64)); + } + + self.peer_types.push(Some(peer_type)); + self.peer_addrs.push(Some(&node_info.peer.addr)); + self.versions.push(Some(&node_info.version)); + self.git_commits.push(Some(&node_info.git_commit)); + if node_info.start_time_ms > 0 { + self.start_times + .push(Some(TimestampMillisecond(Timestamp::new_millisecond( + node_info.start_time_ms as i64, + )))); + self.uptimes.push(Some( + Self::format_duration_since(node_info.start_time_ms).as_str(), + )); + } else { + self.start_times.push(None); + self.uptimes.push(None); + } + + if node_info.last_activity_ts > 0 { + self.active_times.push(Some( + Self::format_duration_since(node_info.last_activity_ts as u64).as_str(), + )); + } else { + self.active_times.push(None); + } + } + + fn format_duration_since(ts: u64) -> String { + let now = common_time::util::current_time_millis() as u64; + let duration_since = now - ts; + humantime::format_duration(Duration::from_millis(duration_since)).to_string() + } + + fn finish(&mut self) -> Result { + let columns: Vec = vec![ + Arc::new(self.peer_ids.finish()), + Arc::new(self.peer_types.finish()), + Arc::new(self.peer_addrs.finish()), + Arc::new(self.versions.finish()), + Arc::new(self.git_commits.finish()), + Arc::new(self.start_times.finish()), + Arc::new(self.uptimes.finish()), + Arc::new(self.active_times.finish()), + ]; + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) + } +} + +impl DfPartitionStream for InformationSchemaClusterInfo { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_cluster_info(None) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } +} diff --git a/src/catalog/src/information_schema/region_peers.rs b/src/catalog/src/information_schema/region_peers.rs index 9a436ab7f7b7..004941a17cb5 100644 --- a/src/catalog/src/information_schema/region_peers.rs +++ b/src/catalog/src/information_schema/region_peers.rs @@ -55,7 +55,7 @@ const INIT_CAPACITY: usize = 42; /// /// - `region_id`: the region id /// - `peer_id`: the region storage datanode peer id -/// - `peer_addr`: the region storage datanode peer address +/// - `peer_addr`: the region storage datanode gRPC peer address /// - `is_leader`: whether the peer is the leader /// - `status`: the region status, `ALIVE` or `DOWNGRADED`. /// - `down_seconds`: the duration of being offline, in seconds. diff --git a/src/catalog/src/information_schema/runtime_metrics.rs b/src/catalog/src/information_schema/runtime_metrics.rs index 52233db39111..98aef10b5099 100644 --- a/src/catalog/src/information_schema/runtime_metrics.rs +++ b/src/catalog/src/information_schema/runtime_metrics.rs @@ -28,8 +28,8 @@ use datatypes::prelude::{ConcreteDataType, MutableVector}; use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{ - ConstantVector, Float64VectorBuilder, StringVector, StringVectorBuilder, - TimestampMillisecondVector, VectorRef, + ConstantVector, Float64VectorBuilder, StringVectorBuilder, TimestampMillisecondVector, + VectorRef, }; use itertools::Itertools; use snafu::ResultExt; @@ -45,8 +45,8 @@ pub(super) struct InformationSchemaMetrics { const METRIC_NAME: &str = "metric_name"; const METRIC_VALUE: &str = "value"; const METRIC_LABELS: &str = "labels"; -const NODE: &str = "node"; -const NODE_TYPE: &str = "node_type"; +const PEER_ADDR: &str = "peer_addr"; +const PEER_TYPE: &str = "peer_type"; const TIMESTAMP: &str = "timestamp"; /// The `information_schema.runtime_metrics` virtual table. @@ -63,8 +63,8 @@ impl InformationSchemaMetrics { ColumnSchema::new(METRIC_NAME, ConcreteDataType::string_datatype(), false), ColumnSchema::new(METRIC_VALUE, ConcreteDataType::float64_datatype(), false), ColumnSchema::new(METRIC_LABELS, ConcreteDataType::string_datatype(), true), - ColumnSchema::new(NODE, ConcreteDataType::string_datatype(), false), - ColumnSchema::new(NODE_TYPE, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false), ColumnSchema::new( TIMESTAMP, ConcreteDataType::timestamp_millisecond_datatype(), @@ -104,6 +104,7 @@ impl InformationTable for InformationSchemaMetrics { .map_err(Into::into) }), )); + Ok(Box::pin( RecordBatchStreamAdapter::try_new(stream) .map_err(BoxedError::new) @@ -118,6 +119,8 @@ struct InformationSchemaMetricsBuilder { metric_names: StringVectorBuilder, metric_values: Float64VectorBuilder, metric_labels: StringVectorBuilder, + peer_addrs: StringVectorBuilder, + peer_types: StringVectorBuilder, } impl InformationSchemaMetricsBuilder { @@ -127,13 +130,24 @@ impl InformationSchemaMetricsBuilder { metric_names: StringVectorBuilder::with_capacity(42), metric_values: Float64VectorBuilder::with_capacity(42), metric_labels: StringVectorBuilder::with_capacity(42), + peer_addrs: StringVectorBuilder::with_capacity(42), + peer_types: StringVectorBuilder::with_capacity(42), } } - fn add_metric(&mut self, metric_name: &str, labels: String, metric_value: f64) { + fn add_metric( + &mut self, + metric_name: &str, + labels: String, + metric_value: f64, + peer: Option<&str>, + peer_type: &str, + ) { self.metric_names.push(Some(metric_name)); self.metric_values.push(Some(metric_value)); self.metric_labels.push(Some(&labels)); + self.peer_addrs.push(peer); + self.peer_types.push(Some(peer_type)); } async fn make_metrics(&mut self, _request: Option) -> Result { @@ -170,18 +184,19 @@ impl InformationSchemaMetricsBuilder { .join(", "), // Safety: always has a sample ts.samples[0].value, + // The peer column is always `None` for standalone + None, + "STANDALONE", ); } + // FIXME(dennis): fetching other peers metrics self.finish() } fn finish(&mut self) -> Result { let rows_num = self.metric_names.len(); - let unknowns = Arc::new(ConstantVector::new( - Arc::new(StringVector::from(vec!["unknown"])), - rows_num, - )); + let timestamps = Arc::new(ConstantVector::new( Arc::new(TimestampMillisecondVector::from_slice([ current_time_millis(), @@ -193,9 +208,8 @@ impl InformationSchemaMetricsBuilder { Arc::new(self.metric_names.finish()), Arc::new(self.metric_values.finish()), Arc::new(self.metric_labels.finish()), - // TODO(dennis): supports node and node_type for cluster - unknowns.clone(), - unknowns, + Arc::new(self.peer_addrs.finish()), + Arc::new(self.peer_types.finish()), timestamps, ]; @@ -243,8 +257,8 @@ mod tests { assert!(result_literal.contains(METRIC_NAME)); assert!(result_literal.contains(METRIC_VALUE)); assert!(result_literal.contains(METRIC_LABELS)); - assert!(result_literal.contains(NODE)); - assert!(result_literal.contains(NODE_TYPE)); + assert!(result_literal.contains(PEER_ADDR)); + assert!(result_literal.contains(PEER_TYPE)); assert!(result_literal.contains(TIMESTAMP)); } } diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/information_schema/table_names.rs index bed3aae60088..eb856e031834 100644 --- a/src/catalog/src/information_schema/table_names.rs +++ b/src/catalog/src/information_schema/table_names.rs @@ -40,5 +40,6 @@ pub const GLOBAL_STATUS: &str = "global_status"; pub const SESSION_STATUS: &str = "session_status"; pub const RUNTIME_METRICS: &str = "runtime_metrics"; pub const PARTITIONS: &str = "partitions"; -pub const REGION_PEERS: &str = "greptime_region_peers"; +pub const REGION_PEERS: &str = "region_peers"; pub const TABLE_CONSTRAINTS: &str = "table_constraints"; +pub const CLUSTER_INFO: &str = "cluster_info"; diff --git a/src/catalog/src/information_schema/utils.rs b/src/catalog/src/information_schema/utils.rs new file mode 100644 index 000000000000..e476e1fc9147 --- /dev/null +++ b/src/catalog/src/information_schema/utils.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use common_config::Mode; +use meta_client::client::MetaClient; +use snafu::OptionExt; + +use crate::error::{Result, UpgradeWeakCatalogManagerRefSnafu}; +use crate::kvbackend::KvBackendCatalogManager; +use crate::CatalogManager; + +/// Try to get the server running mode from `[CatalogManager]` weak reference. +pub fn running_mode(catalog_manager: &Weak) -> Result> { + let catalog_manager = catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + Ok(catalog_manager + .as_any() + .downcast_ref::() + .map(|manager| manager.running_mode()) + .copied()) +} + +/// Try to get the `[MetaClient]` from `[CatalogManager]` weak reference. +pub fn meta_client(catalog_manager: &Weak) -> Result>> { + let catalog_manager = catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + let meta_client = match catalog_manager + .as_any() + .downcast_ref::() + { + None => None, + Some(manager) => manager.meta_client(), + }; + + Ok(meta_client) +} diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index f08362fe70fe..305bc915102b 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -22,6 +22,7 @@ use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, }; use common_catalog::format_full_table_name; +use common_config::Mode; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator}; use common_meta::instruction::CacheIdent; @@ -33,6 +34,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; +use meta_client::client::MetaClient; use moka::future::{Cache as AsyncCache, CacheBuilder}; use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; @@ -56,6 +58,8 @@ use crate::CatalogManager; /// comes from `SystemCatalog`, which is static and read-only. #[derive(Clone)] pub struct KvBackendCatalogManager { + mode: Mode, + meta_client: Option>, partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, /// A sub-CatalogManager that handles system tables @@ -101,6 +105,8 @@ const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60); impl KvBackendCatalogManager { pub async fn new( + mode: Mode, + meta_client: Option>, backend: KvBackendRef, multi_cache_invalidator: Arc, ) -> Arc { @@ -113,6 +119,8 @@ impl KvBackendCatalogManager { .await; Arc::new_cyclic(|me| Self { + mode, + meta_client, partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), system_catalog: SystemCatalog { @@ -127,6 +135,16 @@ impl KvBackendCatalogManager { }) } + /// Returns the server running mode. + pub fn running_mode(&self) -> &Mode { + &self.mode + } + + /// Returns the `[MetaClient]`. + pub fn meta_client(&self) -> Option> { + self.meta_client.clone() + } + pub fn partition_manager(&self) -> PartitionRuleManagerRef { self.partition_manager.clone() } diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 219d52bbb98b..edd174699cc7 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -21,6 +21,7 @@ use catalog::kvbackend::{ }; use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; +use common_config::Mode; use common_error::ext::ErrorExt; use common_meta::cache_invalidator::MultiCacheInvalidator; use common_query::Output; @@ -256,8 +257,13 @@ async fn create_query_engine(meta_addr: &str) -> Result { let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ cached_meta_backend.clone(), ])); - let catalog_list = - KvBackendCatalogManager::new(cached_meta_backend.clone(), multi_cache_invalidator).await; + let catalog_list = KvBackendCatalogManager::new( + Mode::Distributed, + Some(meta_client.clone()), + cached_meta_backend.clone(), + multi_cache_invalidator, + ) + .await; let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( catalog_list, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 0ff35846256f..e6e13004af29 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -253,6 +253,8 @@ impl StartCommand { cached_meta_backend.clone(), ])); let catalog_manager = KvBackendCatalogManager::new( + opts.mode, + Some(meta_client.clone()), cached_meta_backend.clone(), multi_cache_invalidator.clone(), ) @@ -266,6 +268,7 @@ impl StartCommand { ]); let heartbeat_task = HeartbeatTask::new( + &opts, meta_client.clone(), opts.heartbeat.clone(), Arc::new(executor), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 212adf562698..93969c1c2d8b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -404,8 +404,13 @@ impl StartCommand { .context(StartFrontendSnafu)?; let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); - let catalog_manager = - KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; + let catalog_manager = KvBackendCatalogManager::new( + dn_opts.mode, + None, + kv_backend.clone(), + multi_cache_invalidator.clone(), + ) + .await; let builder = DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 30024b03fa41..c039d4dbf62b 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -91,6 +91,8 @@ pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28; pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29; /// id for information_schema.columns pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30; +/// id for information_schema.cluster_info +pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31; /// ----- End of information_schema tables ----- pub const MITO_ENGINE: &str = "mito"; diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index 59a68ba768bc..672edaee5191 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -21,6 +21,16 @@ pub fn metadata_store_dir(store_dir: &str) -> String { format!("{store_dir}/metadata") } +/// The Server running mode +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Copy)] +#[serde(rename_all = "lowercase")] +pub enum Mode { + // The single process mode. + Standalone, + // The distributed cluster mode. + Distributed, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct KvBackendConfig { diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index ba3aecea2678..c45afcc195c1 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -86,6 +86,12 @@ pub struct NodeInfo { pub last_activity_ts: i64, /// The status of the node. Different roles have different node status. pub status: NodeStatus, + // The node build version + pub version: String, + // The node build git commit hash + pub git_commit: String, + // The node star timestamp + pub start_time_ms: u64, } #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] @@ -100,6 +106,19 @@ pub enum NodeStatus { Datanode(DatanodeStatus), Frontend(FrontendStatus), Metasrv(MetasrvStatus), + Standalone, +} + +impl NodeStatus { + // Get the role name of the node status + pub fn role_name(&self) -> &str { + match self { + NodeStatus::Datanode(_) => "DATANODE", + NodeStatus::Frontend(_) => "FRONTEND", + NodeStatus::Metasrv(_) => "METASRV", + NodeStatus::Standalone => "STANDALONE", + } + } } /// The status of a datanode. @@ -271,6 +290,9 @@ mod tests { leader_regions: 3, follower_regions: 4, }), + version: "".to_string(), + git_commit: "".to_string(), + start_time_ms: 1, }; let node_info_bytes: Vec = node_info.try_into().unwrap(); @@ -287,6 +309,8 @@ mod tests { leader_regions: 3, follower_regions: 4, }), + start_time_ms: 1, + .. } ); } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 4df93d393cf2..6a34918e24b9 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -30,6 +30,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true +common-version.workspace = true common-wal.workspace = true dashmap.workspace = true datafusion.workspace = true diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 28253f00e59c..01ebacec6508 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, RegionRole, RegionStat, Role}; +use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat, Role}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -43,6 +43,7 @@ use crate::region_server::RegionServer; pub(crate) mod handler; pub(crate) mod task_tracker; +/// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. pub struct HeartbeatTask { node_id: u64, node_epoch: u64, @@ -246,6 +247,7 @@ impl HeartbeatTask { } } _ = &mut sleep => { + let build_info = common_version::build_info(); let region_stats = Self::load_region_stats(®ion_server_clone).await; let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; @@ -254,6 +256,12 @@ impl HeartbeatTask { region_stats, duration_since_epoch, node_epoch, + info: Some(NodeInfo { + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + // The start timestamp is the same as node_epoch currently. + start_time_ms: node_epoch, + }), ..Default::default() }; sleep.as_mut().reset(now + Duration::from_millis(interval)); diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 0d4e666f8c77..512531565d78 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -33,6 +33,8 @@ common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +common-time.workspace = true +common-version.workspace = true datanode.workspace = true humantime-serde.workspace = true lazy_static.workspace = true diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index ffe52eece82b..d2c343150132 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer}; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; @@ -30,28 +30,35 @@ use tokio::time::{Duration, Instant}; use crate::error; use crate::error::Result; +use crate::frontend::FrontendOptions; pub mod handler; +/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. #[derive(Clone)] pub struct HeartbeatTask { + server_addr: String, meta_client: Arc, report_interval: u64, retry_interval: u64, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + start_time_ms: u64, } impl HeartbeatTask { pub fn new( + opts: &FrontendOptions, meta_client: Arc, heartbeat_opts: HeartbeatOptions, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Self { HeartbeatTask { + server_addr: opts.grpc.addr.clone(), meta_client, report_interval: heartbeat_opts.interval.as_millis() as u64, retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, resp_handler_executor, + start_time_ms: common_time::util::current_time_millis() as u64, } } @@ -102,12 +109,28 @@ impl HeartbeatTask { }); } + fn build_node_info(start_time_ms: u64) -> NodeInfo { + let build_info = common_version::build_info(); + + NodeInfo { + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + start_time_ms, + } + } + fn start_heartbeat_report( &self, req_sender: HeartbeatSender, mut outgoing_rx: Receiver, ) { let report_interval = self.report_interval; + let start_time_ms = self.start_time_ms; + let self_peer = Some(Peer { + // The peer id doesn't make sense for frontend, so we just set it 0. + id: 0, + addr: self.server_addr.clone(), + }); common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); @@ -121,6 +144,8 @@ impl HeartbeatTask { Ok(message) => { let req = HeartbeatRequest { mailbox_message: Some(message), + peer: self_peer.clone(), + info: Some(Self::build_node_info(start_time_ms)), ..Default::default() }; Some(req) @@ -138,6 +163,8 @@ impl HeartbeatTask { _ = &mut sleep => { sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); let req = HeartbeatRequest { + peer: self_peer.clone(), + info: Some(Self::build_node_info(start_time_ms)), ..Default::default() }; Some(req) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index cb8341b3a88a..685c12ee8a95 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -264,6 +264,12 @@ impl ClusterInfo for MetaClient { let mut nodes = if get_metasrv_nodes { let last_activity_ts = -1; // Metasrv does not provide this information. + + // TODO(dennis): Get Metasrv node info + let git_commit = "unknown"; + let version = "unknown"; + let start_time_ms = 0; + let (leader, followers) = cluster_client.get_metasrv_peers().await?; followers .into_iter() @@ -271,11 +277,17 @@ impl ClusterInfo for MetaClient { peer, last_activity_ts, status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }), + version: version.to_string(), + git_commit: git_commit.to_string(), + start_time_ms, }) .chain(leader.into_iter().map(|leader| NodeInfo { peer: leader, last_activity_ts, status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }), + version: version.to_string(), + git_commit: git_commit.to_string(), + start_time_ms, })) .collect::>() } else { diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index 9ece92177aae..56b4d44ce780 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -146,6 +146,7 @@ impl Inner { { let ask_leader = self.ask_leader()?; let mut times = 0; + let mut last_error = None; while times < self.max_retry { if let Some(leader) = &ask_leader.get_leader() { @@ -153,6 +154,7 @@ impl Inner { match body_fn(client).await { Ok(res) => { if util::is_not_leader(get_header(&res)) { + last_error = Some(format!("{leader} is not a leader")); warn!("Failed to {task} to {leader}, not a leader"); let leader = ask_leader.ask_leader().await?; info!("Cluster client updated to new leader addr: {leader}"); @@ -164,6 +166,7 @@ impl Inner { Err(status) => { // The leader may be unreachable. if util::is_unreachable(&status) { + last_error = Some(status.to_string()); warn!("Failed to {task} to {leader}, source: {status}"); let leader = ask_leader.ask_leader().await?; info!("Cluster client updated to new leader addr: {leader}"); @@ -180,7 +183,7 @@ impl Inner { } RetryTimesExceededSnafu { - msg: "Failed to {task}", + msg: format!("Failed to {task}, last error: {:?}", last_error), times: self.max_retry, } .fail() diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index 20cd5385a872..32049dbabdf0 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -162,6 +162,7 @@ impl Inner { { let ask_leader = self.ask_leader()?; let mut times = 0; + let mut last_error = None; while times < self.max_retry { if let Some(leader) = &ask_leader.get_leader() { @@ -169,6 +170,7 @@ impl Inner { match body_fn(client).await { Ok(res) => { if util::is_not_leader(get_header(&res)) { + last_error = Some(format!("{leader} is not a leader")); warn!("Failed to {task} to {leader}, not a leader"); let leader = ask_leader.ask_leader().await?; info!("DDL client updated to new leader addr: {leader}"); @@ -180,6 +182,7 @@ impl Inner { Err(status) => { // The leader may be unreachable. if util::is_unreachable(&status) { + last_error = Some(status.to_string()); warn!("Failed to {task} to {leader}, source: {status}"); let leader = ask_leader.ask_leader().await?; info!("Procedure client updated to new leader addr: {leader}"); @@ -196,7 +199,7 @@ impl Inner { } error::RetryTimesExceededSnafu { - msg: "Failed to {task}", + msg: format!("Failed to {task}, last error: {:?}", last_error), times: self.max_retry, } .fail() diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index 48edc4504075..6fc3937e4e45 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{HeartbeatRequest, Role}; +use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role}; use common_meta::cluster; use common_meta::cluster::{DatanodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus}; use common_meta::peer::Peer; @@ -40,7 +40,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result { - let Some((key, peer)) = extract_base_info(req, Role::Frontend) else { + let Some((key, peer, info)) = extract_base_info(req, Role::Frontend) else { return Ok(HandleControl::Continue); }; @@ -48,6 +48,9 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { peer, last_activity_ts: common_time::util::current_time_millis(), status: NodeStatus::Frontend(FrontendStatus {}), + version: info.version, + git_commit: info.git_commit, + start_time_ms: info.start_time_ms, }; save_to_mem_store(key, value, ctx).await?; @@ -71,7 +74,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result { - let Some((key, peer)) = extract_base_info(req, Role::Datanode) else { + let Some((key, peer, info)) = extract_base_info(req, Role::Datanode) else { return Ok(HandleControl::Continue); }; @@ -95,6 +98,9 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { leader_regions, follower_regions, }), + version: info.version, + git_commit: info.git_commit, + start_time_ms: info.start_time_ms, }; save_to_mem_store(key, value, ctx).await?; @@ -103,14 +109,22 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { } } -fn extract_base_info(req: &HeartbeatRequest, role: Role) -> Option<(NodeInfoKey, Peer)> { - let HeartbeatRequest { header, peer, .. } = req; +fn extract_base_info( + req: &HeartbeatRequest, + role: Role, +) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> { + let HeartbeatRequest { + header, peer, info, .. + } = req; let Some(header) = &header else { return None; }; let Some(peer) = &peer else { return None; }; + let Some(info) = &info else { + return None; + }; Some(( NodeInfoKey { @@ -122,6 +136,7 @@ fn extract_base_info(req: &HeartbeatRequest, role: Role) -> Option<(NodeInfoKey, node_id: peer.id, }, Peer::from(peer.clone()), + info.clone(), )) } diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index 69056ffe1bb2..cf1d25b73410 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -128,6 +128,7 @@ impl cluster_server::Cluster for Metasrv { impl Metasrv { pub fn is_leader(&self) -> bool { - self.election().map(|x| x.is_leader()).unwrap_or(false) + // Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader. + self.election().map(|x| x.is_leader()).unwrap_or(true) } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 0749dfca1059..27920758e719 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -30,6 +30,7 @@ catalog.workspace = true chrono.workspace = true common-base.workspace = true common-catalog.workspace = true +common-config.workspace = true common-error.workspace = true common-grpc.workspace = true common-macro.workspace = true diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index f738c1f4373a..ad3f566b2f04 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -19,7 +19,6 @@ use datatypes::schema::Schema; use query::plan::LogicalPlan; -use serde::{Deserialize, Serialize}; pub mod configurator; pub mod error; @@ -46,12 +45,7 @@ pub mod server; mod shutdown; pub mod tls; -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -pub enum Mode { - Standalone, - Distributed, -} +pub use common_config::Mode; /// Cached SQL and logical plan for database interfaces #[derive(Clone)] diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index f4c3756ead4d..cf8016a435c9 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -39,6 +39,7 @@ use common_test_util::temp_dir::create_temp_dir; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::config::{DatanodeOptions, ObjectStoreConfig}; use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig}; +use frontend::frontend::FrontendOptions; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; @@ -357,6 +358,8 @@ impl GreptimeDbClusterBuilder { cached_meta_backend.clone(), ])); let catalog_manager = KvBackendCatalogManager::new( + Mode::Distributed, + Some(meta_client.clone()), cached_meta_backend.clone(), multi_cache_invalidator.clone(), ) @@ -370,6 +373,7 @@ impl GreptimeDbClusterBuilder { ]); let heartbeat_task = HeartbeatTask::new( + &FrontendOptions::default(), meta_client.clone(), HeartbeatOptions::default(), Arc::new(handlers_executor), diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 2748291957bc..6b15b4fad999 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -131,8 +131,13 @@ impl GreptimeDbStandaloneBuilder { table_metadata_manager.init().await.unwrap(); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); - let catalog_manager = - KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; + let catalog_manager = KvBackendCatalogManager::new( + Mode::Standalone, + None, + kv_backend.clone(), + multi_cache_invalidator.clone(), + ) + .await; let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 1ceb249978a2..adceb0243c26 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -854,17 +854,21 @@ async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionD let OutputData::Stream(stream) = run_sql( &cluster.frontend, - &format!(r#"select b.peer_id as datanode_id, + &format!( + r#"select b.peer_id as datanode_id, a.greptime_partition_id as region_id - from information_schema.partitions a left join information_schema.greptime_region_peers b + from information_schema.partitions a left join information_schema.region_peers b on a.greptime_partition_id = b.region_id where a.table_name='{TEST_TABLE_NAME}' order by datanode_id asc"# ), query_ctx.clone(), ) - .await.unwrap().data else { - unreachable!(); - }; + .await + .unwrap() + .data + else { + unreachable!(); + }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result new file mode 100644 index 000000000000..3d4158aaffef --- /dev/null +++ b/tests/cases/distributed/information_schema/cluster_info.result @@ -0,0 +1,78 @@ +USE INFORMATION_SCHEMA; + +Affected Rows: 0 + +DESC TABLE CLUSTER_INFO; + ++-------------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++-------------+----------------------+-----+------+---------+---------------+ +| peer_id | Int64 | | NO | | FIELD | +| peer_type | String | | NO | | FIELD | +| peer_addr | String | | YES | | FIELD | +| version | String | | NO | | FIELD | +| git_commit | String | | NO | | FIELD | +| start_time | TimestampMillisecond | | YES | | FIELD | +| uptime | String | | YES | | FIELD | +| active_time | String | | YES | | FIELD | ++-------------+----------------------+-----+------+---------+---------------+ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO ORDER BY peer_type; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration|+++++++++ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration|+++++++++ + +USE PUBLIC; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/information_schema/cluster_info.sql b/tests/cases/distributed/information_schema/cluster_info.sql new file mode 100644 index 000000000000..2ac906d05dbc --- /dev/null +++ b/tests/cases/distributed/information_schema/cluster_info.sql @@ -0,0 +1,50 @@ +USE INFORMATION_SCHEMA; + +DESC TABLE CLUSTER_INFO; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO ORDER BY peer_type; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE unknown UNKNOWN +-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type; + +USE PUBLIC; diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result index 3d1c1359d81f..9c76a87df100 100644 --- a/tests/cases/standalone/common/partition.result +++ b/tests/cases/standalone/common/partition.result @@ -24,7 +24,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres -- SQLNESS REPLACE (\d{13}) REGION_ID -- SQLNESS REPLACE (\d{1}) PEER_ID -SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; +SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id; +---------------+---------+-----------+--------+ | region_id | peer_id | is_leader | status | @@ -128,7 +128,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres -- SQLNESS REPLACE (\d{13}) REGION_ID -- SQLNESS REPLACE (\d{1}) PEER_ID -SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; +SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id; +---------------+---------+-----------+--------+ | region_id | peer_id | is_leader | status | @@ -148,7 +148,6 @@ INSERT INTO my_table VALUES Affected Rows: 8 - SELECT * FROM my_table; +------+---+-------------------------+ diff --git a/tests/cases/standalone/common/partition.sql b/tests/cases/standalone/common/partition.sql index cc0c962743d7..3f75b293c332 100644 --- a/tests/cases/standalone/common/partition.sql +++ b/tests/cases/standalone/common/partition.sql @@ -14,7 +14,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres -- SQLNESS REPLACE (\d{13}) REGION_ID -- SQLNESS REPLACE (\d{1}) PEER_ID -SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; +SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id; INSERT INTO my_table VALUES (100, 'a', 1), @@ -54,7 +54,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres -- SQLNESS REPLACE (\d{13}) REGION_ID -- SQLNESS REPLACE (\d{1}) PEER_ID -SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; +SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id; INSERT INTO my_table VALUES (100, 'a', 1), @@ -65,7 +65,7 @@ INSERT INTO my_table VALUES (2100, 'f', 6), (2200, 'g', 7), (2400, 'h', 8); - + SELECT * FROM my_table; DROP TABLE my_table; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index 61cd6c844317..e73d5b749def 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -20,6 +20,7 @@ show tables; | build_info | | character_sets | | check_constraints | +| cluster_info | | collation_character_set_applicability | | collations | | column_privileges | @@ -29,13 +30,13 @@ show tables; | events | | files | | global_status | -| greptime_region_peers | | key_column_usage | | optimizer_trace | | parameters | | partitions | | profiling | | referential_constraints | +| region_peers | | routines | | runtime_metrics | | schema_privileges | diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 8d0a8ca2d3df..cbe6e2d39f18 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -15,6 +15,7 @@ order by table_schema, table_name; | greptime | information_schema | build_info | LOCAL TEMPORARY | 8 | | | greptime | information_schema | character_sets | LOCAL TEMPORARY | 9 | | | greptime | information_schema | check_constraints | LOCAL TEMPORARY | 12 | | +| greptime | information_schema | cluster_info | LOCAL TEMPORARY | 31 | | | greptime | information_schema | collation_character_set_applicability | LOCAL TEMPORARY | 11 | | | greptime | information_schema | collations | LOCAL TEMPORARY | 10 | | | greptime | information_schema | column_privileges | LOCAL TEMPORARY | 6 | | @@ -24,13 +25,13 @@ order by table_schema, table_name; | greptime | information_schema | events | LOCAL TEMPORARY | 13 | | | greptime | information_schema | files | LOCAL TEMPORARY | 14 | | | greptime | information_schema | global_status | LOCAL TEMPORARY | 25 | | -| greptime | information_schema | greptime_region_peers | LOCAL TEMPORARY | 29 | | | greptime | information_schema | key_column_usage | LOCAL TEMPORARY | 16 | | | greptime | information_schema | optimizer_trace | LOCAL TEMPORARY | 17 | | | greptime | information_schema | parameters | LOCAL TEMPORARY | 18 | | | greptime | information_schema | partitions | LOCAL TEMPORARY | 28 | | | greptime | information_schema | profiling | LOCAL TEMPORARY | 19 | | | greptime | information_schema | referential_constraints | LOCAL TEMPORARY | 20 | | +| greptime | information_schema | region_peers | LOCAL TEMPORARY | 29 | | | greptime | information_schema | routines | LOCAL TEMPORARY | 21 | | | greptime | information_schema | runtime_metrics | LOCAL TEMPORARY | 27 | | | greptime | information_schema | schema_privileges | LOCAL TEMPORARY | 22 | | @@ -61,6 +62,14 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | check_constraints | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | check_constraints | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | check_constraints | constraint_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | cluster_info | active_time | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | cluster_info | git_commit | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | cluster_info | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | cluster_info | peer_id | 1 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | +| greptime | information_schema | cluster_info | peer_type | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | cluster_info | start_time | 6 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | | +| greptime | information_schema | cluster_info | uptime | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | cluster_info | version | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | collation_character_set_applicability | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | collation_character_set_applicability | collation_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | collations | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | @@ -174,12 +183,6 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | greptime_region_peers | down_seconds | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | | -| greptime | information_schema | greptime_region_peers | is_leader | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | greptime_region_peers | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | greptime_region_peers | peer_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | information_schema | greptime_region_peers | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | -| greptime | information_schema | greptime_region_peers | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | key_column_usage | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | key_column_usage | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | @@ -268,6 +271,12 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | referential_constraints | unique_constraint_name | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | referential_constraints | unique_constraint_schema | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | referential_constraints | update_rule | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | region_peers | down_seconds | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | | +| greptime | information_schema | region_peers | is_leader | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_peers | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_peers | peer_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_peers | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | region_peers | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | routines | character_maximum_length | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | routines | character_octet_length | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | routines | character_set_client | 29 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | @@ -301,8 +310,8 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | routines | sql_path | 22 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | runtime_metrics | labels | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | runtime_metrics | metric_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | runtime_metrics | node | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | runtime_metrics | node_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | runtime_metrics | peer_addr | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | runtime_metrics | peer_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | runtime_metrics | timestamp | 6 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | No | timestamp(3) | | | | greptime | information_schema | runtime_metrics | value | 2 | | | 22 | | | | | | | select,insert | | Float64 | double | FIELD | | No | double | | | | greptime | information_schema | schema_privileges | grantee | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | @@ -678,12 +687,12 @@ DESC TABLE RUNTIME_METRICS; | metric_name | String | | NO | | FIELD | | value | Float64 | | NO | | FIELD | | labels | String | | YES | | FIELD | -| node | String | | NO | | FIELD | -| node_type | String | | NO | | FIELD | +| peer_addr | String | | YES | | FIELD | +| peer_type | String | | NO | | FIELD | | timestamp | TimestampMillisecond | | NO | | FIELD | +-------------+----------------------+-----+------+---------+---------------+ -DESC TABLE GREPTIME_REGION_PEERS; +DESC TABLE REGION_PEERS; +--------------+--------+-----+------+---------+---------------+ | Column | Type | Key | Null | Default | Semantic Type | diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index d54c2c0ebd51..89db8514e5da 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -117,7 +117,7 @@ SELECT * FROM CHECK_CONSTRAINTS; DESC TABLE RUNTIME_METRICS; -DESC TABLE GREPTIME_REGION_PEERS; +DESC TABLE REGION_PEERS; USE INFORMATION_SCHEMA; diff --git a/tests/cases/standalone/information_schema/cluster_info.result b/tests/cases/standalone/information_schema/cluster_info.result new file mode 100644 index 000000000000..853946c7fa00 --- /dev/null +++ b/tests/cases/standalone/information_schema/cluster_info.result @@ -0,0 +1,63 @@ +USE INFORMATION_SCHEMA; + +Affected Rows: 0 + +DESC TABLE CLUSTER_INFO; + ++-------------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++-------------+----------------------+-----+------+---------+---------------+ +| peer_id | Int64 | | NO | | FIELD | +| peer_type | String | | NO | | FIELD | +| peer_addr | String | | YES | | FIELD | +| version | String | | NO | | FIELD | +| git_commit | String | | NO | | FIELD | +| start_time | TimestampMillisecond | | YES | | FIELD | +| uptime | String | | YES | | FIELD | +| active_time | String | | YES | | FIELD | ++-------------+----------------------+-----+------+---------+---------------+ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE (\d\.\d\.\d) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|STANDALONE||Version|Hash|Start_time|Duration||+++++++++ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE (\d\.\d\.\d) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'STANDALONE'; + ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|STANDALONE||Version|Hash|Start_time|Duration||+++++++++ + +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE'; + +++ +++ + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE (\d\.\d\.\d) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_ID = 0; + +++++ + +SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 0; + +++ +++ + +USE PUBLIC; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/information_schema/cluster_info.sql b/tests/cases/standalone/information_schema/cluster_info.sql new file mode 100644 index 000000000000..58eca548a98a --- /dev/null +++ b/tests/cases/standalone/information_schema/cluster_info.sql @@ -0,0 +1,33 @@ +USE INFORMATION_SCHEMA; + +DESC TABLE CLUSTER_INFO; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE (\d\.\d\.\d) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE (\d\.\d\.\d) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'STANDALONE'; + +SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE'; + +-- SQLNESS REPLACE version node_version +-- SQLNESS REPLACE (\d\.\d\.\d) Version +-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash +-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time +-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE [\s\-]+ +SELECT * FROM CLUSTER_INFO WHERE PEER_ID = 0; + +SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 0; + +USE PUBLIC;