Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lancelly committed Jun 21, 2024
1 parent f6511b1 commit 61a5b0f
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.memory.NotSynchronizedMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;

import org.apache.tsfile.read.filter.basic.Filter;
Expand Down Expand Up @@ -85,7 +85,7 @@ public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
this.memoryReservationManager =
new NotSynchronizedMemoryReservationManager(queryId, this.getClass().getName());
new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
}

// TODO too many callers just pass a null SessionInfo which should be forbidden
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
Expand Down Expand Up @@ -198,7 +198,7 @@ private FragmentInstanceContext(
this.dataNodeQueryContextMap = dataNodeQueryContextMap;
this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
this.memoryReservationManager =
new SynchronizedMemoryReservationManager(id.getQueryId(), this.getClass().getName());
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

private FragmentInstanceContext(
Expand All @@ -210,7 +210,7 @@ private FragmentInstanceContext(
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
this.memoryReservationManager =
new SynchronizedMemoryReservationManager(id.getQueryId(), this.getClass().getName());
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

private FragmentInstanceContext(
Expand All @@ -227,7 +227,7 @@ private FragmentInstanceContext(
this.globalTimeFilter = globalTimeFilter;
this.dataNodeQueryContextMap = null;
this.memoryReservationManager =
new SynchronizedMemoryReservationManager(id.getQueryId(), this.getClass().getName());
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

@TestOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected AbstractSeriesAggregationScanOperator(
this.timeRangeIterator = timeRangeIterator;

this.cachedRawDataSize =
(1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
(1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.maxReturnSize = maxReturnSize;
this.outputEndTime = outputEndTime;
this.canUseStatistics = canUseStatistics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ public AlignedSeriesScanOperator(
public long calculateMaxPeekMemory() {
return Math.max(
maxReturnSize,
(1L + valueColumnCount)
* TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
* 3L);
(1L + valueColumnCount) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public SeriesScanOperator(

@Override
public long calculateMaxPeekMemory() {
return Math.max(
maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
return Math.max(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationManagerWrapper;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
Expand Down Expand Up @@ -144,8 +143,7 @@ public SeriesScanUtil(
this.orderUtils = new DescTimeOrderUtils();
this.mergeReader = getDescPriorityMergeReader();
}
this.mergeReader.setMemoryReservationManagerWrapper(
new SynchronizedMemoryReservationManagerWrapper(context.getMemoryReservationContext()));
this.mergeReader.setMemoryReservationManager(context.getMemoryReservationContext());

// init TimeSeriesMetadata materializer
this.seqTimeSeriesMetadata = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;

public class NotSynchronizedMemoryReservationManager implements MemoryReservationManager {
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
public class NotThreadSafeMemoryReservationManager implements MemoryReservationManager {
// To avoid reserving memory too frequently, we choose to do it in batches. This is the lower
// bound for each batch.
private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
Expand All @@ -39,8 +42,7 @@ public class NotSynchronizedMemoryReservationManager implements MemoryReservatio

private long bytesToBeReleased = 0;

public NotSynchronizedMemoryReservationManager(
final QueryId queryId, final String contextHolder) {
public NotThreadSafeMemoryReservationManager(final QueryId queryId, final String contextHolder) {
this.queryId = queryId;
this.contextHolder = contextHolder;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import org.apache.iotdb.db.queryengine.common.QueryId;

public class SynchronizedMemoryReservationManager extends NotSynchronizedMemoryReservationManager {
public SynchronizedMemoryReservationManager(QueryId queryId, String contextHolder) {
public class ThreadSafeMemoryReservationManager extends NotThreadSafeMemoryReservationManager {
public ThreadSafeMemoryReservationManager(QueryId queryId, String contextHolder) {
super(queryId, contextHolder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationManagerWrapper;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;

import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.reader.IPointReader;
Expand All @@ -41,7 +41,7 @@ public class PriorityMergeReader implements IPointReader {

protected long usedMemorySize = 0;

protected SynchronizedMemoryReservationManagerWrapper memoryReservationManagerWrapper;
protected MemoryReservationManager memoryReservationManager;

public PriorityMergeReader() {
heap =
Expand All @@ -54,9 +54,8 @@ public PriorityMergeReader() {
});
}

public void setMemoryReservationManagerWrapper(
SynchronizedMemoryReservationManagerWrapper memoryReservationManagerWrapper) {
this.memoryReservationManagerWrapper = memoryReservationManagerWrapper;
public void setMemoryReservationManager(MemoryReservationManager memoryReservationManager) {
this.memoryReservationManager = memoryReservationManager;
}

@TestOnly
Expand All @@ -77,8 +76,8 @@ public void addReader(IPointReader reader, MergeReaderPriority priority, long en
currentReadStopTime = Math.max(currentReadStopTime, endTime);
long size = element.getReader().getUsedMemorySize();
usedMemorySize += size;
if (memoryReservationManagerWrapper != null) {
memoryReservationManagerWrapper.reserveMemoryCumulatively(size);
if (memoryReservationManager != null) {
memoryReservationManager.reserveMemoryCumulatively(size);
}
} else {
reader.close();
Expand Down Expand Up @@ -110,8 +109,8 @@ public TimeValuePair nextTimeValuePair() throws IOException {
} else {
long size = top.getReader().getUsedMemorySize();
usedMemorySize -= size;
if (memoryReservationManagerWrapper != null) {
memoryReservationManagerWrapper.releaseMemoryCumulatively(size);
if (memoryReservationManager != null) {
memoryReservationManager.releaseMemoryCumulatively(size);
}
}
return ret;
Expand Down Expand Up @@ -139,8 +138,8 @@ protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOExc
if (!e.hasNext()) {
long size = e.getReader().getUsedMemorySize();
usedMemorySize -= size;
if (memoryReservationManagerWrapper != null) {
memoryReservationManagerWrapper.releaseMemoryCumulatively(size);
if (memoryReservationManager != null) {
memoryReservationManager.releaseMemoryCumulatively(size);
}
e.getReader().close();
continue;
Expand All @@ -155,8 +154,8 @@ protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOExc
} else {
long size = e.getReader().getUsedMemorySize();
usedMemorySize -= size;
if (memoryReservationManagerWrapper != null) {
memoryReservationManagerWrapper.releaseMemoryCumulatively(size);
if (memoryReservationManager != null) {
memoryReservationManager.releaseMemoryCumulatively(size);
}
// the chunk is end
e.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,12 @@ public void seriesScanOperatorTest() {
scanOptionsBuilder.build());

assertEquals(
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L,
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
seriesScanOperator.calculateMaxPeekMemory());
assertEquals(
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
seriesScanOperator.calculateMaxReturnSize());
assertEquals(
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 2L,
seriesScanOperator.calculateRetainedSizeAfterCallingNext());
assertEquals(0L, seriesScanOperator.calculateRetainedSizeAfterCallingNext());

} catch (IllegalPathException e) {
e.printStackTrace();
Expand Down Expand Up @@ -202,7 +200,7 @@ public void alignedSeriesScanOperatorTest() {
long maxPeekMemory =
Math.max(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
long maxReturnMemory =
Math.min(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
Expand Down Expand Up @@ -1006,8 +1004,7 @@ public void seriesAggregationScanOperatorTest() {
TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ LongColumn.SIZE_IN_BYTES_PER_POSITION;
long cachedRawDataSize =
2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
long cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();

assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
Expand Down Expand Up @@ -1039,7 +1036,7 @@ public void seriesAggregationScanOperatorTest() {
TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ 2 * LongColumn.SIZE_IN_BYTES_PER_POSITION;
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();

assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
Expand Down Expand Up @@ -1082,7 +1079,7 @@ public void seriesAggregationScanOperatorTest() {
* (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ LongColumn.SIZE_IN_BYTES_PER_POSITION);
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();

assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
Expand Down Expand Up @@ -1122,7 +1119,7 @@ public void seriesAggregationScanOperatorTest() {
* (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ LongColumn.SIZE_IN_BYTES_PER_POSITION));
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();

assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
Expand Down Expand Up @@ -1160,7 +1157,7 @@ public void seriesAggregationScanOperatorTest() {
typeProvider);

expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
cachedRawDataSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();

assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
Expand Down

0 comments on commit 61a5b0f

Please sign in to comment.