Skip to content

Commit

Permalink
[bugfix](hudi) catch exception when getting hudi partition (#35027) (#…
Browse files Browse the repository at this point in the history
…41342)

bp #35027

Hudi uses a thread pool to get the files for each partition, and uses a
countdown latch to wait for all threads to finish. But if a thread throws
an exception, the countdown latch is never counted down, and the thread
waiting on the latch will be blocked.
  • Loading branch information
justfortaste authored Nov 19, 2024
1 parent f5d2993 commit a8a1dbe
Showing 1 changed file with 67 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class HudiScanNode extends HiveScanNode {
Expand Down Expand Up @@ -272,73 +273,84 @@ public List<Split> getSplits() throws UserException {
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor();
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
AtomicReference<Throwable> throwable = new AtomicReference<>();
partitions.forEach(partition -> executor.execute(() -> {
String globPath;
String partitionName = "";
if (partition.isDummyPartition()) {
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new Path(partition.getPath()));
globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
List<FileStatus> statuses;
try {
statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
new Path(globPath));
} catch (IOException e) {
throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
}
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));
String globPath;
String partitionName = "";
if (partition.isDummyPartition()) {
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new Path(partition.getPath()));
globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
List<FileStatus> statuses;
try {
statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
new Path(globPath));
} catch (IOException e) {
throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
}
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));

if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
// Need add hdfs host to location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toScanRangeLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
String filePath = baseFile.map(BaseFile::getPath).orElse("");
long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant)
.forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
// Need add hdfs host to location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toScanRangeLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
.forEach(fileSlice -> {
Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
String filePath = baseFile.map(BaseFile::getPath).orElse("");
long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);

List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
.map(Path::toString)
.collect(Collectors.toList());
if (logs.isEmpty()) {
noLogsSplitNum.incrementAndGet();
}
List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
.map(Path::toString)
.collect(Collectors.toList());
if (logs.isEmpty()) {
noLogsSplitNum.incrementAndGet();
}

// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
new String[0], partition.getPartitionValues());
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
split.setHudiDeltaLogs(logs);
split.setInputFormat(inputFormat);
split.setSerde(serdeLib);
split.setBasePath(basePath);
split.setHudiColumnNames(columnNames);
split.setHudiColumnTypes(columnTypes);
split.setInstantTime(queryInstant);
splits.add(split);
});
// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
new String[0], partition.getPartitionValues());
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
split.setHudiDeltaLogs(logs);
split.setInputFormat(inputFormat);
split.setSerde(serdeLib);
split.setBasePath(basePath);
split.setHudiColumnNames(columnNames);
split.setHudiColumnTypes(columnTypes);
split.setInstantTime(queryInstant);
splits.add(split);
});
}
} catch (Throwable t) {
throwable.set(t);
} finally {
countDownLatch.countDown();
}
countDownLatch.countDown();
}));
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
}
if (throwable.get() != null) {
throw new RuntimeException(throwable.get().getMessage(), throwable.get());
}
return splits;
}

Expand Down

0 comments on commit a8a1dbe

Please sign in to comment.