Skip to content

Commit

Permalink
[Rust] fix catalog unittest
Browse files: browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Jan 29, 2024
1 parent 2c2c9ae commit 06c2d28
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 60 deletions.
7 changes: 1 addition & 6 deletions .github/workflows/maven-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 2
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';"
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
Expand Down Expand Up @@ -165,7 +164,6 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 2
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';"
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
Expand Down Expand Up @@ -234,7 +232,6 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 1
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';"
- name: Init PG RBAC
run: |
./script/meta_rbac_init_for_local_test.sh -j 1
Expand Down Expand Up @@ -315,7 +312,6 @@ jobs:
- name: Init PG
run: |
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql lakesoul_test
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';"
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
Expand Down Expand Up @@ -384,7 +380,6 @@ jobs:
- name: Init PG
run: |
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql lakesoul_test
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';"
- name: Init PG RBAC ROW POLICY
run: |
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_rbac_init.sql lakesoul_test
Expand Down Expand Up @@ -432,4 +427,4 @@ jobs:
name: maven-test-report-artifact-flink-1
path: lakesoul-flink/target/site
retention-days: 5
if-no-files-found: error
if-no-files-found: error
1 change: 0 additions & 1 deletion .github/workflows/rust-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 2
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -c "ALTER DATABASE lakesoul_test SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';"
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
Expand Down
16 changes: 4 additions & 12 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::catalog::LakeSoulNamespace;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::error::DataFusionError;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use lakesoul_metadata::MetaDataClientRef;
use proto::proto::entity::Namespace;
Expand Down Expand Up @@ -83,11 +83,7 @@ impl CatalogProvider for LakeSoulCatalog {
///
/// If a schema of the same name existed before, it is replaced in
/// the catalog and returned.
fn register_schema(
&self,
name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn SchemaProvider>>> {
fn register_schema(&self, name: &str, _schema: Arc<dyn SchemaProvider>) -> Result<Option<Arc<dyn SchemaProvider>>> {
let _guard = self.catalog_lock.write();
let client = self.metadata_client.clone();
let schema: Option<Arc<dyn SchemaProvider>> = {
Expand Down Expand Up @@ -124,11 +120,7 @@ impl CatalogProvider for LakeSoulCatalog {
///
/// Implementations of this method should return None if schema with `name`
/// does not exist.
fn deregister_schema(
&self,
_name: &str,
_cascade: bool,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn SchemaProvider>>> {
fn deregister_schema(&self, _name: &str, _cascade: bool) -> Result<Option<Arc<dyn SchemaProvider>>> {
// Not supported
// let _guard = self.catalog_lock.write();
// let client = self.metadata_client.clone();
Expand All @@ -153,4 +145,4 @@ impl CatalogProvider for LakeSoulCatalog {
// return Ok(None);
Err(DataFusionError::NotImplemented("Not supported".into()))
}
}
}
39 changes: 28 additions & 11 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
// SPDX-License-Identifier: Apache-2.0

use crate::catalog::create_io_config_builder;
use crate::error::Result;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use lakesoul_io::datasource::file_format::LakeSoulParquetFormat;
use lakesoul_io::datasource::listing::LakeSoulListingTable;
Expand Down Expand Up @@ -83,10 +83,13 @@ impl SchemaProvider for LakeSoulNamespace {
Handle::current()
.spawn(async move {
let _guard = lock.read().await;
client.get_all_table_name_id_by_namespace(&np).await.unwrap()
client
.get_all_table_name_id_by_namespace(&np)
.await
.expect("get all table name failed")
})
.await
.unwrap()
.expect("spawn failed")
})
.into_iter()
.map(|v| v.table_name)
Expand All @@ -102,6 +105,7 @@ impl SchemaProvider for LakeSoulNamespace {
.get_table_info_by_table_name(name, &self.namespace)
.await
{
debug!("call table() on table: {}.{}", &self.namespace, name);
let config;
if let Ok(config_builder) =
create_io_config_builder(self.metadata_client.clone(), Some(name), true, self.namespace()).await
Expand Down Expand Up @@ -138,18 +142,14 @@ impl SchemaProvider for LakeSoulNamespace {
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
#[allow(unused_variables)]
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn TableProvider>>> {
fn register_table(&self, name: String, table: Arc<dyn TableProvider>) -> Result<Option<Arc<dyn TableProvider>>> {
// the type info of dyn TableProvider is not enough or use AST??????
unimplemented!("schema provider does not support registering tables")
}
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> lakesoul_io::lakesoul_io_config::Result<Option<Arc<dyn TableProvider>>> {
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let client = self.metadata_client.clone();
let table_name = name.to_string();
let np = self.namespace.clone();
Expand Down Expand Up @@ -201,12 +201,29 @@ impl SchemaProvider for LakeSoulNamespace {
}
})
.await
.unwrap()
.map_err(|e| DataFusionError::External(Box::new(e)))?
})
}

fn table_exist(&self, name: &str) -> bool {
// table name is primary key for `table_name_id`
self.table_names().into_iter().any(|s| s == name)
let client = self.metadata_client.clone();
let np = self.namespace.clone();
let lock = self.namespace_lock.clone();
futures::executor::block_on(async move {
Handle::current()
.spawn(async move {
let _guard = lock.read().await;
client
.get_all_table_name_id_by_namespace(&np)
.await
.expect("get table name failed")
})
.await
.expect("spawn failed")
})
.into_iter()
.map(|v| v.table_name)
.any(|s| s == name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use rand::distributions::DistString;

use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;

use crate::catalog::commit_data;
use crate::lakesoul_table::helpers::{create_io_config_builder_from_table_info, get_columnar_value};
Expand Down Expand Up @@ -148,7 +149,7 @@ pub struct LakeSoulHashSinkExec {
metadata_client: MetaDataClientRef,
}

impl fmt::Debug for LakeSoulHashSinkExec {
impl Debug for LakeSoulHashSinkExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "LakeSoulHashSinkExec schema: {:?}", self.count_schema)
}
Expand Down Expand Up @@ -267,13 +268,14 @@ impl LakeSoulHashSinkExec {
commit_data(client.clone(), &table_name, partition_desc, files)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
debug!("table: {} insert success at {:?}",&table_name,std::time::SystemTime::now())
}
Ok(count)
}
}

impl DisplayAs for LakeSoulHashSinkExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "LakeSoulHashSinkExec")
}
}
Expand Down Expand Up @@ -308,9 +310,9 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
// The input order is either exlicitly set (such as by a ListingTable),
// The input order is either explicitly set (such as by a ListingTable),
// or require that the [FileSinkExec] gets the data in the order the
// input produced it (otherwise the optimizer may chose to reorder
// input produced it (otherwise the optimizer may choose to reorder
// the input which could result in unintended / poor UX)
//
// More rationale:
Expand Down Expand Up @@ -417,7 +419,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
}
}

/// Create a output record batch with a count
/// Create an output record batch with a count
///
/// ```text
/// +-------+,
Expand Down
4 changes: 2 additions & 2 deletions rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ pub mod helpers;
use std::{ops::Deref, sync::Arc};

use arrow::datatypes::SchemaRef;
use datafusion::sql::TableReference;
use datafusion::{
dataframe::DataFrame,
datasource::TableProvider,
execution::context::{SessionContext, SessionState},
logical_expr::LogicalPlanBuilder,
};
use datafusion::sql::TableReference;
use lakesoul_io::{lakesoul_io_config::create_session_context_with_planner, lakesoul_reader::RecordBatch};
use lakesoul_metadata::{MetaDataClient, MetaDataClientRef};
use proto::proto::entity::TableInfo;
Expand Down Expand Up @@ -83,7 +83,7 @@ impl LakeSoulTable {
let schema = record_batch.schema();
let logical_plan = LogicalPlanBuilder::insert_into(
sess_ctx.read_batch(record_batch)?.into_unoptimized_plan(),
TableReference::partial(self.table_namespace().to_string(),self.table_name().to_string()),
TableReference::partial(self.table_namespace().to_string(), self.table_name().to_string()),
schema.deref(),
false,
)?
Expand Down
Loading

0 comments on commit 06c2d28

Please sign in to comment.