[HUDI-7144] Build storage partition stats index and use it for data skipping #10352
Conversation
(force-pushed 8cb19dc to b5d7c62)
Let's get this turned on by default?
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
Left some comments on cases that I think we should cover out of the gate, with all tests passing.
Summarizing my understanding here, to get on the same page: when the partition stats index is turned on, we essentially track merged statistics across all files within a storage partition, for each of the partition columns. On the query side, we try to see whether any of the storage partitions can simply be pruned out based on the columns in the query and the new stats.
So, there should be no semantic difference between this and what hive-style partitioning achieves, i.e., queries written on one of the partition columns, a combination of them, etc. I'd love to see complex test cases like those to confirm this is indeed good.
I can do a line-by-line detailed review once these are resolved.
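To make the pruning semantics concrete, here is a minimal, self-contained sketch of the idea described above. All names here are illustrative (this is not Hudi's actual API): each storage partition keeps a merged [min, max] range per indexed column, and for an equality predicate `col = v` a partition whose range excludes `v` can be skipped without listing any of its files.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of partition-level data skipping via merged min/max stats.
public class PartitionPruneSketch {

  // Merged numeric stats for one column within one storage partition.
  record ColumnRange(long min, long max) {}

  // Keep only partitions whose range could contain `value` (predicate `col = value`).
  static List<String> candidatePartitions(Map<String, ColumnRange> statsByPartition, long value) {
    List<String> candidates = new ArrayList<>();
    for (Map.Entry<String, ColumnRange> e : statsByPartition.entrySet()) {
      ColumnRange r = e.getValue();
      if (r.min() <= value && value <= r.max()) {
        candidates.add(e.getKey());
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    Map<String, ColumnRange> stats = new LinkedHashMap<>();
    stats.put("2023/01/01", new ColumnRange(10, 50));
    stats.put("2023/01/02", new ColumnRange(60, 90));
    // Only 2023/01/01 can contain rows with value 42, so 2023/01/02 is pruned.
    System.out.println(candidatePartitions(stats, 42));
  }
}
```

This should behave the same as hive-style partition pruning when the indexed column is itself a partition column, which is the semantic equivalence the comment above asks to verify.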
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
...datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
...ource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
...atasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
(force-pushed 7d6558b to 6e55ec7)
I made one pass skimming the changes. Can you reply to my previous review comments, especially on tests?
(force-pushed 6e55ec7 to 6a4ed24)
@vinothchandar Thanks for the review. I have addressed all your comments. The tests were passing two commits ago; I am looking into the failures. But the PR is ready for review again.
(force-pushed 6a4ed24 to f8c89bc)
(force-pushed f8c89bc to e72c465)
Left some comments. Will need to step through to understand how partition columns are mapped automatically on the writer side into the target column stats index columns.
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String partition, String filename)
    return new Path(basePath, partition + Path.SEPARATOR + filename);
  }
}

public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
I see that you are using recordsGenParams.getTargetColumnsForColumnStatsIndex() ultimately to generate the stats. Do you just add the partition fields into the target columns, to make partition stats aggregation happen automatically?
No, we need to specify the partition field in the target columns config.
We can avoid this extra step though.
Can we do this automatically? As a user, if I am already doing partitionBy, then it's a fair expectation that the index is built automatically and partition pruning works without any extra configs.
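As a sketch of the behavior requested here (method and variable names are illustrative, not Hudi's actual API), the columns targeted for stats could simply be the union of the user-configured column list and the table's partition fields, so partitionBy alone would be enough:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: fold partition fields into the target stats columns
// automatically, deduplicating while preserving order.
public class TargetColumnsSketch {

  static List<String> targetColumns(String configuredCols, String partitionByExpr) {
    Set<String> cols = new LinkedHashSet<>();
    if (!configuredCols.isEmpty()) {
      cols.addAll(Arrays.asList(configuredCols.split(",")));
    }
    // Partition fields are appended without any extra user config.
    cols.addAll(Arrays.asList(partitionByExpr.split(",")));
    return new ArrayList<>(cols);
  }

  public static void main(String[] args) {
    // Configured stats column "price" plus partitionBy("a,b,c").
    System.out.println(targetColumns("price", "a,b,c"));
  }
}
```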
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
Please help me understand if I am missing something around tests.
...atasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
...ce/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
Bunch more comments.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
...datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
...atasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -346,6 +352,12 @@ case class HoodieFileIndex(spark: SparkSession,
      Option.empty
    } else if (recordKeys.nonEmpty) {
      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
    } else if (recordKeys.nonEmpty && partitionStatsIndex.isIndexAvailable && !queryFilters.isEmpty) {
      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
      val shouldReadInMemory = partitionStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
Can you please point me to the piece of code/line numbers where we identify a referenced column as a partition column (on the read side), and how .partitionBy("a,b,c") turns into partition stats on the metadata write side? That part is still unclear to me.
First, I want to clarify that partition stats are collected only when column stats is enabled, and only for those columns for which column stats is enabled.
Write path: when the user does .partitionBy("a,b,c"), the logic is similar to column stats. We use the commit metadata and convert that to partition stats; this happens in HoodieTableMetadataUtil.convertMetadataToPartitionStatsRecords. The difference from column stats is that the stats are aggregated by partition value in BaseFileUtils.getColumnRangeInPartition.
Read path: queryReferencedColumns here contains data filters. Partition pruning based on partition filters has already happened one level above.
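A minimal sketch of the aggregation step just described, with illustrative types (this is not BaseFileUtils.getColumnRangeInPartition itself): file-level ranges for a column are folded into one per-partition range by taking the min of mins, the max of maxes, and summing the null counts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of aggregating file-level column ranges by storage partition.
public class PartitionStatsAggSketch {

  record FileRange(String partition, long min, long max, long nullCount) {}
  record PartitionRange(long min, long max, long nullCount) {}

  static Map<String, PartitionRange> aggregateByPartition(List<FileRange> fileRanges) {
    Map<String, PartitionRange> result = new HashMap<>();
    for (FileRange f : fileRanges) {
      // Merge each file's range into the running range for its partition.
      result.merge(
          f.partition(),
          new PartitionRange(f.min(), f.max(), f.nullCount()),
          (a, b) -> new PartitionRange(
              Math.min(a.min(), b.min()),
              Math.max(a.max(), b.max()),
              a.nullCount() + b.nullCount()));
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(aggregateByPartition(List.of(
        new FileRange("p", 1, 5, 0),
        new FileRange("p", 3, 9, 2))));
  }
}
```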
(force-pushed e72c465 to d0faf33)
Please clarify the latest comments.
return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> {
  final String partitionName = partitionFiles.getRelativePath();
  Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream()
      .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
Are we reading all the column stats again to generate partition stats? I think we should avoid doing this extra work and piggyback off the column stats index values.
This is called once, at the time of initialization of partition stats. Nevertheless, we can avoid the extra work with minimal branching.
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
(force-pushed a592ad1 to 933b215)
(force-pushed 3ce4c6f to 8510ace)
(force-pushed 8510ace to 1430466)
(force-pushed 53e141b to 77a9f8e)
(force-pushed 77a9f8e to ab97c0d)
...di-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -557,7 +558,8 @@ public void testDowngradeSixToFiveShouldDeleteRecordIndexPartition() throws Exce
    PARTITION_NAME_COLUMN_STATS,
    PARTITION_NAME_BLOOM_FILTERS,
    PARTITION_NAME_RECORD_INDEX,
-   PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX
+   PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX,
Should this list be generated from the production code, i.e., list of supported MDT partitions? Also, do we need to upgrade the table version? I think master branch is still on table version 6, the same as 0.14.0 release.
I think we will eventually need to upgrade the table version for 1.0. But, this PR does not do any incompatible schema changes, so we should be good for now. HUDI-7665 to track.
...di-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
private static final int METADATA_TYPE_RECORD_INDEX = 5;
private static final int METADATA_TYPE_PARTITION_STATS = 6;
Maybe we should add an enum, guarantee the ordering, and automatically assign the type ID?
Hmm, yes we could. Just to be clear, you're suggesting no schema change, right? As in, we will still keep the int value in the records, and not the enum.
Right, no schema change; just code restructuring maintaining the same ID, i.e., add getId in MetadataPartitionType. You can refactor this part in a separate PR.
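A sketch of the refactoring suggested here, assuming the numeric IDs shown in the snippet above; the enum keeps the on-disk int values stable (no schema change) while replacing the magic constants. The enum and accessor names are illustrative, mirroring the getId accessor the reviewer proposes for MetadataPartitionType.

```java
// Hypothetical sketch: an enum carrying the existing metadata record-type IDs.
public class MetadataTypeIdSketch {

  enum MetadataRecordType {
    // Existing IDs must be preserved exactly; the records on disk store the int.
    RECORD_INDEX(5),
    PARTITION_STATS(6);

    private final int id;

    MetadataRecordType(int id) {
      this.id = id;
    }

    public int getId() {
      return id;
    }
  }

  public static void main(String[] args) {
    // Same int written to the record as before, now without a magic constant.
    System.out.println(MetadataRecordType.PARTITION_STATS.getId());
  }
}
```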
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
(force-pushed ab97c0d to 4a813e8)
Submitting a pending review that I had hanging around.
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String partition, String filename)
    return new Path(basePath, partition + Path.SEPARATOR + filename);
  }
}

public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
Can we do this automatically? As a user, if I am already doing partitionBy, then it's a fair expectation that the index is built automatically and partition pruning works without any extra configs.
...rce/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
(force-pushed 9c26ef9 to 54d5734)
case PARTITION_STATS:
  if (dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
    LOG.warn("Skipping partition stats index initialization as target columns are not set");
    continue;
  }
  fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
  break;
Not required for this PR. IMO all such switch-case logic should be included in the MetadataPartitionType enum for easier extensibility.
Then, HoodieBackedTableMetadataWriter can also be simplified.
Agree. Will do it in HUDI-7691
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
private static final int METADATA_TYPE_RECORD_INDEX = 5;
private static final int METADATA_TYPE_PARTITION_STATS = 6;
Right, no schema change; just code restructuring maintaining the same ID, i.e., add getId in MetadataPartitionType. You can refactor this part in a separate PR.
if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)) {
  final HoodieData<HoodieRecord> partitionStatsRDD = convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, metadataConfig);
  partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS, partitionStatsRDD);
}
A side topic: does this compose a sub-graph in the DAG without triggering execution? Ideally, all types of metadata should be computed in parallel, leveraging Spark's parallelism, instead of being computed type by type sequentially.
No. That's a very good point! Typically, we enable indexes one at a time, but I agree with you; I will work on it in HUDI-7690. Btw, this limitation is only for initialization. In the case of updates, we don't compute sequentially.
@@ -1872,4 +1883,175 @@ public HoodieRecord next() {
    }
  };
}

public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
                                                                           List<DirectoryInfo> partitionInfoList,
Does this contain log files in MOR tables?
No, the assumptions of colstats apply to partition stats too. Log files are considered in subsequent updates via the write status. So, just like colstats, we will have to compact the table before enabling partition stats.
I see. Is this for initializing the partition stats only? Do we have a guard to throw an exception if the colstats or partition stats partition is initialized on the latest file slices with log files?
Yes, this is just for initialization. We just log a warning and continue with the other file groups, which is what colstats also does. However, given that partition stats will be enabled by default, I think it makes sense to throw an exception. Will do it.
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
      .reduce(BaseFileUtils::mergeRanges).get();
}

private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> mergeRanges(HoodieColumnRangeMetadata<T> one,
Do we merge column ranges between the base and log files (e.g., log file contains updates and deletes)? Is that covered in this PR or by existing column stats logic (how does that work)?
Yes, it's covered by existing colstats logic. For log files, we get the column ranges based on write stats and then merge as usual.
Got it. I'm trying to understand how col stats are merged between the base and log files, e.g., for custom payload, because we may not be able to simply take the minimum of all minimum values or the maximum of all maximum values from the base and log files, as there can be deletes and there can be event-time based merging. We can discuss this in a separate thread.
We follow the same approach as for colstats; see HoodieMetadataPayload.preCombine (hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java, line 372 at f99b181):

public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
If the new record is a delete, we ignore the previous one. If the previous record is a delete, we take the newer one; otherwise we just merge the stats. I don't think we do event-time based merging for the MDT.
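A simplified sketch of the preCombine rule just described, with illustrative types (this is not the real HoodieMetadataPayload): a newer delete shadows older stats, stats arriving after a delete replace it, and two stat records merge their ranges.

```java
// Hypothetical sketch of delete-aware merging of metadata stat payloads.
public class PreCombineSketch {

  record StatsPayload(boolean isDeleted, long min, long max) {}

  static StatsPayload preCombine(StatsPayload previous, StatsPayload next) {
    if (next.isDeleted()) {
      return next; // the newer delete wins outright
    }
    if (previous.isDeleted()) {
      return next; // previous was a delete: take the newer stats as-is
    }
    // Both carry stats: merge the ranges.
    return new StatsPayload(false,
        Math.min(previous.min(), next.min()),
        Math.max(previous.max(), next.max()));
  }

  public static void main(String[] args) {
    System.out.println(preCombine(
        new StatsPayload(false, 1, 5),
        new StatsPayload(false, 3, 9)));
  }
}
```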
...rce/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
...atasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
...ce/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
Squashed commit message:
- …kipping
- Merge datasource and sql tests
- Turn on partition stats by default
- Address some comments and tests
- fix mit rebase and partitionBy test
- Use null for file name when building partition stats
- Fix the merging of file stats and add test
- Exit early if no columns to index
- Add test using hudi_metadata tvf
- check partition pruning and address other comments
- fix after rebase
- Encode partitioning for hive style partitioned table
- Address comments
(force-pushed 54d5734 to 2ba44a1)
...ce/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+1 you can land the PR once all clarifying questions are answered.
@codope Hello, I found that the partition stats index does not take effect; the current implementation does not seem to achieve partition filtering.
- First: in this picture, I change the UT filter to trigger the partition stats index. partition_stats does not save fileName, so if the CSI logic is reused, it will throw a null pointer in the group-by key.
- Second: a question. I am not sure whether this PR is meant to prune partitions like a physical partition column, i.e., use other fields' min/max to decide which physical partitions to list file slices for, or to filter file names like CSI and RLI.
Thanks.
Created an issue to track it: https://issues.apache.org/jira/browse/HUDI-7829
Change Logs
Build storage partition stats index and use it for data skipping. Main changes are as follows:
- Configs are added in HoodieMetadataConfig, and the writer changes are in HoodieBackedTableMetadataWriter, with some util methods in HoodieTableMetadataUtil.
- Reader changes are in HoodieFileIndex. First, partition pruning happens as usual; then, depending on data filters, data can be skipped further if the partition stats index is available.

Impact
Stats are aggregated by storage partition, enabling efficient data skipping. Meta sync need not sync the partition metadata. Queries will use the index while planning in the driver.
Risk level: medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change.

Contributor's checklist