Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
Cpaulyz committed Jun 12, 2024
1 parent adfe103 commit 217bf7f
Show file tree
Hide file tree
Showing 29 changed files with 379 additions and 290 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.common;

import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class DeviceContext {
private boolean isAligned;
private final int templateId;

public DeviceContext(DeviceSchemaInfo deviceSchemaInfo) {
this.isAligned = deviceSchemaInfo.isAligned();
this.templateId = deviceSchemaInfo.getTemplateId();
}

public DeviceContext(boolean isAligned, int templateId) {
this.isAligned = isAligned;
this.templateId = templateId;
}

public boolean isAligned() {
return isAligned;
}

public int getTemplateId() {
return templateId;
}

public void serializeAttributes(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(isAligned, byteBuffer);
ReadWriteIOUtils.write(templateId, byteBuffer);
}

public void serializeAttributes(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isAligned, stream);
ReadWriteIOUtils.write(templateId, stream);
}

public static DeviceContext deserialize(ByteBuffer buffer) {
boolean isAligned = ReadWriteIOUtils.readBool(buffer);
int templateId = ReadWriteIOUtils.readInt(buffer);
return new DeviceContext(isAligned, templateId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeviceContext that = (DeviceContext) o;
return isAligned == that.isAligned && templateId == that.templateId;
}

@Override
public int hashCode() {
return Objects.hash(isAligned, templateId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import static org.apache.iotdb.db.queryengine.execution.operator.schema.source.TimeSeriesSchemaSource.mapToString;

public class TimeseriesSchemaInfo {
public class TimeseriesContext {
private final String dataType;
private final String encoding;
private final String compression;
Expand All @@ -45,7 +45,7 @@ public class TimeseriesSchemaInfo {
private final String deadband;
private final String deadbandParameters;

public TimeseriesSchemaInfo(IMeasurementSchemaInfo schemaInfo) {
public TimeseriesContext(IMeasurementSchemaInfo schemaInfo) {
this.dataType = schemaInfo.getSchema().getType().toString();
this.encoding = schemaInfo.getSchema().getEncodingType().toString();
this.compression = schemaInfo.getSchema().getCompressor().toString();
Expand Down Expand Up @@ -85,7 +85,7 @@ public String getDeadband() {
return deadband;
}

public TimeseriesSchemaInfo(
public TimeseriesContext(
String dataType,
String alias,
String encoding,
Expand Down Expand Up @@ -122,15 +122,15 @@ public void serializeAttributes(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(deadbandParameters, stream);
}

public static TimeseriesSchemaInfo deserialize(ByteBuffer buffer) {
public static TimeseriesContext deserialize(ByteBuffer buffer) {
String dataType = ReadWriteIOUtils.readString(buffer);
String alias = ReadWriteIOUtils.readString(buffer);
String encoding = ReadWriteIOUtils.readString(buffer);
String compression = ReadWriteIOUtils.readString(buffer);
String tags = ReadWriteIOUtils.readString(buffer);
String deadband = ReadWriteIOUtils.readString(buffer);
String deadbandParameters = ReadWriteIOUtils.readString(buffer);
return new TimeseriesSchemaInfo(
return new TimeseriesContext(
dataType, alias, encoding, compression, tags, deadband, deadbandParameters);
}

Expand All @@ -142,7 +142,7 @@ public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
TimeseriesSchemaInfo that = (TimeseriesSchemaInfo) obj;
TimeseriesContext that = (TimeseriesContext) obj;
return Objects.equals(dataType, that.dataType)
&& Objects.equals(alias, that.alias)
&& encoding.equals(that.encoding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,39 +311,8 @@ public void appendTemplateDevice(
entityNode.setTemplateId(templateId);
cur.replaceChild(deviceName, entityNode);
}
templateMap.putIfAbsent(templateId, template);
}

/**
* Append a template device to the schema tree.
*
* @param devicePath device path
* @param isAligned whether the device is aligned
*/
public void appendDevice(PartialPath devicePath, Boolean isAligned) {
String[] nodes = devicePath.getNodes();
SchemaNode cur = root;
SchemaNode child;
for (int i = 1; i < nodes.length - 1; i++) {
child = cur.getChild(nodes[i]);
if (child == null) {
child = new SchemaInternalNode(nodes[i]);
cur.addChild(nodes[i], child);
}
cur = child;
}
String deviceName = nodes[nodes.length - 1];
child = cur.getChild(deviceName);
if (child == null) {
SchemaEntityNode entityNode = new SchemaEntityNode(deviceName);
entityNode.setAligned(isAligned);
cur.addChild(deviceName, entityNode);
} else if (child.isEntity()) {
child.getAsEntityNode().setAligned(isAligned);
} else {
SchemaEntityNode entityNode = new SchemaEntityNode(deviceName);
entityNode.setAligned(isAligned);
cur.replaceChild(deviceName, entityNode);
if (template != null) {
templateMap.putIfAbsent(templateId, template);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
Expand All @@ -39,35 +40,35 @@ public class DataDriverContext extends DriverContext {
// it will be set to null, after being merged into Parent FIContext
private List<PartialPath> paths;
private QueryDataSourceType queryDataSourceType = null;
private Map<IDeviceID, Boolean> deviceIDToAligned;
private Map<IDeviceID, DeviceContext> deviceIDToContext;
// it will be set to null, after QueryDataSource being inited
private List<DataSourceOperator> sourceOperators;

public DataDriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
super(fragmentInstanceContext, pipelineId);
this.paths = new ArrayList<>();
this.sourceOperators = new ArrayList<>();
this.deviceIDToAligned = null;
this.deviceIDToContext = null;
}

public DataDriverContext(DataDriverContext parentContext, int pipelineId) {
super(parentContext.getFragmentInstanceContext(), pipelineId);
this.paths = new ArrayList<>();
this.sourceOperators = new ArrayList<>();
this.deviceIDToAligned = null;
this.deviceIDToContext = null;
}

public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
this.queryDataSourceType = queryDataSourceType;
}

public void setDeviceIDToAligned(Map<IDeviceID, Boolean> deviceIDToAligned) {
this.deviceIDToAligned = deviceIDToAligned;
public void setDeviceIDToContext(Map<IDeviceID, DeviceContext> deviceIDToContext) {
this.deviceIDToContext = deviceIDToContext;
}

public void clearDeviceIDToAligned() {
public void clearDeviceIDToContext() {
// friendly for gc
deviceIDToAligned = null;
deviceIDToContext = null;
}

public void addPath(PartialPath path) {
Expand All @@ -82,8 +83,8 @@ public List<PartialPath> getPaths() {
return paths;
}

public Map<IDeviceID, Boolean> getDeviceIDToAligned() {
return deviceIDToAligned;
public Map<IDeviceID, DeviceContext> getDeviceIDToContext() {
return deviceIDToContext;
}

public Optional<QueryDataSourceType> getQueryDataSourceType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class FragmentInstanceContext extends QueryContext {
// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
private List<PartialPath> sourcePaths;
// Used for region scan.
private Map<IDeviceID, Boolean> devicePathsToAligned;
private Map<IDeviceID, DeviceContext> devicePathsToContext;

// Shared by all scan operators in this fragment instance to avoid memory problem
private IQueryDataSource sharedQueryDataSource;
Expand Down Expand Up @@ -356,8 +357,8 @@ public void setSourcePaths(List<PartialPath> sourcePaths) {
this.sourcePaths = sourcePaths;
}

public void setDevicePathsToAligned(Map<IDeviceID, Boolean> devicePathsToAligned) {
this.devicePathsToAligned = devicePathsToAligned;
public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToContext) {
this.devicePathsToContext = devicePathsToContext;
}

public void initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProcessException {
Expand Down Expand Up @@ -399,17 +400,17 @@ public void initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProce
}
}

public void initRegionScanQueryDataSource(Map<IDeviceID, Boolean> devicePathToAligned)
public void initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext> devicePathsToContext)
throws QueryProcessException {
long startTime = System.nanoTime();
if (devicePathsToAligned == null) {
if (devicePathsToContext == null) {
return;
}
dataRegion.readLock();
try {
this.sharedQueryDataSource =
dataRegion.queryForDeviceRegionScan(
devicePathToAligned,
devicePathsToContext,
this,
globalTimeFilter != null ? globalTimeFilter.copy() : null,
timePartitions);
Expand Down Expand Up @@ -460,8 +461,8 @@ public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProc
sourcePaths = null;
break;
case DEVICE_REGION_SCAN:
initRegionScanQueryDataSource(devicePathsToAligned);
devicePathsToAligned = null;
initRegionScanQueryDataSource(devicePathsToContext);
devicePathsToContext = null;
break;
case TIME_SERIES_REGION_SCAN:
initRegionScanQueryDataSource(sourcePaths);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void transformToTsBlockColumns(
builder
.getColumnBuilder(2)
.writeBinary(new Binary(String.valueOf(device.isAligned()), TSFileConfig.STRING_CHARSET));
if (templateId != -1) {
if (templateId != SchemaConstant.NON_TEMPLATE) {
builder
.getColumnBuilder(3)
.writeBinary(
Expand All @@ -125,7 +125,7 @@ public void transformToTsBlockColumns(
builder
.getColumnBuilder(1)
.writeBinary(new Binary(String.valueOf(device.isAligned()), TSFileConfig.STRING_CHARSET));
if (templateId != -1) {
if (templateId != SchemaConstant.NON_TEMPLATE) {
builder
.getColumnBuilder(2)
.writeBinary(
Expand Down
Loading

0 comments on commit 217bf7f

Please sign in to comment.