[AMORO-1935] 0.4.x introduce a table property for task splitting size of self-optimizing (apache#1936)

* add table property self-optimizing.max-task-file-size-bytes

* change property name to self-optimizing.max-task-size-bytes

* fix docs
wangtaohz authored Sep 18, 2023
1 parent 2d3b0bd commit 7879d44
Showing 8 changed files with 20 additions and 31 deletions.
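
For readers who want to try the new knob: the property can be set through the table's UpdateProperties API, the same mechanism the updated tests in this commit use. A minimal, hypothetical sketch (the 512 MB value and the `table` handle are illustrative, not from this commit):

    // Hypothetical usage sketch: cap each self-optimizing task at 512 MB.
    // TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE == "self-optimizing.max-task-size-bytes"
    table.updateProperties()
        .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, String.valueOf(512 * 1024 * 1024L))
        .commit();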
@@ -28,15 +28,13 @@
 import com.netease.arctic.data.file.DataFileWithSequence;
 import com.netease.arctic.data.file.DeleteFileWithSequence;
 import com.netease.arctic.table.ArcticTable;
-import com.netease.arctic.table.TableProperties;
 import com.netease.arctic.utils.SequenceNumberFetcher;
 import com.netease.arctic.utils.SerializationUtils;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.util.BinPacking;
-import org.apache.iceberg.util.PropertyUtil;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -78,13 +76,8 @@ protected List<FileScanTask> filterRepeatFileScanTask(Collection<FileScanTask> f
   }
 
   protected List<List<FileScanTask>> binPackFileScanTask(List<FileScanTask> fileScanTasks) {
-    long targetFileSize = getTargetSize();
-
-    Long sum = fileScanTasks.stream()
-        .map(fileScanTask -> fileScanTask.file().fileSizeInBytes()).reduce(0L, Long::sum);
-    int taskCnt = (int) (sum / targetFileSize) + 1;
-
-    return new BinPacking.ListPacker<FileScanTask>(targetFileSize, taskCnt, true)
+    long taskSize = getTaskSize();
+    return new BinPacking.ListPacker<FileScanTask>(taskSize, Integer.MAX_VALUE, true)
         .pack(fileScanTasks, fileScanTask -> fileScanTask.file().fileSizeInBytes());
   }

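Both the old and new code delegate splitting to Iceberg's BinPacking.ListPacker(targetWeight, lookback, largestBinFirst); the change replaces the previously computed bin-count lookback with an unbounded one and makes the target the per-task size. A minimal sketch of the packing semantics assumed here (the weights are made-up illustration values):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.util.BinPacking;

    // Pack four 100-byte weights against a 250-byte target with unlimited lookback,
    // largest bins emitted first -- mirroring the new call sites in this commit.
    List<List<Long>> bins = new BinPacking.ListPacker<Long>(250L, Integer.MAX_VALUE, true)
        .pack(Arrays.asList(100L, 100L, 100L, 100L), w -> w);
    // Expected: two bins of two items each, since a third 100-byte item would exceed 250.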
@@ -179,12 +172,6 @@ protected SequenceNumberFetcher seqNumberFetcher() {
     return sequenceNumberFetcher;
   }
 
-  private long getTargetSize() {
-    return PropertyUtil.propertyAsLong(arcticTable.properties(),
-        TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-        TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
-  }
-
   protected void getOptimizeFile(List<FileScanTask> fileScanTasks,
                                  List<DataFile> dataFiles,
                                  List<DeleteFile> eqDeleteFiles,
@@ -195,6 +195,12 @@ protected long getSmallFileSize(Map<String, String> properties) {
     }
   }
 
+  protected long getTaskSize() {
+    return PropertyUtil.propertyAsLong(arcticTable.properties(),
+        TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE,
+        TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE_DEFAULT);
+  }
+
   protected interface PartitionWeight extends Comparable<PartitionWeight> {
 
   }
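The new getTaskSize() resolves the property with Iceberg's PropertyUtil.propertyAsLong, which returns the parsed value when the key is present and the supplied default otherwise. A small self-contained sketch (the map contents are illustrative):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.iceberg.util.PropertyUtil;

    Map<String, String> props =
        Collections.singletonMap("self-optimizing.max-task-size-bytes", "536870912");
    long taskSize = PropertyUtil.propertyAsLong(
        props, "self-optimizing.max-task-size-bytes", 1073741824L);
    // taskSize == 536870912 (512 MB); if the key were absent, the 1 GB default would apply.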
@@ -116,11 +116,10 @@ protected OptimizeType getOptimizeType() {
   protected List<BasicOptimizeTask> collectTask(String partition) {
     List<BasicOptimizeTask> result;
     if (arcticTable.isUnkeyedTable()) {
-      result = collectTasksWithBinPack(partition, getOptimizingTargetSize());
+      result = collectTasksWithBinPack(partition, getTaskSize());
     } else {
       if (allBaseFilesInRootNode(partition)) {
-        // TO avoid too big task size leading to optimizer OOM, we limit the max task size to 4 * optimizing target size
-        result = collectTasksWithBinPack(partition, getOptimizingTargetSize() * Math.min(4, getBaseBucketSize()));
+        result = collectTasksWithBinPack(partition, getTaskSize());
       } else {
         result = collectTasksWithNodes(partition);
       }
@@ -204,9 +203,7 @@ private List<BasicOptimizeTask> collectTasksWithBinPack(String partition, long taskSize) {
         false, constructCustomHiveSubdirectory(baseFiles)
     );
 
-    Long sum = baseFiles.stream().map(DataFile::fileSizeInBytes).reduce(0L, Long::sum);
-    int taskCnt = (int) (sum / taskSize) + 1;
-    List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, taskCnt, true)
+    List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
        .pack(baseFiles, DataFile::fileSizeInBytes);
     for (List<DataFile> files : packed) {
       if (CollectionUtils.isNotEmpty(files)) {
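The dropped comment explained that root-node tasks were previously capped at 4 × the optimizing target size to avoid optimizer OOM. Using the defaults visible elsewhere in this commit (target-size 128 MB, and assuming base_bucket >= 4), the effective cap changes roughly as follows:

    old cap = min(4, base_bucket) * self-optimizing.target-size = 4 * 128 MB = 512 MB
    new cap = self-optimizing.max-task-size-bytes (default)     = 1 GB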
@@ -170,12 +170,8 @@ private List<BasicOptimizeTask> collectUnKeyedTableTasks(String partition) {
     List<DeleteFile> posDeleteFiles = getPosDeleteFilesFromFileTree(partition);
     if (nodeTaskNeedBuild(partition, posDeleteFiles, baseFiles)) {
       // for unkeyed table, tasks can be bin-packed
-      long taskSize = CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(),
-          TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-          TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
-      Long sum = baseFiles.stream().map(DataFile::fileSizeInBytes).reduce(0L, Long::sum);
-      int taskCnt = (int) (sum / taskSize) + 1;
-      List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, taskCnt, true)
+      long taskSize = getTaskSize();
+      List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
           .pack(baseFiles, DataFile::fileSizeInBytes);
       for (List<DataFile> files : packed) {
         if (CollectionUtils.isNotEmpty(files)) {
@@ -109,7 +109,7 @@ public void testBinPackPlan() throws Exception {
     long targetSize = dataFiles.get(0).fileSizeInBytes() * packFileCnt + 100;
     table.updateProperties()
         .set(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0")
-        .set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetSize + "")
+        .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, targetSize + "")
         .commit();
     insertEqDeleteFiles(table, 1);
     insertPosDeleteFiles(table, dataFiles);
@@ -287,11 +287,10 @@ public void testNoPartitionKeyedTableSplitRootNode() throws IOException {
     Pair<Snapshot, List<DataFile>> insertBaseResult = insertTableBaseDataFiles(testNoPartitionTable);
     List<DataFile> baseDataFiles = insertBaseResult.second();
     long fileSize = baseDataFiles.get(0).fileSizeInBytes();
-    // set task_size to be 2.5 * file_size, task_size = base_bucket * target_size = 2.5 * file_size
-    // base_bucket = 4, so target_size = file_size * 2.5 / 4 = file_size * 5 / 8
+    // set task_size to be 2.5 * file_size
     testNoPartitionTable.updateProperties()
         .set(TableProperties.BASE_FILE_INDEX_HASH_BUCKET, "4")
-        .set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, fileSize * 5 / 8 + "")
+        .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, fileSize * 5 / 2 + "")
         .commit();
 
     List<FileScanTask> baseFiles = planBaseFiles(testNoPartitionTable);
@@ -92,6 +92,9 @@ private TableProperties() {
   public static final String SELF_OPTIMIZING_MAX_FILE_SIZE_BYTES = "self-optimizing.max-file-size-bytes";
   public static final long SELF_OPTIMIZING_MAX_FILE_SIZE_BYTES_DEFAULT = 8589934592L; // 8 GB
 
+  public static final String SELF_OPTIMIZING_MAX_TASK_SIZE = "self-optimizing.max-task-size-bytes";
+  public static final long SELF_OPTIMIZING_MAX_TASK_SIZE_DEFAULT = 1073741824L; // 1 GB
+
   public static final String SELF_OPTIMIZING_FRAGMENT_RATIO = "self-optimizing.fragment-ratio";
   public static final int SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT = 8;
1 change: 1 addition & 0 deletions site/docs/ch/configurations.md
@@ -17,6 +17,7 @@ Self-optimizing configurations take effect for both the Iceberg format and the Mixed streaming format.
 | self-optimizing.target-size | 134217728 (128 MB) | target file size of self-optimizing |
 | self-optimizing.max-file-count | 10000 | maximum number of files processed in one round of self-optimizing |
 | self-optimizing.max-file-size-bytes | 8589934592 (8 GB) | maximum total file size processed in one round of self-optimizing |
+| self-optimizing.max-task-size-bytes | 1073741824 (1 GB) | maximum size of a single self-optimizing task |
 | self-optimizing.fragment-ratio | 8 | fragment file size threshold; the fragment size is self-optimizing.target-size multiplied by the reciprocal of this value |
 | self-optimizing.minor.trigger.file-count | 12 | minimum number of fragment files to trigger minor optimizing |
 | self-optimizing.minor.trigger.interval | 3600000 (1 hour) | maximum time interval for triggering minor optimizing |
