Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hmitnflx committed Jul 9, 2024
1 parent da7b93e commit f099c47
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/docs/operate/autoscalingstrategies.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ scaling action, the cooldown will prevent subsequent strategies from scaling for
### AutoscalerManagerEvent Strategy
This is a custom strategy to set a target worker size at runtime.
The strategy uses `getCurrentValue` from `JobAutoScalerManager` to determine the target worker size.
For a non-negative value `[0.0, 1.0]`, the autoscaler will scale the stage from min to max.
For a value in the range `[0.0, 100.0]`, the autoscaler treats it as a percentage and sets the stage's worker count proportionally between min (at `0.0`) and max (at `100.0`).
All other values are ignored. The default implementation of `getCurrentValue` returns `-1.0`, which makes event-based scaling a no-op.

Example #1: Use-case during region failover in a multi-region setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,29 +326,35 @@ private void setOutstandingScalingRequest(final Subscription subscription) {
inProgressScalingSubscription.compareAndSet(null, subscription);
}

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, ScalingReason reason) {
/**
 * Translates the event's effective value, read as a percentage, into a worker count
 * positioned between the policy's min and max.
 *
 * <p>The (max - min) span is multiplied by the effective value and divided by 100; the
 * result is rounded to the nearest whole number and added to min. No clamping happens
 * here, so the result stays inside [min, max] only when the effective value is itself
 * within [0, 100].
 *
 * @param scalingPolicy policy providing the stage's min and max worker counts
 * @param event scaling event whose effective value is interpreted as a percentage
 * @return min plus the rounded percentage share of the (max - min) span
 */
private int getDesiredWorkers(StageScalingPolicy scalingPolicy, Event event) {
    final int upperBound = scalingPolicy.getMax();
    final int lowerBound = scalingPolicy.getMin();
    final int span = upperBound - lowerBound;
    final double shareOfSpan = span * event.getEffectiveValue() / 100.0;
    return lowerBound + (int) Math.round(shareOfSpan);
}

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, Event event) {
final int desiredWorkers;
if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) {
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment);
return numCurrentWorkers;
}

if (numCurrentWorkers < 0 || increment < 1) {
logger.error("current number of workers({}) not known or increment({}) < 1, will not scale up", numCurrentWorkers, increment);
return numCurrentWorkers;
} else if (stageSchedulingInfo.getScalingPolicy().isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) {
} else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) {
logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId);
return numCurrentWorkers;
} else if (event.getType() == ScalingReason.AutoscalerManagerEvent) {
desiredWorkers = getDesiredWorkers(scalingPolicy, event);
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers);
} else {
final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax();
if (reason == ScalingReason.AutoscalerManagerEvent) {
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to maxWorkersForStage {}", stage, jobId, maxWorkersForStage);
desiredWorkers = maxWorkersForStage;
} else {
desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage);
}
return desiredWorkers;
final int maxWorkersForStage = scalingPolicy.getMax();
desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage);
}
return desiredWorkers;

}

public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) {
Expand All @@ -369,7 +375,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers,
setOutstandingScalingRequest(subscription);
}

public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, ScalingReason reason) {
public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, Event event) {
final int desiredWorkers;
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
Expand All @@ -382,9 +388,12 @@ public int getDesiredWorkersForScaleDown(final int decrement, final int numCurre
} else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) {
logger.warn("Scaledown is disabled for all autoscaling strategy, not scaling down stage {} of job {}", stage, jobId);
return numCurrentWorkers;
} else if (event.getType() == ScalingReason.AutoscalerManagerEvent) {
desiredWorkers = getDesiredWorkers(scalingPolicy, event);
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers);
} else {
int min = scalingPolicy.getMin();
desiredWorkers = Math.max(numCurrentWorkers - decrement, min);
int min = scalingPolicy.getMin();
desiredWorkers = Math.max(numCurrentWorkers - decrement, min);
}
return desiredWorkers;
}
Expand Down Expand Up @@ -470,7 +479,7 @@ public void onNext(Event event) {
String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()),
stats.getCurrentHighCount());
final int numCurrWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, strategy.getReason());
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, event);
if (desiredWorkers > numCurrWorkers) {
scaler.scaleUpStage(numCurrWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
Expand All @@ -485,7 +494,7 @@ public void onNext(Event event) {
stage, jobId, scalingPolicy.getDecrement(), event.getType(),
strategy.getScaleDownBelowPct(), stats.getCurrentLowCount());
final int numCurrentWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, strategy.getReason());
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, event);
if (desiredWorkers < numCurrentWorkers) {
scaler.scaleDownStage(numCurrentWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ default boolean isScaleDownEnabled() {

/**
* Get the current percentage value used to set the size of stage numWorkers.
* Valid values are [0.0, 1.0] which set numWorkers from [min, max].
* Valid values are [0.0, 100.0] which set numWorkers from [min, max].
* All other values are ignored for scaling decisions.
*/
default double getCurrentValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,7 @@ public void call() {
// get the aggregate metric values by metric group for all workers in stage
Map<String, GaugeData> allWorkerAggregates = getAggregates(listofAggregates);
logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString());
jobAutoScaleObserver.onNext(
new JobAutoScaler.Event(
StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, stage,
jobAutoscalerManager.getCurrentValue(),
jobAutoscalerManager.getCurrentValue(),
numWorkers)
);
maybeEmitAutoscalerManagerEvent(numWorkers);

for (Map.Entry<String, Set<String>> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
final String metricGrp = userDefinedMetric.getKey();
Expand Down Expand Up @@ -423,6 +417,19 @@ public void onNext(MetricData metricData) {
};
}

/**
 * Publishes an AutoscalerManagerEvent to the auto-scale observer when the autoscaler
 * manager's current value lies within [0.0, 100.0]; otherwise does nothing.
 *
 * <p>The manager is queried once and the same reading is passed for both value
 * arguments of the event.
 *
 * @param numWorkers worker count passed through to the emitted event
 */
private void maybeEmitAutoscalerManagerEvent(int numWorkers) {
    final double managerValue = jobAutoscalerManager.getCurrentValue();
    final boolean withinRange = managerValue >= 0.0 && managerValue <= 100.0;
    if (!withinRange) {
        // Anything outside [0.0, 100.0] (NaN included) produces no event.
        return;
    }
    final JobAutoScaler.Event managerEvent = new JobAutoScaler.Event(
            StageScalingPolicy.ScalingReason.AutoscalerManagerEvent,
            stage,
            managerValue,
            managerValue,
            numWorkers);
    jobAutoScaleObserver.onNext(managerEvent);
}

private void addScalerEventForSourceJobDrops(int numWorkers) {
double sourceJobDrops = 0;
boolean hasSourceJobDropsMetric = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.mantisrx.server.core.stats.MetricStringConstants;
import io.mantisrx.server.master.client.MantisMasterClientApi;
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down

0 comments on commit f099c47

Please sign in to comment.