Merge branch 'master' into swada--runtime-task-expose-version
sarahwada-stripe authored Jul 10, 2024
2 parents 9b6d574 + c1c7f34 commit 39b131f
Showing 6 changed files with 101 additions and 56 deletions.
40 changes: 25 additions & 15 deletions docs/docs/operate/autoscalingstrategies.md
@@ -23,17 +23,17 @@ Note:

Rule based strategy can be defined for the following resources:

| Resource | Metric |
|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` |
| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` |
| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` |
| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` |
| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` |
| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` |
| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` |
| `UserDefined` | Metric is defined by user with job parameter `mantis.jobmaster.autoscale.metric` in this format `{group}::{name}::{aggregation}`. |
| `FailoverAware` | Metric is defined by an implementation of `FailoverStatusClient`. This is used to control autoscaling during failover events. (See #failover-aware) |
| Resource | Metric |
|----------------------------|----------------------------------------------------------------------------------------------------------------------------------|
| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` |
| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` |
| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` |
| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` |
| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` |
| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` |
| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` |
| `UserDefined`              | Metric is defined by the user with the job parameter `mantis.jobmaster.autoscale.metric` in this format: `{group}::{name}::{aggregation}`.        |
| `AutoscalerManagerEvent`   | Custom event defined by an implementation of `JobAutoscalerManager`. This allows event-based runtime control over stage workers.                  |
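
For example (hypothetical group and metric names), setting the job parameter `mantis.jobmaster.autoscale.metric` to `requestProcessing::processedCount::AVG` would autoscale the stage on the average of that user-published metric.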

Each strategy has the following parameters:

@@ -53,21 +53,31 @@ scaling action, the cooldown will prevent subsequent strategies from scaling for
best to use the data drop strategy in conjunction with another strategy that provides the
scale-down trigger.

### Failover-Aware Strategy
### AutoscalerManagerEvent Strategy
This is a custom strategy to set a target worker size at runtime.
The strategy uses `getCurrentValue` from `JobAutoscalerManager` to determine the target worker size.
For a value in `[0.0, 100.0]`, the autoscaler scales the stage proportionally between its min and max worker counts.
All other values are ignored. The default implementation returns -1.0 for `currentValue`, which makes event-based scaling a no-op.
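For illustration (hypothetical numbers), with `min=4` and `max=20` in the `StageScalingPolicy`, a `getCurrentValue()` of `25.0` resolves to `4 + round((20 - 4) * 25.0 / 100.0) = 8` workers.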

Example #1: Use-case during region failover in a multi-region setup.
During a failover event, all incoming traffic is moved away from the failed-over region (the `evacuee`) to the
other region(s) (the `saviors`) to mitigate the issue faster.

For autoscaling mantis jobs, it's sometimes necessary to be failover aware because:
For some autoscaling mantis jobs, it's necessary to be failover aware to:

1. Scale down in evacuee regions — `allowScaleDownDuringEvacuated`.
1. Prevent scale down in evacuee regions — Use `allowAutoScaleManager`.
After an extended period in the evacuated state, the jobs in that region will have been scaled down to their
lowest values because of the lack of incoming traffic and low resource utilization.
When traffic is restored to that region, the job will see a big surge in incoming requests that can overwhelm it.
2. Scale up in savior regions — Use `FailoverAware` in strategies.
Mitigation: Use `allowAutoScaleManager` to enable `JobAutoscalerManager` for the stage, and provide a custom
implementation of `JobAutoscalerManager` that returns `false` from `isScaleDownEnabled` in the evacuee region.
2. Scale up in savior regions — Use `AutoscalerManagerEvent` in strategies.
When the traffic is moved to savior regions, the jobs in those regions should be scaled up to handle the
additional load. Currently, this strategy scales the configured stages up to the max #workers configured in
`StageScalingPolicy`. It also does not work with the PID control based strategies.
Usage: Provide a custom `JobAutoscalerManager` implementation that returns `100.0` from `getCurrentValue` in the
savior region to scale the stage's numWorkers to max (see the sketch below).
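
A minimal sketch of such a manager is shown below. The class name, the `RegionStatusClient` helper, and its methods are hypothetical; only `isScaleDownEnabled` and `getCurrentValue` come from the `JobAutoscalerManager` interface, and the implementation still has to be resolvable through the worker's service locator (see `getJobAutoscalerManagerInstance` in the code changes below) for the job master to pick it up.

```java
// Hypothetical helper that knows whether the current region is evacuated (evacuee)
// or receiving failed-over traffic (savior); not part of Mantis.
interface RegionStatusClient {
    boolean isCurrentRegionEvacuated();
    boolean isCurrentRegionSavior();
}

// Sketch of a failover-aware JobAutoscalerManager (package and imports omitted;
// the interface ships with Mantis, everything else here is illustrative).
class FailoverAwareAutoscalerManager implements JobAutoscalerManager {

    private final RegionStatusClient regionStatus;

    FailoverAwareAutoscalerManager(RegionStatusClient regionStatus) {
        this.regionStatus = regionStatus;
    }

    @Override
    public boolean isScaleDownEnabled() {
        // Keep workers pinned in an evacuated region so that restored traffic does not
        // hit a job that has already been scaled down to its minimum.
        return !regionStatus.isCurrentRegionEvacuated();
    }

    @Override
    public double getCurrentValue() {
        // 100.0 maps the stage to its max workers; a negative value is a no-op.
        return regionStatus.isCurrentRegionSavior() ? 100.0 : -1.0;
    }
}
```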

## PID Control Based Strategies

@@ -66,8 +66,6 @@
import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -78,7 +76,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -416,7 +413,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException {
logger.info("param {} is null or empty", JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM);
}

JobAutoscalerManager jobAutoscalerManager = getJobAutoscalerManagerInstance(config);
JobAutoscalerManager jobAutoscalerManager = getJobAutoscalerManagerInstance(serviceLocator);
JobMasterService jobMasterService = new JobMasterService(rw.getJobId(), rw.getSchedulingInfo(),
workerMetricsClient, autoScaleMetricsConfig, mantisMasterApi, rw.getContext(), rw.getOnCompleteCallback(), rw.getOnErrorCallback(), rw.getOnTerminateCallback(), jobAutoscalerManager);
jobMasterService.start();
@@ -482,16 +479,9 @@ remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.ge
}
}

private JobAutoscalerManager getJobAutoscalerManagerInstance(WorkerConfiguration config) {
try {
Class<?> jobAutoscalerManagerClass = Class.forName(config.getJobAutoscalerManagerClassName());
logger.info("Picking {} jobAutoscalerManager", jobAutoscalerManagerClass.getName());
Method managerClassFactory = jobAutoscalerManagerClass.getMethod("valueOf", Properties.class);
return (JobAutoscalerManager) managerClassFactory.invoke(null, System.getProperties());
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
logger.warn("Couldnt instantiate jobAutoscalerManager from class {} because ", config.getJobAutoscalerManagerClassName(), e);
return JobAutoscalerManager.DEFAULT;
}
private JobAutoscalerManager getJobAutoscalerManagerInstance(ServiceLocator serviceLocator) {
final JobAutoscalerManager autoscalerManager = serviceLocator.service(JobAutoscalerManager.class);
return Optional.ofNullable(autoscalerManager).orElse(JobAutoscalerManager.DEFAULT);
}

private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageRequest) {
@@ -326,24 +326,35 @@ private void setOutstandingScalingRequest(final Subscription subscription) {
inProgressScalingSubscription.compareAndSet(null, subscription);
}

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers) {
private int getDesiredWorkers(StageScalingPolicy scalingPolicy, Event event) {
final int maxWorkersForStage = scalingPolicy.getMax();
final int minWorkersForStage = scalingPolicy.getMin();
return minWorkersForStage + (int) Math.round((maxWorkersForStage - minWorkersForStage) * event.getEffectiveValue() / 100.0);
}
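// Illustration of getDesiredWorkers with hypothetical numbers: min=2, max=12 and
// event.getEffectiveValue() == 50.0 yields 2 + Math.round((12 - 2) * 50.0 / 100.0) = 7 workers.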

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, Event event) {
final int desiredWorkers;
if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) {
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment);
return numCurrentWorkers;
}

if (numCurrentWorkers < 0 || increment < 1) {
logger.error("current number of workers({}) not known or increment({}) < 1, will not scale up", numCurrentWorkers, increment);
return numCurrentWorkers;
} else if (stageSchedulingInfo.getScalingPolicy().isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) {
} else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) {
logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId);
return numCurrentWorkers;
} else if (event.getType() == ScalingReason.AutoscalerManagerEvent) {
desiredWorkers = getDesiredWorkers(scalingPolicy, event);
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers);
} else {
final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax();
final int maxWorkersForStage = scalingPolicy.getMax();
desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage);
return desiredWorkers;
}
return desiredWorkers;

}

public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) {
@@ -364,7 +375,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers,
setOutstandingScalingRequest(subscription);
}

public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers) {
public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, Event event) {
final int desiredWorkers;
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
@@ -377,9 +388,12 @@ public int getDesiredWorkersForScaleDown(final int decrement, final int numCurre
} else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) {
logger.warn("Scaledown is disabled for all autoscaling strategy, not scaling down stage {} of job {}", stage, jobId);
return numCurrentWorkers;
} else if (event.getType() == ScalingReason.AutoscalerManagerEvent) {
desiredWorkers = getDesiredWorkers(scalingPolicy, event);
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers);
} else {
int min = scalingPolicy.getMin();
desiredWorkers = Math.max(numCurrentWorkers - decrement, min);
}
return desiredWorkers;
}
@@ -457,16 +471,15 @@ public void onNext(Event event) {
}
stats.add(effectiveValue);
if (lastScaledAt < (System.currentTimeMillis() - coolDownSecs * 1000)) {
logger.info(jobId + ", stage " + stage + ": eff=" +
String.format(PercentNumberFormat, effectiveValue) + ", thresh=" + strategy.getScaleUpAbovePct());
logger.info("{}, stage {}, eventType {}: eff={}, thresh={}", jobId, stage, event.getType(),
String.format(PercentNumberFormat, effectiveValue), strategy.getScaleUpAbovePct());
if (stats.getHighThreshTriggered()) {
logger.info("Attempting to scale up stage " + stage + " of job " + jobId + " by " +
scalingPolicy.getIncrement() + " workers, because " +
event.type + " exceeded scaleUpThreshold of " +
String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()) + " " +
stats.getCurrentHighCount() + " times");
logger.info("Attempting to scale up stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times",
stage, jobId, scalingPolicy.getIncrement(), event.getType(),
String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()),
stats.getCurrentHighCount());
final int numCurrWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers);
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, event);
if (desiredWorkers > numCurrWorkers) {
scaler.scaleUpStage(numCurrWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
@@ -477,12 +490,11 @@ public void onNext(Event event) {
logger.debug("scale up NOOP: desiredWorkers same as current workers");
}
} else if (stats.getLowThreshTriggered()) {
logger.info("Attempting to scale down stage " + stage + " of job " + jobId + " by " +
scalingPolicy.getDecrement() + " workers because " + event.getType() +
" is below scaleDownThreshold of " + strategy.getScaleDownBelowPct() +
" " + stats.getCurrentLowCount() + " times");
logger.info("Attempting to scale down stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times",
stage, jobId, scalingPolicy.getDecrement(), event.getType(),
strategy.getScaleDownBelowPct(), stats.getCurrentLowCount());
final int numCurrentWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers);
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, event);
if (desiredWorkers < numCurrentWorkers) {
scaler.scaleDownStage(numCurrentWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
@@ -42,6 +42,15 @@ default boolean isScaleDownEnabled() {
return true;
}

/**
* Get the current fractional value to set size for stage numWorkers.
* Valid values are [0.0, 100.0] which set numWorkers from [min, max].
* All other values are ignored for scaling decisions.
*/
default double getCurrentValue() {
return -1.0;
}

/**
* Noop implementation of {@link JobAutoscalerManager} that always returns true
* for isScaleUpEnabled, isScaleDownEnabled
@@ -279,6 +279,7 @@ public void call() {
// get the aggregate metric values by metric group for all workers in stage
Map<String, GaugeData> allWorkerAggregates = getAggregates(listofAggregates);
logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString());
maybeEmitAutoscalerManagerEvent(numWorkers);

for (Map.Entry<String, Set<String>> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
final String metricGrp = userDefinedMetric.getKey();
@@ -416,6 +417,21 @@ public void onNext(MetricData metricData) {
};
}

private void maybeEmitAutoscalerManagerEvent(int numWorkers) {
final double currentValue = jobAutoscalerManager.getCurrentValue();
// The effective value is a pct value and hence ranges from [0, 100].
// Ignore all other values to disable autoscaling for custom events.
if (currentValue >= 0.0 && currentValue <= 100.0) {
jobAutoScaleObserver.onNext(
new JobAutoScaler.Event(
StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, stage,
currentValue,
currentValue,
numWorkers)
);
}
}

private void addScalerEventForSourceJobDrops(int numWorkers) {
double sourceJobDrops = 0;
boolean hasSourceJobDropsMetric = false;