Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory calculate error when insertRecords with both aligned and non-aligned devices #12720

Merged
merged 7 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,6 @@ private void insertToTsFileProcessors(
if (v == null) {
v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
v.setSearchIndex(insertRowNode.getSearchIndex());
v.setAligned(insertRowNode.isAligned());
}
v.addOneInsertRowNode(insertRowNode, finalI);
return v;
Expand Down Expand Up @@ -3278,7 +3277,6 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
if (v == null) {
v = new InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
v.setSearchIndex(insertRowNode.getSearchIndex());
v.setAligned(insertRowNode.isAligned());
}
v.addOneInsertRowNode(insertRowNode, finalI);
return v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,7 @@ public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics)
long[] memIncrements;

long memControlStartTime = System.nanoTime();
if (insertRowsNode.isAligned()) {
memIncrements = checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
} else {
memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
}
memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
// recordScheduleMemoryBlockCost
costsForMetrics[1] += System.nanoTime() - memControlStartTime;

Expand Down Expand Up @@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}

@SuppressWarnings("squid:S3776") // High Cognitive Complexity
private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;

long[] memIncrements = new long[3];
// device -> measurement -> adding TVList size
Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new HashMap<>();
Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = new HashMap<>();
// device -> (measurements -> datatype, adding aligned TVList size)
Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfoForAligned =
new HashMap<>();
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
IDeviceID deviceId = insertRowNode.getDeviceID();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
String[] measurements = insertRowNode.getMeasurements();
if (insertRowNode.isAligned()) {
handleAlignedData(insertRowNode, memIncrements, increasingMemTableInfoForAligned);
} else {
handleUnalignedData(insertRowNode, memIncrements, increasingMemTableInfoForNonAligned);
}
}
updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
HTHou marked this conversation as resolved.
Show resolved Hide resolved
return memIncrements;
}

@SuppressWarnings("squid:S3776") // High Cognitive Complexity
private void handleAlignedData(
HTHou marked this conversation as resolved.
Show resolved Hide resolved
InsertRowNode insertRowNode,
long[] memIncrements,
Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfoForAligned) {
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];

IDeviceID deviceId = insertRowNode.getDeviceID();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
String[] measurements = insertRowNode.getMeasurements();

if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
&& !increasingMemTableInfoForAligned.containsKey(deviceId)) {
// For new device of this mem table
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
* dataTypes.length;
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
&& (!increasingMemTableInfo.containsKey(deviceId)
|| !increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
// ChunkMetadataIncrement
chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
increasingMemTableInfo
.computeIfAbsent(deviceId, k -> new HashMap<>())
.putIfAbsent(measurements[i], 1);
} else {
// here currentChunkPointNum >= 1
long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
int addingPointNum =
increasingMemTableInfo
.computeIfAbsent(deviceId, k -> new HashMap<>())
.computeIfAbsent(measurements[i], k -> 0);
increasingMemTableInfoForAligned
.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
.left
.put(measurements[i], dataTypes[i]);
// TEXT data mem size
if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}

} else {
// For existed device of this mem table
AlignedWritableMemChunkGroup memChunkGroup =
(AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId);
AlignedWritableMemChunk alignedMemChunk =
memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
increasingMemTableInfoForAligned.computeIfAbsent(
deviceId, k -> new Pair<>(new HashMap<>(), 0));
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}

int addingPointNum = addingPointNumInfo.getRight();
// Extending the column of aligned mem chunk
if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i]))
&& !increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i])) {
memTableIncrement +=
((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0
? TVList.tvListArrayMemCost(dataTypes[i])
: 0;
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) -> v + 1);
((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* AlignedTVList.valueListArrayMemCost(dataTypes[i]);
HTHou marked this conversation as resolved.
Show resolved Hide resolved
increasingMemTableInfoForAligned.get(deviceId).left.put(measurements[i], dataTypes[i]);
}
// TEXT data mem size
if (dataTypes[i].isBinary() && values[i] != null) {
if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
HTHou marked this conversation as resolved.
Show resolved Hide resolved
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
int addingPointNum = increasingMemTableInfoForAligned.get(deviceId).getRight();
// Here currentChunkPointNum + addingPointNum >= 1
if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
HTHou marked this conversation as resolved.
Show resolved Hide resolved
if (alignedMemChunk != null) {
dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
}
dataTypesInTVList.addAll(increasingMemTableInfoForAligned.get(deviceId).getLeft().values());
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
HTHou marked this conversation as resolved.
Show resolved Hide resolved
}
increasingMemTableInfoForAligned.get(deviceId).setRight(addingPointNum + 1);
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};

memIncrements[0] = memTableIncrement;
memIncrements[1] = textDataIncrement;
memIncrements[2] = chunkMetadataIncrement;
}

@SuppressWarnings("squid:S3776") // High Cognitive Complexity
private void handleUnalignedData(
InsertRowNode insertRowNode,
long[] memIncrements,
Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned) {

long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];

IDeviceID deviceId = insertRowNode.getDeviceID();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
String[] measurements = insertRowNode.getMeasurements();

for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
&& (!increasingMemTableInfoForNonAligned.containsKey(deviceId)
|| !increasingMemTableInfoForNonAligned.get(deviceId).containsKey(measurements[i]))) {
// ChunkMetadataIncrement
chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
increasingMemTableInfoForNonAligned
.computeIfAbsent(deviceId, k -> new HashMap<>())
.putIfAbsent(measurements[i], 1);
} else {
// here currentChunkPointNum >= 1
long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
int addingPointNum =
increasingMemTableInfoForNonAligned
.computeIfAbsent(deviceId, k -> new HashMap<>())
.computeIfAbsent(measurements[i], k -> 0);
memTableIncrement +=
((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0
? TVList.tvListArrayMemCost(dataTypes[i])
: 0;
increasingMemTableInfoForNonAligned
.get(deviceId)
.computeIfPresent(measurements[i], (k, v) -> v + 1);
}
// TEXT data mem size
if (dataTypes[i].isBinary() && values[i] != null) {
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
memIncrements[0] = memTableIncrement;
memIncrements[1] = textDataIncrement;
memIncrements[2] = chunkMetadataIncrement;
}

@SuppressWarnings("squid:S3776") // high Cognitive Complexity
Expand Down Expand Up @@ -672,90 +776,6 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow(
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}

@SuppressWarnings("squid:S3776") // high Cognitive Complexity
private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
// device -> (measurements -> datatype, adding aligned TVList size)
Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfo = new HashMap<>();
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
IDeviceID deviceId = insertRowNode.getDeviceID();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
String[] measurements = insertRowNode.getMeasurements();
if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
&& !increasingMemTableInfo.containsKey(deviceId)) {
// For new device of this mem table
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
* dataTypes.length;
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
increasingMemTableInfo
.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
.left
.put(measurements[i], dataTypes[i]);
// TEXT data mem size
if (dataTypes[i].isBinary() && values[i] != null) {
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}

} else {
// For existed device of this mem table
AlignedWritableMemChunkGroup memChunkGroup =
(AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId);
AlignedWritableMemChunk alignedMemChunk =
memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0));
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}

int addingPointNum = addingPointNumInfo.getRight();
// Extending the column of aligned mem chunk
if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i]))
&& !increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
memTableIncrement +=
((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* AlignedTVList.valueListArrayMemCost(dataTypes[i]);
increasingMemTableInfo.get(deviceId).left.put(measurements[i], dataTypes[i]);
}
// TEXT data mem size
if (dataTypes[i].isBinary() && values[i] != null) {
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
// Here currentChunkPointNum + addingPointNum >= 1
if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
if (alignedMemChunk != null) {
dataTypesInTVList.addAll(
((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
}
dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
}
increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}

private long[] checkMemCostAndAddToTspInfoForTablet(
IDeviceID deviceId,
String[] measurements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,65 @@ record = new TSRecord(102, "root.vehicle.d2");
Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
}

@Test
public void testRamCostInsertSameDataBy2Ways()
HTHou marked this conversation as resolved.
Show resolved Hide resolved
throws MetadataException, WriteProcessException, IOException {
TsFileProcessor processor1 =
new TsFileProcessor(
storageGroup,
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
(tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
this.sgInfo.initTsFileProcessorInfo(processor1);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
// insert 100 rows (50 aligned, 50 non-aligned) by insertRow
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2");
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
InsertRowNode node = buildInsertRowNodeByTSRecord(record);
if (i <= 50) {
node.setAligned(true);
}
processor1.insert(node, new long[4]);
}
IMemTable memTable1 = processor1.getWorkMemTable();

TsFileProcessor processor2 =
new TsFileProcessor(
storageGroup,
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
(tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
this.sgInfo.initTsFileProcessorInfo(processor2);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
insertRowsNode.setAligned(true);
// insert 100 rows (50 aligned, 50 non-aligned) by insertRows
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2");
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
InsertRowNode node = buildInsertRowNodeByTSRecord(record);
if (i <= 50) {
node.setAligned(true);
}
insertRowsNode.addOneInsertRowNode(node, i - 1);
}
processor2.insert(insertRowsNode, new long[4]);
IMemTable memTable2 = processor2.getWorkMemTable();

Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
}

@Test
public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
Expand Down
Loading