diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java index fc9f50474e1d..7a5044b0c616 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java @@ -110,14 +110,18 @@ void markConfigRegionCommit() { //////////////////////////// Switch //////////////////////////// @Override - public void thawRate(final boolean isStartPipe) { + public synchronized void thawRate(final boolean isStartPipe) { super.thawRate(isStartPipe); + // The stopped pipe's rate should only be thawed by "startPipe" command + if (isStopped) { + return; + } configRegionCommitMeter.compareAndSet( null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock())); } @Override - public void freezeRate(final boolean isStopPipe) { + public synchronized void freezeRate(final boolean isStopPipe) { super.freezeRate(isStopPipe); configRegionCommitMeter.set(null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index 129d9866a4d1..dcab93d535c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -193,8 +193,12 @@ void markSchemaRegionCommit() { // Thread-safe & Idempotent @Override - public void thawRate(final boolean isStartPipe) { + public synchronized void thawRate(final boolean isStartPipe) { super.thawRate(isStartPipe); + // The stopped pipe's rate should only be thawed by "startPipe" command + if (isStopped) { + return; + } dataRegionCommitMeter.compareAndSet( null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock())); schemaRegionCommitMeter.compareAndSet( @@ -203,7 +207,7 @@ public void thawRate(final boolean isStartPipe) { // Thread-safe & Idempotent @Override - public void freezeRate(final boolean isStopPipe) { + public synchronized void freezeRate(final boolean isStopPipe) { super.freezeRate(isStopPipe); dataRegionCommitMeter.set(null); schemaRegionCommitMeter.set(null); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java index 1d00abe09635..97f80e5d9853 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java @@ -30,7 +30,7 @@ public abstract class PipeRemainingOperator { private long lastEmptyTimeStamp = System.currentTimeMillis(); private long lastNonEmptyTimeStamp = System.currentTimeMillis(); - protected volatile boolean isStopped = true; + protected boolean isStopped = true; //////////////////////////// Tags //////////////////////////// @@ -51,37 +51,35 @@ protected void setNameAndCreationTime(final String pipeName, final long creation //////////////////////////// Switch //////////////////////////// - protected void notifyEmpty() { + protected void notifyNonEmpty() { final long pipeRemainingTimeCommitAutoSwitchSeconds = PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds(); - lastEmptyTimeStamp = System.currentTimeMillis(); - if (lastEmptyTimeStamp - lastNonEmptyTimeStamp + lastNonEmptyTimeStamp = System.currentTimeMillis(); + if (lastNonEmptyTimeStamp - lastEmptyTimeStamp >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) { - freezeRate(false); + thawRate(false); } } - protected void notifyNonEmpty() { + protected void notifyEmpty() { final long pipeRemainingTimeCommitAutoSwitchSeconds = PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds(); - lastNonEmptyTimeStamp = System.currentTimeMillis(); - if (lastNonEmptyTimeStamp - lastEmptyTimeStamp - >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000 - && !isStopped) { - // The stopped pipe's rate should only be thawed by "startPipe" command - thawRate(false); + lastEmptyTimeStamp = System.currentTimeMillis(); + if (lastEmptyTimeStamp - lastNonEmptyTimeStamp + >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) { + freezeRate(false); } } - public void thawRate(final boolean isStartPipe) { + public synchronized void thawRate(final boolean isStartPipe) { if (isStartPipe) { isStopped = false; } } - public void freezeRate(final boolean isStopPipe) { + public synchronized void freezeRate(final boolean isStopPipe) { if (isStopPipe) { isStopped = true; }