From 5581f4aeab0860c486d9aa0e85be43f6dbba8fef Mon Sep 17 00:00:00 2001 From: big face cat <731030576@qq.com> Date: Mon, 24 Jul 2023 10:31:16 +0800 Subject: [PATCH] [ARCTIC-1730] Add a cache when building IcebergDeleteFile in IcebergTableFileScanHelper.scan to prevent too many generated IcebergDeleteFiles (#1731) create deleteFile by cache Co-authored-by: huyuanfeng --- .../scan/IcebergTableFileScanHelper.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java index 1783049158..9898ed1305 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java @@ -31,12 +31,14 @@ import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public class IcebergTableFileScanHelper implements TableFileScanHelper { @@ -62,7 +64,8 @@ public List scan() { long startTime = System.currentTimeMillis(); PartitionSpec partitionSpec = table.spec(); try (CloseableIterable filesIterable = - table.newScan().useSnapshot(snapshotId).planFiles()) { + table.newScan().useSnapshot(snapshotId).planFiles()) { + IcebergDeleteFileCacheGenerator deleteFilesGenerator = new IcebergDeleteFileCacheGenerator(); for (FileScanTask task : filesIterable) { if (partitionFilter != null) { StructLike partition = task.file().partition(); @@ -73,7 +76,9 @@ public List scan() { } IcebergDataFile dataFile = createDataFile(task.file()); List> deleteFiles = - task.deletes().stream().map(this::createDeleteFile).collect(Collectors.toList()); + task.deletes().stream() + .map(deleteFilesGenerator::generate) + .collect(Collectors.toList()); results.add(new FileScanResult(dataFile, deleteFiles)); } } catch (IOException e) { @@ -94,7 +99,15 @@ private IcebergDataFile createDataFile(DataFile dataFile) { return new IcebergDataFile(dataFile, sequenceNumberFetcher.sequenceNumberOf(dataFile.path().toString())); } - private IcebergDeleteFile createDeleteFile(DeleteFile deleteFile) { - return new IcebergDeleteFile(deleteFile, sequenceNumberFetcher.sequenceNumberOf(deleteFile.path().toString())); + private class IcebergDeleteFileCacheGenerator { + private final Map deleteFilesCache = Maps.newHashMap(); + + IcebergDeleteFile generate(DeleteFile deleteFile) { + return deleteFilesCache.computeIfAbsent( + deleteFile, + file -> + new IcebergDeleteFile( + file, sequenceNumberFetcher.sequenceNumberOf(file.path().toString()))); + } } }