Skip to content

Commit

Permalink
adjusting
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi committed Jun 14, 2024
1 parent 610e8ec commit 8f25a1a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,18 @@ void markConfigRegionCommit() {
//////////////////////////// Switch ////////////////////////////

@Override
public void thawRate(final boolean isStartPipe) {
public synchronized void thawRate(final boolean isStartPipe) {
super.thawRate(isStartPipe);
// The stopped pipe's rate should only be thawed by "startPipe" command
if (isStopped) {
return;
}
configRegionCommitMeter.compareAndSet(
null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock()));
}

@Override
public void freezeRate(final boolean isStopPipe) {
public synchronized void freezeRate(final boolean isStopPipe) {
super.freezeRate(isStopPipe);
configRegionCommitMeter.set(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,12 @@ void markSchemaRegionCommit() {

// Thread-safe & Idempotent
@Override
public void thawRate(final boolean isStartPipe) {
public synchronized void thawRate(final boolean isStartPipe) {
super.thawRate(isStartPipe);
// The stopped pipe's rate should only be thawed by "startPipe" command
if (isStopped) {
return;
}
dataRegionCommitMeter.compareAndSet(
null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock()));
schemaRegionCommitMeter.compareAndSet(
Expand All @@ -203,7 +207,7 @@ public void thawRate(final boolean isStartPipe) {

// Thread-safe & Idempotent
@Override
public void freezeRate(final boolean isStopPipe) {
public synchronized void freezeRate(final boolean isStopPipe) {
super.freezeRate(isStopPipe);
dataRegionCommitMeter.set(null);
schemaRegionCommitMeter.set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public abstract class PipeRemainingOperator {

private long lastEmptyTimeStamp = System.currentTimeMillis();
private long lastNonEmptyTimeStamp = System.currentTimeMillis();
protected volatile boolean isStopped = true;
protected boolean isStopped = true;

//////////////////////////// Tags ////////////////////////////

Expand All @@ -51,37 +51,35 @@ protected void setNameAndCreationTime(final String pipeName, final long creation

//////////////////////////// Switch ////////////////////////////

protected void notifyEmpty() {
protected void notifyNonEmpty() {
final long pipeRemainingTimeCommitAutoSwitchSeconds =
PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();

lastEmptyTimeStamp = System.currentTimeMillis();
if (lastEmptyTimeStamp - lastNonEmptyTimeStamp
lastNonEmptyTimeStamp = System.currentTimeMillis();
if (lastNonEmptyTimeStamp - lastEmptyTimeStamp
>= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
freezeRate(false);
thawRate(false);
}
}

protected void notifyNonEmpty() {
protected void notifyEmpty() {
final long pipeRemainingTimeCommitAutoSwitchSeconds =
PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();

lastNonEmptyTimeStamp = System.currentTimeMillis();
if (lastNonEmptyTimeStamp - lastEmptyTimeStamp
>= pipeRemainingTimeCommitAutoSwitchSeconds * 1000
&& !isStopped) {
// The stopped pipe's rate should only be thawed by "startPipe" command
thawRate(false);
lastEmptyTimeStamp = System.currentTimeMillis();
if (lastEmptyTimeStamp - lastNonEmptyTimeStamp
>= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
freezeRate(false);
}
}

public void thawRate(final boolean isStartPipe) {
public synchronized void thawRate(final boolean isStartPipe) {
if (isStartPipe) {
isStopped = false;
}
}

public void freezeRate(final boolean isStopPipe) {
public synchronized void freezeRate(final boolean isStopPipe) {
if (isStopPipe) {
isStopped = true;
}
Expand Down

0 comments on commit 8f25a1a

Please sign in to comment.