Skip to content

Commit

Permalink
[ARCTIC-1730] Add a cache when building IcebergDeleteFile in IcebergT…
Browse files: browse the repository at this point in the history
…ableFileScanHelper.scan to prevent too many generated IcebergDeleteFiles (#1731)

create deleteFile by cache

Co-authored-by: huyuanfeng <[email protected]>
  • Loading branch information
huyuanfeng2018 and huyuanfeng authored Jul 24, 2023
1 parent c9bf12d commit 5581f4a
Showing 1 changed file with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IcebergTableFileScanHelper implements TableFileScanHelper {
Expand All @@ -62,7 +64,8 @@ public List<FileScanResult> scan() {
long startTime = System.currentTimeMillis();
PartitionSpec partitionSpec = table.spec();
try (CloseableIterable<FileScanTask> filesIterable =
table.newScan().useSnapshot(snapshotId).planFiles()) {
table.newScan().useSnapshot(snapshotId).planFiles()) {
IcebergDeleteFileCacheGenerator deleteFilesGenerator = new IcebergDeleteFileCacheGenerator();
for (FileScanTask task : filesIterable) {
if (partitionFilter != null) {
StructLike partition = task.file().partition();
Expand All @@ -73,7 +76,9 @@ public List<FileScanResult> scan() {
}
IcebergDataFile dataFile = createDataFile(task.file());
List<IcebergContentFile<?>> deleteFiles =
task.deletes().stream().map(this::createDeleteFile).collect(Collectors.toList());
task.deletes().stream()
.map(deleteFilesGenerator::generate)
.collect(Collectors.toList());
results.add(new FileScanResult(dataFile, deleteFiles));
}
} catch (IOException e) {
Expand All @@ -94,7 +99,15 @@ private IcebergDataFile createDataFile(DataFile dataFile) {
return new IcebergDataFile(dataFile, sequenceNumberFetcher.sequenceNumberOf(dataFile.path().toString()));
}

private IcebergDeleteFile createDeleteFile(DeleteFile deleteFile) {
return new IcebergDeleteFile(deleteFile, sequenceNumberFetcher.sequenceNumberOf(deleteFile.path().toString()));
/**
 * Hands out {@link IcebergDeleteFile} wrappers, building each one at most once per
 * {@link DeleteFile} key.
 *
 * <p>A delete file referenced by several scan tasks is looked up in an internal map, so
 * repeated requests for the same key return the wrapper created on the first request
 * instead of constructing a new one each time. Key matching follows the map's normal
 * {@code equals}/{@code hashCode} lookup on the {@link DeleteFile} instance.
 *
 * <p>NOTE(review): backed by a plain hash map, so presumably intended for use from a
 * single thread within one scan — confirm against callers.
 */
private class IcebergDeleteFileCacheGenerator {
  /** Wrappers created so far, keyed by the delete file they wrap. */
  private final Map<DeleteFile, IcebergDeleteFile> deleteFilesCache = Maps.newHashMap();

  /**
   * Returns the cached wrapper for {@code deleteFile}, creating and storing it on first use.
   *
   * @param deleteFile the delete file to wrap
   * @return the wrapper associated with {@code deleteFile}
   */
  IcebergDeleteFile generate(DeleteFile deleteFile) {
    return deleteFilesCache.computeIfAbsent(deleteFile, this::wrap);
  }

  /** Builds a new wrapper, resolving the sequence number from the file's path. */
  private IcebergDeleteFile wrap(DeleteFile source) {
    String location = source.path().toString();
    return new IcebergDeleteFile(source, sequenceNumberFetcher.sequenceNumberOf(location));
  }
}
}

0 comments on commit 5581f4a

Please sign in to comment.