diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java
index e2f03b5062..c18fcb8c8a 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java
@@ -28,7 +28,6 @@
 import com.netease.arctic.data.file.DataFileWithSequence;
 import com.netease.arctic.data.file.DeleteFileWithSequence;
 import com.netease.arctic.table.ArcticTable;
-import com.netease.arctic.table.TableProperties;
 import com.netease.arctic.utils.SequenceNumberFetcher;
 import com.netease.arctic.utils.SerializationUtils;
 import org.apache.iceberg.DataFile;
@@ -36,7 +35,6 @@
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.util.BinPacking;
-import org.apache.iceberg.util.PropertyUtil;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -78,13 +76,8 @@ protected List<FileScanTask> filterRepeatFileScanTask(Collection<FileScanTask> f
   }
 
   protected List<List<FileScanTask>> binPackFileScanTask(List<FileScanTask> fileScanTasks) {
-    long targetFileSize = getTargetSize();
-
-    Long sum = fileScanTasks.stream()
-        .map(fileScanTask -> fileScanTask.file().fileSizeInBytes()).reduce(0L, Long::sum);
-    int taskCnt = (int) (sum / targetFileSize) + 1;
-
-    return new BinPacking.ListPacker<FileScanTask>(targetFileSize, taskCnt, true)
+    long taskSize = getTaskSize();
+    return new BinPacking.ListPacker<FileScanTask>(taskSize, Integer.MAX_VALUE, true)
         .pack(fileScanTasks, fileScanTask -> fileScanTask.file().fileSizeInBytes());
   }
 
@@ -179,12 +172,6 @@ protected SequenceNumberFetcher seqNumberFetcher() {
     return sequenceNumberFetcher;
   }
 
-  private long getTargetSize() {
-    return PropertyUtil.propertyAsLong(arcticTable.properties(),
-        TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-        TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
-  }
-
   protected void getOptimizeFile(List<FileScanTask> fileScanTasks,
                                  List<DataFileWithSequence> dataFiles,
                                  List<DeleteFileWithSequence> eqDeleteFiles,
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java
index 3442fe4fb0..6deb7b6dfb 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java
@@ -195,6 +195,12 @@ protected long getSmallFileSize(Map<String, String> properties) {
     }
   }
 
+  protected long getTaskSize() {
+    return PropertyUtil.propertyAsLong(arcticTable.properties(),
+        TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE,
+        TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE_DEFAULT);
+  }
+
   protected interface PartitionWeight extends Comparable<PartitionWeight> {
   }
 
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java
index 92f589c127..9713948151 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java
@@ -116,11 +116,10 @@ protected OptimizeType getOptimizeType() {
   protected List<BasicOptimizeTask> collectTask(String partition) {
     List<BasicOptimizeTask> result;
     if (arcticTable.isUnkeyedTable()) {
-      result = collectTasksWithBinPack(partition, getOptimizingTargetSize());
+      result = collectTasksWithBinPack(partition, getTaskSize());
     } else {
       if (allBaseFilesInRootNode(partition)) {
-        // TO avoid too big task size leading to optimizer OOM, we limit the max task size to 4 * optimizing target size
-        result = collectTasksWithBinPack(partition, getOptimizingTargetSize() * Math.min(4, getBaseBucketSize()));
+        result = collectTasksWithBinPack(partition, getTaskSize());
       } else {
         result = collectTasksWithNodes(partition);
       }
@@ -204,9 +203,7 @@ private List<BasicOptimizeTask> collectTasksWithBinPack(String partition, long t
         false, constructCustomHiveSubdirectory(baseFiles)
     );
 
-    Long sum = baseFiles.stream().map(DataFile::fileSizeInBytes).reduce(0L, Long::sum);
-    int taskCnt = (int) (sum / taskSize) + 1;
-    List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, taskCnt, true)
+    List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
         .pack(baseFiles, DataFile::fileSizeInBytes);
     for (List<DataFile> files : packed) {
       if (CollectionUtils.isNotEmpty(files)) {
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java
index 29e10ddfbd..3e1d727b1b 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java
@@ -170,12 +170,8 @@ private List<BasicOptimizeTask> collectUnKeyedTableTasks(String partition) {
     List<DeleteFile> posDeleteFiles = getPosDeleteFilesFromFileTree(partition);
     if (nodeTaskNeedBuild(partition, posDeleteFiles, baseFiles)) {
       // for unkeyed table, tasks can be bin-packed
-      long taskSize = CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(),
-          TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-          TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
-      Long sum = baseFiles.stream().map(DataFile::fileSizeInBytes).reduce(0L, Long::sum);
-      int taskCnt = (int) (sum / taskSize) + 1;
-      List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, taskCnt, true)
+      long taskSize = getTaskSize();
+      List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
           .pack(baseFiles, DataFile::fileSizeInBytes);
       for (List<DataFile> files : packed) {
         if (CollectionUtils.isNotEmpty(files)) {
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java
index a20eafde6c..6773775a2a 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java
@@ -109,7 +109,7 @@ public void testBinPackPlan() throws Exception {
     long targetSize = dataFiles.get(0).fileSizeInBytes() * packFileCnt + 100;
     table.updateProperties()
         .set(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0")
-        .set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetSize + "")
+        .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, targetSize + "")
         .commit();
     insertEqDeleteFiles(table, 1);
     insertPosDeleteFiles(table, dataFiles);
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java
index 5710691ce5..c9d05a7848 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java
@@ -287,11 +287,10 @@ public void testNoPartitionKeyedTableSplitRootNode() throws IOException {
     Pair<Snapshot, List<DataFile>> insertBaseResult = insertTableBaseDataFiles(testNoPartitionTable);
     List<DataFile> baseDataFiles = insertBaseResult.second();
     long fileSize = baseDataFiles.get(0).fileSizeInBytes();
-    // set task_size to be 2.5 * file_size, task_size = base_bucket * target_size = 2.5 * file_size
-    // base_bucket = 4, so target_size = file_size * 2.5 / 4 = file_size * 5 / 8
+    // set task_size to be 2.5 * file_size
     testNoPartitionTable.updateProperties()
         .set(TableProperties.BASE_FILE_INDEX_HASH_BUCKET, "4")
-        .set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, fileSize * 5 / 8 + "")
+        .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, fileSize * 5 / 2 + "")
         .commit();
 
     List<DataFile> baseFiles = planBaseFiles(testNoPartitionTable);
diff --git a/core/src/main/java/com/netease/arctic/table/TableProperties.java b/core/src/main/java/com/netease/arctic/table/TableProperties.java
index a64cc40b22..4fc5cd71fb 100644
--- a/core/src/main/java/com/netease/arctic/table/TableProperties.java
+++ b/core/src/main/java/com/netease/arctic/table/TableProperties.java
@@ -92,6 +92,9 @@ private TableProperties() {
   public static final String SELF_OPTIMIZING_MAX_FILE_SIZE_BYTES = "self-optimizing.max-file-size-bytes";
   public static final long SELF_OPTIMIZING_MAX_FILE_SIZE_BYTES_DEFAULT = 8589934592L; // 8 GB
 
+  public static final String SELF_OPTIMIZING_MAX_TASK_SIZE = "self-optimizing.max-task-size-bytes";
+  public static final long SELF_OPTIMIZING_MAX_TASK_SIZE_DEFAULT = 1073741824L; // 1 GB
+
   public static final String SELF_OPTIMIZING_FRAGMENT_RATIO = "self-optimizing.fragment-ratio";
   public static final int SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT = 8;
 
diff --git a/site/docs/ch/configurations.md b/site/docs/ch/configurations.md
index bba8e590af..4baf155954 100644
--- a/site/docs/ch/configurations.md
+++ b/site/docs/ch/configurations.md
@@ -17,6 +17,7 @@ Self-optimizing 配置对 Iceberg format, Mixed streaming format 都会生效。
 | self-optimizing.target-size | 134217728(128MB)| self-optimizing 的目标文件大小 |
 | self-optimizing.max-file-count | 10000 | 一次 self-optimizing 最多处理的文件个数 |
 | self-optimizing.max-file-size-bytes | 8589934592(8GB) | 一次 self-optimizing 最多处理的文件大小 |
+| self-optimizing.max-task-size-bytes | 1073741824(1GB) | self-optimizing task 大小 |
 | self-optimizing.fragment-ratio | 8 | fragment 文件大小阈值,实际计算时取倒数与 self-optimizing.target-size 的值相乘 |
 | self-optimizing.minor.trigger.file-count | 12 | 触发 minor optimizing 的 fragment 最少文件数量 |
 | self-optimizing.minor.trigger.interval | 3600000(1小时) | 触发 minor optimizing 的最长时间间隔 |
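For reference, a minimal, self-contained sketch (not part of the patch) of the planning behaviour this change introduces: the task size is now read from the new self-optimizing.max-task-size-bytes property through PropertyUtil.propertyAsLong, and BinPacking.ListPacker is given an Integer.MAX_VALUE lookback instead of a pre-computed task count, so each task's size is bounded only by the property. The class name, the property map, and the file sizes below are made up for illustration; BinPacking.ListPacker and PropertyUtil are the Iceberg utilities the patched code calls.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.PropertyUtil;

public class MaxTaskSizePackingSketch {

  public static void main(String[] args) {
    // Hypothetical table properties; in the patch the map comes from arcticTable.properties().
    Map<String, String> tableProperties =
        Collections.singletonMap("self-optimizing.max-task-size-bytes", "268435456"); // 256 MB

    // Mirrors AbstractOptimizePlan.getTaskSize(): read the property, fall back to the 1 GB default.
    long taskSize = PropertyUtil.propertyAsLong(
        tableProperties, "self-optimizing.max-task-size-bytes", 1073741824L);

    // Made-up file sizes in bytes, standing in for DataFile / FileScanTask weights.
    List<Long> fileSizes = Arrays.asList(
        200L * 1024 * 1024, 180L * 1024 * 1024, 90L * 1024 * 1024,
        240L * 1024 * 1024, 60L * 1024 * 1024);

    // Same packer call as the patched plans: target weight = max task size,
    // lookback = Integer.MAX_VALUE (no pre-computed task count), largestBinFirst = true.
    List<List<Long>> tasks = new BinPacking.ListPacker<Long>(taskSize, Integer.MAX_VALUE, true)
        .pack(fileSizes, size -> size);

    // Each inner list is one planned optimize task.
    tasks.forEach(task -> System.out.println("task files (bytes): " + task));
  }
}

Because the packer only adds a file to a bin when the bin's running total plus that file still fits under the target weight, each planned task stays at or below the configured max task size unless a single file already exceeds it; dropping the old sum / targetSize task-count cap is what lets the property alone control task granularity.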