From b0ca507e37bc4a938baca8bad90174caf26f44b6 Mon Sep 17 00:00:00 2001
From: zenghua
Date: Tue, 9 Jan 2024 15:39:43 +0800
Subject: [PATCH] Fail when creating table with nullable hash column

Signed-off-by: zenghua
---
 .../lakesoul/meta/SparkMetaVersion.scala      |  3 +--
 .../commands/CreateTableCommand.scala         | 20 +++++++++++++++---
 .../lakesoul/exception/LakeSoulErrors.scala   |  6 ++++++
 .../schema/ImplicitMetadataOperation.scala    |  6 ++++--
 .../sql/lakesoul/TableCreationTests.scala     | 21 ++++++++++++++++++-
 5 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala
index 22e7f27e7..13cab9395 100644
--- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala
+++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala
@@ -91,8 +91,7 @@ object SparkMetaVersion {
     }
     val short_table_name = info.getTableName
     val partitions = info.getPartitions
-    val properties = info.getProperties.toString()
-
+    val properties = info.getProperties
     import scala.util.parsing.json.JSON
     val configuration = JSON.parseFull(properties)
     val configurationMap = configuration match {
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala
index 7b41c6f83..ec06d5887 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala
@@ -4,7 +4,7 @@
 
 package org.apache.spark.sql.lakesoul.commands
 
-import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
+import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_RANGE_PARTITION_SPLITTER}
 import com.dmetasoul.lakesoul.meta.{DataFileInfo, SparkMetaVersion}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
@@ -166,6 +166,7 @@ case class CreateTableCommand(var table: CatalogTable,
     if (noExistingMetadata) {
       assertTableSchemaDefined(tableLocation, tableWithLocation)
       assertPathEmpty(sparkSession, tableWithLocation)
+      assertHashPartitionNonNullable(table)
       // This is a user provided schema.
       // Doesn't come from a query, Follow nullability invariants.
       val newTableInfo = getProvidedTableInfo(tc, table, ArrowUtils.toArrowSchema(table.schema).toJson)
@@ -210,14 +211,14 @@ case class CreateTableCommand(var table: CatalogTable,
   private def getProvidedTableInfo(tc: TransactionCommit,
                                    table: CatalogTable,
                                    schemaString: String): TableInfo = {
-    val hashParitions = table.properties.getOrElse(LakeSoulOptions.HASH_PARTITIONS, "")
+    val hashPartitions = table.properties.getOrElse(LakeSoulOptions.HASH_PARTITIONS, "")
     val hashBucketNum = table.properties.getOrElse(LakeSoulOptions.HASH_BUCKET_NUM, "-1").toInt
     TableInfo(tc.tableInfo.namespace,
       table_path_s = tc.tableInfo.table_path_s,
       table_id = tc.tableInfo.table_id,
       table_schema = schemaString,
       range_column = table.partitionColumnNames.mkString(LAKESOUL_RANGE_PARTITION_SPLITTER),
-      hash_column = hashParitions,
+      hash_column = hashPartitions,
       bucket_num = hashBucketNum,
       configuration = table.properties
     )
@@ -237,6 +238,19 @@ case class CreateTableCommand(var table: CatalogTable,
     }
   }
 
+  private def assertHashPartitionNonNullable(table: CatalogTable): Unit = {
+    table.properties.get(LakeSoulOptions.HASH_PARTITIONS).foreach(hashPartitions => {
+      val hashPartitionsSet = hashPartitions.split(LAKESOUL_HASH_PARTITION_SPLITTER).toSet
+      if (hashPartitionsSet.nonEmpty) {
+        if (table.schema(hashPartitionsSet).exists(f => f.nullable)) {
+          throw LakeSoulErrors.failedCreateTableException(
+            table.identifier.toString(),
+            hashPartitionsSet)
+        }
+      }
+    })
+  }
+
   private def assertTableSchemaDefined(path: Path, table: CatalogTable): Unit = {
     // Users did not specify the schema. We expect the schema exists in
     // CatalogTable.
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/LakeSoulErrors.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/LakeSoulErrors.scala
index ecb0ceeef..f304c9f40 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/LakeSoulErrors.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/LakeSoulErrors.scala
@@ -37,6 +37,12 @@ object LakeSoulErrors {
 
   def formatSchema(schema: StructType): String = schema.treeString
 
+  def failedCreateTableException(table_name: String, hash_partitions: Set[String]): Throwable = {
+    new AnalysisException(
+      s"""
+         |Error: Failed to create table: $table_name. The hash partitions ('$hash_partitions') contains nullable column.
+      """.stripMargin)
+  }
 
   def failedCreateTableException(table_name: String): Throwable = {
     new AnalysisException(
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala
index 03af8e178..34aa396d9 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala
@@ -13,6 +13,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.arrow.ArrowUtils
+import org.apache.spark.sql.lakesoul.LakeSoulOptions.HASH_PARTITIONS
 
 
 /**
@@ -137,6 +138,7 @@ trait ImplicitMetadataOperation extends Logging {
         throw LakeSoulErrors.hashBucketNumNotSetException()
       }
     }
+    val hash_column = normalizedHashPartitionCols.mkString(LAKESOUL_HASH_PARTITION_SPLITTER)
 
     // If this is the first write, configure the metadata of the table.
     //todo: setting
@@ -147,9 +149,9 @@ trait ImplicitMetadataOperation extends Logging {
         table_id = table_info.table_id,
         table_schema = ArrowUtils.toArrowSchema(dataSchema).toJson,
         range_column = normalizedRangePartitionCols.mkString(LAKESOUL_RANGE_PARTITION_SPLITTER),
-        hash_column = normalizedHashPartitionCols.mkString(LAKESOUL_HASH_PARTITION_SPLITTER),
+        hash_column = hash_column,
         bucket_num = realHashBucketNum,
-        configuration = configuration,
+        configuration = if (hash_column.nonEmpty) configuration.updated(HASH_PARTITIONS, hash_column) else configuration,
         short_table_name = table_info.short_table_name))
 
     } else if (isOverwriteMode && canOverwriteSchema && isNewSchema) {
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala
index 1cf726cd8..263801099 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala
@@ -1185,7 +1185,7 @@ trait TableCreationTests
     withTempPath { dir =>
       val tableName = "test_table"
       withTable(s"$tableName") {
-        spark.sql(s"CREATE TABLE $tableName(id string, date string, data string) USING lakesoul" +
+        spark.sql(s"CREATE TABLE $tableName(id string not null, date string, data string) USING lakesoul" +
           s" PARTITIONED BY (date)" +
           s" LOCATION '${dir.toURI}'" +
           s" TBLPROPERTIES('lakesoul_cdc_change_column'='change_kind'," +
@@ -1205,6 +1205,25 @@ trait TableCreationTests
     }
   }
 
+  test("create table sql with range and hash partition - fails when hash partition is nullable") {
+    withTempPath { dir =>
+      val tableName = "test_table"
+      withTable(s"$tableName") {
+        val e = intercept[AnalysisException] {
+          spark.sql(s"CREATE TABLE $tableName(id string, date string, data string) USING lakesoul" +
+            s" PARTITIONED BY (date)" +
+            s" LOCATION '${dir.toURI}'" +
+            s" TBLPROPERTIES('lakesoul_cdc_change_column'='change_kind'," +
+            s" 'hashPartitions'='id'," +
+            s" 'hashBucketNum'='2')")
+        }
+        assert(e.getMessage.contains(tableName))
+        assert(e.getMessage.contains("The hash partitions"))
+        assert(e.getMessage.contains("contains nullable column."))
+      }
+    }
+  }
+
   test("create table with TableCreator - with tbl properties") {
     withTable("tt") {
       withTempDir(dir => {