From 8ec2e13516466898991eeb54860d7c33f2d0aa8f Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 4 Sep 2024 18:31:11 +0800 Subject: [PATCH 1/2] calculate metadata memory --- .../compaction/io/CompactionTsFileReader.java | 31 +++++++++++++ .../estimator/CompactionEstimateUtils.java | 45 +++++++++++++++++++ ...astCompactionInnerCompactionEstimator.java | 13 +++++- .../FastCrossSpaceCompactionEstimator.java | 12 ++++- .../ReadChunkInnerCompactionEstimator.java | 15 ++++++- 5 files changed, 112 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java index 745dde2f696c..c5dddebaf69b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java @@ -165,6 +165,37 @@ public ByteBuffer readPageWithoutUnCompressing(long startOffset, int pageSize) return timeseriesMetadataOffsetMap; } + public Map> getTimeseriesMetadataOffsetByDevice( + MetadataIndexNode measurementNode) throws IOException { + Map> timeseriesMetadataOffsetMap = new LinkedHashMap<>(); + List childrenEntryList = measurementNode.getChildren(); + for (int i = 0; i < childrenEntryList.size(); i++) { + long startOffset = childrenEntryList.get(i).getOffset(); + long endOffset = + i == childrenEntryList.size() - 1 + ? measurementNode.getEndOffset() + : childrenEntryList.get(i + 1).getOffset(); + ByteBuffer nextBuffer = readData(startOffset, endOffset); + if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + // leaf measurement node + while (nextBuffer.hasRemaining()) { + int metadataStartOffset = nextBuffer.position(); + timeseriesMetadataOffsetMap.put( + childrenEntryList.get(i).getCompareKey().toString(), + new Pair<>(startOffset + metadataStartOffset, startOffset + nextBuffer.position())); + } + + } else { + // internal measurement node + MetadataIndexNode nextLayerMeasurementNode = + getDeserializeContext().deserializeMetadataIndexNode(nextBuffer, false); + timeseriesMetadataOffsetMap.putAll( + getTimeseriesMetadataOffsetByDevice(nextLayerMeasurementNode)); + } + } + return timeseriesMetadataOffsetMap; + } + private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) { CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1); CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java index aab182a21af0..c919ed96b08a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java @@ -20,11 +20,14 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Pair; @@ -97,6 +100,48 @@ public static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws IOE averageChunkMetadataSize); } + public static long roughEstimateMetadataCostInCompaction( + List resources, CompactionType taskType) throws IOException { + if (!CompactionEstimateUtils.addReadLock(resources)) { + return -1L; + } + long cost = -1L; + long modsFileSize = 0L; + try { + for (TsFileResource resource : resources) { + if (resource.modFileExists()) { + modsFileSize += resource.getModFile().getSize(); + } + try (CompactionTsFileReader reader = + new CompactionTsFileReader(resource.getTsFilePath(), taskType)) { + cost = Math.max(cost, getMaxTimeseriesMetadataOfOneDeviceSize(reader)); + } + } + return cost + modsFileSize; + } finally { + CompactionEstimateUtils.releaseReadLock(resources); + } + } + + public static long getMaxTimeseriesMetadataOfOneDeviceSize(CompactionTsFileReader reader) + throws IOException { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + long maxSize = 0; + while (deviceIterator.hasNext()) { + deviceIterator.next(); + MetadataIndexNode firstMeasurementNodeOfCurrentDevice = + deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); + long totalTimeseriesMetadataSizeOfCurrentDevice = 0; + Map> timeseriesMetadataOffsetByDevice = + reader.getTimeseriesMetadataOffsetByDevice(firstMeasurementNodeOfCurrentDevice); + for (Pair offsetPair : timeseriesMetadataOffsetByDevice.values()) { + totalTimeseriesMetadataSizeOfCurrentDevice += (offsetPair.right - offsetPair.left); + } + maxSize = Math.max(maxSize, totalTimeseriesMetadataSizeOfCurrentDevice); + } + return maxSize; + } + public static boolean shouldAccurateEstimate(long roughEstimatedMemCost) { return roughEstimatedMemCost > 0 && IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java index 32140f5a4e55..9c1fe4e05155 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.io.IOException; @@ -80,6 +81,15 @@ public long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOExce @Override public long roughEstimateInnerCompactionMemory(List resources) throws IOException { + long metadataCost = + CompactionEstimateUtils.roughEstimateMetadataCostInCompaction( + resources, + resources.get(0).isSeq() + ? CompactionType.INNER_SEQ_COMPACTION + : CompactionType.INNER_UNSEQ_COMPACTION); + if (metadataCost < 0) { + return metadataCost; + } int maxConcurrentSeriesNum = Math.max( config.getCompactionMaxAlignedSeriesNumInOneBatch(), config.getSubCompactionTaskNum()); @@ -89,6 +99,7 @@ public long roughEstimateInnerCompactionMemory(List resources) // source files (chunk + uncompressed page) * overlap file num // target file (chunk + unsealed page writer) return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize) - + memoryBudgetForFileWriter; + + memoryBudgetForFileWriter + + metadataCost; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java index ffd4ceba1bed..a4d077646bad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.io.IOException; @@ -85,6 +86,14 @@ public long roughEstimateCrossCompactionMemory( List sourceFiles = new ArrayList<>(seqResources.size() + unseqResources.size()); sourceFiles.addAll(seqResources); sourceFiles.addAll(unseqResources); + + long metadataCost = + CompactionEstimateUtils.roughEstimateMetadataCostInCompaction( + sourceFiles, CompactionType.CROSS_COMPACTION); + if (metadataCost < 0) { + return metadataCost; + } + int maxConcurrentSeriesNum = Math.max( config.getCompactionMaxAlignedSeriesNumInOneBatch(), config.getSubCompactionTaskNum()); @@ -94,6 +103,7 @@ public long roughEstimateCrossCompactionMemory( // source files (chunk + uncompressed page) * overlap file num // target files (chunk + unsealed page writer) return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize) - + memoryBudgetForFileWriter; + + memoryBudgetForFileWriter + + metadataCost; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java index cd90dd334f3a..9d126b868101 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import java.io.IOException; import java.util.List; public class ReadChunkInnerCompactionEstimator extends AbstractInnerSpaceEstimator { @@ -71,7 +73,14 @@ public long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) { } @Override - public long roughEstimateInnerCompactionMemory(List resources) { + public long roughEstimateInnerCompactionMemory(List resources) + throws IOException { + long metadataCost = + CompactionEstimateUtils.roughEstimateMetadataCostInCompaction( + resources, CompactionType.INNER_SEQ_COMPACTION); + if (metadataCost < 0) { + return metadataCost; + } int maxConcurrentSeriesNum = Math.max( config.getCompactionMaxAlignedSeriesNumInOneBatch(), config.getSubCompactionTaskNum()); @@ -79,6 +88,8 @@ public long roughEstimateInnerCompactionMemory(List resources) { long maxPageSize = tsFileConfig.getPageSizeInByte(); // source files (chunk + uncompressed page) // target file (chunk + unsealed page writer) - return 2 * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize) + memoryBudgetForFileWriter; + return 2 * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize) + + memoryBudgetForFileWriter + + metadataCost; } } From fca77d3a706bcf91398cec41de64fe793e730b4d Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 5 Sep 2024 18:45:00 +0800 Subject: [PATCH 2/2] fix bug --- .../compaction/io/CompactionTsFileReader.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java index c5dddebaf69b..c8d9cae045f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java @@ -175,18 +175,14 @@ public Map> getTimeseriesMetadataOffsetByDevice( i == childrenEntryList.size() - 1 ? measurementNode.getEndOffset() : childrenEntryList.get(i + 1).getOffset(); - ByteBuffer nextBuffer = readData(startOffset, endOffset); if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { // leaf measurement node - while (nextBuffer.hasRemaining()) { - int metadataStartOffset = nextBuffer.position(); - timeseriesMetadataOffsetMap.put( - childrenEntryList.get(i).getCompareKey().toString(), - new Pair<>(startOffset + metadataStartOffset, startOffset + nextBuffer.position())); - } - + timeseriesMetadataOffsetMap.put( + childrenEntryList.get(i).getCompareKey().toString(), + new Pair<>(startOffset, endOffset)); } else { // internal measurement node + ByteBuffer nextBuffer = readData(startOffset, endOffset); MetadataIndexNode nextLayerMeasurementNode = getDeserializeContext().deserializeMetadataIndexNode(nextBuffer, false); timeseriesMetadataOffsetMap.putAll(