Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix](hudi) catch exception when getting hudi partition (#35027) #41342

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class HudiScanNode extends HiveScanNode {
Expand Down Expand Up @@ -272,73 +273,84 @@ public List<Split> getSplits() throws UserException {
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor();
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
AtomicReference<Throwable> throwable = new AtomicReference<>();
partitions.forEach(partition -> executor.execute(() -> {
String globPath;
String partitionName = "";
if (partition.isDummyPartition()) {
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new Path(partition.getPath()));
globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
List<FileStatus> statuses;
try {
statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
new Path(globPath));
} catch (IOException e) {
throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
}
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));
String globPath;
String partitionName = "";
if (partition.isDummyPartition()) {
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new Path(partition.getPath()));
globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
List<FileStatus> statuses;
try {
statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
new Path(globPath));
} catch (IOException e) {
throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
}
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));

if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
// Need add hdfs host to location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toScanRangeLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
String filePath = baseFile.map(BaseFile::getPath).orElse("");
long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant)
.forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
// Need add hdfs host to location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toScanRangeLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
.forEach(fileSlice -> {
Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
String filePath = baseFile.map(BaseFile::getPath).orElse("");
long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);

List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
.map(Path::toString)
.collect(Collectors.toList());
if (logs.isEmpty()) {
noLogsSplitNum.incrementAndGet();
}
List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
.map(Path::toString)
.collect(Collectors.toList());
if (logs.isEmpty()) {
noLogsSplitNum.incrementAndGet();
}

// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
new String[0], partition.getPartitionValues());
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
split.setHudiDeltaLogs(logs);
split.setInputFormat(inputFormat);
split.setSerde(serdeLib);
split.setBasePath(basePath);
split.setHudiColumnNames(columnNames);
split.setHudiColumnTypes(columnTypes);
split.setInstantTime(queryInstant);
splits.add(split);
});
// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
new String[0], partition.getPartitionValues());
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
split.setHudiDeltaLogs(logs);
split.setInputFormat(inputFormat);
split.setSerde(serdeLib);
split.setBasePath(basePath);
split.setHudiColumnNames(columnNames);
split.setHudiColumnTypes(columnTypes);
split.setInstantTime(queryInstant);
splits.add(split);
});
}
} catch (Throwable t) {
throwable.set(t);
} finally {
countDownLatch.countDown();
}
countDownLatch.countDown();
}));
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
}
if (throwable.get() != null) {
throw new RuntimeException(throwable.get().getMessage(), throwable.get());
}
return splits;
}

Expand Down
Loading