diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index 0a377551a9cf..a8b9d895d006 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; @@ -33,6 +34,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -56,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; public class DataRegionStateMachine extends BaseStateMachine { @@ -151,8 +154,9 @@ public void loadSnapshot(File latestSnapshotRootDir) { } } - protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) { + protected PlanNode grabPlanNode(IndexedConsensusRequest indexedRequest) { List insertNodes = new ArrayList<>(indexedRequest.getRequests().size()); + List deleteDataNodes = new ArrayList<>(); for (IConsensusRequest req : indexedRequest.getRequests()) { // PlanNode in IndexedConsensusRequest should always be InsertNode PlanNode planNode = getPlanNode(req); @@ -160,8 +164,9 @@ protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) { ((SearchNode) planNode).setSearchIndex(indexedRequest.getSearchIndex()); } if (planNode instanceof InsertNode) { - InsertNode innerNode = (InsertNode) planNode; - insertNodes.add(innerNode); + insertNodes.add((InsertNode) planNode); + } else if (planNode instanceof DeleteDataNode) { + deleteDataNodes.add((DeleteDataNode) planNode); } else if (indexedRequest.getRequests().size() == 1) { // If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only // contains one request @@ -172,7 +177,45 @@ protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) { + "the size of requests are larger than 1"); } } - return mergeInsertNodes(insertNodes); + if (!insertNodes.isEmpty()) { + if (!deleteDataNodes.isEmpty()) { + throw new IllegalArgumentException( + "One indexedRequest cannot contain InsertNode and DeleteDataNode at the same time"); + } + return mergeInsertNodes(insertNodes); + } + return mergeDeleteDataNode(deleteDataNodes); + } + + private DeleteDataNode mergeDeleteDataNode(List deleteDataNodes) { + int size = deleteDataNodes.size(); + if (size == 0) { + throw new IllegalArgumentException("deleteDataNodes is empty"); + } + DeleteDataNode firstOne = deleteDataNodes.get(0); + if (size == 1) { + return firstOne; + } + if (!deleteDataNodes.stream() + .allMatch( + deleteDataNode -> + firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime() + && firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) { + throw new IllegalArgumentException( + "DeleteDataNodes which start time or end time are not same cannot be merged"); + } + List pathList = + deleteDataNodes.stream() + .flatMap(deleteDataNode -> deleteDataNode.getPathList().stream()) + // Some time the deleteDataNode list contains a path for multiple times, so use + // distinct() to clear them + .distinct() + .collect(Collectors.toList()); + return new DeleteDataNode( + firstOne.getPlanNodeId(), + pathList, + firstOne.getDeleteStartTime(), + firstOne.getDeleteEndTime()); } /** @@ -181,12 +224,12 @@ protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) { * Notice: the continuity of insert nodes sharing same search index should be protected by the * upper layer. * - * @exception RuntimeException when insertNodes is empty + * @exception IllegalArgumentException when insertNodes is empty */ protected InsertNode mergeInsertNodes(List insertNodes) { int size = insertNodes.size(); if (size == 0) { - throw new RuntimeException(); + throw new IllegalArgumentException("insertNodes should never be empty"); } if (size == 1) { return insertNodes.get(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java index 775f1bf6f49b..240c1b1caa0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java @@ -79,7 +79,7 @@ public IConsensusRequest deserializeRequest(IConsensusRequest request) { IConsensusRequest result; if (request instanceof IndexedConsensusRequest) { IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request; - result = grabInsertNode(indexedRequest); + result = grabPlanNode(indexedRequest); } else if (request instanceof BatchIndexedConsensusRequest) { BatchIndexedConsensusRequest batchRequest = (BatchIndexedConsensusRequest) request; DeserializedBatchIndexedConsensusRequest deserializedRequest = @@ -88,7 +88,7 @@ public IConsensusRequest deserializeRequest(IConsensusRequest request) { batchRequest.getEndSyncIndex(), batchRequest.getRequests().size()); for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) { - final PlanNode planNode = grabInsertNode(indexedRequest); + final PlanNode planNode = grabPlanNode(indexedRequest); if (planNode instanceof ComparableConsensusRequest) { final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex(batchRequest.getSourcePeerId(), indexedRequest.getSyncIndex());