Skip to content

Commit

Permalink
[to rc/1.3.3] Properly handle the case where a single IndexedRequest …
Browse files Browse the repository at this point in the history
…contains multiple “delete PlanNode” operations on the receiving end of IoTConsensus. (#13427)
  • Loading branch information
liyuheng55555 authored Sep 6, 2024
1 parent 1df0000 commit 6e319b2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
Expand All @@ -33,6 +34,7 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
Expand All @@ -56,6 +58,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class DataRegionStateMachine extends BaseStateMachine {

Expand Down Expand Up @@ -151,17 +154,19 @@ public void loadSnapshot(File latestSnapshotRootDir) {
}
}

protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
protected PlanNode grabPlanNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
List<DeleteDataNode> deleteDataNodes = new ArrayList<>();
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
PlanNode planNode = getPlanNode(req);
if (planNode instanceof SearchNode) {
((SearchNode) planNode).setSearchIndex(indexedRequest.getSearchIndex());
}
if (planNode instanceof InsertNode) {
InsertNode innerNode = (InsertNode) planNode;
insertNodes.add(innerNode);
insertNodes.add((InsertNode) planNode);
} else if (planNode instanceof DeleteDataNode) {
deleteDataNodes.add((DeleteDataNode) planNode);
} else if (indexedRequest.getRequests().size() == 1) {
// If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
// contains one request
Expand All @@ -172,7 +177,45 @@ protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ "the size of requests are larger than 1");
}
}
return mergeInsertNodes(insertNodes);
if (!insertNodes.isEmpty()) {
if (!deleteDataNodes.isEmpty()) {
throw new IllegalArgumentException(
"One indexedRequest cannot contain InsertNode and DeleteDataNode at the same time");
}
return mergeInsertNodes(insertNodes);
}
return mergeDeleteDataNode(deleteDataNodes);
}

private DeleteDataNode mergeDeleteDataNode(List<DeleteDataNode> deleteDataNodes) {
int size = deleteDataNodes.size();
if (size == 0) {
throw new IllegalArgumentException("deleteDataNodes is empty");
}
DeleteDataNode firstOne = deleteDataNodes.get(0);
if (size == 1) {
return firstOne;
}
if (!deleteDataNodes.stream()
.allMatch(
deleteDataNode ->
firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime()
&& firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) {
throw new IllegalArgumentException(
"DeleteDataNodes which start time or end time are not same cannot be merged");
}
List<PartialPath> pathList =
deleteDataNodes.stream()
.flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
// Some time the deleteDataNode list contains a path for multiple times, so use
// distinct() to clear them
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
firstOne.getPlanNodeId(),
pathList,
firstOne.getDeleteStartTime(),
firstOne.getDeleteEndTime());
}

/**
Expand All @@ -181,12 +224,12 @@ protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
* Notice: the continuity of insert nodes sharing same search index should be protected by the
* upper layer.
*
* @exception RuntimeException when insertNodes is empty
* @exception IllegalArgumentException when insertNodes is empty
*/
protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
int size = insertNodes.size();
if (size == 0) {
throw new RuntimeException();
throw new IllegalArgumentException("insertNodes should never be empty");
}
if (size == 1) {
return insertNodes.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
IConsensusRequest result;
if (request instanceof IndexedConsensusRequest) {
IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
result = grabInsertNode(indexedRequest);
result = grabPlanNode(indexedRequest);
} else if (request instanceof BatchIndexedConsensusRequest) {
BatchIndexedConsensusRequest batchRequest = (BatchIndexedConsensusRequest) request;
DeserializedBatchIndexedConsensusRequest deserializedRequest =
Expand All @@ -88,7 +88,7 @@ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
batchRequest.getEndSyncIndex(),
batchRequest.getRequests().size());
for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
final PlanNode planNode = grabInsertNode(indexedRequest);
final PlanNode planNode = grabPlanNode(indexedRequest);
if (planNode instanceof ComparableConsensusRequest) {
final IoTProgressIndex ioTProgressIndex =
new IoTProgressIndex(batchRequest.getSourcePeerId(), indexedRequest.getSyncIndex());
Expand Down

0 comments on commit 6e319b2

Please sign in to comment.