From a8a1dbebc407fede312d894aed2847c83023a118 Mon Sep 17 00:00:00 2001 From: zhangyuan Date: Tue, 19 Nov 2024 19:03:37 +0800 Subject: [PATCH] [bugfix](hudi) catch exception when getting hudi partition (#35027) (#41342) bp https://github.com/apache/doris/pull/35027 Hudi uses a thread pool to get the files for each partition, and uses a countdown latch to wait for all threads to finish. But if a thread throws an exception, the countdown latch is never counted down, and the waiting thread will be blocked. --- .../planner/external/hudi/HudiScanNode.java | 122 ++++++++++-------- 1 file changed, 67 insertions(+), 55 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 803dadae03d5d4..ab417a19cf725d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -72,6 +72,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class HudiScanNode extends HiveScanNode { @@ -272,73 +273,84 @@ public List getSplits() throws UserException { .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor(); List splits = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(partitions.size()); + AtomicReference throwable = new AtomicReference<>(); partitions.forEach(partition -> executor.execute(() -> { - String globPath; - String partitionName = ""; - if (partition.isDummyPartition()) { - globPath = hudiClient.getBasePathV2().toString() + "/*"; - } else { - partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), - new Path(partition.getPath())); - globPath = String.format("%s/%s/*", 
hudiClient.getBasePathV2().toString(), partitionName); - } - List statuses; try { - statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(), - new Path(globPath)); - } catch (IOException e) { - throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e); - } - HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, - timeline, statuses.toArray(new FileStatus[0])); + String globPath; + String partitionName = ""; + if (partition.isDummyPartition()) { + globPath = hudiClient.getBasePathV2().toString() + "/*"; + } else { + partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), + new Path(partition.getPath())); + globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); + } + List statuses; + try { + statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(), + new Path(globPath)); + } catch (IOException e) { + throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e); + } + HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, + timeline, statuses.toArray(new FileStatus[0])); - if (isCowOrRoTable) { - fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { - noLogsSplitNum.incrementAndGet(); - String filePath = baseFile.getPath(); - long fileSize = baseFile.getFileSize(); - // Need add hdfs host to location - LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); - Path splitFilePath = locationPath.toScanRangeLocation(); - splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, - new String[0], partition.getPartitionValues())); - }); - } else { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> { - Optional baseFile = fileSlice.getBaseFile().toJavaOptional(); - String filePath = baseFile.map(BaseFile::getPath).orElse(""); - long fileSize = 
baseFile.map(BaseFile::getFileSize).orElse(0L); + if (isCowOrRoTable) { + fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant) + .forEach(baseFile -> { + noLogsSplitNum.incrementAndGet(); + String filePath = baseFile.getPath(); + long fileSize = baseFile.getFileSize(); + // Need add hdfs host to location + LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); + Path splitFilePath = locationPath.toScanRangeLocation(); + splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, + new String[0], partition.getPartitionValues())); + }); + } else { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) + .forEach(fileSlice -> { + Optional baseFile = fileSlice.getBaseFile().toJavaOptional(); + String filePath = baseFile.map(BaseFile::getPath).orElse(""); + long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L); - List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) - .map(Path::toString) - .collect(Collectors.toList()); - if (logs.isEmpty()) { - noLogsSplitNum.incrementAndGet(); - } + List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) + .map(Path::toString) + .collect(Collectors.toList()); + if (logs.isEmpty()) { + noLogsSplitNum.incrementAndGet(); + } - // no base file, use log file to parse file type - String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath; - HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, - new String[0], partition.getPartitionValues()); - split.setTableFormatType(TableFormatType.HUDI); - split.setDataFilePath(filePath); - split.setHudiDeltaLogs(logs); - split.setInputFormat(inputFormat); - split.setSerde(serdeLib); - split.setBasePath(basePath); - split.setHudiColumnNames(columnNames); - split.setHudiColumnTypes(columnTypes); - split.setInstantTime(queryInstant); - splits.add(split); - }); + // no base file, use log file to parse file type + String agencyPath = filePath.isEmpty() ? 
logs.get(0) : filePath; + HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, + new String[0], partition.getPartitionValues()); + split.setTableFormatType(TableFormatType.HUDI); + split.setDataFilePath(filePath); + split.setHudiDeltaLogs(logs); + split.setInputFormat(inputFormat); + split.setSerde(serdeLib); + split.setBasePath(basePath); + split.setHudiColumnNames(columnNames); + split.setHudiColumnTypes(columnTypes); + split.setInstantTime(queryInstant); + splits.add(split); + }); + } + } catch (Throwable t) { + throwable.set(t); + } finally { + countDownLatch.countDown(); } - countDownLatch.countDown(); })); try { countDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e.getMessage(), e); } + if (throwable.get() != null) { + throw new RuntimeException(throwable.get().getMessage(), throwable.get()); + } return splits; }