From 896b527709ba82ac86e572740ecf4229adfc838d Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 16 Oct 2024 19:20:26 +0800 Subject: [PATCH 1/3] Optimize write metric update for insertTablets and insertRelationalTablet --- .../dataregion/DataExecutionVisitor.java | 3 + .../storageengine/dataregion/DataRegion.java | 142 ++++++++++++------ .../dataregion/memtable/TsFileProcessor.java | 40 +++-- .../memtable/TsFileProcessorTest.java | 52 ++++--- 4 files changed, 152 insertions(+), 85 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index bc9bb2022422..90880cd279c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -169,6 +169,9 @@ public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, DataRegion dataRegion.insertTablets(node); dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; + } catch (WriteProcessRejectException e) { + LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage()); + return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } catch (BatchProcessException e) { LOGGER.warn("Batch failure in executing a InsertMultiTabletsNode."); TSStatus firstStatus = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index c14a8d511e74..1031770dc277 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1071,7 +1071,11 @@ private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) { } private boolean splitAndInsert( - InsertTabletNode insertTabletNode, int loc, int endOffset, TSStatus[] results) { + InsertTabletNode insertTabletNode, + int loc, + int endOffset, + TSStatus[] results, + long[] costsForMetrics) { boolean noFailure = true; // before is first start point @@ -1103,7 +1107,8 @@ private boolean splitAndInsert( isSequence, results, beforeTimePartition, - noFailure) + noFailure, + costsForMetrics) && noFailure; if (before < loc) { insertCnt += 1; @@ -1130,7 +1135,8 @@ private boolean splitAndInsert( isSequence, results, beforeTimePartition, - noFailure) + noFailure, + costsForMetrics) && noFailure; before = loc; if (before < loc) { @@ -1159,7 +1165,8 @@ private boolean splitAndInsert( isSequence, results, beforeTimePartition, - noFailure) + noFailure, + costsForMetrics) && noFailure; insertCnt += 1; logger.debug( @@ -1193,26 +1200,13 @@ public void insertTablet(InsertTabletNode insertTabletNode) } TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); - boolean noFailure; - int loc = insertTabletNode.checkTTL(results, i -> getTTL(insertTabletNode)); - noFailure = loc == 0; - List> deviceEndOffsetPairs = - insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount()); - int start = loc; - for (Pair deviceEndOffsetPair : deviceEndOffsetPairs) { - int end = deviceEndOffsetPair.getRight(); - noFailure = noFailure && splitAndInsert(insertTabletNode, start, end, results); - start = end; - } + long[] costsForMetrics = new long[4]; + boolean noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics); - if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() - && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) { - // disable updating last cache on follower - startTime = System.nanoTime(); - tryToUpdateInsertTabletLastCache(insertTabletNode); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost( - System.nanoTime() - startTime); - } + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); if (!noFailure) { throw new BatchProcessException(results); @@ -1222,6 +1216,31 @@ public void insertTablet(InsertTabletNode insertTabletNode) } } + private boolean executeInsertTablet( + InsertTabletNode insertTabletNode, TSStatus[] results, long[] costsForMetrics) + throws OutOfTTLException { + boolean noFailure; + int loc = insertTabletNode.checkTTL(results, i -> getTTL(insertTabletNode)); + noFailure = loc == 0; + List> deviceEndOffsetPairs = + insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount()); + int start = loc; + for (Pair deviceEndOffsetPair : deviceEndOffsetPairs) { + int end = deviceEndOffsetPair.getRight(); + noFailure = + noFailure && splitAndInsert(insertTabletNode, start, end, results, costsForMetrics); + start = end; + } + if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() + && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) { + // disable updating last cache on follower + long startTime = System.nanoTime(); + tryToUpdateInsertTabletLastCache(insertTabletNode); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime); + } + return noFailure; + } + private void initFlushTimeMap(long timePartitionId) { if (config.isEnableSeparateData() && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) { @@ -1301,7 +1320,8 @@ private boolean insertTabletToTsFileProcessor( boolean sequence, TSStatus[] results, long timePartitionId, - boolean noFailure) { + boolean noFailure, + long[] costsForMetrics) { // return when start >= end or all measurement failed if (start >= end || insertTabletNode.allMeasurementFailed()) { if (logger.isDebugEnabled()) { @@ -1328,7 +1348,8 @@ private boolean insertTabletToTsFileProcessor( registerToTsFile(insertTabletNode, tsFileProcessor); try { - tsFileProcessor.insertTablet(insertTabletNode, start, end, results, noFailure); + tsFileProcessor.insertTablet( + insertTabletNode, start, end, results, noFailure, costsForMetrics); } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); return false; @@ -1424,7 +1445,7 @@ private List insertToTsFileProcessors( TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); + tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics); } catch (WriteProcessException e) { insertRowsNode .getResults() @@ -3492,7 +3513,7 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); + tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics); } catch (WriteProcessException e) { insertRowsOfOneDeviceNode .getResults() @@ -3604,30 +3625,57 @@ public void insert(InsertRowsNode insertRowsNode) * @param insertMultiTabletsNode batch of tablets belongs to multiple devices */ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode) - throws BatchProcessException { - for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) { - InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); - try { - insertTablet(insertTabletNode); - } catch (WriteProcessException e) { - insertMultiTabletsNode - .getResults() - .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); - } catch (BatchProcessException e) { - // for each error - TSStatus firstStatus = null; - for (TSStatus status : e.getFailingStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - firstStatus = status; + throws BatchProcessException, WriteProcessRejectException { + + StorageEngine.blockInsertionIfReject(); + long startTime = System.nanoTime(); + writeLock("insertTablets"); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); + try { + if (deleted) { + logger.info( + "Won't insert tablets {}, because region is deleted", + insertMultiTabletsNode.getSearchIndex()); + return; + } + long[] costsForMetrics = new long[4]; + for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) { + InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); + TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; + Arrays.fill(results, RpcUtils.SUCCESS_STATUS); + try { + boolean noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics); + if (!noFailure) { + throw new BatchProcessException(results); } - // return WRITE_PROCESS_REJECT directly for the consensus retry logic - if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { - insertMultiTabletsNode.getResults().put(i, status); - throw new BatchProcessException("Rejected inserting multi tablets"); + } catch (WriteProcessException e) { + insertMultiTabletsNode + .getResults() + .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + } catch (BatchProcessException e) { + // for each error + TSStatus firstStatus = null; + for (TSStatus status : e.getFailingStatus()) { + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + firstStatus = status; + } + // return WRITE_PROCESS_REJECT directly for the consensus retry logic + if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + insertMultiTabletsNode.getResults().put(i, status); + throw new BatchProcessException("Rejected inserting multi tablets"); + } } + insertMultiTabletsNode.getResults().put(i, firstStatus); } - insertMultiTabletsNode.getResults().put(i, firstStatus); } + + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + + } finally { + writeUnlock(); } if (!insertMultiTabletsNode.getResults().isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index a6b937f31ffb..f165905484b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -351,7 +351,7 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics) costsForMetrics[3] += System.nanoTime() - startTime; } - public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics) + public void insertRows(InsertRowsNode insertRowsNode, long[] costsForMetrics) throws WriteProcessException { ensureMemTable(costsForMetrics); @@ -449,14 +449,20 @@ private void createNewWorkingMemTable() { walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath()); } - private long[] checkMemCost( - InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results, boolean noFailure) + private long[] scheduleMemoryBlock( + InsertTabletNode insertTabletNode, + int start, + int end, + TSStatus[] results, + boolean noFailure, + long[] costsForMetrics) throws WriteProcessException { long[] memIncrements; try { - long startTime = System.nanoTime(); + long memControlStartTime = System.nanoTime(); memIncrements = checkMemCost(insertTabletNode, start, end, noFailure, results); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - startTime); + // recordScheduleMemoryBlockCost + costsForMetrics[1] += System.nanoTime() - memControlStartTime; } catch (WriteProcessException e) { for (int i = start; i < end; i++) { results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage()); @@ -525,18 +531,18 @@ private long[] checkAlignedMemCost( * @param results result array */ public void insertTablet( - InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results, boolean noFailure) + InsertTabletNode insertTabletNode, + int start, + int end, + TSStatus[] results, + boolean noFailure, + long[] costsForMetrics) throws WriteProcessException { - if (workMemTable == null) { - long startTime = System.nanoTime(); - createNewWorkingMemTable(); - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(System.nanoTime() - startTime); - WritingMetrics.getInstance() - .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); - } + ensureMemTable(costsForMetrics); - long[] memIncrements = checkMemCost(insertTabletNode, start, end, results, noFailure); + long[] memIncrements = + scheduleMemoryBlock(insertTabletNode, start, end, results, noFailure, costsForMetrics); long startTime = System.nanoTime(); WALFlushListener walFlushListener; @@ -552,7 +558,8 @@ public void insertTablet( rollbackMemoryInfo(memIncrements); throw new WriteProcessException(e); } finally { - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(System.nanoTime() - startTime); + // recordScheduleWalCost + costsForMetrics[2] += System.nanoTime() - startTime; } startTime = System.nanoTime(); @@ -610,7 +617,8 @@ public void insertTablet( tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex()); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() - startTime); + // recordScheduleMemTableCost + costsForMetrics[3] += System.nanoTime() - startTime; } @SuppressWarnings("squid:S3776") // High Cognitive Complexity diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 59fa1ff764b2..5a7e260115ba 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -303,12 +303,14 @@ public void alignedTvListRamCostTest() this.sgInfo.initTsFileProcessorInfo(processor); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); // Test Tablet - processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10], true); + processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10], true, new long[4]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(100, true), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(100, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(200, true), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(200, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); Assert.assertEquals(90000, memTable.getTotalPointsNum()); Assert.assertEquals(720360, memTable.memSize()); @@ -339,26 +341,29 @@ public void alignedTvListRamCostTest2() this.sgInfo.initTsFileProcessorInfo(processor); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); // Test Tablet - processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10], true); + processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10], true, new long[4]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(0, true), 0, 10, new TSStatus[10], true); + genInsertTableNodeFors3000ToS6000(0, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(100, true), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(100, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(100, true), 0, 10, new TSStatus[10], true); + genInsertTableNodeFors3000ToS6000(100, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(200, true), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(200, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(200, true), 0, 10, new TSStatus[10], true); + genInsertTableNodeFors3000ToS6000(200, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(300, true), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(300, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(6385616, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(300, true), 0, 10, new TSStatus[10], true); + genInsertTableNodeFors3000ToS6000(300, true), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(6385616, memTable.getTVListsRamCost()); Assert.assertEquals(240000, memTable.getTotalPointsNum()); @@ -397,12 +402,15 @@ public void nonAlignedTvListRamCostTest() this.sgInfo.initTsFileProcessorInfo(processor); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); // Test tablet - processor.insertTablet(genInsertTableNode(0, false), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(0, false), 0, 10, new TSStatus[10], true, new long[4]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(100, false), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(100, false), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); - processor.insertTablet(genInsertTableNode(200, false), 0, 10, new TSStatus[10], true); + processor.insertTablet( + genInsertTableNode(200, false), 0, 10, new TSStatus[10], true, new long[4]); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); Assert.assertEquals(90000, memTable.getTotalPointsNum()); Assert.assertEquals(1440000, memTable.memSize()); @@ -459,7 +467,7 @@ public void testRamCostInsertSameNonAlignedDataBy2Ways() record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), i - 1); } - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -490,7 +498,7 @@ record = new TSRecord(102, "root.vehicle.d2"); insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); @@ -502,7 +510,7 @@ record = new TSRecord(102, "root.vehicle.d2"); insertRowNode1.setMeasurements(new String[1]); insertRowNode1.setValues(new String[1]); insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); processor1.insert(insertRowNode1, new long[4]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -557,7 +565,7 @@ public void testRamCostInsertSameAlignedDataBy2Ways() node.setAligned(true); insertRowsNode.addOneInsertRowNode(node, i - 1); } - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -593,7 +601,7 @@ record = new TSRecord(102, "root.vehicle.d2"); insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); @@ -606,7 +614,7 @@ record = new TSRecord(102, "root.vehicle.d2"); insertRowNode1.setValues(new String[1]); insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); insertRowsNode.setAligned(true); - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); processor1.insert(insertRowNode1, new long[4]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -666,7 +674,7 @@ public void testRamCostInsertSameDataBy2Ways() } insertRowsNode.addOneInsertRowNode(node, i - 1); } - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -728,7 +736,7 @@ public void testRamCostInsertSameDataBy2Ways2() } insertRowsNode.addOneInsertRowNode(node, i - 1); } - processor2.insert(insertRowsNode, new long[4]); + processor2.insertRows(insertRowsNode, new long[4]); IMemTable memTable2 = processor2.getWorkMemTable(); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); From b418a9303b7d37a20f3698aa0dd4065e7dc6d1bd Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 16 Oct 2024 19:49:26 +0800 Subject: [PATCH 2/3] optimize code --- .../iotdb/db/storageengine/dataregion/DataRegion.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1031770dc277..376fb0cbdded 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3643,19 +3643,18 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode) InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); + boolean noFailure = false; try { - boolean noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics); - if (!noFailure) { - throw new BatchProcessException(results); - } + noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics); } catch (WriteProcessException e) { insertMultiTabletsNode .getResults() .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); - } catch (BatchProcessException e) { + } + if (!noFailure) { // for each error TSStatus firstStatus = null; - for (TSStatus status : e.getFailingStatus()) { + for (TSStatus status : results) { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { firstStatus = status; } From fc51446949dea088e2b91349668f01f036028559 Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 16 Oct 2024 20:04:34 +0800 Subject: [PATCH 3/3] fix mvn build warning --- .../apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java | 3 ++- pom.xml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java index bccd1b865c48..2f20e4061a5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -91,7 +92,7 @@ private static void buildMqttPluginMap() throws IOException { } URL[] jarURLs = getPluginJarURLs(mqttDir); - logger.debug("MQTT Plugin jarURLs: {}", jarURLs); + logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs)); for (URL jarUrl : jarURLs) { ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl}); diff --git a/pom.xml b/pom.xml index a5e76b9912c3..635c385ffac4 100644 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,7 @@ 2.0.0-SNAPSHOT pom Apache IoTDB Project Parent POM - This is the top level project that builds, packages the tsfile, iotdb engine, jdbc, and integration libs. + This is the top level project that builds, packages the iotdb engine, client, and integration libs. iotdb-api iotdb-client