Skip to content

Commit

Permalink
[IOTDB-1541] Change sequence of wal and memtable in insert (#3660)
Browse files Browse the repository at this point in the history
  • Loading branch information
HeimingZ authored Aug 10, 2021
1 parent 1462869 commit 394467b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,24 @@ public void addTVListRamCost(long cost) {
this.tvListRamCost += cost;
}

@Override
public void releaseTVListRamCost(long cost) {
this.tvListRamCost -= cost;
}

@Override
public long getTVListsRamCost() {
return tvListRamCost;
}

@Override
public void addTextDataSize(long testDataSize) {
this.memSize += testDataSize;
public void addTextDataSize(long textDataSize) {
this.memSize += textDataSize;
}

@Override
public void releaseTextDataSize(long textDataSize) {
this.memSize -= textDataSize;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public interface IMemTable {
/** only used when mem control enabled */
void addTVListRamCost(long cost);

/** only used when mem control enabled */
void releaseTVListRamCost(long cost);

/** only used when mem control enabled */
long getTVListsRamCost();

Expand Down Expand Up @@ -137,6 +140,9 @@ ReadOnlyMemChunk query(
/** only used when mem control enabled */
void addTextDataSize(long textDataIncrement);

/** only used when mem control enabled */
void releaseTextDataSize(long textDataDecrement);

long getMaxPlanIndex();

long getMinPlanIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,18 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
}
}

long[] memIncrements = null;
if (enableMemControl) {
checkMemCostAndAddToTspInfo(insertRowPlan);
memIncrements = checkMemCostAndAddToTspInfo(insertRowPlan);
}

workMemTable.insert(insertRowPlan);

if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
getLogNode().write(insertRowPlan);
} catch (Exception e) {
if (enableMemControl && memIncrements != null) {
rollbackMemoryInfo(memIncrements);
}
throw new WriteProcessException(
String.format(
"%s: %s write WAL failed",
Expand All @@ -227,6 +229,8 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
}
}

workMemTable.insert(insertRowPlan);

// update start time of this memtable
tsFileResource.updateStartTime(
insertRowPlan.getPrefixPath().getFullPath(), insertRowPlan.getTime());
Expand Down Expand Up @@ -261,29 +265,43 @@ public void insertTablet(
}
}

long[] memIncrements = null;
try {
if (enableMemControl) {
checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
memIncrements = checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
}
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage());
}
throw new WriteProcessException(e);
}

try {
workMemTable.insertTablet(insertTabletPlan, start, end);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
insertTabletPlan.setStart(start);
insertTabletPlan.setEnd(end);
getLogNode().write(insertTabletPlan);
}
} catch (Exception e) {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
if (enableMemControl && memIncrements != null) {
rollbackMemoryInfo(memIncrements);
}
throw new WriteProcessException(e);
}

try {
workMemTable.insertTablet(insertTabletPlan, start, end);
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
throw new WriteProcessException(e);
}

for (int i = start; i < end; i++) {
results[i] = RpcUtils.SUCCESS_STATUS;
}
Expand All @@ -300,7 +318,7 @@ public void insertTablet(
}

@SuppressWarnings("squid:S3776") // high Cognitive Complexity
private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
private long[] checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
throws WriteProcessException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
Expand Down Expand Up @@ -347,12 +365,13 @@ private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}

private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
private long[] checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException {
if (start >= end) {
return;
return new long[] {0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata

Expand Down Expand Up @@ -388,6 +407,7 @@ private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
return memIncrements;
}

private void updateMemCost(
Expand Down Expand Up @@ -492,6 +512,19 @@ private void updateMemoryInfo(
workMemTable.addTextDataSize(textDataIncrement);
}

private void rollbackMemoryInfo(long[] memIncrements) {
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];

memTableIncrement += textDataIncrement;
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
workMemTable.releaseTVListRamCost(memTableIncrement);
workMemTable.releaseTextDataSize(textDataIncrement);
}

/**
* Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
* <= 'timestamp' in the deletion. <br>
Expand Down

0 comments on commit 394467b

Please sign in to comment.