From 06c2d285f0215832ccca9095b7f9a45a83a724e8 Mon Sep 17 00:00:00 2001 From: mag1c1an1 Date: Mon, 29 Jan 2024 11:35:02 +0800 Subject: [PATCH] [Rust] fix catalog unittest Signed-off-by: mag1c1an1 --- .github/workflows/maven-test.yml | 7 +-- .github/workflows/rust-ci.yml | 1 - .../src/catalog/lakesoul_catalog.rs | 16 ++---- .../src/catalog/lakesoul_namespace.rs | 39 ++++++++++---- .../datasource/file_format/metadata_format.rs | 12 +++-- .../src/lakesoul_table/mod.rs | 4 +- .../src/test/catalog_tests.rs | 51 +++++++++++-------- rust/lakesoul-io/src/datasource/listing.rs | 4 +- rust/lakesoul-metadata/src/metadata_client.rs | 13 ++++- 9 files changed, 87 insertions(+), 60 deletions(-) diff --git a/.github/workflows/maven-test.yml b/.github/workflows/maven-test.yml index 269ca83ef..ceb1787d8 100644 --- a/.github/workflows/maven-test.yml +++ b/.github/workflows/maven-test.yml @@ -96,7 +96,6 @@ jobs: - name: Init PG run: | ./script/meta_init_for_local_test.sh -j 2 - PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';" - name: Install Protoc uses: arduino/setup-protoc@v2 with: @@ -165,7 +164,6 @@ jobs: - name: Init PG run: | ./script/meta_init_for_local_test.sh -j 2 - PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';" - name: Install Protoc uses: arduino/setup-protoc@v2 with: @@ -234,7 +232,6 @@ jobs: - name: Init PG run: | ./script/meta_init_for_local_test.sh -j 1 - PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';" - name: Init PG RBAC run: | ./script/meta_rbac_init_for_local_test.sh -j 1 @@ -315,7 +312,6 @@ jobs: - name: Init PG run: | PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql lakesoul_test - PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';" - name: Install Protoc uses: arduino/setup-protoc@v2 with: @@ -384,7 +380,6 @@ jobs: - name: Init PG run: | PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql lakesoul_test - PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';" - name: Init PG RBAC ROW POLICY run: | PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_rbac_init.sql lakesoul_test @@ -432,4 +427,4 @@ jobs: name: maven-test-report-artifact-flink-1 path: lakesoul-flink/target/site retention-days: 5 - if-no-files-found: error \ No newline at end of file + if-no-files-found: error diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index ac9b33e33..9dda4e230 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -54,7 +54,6 @@ jobs: - name: Init PG run: | ./script/meta_init_for_local_test.sh -j 2 - PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';" - name: Install Protoc uses: arduino/setup-protoc@v2 with: diff --git a/rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs b/rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs index 255745787..167b096b0 100644 --- 
a/rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs +++ b/rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs @@ -5,7 +5,7 @@ use crate::catalog::LakeSoulNamespace; use datafusion::catalog::schema::SchemaProvider; use datafusion::catalog::CatalogProvider; -use datafusion::error::DataFusionError; +use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; use lakesoul_metadata::MetaDataClientRef; use proto::proto::entity::Namespace; @@ -83,11 +83,7 @@ impl CatalogProvider for LakeSoulCatalog { /// /// If a schema of the same name existed before, it is replaced in /// the catalog and returned. - fn register_schema( - &self, - name: &str, - _schema: Arc, - ) -> lakesoul_io::lakesoul_io_config::Result>> { + fn register_schema(&self, name: &str, _schema: Arc) -> Result>> { let _guard = self.catalog_lock.write(); let client = self.metadata_client.clone(); let schema: Option> = { @@ -124,11 +120,7 @@ impl CatalogProvider for LakeSoulCatalog { /// /// Implementations of this method should return None if schema with `name` /// does not exist. - fn deregister_schema( - &self, - _name: &str, - _cascade: bool, - ) -> lakesoul_io::lakesoul_io_config::Result>> { + fn deregister_schema(&self, _name: &str, _cascade: bool) -> Result>> { // Not supported // let _guard = self.catalog_lock.write(); // let client = self.metadata_client.clone(); @@ -153,4 +145,4 @@ impl CatalogProvider for LakeSoulCatalog { // return Ok(None); Err(DataFusionError::NotImplemented("Not supported".into())) } -} \ No newline at end of file +} diff --git a/rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs b/rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs index cd1659f75..85ad280c6 100644 --- a/rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs +++ b/rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs @@ -3,12 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::catalog::create_io_config_builder; -use crate::error::Result; use async_trait::async_trait; use datafusion::catalog::schema::SchemaProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::TableProvider; use datafusion::error::DataFusionError; +use datafusion::error::Result; use datafusion::prelude::SessionContext; use lakesoul_io::datasource::file_format::LakeSoulParquetFormat; use lakesoul_io::datasource::listing::LakeSoulListingTable; @@ -83,10 +83,13 @@ impl SchemaProvider for LakeSoulNamespace { Handle::current() .spawn(async move { let _guard = lock.read().await; - client.get_all_table_name_id_by_namespace(&np).await.unwrap() + client + .get_all_table_name_id_by_namespace(&np) + .await + .expect("get all table name failed") }) .await - .unwrap() + .expect("spawn failed") }) .into_iter() .map(|v| v.table_name) @@ -102,6 +105,7 @@ impl SchemaProvider for LakeSoulNamespace { .get_table_info_by_table_name(name, &self.namespace) .await { + debug!("call table() on table: {}.{}", &self.namespace, name); let config; if let Ok(config_builder) = create_io_config_builder(self.metadata_client.clone(), Some(name), true, self.namespace()).await @@ -138,18 +142,14 @@ impl SchemaProvider for LakeSoulNamespace { /// If supported by the implementation, adds a new table to this schema. /// If a table of the same name existed before, it returns "Table already exists" error. 
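Aside on the pattern used in the `table_names()` hunk above: DataFusion's `SchemaProvider` methods are synchronous, so the patch bridges to the async `MetaDataClient` by spawning the metadata query onto the current Tokio runtime and blocking the calling thread only on the join handle. Below is a minimal, self-contained sketch of that bridging pattern, not code from the patch: `list_tables` is a hypothetical stand-in for `get_all_table_name_id_by_namespace`, and the `tokio` (multi-threaded runtime) and `futures` dependencies are assumptions.

use tokio::runtime::Handle;

// Hypothetical stand-in for the async metadata call made in the patch
// (get_all_table_name_id_by_namespace); it only illustrates the shape.
async fn list_tables(namespace: &str) -> Vec<String> {
    vec![format!("{namespace}.example_table")]
}

// Synchronous entry point, mirroring SchemaProvider::table_names():
// spawn the future onto the runtime, then block this thread on the JoinHandle.
fn table_names_sync(namespace: String) -> Vec<String> {
    futures::executor::block_on(async move {
        Handle::current()
            .spawn(async move { list_tables(&namespace).await })
            .await
            .expect("spawned metadata task panicked")
    })
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Call the blocking bridge from a blocking-friendly thread so the
    // async workers are not stalled while we wait on the result.
    let names = tokio::task::spawn_blocking(|| table_names_sync("default".into()))
        .await
        .unwrap();
    println!("{names:?}");
}

One design note: `futures::executor::block_on` parks the calling thread, so a multi-threaded runtime (as in this sketch) is what keeps the spawned metadata query able to make progress while the caller waits.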
#[allow(unused_variables)] - fn register_table( - &self, - name: String, - table: Arc, - ) -> lakesoul_io::lakesoul_io_config::Result>> { + fn register_table(&self, name: String, table: Arc) -> Result>> { // the type info of dyn TableProvider is not enough or use AST?????? unimplemented!("schema provider does not support registering tables") } /// If supported by the implementation, removes an existing table from this schema and returns it. /// If no table of that name exists, returns Ok(None). #[allow(unused_variables)] - fn deregister_table(&self, name: &str) -> lakesoul_io::lakesoul_io_config::Result>> { + fn deregister_table(&self, name: &str) -> Result>> { let client = self.metadata_client.clone(); let table_name = name.to_string(); let np = self.namespace.clone(); @@ -201,12 +201,29 @@ impl SchemaProvider for LakeSoulNamespace { } }) .await - .unwrap() + .map_err(|e| DataFusionError::External(Box::new(e)))? }) } fn table_exist(&self, name: &str) -> bool { // table name is primary key for `table_name_id` - self.table_names().into_iter().any(|s| s == name) + let client = self.metadata_client.clone(); + let np = self.namespace.clone(); + let lock = self.namespace_lock.clone(); + futures::executor::block_on(async move { + Handle::current() + .spawn(async move { + let _guard = lock.read().await; + client + .get_all_table_name_id_by_namespace(&np) + .await + .expect("get table name failed") + }) + .await + .expect("spawn failed") + }) + .into_iter() + .map(|v| v.table_name) + .any(|s| s == name) } } diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index ba4cf8ec6..9caf65fc9 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -36,6 +36,7 @@ use rand::distributions::DistString; use tokio::sync::Mutex; use tokio::task::JoinHandle; +use tracing::debug; use crate::catalog::commit_data; use crate::lakesoul_table::helpers::{create_io_config_builder_from_table_info, get_columnar_value}; @@ -148,7 +149,7 @@ pub struct LakeSoulHashSinkExec { metadata_client: MetaDataClientRef, } -impl fmt::Debug for LakeSoulHashSinkExec { +impl Debug for LakeSoulHashSinkExec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "LakeSoulHashSinkExec schema: {:?}", self.count_schema) } @@ -267,13 +268,14 @@ impl LakeSoulHashSinkExec { commit_data(client.clone(), &table_name, partition_desc, files) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + debug!("table: {} insert success at {:?}",&table_name,std::time::SystemTime::now()) } Ok(count) } } impl DisplayAs for LakeSoulHashSinkExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "LakeSoulHashSinkExec") } } @@ -308,9 +310,9 @@ impl ExecutionPlan for LakeSoulHashSinkExec { } fn required_input_ordering(&self) -> Vec>> { - // The input order is either exlicitly set (such as by a ListingTable), + // The input order is either explicitly set (such as by a ListingTable), // or require that the [FileSinkExec] gets the data in the order the - // input produced it (otherwise the optimizer may chose to reorder + // input produced it (otherwise the optimizer may choose to reorder // the input which could result in unintended / poor UX) // // More rationale: @@ -417,7 +419,7 @@ impl ExecutionPlan 
for LakeSoulHashSinkExec { } } -/// Create a output record batch with a count +/// Create an output record batch with a count /// /// ```text /// +-------+, diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs index 00477248f..af2a11a5d 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs @@ -7,13 +7,13 @@ pub mod helpers; use std::{ops::Deref, sync::Arc}; use arrow::datatypes::SchemaRef; +use datafusion::sql::TableReference; use datafusion::{ dataframe::DataFrame, datasource::TableProvider, execution::context::{SessionContext, SessionState}, logical_expr::LogicalPlanBuilder, }; -use datafusion::sql::TableReference; use lakesoul_io::{lakesoul_io_config::create_session_context_with_planner, lakesoul_reader::RecordBatch}; use lakesoul_metadata::{MetaDataClient, MetaDataClientRef}; use proto::proto::entity::TableInfo; @@ -83,7 +83,7 @@ impl LakeSoulTable { let schema = record_batch.schema(); let logical_plan = LogicalPlanBuilder::insert_into( sess_ctx.read_batch(record_batch)?.into_unoptimized_plan(), - TableReference::partial(self.table_namespace().to_string(),self.table_name().to_string()), + TableReference::partial(self.table_namespace().to_string(), self.table_name().to_string()), schema.deref(), false, )? diff --git a/rust/lakesoul-datafusion/src/test/catalog_tests.rs b/rust/lakesoul-datafusion/src/test/catalog_tests.rs index 5b0c55dd0..3579e1c9a 100644 --- a/rust/lakesoul-datafusion/src/test/catalog_tests.rs +++ b/rust/lakesoul-datafusion/src/test/catalog_tests.rs @@ -42,16 +42,16 @@ mod catalog_tests { Arc::new(MetaDataClient::from_env().await.unwrap()) } - fn random_namespace(hash_bucket_num: usize) -> Vec { + fn random_namespace(prefix: &str, hash_bucket_num: usize) -> Vec { let mut rng = ChaCha8Rng::from_rng(thread_rng()).unwrap(); (0..rng.gen_range(1..10)) .map(|_| Namespace { namespace: { let mut v = String::with_capacity(5); - for _ in 0..5 { + for _ in 0..10 { v.push((&mut rng).gen_range('a'..'z')); } - v + format!("{prefix}_{v}") }, properties: serde_json::to_string(&LakeSoulTableProperty { hash_bucket_num: Some(hash_bucket_num), @@ -73,14 +73,22 @@ mod catalog_tests { for _ in 0..n { let table_name = { let mut v = String::with_capacity(8); - for _ in 0..5 { + for _ in 0..10 { v.push((&mut rng).gen_range('a'..'z')); } v }; let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), &np.namespace, &table_name); + let table_id = format!( + "table_{}", + (&mut rng) + .sample_iter(&Alphanumeric) + .take(22) + .map(char::from) + .collect::() + ); v.push(TableInfo { - table_id: (&mut rng).sample_iter(&Alphanumeric).take(12).map(char::from).collect(), + table_id, table_namespace: np.namespace.clone(), table_name, table_path: format!("file://{}", path.clone()), @@ -110,7 +118,6 @@ mod catalog_tests { } } - #[test] fn test_catalog_api() { let rt = Runtime::new().unwrap(); rt.block_on(async { @@ -134,7 +141,7 @@ mod catalog_tests { .build(); let sc = Arc::new(create_session_context(&mut config).unwrap()); - let data = random_tables(random_namespace(4), schema.clone()); + let data = random_tables(random_namespace("api", 4), schema.clone()); let catalog = Arc::new(LakeSoulCatalog::new(client.clone(), sc.clone())); let dummy_schema_provider = Arc::new(LakeSoulNamespace::new(client.clone(), sc.clone(), "dummy")); @@ -153,7 +160,7 @@ mod catalog_tests { lakesoul_table.execute_upsert(batch.clone()).await.unwrap(); } } - assert!(sc.register_catalog("lakesoul", 
catalog.clone()).is_none()); + assert!(sc.register_catalog("test_catalog_api", catalog.clone()).is_none()); for (np, tables) in data.iter() { let schema = LakeSoulNamespace::new(client.clone(), sc.clone(), &np.namespace); let names = schema.table_names(); @@ -168,7 +175,6 @@ mod catalog_tests { }); } - #[test] fn test_catalog_sql() { let rt = Runtime::new().unwrap(); rt.block_on(async { @@ -211,7 +217,7 @@ mod catalog_tests { let df = sc.sql(sql).await.unwrap(); df.collect().await.unwrap() }; - sc.register_catalog("lakesoul", catalog.clone()); + sc.register_catalog("test_catalog_sql", catalog.clone()); let after = { let sql = "show tables"; let df = sc.sql(sql).await.unwrap(); @@ -219,11 +225,11 @@ mod catalog_tests { }; assert_ne!(after, before); } - let data = random_tables(random_namespace(4), schema.clone()); + let data = random_tables(random_namespace("sql", 4), schema.clone()); for (np, tables) in data.iter() { { // create schema - let sql = format!("create schema lakesoul.{}", np.namespace); + let sql = format!("create schema test_catalog_sql.{}", np.namespace); let df = sc.sql(&sql).await.unwrap(); df.collect().await.unwrap(); let ret = client.get_namespace_by_namespace(&np.namespace).await.unwrap(); @@ -243,24 +249,23 @@ mod catalog_tests { debug!("{names:?}"); assert_eq!(names.len(), tables.len()); for name in names { - assert!(schema.table(&name).await.is_some()); { - // test select - let q = format!("select * from lakesoul.{}.{}", np.namespace, name); + // test show columns + let q = format!("show columns from test_catalog_sql.{}.{}", np.namespace, name); let df = sc.sql(&q).await.unwrap(); let record = df.collect().await.unwrap(); - assert_batches_eq!(expected, &record); + assert!(record.len() > 0); } { - // test show columns - let q = format!("show columns from lakesoul.{}.{}", np.namespace, name); + // test select + let q = format!("select * from test_catalog_sql.{}.{}", np.namespace, name); let df = sc.sql(&q).await.unwrap(); let record = df.collect().await.unwrap(); - assert!(record.len() > 0); + assert_batches_eq!(expected, &record); } { // drop table - let sql = format!("drop table lakesoul.{}.{}", np.namespace, name); + let sql = format!("drop table test_catalog_sql.{}.{}", np.namespace, name); let df = sc.sql(&sql).await.unwrap(); assert!(df.collect().await.is_ok()) } @@ -268,4 +273,10 @@ mod catalog_tests { } }); } + + #[test] + fn test_all_cases() { + test_catalog_api(); + test_catalog_sql(); + } } diff --git a/rust/lakesoul-io/src/datasource/listing.rs b/rust/lakesoul-io/src/datasource/listing.rs index 6e4d854d9..8bcad5fab 100644 --- a/rust/lakesoul-io/src/datasource/listing.rs +++ b/rust/lakesoul-io/src/datasource/listing.rs @@ -255,8 +255,8 @@ impl TableProvider for LakeSoulListingTable { // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT // queries. Thus, we can check if the plan is streaming to ensure file sink input is // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now` - // to consume data at the input. When `unbounded_input` flag is `false` (e.g non-streaming data), - // all of the data at the input is sink after execution finishes. See discussion for rationale: + // to consume data at the input. When `unbounded_input` flag is `false` (e.g. non-streaming data), + // all the data at the input is sink after execution finishes. 
See discussion for rationale:
            // https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
            unbounded_input: false,
            single_file_output: self.options().single_file,
diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs
index 986c2c0be..def891acf 100644
--- a/rust/lakesoul-metadata/src/metadata_client.rs
+++ b/rust/lakesoul-metadata/src/metadata_client.rs
@@ -342,7 +342,12 @@ impl MetaDataClient {
                     }
                 })
                 .collect::<Vec<PartitionInfo>>();
-            self.transaction_insert_partition_info(new_partition_list).await?;
+            let val = self.transaction_insert_partition_info(new_partition_list).await?;
+            let vec = self
+                .get_all_partition_info(table_info.table_id.as_str())
+                .await
+                .unwrap();
+            debug!("val = {val}, get partition list after finished: {:?}", vec);
             Ok(())
         }
         _ => {
@@ -495,7 +500,13 @@ impl MetaDataClient {
             .map(|(k, v)| format!("{}={}", k, v))
             .collect::<Vec<String>>();
         let table_info = self.get_table_info_by_table_name(table_name, namespace).await?;
+        debug!("table_info: {:?}", table_info);
         let partition_list = self.get_all_partition_info(table_info.table_id.as_str()).await?;
+        debug!(
+            "{}'s partition_list: {:?}",
+            table_info.table_id.as_str(),
+            partition_list
+        );
         let mut data_commit_info_list = Vec::<DataCommitInfo>::new();
         for idx in 0..partition_list.len() {
             let partition_info = partition_list.get(idx).unwrap();
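The metadata_client.rs hunks above (and the sink/catalog hunks earlier in the patch) add `tracing::debug!` logging around partition commits; that output is only visible if a subscriber is installed when the Rust tests run. A minimal sketch follows, assuming the `tracing-subscriber` crate with its `env-filter` feature; the target names simply mirror the crate directories touched by this patch, and the filter string is illustrative rather than part of the change.

use tracing::debug;
use tracing_subscriber::EnvFilter;

fn main() {
    // Install a fmt subscriber; RUST_LOG, if set, overrides the fallback filter below.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| {
            EnvFilter::new("lakesoul_metadata=debug,lakesoul_datafusion=debug")
        }))
        .init();

    // Any debug! emitted after this point (e.g. the partition-list logging
    // added in this patch) is now printed.
    debug!("tracing initialized");
}

In practice this would run once per test binary, e.g. guarded by `std::sync::Once`, and the same effect can be had ad hoc with `RUST_LOG=lakesoul_metadata=debug cargo test`.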