Skip to content

Commit

Permalink
[Table Model] Create table device (#12746)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcosZyk authored Jun 18, 2024
1 parent bbb7846 commit 1264141
Show file tree
Hide file tree
Showing 26 changed files with 761 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
Expand Down Expand Up @@ -525,6 +526,22 @@ public TSStatus visitDeleteLogicalView(DeleteLogicalViewNode node, ISchemaRegion
}
}

@Override
public TSStatus visitCreateTableDevice(CreateTableDeviceNode node, ISchemaRegion schemaRegion) {
try {
// todo implement storage for device of diverse data types
schemaRegion.createTableDevice(
node.getTableName(),
node.getDeviceIdList(),
node.getAttributeNameList(),
node.getAttributeValueList());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}

@Override
public TSStatus visitPipeEnrichedWritePlanNode(
final PipeEnrichedWritePlanNode node, final ISchemaRegion schemaRegion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ public void invalidAllCache() {
partitionCache.invalidAllCache();
}

@Override
public SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
}

@Override
public SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
}

@Override
public SchemaPartition getSchemaPartition(String database) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
}

/** split data partition query param by database */
private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
List<DataPartitionQueryParam> dataPartitionQueryParams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface IAnalysis {

SchemaPartition getSchemaPartitionInfo();

void setSchemaPartitionInfo(SchemaPartition schemaPartition);

DataPartition getDataPartitionInfo();

void setRedirectNodeList(List<TEndPoint> redirectNodeList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;

import org.apache.tsfile.file.metadata.IDeviceID;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -90,4 +92,34 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(

/** Invalid all partition cache */
void invalidAllCache();

// ======================== Table Model Schema Partition Interface ========================
/**
* Get or create schema partition, used in data insertion with enable_auto_create_schema is true.
* if schemaPartition does not exist, then automatically create.
*
* <p>The database shall start with "root.". Concat this to a user-provided db name if necessary.
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName);

/**
* For data query with completed id.
*
* <p>The database shall start with "root.". Concat this to a user-provided db name if necessary.
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList);

/**
* For data query with partial device id conditions.
*
* <p>The database shall start with "root.". Concat this to a user-provided db name if necessary.
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getSchemaPartition(String database);
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;

Expand Down Expand Up @@ -484,6 +485,10 @@ public R visitAlterLogicalView(AlterLogicalViewNode node, C context) {
return visitPlan(node, context);
}

public R visitCreateTableDevice(CreateTableDeviceNode node, C context) {
return visitPlan(node, context);
}

/////////////////////////////////////////////////////////////////////////////////////////////////
// Data Write Node
/////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ public class Analysis implements IAnalysis {

private DataPartition dataPartition;

private SchemaPartition schemaPartition;

private DatasetHeader respDatasetHeader;

private boolean finishQueryAfterAnalyze;
Expand Down Expand Up @@ -653,7 +655,12 @@ public String getStatementType() {

@Override
public SchemaPartition getSchemaPartitionInfo() {
throw new UnsupportedOperationException();
return schemaPartition;
}

@Override
public void setSchemaPartitionInfo(SchemaPartition schemaPartition) {
this.schemaPartition = schemaPartition;
}

@Override
Expand All @@ -673,7 +680,7 @@ public void addEndPointToRedirectNodeList(TEndPoint endPoint) {

@Override
public TimePredicate getCovertedTimePredicate() {
throw new UnsupportedOperationException();
return null;
}

public static final class AccessControlInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllRows;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
Expand Down Expand Up @@ -231,6 +232,9 @@ private Visitor(
@Override
public Scope process(Node node, Optional<Scope> scope) {
Scope returnScope = super.process(node, scope);
if (node instanceof CreateDevice) {
return returnScope;
}
checkState(
returnScope.getOuterQueryParent().equals(outerQueryScope),
"result scope should have outer query scope equal with parameter outer query scope");
Expand Down Expand Up @@ -2420,6 +2424,11 @@ private Scope.Builder scopeBuilder(Optional<Scope> parentScope) {

return scopeBuilder;
}

@Override
protected Scope visitCreateDevice(CreateDevice node, Optional<Scope> context) {
return null;
}
}

private static boolean hasScopeAsLocalParent(Scope root, Scope parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.plan.relational.metadata;

import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
Expand All @@ -27,6 +28,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException;
import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.type.Type;

import java.util.List;
Expand Down Expand Up @@ -97,4 +99,34 @@ TableSchema validateTableHeaderSchema(
* <p>If validation failed, a SemanticException will be thrown.
*/
void validateDeviceSchema(ITableDeviceSchemaValidation schemaValidation, MPPQueryContext context);

// ======================== Table Model Schema Partition Interface ========================
/**
* Get or create schema partition, used in data insertion with enable_auto_create_schema is true.
* if schemaPartition does not exist, then automatically create.
*
* <p>The database shall start with "root.". Concat this to a user-provided db name if necessary.
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName);

/**
* For data query with completed id.
*
* <p>The database is a user-provided db name.
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList);

/**
* For data query with partial device id conditions.
*
* <p>The database is a user-provided db name.
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getSchemaPartition(String database);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package org.apache.iotdb.db.queryengine.plan.relational.metadata;

import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
Expand All @@ -43,6 +46,8 @@
import java.util.Locale;
import java.util.Optional;

import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.apache.tsfile.read.common.type.BinaryType.TEXT;
import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
import static org.apache.tsfile.read.common.type.DoubleType.DOUBLE;
Expand All @@ -54,6 +59,8 @@ public class TableMetadataImpl implements Metadata {

private final TypeManager typeManager = new InternalTypeManager();

private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();

@Override
public boolean tableExists(QualifiedObjectName name) {
return false;
Expand Down Expand Up @@ -288,6 +295,23 @@ public void validateDeviceSchema(
throw new UnsupportedOperationException();
}

@Override
public SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName) {
return partitionFetcher.getOrCreateSchemaPartition(
PATH_ROOT + PATH_SEPARATOR + database, deviceIDList, userName);
}

@Override
public SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList) {
return partitionFetcher.getSchemaPartition(PATH_ROOT + PATH_SEPARATOR + database, deviceIDList);
}

@Override
public SchemaPartition getSchemaPartition(String database) {
return partitionFetcher.getSchemaPartition(PATH_ROOT + PATH_SEPARATOR + database);
}

public static boolean isTwoNumericType(List<? extends Type> argumentTypes) {
return argumentTypes.size() == 2
&& isNumericType(argumentTypes.get(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,29 @@

package org.apache.iotdb.db.queryengine.plan.relational.planner;

import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.FilterScanCombine;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.IndexScan;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RelationalPlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SimplifyExpressions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
Expand Down Expand Up @@ -96,6 +100,9 @@ public LogicalQueryPlan plan(Analysis analysis) {
}

private PlanNode planStatement(Analysis analysis, Statement statement) {
if (statement instanceof CreateDevice) {
return planCreateDevice((CreateDevice) statement, analysis);
}
return createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);
}

Expand Down Expand Up @@ -181,4 +188,27 @@ private enum Stage {
OPTIMIZED,
OPTIMIZED_AND_VALIDATED
}

private PlanNode planCreateDevice(CreateDevice statement, Analysis analysis) {
context.setQueryType(QueryType.WRITE);

CreateTableDeviceNode node =
new CreateTableDeviceNode(
context.getQueryId().genPlanNodeId(),
statement.getDatabase(),
statement.getTable(),
statement.getDeviceIdList(),
statement.getAttributeNameList(),
statement.getAttributeValueList());

analysis.setStatement(statement);
SchemaPartition partition =
metadata.getOrCreateSchemaPartition(
statement.getDatabase(),
node.getPartitionKeyList(),
context.getSession().getUserName());
analysis.setSchemaPartitionInfo(partition);

return node;
}
}
Loading

0 comments on commit 1264141

Please sign in to comment.