From 394467bd701331c58742eb84e684e80ea85688c8 Mon Sep 17 00:00:00 2001 From: Alan Choo <43991780+HeimingZ@users.noreply.github.com> Date: Tue, 10 Aug 2021 16:08:49 +0800 Subject: [PATCH] [IOTDB-1541] Change sequence of wal and memtable in insert (#3660) --- .../db/engine/memtable/AbstractMemTable.java | 14 +++++- .../iotdb/db/engine/memtable/IMemTable.java | 6 +++ .../engine/storagegroup/TsFileProcessor.java | 49 ++++++++++++++++--- 3 files changed, 59 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 51af2315b95b..e2b7638820c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -392,14 +392,24 @@ public void addTVListRamCost(long cost) { this.tvListRamCost += cost; } + @Override + public void releaseTVListRamCost(long cost) { + this.tvListRamCost -= cost; + } + @Override public long getTVListsRamCost() { return tvListRamCost; } @Override - public void addTextDataSize(long testDataSize) { - this.memSize += testDataSize; + public void addTextDataSize(long textDataSize) { + this.memSize += textDataSize; + } + + @Override + public void releaseTextDataSize(long textDataSize) { + this.memSize -= textDataSize; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index ff865eacd1e3..8cf61dc8753e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -57,6 +57,9 @@ public interface IMemTable { /** only used when mem control enabled */ void addTVListRamCost(long cost); + /** only used when mem control enabled */ + void releaseTVListRamCost(long cost); + /** only used when mem control enabled */ long getTVListsRamCost(); @@ -137,6 +140,9 @@ ReadOnlyMemChunk query( /** only used when mem control enabled */ void addTextDataSize(long textDataIncrement); + /** only used when mem control enabled */ + void releaseTextDataSize(long textDataDecrement); + long getMaxPlanIndex(); long getMinPlanIndex(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index b59c45c134c3..9ef19e46fb1f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -209,16 +209,18 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException { } } + long[] memIncrements = null; if (enableMemControl) { - checkMemCostAndAddToTspInfo(insertRowPlan); + memIncrements = checkMemCostAndAddToTspInfo(insertRowPlan); } - workMemTable.insert(insertRowPlan); - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { try { getLogNode().write(insertRowPlan); } catch (Exception e) { + if (enableMemControl && memIncrements != null) { + rollbackMemoryInfo(memIncrements); + } throw new WriteProcessException( String.format( "%s: %s write WAL failed", @@ -227,6 +229,8 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException { } } + workMemTable.insert(insertRowPlan); + // update start time of this memtable tsFileResource.updateStartTime( insertRowPlan.getPrefixPath().getFullPath(), insertRowPlan.getTime()); @@ -261,9 +265,10 @@ public void insertTablet( } } + long[] memIncrements = null; try { if (enableMemControl) { - checkMemCostAndAddToTspInfo(insertTabletPlan, start, end); + memIncrements = checkMemCostAndAddToTspInfo(insertTabletPlan, start, end); } } catch (WriteProcessException e) { for (int i = start; i < end; i++) { @@ -271,19 +276,32 @@ public void insertTablet( } throw new WriteProcessException(e); } + try { - workMemTable.insertTablet(insertTabletPlan, start, end); if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { insertTabletPlan.setStart(start); insertTabletPlan.setEnd(end); getLogNode().write(insertTabletPlan); } } catch (Exception e) { + for (int i = start; i < end; i++) { + results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + if (enableMemControl && memIncrements != null) { + rollbackMemoryInfo(memIncrements); + } + throw new WriteProcessException(e); + } + + try { + workMemTable.insertTablet(insertTabletPlan, start, end); + } catch (WriteProcessException e) { for (int i = start; i < end; i++) { results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } throw new WriteProcessException(e); } + for (int i = start; i < end; i++) { results[i] = RpcUtils.SUCCESS_STATUS; } @@ -300,7 +318,7 @@ public void insertTablet( } @SuppressWarnings("squid:S3776") // high Cognitive Complexity - private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) + private long[] checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) throws WriteProcessException { // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 long memTableIncrement = 0L; @@ -347,12 +365,13 @@ private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan) } } updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); + return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; } - private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end) + private long[] checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end) throws WriteProcessException { if (start >= end) { - return; + return new long[] {0, 0, 0}; } long[] memIncrements = new long[3]; // memTable, text, chunk metadata @@ -388,6 +407,7 @@ private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int long textDataIncrement = memIncrements[1]; long chunkMetadataIncrement = memIncrements[2]; updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); + return memIncrements; } private void updateMemCost( @@ -492,6 +512,19 @@ private void updateMemoryInfo( workMemTable.addTextDataSize(textDataIncrement); } + private void rollbackMemoryInfo(long[] memIncrements) { + long memTableIncrement = memIncrements[0]; + long textDataIncrement = memIncrements[1]; + long chunkMetadataIncrement = memIncrements[2]; + + memTableIncrement += textDataIncrement; + storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); + tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); + workMemTable.releaseTVListRamCost(memTableIncrement); + workMemTable.releaseTextDataSize(textDataIncrement); + } + /** * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which * <= 'timestamp' in the deletion.