[To rc/1.3.3] Make each execution acquire a schema read lock only once
Caideyipi authored Sep 26, 2024
1 parent 7c59ea1 commit 547f3eb
Showing 5 changed files with 110 additions and 110 deletions.
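Summary of the change set below: previously MPPQueryContext counted read-lock acquisitions per SchemaLockType in a Map<SchemaLockType, Integer>, every call site paired takeReadLock(lockType) with context.addAcquiredLockNum(lockType), and the execution cleanup code looped over the counts to release them. After this commit the context keeps a Set<SchemaLockType>, DataNodeSchemaLockManager.takeReadLock(context, lockType) locks only when the type is newly added to that set, and releaseReadLock(context) unlocks each recorded type exactly once. A minimal sketch of the new contract, using simplified stand-in classes rather than the real IoTDB ones:

// Stand-in classes (not the actual IoTDB sources) illustrating the contract
// introduced by this commit: the per-query context remembers which lock types
// it holds, and the manager locks/unlocks each type at most once per context.
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

enum LockTypeSketch { TIMESERIES_VS_TEMPLATE }

final class QueryContextSketch {
  private final Set<LockTypeSketch> acquiredLocks = EnumSet.noneOf(LockTypeSketch.class);

  // Mirrors MPPQueryContext.addAcquiredLock: true only on the first call per type.
  boolean addAcquiredLock(LockTypeSketch type) {
    return acquiredLocks.add(type);
  }

  Set<LockTypeSketch> getAcquiredLocks() {
    return acquiredLocks;
  }
}

final class LockManagerSketch {
  private final ReentrantReadWriteLock[] locks =
      new ReentrantReadWriteLock[LockTypeSketch.values().length];

  LockManagerSketch() {
    for (int i = 0; i < locks.length; i++) {
      locks[i] = new ReentrantReadWriteLock(false);
    }
  }

  // Locks only if this context has not already recorded the lock type.
  void takeReadLock(QueryContextSketch context, LockTypeSketch type) {
    if (context.addAcquiredLock(type)) {
      locks[type.ordinal()].readLock().lock();
    }
  }

  // Releases exactly the types the context recorded, once each.
  void releaseReadLock(QueryContextSketch context) {
    if (context != null) {
      context.getAcquiredLocks().forEach(t -> locks[t.ordinal()].readLock().unlock());
    }
  }
}

Because Set.add returns false on repeated additions, a statement that reaches several lock-taking code paths still blocks on the underlying ReentrantReadWriteLock only once per lock type and per execution.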
@@ -33,10 +33,10 @@
import org.apache.tsfile.read.filter.basic.Filter;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* This class is used to record the context of a query including QueryId, query statement, session
@@ -71,7 +71,7 @@ public class MPPQueryContext {

private Filter globalTimeFilter;

private Map<SchemaLockType, Integer> acquiredLockNumMap = new HashMap<>();
private final Set<SchemaLockType> acquiredLocks = new HashSet<>();

private boolean isExplainAnalyze = false;

@@ -200,16 +200,12 @@ public String getSql() {
return sql;
}

public Map<SchemaLockType, Integer> getAcquiredLockNumMap() {
return acquiredLockNumMap;
public Set<SchemaLockType> getAcquiredLocks() {
return acquiredLocks;
}

public void addAcquiredLockNum(SchemaLockType lockType) {
if (acquiredLockNumMap.containsKey(lockType)) {
acquiredLockNumMap.put(lockType, acquiredLockNumMap.get(lockType) + 1);
} else {
acquiredLockNumMap.put(lockType, 1);
}
public boolean addAcquiredLock(final SchemaLockType lockType) {
return acquiredLocks.add(lockType);
}

public void generateGlobalTimeFilter(Analysis analysis) {
@@ -36,7 +36,6 @@
import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -53,7 +52,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -144,14 +142,7 @@ private ExecutionResult execution(
if (queryContext != null) {
queryContext.releaseAllMemoryReservedForFrontEnd();
}
if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) {
Map<SchemaLockType, Integer> lockMap = queryContext.getAcquiredLockNumMap();
for (Map.Entry<SchemaLockType, Integer> entry : lockMap.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
DataNodeSchemaLockManager.getInstance().releaseReadLock(entry.getKey());
}
}
}
DataNodeSchemaLockManager.getInstance().releaseReadLock(queryContext);
}
}

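On the cleanup path above, the nested per-type release loop is replaced by a single releaseReadLock(queryContext) call. A hedged sketch of the intended acquire/release pairing around an execution (the wrapper method and its surrounding error handling are illustrative assumptions, not code from this commit):

import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;

final class SchemaLockUsageSketch {
  // Hypothetical wrapper: not part of the commit, only shows the pairing.
  void analyzeAndExecute(final MPPQueryContext queryContext) {
    try {
      // May be reached from several analysis paths; only the first call per
      // lock type blocks, later calls are no-ops for this context.
      DataNodeSchemaLockManager.getInstance()
          .takeReadLock(queryContext, SchemaLockType.TIMESERIES_VS_TEMPLATE);
      // ... analysis and execution of the statement ...
    } finally {
      // Unlocks each lock type recorded in the context exactly once;
      // does nothing if the context recorded no locks.
      DataNodeSchemaLockManager.getInstance().releaseReadLock(queryContext);
    }
  }
}

Compared with the old code, there is no per-type counter to replay, so the release path cannot under- or over-unlock.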
@@ -2570,32 +2570,35 @@ public Analysis visitCreateTimeseries(
}

private void checkIsTemplateCompatible(
PartialPath timeseriesPath, String alias, MPPQueryContext context, boolean takeLock) {
final PartialPath timeSeriesPath,
final String alias,
final MPPQueryContext context,
final boolean takeLock) {
if (takeLock) {
DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE);
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
}
Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(timeseriesPath, alias);
final Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(timeSeriesPath, alias);
if (templateInfo != null) {
throw new SemanticException(
new TemplateIncompatibleException(
timeseriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right));
timeSeriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right));
}
}

private void checkIsTemplateCompatible(
PartialPath devicePath,
List<String> measurements,
List<String> aliasList,
MPPQueryContext context,
boolean takeLock) {
final PartialPath devicePath,
final List<String> measurements,
final List<String> aliasList,
final MPPQueryContext context,
final boolean takeLock) {
if (takeLock) {
DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE);
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
}
for (int i = 0; i < measurements.size(); i++) {
Pair<Template, PartialPath> templateInfo =
final Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(
devicePath.concatNode(measurements.get(i)),
aliasList == null ? null : aliasList.get(i));
@@ -2613,12 +2616,12 @@ private void analyzeSchemaProps(Map<String, String> props) {
if (props == null || props.isEmpty()) {
return;
}
Map<String, String> caseChangeMap = new HashMap<>();
for (String key : props.keySet()) {
final Map<String, String> caseChangeMap = new HashMap<>();
for (final String key : props.keySet()) {
caseChangeMap.put(key.toLowerCase(Locale.ROOT), key);
}
for (Map.Entry<String, String> caseChangeEntry : caseChangeMap.entrySet()) {
String lowerCaseKey = caseChangeEntry.getKey();
for (final Map.Entry<String, String> caseChangeEntry : caseChangeMap.entrySet()) {
final String lowerCaseKey = caseChangeEntry.getKey();
if (!ALLOWED_SCHEMA_PROPS.contains(lowerCaseKey)) {
throw new SemanticException(
new MetadataException(
@@ -2631,11 +2634,11 @@ private void analyzeSchemaProps(Map<String, String> props) {
}
}

private void analyzeSchemaProps(List<Map<String, String>> propsList) {
private void analyzeSchemaProps(final List<Map<String, String>> propsList) {
if (propsList == null) {
return;
}
for (Map<String, String> props : propsList) {
for (final Map<String, String> props : propsList) {
analyzeSchemaProps(props);
}
}
@@ -2709,24 +2712,24 @@ public Analysis visitInternalCreateTimeseries(

@Override
public Analysis visitInternalCreateMultiTimeSeries(
InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
MPPQueryContext context) {
final InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
final MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);

Analysis analysis = new Analysis();
final Analysis analysis = new Analysis();
analysis.setStatement(internalCreateMultiTimeSeriesStatement);

PathPatternTree pathPatternTree = new PathPatternTree();
DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE);
for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry :
final PathPatternTree pathPatternTree = new PathPatternTree();
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry :
internalCreateMultiTimeSeriesStatement.getDeviceMap().entrySet()) {
checkIsTemplateCompatible(
entry.getKey(), entry.getValue().right.getMeasurements(), null, context, false);
pathPatternTree.appendFullPath(entry.getKey().concatNode(ONE_LEVEL_PATH_WILDCARD));
}

SchemaPartition schemaPartitionInfo;
final SchemaPartition schemaPartitionInfo;
schemaPartitionInfo =
partitionFetcher.getOrCreateSchemaPartition(
pathPatternTree, context.getSession().getUserName());
@@ -2739,26 +2742,26 @@ public Analysis visitCreateMultiTimeSeries(
final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement,
final MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
final Analysis analysis = new Analysis();
analysis.setStatement(createMultiTimeSeriesStatement);

analyzeSchemaProps(createMultiTimeSeriesStatement.getPropsList());

List<PartialPath> timeseriesPathList = createMultiTimeSeriesStatement.getPaths();
List<String> aliasList = createMultiTimeSeriesStatement.getAliasList();
final List<PartialPath> timeseriesPathList = createMultiTimeSeriesStatement.getPaths();
final List<String> aliasList = createMultiTimeSeriesStatement.getAliasList();

DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE);
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
for (int i = 0; i < timeseriesPathList.size(); i++) {
checkIsTemplateCompatible(
timeseriesPathList.get(i), aliasList == null ? null : aliasList.get(i), context, false);
}

PathPatternTree patternTree = new PathPatternTree();
for (PartialPath path : createMultiTimeSeriesStatement.getPaths()) {
final PathPatternTree patternTree = new PathPatternTree();
for (final PartialPath path : createMultiTimeSeriesStatement.getPaths()) {
patternTree.appendFullPath(path);
}
SchemaPartition schemaPartitionInfo =
final SchemaPartition schemaPartitionInfo =
partitionFetcher.getOrCreateSchemaPartition(
patternTree, context.getSession().getUserName());
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@@ -4053,10 +4056,10 @@ private Pair<List<PartialPath>, PartialPath> findAllViewsInPaths(
}

private void checkTargetPathsInCreateLogicalView(
Analysis analysis,
CreateLogicalViewStatement createLogicalViewStatement,
MPPQueryContext context) {
Pair<Boolean, String> checkResult = createLogicalViewStatement.checkTargetPaths();
final Analysis analysis,
final CreateLogicalViewStatement createLogicalViewStatement,
final MPPQueryContext context) {
final Pair<Boolean, String> checkResult = createLogicalViewStatement.checkTargetPaths();
if (Boolean.FALSE.equals(checkResult.left)) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
@@ -4067,10 +4070,10 @@ private void checkTargetPathsInCreateLogicalView(
}
// Make sure there are no redundant paths in targets. Note that redundant paths in source
// are legal.
List<PartialPath> targetPathList = createLogicalViewStatement.getTargetPathList();
Set<String> targetStringSet = new HashSet<>();
for (PartialPath path : targetPathList) {
boolean repeatPathNotExist = targetStringSet.add(path.toString());
final List<PartialPath> targetPathList = createLogicalViewStatement.getTargetPathList();
final Set<String> targetStringSet = new HashSet<>();
for (final PartialPath path : targetPathList) {
final boolean repeatPathNotExist = targetStringSet.add(path.toString());
if (!repeatPathNotExist) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
@@ -4082,12 +4085,12 @@ private void checkTargetPathsInCreateLogicalView(
}
// Make sure all paths are not under any templates
try {
DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE);
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
for (PartialPath path : createLogicalViewStatement.getTargetPathList()) {
checkIsTemplateCompatible(path, null, context, false);
}
} catch (Exception e) {
} catch (final Exception e) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(
@@ -4098,14 +4101,14 @@ private void checkTargetPathsInCreateLogicalView(

@Override
public Analysis visitShowLogicalView(
ShowLogicalViewStatement showLogicalViewStatement, MPPQueryContext context) {
final ShowLogicalViewStatement showLogicalViewStatement, final MPPQueryContext context) {
context.setQueryType(QueryType.READ);
Analysis analysis = new Analysis();
final Analysis analysis = new Analysis();
analysis.setStatement(showLogicalViewStatement);

PathPatternTree patternTree = new PathPatternTree();
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(showLogicalViewStatement.getPathPattern());
SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
final SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);

analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowLogicalViewHeader());
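Throughout the analyzer above, the two-step idiom (takeReadLock(lockType) followed by context.addAcquiredLockNum(lockType)) collapses into a single takeReadLock(context, lockType) call, and methods such as visitCreateMultiTimeSeries now take the lock once up front and run the per-path checks with takeLock set to false. A small hedged illustration of the call-site change (the enclosing class and method are stand-ins; only the lock calls come from the diff):

import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;

final class AnalyzerCallSiteSketch {
  // Old idiom (pre-commit), kept only as a comment for contrast:
  //   DataNodeSchemaLockManager.getInstance()
  //       .takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
  //   context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE);

  // New idiom: one call, idempotent per context, so repeating it while
  // analyzing the same statement never stacks extra read-lock holds.
  void lockTimeseriesVsTemplate(final MPPQueryContext context) {
    DataNodeSchemaLockManager.getInstance()
        .takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
  }
}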
@@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.plan.analyze.lock;

import org.apache.iotdb.db.queryengine.common.MPPQueryContext;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DataNodeSchemaLockManager {
@@ -34,26 +36,32 @@ public static DataNodeSchemaLockManager getInstance() {
}

private DataNodeSchemaLockManager() {
int lockNum = SchemaLockType.values().length;
final int lockNum = SchemaLockType.values().length;
this.locks = new ReentrantReadWriteLock[lockNum];
for (int i = 0; i < lockNum; i++) {
locks[i] = new ReentrantReadWriteLock(false);
}
}

public void takeReadLock(SchemaLockType lockType) {
locks[lockType.ordinal()].readLock().lock();
public void takeReadLock(final MPPQueryContext context, final SchemaLockType lockType) {
if (context.addAcquiredLock(lockType)) {
locks[lockType.ordinal()].readLock().lock();
}
}

public void releaseReadLock(SchemaLockType lockType) {
locks[lockType.ordinal()].readLock().unlock();
public void releaseReadLock(final MPPQueryContext queryContext) {
if (queryContext != null && !queryContext.getAcquiredLocks().isEmpty()) {
queryContext
.getAcquiredLocks()
.forEach(lockType -> locks[lockType.ordinal()].readLock().unlock());
}
}

public void takeWriteLock(SchemaLockType lockType) {
public void takeWriteLock(final SchemaLockType lockType) {
locks[lockType.ordinal()].writeLock().lock();
}

public void releaseWriteLock(SchemaLockType lockType) {
public void releaseWriteLock(final SchemaLockType lockType) {
locks[lockType.ordinal()].writeLock().unlock();
}
}
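As context for why the old code had to count acquisitions: the read side of java.util.concurrent.locks.ReentrantReadWriteLock is re-entrant, so a thread that locks it twice must also unlock it twice. A small JDK-only demo of that behaviour (unrelated to the IoTDB classes above):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockCountDemo {
  public static void main(String[] args) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);

    lock.readLock().lock();
    lock.readLock().lock(); // re-entrant: the current thread now holds it twice
    System.out.println(lock.getReadHoldCount()); // prints 2

    lock.readLock().unlock();
    System.out.println(lock.getReadHoldCount()); // prints 1 -- still held

    lock.readLock().unlock();
    System.out.println(lock.getReadHoldCount()); // prints 0 -- fully released
  }
}

The Set-based bookkeeping sidesteps this entirely: each execution takes a given schema read lock at most once, so the release path never has to remember how many holds to undo.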