Skip to content

Commit

Permalink
Address comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
codope committed Apr 10, 2024
1 parent e1ba819 commit 53e141b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ private String generateUniqueCommitInstantTime(String initializationTime) {
}

/**
 * Builds the initial set of records for the partition stats index.
 *
 * <p>NOTE(review): this hunk is rendered without +/- markers. The two consecutive
 * {@code records} declarations below appear to be the pre-change and post-change versions
 * of the same statement; only one of them exists in the real file.
 *
 * @param partitionInfoList partition directory listings, handed to
 *                          {@code HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords}
 * @return a pair of (file group count read from the metadata config, generated records)
 */
private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) {
// Presumably the removed line: passes the result of getRecordsGenerationParams().
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionInfoList, getRecordsGenerationParams());
// Presumably the added line: passes the metadata config and the data table meta client directly.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient);
// The number of file groups for this index comes from the metadata config.
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> mergeRang
final T minValue;
final T maxValue;
if (one.getMinValue() != null && another.getMinValue() != null) {
minValue = one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ? one.getMinValue() : another.getMinValue();
minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? one.getMinValue() : another.getMinValue();
} else if (one.getMinValue() == null) {
minValue = another.getMinValue();
} else {
minValue = one.getMinValue();
}

if (one.getMaxValue() != null && another.getMaxValue() != null) {
maxValue = one.getMaxValue().toString().compareTo(another.getMaxValue().toString()) < 0 ? another.getMaxValue() : one.getMaxValue();
maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? another.getMaxValue() : one.getMaxValue();
} else if (one.getMaxValue() == null) {
maxValue = another.getMaxValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionP
});
}

/**
 * Builds the record key for a partition stats index entry.
 *
 * <p>The key is the Base64-encoded column index ID immediately followed by the
 * Base64-encoded partition index ID.
 *
 * <p>NOTE(review): this hunk is rendered without +/- markers. The signature and the first
 * body line each appear twice; the change looks like a rename of the first parameter from
 * {@code partitionName} to {@code partitionPath}.
 *
 * @param partitionPath       partition identifier; it is first passed through
 *                            {@code HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier}
 * @param columnRangeMetadata column range metadata; only its column name is used here
 * @return the concatenated key string
 */
// Presumably the removed signature and first statement (parameter named partitionName).
public static String getPartitionStatsIndexKey(String partitionName, HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {
final PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName));
// Presumably the added signature and first statement (parameter named partitionPath).
public static String getPartitionStatsIndexKey(String partitionPath, HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {
final PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPath));
final ColumnIndexID columnIndexID = new ColumnIndexID(columnRangeMetadata.getColumnName());
// Column ID comes first in the key, partition ID second.
return columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1942,23 +1942,20 @@ private static Path filePath(String basePath, String partition, String filename)

/**
 * Converts a listing of partitions and their files into partition stats index records.
 *
 * <p>For each partition, per-file column range metadata is collected for the columns to
 * index, the ranges are aggregated per column with
 * {@code BaseFileUtils.getColumnRangeInPartition}, and the aggregated ranges are turned into
 * records through {@code HoodieMetadataPayload.createPartitionStatsRecords}.
 *
 * <p>NOTE(review): this hunk is rendered without +/- markers, so old and new lines are
 * interleaved. The old version took a single {@code MetadataRecordsGenerationParams} and
 * resolved the columns via {@code getColumnsToIndex}; the new version appears to take the
 * metadata config and meta client directly. Part of the body is collapsed in this view
 * (see the "Expand All" line).
 *
 * @param engineContext       engine context used to parallelize over the partitions
 * @param partitionInfoList   partitions to process, one entry per partition directory
 * @param metadataConfig      source of the columns to index and of the index parallelism
 * @param dataTableMetaClient meta client of the data table, passed to
 *                            {@code getFileStatsRangeMetadata}
 * @return the generated records, or empty data when there are no columns to index
 */
public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
List<DirectoryInfo> partitionInfoList,
MetadataRecordsGenerationParams recordsGenerationParams) {
// Find the columns to index
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
final List<String> columnsToIndex = getColumnsToIndex(
recordsGenerationParams,
Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient) {
final List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex();
// Nothing to index: return empty data without touching any files.
if (columnsToIndex.isEmpty()) {
return engineContext.emptyHoodieData();
}
LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size()));
LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
// Create records for MDT
// Parallelism is the smaller of the partition count and the configured value, but never below 1.
int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
int parallelism = Math.max(Math.min(partitionInfoList.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> {
final String partitionName = partitionInfo.getRelativePath();
final String partitionPath = partitionInfo.getRelativePath();
// Step 1: Collect column range metadata for each file in the partition
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
.map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
.map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
.collect(toList());
// Step 2: Flatten and Group by Column Name
// NOTE(review): the rest of this statement is collapsed in this view; grouping details not visible.
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.stream()
Expand All @@ -1967,7 +1964,7 @@ public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(Hoodi
// Step 3: Aggregate Column Ranges
Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
.map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, partitionStatsRangeMetadata.collect(toList()), false).iterator();
return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(toList()), false).iterator();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hudi.metadata;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
Expand Down Expand Up @@ -121,17 +122,13 @@ public void testConvertFilesToPartitionStatsRecords() throws Exception {
HoodieData<HoodieRecord> result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
engineContext,
partitionInfoList,
new MetadataRecordsGenerationParams(
metaClient,
Arrays.asList(MetadataPartitionType.PARTITION_STATS, MetadataPartitionType.COLUMN_STATS),
"",
1,
true,
1,
columnsToIndex,
Collections.emptyList(),
true,
1));
HoodieMetadataConfig.newBuilder().enable(true)
.withMetadataIndexColumnStats(true)
.withMetadataIndexPartitionStats(true)
.withColumnStatsIndexForColumns("rider,driver")
.withPartitionStatsIndexParallelism(1)
.build(),
metaClient);
// Validate the result.
List<HoodieRecord> records = result.collectAsList();
// 3 partitions * 2 columns = 6 partition stats records
Expand Down

0 comments on commit 53e141b

Please sign in to comment.