Skip to content

Commit

Permalink
support scaling up to maxWorkers for stage on failover event
Browse files Browse the repository at this point in the history
  • Loading branch information
hmitnflx committed Jun 8, 2024
1 parent 3b5cde5 commit 58dc446
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ void start() {
.distinctUntilChanged()
.throttleLast(5, TimeUnit.SECONDS);

//TODO(hmitnflx): Add a failoveraware transformer here that consumes all other events as well
// to make scale up decisions
// Create the scaler.
if (useClutchRps) {
logger.info("Using clutch rps scaler, job: {}, stage: {} ", jobId, stage);
ClutchRpsPIDConfig rpsConfig = Option.of(config).flatMap(ClutchConfiguration::getRpsConfig).getOrNull();
Expand Down Expand Up @@ -332,7 +329,7 @@ private void setOutstandingScalingRequest(final Subscription subscription) {
inProgressScalingSubscription.compareAndSet(null, subscription);
}

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers) {
public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, ScalingReason reason) {
final int desiredWorkers;
if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) {
logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment);
Expand All @@ -343,7 +340,12 @@ public int getDesiredWorkersForScaleUp(final int increment, final int numCurrent
return numCurrentWorkers;
} else {
final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax();
desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage);
if (reason == ScalingReason.FailoverAware) {
logger.info("FailoverAware scaling up stage {} of job {} to maxWorkersForStage {}", stage, jobId, maxWorkersForStage);
desiredWorkers = maxWorkersForStage;
} else {
desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage);
}
return desiredWorkers;
}
}
Expand All @@ -361,7 +363,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers,
setOutstandingScalingRequest(subscription);
}

public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers) {
public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, ScalingReason reason) {
final int desiredWorkers;
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
Expand Down Expand Up @@ -463,7 +465,7 @@ public void onNext(Event event) {
String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()) + " " +
stats.getCurrentHighCount() + " times");
final int numCurrWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers);
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, strategy.getReason());
if (desiredWorkers > numCurrWorkers) {
scaler.scaleUpStage(numCurrWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
Expand All @@ -479,7 +481,7 @@ public void onNext(Event event) {
" is below scaleDownThreshold of " + strategy.getScaleDownBelowPct() +
" " + stats.getCurrentLowCount() + " times");
final int numCurrentWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers);
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, strategy.getReason());
if (desiredWorkers < numCurrentWorkers) {
scaler.scaleDownStage(numCurrentWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ public void call() {
jobAutoScaleObserver.onNext(
new JobAutoScaler.Event(
StageScalingPolicy.ScalingReason.FailoverAware, stage,
100.0,
100.0,
getFailoverValue(failoverStatusClient.getFailoverStatus()),
getFailoverValue(failoverStatusClient.getFailoverStatus()),
numWorkers)
);
}
Expand Down Expand Up @@ -427,6 +427,17 @@ public void onNext(MetricData metricData) {
};
}

private double getFailoverValue(FailoverStatusClient.FailoverStatus failoverStatus) {
switch (failoverStatus) {
case REGION_SAVIOR:
return 100.0;
case REGION_EVACUEE:
return -100.0;
default:
return 0.0;
}
}

private void addScalerEventForSourceJobDrops(int numWorkers) {
double sourceJobDrops = 0;
boolean hasSourceJobDropsMetric = false;
Expand Down

0 comments on commit 58dc446

Please sign in to comment.