From 547f3eba6bc6e9ab044cd293005a67d30d38992c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 26 Sep 2024 21:00:57 +0800 Subject: [PATCH] [To rc/1.3.3] Make each execution acquire a schema read lock only once --- .../queryengine/common/MPPQueryContext.java | 18 ++- .../db/queryengine/plan/Coordinator.java | 11 +- .../plan/analyze/AnalyzeVisitor.java | 105 +++++++++--------- .../lock/DataNodeSchemaLockManager.java | 22 ++-- .../analyze/schema/ClusterSchemaFetcher.java | 64 +++++------ 5 files changed, 110 insertions(+), 110 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 7eebccdd6751..ccd7536919fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -33,10 +33,10 @@ import org.apache.tsfile.read.filter.basic.Filter; import java.time.ZoneId; -import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; +import java.util.Set; /** * This class is used to record the context of a query including QueryId, query statement, session @@ -71,7 +71,7 @@ public class MPPQueryContext { private Filter globalTimeFilter; - private Map acquiredLockNumMap = new HashMap<>(); + private final Set acquiredLocks = new HashSet<>(); private boolean isExplainAnalyze = false; @@ -200,16 +200,12 @@ public String getSql() { return sql; } - public Map getAcquiredLockNumMap() { - return acquiredLockNumMap; + public Set getAcquiredLocks() { + return acquiredLocks; } - public void addAcquiredLockNum(SchemaLockType lockType) { - if (acquiredLockNumMap.containsKey(lockType)) { - acquiredLockNumMap.put(lockType, acquiredLockNumMap.get(lockType) + 1); - } else { - acquiredLockNumMap.put(lockType, 1); - } + public boolean addAcquiredLock(final SchemaLockType lockType) { + return acquiredLocks.add(lockType); } public void generateGlobalTimeFilter(Analysis analysis) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 5d8c7830e3f4..db56a6ec6959 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -36,7 +36,6 @@ import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager; -import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; @@ -53,7 +52,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -144,14 +142,7 @@ private ExecutionResult execution( if (queryContext != null) { queryContext.releaseAllMemoryReservedForFrontEnd(); } - if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) { - Map lockMap = queryContext.getAcquiredLockNumMap(); - for (Map.Entry entry : lockMap.entrySet()) { - for (int i = 0; i < entry.getValue(); i++) { - DataNodeSchemaLockManager.getInstance().releaseReadLock(entry.getKey()); - } - } - } + DataNodeSchemaLockManager.getInstance().releaseReadLock(queryContext); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index b01ee8babed1..012f48c271ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2570,32 +2570,35 @@ public Analysis visitCreateTimeseries( } private void checkIsTemplateCompatible( - PartialPath timeseriesPath, String alias, MPPQueryContext context, boolean takeLock) { + final PartialPath timeSeriesPath, + final String alias, + final MPPQueryContext context, + final boolean takeLock) { if (takeLock) { - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE); - context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE); + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE); } - Pair templateInfo = - schemaFetcher.checkTemplateSetAndPreSetInfo(timeseriesPath, alias); + final Pair templateInfo = + schemaFetcher.checkTemplateSetAndPreSetInfo(timeSeriesPath, alias); if (templateInfo != null) { throw new SemanticException( new TemplateIncompatibleException( - timeseriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right)); + timeSeriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right)); } } private void checkIsTemplateCompatible( - PartialPath devicePath, - List measurements, - List aliasList, - MPPQueryContext context, - boolean takeLock) { + final PartialPath devicePath, + final List measurements, + final List aliasList, + final MPPQueryContext context, + final boolean takeLock) { if (takeLock) { - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE); - context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE); + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE); } for (int i = 0; i < measurements.size(); i++) { - Pair templateInfo = + final Pair templateInfo = schemaFetcher.checkTemplateSetAndPreSetInfo( devicePath.concatNode(measurements.get(i)), aliasList == null ? null : aliasList.get(i)); @@ -2613,12 +2616,12 @@ private void analyzeSchemaProps(Map props) { if (props == null || props.isEmpty()) { return; } - Map caseChangeMap = new HashMap<>(); - for (String key : props.keySet()) { + final Map caseChangeMap = new HashMap<>(); + for (final String key : props.keySet()) { caseChangeMap.put(key.toLowerCase(Locale.ROOT), key); } - for (Map.Entry caseChangeEntry : caseChangeMap.entrySet()) { - String lowerCaseKey = caseChangeEntry.getKey(); + for (final Map.Entry caseChangeEntry : caseChangeMap.entrySet()) { + final String lowerCaseKey = caseChangeEntry.getKey(); if (!ALLOWED_SCHEMA_PROPS.contains(lowerCaseKey)) { throw new SemanticException( new MetadataException( @@ -2631,11 +2634,11 @@ private void analyzeSchemaProps(Map props) { } } - private void analyzeSchemaProps(List> propsList) { + private void analyzeSchemaProps(final List> propsList) { if (propsList == null) { return; } - for (Map props : propsList) { + for (final Map props : propsList) { analyzeSchemaProps(props); } } @@ -2709,24 +2712,24 @@ public Analysis visitInternalCreateTimeseries( @Override public Analysis visitInternalCreateMultiTimeSeries( - InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, - MPPQueryContext context) { + final InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, + final MPPQueryContext context) { context.setQueryType(QueryType.WRITE); - Analysis analysis = new Analysis(); + final Analysis analysis = new Analysis(); analysis.setStatement(internalCreateMultiTimeSeriesStatement); - PathPatternTree pathPatternTree = new PathPatternTree(); - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE); - context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE); - for (Map.Entry> entry : + final PathPatternTree pathPatternTree = new PathPatternTree(); + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE); + for (final Map.Entry> entry : internalCreateMultiTimeSeriesStatement.getDeviceMap().entrySet()) { checkIsTemplateCompatible( entry.getKey(), entry.getValue().right.getMeasurements(), null, context, false); pathPatternTree.appendFullPath(entry.getKey().concatNode(ONE_LEVEL_PATH_WILDCARD)); } - SchemaPartition schemaPartitionInfo; + final SchemaPartition schemaPartitionInfo; schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition( pathPatternTree, context.getSession().getUserName()); @@ -2739,26 +2742,26 @@ public Analysis visitCreateMultiTimeSeries( final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final MPPQueryContext context) { context.setQueryType(QueryType.WRITE); - Analysis analysis = new Analysis(); + final Analysis analysis = new Analysis(); analysis.setStatement(createMultiTimeSeriesStatement); analyzeSchemaProps(createMultiTimeSeriesStatement.getPropsList()); - List timeseriesPathList = createMultiTimeSeriesStatement.getPaths(); - List aliasList = createMultiTimeSeriesStatement.getAliasList(); + final List timeseriesPathList = createMultiTimeSeriesStatement.getPaths(); + final List aliasList = createMultiTimeSeriesStatement.getAliasList(); - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE); - context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE); + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE); for (int i = 0; i < timeseriesPathList.size(); i++) { checkIsTemplateCompatible( timeseriesPathList.get(i), aliasList == null ? null : aliasList.get(i), context, false); } - PathPatternTree patternTree = new PathPatternTree(); - for (PartialPath path : createMultiTimeSeriesStatement.getPaths()) { + final PathPatternTree patternTree = new PathPatternTree(); + for (final PartialPath path : createMultiTimeSeriesStatement.getPaths()) { patternTree.appendFullPath(path); } - SchemaPartition schemaPartitionInfo = + final SchemaPartition schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition( patternTree, context.getSession().getUserName()); analysis.setSchemaPartitionInfo(schemaPartitionInfo); @@ -4053,10 +4056,10 @@ private Pair, PartialPath> findAllViewsInPaths( } private void checkTargetPathsInCreateLogicalView( - Analysis analysis, - CreateLogicalViewStatement createLogicalViewStatement, - MPPQueryContext context) { - Pair checkResult = createLogicalViewStatement.checkTargetPaths(); + final Analysis analysis, + final CreateLogicalViewStatement createLogicalViewStatement, + final MPPQueryContext context) { + final Pair checkResult = createLogicalViewStatement.checkTargetPaths(); if (Boolean.FALSE.equals(checkResult.left)) { analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus( @@ -4067,10 +4070,10 @@ private void checkTargetPathsInCreateLogicalView( } // Make sure there are no redundant paths in targets. Note that redundant paths in source // are legal. - List targetPathList = createLogicalViewStatement.getTargetPathList(); - Set targetStringSet = new HashSet<>(); - for (PartialPath path : targetPathList) { - boolean repeatPathNotExist = targetStringSet.add(path.toString()); + final List targetPathList = createLogicalViewStatement.getTargetPathList(); + final Set targetStringSet = new HashSet<>(); + for (final PartialPath path : targetPathList) { + final boolean repeatPathNotExist = targetStringSet.add(path.toString()); if (!repeatPathNotExist) { analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus( @@ -4082,12 +4085,12 @@ private void checkTargetPathsInCreateLogicalView( } // Make sure all paths are not under any templates try { - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.TIMESERIES_VS_TEMPLATE); - context.addAcquiredLockNum(SchemaLockType.TIMESERIES_VS_TEMPLATE); + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE); for (PartialPath path : createLogicalViewStatement.getTargetPathList()) { checkIsTemplateCompatible(path, null, context, false); } - } catch (Exception e) { + } catch (final Exception e) { analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus( RpcUtils.getStatus( @@ -4098,14 +4101,14 @@ private void checkTargetPathsInCreateLogicalView( @Override public Analysis visitShowLogicalView( - ShowLogicalViewStatement showLogicalViewStatement, MPPQueryContext context) { + final ShowLogicalViewStatement showLogicalViewStatement, final MPPQueryContext context) { context.setQueryType(QueryType.READ); - Analysis analysis = new Analysis(); + final Analysis analysis = new Analysis(); analysis.setStatement(showLogicalViewStatement); - PathPatternTree patternTree = new PathPatternTree(); + final PathPatternTree patternTree = new PathPatternTree(); patternTree.appendPathPattern(showLogicalViewStatement.getPathPattern()); - SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); + final SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree); analysis.setSchemaPartitionInfo(schemaPartitionInfo); analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowLogicalViewHeader()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/lock/DataNodeSchemaLockManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/lock/DataNodeSchemaLockManager.java index b57986ccdec2..11478cf1f77f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/lock/DataNodeSchemaLockManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/lock/DataNodeSchemaLockManager.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.analyze.lock; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; + import java.util.concurrent.locks.ReentrantReadWriteLock; public class DataNodeSchemaLockManager { @@ -34,26 +36,32 @@ public static DataNodeSchemaLockManager getInstance() { } private DataNodeSchemaLockManager() { - int lockNum = SchemaLockType.values().length; + final int lockNum = SchemaLockType.values().length; this.locks = new ReentrantReadWriteLock[lockNum]; for (int i = 0; i < lockNum; i++) { locks[i] = new ReentrantReadWriteLock(false); } } - public void takeReadLock(SchemaLockType lockType) { - locks[lockType.ordinal()].readLock().lock(); + public void takeReadLock(final MPPQueryContext context, final SchemaLockType lockType) { + if (context.addAcquiredLock(lockType)) { + locks[lockType.ordinal()].readLock().lock(); + } } - public void releaseReadLock(SchemaLockType lockType) { - locks[lockType.ordinal()].readLock().unlock(); + public void releaseReadLock(final MPPQueryContext queryContext) { + if (queryContext != null && !queryContext.getAcquiredLocks().isEmpty()) { + queryContext + .getAcquiredLocks() + .forEach(lockType -> locks[lockType.ordinal()].readLock().unlock()); + } } - public void takeWriteLock(SchemaLockType lockType) { + public void takeWriteLock(final SchemaLockType lockType) { locks[lockType.ordinal()].writeLock().lock(); } - public void releaseWriteLock(SchemaLockType lockType) { + public void releaseWriteLock(final SchemaLockType lockType) { locks[lockType.ordinal()].writeLock().unlock(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java index 11c0979a2b7d..13a9f5fc97a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java @@ -181,17 +181,17 @@ public ClusterSchemaTree fetchSchemaWithTags( @Override public void fetchAndComputeSchemaWithAutoCreate( - ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation, - MPPQueryContext context) { + final ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation, + final MPPQueryContext context) { // The schema cache R/W and fetch operation must be locked together thus the cache clean // operation executed by delete timeseries will be effective. - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION); - context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION); + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION); schemaCache.takeReadLock(); try { - Pair templateSetInfo = + final Pair templateSetInfo = templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath()); - List indexOfMissingMeasurements; + final List indexOfMissingMeasurements; if (templateSetInfo == null) { // normal timeseries indexOfMissingMeasurements = @@ -209,7 +209,7 @@ public void fetchAndComputeSchemaWithAutoCreate( } // offer null for the rest missing schema processing - for (int index : indexOfMissingMeasurements) { + for (final int index : indexOfMissingMeasurements) { schemaComputationWithAutoCreation.computeMeasurement(index, null); } } finally { @@ -219,12 +219,13 @@ public void fetchAndComputeSchemaWithAutoCreate( @Override public void fetchAndComputeSchemaWithAutoCreate( - List schemaComputationWithAutoCreationList, - MPPQueryContext context) { + final List + schemaComputationWithAutoCreationList, + final MPPQueryContext context) { // The schema cache R/W and fetch operation must be locked together thus the cache clean - // operation executed by delete timeseries will be effective. - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION); - context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION); + // operation executed by delete timeSeries will be effective. + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION); schemaCache.takeReadLock(); try { @@ -258,25 +259,26 @@ public void fetchAndComputeSchemaWithAutoCreate( @Override public ISchemaTree fetchSchemaListWithAutoCreate( - List devicePathList, - List measurementsList, - List tsDataTypesList, - List encodingsList, - List compressionTypesList, - List isAlignedList, - MPPQueryContext context) { + final List devicePathList, + final List measurementsList, + final List tsDataTypesList, + final List encodingsList, + final List compressionTypesList, + final List isAlignedList, + final MPPQueryContext context) { // The schema cache R/W and fetch operation must be locked together thus the cache clean - // operation executed by delete timeseries will be effective. - DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION); - context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION); + // operation executed by delete timeSeries will be effective. + DataNodeSchemaLockManager.getInstance() + .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION); schemaCache.takeReadLock(); try { - ClusterSchemaTree schemaTree = new ClusterSchemaTree(); - List> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size()); - List indexOfDevicesWithMissingMeasurements = new ArrayList<>(); + final ClusterSchemaTree schemaTree = new ClusterSchemaTree(); + final List> indexOfMissingMeasurementsList = + new ArrayList<>(devicePathList.size()); + final List indexOfDevicesWithMissingMeasurements = new ArrayList<>(); for (int i = 0; i < devicePathList.size(); i++) { schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i))); - List indexOfMissingMeasurements = + final List indexOfMissingMeasurements = checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i)); if (!indexOfMissingMeasurements.isEmpty()) { indexOfDevicesWithMissingMeasurements.add(i); @@ -289,8 +291,8 @@ public ISchemaTree fetchSchemaListWithAutoCreate( return schemaTree; } - // try fetch the missing schema from remote and cache fetched schema - ClusterSchemaTree remoteSchemaTree = + // Try fetch the missing schema from remote and cache fetched schema + final ClusterSchemaTree remoteSchemaTree = clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices( devicePathList, measurementsList, @@ -305,9 +307,9 @@ public ISchemaTree fetchSchemaListWithAutoCreate( return schemaTree; } - // auto create the still missing schema and merge them into schemaTree - List indexOfDevicesNeedAutoCreateSchema = new ArrayList<>(); - List> indexOfMeasurementsNeedAutoCreate = new ArrayList<>(); + // Auto create the still missing schema and merge them into schemaTree + final List indexOfDevicesNeedAutoCreateSchema = new ArrayList<>(); + final List> indexOfMeasurementsNeedAutoCreate = new ArrayList<>(); List indexOfMissingMeasurements; int deviceIndex; for (int i = 0, size = indexOfDevicesWithMissingMeasurements.size(); i < size; i++) {