diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 9f2887cd453c7..ef9df288aa8ff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -510,7 +510,7 @@ private String generateUniqueCommitInstantTime(String initializationTime) { } private Pair> initializePartitionStatsIndex(List partitionInfoList) { - HoodieData records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionInfoList, getRecordsGenerationParams()); + HoodieData records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient); final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount(); return Pair.of(fileGroupCount, records); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index cb9e2ecc584db..a76a547b3e611 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -91,7 +91,7 @@ private static > HoodieColumnRangeMetadata mergeRang final T minValue; final T maxValue; if (one.getMinValue() != null && another.getMinValue() != null) { - minValue = one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ? one.getMinValue() : another.getMinValue(); + minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? one.getMinValue() : another.getMinValue(); } else if (one.getMinValue() == null) { minValue = another.getMinValue(); } else { @@ -99,7 +99,7 @@ private static > HoodieColumnRangeMetadata mergeRang } if (one.getMaxValue() != null && another.getMaxValue() != null) { - maxValue = one.getMaxValue().toString().compareTo(another.getMaxValue().toString()) < 0 ? another.getMaxValue() : one.getMaxValue(); + maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? another.getMaxValue() : one.getMaxValue(); } else if (one.getMaxValue() == null) { maxValue = another.getMaxValue(); } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 696cab7fbddfe..c890a5f33fb36 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -676,8 +676,8 @@ public static Stream createPartitionStatsRecords(String partitionP }); } - public static String getPartitionStatsIndexKey(String partitionName, HoodieColumnRangeMetadata columnRangeMetadata) { - final PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName)); + public static String getPartitionStatsIndexKey(String partitionPath, HoodieColumnRangeMetadata columnRangeMetadata) { + final PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPath)); final ColumnIndexID columnIndexID = new ColumnIndexID(columnRangeMetadata.getColumnName()); return columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 0123424149c43..242d23a04065c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1942,23 +1942,20 @@ private static Path filePath(String basePath, String partition, String filename) public static HoodieData convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, List partitionInfoList, - MetadataRecordsGenerationParams recordsGenerationParams) { - // Find the columns to index - HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); - final List columnsToIndex = getColumnsToIndex( - recordsGenerationParams, - Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient) { + final List columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex(); if (columnsToIndex.isEmpty()) { return engineContext.emptyHoodieData(); } - LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size())); + LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex); // Create records for MDT - int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1); + int parallelism = Math.max(Math.min(partitionInfoList.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1); return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> { - final String partitionName = partitionInfo.getRelativePath(); + final String partitionPath = partitionInfo.getRelativePath(); // Step 1: Collect Column Metadata for Each File (Your existing code does this) List>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream() - .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false)) + .map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false)) .collect(toList()); // Step 2: Flatten and Group by Column Name Map>> columnMetadataMap = fileColumnMetadata.stream() @@ -1967,7 +1964,7 @@ public static HoodieData convertFilesToPartitionStatsRecords(Hoodi // Step 3: Aggregate Column Ranges Stream> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream() .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue())); - return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, partitionStatsRangeMetadata.collect(toList()), false).iterator(); + return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(toList()), false).iterator(); }); } diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java index aeb0fd1205f70..cad622c49e76d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieLocalEngineContext; @@ -121,17 +122,13 @@ public void testConvertFilesToPartitionStatsRecords() throws Exception { HoodieData result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords( engineContext, partitionInfoList, - new MetadataRecordsGenerationParams( - metaClient, - Arrays.asList(MetadataPartitionType.PARTITION_STATS, MetadataPartitionType.COLUMN_STATS), - "", - 1, - true, - 1, - columnsToIndex, - Collections.emptyList(), - true, - 1)); + HoodieMetadataConfig.newBuilder().enable(true) + .withMetadataIndexColumnStats(true) + .withMetadataIndexPartitionStats(true) + .withColumnStatsIndexForColumns("rider,driver") + .withPartitionStatsIndexParallelism(1) + .build(), + metaClient); // Validate the result. List records = result.collectAsList(); // 3 partitions * 2 columns = 6 partition stats records