diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java index 087c969b89a6..33b524b1a9a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java @@ -24,7 +24,6 @@ import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeCacheLeaderClientManager; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -112,12 +111,7 @@ public synchronized Pair onEvent(final TabletInsertio if (event instanceof PipeRawTabletInsertionEvent) { deviceId = ((PipeRawTabletInsertionEvent) event).getDeviceId(); } else if (event instanceof PipeInsertNodeTabletInsertionEvent) { - final InsertNode insertNode = - ((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible(); - // insertNode.getDevicePath() is null for InsertRowsNode - if (Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath())) { - deviceId = insertNode.getDevicePath().getFullPath(); - } + deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(); } if (Objects.isNull(deviceId)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java index 21cc976be77f..660b2677ad59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -47,12 +46,9 @@ protected void doTransfer(AsyncPipeDataTransferServiceClient client, TPipeTransf @Override protected void updateLeaderCache(TSStatus status) { - final InsertNode insertNode = - ((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible(); - // insertNode.getDevicePath() is null for InsertRowsNode - if (insertNode != null && insertNode.getDevicePath() != null) { + if (((PipeInsertNodeTabletInsertionEvent) event).getDeviceId() != null) { connector.updateLeaderCache( - insertNode.getDevicePath().getFullPath(), status.getRedirectNode()); + ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index 12e9438f245e..8ee65ef94edc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -236,13 +236,9 @@ private void doTransfer( try { insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible(); - + // getDeviceId() may return null for InsertRowsNode, will be equal to getClient(null) + clientAndStatus = clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId()); if (insertNode != null) { - clientAndStatus = - // insertNode.getDevicePath() is null for InsertRowsNode - Objects.nonNull(insertNode.getDevicePath()) - ? clientManager.getClient(insertNode.getDevicePath().getFullPath()) - : clientManager.getClient(); resp = clientAndStatus .getLeft() @@ -250,7 +246,6 @@ private void doTransfer( compressIfNeeded( PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode))); } else { - clientAndStatus = clientManager.getClient(); resp = clientAndStatus .getLeft() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 437d8b97d509..11711fa1d034 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; @@ -62,15 +63,19 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent private List dataContainers; + private final PartialPath devicePath; + private ProgressIndex progressIndex; public PipeInsertNodeTabletInsertionEvent( WALEntryHandler walEntryHandler, + PartialPath devicePath, ProgressIndex progressIndex, boolean isAligned, boolean isGeneratedByPipe) { this( walEntryHandler, + devicePath, progressIndex, isAligned, isGeneratedByPipe, @@ -83,6 +88,7 @@ public PipeInsertNodeTabletInsertionEvent( private PipeInsertNodeTabletInsertionEvent( WALEntryHandler walEntryHandler, + PartialPath devicePath, ProgressIndex progressIndex, boolean isAligned, boolean isGeneratedByPipe, @@ -93,6 +99,8 @@ private PipeInsertNodeTabletInsertionEvent( long endTime) { super(pipeName, pipeTaskMeta, pattern, startTime, endTime); this.walEntryHandler = walEntryHandler; + // Record device path here so there's no need to get it from InsertNode cache later. + this.devicePath = devicePath; this.progressIndex = progressIndex; this.isAligned = isAligned; this.isGeneratedByPipe = isGeneratedByPipe; @@ -114,6 +122,10 @@ public InsertNode getInsertNodeViaCacheIfPossible() { return walEntryHandler.getInsertNodeViaCacheIfPossible(); } + public String getDeviceId() { + return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null; + } + /////////////////////////// EnrichedEvent /////////////////////////// @Override @@ -170,6 +182,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP long endTime) { return new PipeInsertNodeTabletInsertionEvent( walEntryHandler, + devicePath, progressIndex, isAligned, isGeneratedByPipe, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 32be413e6609..94793b74bc15 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -44,6 +44,7 @@ public static PipeRealtimeEvent createRealtimeEvent( return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( new PipeInsertNodeTabletInsertionEvent( walEntryHandler, + insertNode.getDevicePath(), insertNode.getProgressIndex(), insertNode.isAligned(), insertNode.isGeneratedByPipe()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index a42ff843f30d..4ea3048f2c38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -46,12 +46,16 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; public class TreeModelPlanner implements IPlanner { + private static final Logger LOGGER = LoggerFactory.getLogger(TreeModelPlanner.class); private final Statement statement; @@ -159,9 +163,16 @@ public ScheduledExecutorService getScheduledExecutorService() { public void setRedirectInfo( IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) { Analysis analysis = (Analysis) iAnalysis; - if (analysis.getStatement() instanceof InsertBaseStatement + + // Get the inner statement of PipeEnrichedStatement + Statement statementToRedirect = + analysis.getStatement() instanceof PipeEnrichedStatement + ? ((PipeEnrichedStatement) analysis.getStatement()).getInnerStatement() + : analysis.getStatement(); + + if (statementToRedirect instanceof InsertBaseStatement && !analysis.isFinishQueryAfterAnalyze()) { - InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement(); + InsertBaseStatement insertStatement = (InsertBaseStatement) statementToRedirect; List redirectNodeList = analysis.getRedirectNodeList(); if (insertStatement instanceof InsertRowsStatement || insertStatement instanceof InsertMultiTabletsStatement) {