Skip to content

Commit

Permalink
implement add column
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcosZyk committed Jun 12, 2024
1 parent 50d8708 commit 356a488
Show file tree
Hide file tree
Showing 26 changed files with 946 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
Expand Down Expand Up @@ -408,6 +409,9 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case CommitCreateTable:
plan = new CommitCreateTablePlan();
break;
case AddTableColumn:
plan = new AddTableColumnPlan();
break;
case GetNodePathsPartition:
plan = new GetNodePathsPartitionPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public enum ConfigPhysicalPlanType {
PreCreateTable((short) 850),
RollbackCreateTable((short) 851),
CommitCreateTable((short) 852),
AddTableColumn((short) 853),

/** Deprecated types for sync, restored them for upgrade. */
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.table;

import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

public class AddTableColumnPlan extends ConfigPhysicalPlan {

private String database;

private String tableName;

private List<TsTableColumnSchema> columnSchemaList;

private boolean isRollback;

public AddTableColumnPlan() {
super(ConfigPhysicalPlanType.AddTableColumn);
}

public AddTableColumnPlan(
String database,
String tableName,
List<TsTableColumnSchema> columnSchemaList,
boolean isRollback) {
super(ConfigPhysicalPlanType.AddTableColumn);
this.database = database;
this.tableName = tableName;
this.columnSchemaList = columnSchemaList;
this.isRollback = isRollback;
}

public String getDatabase() {
return database;
}

public String getTableName() {
return tableName;
}

public List<TsTableColumnSchema> getColumnSchemaList() {
return columnSchemaList;
}

public boolean isRollback() {
return isRollback;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());

ReadWriteIOUtils.write(database, stream);
ReadWriteIOUtils.write(tableName, stream);
TsTableColumnSchemaUtil.serialize(columnSchemaList, stream);
ReadWriteIOUtils.write(isRollback, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
this.database = ReadWriteIOUtils.readString(buffer);
this.tableName = ReadWriteIOUtils.readString(buffer);
this.columnSchemaList = TsTableColumnSchemaUtil.deserializeColumnSchemaList(buffer);
this.isRollback = ReadWriteIOUtils.readBool(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iotdb.commons.path.PathPatternUtil;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.table.AlterTableOperationType;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
Expand Down Expand Up @@ -123,6 +124,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
Expand Down Expand Up @@ -2385,4 +2387,18 @@ public TSStatus createTable(ByteBuffer tableInfo) {
return status;
}
}

public TSStatus alterTable(TAlterTableReq req) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
switch (AlterTableOperationType.getType(req.operationType)) {
case ADD_COLUMN:
return procedureManager.alterTableAddColumn(req);
default:
throw new IllegalArgumentException();
}
} else {
return status;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.trigger.TriggerInformation;
Expand Down Expand Up @@ -78,6 +80,7 @@
import org.apache.iotdb.confignode.procedure.impl.schema.SetTTLProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.AddTableColumnProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.CreateConsumerProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.DropConsumerProcedure;
Expand All @@ -100,6 +103,7 @@
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
Expand Down Expand Up @@ -1290,6 +1294,56 @@ public TSStatus createTable(String database, TsTable table) {
}
}

public TSStatus alterTableAddColumn(TAlterTableReq req) {
String database = req.database;
String tableName = req.tableName;
String queryId = req.queryId;
List<TsTableColumnSchema> columnSchemaList =
TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.updateInfo);

long procedureId = -1;
synchronized (this) {
boolean hasOverlappedTask = false;
ProcedureType type;
AddTableColumnProcedure addTableColumnProcedure;
for (Procedure<?> procedure : executor.getProcedures().values()) {
type = ProcedureFactory.getProcedureType(procedure);
if (type == null || !type.equals(ProcedureType.ADD_TABLE_COLUMN_PROCEDURE)) {
continue;
}
addTableColumnProcedure = (AddTableColumnProcedure) procedure;
if (queryId.equals(addTableColumnProcedure.getQueryId())) {
procedureId = addTableColumnProcedure.getProcId();
break;
}
if (database.equals(addTableColumnProcedure.getDatabase())
&& tableName.equals(addTableColumnProcedure.getTableName())) {
hasOverlappedTask = true;
break;
}
}

if (procedureId == -1) {
if (hasOverlappedTask) {
return RpcUtils.getStatus(
TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
"Some other task dropping table with same name.");
}
procedureId =
this.executor.submitProcedure(
new AddTableColumnProcedure(database, tableName, queryId, columnSchemaList));
}
}
List<TSStatus> procedureStatus = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
if (isSucceed) {
return StatusUtils.OK;
} else {
return procedureStatus.get(0);
}
}

// ======================================================
/*
GET-SET Region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
Expand All @@ -52,6 +54,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
Expand Down Expand Up @@ -1110,6 +1113,59 @@ public void updateSchemaQuotaConfiguration(long seriesThreshold, long deviceThre
schemaQuotaStatistics.setSeriesThreshold(seriesThreshold);
}

public TsTable getTable(String database, String tableName) {
return clusterSchemaInfo.getTsTable(database, tableName);
}

public synchronized Pair<TSStatus, List<TsTableColumnSchema>> addTableColumn(
String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
Map<String, List<TsTable>> currentUsingTable = clusterSchemaInfo.getAllUsingTables();
TsTable targetTable = null;
for (TsTable table : currentUsingTable.get(database)) {
if (table.getTableName().equals(tableName)) {
targetTable = table;
break;
}
}

if (targetTable == null) {
return new Pair<>(
RpcUtils.getStatus(
TSStatusCode.TABLE_NOT_EXISTS,
String.format("Table %s.%s not exist", database, tableName)),
null);
}

List<TsTableColumnSchema> copiedList = new ArrayList<>();
for (TsTableColumnSchema columnSchema : columnSchemaList) {
if (targetTable.getColumnSchema(columnSchema.getColumnName()) == null) {
copiedList.add(columnSchema);
}
}

AddTableColumnPlan addTableColumnPlan =
new AddTableColumnPlan(database, tableName, copiedList, false);
try {
return new Pair<>(getConsensusManager().write(addTableColumnPlan), copiedList);
} catch (ConsensusException e) {
LOGGER.warn(e.getMessage(), e);
return new Pair<>(
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()), null);
}
}

public synchronized TSStatus rollbackAddTableColumn(
String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
AddTableColumnPlan addTableColumnPlan =
new AddTableColumnPlan(database, tableName, columnSchemaList, true);
try {
return getConsensusManager().write(addTableColumnPlan);
} catch (ConsensusException e) {
LOGGER.warn(e.getMessage(), e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
}

public void clearSchemaQuotaCache() {
schemaQuotaStatistics.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
Expand Down Expand Up @@ -467,6 +468,8 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
return clusterSchemaInfo.rollbackCreateTable((RollbackCreateTablePlan) physicalPlan);
case CommitCreateTable:
return clusterSchemaInfo.commitCreateTable((CommitCreateTablePlan) physicalPlan);
case AddTableColumn:
return clusterSchemaInfo.addTableColumn((AddTableColumnPlan) physicalPlan);
case CreatePipeV2:
return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan);
case SetPipeStatusV2:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
Expand Down Expand Up @@ -1091,6 +1092,41 @@ public Map<String, List<TsTable>> getAllPreCreateTables() {
}
}

public TsTable getTsTable(String database, String tableName) {
databaseReadWriteLock.readLock().lock();
try {
return mTree.getTable(new PartialPath(new String[] {ROOT, database}), tableName);
} catch (MetadataException e) {
LOGGER.warn(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
databaseReadWriteLock.readLock().unlock();
}
}

public TSStatus addTableColumn(AddTableColumnPlan plan) {
databaseReadWriteLock.writeLock().lock();
try {
if (plan.isRollback()) {
mTree.rollbackAddTableColumn(
new PartialPath(new String[] {ROOT, plan.getDatabase()}),
plan.getTableName(),
plan.getColumnSchemaList());
} else {
mTree.addTableColumn(
new PartialPath(new String[] {ROOT, plan.getDatabase()}),
plan.getTableName(),
plan.getColumnSchemaList());
}
return RpcUtils.SUCCESS_STATUS;
} catch (MetadataException e) {
LOGGER.warn(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} finally {
databaseReadWriteLock.writeLock().unlock();
}
}

// endregion

@TestOnly
Expand Down
Loading

0 comments on commit 356a488

Please sign in to comment.