From 217bf7f212a62357d62ab6b9a24ba6eacad7197e Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Wed, 12 Jun 2024 15:48:25 +0800 Subject: [PATCH] save --- .../db/queryengine/common/DeviceContext.java | 81 ++++++++++++ ...SchemaInfo.java => TimeseriesContext.java} | 12 +- .../common/schematree/ClusterSchemaTree.java | 35 +----- .../execution/driver/DataDriverContext.java | 19 +-- .../fragment/FragmentInstanceContext.java | 17 +-- .../schema/source/DeviceSchemaSource.java | 4 +- .../ActiveDeviceRegionScanOperator.java | 48 ++++++-- .../ActiveTimeSeriesRegionScanOperator.java | 12 +- .../source/RegionScanForActiveDeviceUtil.java | 3 +- .../RegionScanForActiveTimeSeriesUtil.java | 6 +- .../db/queryengine/plan/analyze/Analysis.java | 19 +-- .../plan/analyze/AnalyzeVisitor.java | 29 +++-- .../plan/planner/LocalExecutionPlanner.java | 12 +- .../plan/planner/LogicalPlanBuilder.java | 9 +- .../plan/planner/LogicalPlanVisitor.java | 4 +- .../plan/planner/OperatorTreeGenerator.java | 29 ++--- .../planner/distribution/SourceRewriter.java | 4 +- .../read/DeviceSchemaFetchScanNode.java | 7 -- .../metedata/read/SchemaFetchScanNode.java | 116 +++++++++--------- .../read/SeriesSchemaFetchScanNode.java | 6 - .../node/source/DeviceRegionScanNode.java | 46 +++---- .../node/source/TimeseriesRegionScanNode.java | 37 +++--- .../impl/mem/MTreeBelowSGMemoryImpl.java | 3 +- .../impl/pbtree/MTreeBelowSGCachedImpl.java | 3 +- .../storageengine/dataregion/DataRegion.java | 7 +- .../dataregion/IDataRegionForQuery.java | 3 +- .../dataregion/VirtualDataRegion.java | 3 +- .../dataregion/memtable/TsFileProcessor.java | 7 +- .../logical/RegionScanLogicalPlannerTest.java | 88 +++++++------ 29 files changed, 379 insertions(+), 290 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/DeviceContext.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/{TimeseriesSchemaInfo.java => TimeseriesContext.java} (94%) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/DeviceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/DeviceContext.java new file mode 100644 index 000000000000..3c6663c680b1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/DeviceContext.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.common; + +import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class DeviceContext { + private boolean isAligned; + private final int templateId; + + public DeviceContext(DeviceSchemaInfo deviceSchemaInfo) { + this.isAligned = deviceSchemaInfo.isAligned(); + this.templateId = deviceSchemaInfo.getTemplateId(); + } + + public DeviceContext(boolean isAligned, int templateId) { + this.isAligned = isAligned; + this.templateId = templateId; + } + + public boolean isAligned() { + return isAligned; + } + + public int getTemplateId() { + return templateId; + } + + public void serializeAttributes(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(isAligned, byteBuffer); + ReadWriteIOUtils.write(templateId, byteBuffer); + } + + public void serializeAttributes(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(isAligned, stream); + ReadWriteIOUtils.write(templateId, stream); + } + + public static DeviceContext deserialize(ByteBuffer buffer) { + boolean isAligned = ReadWriteIOUtils.readBool(buffer); + int templateId = ReadWriteIOUtils.readInt(buffer); + return new DeviceContext(isAligned, templateId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeviceContext that = (DeviceContext) o; + return isAligned == that.isAligned && templateId == that.templateId; + } + + @Override + public int hashCode() { + return Objects.hash(isAligned, templateId); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesSchemaInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesContext.java similarity index 94% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesSchemaInfo.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesContext.java index 56dd0fa6a807..6450c036128c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesSchemaInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesContext.java @@ -32,7 +32,7 @@ import static org.apache.iotdb.db.queryengine.execution.operator.schema.source.TimeSeriesSchemaSource.mapToString; -public class TimeseriesSchemaInfo { +public class TimeseriesContext { private final String dataType; private final String encoding; private final String compression; @@ -45,7 +45,7 @@ public class TimeseriesSchemaInfo { private final String deadband; private final String deadbandParameters; - public TimeseriesSchemaInfo(IMeasurementSchemaInfo schemaInfo) { + public TimeseriesContext(IMeasurementSchemaInfo schemaInfo) { this.dataType = schemaInfo.getSchema().getType().toString(); this.encoding = schemaInfo.getSchema().getEncodingType().toString(); this.compression = schemaInfo.getSchema().getCompressor().toString(); @@ -85,7 +85,7 @@ public String getDeadband() { return deadband; } - public TimeseriesSchemaInfo( + public TimeseriesContext( String dataType, String alias, String encoding, @@ -122,7 +122,7 @@ public void serializeAttributes(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(deadbandParameters, stream); } - public static TimeseriesSchemaInfo deserialize(ByteBuffer buffer) { + public static TimeseriesContext deserialize(ByteBuffer buffer) { String dataType = ReadWriteIOUtils.readString(buffer); String alias = ReadWriteIOUtils.readString(buffer); String encoding = ReadWriteIOUtils.readString(buffer); @@ -130,7 +130,7 @@ public static TimeseriesSchemaInfo deserialize(ByteBuffer buffer) { String tags = ReadWriteIOUtils.readString(buffer); String deadband = ReadWriteIOUtils.readString(buffer); String deadbandParameters = ReadWriteIOUtils.readString(buffer); - return new TimeseriesSchemaInfo( + return new TimeseriesContext( dataType, alias, encoding, compression, tags, deadband, deadbandParameters); } @@ -142,7 +142,7 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) { return false; } - TimeseriesSchemaInfo that = (TimeseriesSchemaInfo) obj; + TimeseriesContext that = (TimeseriesContext) obj; return Objects.equals(dataType, that.dataType) && Objects.equals(alias, that.alias) && encoding.equals(that.encoding) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java index 336afb027456..f1d90875ac53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java @@ -311,39 +311,8 @@ public void appendTemplateDevice( entityNode.setTemplateId(templateId); cur.replaceChild(deviceName, entityNode); } - templateMap.putIfAbsent(templateId, template); - } - - /** - * Append a template device to the schema tree. - * - * @param devicePath device path - * @param isAligned whether the device is aligned - */ - public void appendDevice(PartialPath devicePath, Boolean isAligned) { - String[] nodes = devicePath.getNodes(); - SchemaNode cur = root; - SchemaNode child; - for (int i = 1; i < nodes.length - 1; i++) { - child = cur.getChild(nodes[i]); - if (child == null) { - child = new SchemaInternalNode(nodes[i]); - cur.addChild(nodes[i], child); - } - cur = child; - } - String deviceName = nodes[nodes.length - 1]; - child = cur.getChild(deviceName); - if (child == null) { - SchemaEntityNode entityNode = new SchemaEntityNode(deviceName); - entityNode.setAligned(isAligned); - cur.addChild(deviceName, entityNode); - } else if (child.isEntity()) { - child.getAsEntityNode().setAligned(isAligned); - } else { - SchemaEntityNode entityNode = new SchemaEntityNode(deviceName); - entityNode.setAligned(isAligned); - cur.replaceChild(deviceName, entityNode); + if (template != null) { + templateMap.putIfAbsent(templateId, template); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java index c30c11549102..3fc9598ef8af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.operator.source.DataSourceOperator; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; @@ -39,7 +40,7 @@ public class DataDriverContext extends DriverContext { // it will be set to null, after being merged into Parent FIContext private List paths; private QueryDataSourceType queryDataSourceType = null; - private Map deviceIDToAligned; + private Map deviceIDToContext; // it will be set to null, after QueryDataSource being inited private List sourceOperators; @@ -47,27 +48,27 @@ public DataDriverContext(FragmentInstanceContext fragmentInstanceContext, int pi super(fragmentInstanceContext, pipelineId); this.paths = new ArrayList<>(); this.sourceOperators = new ArrayList<>(); - this.deviceIDToAligned = null; + this.deviceIDToContext = null; } public DataDriverContext(DataDriverContext parentContext, int pipelineId) { super(parentContext.getFragmentInstanceContext(), pipelineId); this.paths = new ArrayList<>(); this.sourceOperators = new ArrayList<>(); - this.deviceIDToAligned = null; + this.deviceIDToContext = null; } public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { this.queryDataSourceType = queryDataSourceType; } - public void setDeviceIDToAligned(Map deviceIDToAligned) { - this.deviceIDToAligned = deviceIDToAligned; + public void setDeviceIDToContext(Map deviceIDToContext) { + this.deviceIDToContext = deviceIDToContext; } - public void clearDeviceIDToAligned() { + public void clearDeviceIDToContext() { // friendly for gc - deviceIDToAligned = null; + deviceIDToContext = null; } public void addPath(PartialPath path) { @@ -82,8 +83,8 @@ public List getPaths() { return paths; } - public Map getDeviceIDToAligned() { - return deviceIDToAligned; + public Map getDeviceIDToContext() { + return deviceIDToContext; } public Optional getQueryDataSourceType() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 72425e4df363..3748372c4f61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; @@ -68,7 +69,7 @@ public class FragmentInstanceContext extends QueryContext { // it will only be used once, after sharedQueryDataSource being inited, it will be set to null private List sourcePaths; // Used for region scan. - private Map devicePathsToAligned; + private Map devicePathsToContext; // Shared by all scan operators in this fragment instance to avoid memory problem private IQueryDataSource sharedQueryDataSource; @@ -356,8 +357,8 @@ public void setSourcePaths(List sourcePaths) { this.sourcePaths = sourcePaths; } - public void setDevicePathsToAligned(Map devicePathsToAligned) { - this.devicePathsToAligned = devicePathsToAligned; + public void setDevicePathsToContext(Map devicePathsToContext) { + this.devicePathsToContext = devicePathsToContext; } public void initQueryDataSource(List sourcePaths) throws QueryProcessException { @@ -399,17 +400,17 @@ public void initQueryDataSource(List sourcePaths) throws QueryProce } } - public void initRegionScanQueryDataSource(Map devicePathToAligned) + public void initRegionScanQueryDataSource(Map devicePathsToContext) throws QueryProcessException { long startTime = System.nanoTime(); - if (devicePathsToAligned == null) { + if (devicePathsToContext == null) { return; } dataRegion.readLock(); try { this.sharedQueryDataSource = dataRegion.queryForDeviceRegionScan( - devicePathToAligned, + devicePathsToContext, this, globalTimeFilter != null ? globalTimeFilter.copy() : null, timePartitions); @@ -460,8 +461,8 @@ public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProc sourcePaths = null; break; case DEVICE_REGION_SCAN: - initRegionScanQueryDataSource(devicePathsToAligned); - devicePathsToAligned = null; + initRegionScanQueryDataSource(devicePathsToContext); + devicePathsToContext = null; break; case TIME_SERIES_REGION_SCAN: initRegionScanQueryDataSource(sourcePaths); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java index 37a9d63865c7..d1f6029f04c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java @@ -109,7 +109,7 @@ public void transformToTsBlockColumns( builder .getColumnBuilder(2) .writeBinary(new Binary(String.valueOf(device.isAligned()), TSFileConfig.STRING_CHARSET)); - if (templateId != -1) { + if (templateId != SchemaConstant.NON_TEMPLATE) { builder .getColumnBuilder(3) .writeBinary( @@ -125,7 +125,7 @@ public void transformToTsBlockColumns( builder .getColumnBuilder(1) .writeBinary(new Binary(String.valueOf(device.isAligned()), TSFileConfig.STRING_CHARSET)); - if (templateId != -1) { + if (templateId != SchemaConstant.NON_TEMPLATE) { builder .getColumnBuilder(2) .writeBinary( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java index 873352b8b741..1932129c2f57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java @@ -19,16 +19,22 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; @@ -41,7 +47,7 @@ public class ActiveDeviceRegionScanOperator extends AbstractRegionScanDataSourceOperator { // The devices which need to be checked. - private final Map deviceToAlignedMap; + private final Map deviceContextMap; private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(ActiveDeviceRegionScanOperator.class) @@ -50,24 +56,24 @@ public class ActiveDeviceRegionScanOperator extends AbstractRegionScanDataSource public ActiveDeviceRegionScanOperator( OperatorContext operatorContext, PlanNodeId sourceId, - Map deviceToAlignedMap, + Map deviceContextMap, Filter timeFilter, boolean outputCount) { this.outputCount = outputCount; this.sourceId = sourceId; this.operatorContext = operatorContext; - this.deviceToAlignedMap = deviceToAlignedMap; + this.deviceContextMap = deviceContextMap; this.regionScanUtil = new RegionScanForActiveDeviceUtil(timeFilter); } @Override protected boolean getNextTsFileHandle() throws IOException, IllegalPathException { - return ((RegionScanForActiveDeviceUtil) regionScanUtil).nextTsFileHandle(deviceToAlignedMap); + return ((RegionScanForActiveDeviceUtil) regionScanUtil).nextTsFileHandle(deviceContextMap); } @Override protected boolean isAllDataChecked() { - return deviceToAlignedMap.isEmpty(); + return deviceContextMap.isEmpty(); } @Override @@ -77,20 +83,42 @@ protected void updateActiveData() { if (this.outputCount) { count += activeDevices.size(); - activeDevices.forEach(deviceToAlignedMap.keySet()::remove); + activeDevices.forEach(deviceContextMap.keySet()::remove); } else { TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); for (IDeviceID deviceID : activeDevices) { + DeviceContext deviceContext = deviceContextMap.get(deviceID); + int templateId = deviceContext.getTemplateId(); + // TODO: use IDeviceID interface to get ttl + long ttl; + try { + ttl = DataNodeTTLCache.getInstance().getTTL(((PlainDeviceID) deviceID).toStringID()); + } catch (IllegalPathException e) { + ttl = Long.MAX_VALUE; + } + // TODO: make it more readable, like "30 days" or "10 hours" + String ttlStr = ttl == Long.MAX_VALUE ? IoTDBConstant.TTL_INFINITE : String.valueOf(ttl); + timeColumnBuilder.writeLong(-1); columnBuilders[0].writeBinary(new Binary(deviceID.getBytes())); columnBuilders[1].writeBinary( new Binary( - String.valueOf(deviceToAlignedMap.get(deviceID)), TSFileConfig.STRING_CHARSET)); - columnBuilders[2].appendNull(); - columnBuilders[3].appendNull(); + String.valueOf(deviceContextMap.get(deviceID).isAligned()), + TSFileConfig.STRING_CHARSET)); + + if (templateId != SchemaConstant.NON_TEMPLATE) { + columnBuilders[2].writeBinary( + new Binary( + String.valueOf( + ClusterTemplateManager.getInstance().getTemplate(templateId).getName()), + TSFileConfig.STRING_CHARSET)); + } else { + columnBuilders[2].appendNull(); + } + columnBuilders[3].writeBinary(new Binary(ttlStr, TSFileConfig.STRING_CHARSET)); resultTsBlockBuilder.declarePosition(); - deviceToAlignedMap.remove(deviceID); + deviceContextMap.remove(deviceID); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java index e3385f92ecd0..6be648fb6bc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; @@ -42,7 +42,7 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSourceOperator { // Timeseries which need to be checked. - private final Map> timeSeriesToSchemasInfo; + private final Map> timeSeriesToSchemasInfo; private static final Binary VIEW_TYPE = new Binary("BASE".getBytes()); private final Binary dataBaseName; private static final long INSTANCE_SIZE = @@ -53,7 +53,7 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSo public ActiveTimeSeriesRegionScanOperator( OperatorContext operatorContext, PlanNodeId sourceId, - Map> timeSeriesToSchemasInfo, + Map> timeSeriesToSchemasInfo, Filter timeFilter, boolean isOutputCount) { this.outputCount = isOutputCount; @@ -111,9 +111,9 @@ protected void updateActiveData() { IDeviceID deviceID = entry.getKey(); String deviceStr = ((PlainDeviceID) deviceID).toStringID(); List timeSeriesList = entry.getValue(); - Map timeSeriesInfo = timeSeriesToSchemasInfo.get(deviceID); + Map timeSeriesInfo = timeSeriesToSchemasInfo.get(deviceID); for (String timeSeries : timeSeriesList) { - TimeseriesSchemaInfo schemaInfo = timeSeriesInfo.get(timeSeries); + TimeseriesContext schemaInfo = timeSeriesInfo.get(timeSeries); timeColumnBuilder.writeLong(-1); columnBuilders[0].writeBinary( new Binary(contactDeviceAndMeasurement(deviceStr, timeSeries))); @@ -135,7 +135,7 @@ protected void updateActiveData() { } private void removeTimeseriesListFromDevice(IDeviceID deviceID, List timeSeriesList) { - Map timeSeriesInfo = timeSeriesToSchemasInfo.get(deviceID); + Map timeSeriesInfo = timeSeriesToSchemasInfo.get(deviceID); for (String timeSeries : timeSeriesList) { timeSeriesInfo.remove(timeSeries); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveDeviceUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveDeviceUtil.java index 8ca5e165363f..11faa55f3941 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveDeviceUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveDeviceUtil.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceStartEndTime; @@ -60,7 +61,7 @@ public boolean isCurrentTsFileFinished() { return deviceSetForCurrentTsFile.isEmpty(); } - public boolean nextTsFileHandle(Map targetDevices) + public boolean nextTsFileHandle(Map targetDevices) throws IOException, IllegalPathException { if (!queryDataSource.hasNext()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveTimeSeriesUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveTimeSeriesUtil.java index ff6711232bba..61c7028ab211 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveTimeSeriesUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/RegionScanForActiveTimeSeriesUtil.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceStartEndTime; @@ -55,8 +55,8 @@ public RegionScanForActiveTimeSeriesUtil(Filter timeFilter) { this.activeTimeSeries = new HashMap<>(); } - public boolean nextTsFileHandle( - Map> targetTimeseries) throws IOException { + public boolean nextTsFileHandle(Map> targetTimeseries) + throws IOException { if (!queryDataSource.hasNext()) { // There is no more TsFileHandles to be scanned. return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 5897f78b0ddd..f2f70adebf45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -28,9 +28,10 @@ import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.NodeRef; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource; @@ -301,23 +302,23 @@ aggregation results last_value(temperature) and last_value(status), whereas buck private List measurementSchemaList; // Used for regionScan - private Map devicePathToAlignedStatus; - private Map>> deviceToTimeseriesSchemas; + private Map devicePathToContextMap; + private Map>> deviceToTimeseriesSchemas; - public void setDevicePathToAlignedStatus(Map devicePathToAlignedStatus) { - this.devicePathToAlignedStatus = devicePathToAlignedStatus; + public void setDevicePathToContextMap(Map devicePathToContextMap) { + this.devicePathToContextMap = devicePathToContextMap; } - public Map getDevicePathToAlignedStatus() { - return devicePathToAlignedStatus; + public Map getDevicePathToContextMap() { + return devicePathToContextMap; } public void setDeviceToTimeseriesSchemas( - Map>> deviceToTimeseriesSchemas) { + Map>> deviceToTimeseriesSchemas) { this.deviceToTimeseriesSchemas = deviceToTimeseriesSchemas; } - public Map>> + public Map>> getDeviceToTimeseriesSchemas() { return deviceToTimeseriesSchemas; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 894881dc6cd8..d02cb3f1e001 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -49,8 +49,9 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; @@ -2879,7 +2880,7 @@ private boolean analyzeTimeseriesRegionScan( Map> deviceToMeasurementMap = new HashMap<>(); Map deviceToAlignedMap = new HashMap<>(); Map> deviceToMeasurementSchemaMap = new HashMap<>(); - Map> deviceToTimeseriesSchemaMap = new HashMap<>(); + Map> deviceToTimeseriesSchemaMap = new HashMap<>(); for (PartialPath pattern : patternTree.getAllPathPatterns()) { List measurementPathList = schemaTree.searchMeasurementPaths(pattern).left; for (MeasurementPath measurementPath : measurementPathList) { @@ -2895,7 +2896,7 @@ private boolean analyzeTimeseriesRegionScan( deviceToTimeseriesSchemaMap .computeIfAbsent(devicePath, k -> new ArrayList<>()) .add( - new TimeseriesSchemaInfo( + new TimeseriesContext( new MeasurementSchemaInfo( measurementPath.getMeasurement(), measurementPath.getMeasurementSchema(), @@ -2904,19 +2905,18 @@ private boolean analyzeTimeseriesRegionScan( } } - Map>> deviceToTimeseriesSchemaInfo = + Map>> deviceToTimeseriesSchemaInfo = new HashMap<>(); for (PartialPath devicePath : deviceToAlignedMap.keySet()) { if (deviceToAlignedMap.get(devicePath)) { List measurementList = deviceToMeasurementMap.get(devicePath); List schemaList = deviceToMeasurementSchemaMap.get(devicePath); - List timeseriesSchemaInfoList = - deviceToTimeseriesSchemaMap.get(devicePath); + List timeseriesContextList = deviceToTimeseriesSchemaMap.get(devicePath); AlignedPath alignedPath = new AlignedPath(devicePath.getNodes(), measurementList, schemaList); deviceToTimeseriesSchemaInfo .computeIfAbsent(devicePath, k -> new HashMap<>()) - .put(alignedPath, timeseriesSchemaInfoList); + .put(alignedPath, timeseriesContextList); } else { for (String measurement : deviceToMeasurementMap.get(devicePath)) { MeasurementPath measurementPath = @@ -2926,7 +2926,7 @@ private boolean analyzeTimeseriesRegionScan( .put( measurementPath, Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( new MeasurementSchemaInfo( measurement, deviceToMeasurementSchemaMap.get(devicePath).get(0), @@ -3068,15 +3068,14 @@ private void analyzeDeviceRegionScan( } // fetch Data partition - List deviceSchemaInfoList = schemaTree.getMatchedDevices(ALL_MATCH_PATTERN); - Map devicePathsToAlignedStatus = new HashMap<>(); - for (DeviceSchemaInfo deviceSchema : deviceSchemaInfoList) { - devicePathsToAlignedStatus.put(deviceSchema.getDevicePath(), deviceSchema.isAligned()); - } - analysis.setDevicePathToAlignedStatus(devicePathsToAlignedStatus); + + Map devicePathsToInfoMap = + schemaTree.getMatchedDevices(pattern).stream() + .collect(Collectors.toMap(DeviceSchemaInfo::getDevicePath, DeviceContext::new)); + analysis.setDevicePathToContextMap(devicePathsToInfoMap); DataPartition dataPartition = fetchDataPartitionByDevices( - devicePathsToAlignedStatus.keySet().stream() + devicePathsToInfoMap.keySet().stream() .map(PartialPath::getFullPath) .collect(Collectors.toSet()), schemaTree, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 4789834f35eb..4ad583bec141 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext; import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext; @@ -97,7 +98,7 @@ public List plan( context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); instanceContext.setSourcePaths(collectSourcePaths(context)); - instanceContext.setDevicePathsToAligned(collectDevicePathsToAligned(context)); + instanceContext.setDevicePathsToContext(collectDevicePathsToContext(context)); instanceContext.setQueryDataSourceType( getQueryDataSourceType((DataDriverContext) context.getDriverContext())); @@ -190,11 +191,12 @@ private QueryDataSourceType getQueryDataSourceType(DataDriverContext dataDriverC return dataDriverContext.getQueryDataSourceType().orElse(QueryDataSourceType.SERIES_SCAN); } - private Map collectDevicePathsToAligned(LocalExecutionPlanContext context) { + private Map collectDevicePathsToContext( + LocalExecutionPlanContext context) { DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext(); - Map deviceToAlignedMap = dataDriverContext.getDeviceIDToAligned(); - dataDriverContext.clearDeviceIDToAligned(); - return deviceToAlignedMap; + Map deviceContextMap = dataDriverContext.getDeviceIDToContext(); + dataDriverContext.clearDeviceIDToContext(); + return deviceContextMap; } private List collectSourcePaths(LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 347826b8fd68..3ff06fb338a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -28,8 +28,9 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; @@ -1376,15 +1377,15 @@ public LogicalPlanBuilder planEndTimeColumnInject( } public LogicalPlanBuilder planDeviceRegionScan( - Map devicePathToAlignedStatus, boolean outputCount) { + Map devicePathToContextMap, boolean outputCount) { this.root = new DeviceRegionScanNode( - context.getQueryId().genPlanNodeId(), devicePathToAlignedStatus, outputCount, null); + context.getQueryId().genPlanNodeId(), devicePathToContextMap, outputCount, null); return this; } public LogicalPlanBuilder planTimeseriesRegionScan( - Map>> deviceToTimeseriesSchemaInfo, + Map>> deviceToTimeseriesSchemaInfo, boolean outputCount) { TimeseriesRegionScanNode timeseriesRegionScanNode = new TimeseriesRegionScanNode(context.getQueryId().genPlanNodeId(), outputCount, null); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 1a6bddb422a3..96307945e462 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -579,7 +579,7 @@ public PlanNode visitShowDevices( if (showDevicesStatement.hasTimeCondition()) { planBuilder = planBuilder - .planDeviceRegionScan(analysis.getDevicePathToAlignedStatus(), false) + .planDeviceRegionScan(analysis.getDevicePathToContextMap(), false) .planLimit(showDevicesStatement.getLimit()) .planOffset(showDevicesStatement.getOffset()); return planBuilder.getRoot(); @@ -625,7 +625,7 @@ public PlanNode visitCountDevices( LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); if (countDevicesStatement.hasTimeCondition()) { - planBuilder = planBuilder.planDeviceRegionScan(analysis.getDevicePathToAlignedStatus(), true); + planBuilder = planBuilder.planDeviceRegionScan(analysis.getDevicePathToContextMap(), true); return planBuilder.getRoot(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index abafd45ae949..1127cbd1683b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -26,9 +26,10 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.NodeRef; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator; import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator; @@ -3553,17 +3554,18 @@ public Operator visitDeviceRegionScan( node.getPlanNodeId(), ActiveDeviceRegionScanOperator.class.getSimpleName()); Filter filter = context.getGlobalTimeFilter(); - Map deviceIDToAligned = new HashMap<>(); - for (Map.Entry entry : node.getDevicePathsToAligned().entrySet()) { - deviceIDToAligned.put(new PlainDeviceID(entry.getKey().getFullPath()), entry.getValue()); + Map deviceIDToContext = new HashMap<>(); + for (Map.Entry entry : + node.getDevicePathToContextMap().entrySet()) { + deviceIDToContext.put(new PlainDeviceID(entry.getKey().getFullPath()), entry.getValue()); } ActiveDeviceRegionScanOperator regionScanOperator = new ActiveDeviceRegionScanOperator( - operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter, node.isOutputCount()); + operatorContext, node.getPlanNodeId(), deviceIDToContext, filter, node.isOutputCount()); DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext(); dataDriverContext.addSourceOperator(regionScanOperator); - dataDriverContext.setDeviceIDToAligned(deviceIDToAligned); + dataDriverContext.setDeviceIDToContext(deviceIDToContext); dataDriverContext.setQueryDataSourceType(QueryDataSourceType.DEVICE_REGION_SCAN); return regionScanOperator; @@ -3582,10 +3584,10 @@ public Operator visitTimeSeriesRegionScan( Filter filter = context.getGlobalTimeFilter(); DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext(); - Map> timeseriesToSchemaInfo = new HashMap<>(); - for (Map.Entry>> entryMap : + Map> timeseriesToSchemaInfo = new HashMap<>(); + for (Map.Entry>> entryMap : node.getDeviceToTimeseriesSchemaInfo().entrySet()) { - Map timeseriesSchemaInfoMap = + Map timeseriesSchemaInfoMap = getTimeseriesSchemaInfoMap(entryMap, dataDriverContext); timeseriesToSchemaInfo.put( new PlainDeviceID(entryMap.getKey().getFullPath()), timeseriesSchemaInfoMap); @@ -3604,12 +3606,11 @@ public Operator visitTimeSeriesRegionScan( return regionScanOperator; } - private static Map getTimeseriesSchemaInfoMap( - Map.Entry>> entryMap, + private static Map getTimeseriesSchemaInfoMap( + Map.Entry>> entryMap, DataDriverContext context) { - Map timeseriesSchemaInfoMap = new HashMap<>(); - for (Map.Entry> entry : - entryMap.getValue().entrySet()) { + Map timeseriesSchemaInfoMap = new HashMap<>(); + for (Map.Entry> entry : entryMap.getValue().entrySet()) { PartialPath path = entry.getKey(); context.addPath(path); if (path instanceof MeasurementPath) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 14d810514c89..63d59ccaec7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -39,7 +39,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SeriesSchemaFetchScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ActiveRegionScanMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; @@ -895,8 +894,7 @@ public List visitSchemaFetchMerge( for (TRegionReplicaSet schemaRegion : storageGroupSchemaRegionMap.get( ((SchemaFetchScanNode) child).getStorageGroup().getFullPath())) { - SchemaFetchScanNode schemaFetchScanNode = - (SchemaFetchScanNode) child.clone(); + SchemaFetchScanNode schemaFetchScanNode = (SchemaFetchScanNode) child.clone(); schemaFetchScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); schemaFetchScanNode.setRegionReplicaSet(schemaRegion); root.addChild(schemaFetchScanNode); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/DeviceSchemaFetchScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/DeviceSchemaFetchScanNode.java index 2e24512c8c2d..5b1935e1c4c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/DeviceSchemaFetchScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/DeviceSchemaFetchScanNode.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.commons.path.PathPatternTree; @@ -28,15 +27,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; - -import com.google.common.collect.ImmutableList; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; public class DeviceSchemaFetchScanNode extends SchemaFetchScanNode { @@ -52,7 +46,6 @@ public DeviceSchemaFetchScanNode( this.authorityScope.constructTree(); } - public PathPatternTree getAuthorityScope() { return authorityScope; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java index d0598b735bc4..56d2d5cf6574 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read; -import com.google.common.collect.ImmutableList; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; @@ -27,66 +26,63 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; +import com.google.common.collect.ImmutableList; + import java.util.Collections; import java.util.List; -public abstract class SchemaFetchScanNode extends SourceNode { - protected final PartialPath storageGroup; - protected final PathPatternTree patternTree; - protected TRegionReplicaSet schemaRegionReplicaSet; - - protected SchemaFetchScanNode( - PlanNodeId id, - PartialPath storageGroup, - PathPatternTree patternTree){ - super(id); - this.storageGroup = storageGroup; - this.patternTree = patternTree; - this.patternTree.constructTree(); - } - - - public PartialPath getStorageGroup() { - return storageGroup; - } - - public PathPatternTree getPatternTree() { - return patternTree; - } - - @Override - public List getChildren() { - return Collections.emptyList(); - } - - @Override - public void addChild(PlanNode child) {} - - @Override - public int allowedChildCount() { - return 0; - } - - @Override - public List getOutputColumnNames() { - return ImmutableList.of(); - } - - - @Override - public void open() throws Exception {} - - @Override - public TRegionReplicaSet getRegionReplicaSet() { - return schemaRegionReplicaSet; - } - - @Override - public void setRegionReplicaSet(TRegionReplicaSet schemaRegionReplicaSet) { - this.schemaRegionReplicaSet = schemaRegionReplicaSet; - } - - @Override - public void close() throws Exception {} - +public abstract class SchemaFetchScanNode extends SourceNode { + protected final PartialPath storageGroup; + protected final PathPatternTree patternTree; + protected TRegionReplicaSet schemaRegionReplicaSet; + + protected SchemaFetchScanNode( + PlanNodeId id, PartialPath storageGroup, PathPatternTree patternTree) { + super(id); + this.storageGroup = storageGroup; + this.patternTree = patternTree; + this.patternTree.constructTree(); + } + + public PartialPath getStorageGroup() { + return storageGroup; + } + + public PathPatternTree getPatternTree() { + return patternTree; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public void addChild(PlanNode child) {} + + @Override + public int allowedChildCount() { + return 0; + } + + @Override + public List getOutputColumnNames() { + return ImmutableList.of(); + } + + @Override + public void open() throws Exception {} + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return schemaRegionReplicaSet; + } + + @Override + public void setRegionReplicaSet(TRegionReplicaSet schemaRegionReplicaSet) { + this.schemaRegionReplicaSet = schemaRegionReplicaSet; + } + + @Override + public void close() throws Exception {} } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SeriesSchemaFetchScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SeriesSchemaFetchScanNode.java index 5884252cd60f..6f6d5ee491a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SeriesSchemaFetchScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/SeriesSchemaFetchScanNode.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.commons.path.PathPatternTree; @@ -28,18 +27,14 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.schemaengine.template.Template; -import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; /** This class defines the scan task of schema fetcher. */ @@ -62,7 +57,6 @@ public SeriesSchemaFetchScanNode( this.withTemplate = withTemplate; } - public Map getTemplateMap() { return templateMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java index 8671c52ddc0c..a4db348342bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -45,21 +46,21 @@ import java.util.stream.Collectors; public class DeviceRegionScanNode extends RegionScanNode { - private Map devicePathsToAligned; + private Map devicePathToContextMap; public DeviceRegionScanNode( PlanNodeId planNodeId, - Map devicePathsToAligned, + Map devicePathToContextMap, boolean outputCount, TRegionReplicaSet regionReplicaSet) { super(planNodeId); - this.devicePathsToAligned = devicePathsToAligned; + this.devicePathToContextMap = devicePathToContextMap; this.regionReplicaSet = regionReplicaSet; this.outputCount = outputCount; } - public Map getDevicePathsToAligned() { - return devicePathsToAligned; + public Map getDevicePathToContextMap() { + return devicePathToContextMap; } @Override @@ -75,7 +76,7 @@ public void addChild(PlanNode child) { @Override public PlanNode clone() { return new DeviceRegionScanNode( - getPlanNodeId(), getDevicePathsToAligned(), isOutputCount(), getRegionReplicaSet()); + getPlanNodeId(), getDevicePathToContextMap(), isOutputCount(), getRegionReplicaSet()); } @Override @@ -101,24 +102,23 @@ public int allowedChildCount() { public static PlanNode deserialize(ByteBuffer buffer) { int size = ReadWriteIOUtils.readInt(buffer); - Map devicePathsToAligned = new HashMap<>(); + Map devicePathToContextMap = new HashMap<>(); for (int i = 0; i < size; i++) { PartialPath path = (PartialPath) PathDeserializeUtil.deserialize(buffer); - boolean aligned = ReadWriteIOUtils.readBool(buffer); - devicePathsToAligned.put(path, aligned); + devicePathToContextMap.put(path, DeviceContext.deserialize(buffer)); } boolean outputCount = ReadWriteIOUtils.readBool(buffer); PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); - return new DeviceRegionScanNode(planNodeId, devicePathsToAligned, outputCount, null); + return new DeviceRegionScanNode(planNodeId, devicePathToContextMap, outputCount, null); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.DEVICE_REGION_SCAN.serialize(byteBuffer); - ReadWriteIOUtils.write(devicePathsToAligned.size(), byteBuffer); - for (Map.Entry entry : devicePathsToAligned.entrySet()) { + ReadWriteIOUtils.write(devicePathToContextMap.size(), byteBuffer); + for (Map.Entry entry : devicePathToContextMap.entrySet()) { entry.getKey().serialize(byteBuffer); - ReadWriteIOUtils.write(entry.getValue(), byteBuffer); + entry.getValue().serializeAttributes(byteBuffer); } ReadWriteIOUtils.write(outputCount, byteBuffer); } @@ -126,10 +126,10 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.DEVICE_REGION_SCAN.serialize(stream); - ReadWriteIOUtils.write(devicePathsToAligned.size(), stream); - for (Map.Entry entry : devicePathsToAligned.entrySet()) { + ReadWriteIOUtils.write(devicePathToContextMap.size(), stream); + for (Map.Entry entry : devicePathToContextMap.entrySet()) { entry.getKey().serialize(stream); - ReadWriteIOUtils.write(entry.getValue(), stream); + entry.getValue().serializeAttributes(stream); } ReadWriteIOUtils.write(outputCount, stream); } @@ -145,23 +145,23 @@ public String toString() { @Override public Set getDevicePaths() { - return new HashSet<>(devicePathsToAligned.keySet()); + return new HashSet<>(devicePathToContextMap.keySet()); } @Override public void addDevicePath(PartialPath devicePath, RegionScanNode node) { - this.devicePathsToAligned.put( - devicePath, ((DeviceRegionScanNode) node).devicePathsToAligned.get(devicePath)); + this.devicePathToContextMap.put( + devicePath, ((DeviceRegionScanNode) node).devicePathToContextMap.get(devicePath)); } @Override public void clearPath() { - this.devicePathsToAligned = new HashMap<>(); + this.devicePathToContextMap = new HashMap<>(); } @Override public long getSize() { - return devicePathsToAligned.size(); + return devicePathToContextMap.size(); } @Override @@ -170,12 +170,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; DeviceRegionScanNode that = (DeviceRegionScanNode) o; - return devicePathsToAligned.equals(that.devicePathsToAligned) + return devicePathToContextMap.equals(that.devicePathToContextMap) && outputCount == that.isOutputCount(); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), devicePathsToAligned, outputCount); + return Objects.hash(super.hashCode(), devicePathToContextMap, outputCount); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java index 2bbe27d2230d..5a5abed1aa02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathType; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -55,8 +55,7 @@ public class TimeseriesRegionScanNode extends RegionScanNode { // IDeviceID -> (MeasurementPath -> TimeseriesSchemaInfo) - private Map>> - deviceToTimeseriesSchemaInfo; + private Map>> deviceToTimeseriesSchemaInfo; public TimeseriesRegionScanNode( PlanNodeId planNodeId, boolean outputCount, TRegionReplicaSet regionReplicaSet) { @@ -67,7 +66,7 @@ public TimeseriesRegionScanNode( public TimeseriesRegionScanNode( PlanNodeId planNodeId, - Map>> deviceToTimeseriesSchemaInfo, + Map>> deviceToTimeseriesSchemaInfo, boolean outputCount, TRegionReplicaSet regionReplicaSet) { super(planNodeId); @@ -77,11 +76,11 @@ public TimeseriesRegionScanNode( } public void setDeviceToTimeseriesSchemaInfo( - Map>> deviceToTimeseriesSchemaInfo) { + Map>> deviceToTimeseriesSchemaInfo) { this.deviceToTimeseriesSchemaInfo = deviceToTimeseriesSchemaInfo; } - public Map>> + public Map>> getDeviceToTimeseriesSchemaInfo() { return deviceToTimeseriesSchemaInfo; } @@ -125,7 +124,7 @@ public int allowedChildCount() { public static PlanNode deserialize(ByteBuffer buffer) { int size = ReadWriteIOUtils.readInt(buffer); - Map>> deviceToTimeseriesSchemaInfo = + Map>> deviceToTimeseriesSchemaInfo = new HashMap<>(); for (int i = 0; i < size; i++) { @@ -137,13 +136,13 @@ public static PlanNode deserialize(ByteBuffer buffer) { PartialPath devicePath = new PartialPath(nodes); int pathSize = ReadWriteIOUtils.readInt(buffer); - Map> measurementToSchemaInfo = new HashMap<>(); + Map> measurementToSchemaInfo = new HashMap<>(); for (int j = 0; j < pathSize; j++) { PartialPath path = deserializePartialPath(nodes, buffer); int schemaSize = ReadWriteIOUtils.readInt(buffer); - List schemaInfos = new ArrayList<>(); + List schemaInfos = new ArrayList<>(); for (int k = 0; k < schemaSize; k++) { - schemaInfos.add(TimeseriesSchemaInfo.deserialize(buffer)); + schemaInfos.add(TimeseriesContext.deserialize(buffer)); } measurementToSchemaInfo.put(path, schemaInfos); } @@ -197,7 +196,7 @@ public String toString() { "%s -> %s", entry1.getKey().getFullPath(), entry1.getValue().stream() - .map(TimeseriesSchemaInfo::toString) + .map(TimeseriesContext::toString) .collect(Collectors.joining(", ")))) .collect(Collectors.joining(", ")))) .collect(Collectors.joining(", "))); @@ -247,7 +246,7 @@ public int hashCode() { protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.TIMESERIES_REGION_SCAN.serialize(byteBuffer); ReadWriteIOUtils.write(deviceToTimeseriesSchemaInfo.size(), byteBuffer); - for (Map.Entry>> entry : + for (Map.Entry>> entry : deviceToTimeseriesSchemaInfo.entrySet()) { int size = entry.getKey().getNodeLength(); @@ -258,12 +257,12 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { } ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer); - for (Map.Entry> timseriesEntry : + for (Map.Entry> timseriesEntry : entry.getValue().entrySet()) { serializeMeasurements(timseriesEntry.getKey(), byteBuffer); ReadWriteIOUtils.write(timseriesEntry.getValue().size(), byteBuffer); - for (TimeseriesSchemaInfo timeseriesSchemaInfo : timseriesEntry.getValue()) { - timeseriesSchemaInfo.serializeAttributes(byteBuffer); + for (TimeseriesContext timeseriesContext : timseriesEntry.getValue()) { + timeseriesContext.serializeAttributes(byteBuffer); } } } @@ -274,7 +273,7 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.TIMESERIES_REGION_SCAN.serialize(stream); ReadWriteIOUtils.write(deviceToTimeseriesSchemaInfo.size(), stream); - for (Map.Entry>> entry : + for (Map.Entry>> entry : deviceToTimeseriesSchemaInfo.entrySet()) { int size = entry.getKey().getNodeLength(); @@ -285,12 +284,12 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { } ReadWriteIOUtils.write(entry.getValue().size(), stream); - for (Map.Entry> timseriesEntry : + for (Map.Entry> timseriesEntry : entry.getValue().entrySet()) { serializeMeasurements(timseriesEntry.getKey(), stream); ReadWriteIOUtils.write(timseriesEntry.getValue().size(), stream); - for (TimeseriesSchemaInfo timeseriesSchemaInfo : timseriesEntry.getValue()) { - timeseriesSchemaInfo.serializeAttributes(stream); + for (TimeseriesContext timeseriesContext : timseriesEntry.getValue()) { + timeseriesContext.serializeAttributes(stream); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index be7adafe82e8..97cc5f8828ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -773,7 +773,8 @@ public ClusterSchemaTree fetchDeviceSchema( @Override protected Void collectEntity(IDeviceMNode node) { if (node.isAlignedNullable() != null) { - schemaTree.appendDevice(node.getPartialPath(), node.isAligned()); + schemaTree.appendTemplateDevice( + node.getPartialPath(), node.isAligned(), node.getSchemaTemplateId(), null); } return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java index ab36b2abd2f2..ceeebf1247ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java @@ -898,7 +898,8 @@ public ClusterSchemaTree fetchDeviceSchema( @Override protected Void collectEntity(IDeviceMNode node) { if (node.isAlignedNullable() != null) { - schemaTree.appendDevice(node.getPartialPath(), node.isAligned()); + schemaTree.appendTemplateDevice( + node.getPartialPath(), node.isAligned(), node.getSchemaTemplateId(), null); } return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 5a58b3e1dbfc..8c473652c939 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -46,6 +46,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.quota.ExceedQuotaException; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; @@ -1871,7 +1872,7 @@ private List getFileHandleListForQuery( @Override public IQueryDataSource queryForDeviceRegionScan( - Map devicePathToAligned, + Map devicePathToAligned, QueryContext queryContext, Filter globalTimeFilter, List timePartitions) @@ -1906,7 +1907,7 @@ public IQueryDataSource queryForDeviceRegionScan( private List getFileHandleListForQuery( Collection tsFileResources, - Map devicePathToAligned, + Map devicePathsToContext, QueryContext context, Filter globalTimeFilter, boolean isSeq) @@ -1924,7 +1925,7 @@ private List getFileHandleListForQuery( } else { tsFileResource .getProcessor() - .queryForDeviceRegionScan(devicePathToAligned, context, fileScanHandles); + .queryForDeviceRegionScan(devicePathsToContext, context, fileScanHandles); } } finally { closeQueryLock.readLock().unlock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java index 1ad4de9abf85..c18262145e53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java @@ -20,6 +20,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -49,7 +50,7 @@ QueryDataSource query( /** Get satisfied QueryDataSource from DataRegion for regionScan */ IQueryDataSource queryForDeviceRegionScan( - Map devicePathToAligned, + Map devicePathsToContext, QueryContext queryContext, Filter globalTimeFilter, List timePartitions) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java index b18dbb01eb7e..42c5a766886c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java @@ -20,6 +20,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -73,7 +74,7 @@ public QueryDataSource query( @Override public IQueryDataSource queryForDeviceRegionScan( - Map devicePathToAligned, + Map devicePathsToContext, QueryContext queryContext, Filter globalTimeFilter, List timePartitions) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 74aed59ab887..5a964c33600d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; @@ -1883,7 +1884,7 @@ public void queryForSeriesRegionScan( * get the related ChunkMetadata of data on disk. */ public void queryForDeviceRegionScan( - Map devicePathToAligned, + Map devicePathsToContext, QueryContext queryContext, List fileScanHandlesForQuery) { long startTime = System.nanoTime(); @@ -1893,9 +1894,9 @@ public void queryForDeviceRegionScan( new HashMap<>(); flushQueryLock.readLock().lock(); try { - for (Map.Entry entry : devicePathToAligned.entrySet()) { + for (Map.Entry entry : devicePathsToContext.entrySet()) { IDeviceID deviceID = entry.getKey(); - boolean isAligned = entry.getValue(); + boolean isAligned = entry.getValue().isAligned(); long timeLowerBound = getQueryTimeLowerBound( PathUtils.splitPathToDetachedNodes(((PlainDeviceID) deviceID).toStringID())); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java index 452e6dce7983..ba24a39d64dc 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java @@ -23,8 +23,10 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.QueryId; -import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; +import org.apache.iotdb.db.queryengine.common.TimeseriesContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -49,10 +51,10 @@ public class RegionScanLogicalPlannerTest { - private static Map>> + private static Map>> deviceToTimeseriesSchemaInfoMap; - private static Map>> + private static Map>> getDeviceToTimeseriesSchemaInfoMap() throws IllegalPathException { if (deviceToTimeseriesSchemaInfoMap != null) { @@ -60,40 +62,40 @@ public class RegionScanLogicalPlannerTest { } deviceToTimeseriesSchemaInfoMap = new HashMap<>(); - Map> timeseriesSchemaInfoMap = new HashMap<>(); + Map> timeseriesSchemaInfoMap = new HashMap<>(); timeseriesSchemaInfoMap.put( new MeasurementPath("root.sg.d1.s1", TSDataType.INT32), Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( "INT32", null, "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null))); timeseriesSchemaInfoMap.put( new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE), Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( "DOUBLE", "status", "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null))); timeseriesSchemaInfoMap.put( new MeasurementPath("root.sg.d1.s3", TSDataType.BOOLEAN), Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( "BOOLEAN", null, "PLAIN", "LZ4", "{\"key1\":\"value2\"}", null, null))); deviceToTimeseriesSchemaInfoMap.put( new PartialPath(new PlainDeviceID("root.sg.d1")), timeseriesSchemaInfoMap); - Map> timeseriesSchemaInfoMap2 = new HashMap<>(); + Map> timeseriesSchemaInfoMap2 = new HashMap<>(); timeseriesSchemaInfoMap2.put( new MeasurementPath("root.sg.d2.s1", TSDataType.INT32), Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( "INT32", null, "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null))); timeseriesSchemaInfoMap2.put( new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE), Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( "DOUBLE", "status", "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null))); timeseriesSchemaInfoMap2.put( new MeasurementPath("root.sg.d2.s4", TSDataType.TEXT), Collections.singletonList( - new TimeseriesSchemaInfo( + new TimeseriesContext( "TEXT", null, "PLAIN", "LZ4", "{\"key2\":\"value1\"}", null, null))); deviceToTimeseriesSchemaInfoMap.put( new PartialPath(new PlainDeviceID("root.sg.d2")), timeseriesSchemaInfoMap2); @@ -101,17 +103,15 @@ public class RegionScanLogicalPlannerTest { List schemas = new ArrayList<>(); schemas.add("s1"); schemas.add("s2"); - List timeseriesSchemaInfoList = new ArrayList<>(); - Map> timeseriesSchemaInfoMap3 = new HashMap<>(); - timeseriesSchemaInfoList.add( - new TimeseriesSchemaInfo( - "INT32", null, "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null)); - timeseriesSchemaInfoList.add( - new TimeseriesSchemaInfo( + List timeseriesContextList = new ArrayList<>(); + Map> timeseriesSchemaInfoMap3 = new HashMap<>(); + timeseriesContextList.add( + new TimeseriesContext("INT32", null, "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null)); + timeseriesContextList.add( + new TimeseriesContext( "DOUBLE", "status", "PLAIN", "LZ4", "{\"key1\":\"value1\"}", null, null)); timeseriesSchemaInfoMap3.put( - new AlignedPath("root.sg.d2.a", schemas, Collections.emptyList()), - timeseriesSchemaInfoList); + new AlignedPath("root.sg.d2.a", schemas, Collections.emptyList()), timeseriesContextList); deviceToTimeseriesSchemaInfoMap.put( new PartialPath(new PlainDeviceID("root.sg.d2.a")), timeseriesSchemaInfoMap3); @@ -126,13 +126,19 @@ public void testShowDevicesWithTimeCondition() throws IllegalPathException { // fake initResultNodeContext() queryId.genPlanNodeId(); - Map devicePathsToAligned = new HashMap<>(); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d1")), false); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2")), false); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2.a")), true); + Map deviceContextMap = new HashMap<>(); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d1")), + new DeviceContext(false, SchemaConstant.NON_TEMPLATE)); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d2")), + new DeviceContext(false, SchemaConstant.NON_TEMPLATE)); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d2.a")), + new DeviceContext(true, SchemaConstant.NON_TEMPLATE)); DeviceRegionScanNode regionScanNode = - new DeviceRegionScanNode(queryId.genPlanNodeId(), devicePathsToAligned, false, null); + new DeviceRegionScanNode(queryId.genPlanNodeId(), deviceContextMap, false, null); PlanNode actualPlan = parseSQLToPlanNode(sql); Assert.assertEquals(actualPlan, regionScanNode); @@ -146,13 +152,19 @@ public void testShowDevicesWithTimeConditionWithLimitOffset() throws IllegalPath // fake initResultNodeContext() queryId.genPlanNodeId(); - Map devicePathsToAligned = new HashMap<>(); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d1")), false); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2")), false); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2.a")), true); + Map deviceContextMap = new HashMap<>(); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d1")), + new DeviceContext(false, SchemaConstant.NON_TEMPLATE)); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d2")), + new DeviceContext(false, SchemaConstant.NON_TEMPLATE)); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d2.a")), + new DeviceContext(true, SchemaConstant.NON_TEMPLATE)); DeviceRegionScanNode regionScanNode = - new DeviceRegionScanNode(queryId.genPlanNodeId(), devicePathsToAligned, false, null); + new DeviceRegionScanNode(queryId.genPlanNodeId(), deviceContextMap, false, null); LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), 20); limitNode.addChild(regionScanNode); @@ -171,13 +183,19 @@ public void testCountDevicesWithTimeConditionWithLimitOffset() throws IllegalPat // fake initResultNodeContext() queryId.genPlanNodeId(); - Map devicePathsToAligned = new HashMap<>(); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d1")), false); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2")), false); - devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2.a")), true); + Map deviceContextMap = new HashMap<>(); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d1")), + new DeviceContext(false, SchemaConstant.NON_TEMPLATE)); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d2")), + new DeviceContext(false, SchemaConstant.NON_TEMPLATE)); + deviceContextMap.put( + new PartialPath(new PlainDeviceID("root.sg.d2.a")), + new DeviceContext(true, SchemaConstant.NON_TEMPLATE)); DeviceRegionScanNode regionScanNode = - new DeviceRegionScanNode(queryId.genPlanNodeId(), devicePathsToAligned, true, null); + new DeviceRegionScanNode(queryId.genPlanNodeId(), deviceContextMap, true, null); PlanNode actualPlan = parseSQLToPlanNode(sql); Assert.assertEquals(actualPlan, regionScanNode);