
[BugFix] Fail when creating a table with a nullable hash column #387

Merged: 1 commit, Jan 9, 2024
@@ -91,8 +91,7 @@ object SparkMetaVersion {
     }
     val short_table_name = info.getTableName
     val partitions = info.getPartitions
-    val properties = info.getProperties.toString()
-
+    val properties = info.getProperties
     import scala.util.parsing.json.JSON
     val configuration = JSON.parseFull(properties)
     val configurationMap = configuration match {
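The change above hands `info.getProperties` to the JSON parser directly instead of its `toString()` rendering. A minimal standalone sketch of the parsing step shown in the context lines (the sample payload is an assumption, not taken from the PR):

```scala
import scala.util.parsing.json.JSON

// Hypothetical properties payload; in the PR this comes from info.getProperties.
val properties = """{"hashPartitions":"id","hashBucketNum":"2"}"""

// JSON.parseFull returns Option[Any], so the result has to be pattern-matched
// back into a Map before individual keys can be read.
val configurationMap: Map[String, String] = JSON.parseFull(properties) match {
  case Some(m: Map[String, String] @unchecked) => m
  case _ => Map.empty[String, String]
}
```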
@@ -4,7 +4,7 @@

 package org.apache.spark.sql.lakesoul.commands

-import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
+import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_RANGE_PARTITION_SPLITTER}
 import com.dmetasoul.lakesoul.meta.{DataFileInfo, SparkMetaVersion}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
@@ -166,6 +166,7 @@ case class CreateTableCommand(var table: CatalogTable,
     if (noExistingMetadata) {
       assertTableSchemaDefined(tableLocation, tableWithLocation)
       assertPathEmpty(sparkSession, tableWithLocation)
+      assertHashPartitionNonNullable(table)
       // This is a user provided schema.
       // Doesn't come from a query, Follow nullability invariants.
       val newTableInfo = getProvidedTableInfo(tc, table, ArrowUtils.toArrowSchema(table.schema).toJson)
@@ -210,14 +211,14 @@ case class CreateTableCommand(var table: CatalogTable,
   private def getProvidedTableInfo(tc: TransactionCommit,
                                    table: CatalogTable,
                                    schemaString: String): TableInfo = {
-    val hashParitions = table.properties.getOrElse(LakeSoulOptions.HASH_PARTITIONS, "")
+    val hashPartitions = table.properties.getOrElse(LakeSoulOptions.HASH_PARTITIONS, "")
     val hashBucketNum = table.properties.getOrElse(LakeSoulOptions.HASH_BUCKET_NUM, "-1").toInt
     TableInfo(tc.tableInfo.namespace,
       table_path_s = tc.tableInfo.table_path_s,
       table_id = tc.tableInfo.table_id,
       table_schema = schemaString,
       range_column = table.partitionColumnNames.mkString(LAKESOUL_RANGE_PARTITION_SPLITTER),
-      hash_column = hashParitions,
+      hash_column = hashPartitions,
       bucket_num = hashBucketNum,
       configuration = table.properties
     )
@@ -237,6 +238,19 @@ case class CreateTableCommand(var table: CatalogTable,
     }
   }

+  private def assertHashPartitionNonNullable(table: CatalogTable): Unit = {
+    table.properties.get(LakeSoulOptions.HASH_PARTITIONS).foreach(hashPartitions => {
+      val hashPartitionsSet = hashPartitions.split(LAKESOUL_HASH_PARTITION_SPLITTER).toSet
+      if (hashPartitionsSet.nonEmpty) {
+        if (table.schema(hashPartitionsSet).exists(f => f.nullable)) {
+          throw LakeSoulErrors.failedCreateTableException(
+            table.identifier.toString(),
+            hashPartitionsSet)
+        }
+      }
+    })
+  }
+

   private def assertTableSchemaDefined(path: Path, table: CatalogTable): Unit = {
     // Users did not specify the schema. We expect the schema exists in CatalogTable.
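The new `assertHashPartitionNonNullable` relies on `StructType`'s `Set`-based field selection. A hedged sketch of that check in isolation, against a made-up schema:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical table schema: the hash column "id" is declared nullable.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("date", StringType, nullable = false)))

val hashPartitionsSet = Set("id")

// StructType.apply(names: Set[String]) extracts just the named fields; any
// nullable field among them is what triggers failedCreateTableException.
val hasNullableHashColumn = schema(hashPartitionsSet).exists(_.nullable) // true
```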
@@ -37,6 +37,12 @@ object LakeSoulErrors {

   def formatSchema(schema: StructType): String = schema.treeString

+  def failedCreateTableException(table_name: String, hash_partitions: Set[String]): Throwable = {
+    new AnalysisException(
+      s"""
+         |Error: Failed to create table: $table_name. The hash partitions ('$hash_partitions') contains nullable column.
+      """.stripMargin)
+  }

   def failedCreateTableException(table_name: String): Throwable = {
     new AnalysisException(
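Note that interpolating a `Set[String]` prints its `Set(...)` rendering, which is why the test added below matches message fragments rather than an exact column list. A small illustration with made-up values:

```scala
val table_name = "`default`.`test_table`" // hypothetical identifier
val hash_partitions = Set("id")

val msg = s"Error: Failed to create table: $table_name." +
  s" The hash partitions ('$hash_partitions') contains nullable column."
// => Error: Failed to create table: `default`.`test_table`.
//    The hash partitions ('Set(id)') contains nullable column.
```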
@@ -13,6 +13,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.arrow.ArrowUtils
+import org.apache.spark.sql.lakesoul.LakeSoulOptions.HASH_PARTITIONS


 /**
@@ -137,6 +138,7 @@ trait ImplicitMetadataOperation extends Logging {
         throw LakeSoulErrors.hashBucketNumNotSetException()
       }
     }
+    val hash_column = normalizedHashPartitionCols.mkString(LAKESOUL_HASH_PARTITION_SPLITTER)

     // If this is the first write, configure the metadata of the table.
     //todo: setting
@@ -147,9 +149,9 @@
         table_id = table_info.table_id,
         table_schema = ArrowUtils.toArrowSchema(dataSchema).toJson,
         range_column = normalizedRangePartitionCols.mkString(LAKESOUL_RANGE_PARTITION_SPLITTER),
-        hash_column = normalizedHashPartitionCols.mkString(LAKESOUL_HASH_PARTITION_SPLITTER),
+        hash_column = hash_column,
         bucket_num = realHashBucketNum,
-        configuration = configuration,
+        configuration = if (hash_column.nonEmpty) configuration.updated(HASH_PARTITIONS, hash_column) else configuration,
         short_table_name = table_info.short_table_name))
     }
     else if (isOverwriteMode && canOverwriteSchema && isNewSchema) {
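Writing the hash columns back into the table configuration is what lets later code paths find them under the `HASH_PARTITIONS` key. A minimal sketch of the conditional update, with assumed values for the key and splitter (in the PR they come from `LakeSoulOptions` and `DBConfig`):

```scala
// Assumed constants; the real ones live in LakeSoulOptions and DBConfig.
val HASH_PARTITIONS = "hashPartitions"
val LAKESOUL_HASH_PARTITION_SPLITTER = ","

val configuration = Map("hashBucketNum" -> "2")
val hash_column = Seq("id", "uid").mkString(LAKESOUL_HASH_PARTITION_SPLITTER)

// Only add the key when hash columns exist, so tables without hash
// partitions keep their configuration untouched.
val updatedConfiguration =
  if (hash_column.nonEmpty) configuration.updated(HASH_PARTITIONS, hash_column)
  else configuration
// Map(hashBucketNum -> 2, hashPartitions -> id,uid)
```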
@@ -1185,7 +1185,7 @@ trait TableCreationTests
     withTempPath { dir =>
       val tableName = "test_table"
       withTable(s"$tableName") {
-        spark.sql(s"CREATE TABLE $tableName(id string, date string, data string) USING lakesoul" +
+        spark.sql(s"CREATE TABLE $tableName(id string not null, date string, data string) USING lakesoul" +
           s" PARTITIONED BY (date)" +
           s" LOCATION '${dir.toURI}'" +
           s" TBLPROPERTIES('lakesoul_cdc_change_column'='change_kind'," +
@@ -1205,6 +1205,25 @@
     }
   }

+  test("create table sql with range and hash partition - fails when hash partition is nullable") {
+    withTempPath { dir =>
+      val tableName = "test_table"
+      withTable(s"$tableName") {
+        val e = intercept[AnalysisException] {
+          spark.sql(s"CREATE TABLE $tableName(id string, date string, data string) USING lakesoul" +
+            s" PARTITIONED BY (date)" +
+            s" LOCATION '${dir.toURI}'" +
+            s" TBLPROPERTIES('lakesoul_cdc_change_column'='change_kind'," +
+            s" 'hashPartitions'='id'," +
+            s" 'hashBucketNum'='2')")
+        }
+        assert(e.getMessage.contains(tableName))
+        assert(e.getMessage.contains("The hash partitions"))
+        assert(e.getMessage.contains("contains nullable column."))
+      }
+    }
+  }
+
   test("create table with TableCreator - with tbl properties") {
     withTable("tt") {
       withTempDir(dir => {
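For users who hit the new error, the remedy mirrored by the updated test above is to declare the hash column `NOT NULL` in the DDL. A hedged example (session setup and names are illustrative; assumes the LakeSoul catalog/datasource is on the classpath):

```scala
import org.apache.spark.sql.SparkSession

// Assumes a session configured for LakeSoul.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Declaring the hash column NOT NULL satisfies assertHashPartitionNonNullable.
spark.sql(
  "CREATE TABLE demo_table(id string NOT NULL, date string, data string)" +
    " USING lakesoul PARTITIONED BY (date)" +
    " TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2')")
```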