Skip to content

Commit

Permalink
Pipe: fix PipeEnrichedStatement can't contain redirect info (#12579)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielWang2035 authored May 24, 2024
1 parent 7ef1eb7 commit e49a07d
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeCacheLeaderClientManager;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
Expand Down Expand Up @@ -112,12 +111,7 @@ public synchronized Pair<TEndPoint, PipeEventBatch> onEvent(final TabletInsertio
if (event instanceof PipeRawTabletInsertionEvent) {
deviceId = ((PipeRawTabletInsertionEvent) event).getDeviceId();
} else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final InsertNode insertNode =
((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible();
// insertNode.getDevicePath() is null for InsertRowsNode
if (Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath())) {
deviceId = insertNode.getDevicePath().getFullPath();
}
deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId();
}

if (Objects.isNull(deviceId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

Expand All @@ -47,12 +46,9 @@ protected void doTransfer(AsyncPipeDataTransferServiceClient client, TPipeTransf

@Override
protected void updateLeaderCache(TSStatus status) {
final InsertNode insertNode =
((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible();
// insertNode.getDevicePath() is null for InsertRowsNode
if (insertNode != null && insertNode.getDevicePath() != null) {
if (((PipeInsertNodeTabletInsertionEvent) event).getDeviceId() != null) {
connector.updateLeaderCache(
insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,16 @@ private void doTransfer(

try {
insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();

// getDeviceId() may return null for InsertRowsNode, will be equal to getClient(null)
clientAndStatus = clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId());
if (insertNode != null) {
clientAndStatus =
// insertNode.getDevicePath() is null for InsertRowsNode
Objects.nonNull(insertNode.getDevicePath())
? clientManager.getClient(insertNode.getDevicePath().getFullPath())
: clientManager.getClient();
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
compressIfNeeded(
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
} else {
clientAndStatus = clientManager.getClient();
resp =
clientAndStatus
.getLeft()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
Expand Down Expand Up @@ -62,15 +63,19 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent

private List<TabletInsertionDataContainer> dataContainers;

private final PartialPath devicePath;

private ProgressIndex progressIndex;

public PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
PartialPath devicePath,
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe) {
this(
walEntryHandler,
devicePath,
progressIndex,
isAligned,
isGeneratedByPipe,
Expand All @@ -83,6 +88,7 @@ public PipeInsertNodeTabletInsertionEvent(

private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
PartialPath devicePath,
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe,
Expand All @@ -93,6 +99,8 @@ private PipeInsertNodeTabletInsertionEvent(
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
this.walEntryHandler = walEntryHandler;
// Record device path here so there's no need to get it from InsertNode cache later.
this.devicePath = devicePath;
this.progressIndex = progressIndex;
this.isAligned = isAligned;
this.isGeneratedByPipe = isGeneratedByPipe;
Expand All @@ -114,6 +122,10 @@ public InsertNode getInsertNodeViaCacheIfPossible() {
return walEntryHandler.getInsertNodeViaCacheIfPossible();
}

public String getDeviceId() {
return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null;
}

/////////////////////////// EnrichedEvent ///////////////////////////

@Override
Expand Down Expand Up @@ -170,6 +182,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
devicePath,
progressIndex,
isAligned,
isGeneratedByPipe,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
insertNode.getDevicePath(),
insertNode.getProgressIndex(),
insertNode.isAligned(),
insertNode.isGeneratedByPipe()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class TreeModelPlanner implements IPlanner {
private static final Logger LOGGER = LoggerFactory.getLogger(TreeModelPlanner.class);

private final Statement statement;

Expand Down Expand Up @@ -159,9 +163,16 @@ public ScheduledExecutorService getScheduledExecutorService() {
public void setRedirectInfo(
IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) {
Analysis analysis = (Analysis) iAnalysis;
if (analysis.getStatement() instanceof InsertBaseStatement

// Get the inner statement of PipeEnrichedStatement
Statement statementToRedirect =
analysis.getStatement() instanceof PipeEnrichedStatement
? ((PipeEnrichedStatement) analysis.getStatement()).getInnerStatement()
: analysis.getStatement();

if (statementToRedirect instanceof InsertBaseStatement
&& !analysis.isFinishQueryAfterAnalyze()) {
InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
InsertBaseStatement insertStatement = (InsertBaseStatement) statementToRedirect;
List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
if (insertStatement instanceof InsertRowsStatement
|| insertStatement instanceof InsertMultiTabletsStatement) {
Expand Down

0 comments on commit e49a07d

Please sign in to comment.