diff --git a/Cargo.lock b/Cargo.lock index b784ba6b9602..b1560a1d4fab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1673,8 +1673,6 @@ dependencies = [ "common-error", "common-telemetry", "datatypes", - "lazy_static", - "regex", "serde", "serde_json", "snafu", @@ -2661,7 +2659,6 @@ dependencies = [ "pin-project", "prost", "query", - "regex", "secrecy", "serde", "serde_json", @@ -4140,7 +4137,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=bec16e50c9322758111f73e42fb5d377c7235e05#bec16e50c9322758111f73e42fb5d377c7235e05" +source = "git+https://github.com/MichaelScofield/greptime-proto.git?rev=71c0002045f6cc8d6e42609bcc5ca6cea963baad#71c0002045f6cc8d6e42609bcc5ca6cea963baad" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index f6035e3887c8..26b38b5f9fd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "bec16e50c9322758111f73e42fb5d377c7235e05" } +greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "71c0002045f6cc8d6e42609bcc5ca6cea963baad" } itertools = "0.10" lazy_static = "1.4" opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 399c1a0d96e5..9f3307e4057d 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -17,28 +17,23 @@ use std::sync::Arc; use async_trait::async_trait; use common_catalog::consts::MITO_ENGINE; -use common_meta::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, - TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, -}; +use common_meta::helper::{CatalogKey, SchemaKey}; use common_meta::ident::TableIdent; +use common_meta::key::datanode_table::DatanodeTableValue; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; -use common_meta::rpc::store::{PutRequest, RangeRequest}; -use common_meta::rpc::KeyValue; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{error, info, warn}; use metrics::increment_gauge; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; -use table::engine::{EngineContext, TableReference}; +use table::engine::EngineContext; use table::requests::OpenTableRequest; use table::TableRef; use tokio::sync::Mutex; use crate::error::{ - InvalidCatalogValueSnafu, OpenTableSnafu, ParallelOpenTableSnafu, Result, - TableEngineNotFoundSnafu, TableExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, - UnimplementedSnafu, + OpenTableSnafu, ParallelOpenTableSnafu, Result, TableEngineNotFoundSnafu, TableExistsSnafu, + TableMetadataManagerSnafu, TableNotFoundSnafu, UnimplementedSnafu, }; use crate::local::MemoryCatalogManager; use crate::remote::region_alive_keeper::RegionAliveKeepers; @@ -77,85 +72,25 @@ impl RemoteCatalogManager { } } - async fn iter_remote_catalogs(&self) -> Result> { - let catalog_range_prefix = build_catalog_prefix(); - let req = RangeRequest::new().with_prefix(catalog_range_prefix.as_bytes()); - - let kvs = self - .backend - .range(req) + async fn initiate_catalogs(&self) -> 
Result<()> { + let tables = self + .table_metadata_manager + .datanode_table_manager() + .tables(self.node_id) .await - .context(TableMetadataManagerSnafu)? - .kvs; + .context(TableMetadataManagerSnafu)?; - let catalogs = kvs + let joins = tables .into_iter() - .filter_map(|kv| { - let catalog_key = String::from_utf8_lossy(kv.key()); - - match CatalogKey::parse(&catalog_key) { - Ok(x) => Some(x), - Err(e) => { - error!(e; "Ignore invalid catalog key {:?}", catalog_key); - None - } - } - }) - .collect(); - Ok(catalogs) - } - - /// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated. - async fn initiate_catalogs(&self) -> Result<()> { - let catalogs = self.iter_remote_catalogs().await?; - let mut joins = Vec::new(); - for CatalogKey { catalog_name } in catalogs { - info!("Fetch catalog from metasrv: {}", catalog_name); - - let node_id = self.node_id; - let backend = self.backend.clone(); - let engine_manager = self.engine_manager.clone(); - - increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); - joins.push(self.initiate_schemas(node_id, backend, engine_manager, catalog_name)); - } - - futures::future::try_join_all(joins).await?; - - Ok(()) - } - - fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey { - SchemaKey { - catalog_name, - schema_name, - } - } - - /// Initiates all tables inside the catalog by fetching data from metasrv. - /// Return maximum table id in the schema. - async fn initiate_tables( - &self, - node_id: u64, - backend: KvBackendRef, - engine_manager: TableEngineManagerRef, - catalog_name: String, - schema_name: String, - ) -> Result<()> { - info!("initializing tables in {}.{}", catalog_name, schema_name); - let kvs = iter_remote_tables(node_id, &backend, &catalog_name, &schema_name).await?; - let joins = kvs - .into_iter() - .map(|(_, table_value)| { - let engine_manager = engine_manager.clone(); + .map(|datanode_table_value| { + let engine_manager = self.engine_manager.clone(); let memory_catalog_manager = self.memory_catalog_manager.clone(); let table_metadata_manager = self.table_metadata_manager.clone(); - let table_id = table_value.table_id(); common_runtime::spawn_bg(async move { + let table_id = datanode_table_value.table_id; if let Err(e) = open_and_register_table( - node_id, engine_manager, - &table_value, + datanode_table_value, memory_catalog_manager, table_metadata_manager, ) @@ -170,261 +105,44 @@ impl RemoteCatalogManager { }) .collect::>(); - futures::future::try_join_all(joins) + let _ = futures::future::try_join_all(joins) .await .context(ParallelOpenTableSnafu)?; Ok(()) } - /// Initiates all schemas inside the catalog by fetching data from metasrv. - /// Return maximum table id in the catalog. 
- async fn initiate_schemas( - &self, - node_id: u64, - backend: KvBackendRef, - engine_manager: TableEngineManagerRef, - catalog_name: String, - ) -> Result<()> { - let schemas = iter_remote_schemas(&backend, &catalog_name).await?; - - let mut joins = Vec::new(); - for SchemaKey { - catalog_name, - schema_name, - } in schemas - { - info!( - "Fetch schema from metasrv: {}.{}", - &catalog_name, &schema_name - ); - increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0); - - let backend = backend.clone(); - let engine_manager = engine_manager.clone(); - - joins.push(self.initiate_tables( - node_id, - backend, - engine_manager, - catalog_name, - schema_name, - )); - } - - futures::future::try_join_all(joins).await?; - Ok(()) - } - - async fn register_table( - &self, - catalog_name: String, - schema_name: String, - table_name: String, - table: TableRef, - ) -> Result> { - let table_info = table.table_info(); - let table_version = table_info.ident.version; - let table_value = TableRegionalValue { - table_id: Some(table.table_info().ident.table_id), - version: table_version, - regions_ids: table.table_info().meta.region_numbers.clone(), - engine_name: Some(table_info.meta.engine.clone()), - }; - let table_key = self - .build_regional_table_key(catalog_name, schema_name, table_name) - .to_string(); - let req = PutRequest::new() - .with_key(table_key.as_bytes()) - .with_value(table_value.as_bytes().context(InvalidCatalogValueSnafu)?); - self.backend - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - debug!( - "Successfully set catalog table entry, key: {}, table value: {:?}", - table_key, table_value - ); - - // TODO(hl): retrieve prev table info using cas - Ok(None) - } - - async fn deregister_table( - &self, - catalog_name: String, - schema_name: String, - table_name: String, - ) -> Result> { - let table_key = self - .build_regional_table_key( - catalog_name.clone(), - schema_name.clone(), - table_name.clone(), - ) - .to_string(); - - let engine_opt = self - .backend - .get(table_key.as_bytes()) - .await - .context(TableMetadataManagerSnafu)? - .map(|KeyValue { key: _, value: v }| { - let TableRegionalValue { - table_id, - engine_name, - .. - } = TableRegionalValue::parse(String::from_utf8_lossy(&v)) - .context(InvalidCatalogValueSnafu)?; - Ok(engine_name.and_then(|name| table_id.map(|id| (name, id)))) - }) - .transpose()? - .flatten(); - - let Some((engine_name, table_id)) = engine_opt else { - warn!("Cannot find table id and engine name for {table_key}"); - return Ok(None); - }; - - self.backend - .delete(table_key.as_bytes(), false) - .await - .context(TableMetadataManagerSnafu)?; - debug!( - "Successfully deleted catalog table entry, key: {}", - table_key - ); - - // deregistering table does not necessarily mean dropping the table - let table = self - .engine_manager - .engine(&engine_name) - .context(TableEngineNotFoundSnafu { engine_name })? 
- .get_table(&EngineContext {}, table_id) - .with_context(|_| { - let reference = TableReference { - catalog: &catalog_name, - schema: &schema_name, - table: &table_name, - }; - OpenTableSnafu { - table_info: reference.to_string(), - } - })?; - Ok(table) - } - - fn build_regional_table_key( - &self, - catalog_name: String, - schema_name: String, - table_name: String, - ) -> TableRegionalKey { - TableRegionalKey { + fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey { + SchemaKey { catalog_name, schema_name, - table_name, - node_id: self.node_id, } } } -async fn iter_remote_schemas<'a>( - backend: &'a KvBackendRef, - catalog_name: &'a str, -) -> Result> { - let schema_prefix = build_schema_prefix(catalog_name); - let req = RangeRequest::new().with_prefix(schema_prefix.as_bytes()); - - let kvs = backend - .range(req) - .await - .context(TableMetadataManagerSnafu)? - .kvs; - - let schemas = kvs - .into_iter() - .filter_map(|kv| { - let schema_key = String::from_utf8_lossy(kv.key()); - match SchemaKey::parse(&schema_key) { - Ok(x) => Some(x), - Err(e) => { - warn!("Ignore invalid schema key {:?}: {e}", schema_key); - None - } - } - }) - .collect(); - Ok(schemas) -} - -/// Iterate over all table entries on metasrv -async fn iter_remote_tables<'a>( - node_id: u64, - backend: &'a KvBackendRef, - catalog_name: &'a str, - schema_name: &'a str, -) -> Result> { - let table_prefix = build_table_global_prefix(catalog_name, schema_name); - let req = RangeRequest::new().with_prefix(table_prefix.as_bytes()); - - let kvs = backend - .range(req) - .await - .context(TableMetadataManagerSnafu)? - .kvs; - - let mut tables = Vec::with_capacity(kvs.len()); - for kv in kvs { - let tgk = &String::from_utf8_lossy(kv.key()); - let Ok(table_key) = TableGlobalKey::parse(tgk) else { - warn!("Ignore invalid table global key {:?}", tgk); - continue; - }; - - let Ok(table_value) = TableGlobalValue::from_bytes(kv.value()) else { - warn!("Ignore invalid table global value {:?}", String::from_utf8_lossy(kv.value())); - continue; - }; - - info!("Found catalog table entry, key: {table_key}, value: {table_value:?}"); - - // metasrv has allocated region ids to current datanode - if table_value - .regions_id_map - .get(&node_id) - .map(|v| !v.is_empty()) - .unwrap_or(false) - { - tables.push((table_key, table_value)) - } - } - Ok(tables) -} - async fn open_and_register_table( - node_id: u64, engine_manager: TableEngineManagerRef, - table_value: &TableGlobalValue, + datanode_table_value: DatanodeTableValue, memory_catalog_manager: Arc, - _table_metadata_manager: TableMetadataManagerRef, + table_metadata_manager: TableMetadataManagerRef, ) -> Result<()> { let context = EngineContext {}; - let table_id = table_value.table_id(); - - let TableGlobalValue { - table_info, - regions_id_map, - .. - } = table_value; + let table_id = datanode_table_value.table_id; + let region_numbers = datanode_table_value.regions; + let table_info_value = table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(TableMetadataManagerSnafu)? 
+ .context(TableNotFoundSnafu { + table_info: format!("table id: {table_id}"), + })?; + let table_info = &table_info_value.table_info; let catalog_name = table_info.catalog_name.clone(); let schema_name = table_info.schema_name.clone(); let table_name = table_info.name.clone(); - // unwrap safety: checked in yielding this table when `iter_remote_tables` - let region_numbers = regions_id_map.get(&node_id).unwrap(); - let request = OpenTableRequest { catalog_name: catalog_name.clone(), schema_name: schema_name.clone(), @@ -511,17 +229,11 @@ impl CatalogManager for RemoteCatalogManager { } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let catalog = request.catalog.clone(); - let schema = request.schema.clone(); - let table_name = request.table_name.clone(); let table = request.table.clone(); let registered = self.memory_catalog_manager.register_table_sync(request)?; if registered { - self.register_table(catalog, schema, table_name, table.clone()) - .await?; - let table_info = table.table_info(); let table_ident = TableIdent { catalog: table_info.catalog_name.clone(), @@ -544,13 +256,6 @@ impl CatalogManager for RemoteCatalogManager { .table(&request.catalog, &request.schema, &request.table_name) .await? else { return Ok(()) }; - self.deregister_table( - request.catalog.clone(), - request.schema.clone(), - request.table_name.clone(), - ) - .await?; - let table_info = table.table_info(); let table_ident = TableIdent { catalog: request.catalog.clone(), diff --git a/src/common/catalog/Cargo.toml b/src/common/catalog/Cargo.toml index 57e4acb68484..9aade9f4e0e7 100644 --- a/src/common/catalog/Cargo.toml +++ b/src/common/catalog/Cargo.toml @@ -9,8 +9,6 @@ async-trait = "0.1" common-error = { path = "../error" } common-telemetry = { path = "../telemetry" } datatypes = { path = "../../datatypes" } -lazy_static = "1.4" -regex = "1.6" serde.workspace = true serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4c0aa34e1f51..6caed7775e64 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -59,8 +59,23 @@ pub enum Error { #[snafu(display("Invalid protobuf message, err: {}", err_msg))] InvalidProtoMsg { err_msg: String, location: Location }, - #[snafu(display("Concurrent modify regions placement: {err_msg}"))] - ConcurrentModifyRegionsPlacement { err_msg: String, location: Location }, + #[snafu(display("Unexpected: {err_msg}"))] + Unexpected { err_msg: String, location: Location }, + + #[snafu(display("Table already exists, table_id: {}", table_id))] + TableAlreadyExists { + table_id: TableId, + location: Location, + }, + + #[snafu(display("Table does not exist, table_name: {}", table_name))] + TableNotExist { + table_name: String, + location: Location, + }, + + #[snafu(display("Failed to rename table, reason: {}", reason))] + RenameTable { reason: String, location: Location }, #[snafu(display("Invalid table metadata, err: {}", err_msg))] InvalidTableMetadata { err_msg: String, location: Location }, @@ -112,12 +127,15 @@ impl ErrorExt for Error { | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } | InvalidTableMetadata { .. } - | MoveRegion { .. } => StatusCode::Unexpected, + | MoveRegion { .. } + | Unexpected { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } - | ConcurrentModifyRegionsPlacement { .. } => StatusCode::Internal, + | TableAlreadyExists { .. } + | TableNotExist { .. } + | RenameTable { .. 
} => StatusCode::Internal, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { StatusCode::Unexpected diff --git a/src/common/meta/src/helper.rs b/src/common/meta/src/helper.rs index 43d2a6390044..acfc55969ab5 100644 --- a/src/common/meta/src/helper.rs +++ b/src/common/meta/src/helper.rs @@ -20,42 +20,24 @@ use common_catalog::error::{ }; use lazy_static::lazy_static; use regex::Regex; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use table::metadata::{RawTableInfo, TableId, TableVersion}; +use table::metadata::{RawTableInfo, TableId}; pub const CATALOG_KEY_PREFIX: &str = "__c"; pub const SCHEMA_KEY_PREFIX: &str = "__s"; -pub const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg"; -pub const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr"; -const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*"; -const TABLE_NAME_PATTERN: &str = "[a-zA-Z_:][a-zA-Z0-9_:]*"; +/// The pattern of a valid catalog, schema or table name. +const NAME_PATTERN: &str = "[a-zA-Z_:][a-zA-Z0-9_:]*"; lazy_static! { - static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!( - "^{CATALOG_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})$" - )) - .unwrap(); + static ref CATALOG_KEY_PATTERN: Regex = + Regex::new(&format!("^{CATALOG_KEY_PREFIX}-({NAME_PATTERN})$")).unwrap(); } lazy_static! { static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!( - "^{SCHEMA_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})$" - )) - .unwrap(); -} - -lazy_static! { - static ref TABLE_GLOBAL_KEY_PATTERN: Regex = Regex::new(&format!( - "^{TABLE_GLOBAL_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-({TABLE_NAME_PATTERN})$" - )) - .unwrap(); -} - -lazy_static! { - static ref TABLE_REGIONAL_KEY_PATTERN: Regex = Regex::new(&format!( - "^{TABLE_REGIONAL_KEY_PREFIX}-({ALPHANUMERICS_NAME_PATTERN})-({ALPHANUMERICS_NAME_PATTERN})-({TABLE_NAME_PATTERN})-([0-9]+)$" + "^{SCHEMA_KEY_PREFIX}-({NAME_PATTERN})-({NAME_PATTERN})$" )) .unwrap(); } @@ -68,75 +50,6 @@ pub fn build_schema_prefix(catalog_name: impl AsRef) -> String { format!("{SCHEMA_KEY_PREFIX}-{}-", catalog_name.as_ref()) } -/// Global table info has only one key across all datanodes so it does not have `node_id` field. -pub fn build_table_global_prefix( - catalog_name: impl AsRef, - schema_name: impl AsRef, -) -> String { - format!( - "{TABLE_GLOBAL_KEY_PREFIX}-{}-{}-", - catalog_name.as_ref(), - schema_name.as_ref() - ) -} - -/// Regional table info varies between datanode, so it contains a `node_id` field. -pub fn build_table_regional_prefix( - catalog_name: impl AsRef, - schema_name: impl AsRef, -) -> String { - format!( - "{}-{}-{}-", - TABLE_REGIONAL_KEY_PREFIX, - catalog_name.as_ref(), - schema_name.as_ref() - ) -} - -/// Table global info has only one key across all datanodes so it does not have `node_id` field. 
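
Illustrative sketch (not part of the patch): the helper now reuses a single NAME_PATTERN — the old TABLE_NAME_PATTERN, which permits ':' — for the remaining catalog and schema key regexes. The snippet below shows how those patterns are assumed to behave; the "greptime"/"public" names are examples only, and it needs just the regex crate.

    use regex::Regex;

    fn main() {
        // Same pattern as in the patch: names may start with a letter, '_' or ':'.
        let name_pattern = "[a-zA-Z_:][a-zA-Z0-9_:]*";

        // "__s-<catalog>-<schema>" mirrors SCHEMA_KEY_PATTERN above.
        let schema_key =
            Regex::new(&format!("^__s-({name_pattern})-({name_pattern})$")).unwrap();
        let caps = schema_key.captures("__s-greptime-public").unwrap();
        assert_eq!("greptime", &caps[1]);
        assert_eq!("public", &caps[2]);

        // The ':' carried over from the old TABLE_NAME_PATTERN still admits
        // Prometheus-style names such as "cpu:metrics"; the removed
        // ALPHANUMERICS_NAME_PATTERN did not.
        let name_only = Regex::new(&format!("^{name_pattern}$")).unwrap();
        assert!(name_only.is_match("cpu:metrics"));
        assert!(!name_only.is_match("1cpu"));
    }
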
-#[derive(Clone, Hash, Eq, PartialEq)] -pub struct TableGlobalKey { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, -} - -impl Display for TableGlobalKey { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(TABLE_GLOBAL_KEY_PREFIX)?; - f.write_str("-")?; - f.write_str(&self.catalog_name)?; - f.write_str("-")?; - f.write_str(&self.schema_name)?; - f.write_str("-")?; - f.write_str(&self.table_name) - } -} - -impl TableGlobalKey { - pub fn parse>(s: S) -> Result { - let key = s.as_ref(); - let captures = TABLE_GLOBAL_KEY_PATTERN - .captures(key) - .context(InvalidCatalogSnafu { key })?; - ensure!(captures.len() == 4, InvalidCatalogSnafu { key }); - - Ok(Self { - catalog_name: captures[1].to_string(), - schema_name: captures[2].to_string(), - table_name: captures[3].to_string(), - }) - } - - pub fn to_raw_key(&self) -> Vec { - self.to_string().into_bytes() - } - - pub fn try_from_raw_key(key: &[u8]) -> Result { - Self::parse(String::from_utf8_lossy(key)) - } -} - /// Table global info contains necessary info for a datanode to create table regions, including /// table id, table meta(schema...), region id allocation across datanodes. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -152,64 +65,6 @@ impl TableGlobalValue { pub fn table_id(&self) -> TableId { self.table_info.ident.table_id } - - pub fn engine(&self) -> &str { - &self.table_info.meta.engine - } -} - -/// Table regional info that varies between datanode, so it contains a `node_id` field. -pub struct TableRegionalKey { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub node_id: u64, -} - -impl Display for TableRegionalKey { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(TABLE_REGIONAL_KEY_PREFIX)?; - f.write_str("-")?; - f.write_str(&self.catalog_name)?; - f.write_str("-")?; - f.write_str(&self.schema_name)?; - f.write_str("-")?; - f.write_str(&self.table_name)?; - f.write_str("-")?; - f.serialize_u64(self.node_id) - } -} - -impl TableRegionalKey { - pub fn parse>(s: S) -> Result { - let key = s.as_ref(); - let captures = TABLE_REGIONAL_KEY_PATTERN - .captures(key) - .context(InvalidCatalogSnafu { key })?; - ensure!(captures.len() == 5, InvalidCatalogSnafu { key }); - let node_id = captures[4] - .to_string() - .parse() - .map_err(|_| InvalidCatalogSnafu { key }.build())?; - Ok(Self { - catalog_name: captures[1].to_string(), - schema_name: captures[2].to_string(), - table_name: captures[3].to_string(), - node_id, - }) - } -} - -/// Regional table info of specific datanode, including table version on that datanode and -/// region ids allocated by metasrv. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct TableRegionalValue { - // We can remove the `Option` from the table id once all regional values - // stored in meta have table ids. - pub table_id: Option, - pub version: TableVersion, - pub regions_ids: Vec, - pub engine_name: Option, } pub struct CatalogKey { @@ -295,19 +150,10 @@ macro_rules! 
define_catalog_value { } } -define_catalog_value!( - TableRegionalValue, - TableGlobalValue, - CatalogValue, - SchemaValue -); +define_catalog_value!(TableGlobalValue, CatalogValue, SchemaValue); #[cfg(test)] mod tests { - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema, Schema}; - use table::metadata::{RawTableMeta, TableIdent, TableType}; - use super::*; #[test] @@ -326,95 +172,4 @@ mod tests { assert_eq!("S", schema_key.schema_name); assert_eq!(key, schema_key.to_string()); } - - #[test] - fn test_parse_table_key() { - let key = "__tg-C-S-T"; - let entry = TableGlobalKey::parse(key).unwrap(); - assert_eq!("C", entry.catalog_name); - assert_eq!("S", entry.schema_name); - assert_eq!("T", entry.table_name); - assert_eq!(key, &entry.to_string()); - } - - #[test] - fn test_build_prefix() { - assert_eq!("__c-", build_catalog_prefix()); - assert_eq!("__s-CATALOG-", build_schema_prefix("CATALOG")); - assert_eq!( - "__tg-CATALOG-SCHEMA-", - build_table_global_prefix("CATALOG", "SCHEMA") - ); - } - - #[test] - fn test_serialize_schema() { - let schema = Schema::new(vec![ColumnSchema::new( - "name", - ConcreteDataType::string_datatype(), - true, - )]); - - let meta = RawTableMeta { - schema: RawSchema::from(&schema), - engine: "mito".to_string(), - created_on: chrono::DateTime::default(), - primary_key_indices: vec![0, 1], - next_column_id: 3, - engine_options: Default::default(), - value_indices: vec![2, 3], - options: Default::default(), - region_numbers: vec![1], - }; - - let table_info = RawTableInfo { - ident: TableIdent { - table_id: 42, - version: 1, - }, - name: "table_1".to_string(), - desc: Some("blah".to_string()), - catalog_name: "catalog_1".to_string(), - schema_name: "schema_1".to_string(), - meta, - table_type: TableType::Base, - }; - - let value = TableGlobalValue { - node_id: 0, - regions_id_map: HashMap::from([(0, vec![1, 2, 3])]), - table_info, - }; - let serialized = serde_json::to_string(&value).unwrap(); - let deserialized = TableGlobalValue::parse(serialized).unwrap(); - assert_eq!(value, deserialized); - } - - #[test] - fn test_table_global_value_compatibility() { - let s = r#"{"node_id":1,"regions_id_map":{"1":[0]},"table_info":{"ident":{"table_id":1098,"version":1},"name":"container_cpu_limit","desc":"Created on 
insertion","catalog_name":"greptime","schema_name":"dd","meta":{"schema":{"column_schemas":[{"name":"container_id","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"container_name","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"docker_image","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"host","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"image_name","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"image_tag","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"interval","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"runtime","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"short_image","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"type","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"dd_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"git.repository_url","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":11,"version":1},"primary_key_indices":[0,1,2,3,4,5,6,7,8,9,12],"value_indices":[10,11],"engine":"mito","next_column_id":12,"region_numbers":[],"engine_options":{},"options":{},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#; - let _ = TableGlobalValue::parse(s).unwrap(); - } - - fn test_valid_table_patterns(table_name: &str) { - assert_eq!( - table_name, - TableGlobalKey::parse(format!("__tg-catalog-schema-{}", table_name)) - .unwrap() - .table_name - ); - - assert_eq!( - table_name, - TableRegionalKey::parse(format!("__tr-catalog-schema-{}-0", table_name)) - .unwrap() - .table_name - ); - } - - #[test] - fn test_table_name_pattern() { - test_valid_table_patterns("cpu:metrics"); - test_valid_table_patterns(":cpu:metrics"); - } } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index a1908dd5ef70..f4d4954d0885 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -18,13 +18,11 @@ use store_api::storage::RegionNumber; use table::metadata::TableId; use super::{DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX}; -use crate::error::{ - ConcurrentModifyRegionsPlacementSnafu, InvalidTableMetadataSnafu, MoveRegionSnafu, Result, -}; +use crate::error::{InvalidTableMetadataSnafu, MoveRegionSnafu, Result, UnexpectedSnafu}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest}; +use crate::rpc::store::{BatchGetRequest, CompareAndPutRequest, MoveValueRequest, RangeRequest}; use crate::DatanodeId; pub 
struct DatanodeTableKey { @@ -99,7 +97,7 @@ impl DatanodeTableManager { Self { kv_backend } } - async fn get(&self, key: &DatanodeTableKey) -> Result> { + pub async fn get(&self, key: &DatanodeTableKey) -> Result> { self.kv_backend .get(&key.as_raw_key()) .await? @@ -107,31 +105,36 @@ impl DatanodeTableManager { .transpose() } + /// Create DatanodeTable key and value. If the key already exists, check if the value is the same. pub async fn create( &self, datanode_id: DatanodeId, table_id: TableId, regions: Vec, ) -> Result<()> { - let key = DatanodeTableKey::new(datanode_id, table_id).as_raw_key(); - let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?; - let req = CompareAndPutRequest::new().with_key(key).with_value(val); + let key = DatanodeTableKey::new(datanode_id, table_id); + let val = DatanodeTableValue::new(table_id, regions.clone()); + let req = CompareAndPutRequest::new() + .with_key(key.as_raw_key()) + .with_value(val.try_as_raw_value()?); let resp = self.kv_backend.compare_and_put(req).await?; if !resp.success { - let curr = resp.prev_kv.map_or_else( - || "empty".to_string(), - |kv| { - DatanodeTableValue::try_from_raw_value(kv.value).map_or_else( - |e| format!("Invalid DatanodeTableValue for Datanode {datanode_id}: {e}"), - |v| format!("{v:?}"), - ) - }, + let Some(curr) = resp + .prev_kv + .map(|kv| DatanodeTableValue::try_from_raw_value(kv.value)) + .transpose()? else { + return UnexpectedSnafu { + err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, val: {val:?}"), + }.fail(); + }; + + ensure!( + curr.table_id == table_id && curr.regions == regions, + UnexpectedSnafu { + err_msg: format!("current value '{curr:?}' already existed for key '{key}', {val:?} is not set"), + } ); - return ConcurrentModifyRegionsPlacementSnafu { - err_msg: format!("Datanode {datanode_id} already existed {curr}"), - } - .fail(); } Ok(()) } @@ -150,13 +153,26 @@ impl DatanodeTableManager { to_datanode: DatanodeId, table_id: TableId, region: RegionNumber, - ) -> Result { + ) -> Result<()> { let from_key = DatanodeTableKey::new(from_datanode, table_id); - let mut from_value = self.get(&from_key).await?.context(MoveRegionSnafu { - table_id, - region, - err_msg: format!("DatanodeTableKey not found for Datanode {from_datanode}"), - })?; + let to_key = DatanodeTableKey::new(to_datanode, table_id); + let mut kvs = self + .kv_backend + .batch_get(BatchGetRequest { + keys: vec![from_key.as_raw_key(), to_key.as_raw_key()], + }) + .await? + .kvs; + + ensure!( + !kvs.is_empty(), + MoveRegionSnafu { + table_id, + region, + err_msg: format!("DatanodeTableKey not found for Datanode {from_datanode}"), + } + ); + let mut from_value = DatanodeTableValue::try_from_raw_value(kvs.remove(0).value)?; ensure!( from_value.regions.contains(®ion), @@ -167,8 +183,11 @@ impl DatanodeTableManager { } ); - let to_key = DatanodeTableKey::new(to_datanode, table_id); - let to_value = self.get(&to_key).await?; + let to_value = if !kvs.is_empty() { + Some(DatanodeTableValue::try_from_raw_value(kvs.remove(0).value)?) 
+ } else { + None + }; if let Some(v) = to_value.as_ref() { ensure!( @@ -221,7 +240,15 @@ impl DatanodeTableManager { let txn = Txn::new().when(compares).and_then(operations); let resp = self.kv_backend.txn(txn).await?; - Ok(resp.succeeded) + ensure!( + resp.succeeded, + MoveRegionSnafu { + table_id, + region, + err_msg: format!("txn failed with responses: {:?}", resp.responses), + } + ); + Ok(()) } pub async fn tables(&self, datanode_id: DatanodeId) -> Result> { @@ -262,7 +289,7 @@ mod tests { // Move region 1 from datanode 1 to datanode 2. // Note that the DatanodeTableValue is not existed for datanode 2 now. - assert!(manager.move_region(1, 2, 1, 1).await.unwrap()); + assert!(manager.move_region(1, 2, 1, 1).await.is_ok()); let value = manager .get(&DatanodeTableKey::new(1, 1)) .await @@ -348,12 +375,15 @@ mod tests { assert!(manager.create(2, 1, vec![4, 5, 6]).await.is_ok()); assert!(manager.create(2, 2, vec![1, 2, 3]).await.is_ok()); + // If the value is the same, "create" can be called again. + assert!(manager.create(2, 2, vec![1, 2, 3]).await.is_ok()); + let err_msg = manager .create(1, 1, vec![4, 5, 6]) .await .unwrap_err() .to_string(); - assert!(err_msg.contains("Concurrent modify regions placement: Datanode 1 already existed DatanodeTableValue { table_id: 1, regions: [1, 2, 3], version: 0 }")); + assert!(err_msg.contains("Unexpected: current value 'DatanodeTableValue { table_id: 1, regions: [1, 2, 3], version: 0 }' already existed for key '__dn_table/1/1', DatanodeTableValue { table_id: 1, regions: [4, 5, 6], version: 0 } is not set")); let to_be_removed_key = DatanodeTableKey::new(2, 1); let expected_value = DatanodeTableValue { diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 9510027356ec..e1ceb4698b6a 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -13,16 +13,14 @@ // limitations under the License. use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::ensure; use table::metadata::{RawTableInfo, TableId}; use super::TABLE_INFO_KEY_PREFIX; -use crate::error::{InvalidCatalogValueSnafu, Result}; -use crate::helper::{TableGlobalKey, TableGlobalValue}; +use crate::error::{Result, UnexpectedSnafu}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, PutRequest}; -use crate::table_name::TableName; +use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; pub struct TableInfoKey { table_id: TableId, @@ -64,60 +62,6 @@ impl TableInfoManager { Self { kv_backend } } - // TODO(LFC): Remove this method when table metadata refactor is done. - pub async fn get_old(&self, table_name: &TableName) -> Result> { - let table_global_key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - }; - self.kv_backend - .get(table_global_key.to_string().as_bytes()) - .await? - .map(|kv| TableGlobalValue::from_bytes(kv.value)) - .transpose() - .map(|v| { - v.map(|v| TableInfoValue { - table_info: v.table_info, - version: 0, - }) - }) - .context(InvalidCatalogValueSnafu) - } - - // TODO(LFC): Remove this method when table metadata refactor is done. 
- pub async fn put_old(&self, table_info: RawTableInfo) -> Result<()> { - let key = TableGlobalKey { - catalog_name: table_info.catalog_name.clone(), - schema_name: table_info.schema_name.clone(), - table_name: table_info.name.clone(), - } - .to_string(); - let raw_key = key.as_bytes(); - - let regions_id_map = self - .kv_backend - .get(raw_key) - .await? - .map(|kv| TableGlobalValue::from_bytes(kv.value())) - .transpose() - .context(InvalidCatalogValueSnafu)? - .map(|v| v.regions_id_map) - .unwrap_or_default(); - - let raw_value = TableGlobalValue { - node_id: 0, - regions_id_map, - table_info, - } - .as_bytes() - .context(InvalidCatalogValueSnafu)?; - - let req = PutRequest::new().with_key(raw_key).with_value(raw_value); - self.kv_backend.put(req).await?; - Ok(()) - } - pub async fn get(&self, table_id: TableId) -> Result> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); @@ -128,6 +72,29 @@ impl TableInfoManager { .transpose() } + /// Create TableInfo key and value. If the key already exists, check if the value is the same. + pub async fn create(&self, table_id: TableId, table_info: &RawTableInfo) -> Result<()> { + let result = self + .compare_and_put(table_id, None, table_info.clone()) + .await?; + if let Err(curr) = result { + let Some(curr) = curr else { + return UnexpectedSnafu { + err_msg: format!("compare_and_put expect None but failed with current value None, table_id: {table_id}, table_info: {table_info:?}"), + }.fail() + }; + ensure!( + &curr.table_info == table_info, + UnexpectedSnafu { + err_msg: format!( + "TableInfoValue for table {table_id} is updated before it is created!" + ) + } + ) + } + Ok(()) + } + /// Compare and put value of key. `expect` is the expected value, if backend's current value associated /// with key is the same as `expect`, the value will be updated to `val`. /// @@ -211,6 +178,13 @@ mod tests { } let manager = TableInfoManager::new(backend.clone()); + assert!(manager.create(99, &new_table_info(99)).await.is_ok()); + assert!(manager.create(99, &new_table_info(99)).await.is_ok()); + + let result = manager.create(99, &new_table_info(88)).await; + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg + .contains("Unexpected: TableInfoValue for table 99 is updated before it is created!")); let val = manager.get(1).await.unwrap().unwrap(); assert_eq!( diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index d45a88a6fd36..168bd3a59416 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -15,14 +15,17 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt}; use table::metadata::TableId; use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; -use crate::error::{Error, InvalidCatalogValueSnafu, InvalidTableMetadataSnafu, Result}; -use crate::helper::{build_table_global_prefix, TableGlobalKey, TableGlobalValue}; +use crate::error::{ + Error, InvalidTableMetadataSnafu, RenameTableSnafu, Result, TableAlreadyExistsSnafu, + TableNotExistSnafu, UnexpectedSnafu, +}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::memory::MemoryKvBackend; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest}; use crate::table_name::TableName; @@ -147,12 +150,8 @@ impl TableNameManager { Self { kv_backend } } - /// Creates a new table name entry. 
Returns the current [TableNameValue] if the entry already existed. - pub async fn create( - &self, - key: &TableNameKey<'_>, - table_id: TableId, - ) -> Result> { + /// Create TableName key and value. If the key already exists, check if the value is the same. + pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result<()> { let raw_key = key.as_raw_key(); let value = TableNameValue::new(table_id); let raw_value = value.try_as_raw_value()?; @@ -160,49 +159,87 @@ impl TableNameManager { .with_key(raw_key) .with_value(raw_value); let result = self.kv_backend.compare_and_put(req).await?; - Ok(if result.success { - None - } else { - result + if !result.success { + let Some(curr) = result .prev_kv .map(|x| TableNameValue::try_from_raw_value(x.value)) - .transpose()? - }) - } - - // TODO(LFC): Remove this method when table metadata refactor is done. - pub async fn get_old(&self, key: &TableNameKey<'_>) -> Result> { - let table_global_key = TableGlobalKey { - catalog_name: key.catalog.to_string(), - schema_name: key.schema.to_string(), - table_name: key.table.to_string(), - }; - self.kv_backend - .get(table_global_key.to_string().as_bytes()) - .await? - .map(|kv| TableGlobalValue::from_bytes(kv.value())) - .transpose() - .map(|v| v.map(|v| TableNameValue::new(v.table_id()))) - .context(InvalidCatalogValueSnafu) + .transpose()? else { + return UnexpectedSnafu { + err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, value: {value:?}"), + }.fail() + }; + ensure!( + curr.table_id == table_id, + TableAlreadyExistsSnafu { + table_id: curr.table_id + } + ); + } + Ok(()) } - // TODO(LFC): Remove this method when table metadata refactor is done. - pub async fn tables_old(&self, catalog: &str, schema: &str) -> Result> { - let key = build_table_global_prefix(catalog, schema); - let req = RangeRequest::new().with_prefix(key.as_bytes()); - - let resp = self.kv_backend.range(req).await?; + /// Rename a TableNameKey to a new table name. Will check whether the TableNameValue matches the + /// `expected_table_id` first. Can be executed again if the first invocation is successful. + pub async fn rename( + &self, + key: TableNameKey<'_>, + expected_table_id: TableId, + new_table_name: &str, + ) -> Result<()> { + let new_key = TableNameKey::new(key.catalog, key.schema, new_table_name); + + if let Some(value) = self.get(key).await? { + ensure!( + value.table_id == expected_table_id, + RenameTableSnafu { + reason: format!( + "the input table name '{}' and id '{expected_table_id}' not match", + Into::::into(key) + ), + } + ); - let mut table_names = Vec::with_capacity(resp.kvs.len()); - for kv in resp.kvs { - let key = TableGlobalKey::parse(String::from_utf8_lossy(kv.key())) - .context(InvalidCatalogValueSnafu)?; - table_names.push(key.table_name); + let txn = Txn::new() + .when(vec![ + Compare::with_value( + key.as_raw_key(), + CompareOp::Equal, + value.try_as_raw_value()?, + ), + Compare::with_not_exist_value(new_key.as_raw_key(), CompareOp::Equal), + ]) + .and_then(vec![ + TxnOp::Delete(key.as_raw_key()), + TxnOp::Put(new_key.as_raw_key(), value.try_as_raw_value()?), + ]); + + let resp = self.kv_backend.txn(txn).await?; + ensure!( + resp.succeeded, + RenameTableSnafu { + reason: format!("txn failed with response: {:?}", resp.responses) + } + ); + } else { + let Some(value) = self.get(new_key).await? else { + // If we can't get the table by its original name, nor can we get by its altered + // name, then the table must not exist at the first place. 
+ return TableNotExistSnafu { + table_name: TableName::from(key).to_string(), + }.fail(); + }; + + ensure!( + value.table_id == expected_table_id, + TableAlreadyExistsSnafu { + table_id: value.table_id + } + ); } - Ok(table_names) + Ok(()) } - pub async fn get(&self, key: &TableNameKey<'_>) -> Result> { + pub async fn get(&self, key: TableNameKey<'_>) -> Result> { let raw_key = key.as_raw_key(); self.kv_backend .get(&raw_key) @@ -223,7 +260,7 @@ impl TableNameManager { Ok(table_names) } - pub async fn remove(&self, key: &TableNameKey<'_>) -> Result<()> { + pub async fn remove(&self, key: TableNameKey<'_>) -> Result<()> { let raw_key = key.as_raw_key(); let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); let req = MoveValueRequest::new(raw_key, removed_key.as_bytes()); @@ -248,21 +285,23 @@ mod tests { for i in 1..=3 { let table_name = format!("table_{}", i); let key = TableNameKey::new("my_catalog", "my_schema", &table_name); - assert!(manager.create(&key, i).await.unwrap().is_none()); + assert!(manager.create(&key, i).await.is_ok()); } let key = TableNameKey::new("my_catalog", "my_schema", "my_table"); - assert!(manager.create(&key, 99).await.unwrap().is_none()); + assert!(manager.create(&key, 99).await.is_ok()); + assert!(manager.create(&key, 99).await.is_ok()); - let curr = manager.create(&key, 9).await.unwrap(); - assert_eq!(Some(TableNameValue::new(99)), curr); + let result = manager.create(&key, 9).await; + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("Table already exists, table_id: 99")); - let value = manager.get(&key).await.unwrap().unwrap(); + let value = manager.get(key).await.unwrap().unwrap(); assert_eq!(value.table_id(), 99); let not_existed = TableNameKey::new("x", "y", "z"); - assert!(manager.get(¬_existed).await.unwrap().is_none()); + assert!(manager.get(not_existed).await.unwrap().is_none()); - assert!(manager.remove(&key).await.is_ok()); + assert!(manager.remove(key).await.is_ok()); let kv = backend .get(b"__removed-__table_name/my_catalog/my_schema/my_table") .await @@ -271,9 +310,31 @@ mod tests { let value = TableNameValue::try_from_raw_value(kv.value).unwrap(); assert_eq!(value.table_id(), 99); + let key = TableNameKey::new("my_catalog", "my_schema", "table_1"); + assert!(manager.rename(key, 1, "table_1_new").await.is_ok()); + assert!(manager.rename(key, 1, "table_1_new").await.is_ok()); + + let result = manager.rename(key, 2, "table_1_new").await; + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("Table already exists, table_id: 1")); + + let result = manager + .rename( + TableNameKey::new("my_catalog", "my_schema", "table_2"), + 22, + "table_2_new", + ) + .await; + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("Failed to rename table, reason: the input table name 'my_catalog.my_schema.table_2' and id '22' not match")); + + let result = manager.rename(not_existed, 1, "zz").await; + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("Table does not exist, table_name: x.y.z")); + let tables = manager.tables("my_catalog", "my_schema").await.unwrap(); assert_eq!(tables.len(), 3); - assert_eq!(tables, vec!["table_1", "table_2", "table_3"]); + assert_eq!(tables, vec!["table_1_new", "table_2", "table_3"]); } #[test] diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs index a1e259ea9496..05f9caaf4bd3 100644 --- a/src/common/meta/src/key/table_region.rs +++ b/src/common/meta/src/key/table_region.rs @@ -15,17 
+15,15 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::ensure; use store_api::storage::RegionNumber; use table::metadata::TableId; use super::TABLE_REGION_KEY_PREFIX; -use crate::error::{InvalidCatalogValueSnafu, InvalidTableMetadataSnafu, Result}; -use crate::helper::{TableGlobalKey, TableGlobalValue}; +use crate::error::{Result, UnexpectedSnafu}; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, PutRequest}; -use crate::table_name::TableName; +use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; use crate::DatanodeId; pub type RegionDistribution = BTreeMap>; @@ -80,66 +78,30 @@ impl TableRegionManager { .transpose() } - // TODO(LFC): Remove this method when table metadata refactor is done. - pub async fn get_old(&self, table_name: &TableName) -> Result> { - let key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - } - .to_string(); - let raw_key = key.as_bytes(); - - self.kv_backend - .get(raw_key) - .await? - .map(|kv| TableGlobalValue::from_bytes(kv.value())) - .transpose() - .map(|v| { - v.map(|v| TableRegionValue { - region_distribution: v.regions_id_map.into_iter().collect(), - version: 0, - }) - }) - .context(InvalidCatalogValueSnafu) - } - - // TODO(LFC): Remove this method when table metadata refactor is done. - pub async fn put_old( + /// Create TableRegion key and value. If the key already exists, check if the value is the same. + pub async fn create( &self, - table_name: &TableName, - region_distribution: RegionDistribution, + table_id: TableId, + region_distribution: &RegionDistribution, ) -> Result<()> { - let key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - } - .to_string(); - let raw_key = key.as_bytes(); - - let table_info = self - .kv_backend - .get(raw_key) - .await? - .map(|kv| TableGlobalValue::from_bytes(kv.value())) - .transpose() - .context(InvalidCatalogValueSnafu)? - .map(|v| v.table_info) - .with_context(|| InvalidTableMetadataSnafu { - err_msg: format!("table global value for {table_name} is empty"), - })?; - - let raw_value = TableGlobalValue { - node_id: 0, - regions_id_map: region_distribution.into_iter().collect(), - table_info, + let result = self + .compare_and_put(table_id, None, region_distribution.clone()) + .await?; + if let Err(curr) = result { + let Some(curr) = curr else { + return UnexpectedSnafu { + err_msg: format!("compare_and_put expect None but failed with current value None, table_id: {table_id}, region_distribution: {region_distribution:?}"), + }.fail() + }; + ensure!( + &curr.region_distribution == region_distribution, + UnexpectedSnafu { + err_msg: format!( + "TableRegionValue for table {table_id} is updated before it is created!" 
+ ) + } + ) } - .as_bytes() - .context(InvalidCatalogValueSnafu)?; - - let req = PutRequest::new().with_key(raw_key).with_value(raw_value); - self.kv_backend.put(req).await?; Ok(()) } @@ -155,7 +117,7 @@ impl TableRegionManager { table_id: TableId, expect: Option, region_distribution: RegionDistribution, - ) -> Result>>> { + ) -> Result>> { let key = TableRegionKey::new(table_id); let raw_key = key.as_raw_key(); @@ -179,16 +141,22 @@ impl TableRegionManager { Ok(if resp.success { Ok(()) } else { - Err(resp.prev_kv.map(|x| x.value)) + Err(resp + .prev_kv + .map(|x| TableRegionValue::try_from_raw_value(x.value)) + .transpose()?) }) } - pub async fn remove(&self, table_id: TableId) -> Result<()> { + pub async fn remove(&self, table_id: TableId) -> Result> { let key = TableRegionKey::new(table_id).as_raw_key(); let remove_key = to_removed_key(&String::from_utf8_lossy(&key)); let req = MoveValueRequest::new(key, remove_key.as_bytes()); - self.kv_backend.move_value(req).await?; - Ok(()) + + let resp = self.kv_backend.move_value(req).await?; + resp.0 + .map(|x| TableRegionValue::try_from_raw_value(x.value)) + .transpose() } } @@ -207,25 +175,25 @@ mod tests { let region_distribution = RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]); + let new_region_distribution = + RegionDistribution::from([(1, vec![4, 5, 6]), (2, vec![1, 2, 3])]); + let result = manager .compare_and_put(1, None, region_distribution.clone()) .await .unwrap(); assert!(result.is_ok()); - let new_region_distribution = - RegionDistribution::from([(1, vec![4, 5, 6]), (2, vec![1, 2, 3])]); let curr = manager .compare_and_put(1, None, new_region_distribution.clone()) .await .unwrap() .unwrap_err() .unwrap(); - let curr = TableRegionValue::try_from_raw_value(curr).unwrap(); assert_eq!( curr, TableRegionValue { - region_distribution, + region_distribution: region_distribution.clone(), version: 0 } ); @@ -236,6 +204,13 @@ mod tests { .unwrap() .is_ok()); + assert!(manager.create(99, ®ion_distribution).await.is_ok()); + assert!(manager.create(99, ®ion_distribution).await.is_ok()); + + let result = manager.create(99, &new_region_distribution).await; + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("TableRegionValue for table 99 is updated before it is created!")); + let value = manager.get(1).await.unwrap().unwrap(); assert_eq!( value, @@ -244,9 +219,25 @@ mod tests { version: 1 } ); + let value = manager.get(99).await.unwrap().unwrap(); + assert_eq!( + value, + TableRegionValue { + region_distribution, + version: 0 + } + ); assert!(manager.get(2).await.unwrap().is_none()); - assert!(manager.remove(1).await.is_ok()); + let value = manager.remove(1).await.unwrap().unwrap(); + assert_eq!( + value, + TableRegionValue { + region_distribution: new_region_distribution.clone(), + version: 1 + } + ); + assert!(manager.remove(123).await.unwrap().is_none()); let kv = backend .get(b"__removed-__table_region/1") diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index cdc1cd7d1005..6bd189d4c1c2 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -21,6 +21,7 @@ use crate::key::to_removed_key; pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; +#[derive(Copy, Clone)] pub struct TableRouteKey<'a> { pub table_id: TableId, pub catalog_name: &'a str, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index b850a312473c..a4da3f3e1414 100644 --- a/src/common/meta/src/rpc/router.rs +++ 
b/src/common/meta/src/rpc/router.rs @@ -31,7 +31,6 @@ use crate::table_name::TableName; #[derive(Debug, Clone, Default)] pub struct RouteRequest { - pub table_names: Vec, pub table_ids: Vec, } @@ -39,7 +38,6 @@ impl From for PbRouteRequest { fn from(mut req: RouteRequest) -> Self { Self { header: None, - table_names: req.table_names.drain(..).map(Into::into).collect(), table_ids: req.table_ids.drain(..).map(|id| PbTableId { id }).collect(), } } @@ -49,16 +47,9 @@ impl RouteRequest { #[inline] pub fn new(table_id: TableId) -> Self { Self { - table_names: vec![], table_ids: vec![table_id], } } - - #[inline] - pub fn add_table_name(mut self, table_name: TableName) -> Self { - self.table_names.push(table_name); - self - } } #[derive(Debug, Clone)] @@ -377,26 +368,14 @@ mod tests { #[test] fn test_route_request_trans() { let req = RouteRequest { - table_names: vec![ - TableName::new("c1", "s1", "t1"), - TableName::new("c2", "s2", "t2"), - ], - table_ids: vec![1, 2, 3], + table_ids: vec![1, 2], }; let into_req: PbRouteRequest = req.into(); assert!(into_req.header.is_none()); - assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name); - assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name); - assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name); - assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name); - assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name); - assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name); - assert_eq!( - (1..=3).map(|id| PbTableId { id }).collect::>(), - into_req.table_ids - ); + assert_eq!(1, into_req.table_ids.get(0).unwrap().id); + assert_eq!(2, into_req.table_ids.get(1).unwrap().id); } #[test] diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 17c0198c7526..10bc3c068a7e 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -46,7 +46,6 @@ object-store = { path = "../object-store" } pin-project = "1.0" prost.workspace = true query = { path = "../query" } -regex = "1.6" secrecy = { version = "0.8", features = ["serde", "alloc"] } serde = "1.0" serde_json = "1.0" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 21891336b1e0..4c59bb5acd70 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -30,9 +30,7 @@ use catalog::{ use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_error::ext::BoxedError; -use common_meta::helper::{ - build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey, TableGlobalKey, -}; +use common_meta::helper::{build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; use common_meta::key::table_region::TableRegionKey; @@ -120,17 +118,6 @@ impl FrontendCatalogManager { table: &str, table_id: TableId, ) { - let tg_key = TableGlobalKey { - catalog_name: catalog.into(), - schema_name: schema.into(), - table_name: table.into(), - } - .to_string(); - - let tg_key = tg_key.as_bytes(); - - self.backend_cache_invalidator.invalidate_key(tg_key).await; - let key = TableNameKey::new(catalog, schema, table); self.backend_cache_invalidator .invalidate_key(&key.as_raw_key()) @@ -160,7 +147,7 @@ impl FrontendCatalogManager { self.partition_manager .table_routes() - .invalidate_table_route(&TableName::new(catalog, schema, table)) + .invalidate_table_route(table_id) .await; } } @@ -343,10 +330,9 @@ impl 
CatalogManager for FrontendCatalogManager { let mut tables = self .table_metadata_manager .table_name_manager() - .tables_old(catalog, schema) + .tables(catalog, schema) .await .context(TableMetadataManagerSnafu)?; - if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME { tables.push("numbers".to_string()); } @@ -359,13 +345,11 @@ impl CatalogManager for FrontendCatalogManager { catalog_name: catalog.to_string(), } .to_string(); - - Ok(self - .backend + self.backend .get(key.as_bytes()) .await - .context(TableMetadataManagerSnafu)? - .is_some()) + .context(TableMetadataManagerSnafu) + .map(|x| x.is_some()) } async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult { @@ -374,7 +358,6 @@ impl CatalogManager for FrontendCatalogManager { schema_name: schema.to_string(), } .to_string(); - Ok(self .backend() .get(schema_key.as_bytes()) @@ -387,7 +370,7 @@ impl CatalogManager for FrontendCatalogManager { let key = TableNameKey::new(catalog, schema, table); self.table_metadata_manager .table_name_manager() - .get_old(&key) + .get(key) .await .context(TableMetadataManagerSnafu) .map(|x| x.is_some()) @@ -423,19 +406,19 @@ impl CatalogManager for FrontendCatalogManager { let key = TableNameKey::new(catalog, schema, table_name); let Some(table_name_value) = self.table_metadata_manager .table_name_manager() - .get_old(&key) + .get(key) .await .context(TableMetadataManagerSnafu)? else { return Ok(None) }; - let _table_id = table_name_value.table_id(); + let table_id = table_name_value.table_id(); - let Some(v) = self.table_metadata_manager + let Some(table_info_value) = self.table_metadata_manager .table_info_manager() - .get_old(&key.into()) + .get(table_id) .await .context(TableMetadataManagerSnafu)? else { return Ok(None) }; - let table_info = Arc::new( - v.table_info + table_info_value + .table_info .try_into() .context(catalog_err::InvalidTableInfoInCatalogSnafu)?, ); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 19ed3c265fc5..ca2284dedb16 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -603,7 +603,8 @@ impl ErrorExt for Error { Error::NotSupported { .. } => StatusCode::Unsupported, - Error::HandleHeartbeatResponse { source, .. } => source.status_code(), + Error::HandleHeartbeatResponse { source, .. } + | Error::TableMetadataManager { source, .. } => source.status_code(), Error::RuntimeResource { source, .. } => source.status_code(), Error::PromStoreRemoteQueryPlan { source, .. } @@ -702,7 +703,6 @@ impl ErrorExt for Error { Error::WriteParquet { source, .. } => source.status_code(), Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, - Error::TableMetadataManager { source, .. 
} => source.status_code(), } } diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index d85fe4f1f7ec..cd937bfc73d1 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -18,13 +18,13 @@ use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::key::table_info::TableInfoKey; +use common_meta::key::table_name::TableNameKey; use common_meta::key::table_region::TableRegionKey; use common_meta::key::TableMetaKey; -use common_meta::table_name::TableName; -use common_telemetry::{error, info}; +use common_telemetry::error; use partition::manager::TableRouteCacheInvalidatorRef; #[derive(Clone)] @@ -83,26 +83,28 @@ impl InvalidateTableCacheHandler { } async fn invalidate_table_cache(&self, table_ident: TableIdent) { - let tg_key = TableGlobalKey { - catalog_name: table_ident.catalog.clone(), - schema_name: table_ident.schema.clone(), - table_name: table_ident.table.clone(), - } - .to_string(); - info!("invalidate table cache: {}", tg_key); - let tg_key = tg_key.as_bytes(); + let table_id = table_ident.table_id; + self.backend_cache_invalidator + .invalidate_key(&TableInfoKey::new(table_id).as_raw_key()) + .await; - self.backend_cache_invalidator.invalidate_key(tg_key).await; + self.backend_cache_invalidator + .invalidate_key(&TableRegionKey::new(table_id).as_raw_key()) + .await; - let key = &TableRegionKey::new(table_ident.table_id).as_raw_key(); - self.backend_cache_invalidator.invalidate_key(key).await; + self.backend_cache_invalidator + .invalidate_key( + &TableNameKey::new( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + ) + .as_raw_key(), + ) + .await; self.table_route_cache_invalidator - .invalidate_table_route(&TableName::new( - table_ident.catalog, - table_ident.schema, - table_ident.table, - )) + .invalidate_table_route(table_id) .await; } } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 2d712f8a8e5a..a667c05b182f 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -22,11 +22,12 @@ use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::helper::TableGlobalKey; use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_meta::table_name::TableName; +use common_meta::key::table_region::TableRegionKey; +use common_meta::key::TableMetaKey; use partition::manager::TableRouteCacheInvalidator; +use table::metadata::TableId; use tokio::sync::mpsc; use super::invalidate_table_cache::InvalidateTableCacheHandler; @@ -44,30 +45,26 @@ impl KvCacheInvalidator for MockKvCacheInvalidator { } pub struct MockTableRouteCacheInvalidator { - inner: Mutex>, + inner: Mutex>, } #[async_trait::async_trait] impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator { - async fn invalidate_table_route(&self, table: &TableName) { - let _ = self.inner.lock().unwrap().remove(&table.to_string()); + async fn 
invalidate_table_route(&self, table_id: TableId) { + let _ = self.inner.lock().unwrap().remove(&table_id); } } #[tokio::test] async fn test_invalidate_table_cache_handler() { - let table_key = TableGlobalKey { - catalog_name: "test".to_string(), - schema_name: "greptime".to_string(), - table_name: "foo_table".to_string(), - }; - - let inner = HashMap::from([(table_key.to_string().as_bytes().to_vec(), 1)]); + let table_id = 1; + let table_region_key = TableRegionKey::new(table_id); + let inner = HashMap::from([(table_region_key.as_raw_key(), 1)]); let backend = Arc::new(MockKvCacheInvalidator { inner: Mutex::new(inner), }); - let inner = HashMap::from([(table_key.to_string(), 1)]); + let inner = HashMap::from([(table_id, 1)]); let table_route = Arc::new(MockTableRouteCacheInvalidator { inner: Mutex::new(inner), }); @@ -87,7 +84,7 @@ async fn test_invalidate_table_cache_handler() { catalog: "test".to_string(), schema: "greptime".to_string(), table: "foo_table".to_string(), - table_id: 0, + table_id, engine: "mito".to_string(), }), ) @@ -102,19 +99,9 @@ async fn test_invalidate_table_cache_handler() { .inner .lock() .unwrap() - .contains_key(table_key.to_string().as_bytes())); - - let table_name = TableName { - catalog_name: "test".to_string(), - schema_name: "greptime".to_string(), - table_name: "foo_table".to_string(), - }; + .contains_key(&table_region_key.as_raw_key())); - assert!(!table_route - .inner - .lock() - .unwrap() - .contains_key(&table_name.to_string())); + assert!(!table_route.inner.lock().unwrap().contains_key(&table_id)); // removes a invalid key handle_instruction( diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 63c2fa33f38a..1e19e2a2fb2c 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -25,7 +25,7 @@ use api::v1::{ FlushTableExpr, InsertRequests, }; use async_trait::async_trait; -use catalog::{CatalogManager, RegisterTableRequest}; +use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest}; use chrono::DateTime; use client::client_manager::DatanodeClients; use client::Database; @@ -176,11 +176,20 @@ impl DistInstance { .with_context(|| TableNotFoundSnafu { table_name: table_name.to_string(), })?; - - let table_id = table.table_info().ident.table_id; + let table_id = table.table_info().table_id(); self.drop_table_procedure(&table_name, table_id).await?; + let request = DeregisterTableRequest { + catalog: table_name.catalog_name.clone(), + schema: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + }; + self.catalog_manager + .deregister_table(request) + .await + .context(CatalogSnafu)?; + // Since the table information dropped on meta does not go through KvBackend, so we // manually invalidate the cache here. 
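// Editor's aside: an illustrative sketch, not part of the patch. With table
// metadata keyed by table id, the cache invalidation shown above only needs the
// id plus the old fully-qualified name. The key types and `as_raw_key()` come
// from `common_meta::key` as used in the hunks above; `keys_to_invalidate` is a
// hypothetical helper for illustration only.
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_region::TableRegionKey;
use common_meta::key::TableMetaKey;
use table::metadata::TableId;

fn keys_to_invalidate(catalog: &str, schema: &str, table: &str, table_id: TableId) -> Vec<Vec<u8>> {
    vec![
        // Id-keyed metadata entries.
        TableInfoKey::new(table_id).as_raw_key(),
        TableRegionKey::new(table_id).as_raw_key(),
        // The name -> id mapping is still addressed by name.
        TableNameKey::new(catalog, schema, table).as_raw_key(),
    ]
    // The table-route cache is cleared separately through
    // `TableRouteCacheInvalidator::invalidate_table_route(table_id)`.
}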
// @@ -279,7 +288,6 @@ impl DistInstance { let route_response = self .meta_client .route(RouteRequest { - table_names: vec![table_name.clone()], table_ids: vec![table_id], }) .await @@ -383,7 +391,7 @@ impl DistInstance { let partitions = self .catalog_manager .partition_manager() - .find_table_partitions(&table_name) + .find_table_partitions(table.table_info().table_id()) .await .context(error::FindTablePartitionRuleSnafu { table_name: &table_name.table_name, diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index cc2beb977c40..f9d3b3fb34bc 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -103,13 +103,14 @@ impl DistInserter { let table_info = self.find_table_info(&request.table_name).await?; let table_meta = &table_info.meta; + let table_id = table_info.table_id(); let split = partition_manager - .split_insert_request(&table_name, request, table_meta.schema.as_ref()) + .split_insert_request(table_id, request, table_meta.schema.as_ref()) .await .context(SplitInsertSnafu)?; let table_route = partition_manager - .find_table_route(&table_name) + .find_table_route(table_id) .await .with_context(|_| FindTableRouteSnafu { table_name: table_name.to_string(), @@ -252,23 +253,23 @@ mod tests { .table_name_manager() .create(&key, table_id) .await - .unwrap() - .is_none()); + .is_ok()); assert!(table_metadata_manager .table_info_manager() - .put_old(table_info) + .compare_and_put(table_id, None, table_info) .await .is_ok()); - assert!(table_metadata_manager + let _ = table_metadata_manager .table_region_manager() - .put_old( - &key.into(), + .compare_and_put( + 1, + None, RegionDistribution::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]), ) .await - .is_ok()); + .unwrap(); } #[tokio::test] diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 67a66714df6a..cf40a8c204f5 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -101,8 +101,9 @@ impl Table for DistTable { let partition_manager = self.catalog_manager.partition_manager(); let datanode_clients = self.catalog_manager.datanode_clients(); + let table_id = self.table_info.table_id(); let partition_rule = partition_manager - .find_table_partition_rule(&self.table_name) + .find_table_partition_rule(table_id) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -112,7 +113,7 @@ impl Table for DistTable { .map_err(BoxedError::new) .context(TableOperationSnafu)?; let datanodes = partition_manager - .find_region_datanodes(&self.table_name, regions) + .find_region_datanodes(table_id, regions) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -171,8 +172,9 @@ impl Table for DistTable { async fn delete(&self, request: DeleteRequest) -> table::Result { let partition_manager = self.catalog_manager.partition_manager(); + let table_id = self.table_info.table_id(); let partition_rule = partition_manager - .find_table_partition_rule(&self.table_name) + .find_table_partition_rule(table_id) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -242,7 +244,7 @@ impl DistTable { let route = self .catalog_manager .partition_manager() - .find_table_route(table_name) + .find_table_route(self.table_info.table_id()) .await .with_context(|_| FindTableRouteSnafu { table_name: table_name.to_string(), @@ -479,7 +481,7 @@ pub(crate) mod test { ], ); table_routes - .insert_table_route(table_name.clone(), Arc::new(table_route)) + .insert_table_route(1, 
Arc::new(table_route)) .await; let table_name = TableName::new( @@ -554,7 +556,7 @@ pub(crate) mod test { ], ); table_routes - .insert_table_route(table_name.clone(), Arc::new(table_route)) + .insert_table_route(2, Arc::new(table_route)) .await; partition_manager @@ -564,12 +566,9 @@ pub(crate) mod test { async fn test_find_partition_rule() { let partition_manager = create_partition_rule_manager().await; + // "one_column_partitioning_table" has id 1 let partition_rule = partition_manager - .find_table_partition_rule(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - "one_column_partitioning_table", - )) + .find_table_partition_rule(1) .await .unwrap(); let range_rule = partition_rule @@ -580,12 +579,9 @@ pub(crate) mod test { assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); + // "two_column_partitioning_table" has table 2 let partition_rule = partition_manager - .find_table_partition_rule(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - "two_column_partitioning_table", - )) + .find_table_partition_rule(2) .await .unwrap(); let range_columns_rule = partition_rule diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index a71495bd2735..cdb2a19531b3 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -38,7 +38,7 @@ pub struct DdlManager { datanode_clients: Arc, pub(crate) mailbox: MailboxRef, pub(crate) server_addr: String, - table_metadata_manager: TableMetadataManagerRef, + pub(crate) table_metadata_manager: TableMetadataManagerRef, } #[derive(Clone)] diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index da9d51ae7682..9996b5f572f9 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -424,8 +424,8 @@ pub enum Error { #[snafu(display("Expected to retry later, reason: {}", reason))] RetryLater { reason: String, location: Location }, - #[snafu(display("Combine error: {}", err_msg))] - Combine { err_msg: String, location: Location }, + #[snafu(display("Failed to update table metadata, err_msg: {}", err_msg))] + UpdateTableMetadata { err_msg: String, location: Location }, #[snafu(display("Failed to convert table route, source: {}", source))] TableRouteConversion { @@ -504,7 +504,7 @@ impl ErrorExt for Error { | Error::MailboxReceiver { .. } | Error::RetryLater { .. } | Error::StartGrpc { .. } - | Error::Combine { .. } + | Error::UpdateTableMetadata { .. } | Error::NoEnoughAvailableDatanode { .. } | Error::ConvertGrpcExpr { .. } | Error::Join { .. 
} => StatusCode::Internal, diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index a6229afc6e61..96df5fea9338 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -19,7 +19,6 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManagerRef; -use common_meta::table_name::TableName; use common_meta::ClusterId; use common_telemetry::warn; use snafu::ResultExt; @@ -52,15 +51,11 @@ impl RegionLeaseHandler { } } - async fn find_table_ident( - &self, - table_id: TableId, - table_name: &TableName, - ) -> Result> { + async fn find_table_ident(&self, table_id: TableId) -> Result> { let value = self .table_metadata_manager .table_info_manager() - .get_old(table_name) + .get(table_id) .await .context(TableMetadataManagerSnafu)?; Ok(value.map(|x| { @@ -122,20 +117,15 @@ impl HeartbeatHandler for RegionLeaseHandler { stat.region_stats.iter().for_each(|x| { let region_id: RegionId = x.id.into(); let table_id = region_id.table_id(); - let table_name = TableName::new( - x.catalog.to_string(), - x.schema.to_string(), - x.table.to_string(), - ); datanode_regions - .entry((table_id, table_name)) + .entry(table_id) .or_insert_with(Vec::new) .push(RegionId::from(x.id).region_number()); }); let mut region_leases = Vec::with_capacity(datanode_regions.len()); - for ((table_id, table_name), local_regions) in datanode_regions { - let Some(table_ident) = self.find_table_ident(table_id, &table_name).await? else { + for (table_id, local_regions) in datanode_regions { + let Some(table_ident) = self.find_table_ident(table_id).await? else { warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ Reason: table not found."); continue; @@ -144,20 +134,20 @@ impl HeartbeatHandler for RegionLeaseHandler { let Some(table_region_value) = self .table_metadata_manager .table_region_manager() - .get_old(&table_name) + .get(table_id) .await .context(TableMetadataManagerSnafu)? else { - warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ - Reason: table region value not found."); - continue; - }; + warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ + Reason: table region value not found."); + continue; + }; let Some(global_regions) = table_region_value .region_distribution .get(&datanode_id) else { - warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ - Reason: not expected to place the region on it."); - continue; - }; + warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ + Reason: not expected to place the region on it."); + continue; + }; // Filter out the designated regions from table info value for the given table on the given Datanode. 
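// Editor's aside: a minimal sketch, not part of the patch, of the grouping the
// region-lease handler now performs. Heartbeat region stats are bucketed purely
// by the table id embedded in each RegionId, so the handler no longer needs the
// catalog/schema/table names carried in the stats. `group_by_table` is a
// hypothetical helper; `table_id()` and `region_number()` are the RegionId
// accessors used above.
use std::collections::HashMap;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

fn group_by_table(region_ids: impl IntoIterator<Item = RegionId>) -> HashMap<TableId, Vec<RegionNumber>> {
    let mut grouped: HashMap<TableId, Vec<RegionNumber>> = HashMap::new();
    for region_id in region_ids {
        grouped
            .entry(region_id.table_id())
            .or_insert_with(Vec::new)
            .push(region_id.region_number());
    }
    grouped
}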
let designated_regions = local_regions diff --git a/src/meta-srv/src/procedure/alter_table.rs b/src/meta-srv/src/procedure/alter_table.rs index c253db1846e2..644649d85f41 100644 --- a/src/meta-srv/src/procedure/alter_table.rs +++ b/src/meta-srv/src/procedure/alter_table.rs @@ -19,6 +19,9 @@ use async_trait::async_trait; use client::Database; use common_meta::ident::TableIdent; use common_meta::instruction::Instruction; +use common_meta::key::table_info::TableInfoValue; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::TableRouteKey; use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use common_meta::rpc::ddl::AlterTableTask; use common_meta::rpc::router::TableRoute; @@ -27,17 +30,16 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, }; -use common_telemetry::debug; +use common_telemetry::{debug, info}; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::TableReference; -use table::metadata::{RawTableInfo, TableInfo}; +use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::requests::{AlterKind, AlterTableRequest}; -use super::utils::build_table_metadata_key; use crate::ddl::DdlContext; -use crate::error::{self, Result}; +use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu}; use crate::procedure::utils::handle_request_datanode_error; use crate::service::mailbox::BroadcastChannel; use crate::table_routes::fetch_table; @@ -111,62 +113,77 @@ impl AlterTableProcedure { Ok(Status::Done) } - /// Update table metadata for rename table operation. - async fn on_update_metadata_for_rename( + async fn update_table_info_value( &self, - new_table_name: &str, - new_table_info: TableInfo, - ) -> Result { + table_id: TableId, + table_info_value: &TableInfoValue, + new_table_info: RawTableInfo, + ) -> Result<()> { + self.context.table_metadata_manager + .table_info_manager() + .compare_and_put(table_id, Some(table_info_value.clone()), new_table_info) + .await + .context(TableMetadataManagerSnafu)? + .map_err(|curr| { + // The table info metadata should be guarded by procedure locks. + UnexpectedSnafu { + violated: format!( + "TableInfoValue for table {table_id} is changed during table alternation, expected: '{table_info_value:?}', actual: '{curr:?}'", + ) + }.build() + }) + } + + /// Update table metadata for rename table operation. + async fn on_update_metadata_for_rename(&self, new_table_info: TableInfo) -> Result { + let table_metadata_manager = &self.context.table_metadata_manager; + let table_ref = self.data.table_ref(); - let new_table_ref = TableReference { - catalog: table_ref.catalog, - schema: table_ref.schema, - table: new_table_name, - }; + let new_table_name = new_table_info.name.clone(); let table_id = self.data.table_info.ident.table_id; - // Check whether the table has already been renamed. - if let Some((mut table_global_value, table_route_value)) = - fetch_table(&self.context.kv_store, table_ref).await? + if let Some((table_info_value, table_route_value)) = + fetch_table(&self.context.kv_store, table_metadata_manager, table_id).await? 
{ + self.update_table_info_value(table_id, &table_info_value, new_table_info.into()) + .await?; + info!("Updated TableInfoValue for table {table_id} with new table name '{new_table_name}'"); + + table_metadata_manager + .table_name_manager() + .rename( + TableNameKey::new(table_ref.catalog, table_ref.schema, table_ref.table), + table_id, + &new_table_name, + ) + .await + .context(TableMetadataManagerSnafu)?; + info!("Renamed TableNameKey to new table name '{new_table_name}' for table {table_id}"); + let table_route = table_route_value .clone() .try_into() .context(error::TableRouteConversionSnafu)?; - let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id); - - let (new_table_global_key, new_table_route_key) = - build_table_metadata_key(new_table_ref, table_id); - - table_global_value.table_info = new_table_info.into(); + let table_route_key = TableRouteKey { + table_id, + catalog_name: table_ref.catalog, + schema_name: table_ref.schema, + table_name: table_ref.table, + }; + let new_table_route_key = TableRouteKey { + table_name: &new_table_name, + ..table_route_key + }; let txn = Txn::new() - .when(vec![ - Compare::with_value( - table_route_key.to_string().into_bytes(), - CompareOp::Equal, - table_route_value.clone().into(), - ), - // Compare::with_value( - // table_global_key.to_string().into_bytes(), - // CompareOp::Equal, - // table_global_value - // .clone() - // .as_bytes() - // .context(error::InvalidCatalogValueSnafu)?, - // ), - ]) + .when(vec![Compare::with_value( + table_route_key.to_string().into_bytes(), + CompareOp::Equal, + table_route_value.clone().into(), + )]) .and_then(vec![ - TxnOp::Delete(table_global_key.to_string().into_bytes()), TxnOp::Delete(table_route_key.to_string().into_bytes()), - TxnOp::Put( - new_table_global_key.to_string().into_bytes(), - table_global_value - .clone() - .as_bytes() - .context(error::InvalidCatalogValueSnafu)?, - ), TxnOp::Put( new_table_route_key.to_string().into_bytes(), table_route_value.into(), @@ -181,20 +198,7 @@ impl AlterTableProcedure { msg: "table metadata changed" } ); - - return Ok(table_route); - } else if let Some((table, route)) = - fetch_table(&self.context.kv_store, new_table_ref).await? - { - let table_route = route.try_into().context(error::TableRouteConversionSnafu)?; - - ensure!( - table.table_info == new_table_info.into(), - error::UnexpectedSnafu { - violated: "table metadata changed" - } - ); - + info!("Updated TableRouteValue for table {table_id} with new table name '{new_table_name}'"); return Ok(table_route); } @@ -226,6 +230,10 @@ impl AlterTableProcedure { new_info.ident.version = table_info.ident.version + 1; new_info.meta = new_meta; + if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { + new_info.name = new_table_name.to_string(); + } + Ok(new_info) } @@ -241,18 +249,20 @@ impl AlterTableProcedure { new_info ); - if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { - let table_route = self - .on_update_metadata_for_rename(new_table_name, new_info) - .await?; + if matches!(request.alter_kind, AlterKind::RenameTable { .. }) { + let table_route = self.on_update_metadata_for_rename(new_info).await?; self.data.state = AlterTableState::InvalidateTableCache; self.data.table_route = Some(table_route); return Ok(Status::executing(true)); } - if let Some((mut table_global_value, table_route_value)) = - fetch_table(&self.context.kv_store, table_ref).await? 
+ if let Some((table_info_value, table_route_value)) = fetch_table( + &self.context.kv_store, + &self.context.table_metadata_manager, + table_id, + ) + .await? { let table_route = table_route_value .clone() @@ -260,54 +270,9 @@ impl AlterTableProcedure { .context(error::TableRouteConversionSnafu)?; let new_raw_info: RawTableInfo = new_info.into(); - // If the metadata already updated. - if table_global_value.table_info == new_raw_info { - debug!("table: {} metadata already updated", table_ref.to_string()); - - self.data.state = AlterTableState::InvalidateTableCache; - self.data.table_route = Some(table_route); - return Ok(Status::executing(true)); - } - - let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id); - - let txn = Txn::new().when(vec![ - Compare::with_value( - table_route_key.to_string().into_bytes(), - CompareOp::Equal, - table_route_value.clone().into(), - ), - // TODO(weny): due to unordered map, we cannot compare values directly. - // Compare::with_value( - // table_global_key.to_string().into_bytes(), - // CompareOp::Equal, - // table_global_value - // .clone() - // .as_bytes() - // .context(error::InvalidCatalogValueSnafu)?, - // ), - ]); - - table_global_value.table_info = new_raw_info; - - let txn = txn.and_then(vec![TxnOp::Put( - table_global_key.to_string().into_bytes(), - table_global_value - .clone() - .as_bytes() - .context(error::InvalidCatalogValueSnafu)?, - )]); - - let resp = self.context.kv_store.txn(txn).await?; - - ensure!( - resp.succeeded, - error::TxnSnafu { - msg: "table metadata changed" - } - ); - - debug!("table: {} metadata updated", table_ref.to_string()); + self.update_table_info_value(table_id, &table_info_value, new_raw_info) + .await?; + info!("Updated TableInfoValue for table {table_id} when altering table"); self.data.state = AlterTableState::InvalidateTableCache; self.data.table_route = Some(table_route); diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index 8e641e494dbc..cdd793ee8029 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use client::Database; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_meta::helper::TableGlobalKey; +use common_meta::key::table_name::TableNameKey; use common_meta::key::TableRouteKey; use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use common_meta::rpc::ddl::CreateTableTask; @@ -25,6 +25,7 @@ use common_meta::rpc::router::TableRoute; use common_meta::table_name::TableName; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_telemetry::info; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -33,9 +34,8 @@ use table::metadata::TableId; use super::utils::{handle_request_datanode_error, handle_retry_error}; use crate::ddl::DdlContext; -use crate::error::{self, Result}; -use crate::service::router::create_table_global_value; -use crate::table_routes::get_table_global_value; +use crate::error::{self, Result, TableMetadataManagerSnafu}; +use crate::service::router::create_region_distribution; pub struct CreateTableProcedure { context: DdlContext, @@ -65,25 +65,25 @@ impl CreateTableProcedure { }) } - fn global_table_key(&self) -> TableGlobalKey { - let table_ref = self.creator.data.table_ref(); - - 
TableGlobalKey { - catalog_name: table_ref.catalog.to_string(), - schema_name: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), - } - } - fn table_name(&self) -> TableName { self.creator.data.task.table_name() } /// Checks whether the table exists. async fn on_prepare(&mut self) -> Result { - if (get_table_global_value(&self.context.kv_store, &self.global_table_key()).await?) - .is_some() - { + let expr = &self.creator.data.task.create_table; + let value = self + .context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + )) + .await + .context(TableMetadataManagerSnafu)?; + if value.is_some() { ensure!( self.creator.data.task.create_table.create_if_not_exists, error::TableAlreadyExistsSnafu { @@ -99,8 +99,7 @@ impl CreateTableProcedure { Ok(Status::executing(true)) } - /// registers the `TableRouteValue`,`TableGlobalValue` - async fn register_metadata(&self) -> Result<()> { + async fn on_create_metadata(&self) -> Result { let _timer = common_telemetry::timer!( crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META ); @@ -112,14 +111,6 @@ impl CreateTableProcedure { .to_string() .into_bytes(); - let table_global_key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - } - .to_string() - .into_bytes(); - let (peers, table_route) = self .creator .data @@ -133,22 +124,38 @@ impl CreateTableProcedure { table_route: Some(table_route), }; - let table_global_value = create_table_global_value( - &table_route_value, - self.creator.data.task.table_info.clone(), - )? - .as_bytes() - .context(error::InvalidCatalogValueSnafu)?; + let manager = &self.context.table_metadata_manager; + + let region_distribution = create_region_distribution(&table_route_value)?; + manager + .table_region_manager() + .create(table_id, ®ion_distribution) + .await + .context(TableMetadataManagerSnafu)?; + info!("Created TableRegionValue for table {table_id}"); + + manager + .table_info_manager() + .create(table_id, &self.creator.data.task.table_info) + .await + .context(TableMetadataManagerSnafu)?; + info!("Created TableInfoValue for table {table_id}"); + + for (datanode_id, regions) in region_distribution { + manager + .datanode_table_manager() + .create(datanode_id, table_id, regions) + .await + .context(TableMetadataManagerSnafu)?; + info!("Create DatanodeTableValue for table {table_id}"); + } let txn = Txn::new() - .when(vec![ - Compare::with_not_exist_value(table_route_key.clone(), CompareOp::Equal), - Compare::with_not_exist_value(table_global_key.clone(), CompareOp::Equal), - ]) - .and_then(vec![ - TxnOp::Put(table_route_key, table_route_value.into()), - TxnOp::Put(table_global_key, table_global_value), - ]); + .when(vec![Compare::with_not_exist_value( + table_route_key.clone(), + CompareOp::Equal, + )]) + .and_then(vec![TxnOp::Put(table_route_key, table_route_value.into())]); let resp = self.context.kv_store.txn(txn).await?; @@ -158,38 +165,23 @@ impl CreateTableProcedure { msg: "table_route_key or table_global_key exists" } ); - - Ok(()) - } - - async fn on_create_metadata(&mut self) -> Result { - let kv_store = &self.context.kv_store; - let key = &self.global_table_key(); - - match get_table_global_value(kv_store, key).await? { - Some(table_global_value) => { - // The metasrv crashed after metadata was created immediately. - // Recovers table_route from kv. 
- let table_id = table_global_value.table_id() as u64; - - let expected = self.creator.data.table_route.table.id; - // If there is something like: - // Create table A, Create table A(from another Fe, Somehow, Failed), Renames table A to B, Create table A(Recovered). - // We must ensure the table_id isn't changed. - ensure!( - table_id == expected, - error::TableIdChangedSnafu { - expected, - found: table_id - } - ); - } - None => { - // registers metadata - self.register_metadata().await?; - } - } - + info!("Created TableRouteValue for table {table_id}"); + + // Create TableNameValue at last, because we use it to check whether the table exists at + // the beginning of the procedure. + manager + .table_name_manager() + .create( + &TableNameKey::new( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ), + table_id, + ) + .await + .context(TableMetadataManagerSnafu)?; + info!("Created TableNameValue for table {table_id}"); Ok(Status::Done) } diff --git a/src/meta-srv/src/procedure/drop_table.rs b/src/meta-srv/src/procedure/drop_table.rs index 31ea217fa74b..b2bc7c5d57af 100644 --- a/src/meta-srv/src/procedure/drop_table.rs +++ b/src/meta-srv/src/procedure/drop_table.rs @@ -21,6 +21,8 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_meta::ident::TableIdent; use common_meta::instruction::Instruction; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::TableRouteKey; use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use common_meta::rpc::ddl::DropTableTask; use common_meta::rpc::router::TableRoute; @@ -29,19 +31,18 @@ use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, }; -use common_telemetry::debug; +use common_telemetry::{debug, info}; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use table::engine::TableReference; -use super::utils::{build_table_metadata_key, handle_retry_error}; +use super::utils::handle_retry_error; use crate::ddl::DdlContext; use crate::error; -use crate::error::Result; +use crate::error::{Result, TableMetadataManagerSnafu}; use crate::procedure::utils::{build_table_route_value, handle_request_datanode_error}; use crate::service::mailbox::BroadcastChannel; -use crate::table_routes::fetch_table; pub struct DropTableProcedure { context: DdlContext, data: DropTableData, @@ -70,21 +71,51 @@ impl DropTableProcedure { /// Removes the table metadata. async fn on_remove_metadata(&mut self) -> Result { let table_ref = self.data.table_ref(); + let table_id = self.data.task.table_id; - // If metadata not exists (might have already been removed). - if fetch_table(&self.context.kv_store, table_ref) - .await? 
- .is_none() - { - self.data.state = DropTableState::InvalidateTableCache; + let manager = &self.context.table_metadata_manager; + manager + .table_info_manager() + .remove(table_id) + .await + .context(TableMetadataManagerSnafu)?; + info!("Removed TableInfoValue for table: {table_id}"); - return Ok(Status::executing(true)); + let table_region_value = manager + .table_region_manager() + .remove(table_id) + .await + .context(TableMetadataManagerSnafu)?; + info!("Removed TableRegionValue for table: {table_id}"); + + if let Some(table_region_value) = table_region_value { + for datanode_id in table_region_value.region_distribution.keys() { + manager + .datanode_table_manager() + .remove(*datanode_id, table_id) + .await + .context(TableMetadataManagerSnafu)?; + info!("Removed DatanodeTableValue for table: {table_id} on Datanode {datanode_id}"); + } } - let table_ref = self.data.table_ref(); - let table_id = self.data.task.table_id; - - let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id); + manager + .table_name_manager() + .remove(TableNameKey::new( + table_ref.catalog, + table_ref.schema, + table_ref.table, + )) + .await + .context(TableMetadataManagerSnafu)?; + info!("Removed TableNameValue for table: {table_id}"); + + let table_route_key = TableRouteKey { + table_id, + catalog_name: table_ref.catalog, + schema_name: table_ref.schema, + table_name: table_ref.table, + }; let table_route_value = build_table_route_value(self.data.table_route.clone())?; // To protect the potential resource leak issues. @@ -95,10 +126,9 @@ impl DropTableProcedure { CompareOp::Equal, table_route_value.into(), )]) - .and_then(vec![ - TxnOp::Delete(table_route_key.to_string().into_bytes()), - TxnOp::Delete(table_global_key.to_string().into_bytes()), - ]); + .and_then(vec![TxnOp::Delete( + table_route_key.to_string().into_bytes(), + )]); let resp = self.context.kv_store.txn(txn).await?; ensure!( @@ -107,6 +137,7 @@ impl DropTableProcedure { msg: "table_route_value changed" } ); + info!("Removed TableRouteValue for table: {table_id}"); self.data.state = DropTableState::InvalidateTableCache; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 3c1b84969225..73268a4afe12 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -27,7 +27,6 @@ use std::time::Duration; use async_trait::async_trait; use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManagerRef; -use common_meta::table_name::TableName; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -207,11 +206,7 @@ impl RegionFailoverManager { Ok(self .table_metadata_manager .table_region_manager() - .get_old(&TableName::new( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - )) + .get(table_ident.table_id) .await .context(TableMetadataManagerSnafu)? 
.is_some()) @@ -422,15 +417,10 @@ mod tests { .context .table_metadata_manager .table_region_manager() - .get_old(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - "my_table", - )) + .get(1) .await .unwrap() .unwrap(); - let failed_datanode = value .region_distribution .iter() @@ -493,11 +483,7 @@ mod tests { .await; let table_region_value = table_metadata_manager .table_region_manager() - .get_old(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - table, - )) + .get(1) .await .unwrap() .unwrap(); @@ -532,7 +518,7 @@ mod tests { let selector_ctx = SelectorContext { datanode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), - kv_store, + kv_store: kv_store.clone(), meta_peer_client, catalog: Some(DEFAULT_CATALOG_NAME.to_string()), schema: Some(DEFAULT_SCHEMA_NAME.to_string()), @@ -664,11 +650,7 @@ mod tests { .context .table_metadata_manager .table_region_manager() - .get_old(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - "my_table", - )) + .get(1) .await .unwrap() .unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index aafefec9d2ed..4c71a3baef09 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -17,7 +17,6 @@ use async_trait::async_trait; use common_meta::key::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; -use common_meta::table_name::TableName; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; @@ -27,7 +26,7 @@ use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; use crate::error::{ CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableMetadataManagerSnafu, - TableNotFoundSnafu, TableRouteConversionSnafu, + TableNotFoundSnafu, TableRouteConversionSnafu, UpdateTableMetadataSnafu, }; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; @@ -54,6 +53,8 @@ impl UpdateRegionMetadata { self.update_table_region_value(ctx, failed_region).await?; + self.update_region_placement(ctx, failed_region).await?; + self.update_table_route(ctx, failed_region).await?; ctx.dist_lock.unlock(key).await?; @@ -65,19 +66,14 @@ impl UpdateRegionMetadata { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result<()> { + let table_region_manager = ctx.table_metadata_manager.table_region_manager(); + let table_ident = &failed_region.table_ident; let table_id = table_ident.table_id; - let table_name = TableName::new( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - ); - let value = ctx - .table_metadata_manager - .table_region_manager() - .get_old(&table_name) + let value = table_region_manager + .get(table_id) .await - .context(TableMetadataManagerSnafu)? + .context(TableRouteConversionSnafu)? .with_context(|| TableNotFoundSnafu { name: table_ident.to_string(), })?; @@ -96,11 +92,13 @@ impl UpdateRegionMetadata { .or_insert_with(Vec::new); region_numbers.push(failed_region.region_number); - ctx.table_metadata_manager - .table_region_manager() - .put_old(&table_name, region_distribution.clone()) + table_region_manager + .compare_and_put(table_id, Some(value.clone()), region_distribution.clone()) .await - .context(TableMetadataManagerSnafu)?; + .context(TableMetadataManagerSnafu)? 
+ .map_err(|curr| UpdateTableMetadataSnafu { + err_msg: format!("region distribution is concurrently updating, expected '{value:?}' but actual: '{curr:?}'") + }.build())?; info!( "Region distribution of table (id = {table_id}) is updated to {:?}. \ @@ -110,6 +108,24 @@ impl UpdateRegionMetadata { Ok(()) } + async fn update_region_placement( + &self, + ctx: &RegionFailoverContext, + failed_region: &RegionIdent, + ) -> Result<()> { + ctx.table_metadata_manager + .datanode_table_manager() + .move_region( + failed_region.datanode_id, + self.candidate.id, + failed_region.table_ident.table_id, + failed_region.region_number, + ) + .await + .context(TableMetadataManagerSnafu)?; + Ok(()) + } + async fn update_table_route( &self, ctx: &RegionFailoverContext, @@ -240,7 +256,7 @@ mod tests { } #[tokio::test] - async fn test_update_table_info_value() { + async fn test_update_table_region_value() { common_telemetry::init_default_ut_logging(); async fn test( @@ -256,15 +272,10 @@ mod tests { .await .unwrap(); - let table_ident = failed_region.table_ident; env.context .table_metadata_manager .table_region_manager() - .get_old(&TableName::new( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - )) + .get(failed_region.table_ident.table_id) .await .unwrap() .unwrap() @@ -514,11 +525,10 @@ mod tests { assert_eq!(peers.len(), 2); assert_eq!(actual, expected); - let map = env - .context - .table_metadata_manager + let manager = &env.context.table_metadata_manager; + let map = manager .table_region_manager() - .get_old(&TableName::new(&catalog_name, &schema_name, &table_name)) + .get(table_id) .await .unwrap() .unwrap() @@ -526,6 +536,21 @@ mod tests { assert_eq!(map.len(), 2); assert_eq!(map.get(&2), Some(&vec![3, 1])); assert_eq!(map.get(&3), Some(&vec![4, 2])); + + // test DatanodeTableValues matches the table region distribution + let datanode_table_manager = manager.datanode_table_manager(); + let tables = datanode_table_manager.tables(1).await.unwrap(); + assert!(tables.is_empty()); + + let tables = datanode_table_manager.tables(2).await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!(tables[0].table_id, 1); + assert_eq!(tables[0].regions, vec![3, 1]); + + let tables = datanode_table_manager.tables(3).await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!(tables[0].table_id, 1); + assert_eq!(tables[0].regions, vec![4, 2]); } } } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 16cfa7f17a9b..302b2ac7b6d5 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -13,14 +13,10 @@ // limitations under the License. 
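// Editor's aside: an illustrative sketch, not part of the patch. After a region
// failover the per-datanode placement is reflected in DatanodeTableValue entries;
// `datanode_table_manager().tables(datanode_id)` and the `table_id` / `regions`
// fields are exactly what the failover test above asserts on. `regions_on_datanode`
// is a hypothetical helper (error handling collapsed to `unwrap_or_default`).
use common_meta::key::TableMetadataManagerRef;
use table::metadata::TableId;

async fn regions_on_datanode(
    manager: &TableMetadataManagerRef,
    datanode_id: u64,
    table_id: TableId,
) -> Vec<u32> {
    manager
        .datanode_table_manager()
        .tables(datanode_id)
        .await
        .unwrap_or_default()
        .into_iter()
        .find(|t| t.table_id == table_id)
        .map(|t| t.regions)
        .unwrap_or_default()
}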
use api::v1::meta::TableRouteValue; -use common_meta::helper::TableGlobalKey; -use common_meta::key::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; use common_procedure::error::Error as ProcedureError; use snafu::{location, Location, ResultExt}; -use table::engine::TableReference; -use table::metadata::TableId; use crate::error::{self, Error, Result}; @@ -35,26 +31,6 @@ pub fn build_table_route_value(table_route: TableRoute) -> Result, - table_id: TableId, -) -> (TableGlobalKey, TableRouteKey) { - let table_route_key = TableRouteKey { - table_id, - catalog_name: table_ref.catalog, - schema_name: table_ref.schema, - table_name: table_ref.table, - }; - - let table_global_key = TableGlobalKey { - catalog_name: table_ref.catalog.to_string(), - schema_name: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), - }; - - (table_global_key, table_route_key) -} - pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::error::Error) -> Error { move |err| { if matches!(err, client::error::Error::FlightGet { .. }) { diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 8530b589f616..c42edf1cd365 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -88,7 +88,7 @@ impl HttpHandler for TablesHandler { let tables = self .table_metadata_manager .table_name_manager() - .tables_old(catalog, schema) + .tables(catalog, schema) .await .context(TableMetadataManagerSnafu)?; @@ -120,16 +120,16 @@ impl HttpHandler for TableHandler { let table_id = self .table_metadata_manager .table_name_manager() - .get_old(&key) + .get(key) .await .context(TableMetadataManagerSnafu)? .map(|x| x.table_id()); - if let Some(_table_id) = table_id { + if let Some(table_id) = table_id { let table_info_value = self .table_metadata_manager .table_info_manager() - .get_old(&key.into()) + .get(table_id) .await .context(TableMetadataManagerSnafu)? .map(|x| format!("{x:?}")) @@ -137,11 +137,11 @@ impl HttpHandler for TableHandler { result.insert("table_info_value", table_info_value); } - if let Some(_table_id) = table_id { + if let Some(table_id) = table_id { let table_region_value = self .table_metadata_manager .table_region_manager() - .get_old(&key.into()) + .get(table_id) .await .context(TableMetadataManagerSnafu)? 
.map(|x| format!("{x:?}")) @@ -206,10 +206,9 @@ async fn get_keys_by_prefix(key_prefix: String, kv_store: &KvStoreRef) -> Result mod tests { use std::sync::Arc; - use common_meta::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, - SchemaKey, TableGlobalKey, - }; + use common_meta::helper::{build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey}; + use common_meta::key::table_name::TableNameKey; + use common_meta::key::TableMetaKey; use common_meta::rpc::store::PutRequest; use crate::service::admin::meta::get_keys_by_prefix; @@ -247,19 +246,11 @@ mod tests { .await .is_ok()); - let table1 = TableGlobalKey { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - }; - let table2 = TableGlobalKey { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: "test_table1".to_string(), - }; + let table1 = TableNameKey::new(catalog_name, schema_name, table_name); + let table2 = TableNameKey::new(catalog_name, schema_name, "test_table1"); assert!(in_mem .put(PutRequest { - key: table1.to_string().as_bytes().to_vec(), + key: table1.as_raw_key(), value: "".as_bytes().to_vec(), prev_kv: false, }) @@ -267,7 +258,7 @@ mod tests { .is_ok()); assert!(in_mem .put(PutRequest { - key: table2.to_string().as_bytes().to_vec(), + key: table2.as_raw_key(), value: "".as_bytes().to_vec(), prev_kv: false, }) @@ -281,7 +272,10 @@ mod tests { .await .unwrap(); let table_key = get_keys_by_prefix( - build_table_global_prefix(table1.catalog_name, table1.schema_name), + format!( + "{}/", + TableNameKey::prefix_to_table(table1.catalog, table1.schema) + ), &in_mem, ) .await diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index 5d0ca6b914a9..f3874b6626ec 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -17,7 +17,6 @@ use api::v1::meta::{ Table, TableId, TableRoute, }; use common_grpc_expr::alter_expr_to_request; -use common_meta::helper::TableGlobalKey; use common_meta::key::TableRouteKey; use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DdlTask, DropTableTask}; use common_meta::rpc::router; @@ -30,10 +29,10 @@ use tonic::{Request, Response}; use super::store::kv::KvStoreRef; use super::GrpcResult; use crate::ddl::DdlManagerRef; -use crate::error::{self, Result}; +use crate::error::{self, Result, TableMetadataManagerSnafu}; use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef}; use crate::sequence::SequenceRef; -use crate::table_routes::{get_table_global_value, get_table_route_value}; +use crate::table_routes::get_table_route_value; #[async_trait::async_trait] impl ddl_task_server::DdlTask for MetaSrv { @@ -84,7 +83,6 @@ impl ddl_task_server::DdlTask for MetaSrv { handle_alter_table_task( header.cluster_id, alter_table_task, - self.kv_store().clone(), self.ddl_manager().clone(), ) .await? 
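// Editor's aside: a sketch of the two-step lookup this patch standardizes on, not
// part of the patch itself: resolve the table name to an id via the name manager,
// then fetch the id-keyed TableInfoValue. The manager calls are the same ones used
// by the admin TableHandler and by handle_alter_table_task above; error handling
// is collapsed into Option for brevity, and `table_info_by_name` is hypothetical.
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;

async fn table_info_by_name(
    manager: &TableMetadataManagerRef,
    catalog: &str,
    schema: &str,
    table: &str,
) -> Option<TableInfoValue> {
    // Step 1: name -> table id.
    let key = TableNameKey::new(catalog, schema, table);
    let table_id = manager.table_name_manager().get(key).await.ok()??.table_id();
    // Step 2: table id -> full table info value.
    manager.table_info_manager().get(table_id).await.ok()?
}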
@@ -248,7 +246,6 @@ async fn handle_drop_table_task( async fn handle_alter_table_task( cluster_id: u64, mut alter_table_task: AlterTableTask, - kv_store: KvStoreRef, ddl_manager: DdlManagerRef, ) -> Result { let table_id = alter_table_task @@ -266,19 +263,16 @@ async fn handle_alter_table_task( let table_ref = alter_table_task.table_ref(); - let table_global_key = TableGlobalKey { - catalog_name: table_ref.catalog.to_string(), - schema_name: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), - }; - - let table_global_value = get_table_global_value(&kv_store, &table_global_key) - .await? + let table_info_value = ddl_manager + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(TableMetadataManagerSnafu)? .with_context(|| error::TableNotFoundSnafu { name: table_ref.to_string(), })?; - - let table_info = table_global_value.table_info; + let table_info = table_info_value.table_info; // Sets alter_table's table_version alter_table_task.alter_table.table_version = table_info.ident.version; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 0f57a762e38f..6799bae848cc 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -12,18 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use api::v1::meta::{ router_server, Peer, PeerDict, ResponseHeader, RouteRequest, RouteResponse, TableRoute, TableRouteValue, }; -use common_meta::helper::TableGlobalValue; use common_meta::key::table_info::TableInfoValue; -use common_meta::table_name::TableName; +use common_meta::key::table_region::RegionDistribution; use common_telemetry::timer; use snafu::{OptionExt, ResultExt}; -use table::metadata::RawTableInfo; use tonic::{Request, Response}; use crate::error; @@ -54,10 +50,9 @@ impl router_server::Router for MetaSrv { } } -pub(crate) fn create_table_global_value( +pub(crate) fn create_region_distribution( table_route_value: &TableRouteValue, - table_info: RawTableInfo, -) -> Result { +) -> Result { let peers = &table_route_value.peers; let region_routes = &table_route_value .table_route @@ -67,9 +62,7 @@ pub(crate) fn create_table_global_value( })? 
.region_routes; - let node_id = peers[region_routes[0].leader_peer_index as usize].id; - - let mut regions_id_map = HashMap::with_capacity(region_routes.len()); + let mut regions_id_map = RegionDistribution::new(); for route in region_routes.iter() { let node_id = peers[route.leader_peer_index as usize].id; let region_id = route @@ -84,27 +77,15 @@ pub(crate) fn create_table_global_value( .or_insert_with(Vec::new) .push(region_id); } - - Ok(TableGlobalValue { - node_id, - regions_id_map, - table_info, - }) + Ok(regions_id_map) } async fn handle_route(req: RouteRequest, ctx: Context) -> Result { - let RouteRequest { - header, - table_names, - table_ids: _, - } = req; + let RouteRequest { header, table_ids } = req; let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - let table_names = table_names - .into_iter() - .map(Into::into) - .collect::>(); - let tables = fetch_tables(&ctx, table_names).await?; + let table_ids = table_ids.iter().map(|x| x.id).collect::>(); + let tables = fetch_tables(&ctx, table_ids).await?; let (peers, table_routes) = fill_table_routes(tables)?; diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 8730d67af45f..123b112ceff9 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -13,30 +13,18 @@ // limitations under the License. use api::v1::meta::TableRouteValue; -use common_meta::helper::{TableGlobalKey, TableGlobalValue}; use common_meta::key::table_info::TableInfoValue; -use common_meta::key::TableRouteKey; +use common_meta::key::{TableMetadataManagerRef, TableRouteKey}; use common_meta::rpc::store::PutRequest; -use common_meta::table_name::TableName; use snafu::{OptionExt, ResultExt}; -use table::engine::TableReference; +use table::metadata::TableId; use crate::error::{ - DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableMetadataManagerSnafu, - TableRouteNotFoundSnafu, + DecodeTableRouteSnafu, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu, }; use crate::metasrv::Context; use crate::service::store::kv::KvStoreRef; -pub async fn get_table_global_value( - kv_store: &KvStoreRef, - key: &TableGlobalKey, -) -> Result> { - let kv = kv_store.get(&key.to_raw_key()).await?; - kv.map(|kv| TableGlobalValue::from_bytes(kv.value).context(InvalidCatalogValueSnafu)) - .transpose() -} - pub(crate) async fn get_table_route_value( kv_store: &KvStoreRef, key: &TableRouteKey<'_>, @@ -64,65 +52,45 @@ pub(crate) async fn put_table_route_value( Ok(()) } -pub(crate) fn table_route_key(table_id: u32, t: &TableGlobalKey) -> TableRouteKey<'_> { - TableRouteKey { - table_id, - catalog_name: &t.catalog_name, - schema_name: &t.schema_name, - table_name: &t.table_name, - } -} - pub(crate) async fn fetch_table( kv_store: &KvStoreRef, - table_ref: TableReference<'_>, -) -> Result> { - let tgk = TableGlobalKey { - catalog_name: table_ref.catalog.to_string(), - schema_name: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), + table_metadata_manager: &TableMetadataManagerRef, + table_id: TableId, +) -> Result> { + let Some(table_info_value) = table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(TableMetadataManagerSnafu)? 
else { + return Ok(None); }; - let tgv = get_table_global_value(kv_store, &tgk).await?; - - if let Some(tgv) = tgv { - let trk = table_route_key(tgv.table_id(), &tgk); - let trv = get_table_route_value(kv_store, &trk).await?; - - return Ok(Some((tgv, trv))); - } + let table_info = &table_info_value.table_info; + let trk = TableRouteKey { + table_id, + catalog_name: &table_info.catalog_name, + schema_name: &table_info.schema_name, + table_name: &table_info.name, + }; + let table_route_value = get_table_route_value(kv_store, &trk).await?; - Ok(None) + Ok(Some((table_info_value, table_route_value))) } pub(crate) async fn fetch_tables( ctx: &Context, - table_names: Vec, + table_ids: Vec, ) -> Result> { let kv_store = &ctx.kv_store; + let table_metadata_manager = &ctx.table_metadata_manager; let mut tables = vec![]; // Maybe we can optimize the for loop in the future, but in general, // there won't be many keys, in fact, there is usually just one. - for table_name in table_names { - let Some(tgv) = ctx.table_metadata_manager - .table_info_manager() - .get_old(&table_name) - .await - .context(TableMetadataManagerSnafu)? else { - continue; - }; - let table_info = &tgv.table_info; - - let trk = TableRouteKey { - table_id: table_info.ident.table_id, - catalog_name: &table_info.catalog_name, - schema_name: &table_info.schema_name, - table_name: &table_info.name, - }; - let trv = get_table_route_value(kv_store, &trk).await?; - - tables.push((tgv, trv)); + for table_id in table_ids { + if let Some(x) = fetch_table(kv_store, table_metadata_manager, table_id).await? { + tables.push(x); + } } Ok(tables) @@ -138,6 +106,7 @@ pub(crate) mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::key::table_region::RegionDistribution; use common_meta::key::TableMetadataManagerRef; + use common_meta::table_name::TableName; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -176,7 +145,7 @@ pub(crate) mod tests { }; table_metadata_manager .table_info_manager() - .put_old(table_info) + .create(1, &table_info) .await .unwrap(); @@ -185,14 +154,21 @@ pub(crate) mod tests { // 1 => 1, 2 // 2 => 3 // 3 => 4 + let region_distribution = + RegionDistribution::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]); table_metadata_manager .table_region_manager() - .put_old( - &TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table), - RegionDistribution::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]), - ) + .create(1, ®ion_distribution) .await .unwrap(); + + for (datanode_id, regions) in region_distribution { + table_metadata_manager + .datanode_table_manager() + .create(datanode_id, 1, regions) + .await + .unwrap(); + } } pub(crate) async fn prepare_table_route_value<'a>( diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index d43def5e7ed5..fd3710f178d0 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -20,6 +20,7 @@ use common_query::prelude::Expr; use datafusion_common::ScalarValue; use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; +use table::metadata::TableId; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -33,26 +34,26 @@ pub enum Error { source: meta_client::error::Error, }, - #[snafu(display("Failed to find Datanode, table: {} region: {:?}", table, region))] + #[snafu(display("Failed to find Datanode, table id: {}, region: {}", table_id, region))] 
FindDatanode { - table: String, + table_id: TableId, region: RegionNumber, location: Location, }, - #[snafu(display("Failed to find table routes for table {}", table_name))] + #[snafu(display("Failed to find table routes for table id {}", table_id))] FindTableRoutes { - table_name: String, + table_id: TableId, location: Location, }, #[snafu(display( "Failed to find region routes for table {}, region id: {}", - table_name, + table_id, region_id ))] FindRegionRoutes { - table_name: String, + table_id: TableId, region_id: u64, location: Location, }, @@ -111,13 +112,9 @@ pub enum Error { #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] InvalidDeleteRequest { reason: String, location: Location }, - #[snafu(display( - "Invalid table route data in meta, table name: {}, msg: {}", - table_name, - err_msg - ))] + #[snafu(display("Invalid table route data, table id: {}, msg: {}", table_id, err_msg))] InvalidTableRouteData { - table_name: String, + table_id: TableId, err_msg: String, location: Location, }, @@ -133,9 +130,9 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display("Failed to find leader of table {} region {}", table_name, region_id))] + #[snafu(display("Failed to find leader of table id {} region {}", table_id, region_id))] FindLeader { - table_name: String, + table_id: TableId, region_id: RegionId, location: Location, }, diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index ae898c4e6fbf..699d643527b0 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -17,13 +17,13 @@ use std::sync::Arc; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; -use common_meta::table_name::TableName; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; use datatypes::schema::Schema; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; +use table::metadata::TableId; use table::requests::InsertRequest; use crate::columns::RangeColumnsPartitionRule; @@ -36,7 +36,7 @@ use crate::{error, PartitionRuleRef}; #[async_trait::async_trait] pub trait TableRouteCacheInvalidator: Send + Sync { - async fn invalidate_table_route(&self, table: &TableName); + async fn invalidate_table_route(&self, table: TableId); } pub type TableRouteCacheInvalidatorRef = Arc; @@ -59,7 +59,7 @@ pub struct PartitionInfo { #[async_trait::async_trait] impl TableRouteCacheInvalidator for PartitionRuleManager { - async fn invalidate_table_route(&self, table: &TableName) { + async fn invalidate_table_route(&self, table: TableId) { self.table_routes.invalidate_table_route(table).await } } @@ -74,23 +74,23 @@ impl PartitionRuleManager { } /// Find table route of given table name. - pub async fn find_table_route(&self, table: &TableName) -> Result> { + pub async fn find_table_route(&self, table: TableId) -> Result> { self.table_routes.get_route(table).await } /// Find datanodes of corresponding regions of given table. 
pub async fn find_region_datanodes( &self, - table: &TableName, + table_id: TableId, regions: Vec, ) -> Result>> { - let route = self.table_routes.get_route(table).await?; + let route = self.table_routes.get_route(table_id).await?; let mut datanodes = HashMap::with_capacity(regions.len()); for region in regions.iter() { let datanode = route .find_region_leader(*region) .context(error::FindDatanodeSnafu { - table: table.to_string(), + table_id, region: *region, })?; datanodes @@ -102,38 +102,36 @@ impl PartitionRuleManager { } /// Find all leader peers of given table. - pub async fn find_table_region_leaders(&self, table: &TableName) -> Result> { - let route = self.table_routes.get_route(table).await?; + pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result> { + let route = self.table_routes.get_route(table_id).await?; let mut peers = Vec::with_capacity(route.region_routes.len()); for peer in &route.region_routes { peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu { region_id: peer.region.id, - table_name: table.to_string(), + table_id, })?); } Ok(peers) } - pub async fn find_table_partitions(&self, table: &TableName) -> Result> { - let route = self.table_routes.get_route(table).await?; + pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { + let route = self.table_routes.get_route(table_id).await?; ensure!( !route.region_routes.is_empty(), - error::FindTableRoutesSnafu { - table_name: table.to_string() - } + error::FindTableRoutesSnafu { table_id } ); let mut partitions = Vec::with_capacity(route.region_routes.len()); for r in route.region_routes.iter() { - let partition = - r.region - .partition - .clone() - .with_context(|| error::FindRegionRoutesSnafu { - region_id: r.region.id, - table_name: table.to_string(), - })?; + let partition = r + .region + .partition + .clone() + .context(error::FindRegionRoutesSnafu { + region_id: r.region.id, + table_id, + })?; let partition_def = PartitionDef::try_from(partition)?; partitions.push(PartitionInfo { @@ -152,7 +150,7 @@ impl PartitionRuleManager { .windows(2) .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()), error::InvalidTableRouteDataSnafu { - table_name: table.to_string(), + table_id, err_msg: "partition columns of all regions are not the same" } ); @@ -161,14 +159,14 @@ impl PartitionRuleManager { } /// Get partition rule of given table. - pub async fn find_table_partition_rule(&self, table: &TableName) -> Result { - let partitions = self.find_table_partitions(table).await?; + pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result { + let partitions = self.find_table_partitions(table_id).await?; let partition_columns = partitions[0].partition.partition_columns(); ensure!( !partition_columns.is_empty(), error::InvalidTableRouteDataSnafu { - table_name: table.to_string(), + table_id, err_msg: "no partition columns found" } ); @@ -247,7 +245,7 @@ impl PartitionRuleManager { /// of given table. 
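
Note: the TableRoutes rework in src/partition/src/route.rs further below re-keys the moka cache by TableId and loads misses through try_get_with_by_ref. A minimal sketch of that lookup-or-load pattern under the same moka future-cache API; the route type and the metasrv loader are stubs here, not the real common_meta types.

use std::sync::Arc;
use std::time::Duration;

use moka::future::{Cache, CacheBuilder};

type TableId = u32;
type TableRoute = String; // stand-in for the real route type

struct TableRoutes {
    cache: Cache<TableId, Arc<TableRoute>>,
}

impl TableRoutes {
    fn new() -> Self {
        Self {
            cache: CacheBuilder::new(1024)
                .time_to_live(Duration::from_secs(10 * 60))
                .build(),
        }
    }

    // Returns the cached route, or awaits the loader future and caches its result.
    async fn get_route(&self, table_id: TableId) -> Result<Arc<TableRoute>, Arc<String>> {
        self.cache
            .try_get_with_by_ref(&table_id, self.load_from_meta(table_id))
            .await
    }

    // Stand-in for the metasrv `route` RPC, now keyed by table id.
    async fn load_from_meta(&self, table_id: TableId) -> Result<Arc<TableRoute>, String> {
        Ok(Arc::new(format!("route-of-{table_id}")))
    }

    async fn invalidate(&self, table_id: TableId) {
        self.cache.invalidate(&table_id).await;
    }
}

#[tokio::main]
async fn main() {
    let routes = TableRoutes::new();
    let route = routes.get_route(42).await.unwrap();
    assert_eq!(route.as_str(), "route-of-42");
    routes.invalidate(42).await;
}
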
pub async fn split_insert_request( &self, - table: &TableName, + table: TableId, req: InsertRequest, schema: &Schema, ) -> Result { diff --git a/src/partition/src/route.rs b/src/partition/src/route.rs index 13343069b15b..151b15ef10c0 100644 --- a/src/partition/src/route.rs +++ b/src/partition/src/route.rs @@ -16,20 +16,19 @@ use std::sync::Arc; use std::time::Duration; use common_meta::rpc::router::{RouteRequest, TableRoute}; -use common_meta::table_name::TableName; use common_telemetry::timer; use meta_client::client::MetaClient; use moka::future::{Cache, CacheBuilder}; use snafu::{ensure, ResultExt}; +use table::metadata::TableId; use crate::error::{self, Result}; use crate::metrics; -type TableRouteCache = Cache>; +type TableRouteCache = Cache>; pub struct TableRoutes { meta_client: Arc, - // TODO(LFC): Use table id as cache key, then remove all the manually invoked cache invalidations. cache: TableRouteCache, } @@ -45,11 +44,11 @@ impl TableRoutes { } } - pub async fn get_route(&self, table_name: &TableName) -> Result> { + pub async fn get_route(&self, table_id: TableId) -> Result> { let _timer = timer!(metrics::METRIC_TABLE_ROUTE_GET); self.cache - .try_get_with_by_ref(table_name, self.get_from_meta(table_name)) + .try_get_with_by_ref(&table_id, self.get_from_meta(table_id)) .await .map_err(|e| { error::GetCacheSnafu { @@ -59,33 +58,30 @@ impl TableRoutes { }) } - async fn get_from_meta(&self, table_name: &TableName) -> Result> { + async fn get_from_meta(&self, table_id: TableId) -> Result> { let _timer = timer!(metrics::METRIC_TABLE_ROUTE_GET_REMOTE); let mut resp = self .meta_client .route(RouteRequest { - table_names: vec![table_name.clone()], - table_ids: vec![], + table_ids: vec![table_id], }) .await .context(error::RequestMetaSnafu)?; ensure!( !resp.table_routes.is_empty(), - error::FindTableRoutesSnafu { - table_name: table_name.to_string() - } + error::FindTableRoutesSnafu { table_id } ); let route = resp.table_routes.swap_remove(0); Ok(Arc::new(route)) } - pub async fn insert_table_route(&self, table_name: TableName, table_route: Arc) { - self.cache.insert(table_name, table_route).await + pub async fn insert_table_route(&self, table_id: TableId, table_route: Arc) { + self.cache.insert(table_id, table_route).await } - pub async fn invalidate_table_route(&self, table_name: &TableName) { - self.cache.invalidate(table_name).await + pub async fn invalidate_table_route(&self, table_id: TableId) { + self.cache.invalidate(&table_id).await } pub fn cache(&self) -> &TableRouteCache { diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 3003d6f4360e..020f8a326f62 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -17,9 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; +use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; use common_base::bytes::Bytes; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_meta::peer::Peer; use common_meta::table_name::TableName; use datafusion::common::Result; @@ -31,27 +32,31 @@ use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, Visit use datafusion_common::{DataFusionError, TableReference}; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; use partition::manager::PartitionRuleManager; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use 
substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; use crate::error; +use crate::error::{CatalogSnafu, TableNotFoundSnafu}; pub struct DistExtensionPlanner { partition_manager: Arc, clients: Arc, + catalog_manager: CatalogManagerRef, } impl DistExtensionPlanner { pub fn new( partition_manager: Arc, clients: Arc, + catalog_manager: CatalogManagerRef, ) -> Self { Self { partition_manager, clients, + catalog_manager, } } } @@ -84,6 +89,14 @@ impl ExtensionPlanner for DistExtensionPlanner { .await .map(Some); }; + + if table_name.schema_name == INFORMATION_SCHEMA_NAME { + return planner + .create_physical_plan(input_plan, session_state) + .await + .map(Some); + } + let input_schema = input_plan.schema().clone(); let input_plan = self.set_table_name(&table_name, input_plan.clone())?; let substrait_plan: Bytes = DFLogicalSubstraitConvertor @@ -130,8 +143,22 @@ impl DistExtensionPlanner { } async fn get_peers(&self, table_name: &TableName) -> Result> { + let table = self + .catalog_manager + .table( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table: table_name.to_string(), + })?; + let table_id = table.table_info().table_id(); + self.partition_manager - .find_table_region_leaders(table_name) + .find_table_region_leaders(table_id) .await .with_context(|_| error::RoutePartitionSnafu { table: table_name.clone(), diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index ca123fe1ccb0..deff2533fc41 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -113,6 +113,7 @@ impl QueryEngineState { .with_query_planner(Arc::new(DfQueryPlanner::new( partition_manager, datanode_clients, + catalog_list.clone(), ))) .with_optimizer_rules(optimizer.rules) .with_physical_optimizer_rules(physical_optimizers); @@ -221,12 +222,13 @@ impl DfQueryPlanner { fn new( partition_manager: Option>, datanode_clients: Option>, + catalog_manager: CatalogManagerRef, ) -> Self { let mut planners: Vec> = vec![Arc::new(PromExtensionPlanner)]; if let Some(partition_manager) = partition_manager && let Some(datanode_clients) = datanode_clients { - planners.push(Arc::new(DistExtensionPlanner::new(partition_manager, datanode_clients))); + planners.push(Arc::new(DistExtensionPlanner::new(partition_manager, datanode_clients, catalog_manager))); } Self { physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners), diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index d001b5e30841..5b46532bd97e 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -25,8 +25,7 @@ mod test { CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, }; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; - use common_meta::table_name::TableName; + use common_catalog::consts::MITO_ENGINE; use common_query::Output; use common_recordbatch::RecordBatches; use frontend::instance::Instance; @@ -338,11 +337,7 @@ CREATE TABLE {table_name} ( let table_region_value = instance .table_metadata_manager() .table_region_manager() - .get_old(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - table_name, - )) + .get(table_id) 
.await .unwrap() .unwrap(); @@ -606,17 +601,23 @@ CREATE TABLE {table_name} ( table_name: &str, expected_distribution: HashMap, ) { + let table = instance + .frontend() + .catalog_manager() + .table("greptime", "public", table_name) + .await + .unwrap() + .unwrap(); + let table = table.as_any().downcast_ref::().unwrap(); + let table_id = table.table_info().ident.table_id; let table_region_value = instance .table_metadata_manager() .table_region_manager() - .get_old(&TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - table_name, - )) + .get(table_id) .await .unwrap() .unwrap(); + let region_to_dn_map = table_region_value .region_distribution .iter() diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index de9ffd8815fb..c73dfdc85ab9 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -21,7 +21,7 @@ mod tests { use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_meta::table_name::TableName; + use common_meta::key::table_name::TableNameKey; use common_query::Output; use common_recordbatch::RecordBatches; use frontend::error::{self, Error, Result}; @@ -177,16 +177,24 @@ mod tests { instance: &MockDistributedInstance, expected_distribution: HashMap, ) { - let table_region_value = instance - .table_metadata_manager() - .table_region_manager() - .get_old(&TableName::new( + let manager = instance.table_metadata_manager(); + let table_id = manager + .table_name_manager() + .get(TableNameKey::new( DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo", )) .await .unwrap() + .unwrap() + .table_id(); + + let table_region_value = manager + .table_region_manager() + .get(table_id) + .await + .unwrap() .unwrap(); let region_to_dn_map = table_region_value .region_distribution diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 917f81ec2b8e..f461c20d5a9c 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -12,18 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. 
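
Note: the reworked find_region_distribution in region_failover.rs below also cross-checks every datanode's DatanodeTableValues against the table's region distribution. A self-contained sketch of that check with simplified stand-in types (the real RegionDistribution and DatanodeTableValue live in common_meta):

use std::collections::BTreeMap;

type TableId = u32;
type DatanodeId = u64;
type RegionNumber = u32;
type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;

struct DatanodeTableValue {
    table_id: TableId,
    regions: Vec<RegionNumber>,
}

fn check_datanode(
    datanode_tables: &[DatanodeTableValue],
    table_id: TableId,
    datanode_id: DatanodeId,
    distribution: &RegionDistribution,
) {
    // Collect this datanode's regions for the table under test, in sorted order.
    let mut actual = datanode_tables
        .iter()
        .filter(|x| x.table_id == table_id)
        .flat_map(|x| x.regions.iter().copied())
        .collect::<Vec<_>>();
    actual.sort();

    // They must match the table's recorded distribution for this datanode,
    // or be empty if the datanode holds no region of the table.
    if let Some(mut expected) = distribution.get(&datanode_id).cloned() {
        expected.sort();
        assert_eq!(expected, actual);
    } else {
        assert!(actual.is_empty());
    }
}

fn main() {
    let distribution = RegionDistribution::from([(1, vec![1, 2]), (2, vec![3])]);
    let datanode_1_tables = vec![DatanodeTableValue { table_id: 42, regions: vec![2, 1] }];
    check_datanode(&datanode_1_tables, 42, 1, &distribution);
}
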
-use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; use catalog::remote::CachedMetaKvBackend; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; -use common_meta::helper::TableGlobalKey; use common_meta::ident::TableIdent; +use common_meta::key::table_name::{TableNameKey, TableNameValue}; +use common_meta::key::table_region::RegionDistribution; +use common_meta::key::TableMetaKey; use common_meta::rpc::router::TableRoute; use common_meta::rpc::KeyValue; -use common_meta::table_name::TableName; use common_meta::RegionIdent; use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; @@ -35,9 +35,9 @@ use meta_srv::error::Result as MetaResult; use meta_srv::metasrv::{SelectorContext, SelectorRef}; use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure}; use meta_srv::selector::{Namespace, Selector}; -use meta_srv::table_routes; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; +use table::metadata::TableId; use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; use tests_integration::test_util::{check_output_stream, get_test_store_config, StorageType}; use tokio::time; @@ -104,25 +104,14 @@ pub async fn test_region_failover(store_type: StorageType) { assert!(matches!(result.unwrap(), Output::AffectedRows(1))); } - let cache_key = TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "my_table".to_string(), - } - .to_string(); - - let table_name = TableName { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "my_table".to_string(), - }; + let cache_key = TableNameKey::new("greptime", "public", "my_table").as_raw_key(); let cache = get_table_cache(&frontend, &cache_key).unwrap(); - let _ = cache.unwrap(); - let route_cache = get_route_cache(&frontend, &table_name); - let _ = route_cache.unwrap(); + let table_name_value = TableNameValue::try_from_raw_value(cache.unwrap().value).unwrap(); + let table_id = table_name_value.table_id(); + assert!(get_route_cache(&frontend, table_id).is_some()); - let distribution = find_region_distribution(&cluster).await; + let distribution = find_region_distribution(&cluster, table_id).await; info!("Find region distribution: {distribution:?}"); let mut foreign = 0; @@ -145,15 +134,13 @@ pub async fn test_region_failover(store_type: StorageType) { run_region_failover_procedure(&cluster, failed_region.clone(), selector).await; - let distribution = find_region_distribution(&cluster).await; + let distribution = find_region_distribution(&cluster, table_id).await; info!("Find region distribution again: {distribution:?}"); // Waits for invalidating table cache time::sleep(Duration::from_millis(100)).await; - let cache = get_table_cache(&frontend, &cache_key); - assert!(cache.unwrap().is_none()); - let route_cache = get_route_cache(&frontend, &table_name); + let route_cache = get_route_cache(&frontend, table_id); assert!(route_cache.is_none()); // Inserts data to each datanode after failover @@ -175,7 +162,7 @@ pub async fn test_region_failover(store_type: StorageType) { assert!(success) } -fn get_table_cache(instance: &Arc, key: &str) -> Option> { +fn get_table_cache(instance: &Arc, key: &[u8]) -> Option> { let catalog_manager = instance .catalog_manager() .as_any() @@ -190,10 +177,10 @@ fn get_table_cache(instance: &Arc, key: &str) -> Option, table_name: 
&TableName) -> Option> { +fn get_route_cache(instance: &Arc, table_id: TableId) -> Option> { let catalog_manager = instance .catalog_manager() .as_any() @@ -201,7 +188,7 @@ fn get_route_cache(instance: &Arc, table_name: &TableName) -> Option, ts: u64) -> Vec> { @@ -268,20 +255,50 @@ CREATE TABLE my_table ( result.get(0).unwrap().as_ref().unwrap(); } -async fn find_region_distribution(cluster: &GreptimeDbCluster) -> HashMap> { - let key = TableGlobalKey { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "my_table".to_string(), - }; - let value = table_routes::get_table_global_value(&cluster.kv_store, &key) +async fn find_region_distribution( + cluster: &GreptimeDbCluster, + table_id: TableId, +) -> RegionDistribution { + let manager = cluster.meta_srv.table_metadata_manager(); + let region_distribution = manager + .table_region_manager() + .get(table_id) .await .unwrap() - .unwrap(); - value.regions_id_map + .unwrap() + .region_distribution; + + // test DatanodeTableValues match the table region distribution + for datanode_id in cluster.datanode_instances.keys() { + let mut actual = manager + .datanode_table_manager() + .tables(*datanode_id) + .await + .unwrap() + .into_iter() + .filter_map(|x| { + if x.table_id == table_id { + Some(x.regions) + } else { + None + } + }) + .flatten() + .collect::>(); + actual.sort(); + + if let Some(mut expected) = region_distribution.get(datanode_id).cloned() { + expected.sort(); + assert_eq!(expected, actual); + } else { + assert!(actual.is_empty()); + } + } + + region_distribution } -fn choose_failed_region(distribution: HashMap>) -> RegionIdent { +fn choose_failed_region(distribution: RegionDistribution) -> RegionIdent { let (failed_datanode, failed_region) = distribution .iter() .filter_map(|(datanode_id, regions)| {