rebase and partitionBy test
codope committed Jan 18, 2024 · 1 parent 2e77353 · commit f8c89bc
Showing 2 changed files with 24 additions and 2 deletions.
HoodieFileIndex.scala
@@ -353,9 +353,10 @@ case class HoodieFileIndex(spark: SparkSession,
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
     } else if (recordKeys.nonEmpty && partitionStatsIndex.isIndexAvailable && !queryFilters.isEmpty) {
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       val shouldReadInMemory = partitionStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
       partitionStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
-        Some(getCandidateFiles(transposedColStatsDF, queryFilters))
+        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
       }
     } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
       val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
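
For intuition, a minimal sketch of what the new prunedFileNames argument buys (illustrative names only, not Hudi's actual getCandidateFiles implementation): candidate files computed from the partition stats index are intersected with the files that survived partition pruning, so data skipping can only narrow the scan set, never re-admit files from pruned-away partitions.

// Hedged sketch -- hypothetical helper, not the real Hudi method.
object DataSkippingSketch {
  // A file is scanned only if it passes BOTH steps: partition pruning
  // (prunedFileNames) and column-stats-based skipping (keptByColumnStats).
  def candidateFiles(keptByColumnStats: Set[String], prunedFileNames: Set[String]): Set[String] =
    keptByColumnStats.intersect(prunedFileNames)

  def main(args: Array[String]): Unit = {
    val keptByStats = Set("f1.parquet", "f3.parquet", "f9.parquet") // survived col-stats filters
    val keptByPrune = Set("f1.parquet", "f2.parquet", "f3.parquet") // survived partition pruning
    println(candidateFiles(keptByStats, keptByPrune)) // only f1.parquet and f3.parquet remain
  }
}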
TestPartitionStatsIndex.scala
@@ -22,6 +22,7 @@ package org.apache.hudi.functional
 import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
 import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
@@ -33,8 +34,10 @@ import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 
+import scala.collection.JavaConversions._
+
 /**
- * Test cases on partition stats index with Spark datasource and Spark sql.
+ * Test cases on partition stats index with Spark datasource.
  */
 @Tag("functional")
 class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
@@ -119,6 +122,24 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
     verifyQueryPredicate(hudiOpts)
   }
 
+  @Test
+  def testPartitionStatsWithPartitionBy(): Unit = {
+    val hudiOpts = commonOpts.-(PARTITIONPATH_FIELD.key)
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    inputDF.write.partitionBy("partition").format("hudi")
+      .options(hudiOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshot0 = spark.read.format("org.apache.hudi").options(hudiOpts).load(basePath)
+    snapshot0.cache()
+    assertEquals(100, snapshot0.count())
+  }
+
   def verifyQueryPredicate(hudiOpts: Map[String, String]): Unit = {
     val reckey = mergedDfList.last.limit(1).collect().map(row => row.getAs("_row_key").toString)
     val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0)))
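
The new test drops Hudi's own partition-path option (PARTITIONPATH_FIELD) and partitions through Spark's DataFrameWriter.partitionBy instead, then checks that all 100 inserted records read back. A self-contained sketch contrasting the two routes (assumes a SparkSession, a DataFrame with a string column named "partition", and the hudi-spark bundle on the classpath; the table name and key/precombine fields below are illustrative, not taken from the test):

import org.apache.spark.sql.{DataFrame, SaveMode}

object PartitionByWriteSketch {
  // Two ways to arrive at a partitioned Hudi table.
  def writePartitioned(df: DataFrame, basePath: String, useSparkPartitionBy: Boolean): Unit = {
    val writer = df.write.format("hudi")
      .option("hoodie.table.name", "sketch_tbl")                       // illustrative
      .option("hoodie.datasource.write.recordkey.field", "_row_key")   // illustrative
      .option("hoodie.datasource.write.precombine.field", "timestamp") // illustrative
      .mode(SaveMode.Overwrite)
    if (useSparkPartitionBy) {
      // Route exercised by testPartitionStatsWithPartitionBy: no Hudi
      // partition-path option; Spark's partitionBy drives the directory layout.
      writer.partitionBy("partition").save(basePath)
    } else {
      // Hudi-native route: partitioning declared via a write option.
      writer.option("hoodie.datasource.write.partitionpath.field", "partition").save(basePath)
    }
  }
}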
