check partition pruning and address other comments
codope committed Mar 4, 2024
1 parent 58c72d0 commit 3ce4c6f
Showing 7 changed files with 38 additions and 28 deletions.
File 1 of 7
@@ -1953,10 +1953,10 @@ public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(Hoodi
LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size()));
// Create records for MDT
int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
- return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> {
-   final String partitionName = partitionFiles.getRelativePath();
+ return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> {
+   final String partitionName = partitionInfo.getRelativePath();
// Step 1: Collect Column Metadata for Each File (Your existing code does this)
- List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream()
+ List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
.map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
.collect(toList());
// Step 2: Flatten and Group by Column Name
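The only change in this hunk is a rename (partitionFiles to partitionInfo) for clarity; the surrounding logic gathers per-file column range metadata ("Step 1") and then flattens and groups it by column so that each partition contributes one merged range per indexed column ("Step 2", truncated above). A minimal sketch of that grouping step, written in Scala for brevity and using a hypothetical ColumnRange stand-in for HoodieColumnRangeMetadata:

// Sketch only, not the committed Java implementation: flatten the per-file column
// ranges and keep a single merged (min, max) range per column for the partition.
case class ColumnRange(columnName: String, minValue: Long, maxValue: Long)

def mergePartitionRanges(fileColumnMetadata: Seq[Seq[ColumnRange]]): Map[String, ColumnRange] =
  fileColumnMetadata.flatten
    .groupBy(_.columnName)
    .map { case (col, ranges) =>
      col -> ColumnRange(col, ranges.map(_.minValue).min, ranges.map(_.maxValue).max)
    }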
File 2 of 7
@@ -58,7 +58,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
allowCaching: Boolean = false) {

@transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
- @transient private lazy val metadataTable: HoodieTableMetadata =
+ @transient lazy val metadataTable: HoodieTableMetadata =
HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)

@transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
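The one-line change above only widens visibility: dropping private lets the PartitionStatsIndexSupport subclass (edited later in this commit) reuse the metadata table handle created here instead of building a second one of its own. A simplified sketch of that reuse, with placeholder types rather than the real class bodies:

// Sketch only: the parent creates the handle lazily once; the subclass inherits it.
class ColumnStatsSupportSketch {
  @transient lazy val metadataTable: AnyRef = new Object() // placeholder for HoodieTableMetadata
}

class PartitionStatsSupportSketch extends ColumnStatsSupportSketch {
  def tableHandle: AnyRef = metadataTable // inherited; no duplicate field or engine context
}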
File 3 of 7
@@ -355,13 +355,13 @@ case class HoodieFileIndex(spark: SparkSession,
Option.empty
} else if (recordKeys.nonEmpty) {
Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
- } else if (recordKeys.nonEmpty && partitionStatsIndex.isIndexAvailable && !queryFilters.isEmpty) {
+ } else if (partitionStatsIndex.isIndexAvailable && queryFilters.nonEmpty) {
val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
val shouldReadInMemory = partitionStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
partitionStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
}
- } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
+ } else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty) {
val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
val indexDf = functionalIndex.loadFunctionalIndexDataFrame("", shouldReadInMemory)
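Before this fix the partition stats branch also required recordKeys.nonEmpty, but that case is already fully handled by the record-level-index branch directly above it, so the partition stats index could never be consulted; the corrected guard needs only an available index plus at least one query filter, and the functional index branch gets the same queryFilters.nonEmpty cleanup. A minimal sketch of the corrected dispatch order, with simplified names that are not the real HoodieFileIndex API:

// Sketch only: once record keys are consumed by the first branch, the partition stats
// branch must not re-check them, otherwise it is unreachable (the bug fixed above).
def chooseIndex(recordKeys: Seq[String], queryFilters: Seq[String],
                partitionStatsAvailable: Boolean, functionalIndexAvailable: Boolean): String =
  if (recordKeys.nonEmpty) "record level index"
  else if (partitionStatsAvailable && queryFilters.nonEmpty) "partition stats index"
  else if (functionalIndexAvailable && queryFilters.nonEmpty) "functional index"
  else "no index-based pruning"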
File 4 of 7
@@ -21,16 +21,14 @@ package org.apache.hudi

import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
- import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
- import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil}
+ import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
import org.apache.hudi.util.JFunction
- import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

@@ -43,12 +41,8 @@ class PartitionStatsIndexSupport(spark: SparkSession,
allowCaching: Boolean = false)
extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, metaClient, allowCaching) {

- @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
- @transient private lazy val metadataTable: HoodieTableMetadata =
-   HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)

override def isIndexAvailable: Boolean = {
- checkState(metadataConfig.enabled, "Metadata Table support has to be enabled")
+ checkState(metadataConfig.enabled && metadataConfig.isPartitionStatsIndexEnabled, "Metadata Table support has to be enabled")
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS)
}

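With metadataTable now inherited from ColumnStatsIndexSupport, the duplicated engine context and table handle are deleted here, and isIndexAvailable is tightened: both the metadata table and the partition stats index must be enabled in the config before the table's completed metadata partitions are checked. A simplified sketch of that two-step guard; the helper name and the index partition name are assumptions, not the exact Hudi constants:

// Sketch only: fail fast if the feature is not enabled, then confirm the index
// partition has actually been built on the table.
def partitionStatsIndexAvailable(metadataEnabled: Boolean,
                                 partitionStatsEnabled: Boolean,
                                 metadataPartitions: Set[String]): Boolean = {
  require(metadataEnabled && partitionStatsEnabled, "Metadata table and partition stats index must be enabled")
  metadataPartitions.contains("partition_stats") // assumed value of PARTITION_NAME_PARTITION_STATS
}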
File 5 of 7
@@ -28,7 +28,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{ActionType, HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.timeline.{HoodieInstant, MetadataConversionUtils}
- import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.metadata.HoodieBackedTableMetadata
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors
import scala.collection.JavaConverters._
import scala.collection.{JavaConverters, mutable}
+ import scala.util.matching.Regex

/**
* Common test setup and validation methods for partition stats index testing.
@@ -64,8 +65,7 @@ class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
RECORDKEY_FIELD.key -> "_row_key",
PARTITIONPATH_FIELD.key -> "partition,trip_type",
HIVE_STYLE_PARTITIONING.key -> "true",
- PRECOMBINE_FIELD.key -> "timestamp",
- HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ PRECOMBINE_FIELD.key -> "timestamp"
) ++ metadataOpts
var mergedDfList: List[DataFrame] = List.empty

@@ -258,4 +258,19 @@ class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
.join(prevDf, prevDf.columns, "leftanti")
assertEquals(0, nonMatchingRecords.count())
}

+ def checkPartitionFilters(sparkPlan: String, partitionFilter: String): Boolean = {
+   val partitionFilterPattern: Regex = """PartitionFilters: \[(.*?)\]""".r
+   val tsPattern: Regex = (partitionFilter).r
+
+   val partitionFilterMatch = partitionFilterPattern.findFirstMatchIn(sparkPlan)
+
+   partitionFilterMatch match {
+     case Some(m) =>
+       val filters = m.group(1)
+       tsPattern.findFirstIn(filters).isDefined
+     case None =>
+       false
+   }
+ }
}
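The new checkPartitionFilters helper extracts whatever sits inside the PartitionFilters: [...] section of a Spark physical plan string and applies the caller-supplied pattern to that section only, so a stray match elsewhere in the plan does not count. A hedged usage sketch; the plan string below is fabricated for illustration and is not captured Spark output:

// Sketch only: exercise the helper against a hand-written plan fragment.
val samplePlan = "FileScan parquet ... PartitionFilters: [isnotnull(partition#42), (partition#42 > 2015/03/16)], PushedFilters: []"
assert(checkPartitionFilters(samplePlan, "partition.* > 2015/03/16"))  // filter found in the PartitionFilters block
assert(!checkPartitionFilters(samplePlan, "trip_type.* = 1"))          // no such partition filter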
File 6 of 7
@@ -135,9 +135,10 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)

- val snapshot0 = spark.read.format("org.apache.hudi").options(hudiOpts).load(basePath)
+ val snapshot0 = spark.read.format("org.apache.hudi").options(hudiOpts).load(basePath).where("partition > '2015/03/16'")
snapshot0.cache()
- assertEquals(100, snapshot0.count())
+ assertTrue(checkPartitionFilters(snapshot0.queryExecution.executedPlan.toString, "partition.* > 2015/03/16"))
+ assertEquals(67, snapshot0.count())
}

def verifyQueryPredicate(hudiOpts: Map[String, String]): Unit = {
File 7 of 7
@@ -677,15 +677,15 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
s"select * from hudi_metadata('$identifier') where type=3"
)
assert(result4DF.count() == 3)
- checkAnswer(spark.sql(s"select ColumnStatsMetadata.minValue from hudi_metadata('$identifier') where type=3").collect())(
-   Seq(Seq(null, Seq(1000), null, null, null, null, null, null, null, null, null)),
-   Seq(Seq(null, Seq(2000), null, null, null, null, null, null, null, null, null)),
-   Seq(Seq(null, Seq(3000), null, null, null, null, null, null, null, null, null))
- )
- checkAnswer(spark.sql(s"select ColumnStatsMetadata.maxValue from hudi_metadata('$identifier') where type=3").collect())(
-   Seq(Seq(null, Seq(2000), null, null, null, null, null, null, null, null, null)),
-   Seq(Seq(null, Seq(3000), null, null, null, null, null, null, null, null, null)),
-   Seq(Seq(null, Seq(4000), null, null, null, null, null, null, null, null, null))
+ checkAnswer(spark.sql(s"select ColumnStatsMetadata.minValue.member1.value from hudi_metadata('$identifier') where type=3").collect())(
+   Seq(1000),
+   Seq(2000),
+   Seq(3000)
+ )
+ checkAnswer(spark.sql(s"select ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$identifier') where type=3").collect())(
+   Seq(2000),
+   Seq(3000),
+   Seq(4000)
)
}
}
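The min/max fields of the column stats payload are union-style wrapper structs with one member slot per supported value type, which is why the old expected rows were mostly nulls around a single populated slot; selecting the nested value directly keeps the expected answers readable. Which memberN slot a column lands in follows the metadata record schema; member1 is simply where this test's integer column resolves. A small follow-on sketch, assuming the test's spark session, its identifier variable, and that the column stats record also exposes a columnName field:

// Sketch only: view the tracked ranges side by side instead of asserting on them.
spark.sql(
  s"""select ColumnStatsMetadata.columnName,
     |       ColumnStatsMetadata.minValue.member1.value as min_value,
     |       ColumnStatsMetadata.maxValue.member1.value as max_value
     |from hudi_metadata('$identifier') where type=3""".stripMargin
).show(false)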
