Skip to content

Commit

Permalink
Add memory control for MergeReader
Browse files Browse the repository at this point in the history
  • Loading branch information
lancelly authored Jun 21, 2024
1 parent 3dd6c8c commit bce9631
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;

import org.apache.tsfile.read.filter.basic.Filter;
Expand Down Expand Up @@ -78,20 +79,13 @@ public class MPPQueryContext {

// To avoid query front-end from consuming too much memory, it needs to reserve memory when
// constructing some Expression and PlanNode.
private long reservedBytesInTotalForFrontEnd = 0;

private long bytesToBeReservedForFrontEnd = 0;

// To avoid reserving memory too frequently, we choose to do it in batches. This is the lower
// bound
// for each batch.
private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;

private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance();
private final MemoryReservationManager memoryReservationManager;

public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
this.memoryReservationManager =
new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
}

// TODO too many callers just pass a null SessionInfo which should be forbidden
Expand Down Expand Up @@ -127,7 +121,7 @@ public MPPQueryContext(

public void prepareForRetry() {
this.initResultNodeContext();
this.releaseMemoryForFrontEnd();
this.releaseAllMemoryReservedForFrontEnd();
}

private void initResultNodeContext() {
Expand Down Expand Up @@ -313,40 +307,19 @@ public void setLogicalOptimizationCost(long logicalOptimizeCost) {
* single-threaded manner.
*/
public void reserveMemoryForFrontEnd(final long bytes) {
this.bytesToBeReservedForFrontEnd += bytes;
if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
reserveMemoryForFrontEndImmediately();
}
this.memoryReservationManager.reserveMemoryCumulatively(bytes);
}

public void reserveMemoryForFrontEndImmediately() {
if (bytesToBeReservedForFrontEnd != 0) {
LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId());
this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
this.bytesToBeReservedForFrontEnd = 0;
}
this.memoryReservationManager.reserveMemoryImmediately();
}

public void releaseMemoryForFrontEnd() {
if (reservedBytesInTotalForFrontEnd != 0) {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
reservedBytesInTotalForFrontEnd = 0;
}
public void releaseAllMemoryReservedForFrontEnd() {
this.memoryReservationManager.releaseAllReservedMemory();
}

public void releaseMemoryForFrontEnd(final long bytes) {
if (bytes != 0) {
long bytesToRelease;
if (bytes <= bytesToBeReservedForFrontEnd) {
bytesToBeReservedForFrontEnd -= bytes;
} else {
bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
bytesToBeReservedForFrontEnd = 0;
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
reservedBytesInTotalForFrontEnd -= bytesToRelease;
}
}
public void releaseMemoryReservedForFrontEnd(final long bytes) {
this.memoryReservationManager.releaseMemoryCumulatively(bytes);
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
Expand Down Expand Up @@ -63,6 +65,8 @@ public class FragmentInstanceContext extends QueryContext {

private final FragmentInstanceStateMachine stateMachine;

private final MemoryReservationManager memoryReservationManager;

private IDataRegionForQuery dataRegion;
private Filter globalTimeFilter;

Expand Down Expand Up @@ -193,6 +197,8 @@ private FragmentInstanceContext(
globalTimePredicate == null ? null : globalTimePredicate.convertPredicateToTimeFilter();
this.dataNodeQueryContextMap = dataNodeQueryContextMap;
this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
this.memoryReservationManager =
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

private FragmentInstanceContext(
Expand All @@ -203,6 +209,8 @@ private FragmentInstanceContext(
this.sessionInfo = sessionInfo;
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
this.memoryReservationManager =
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

private FragmentInstanceContext(
Expand All @@ -218,6 +226,8 @@ private FragmentInstanceContext(
this.dataRegion = dataRegion;
this.globalTimeFilter = globalTimeFilter;
this.dataNodeQueryContextMap = null;
this.memoryReservationManager =
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

@TestOnly
Expand All @@ -232,6 +242,7 @@ private FragmentInstanceContext(long queryId) {
this.stateMachine = null;
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
this.memoryReservationManager = null;
}

public void start() {
Expand Down Expand Up @@ -369,6 +380,14 @@ public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToC
this.devicePathsToContext = devicePathsToContext;
}

public MemoryReservationManager getMemoryReservationContext() {
return memoryReservationManager;
}

public void releaseMemoryReservationManager() {
memoryReservationManager.releaseAllReservedMemory();
}

public void initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProcessException {
long startTime = System.nanoTime();
if (sourcePaths == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) {
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true);

context.releaseMemoryReservationManager();

if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected AbstractSeriesAggregationScanOperator(
this.timeRangeIterator = timeRangeIterator;

this.cachedRawDataSize =
(1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
(1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.maxReturnSize = maxReturnSize;
this.outputEndTime = outputEndTime;
this.canUseStatistics = canUseStatistics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ public AlignedSeriesScanOperator(
public long calculateMaxPeekMemory() {
return Math.max(
maxReturnSize,
(1L + valueColumnCount)
* TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
* 3L);
(1L + valueColumnCount) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public SeriesScanOperator(

@Override
public long calculateMaxPeekMemory() {
return Math.max(
maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
return Math.max(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

public class SeriesScanUtil implements Accountable {

protected final QueryContext context;
protected final FragmentInstanceContext context;

// The path of the target series which will be scanned.
protected final PartialPath seriesPath;
Expand Down Expand Up @@ -143,6 +143,7 @@ public SeriesScanUtil(
this.orderUtils = new DescTimeOrderUtils();
this.mergeReader = getDescPriorityMergeReader();
}
this.mergeReader.setMemoryReservationManager(context.getMemoryReservationContext());

// init TimeSeriesMetadata materializer
this.seqTimeSeriesMetadata = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private ExecutionResult execution(
return result;
} finally {
if (queryContext != null) {
queryContext.releaseMemoryForFrontEnd();
queryContext.releaseAllMemoryReservedForFrontEnd();
}
if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) {
Map<SchemaLockType, Integer> lockMap = queryContext.getAcquiredLockNumMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,9 @@ public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext conte
RewriterContext rewriterContext =
new RewriterContext(analysis, context, queryStatement.isAlignByDevice());
PlanNode node;
try {
node = plan.accept(new Rewriter(), rewriterContext);
} finally {
// release the last batch of memory
rewriterContext.releaseMemoryForFrontEndImmediately();
}

node = plan.accept(new Rewriter(), rewriterContext);

return node;
}

Expand Down Expand Up @@ -633,17 +630,13 @@ private PlanNode planProject(PlanNode resultNode, PlanNode rawNode, RewriterCont

private static class RewriterContext {

private static final long RELEASE_BATCH_SIZE = 1024L * 1024L;

private final Analysis analysis;
private final MPPQueryContext context;
private final boolean isAlignByDevice;

private String curDevice;
private PartialPath curDevicePath;

private long bytesToBeReleased = 0;

public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) {
this.analysis = analysis;
Validate.notNull(context, "Query context cannot be null.");
Expand Down Expand Up @@ -683,17 +676,7 @@ public Set<Expression> getAggregationExpressions() {
}

public void releaseMemoryForFrontEnd(final long bytes) {
bytesToBeReleased += bytes;
if (bytesToBeReleased >= RELEASE_BATCH_SIZE) {
releaseMemoryForFrontEndImmediately();
}
}

public void releaseMemoryForFrontEndImmediately() {
if (bytesToBeReleased > 0) {
context.releaseMemoryForFrontEnd(bytesToBeReleased);
bytesToBeReleased = 0;
}
this.context.releaseMemoryReservedForFrontEnd(bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,18 @@ public synchronized long tryAllocateFreeMemoryForOperators(long memoryInBytes) {
}
}

public synchronized void reserveMemoryForQueryFrontEnd(
final long memoryInBytes, final long reservedBytes, final String queryId) {
public synchronized void reserveFromFreeMemoryForOperators(
final long memoryInBytes,
final long reservedBytes,
final String queryId,
final String contextHolder) {
if (memoryInBytes > freeMemoryForOperators) {
throw new MemoryNotEnoughException(
String.format(
"There is not enough memory for planning-stage of Query %s, "
"There is not enough memory for Query %s, the contextHolder is %s,"
+ "current remaining free memory is %dB, "
+ "estimated memory usage is %dB, reserved memory for FE of this query in total is %dB",
queryId, freeMemoryForOperators, memoryInBytes, reservedBytes));
+ "reserved memory for this context in total is %dB.",
queryId, contextHolder, freeMemoryForOperators, reservedBytes));
} else {
freeMemoryForOperators -= memoryInBytes;
if (LOGGER.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.plan.planner.memory;

public interface MemoryReservationManager {
/**
* Reserve memory for the given size. The memory reservation request will be accumulated and the
* actual memory will be reserved when the accumulated memory exceeds the threshold.
*
* @param size the size of memory to reserve
*/
void reserveMemoryCumulatively(final long size);

/** Reserve memory for the accumulated memory size immediately. */
void reserveMemoryImmediately();

/**
* Release memory for the given size.
*
* @param size the size of memory to release
*/
void releaseMemoryCumulatively(final long size);

/**
* Release all reserved memory immediately. Make sure this method is called when the lifecycle of
* this manager ends, Or the memory to be released in the batch may not be released correctly.
*/
void releaseAllReservedMemory();
}
Loading

0 comments on commit bce9631

Please sign in to comment.