diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index b967f9479859..4ccd55602d91 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -129,7 +129,6 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri PipeTransferConfigPlanReq.toTPipeTransferReq( pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); @@ -202,7 +201,6 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent) snapshotEvent.getFileType(), snapshotEvent.toSealTypeString())); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 923ffccb8a08..11cf71480677 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -118,9 +118,7 @@ protected void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionW compressIfNeeded( PipeTransferPlanNodeReq.toTPipeTransferReq( pipeSchemaRegionWritePlanEvent.getPlanNode())); - rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index 7e4319f25174..5b5ea6965555 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -179,7 +179,6 @@ private void doTransfer(Pair endPointAndBatch) { try { final TPipeTransferReq req = compressIfNeeded(batchToTransfer.toTPipeTransferReq()); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); @@ -246,9 +245,7 @@ private void doTransfer( : compressIfNeeded( PipeTransferTabletBinaryReq.toTPipeTransferReq( pipeInsertNodeTabletInsertionEvent.getByteBuffer())); - rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { if (clientAndStatus != null) { @@ -308,7 +305,6 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned())); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); @@ -369,7 +365,6 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) PipeTransferTsFileSealWithModReq.toTPipeTransferReq( modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); @@ -386,7 +381,6 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) compressIfNeeded( PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length())); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java index 669a0c1bc5a6..2fbccc627082 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java @@ -111,9 +111,7 @@ private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent) Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() : 0, snapshotEvent.getDatabaseName(), snapshotEvent.toSealTypeString())); - rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index e55a37bb85ad..cbd059301fd4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -168,7 +168,6 @@ protected void transferFilePieces( ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) : getTransferSingleFilePieceReq(file.getName(), position, payLoad)); rateLimitIfNeeded(clientAndStatus.getLeft().getEndPoint(), req.getBody().length); - resp = PipeTransferFilePieceResp.fromTPipeTransferResp( clientAndStatus.getLeft().pipeTransfer(req));