Emit failover events into the job autoscaler events to allow autoscaling decisions #676

Merged
merged 10 commits on Jul 10, 2024
docs/docs/operate/autoscalingstrategies.md (40 changes: 25 additions & 15 deletions)
@@ -23,17 +23,17 @@ Note:

Rule based strategy can be defined for the following resources:

| Resource | Metric |
|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` |
| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` |
| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` |
| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` |
| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` |
| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` |
| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` |
| `UserDefined` | Metric is defined by user with job parameter `mantis.jobmaster.autoscale.metric` in this format `{group}::{name}::{aggregation}`. |
| `FailoverAware` | Metric is defined by an implementation of `FailoverStatusClient`. This is used to control autoscaling during failover events. (See #failover-aware) |
| Resource | Metric |
|----------------------------|----------------------------------------------------------------------------------------------------------------------------------|
| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` |
| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` |
| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` |
| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` |
| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` |
| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` |
| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` |
| `UserDefined`              | Metric is defined by the user with the job parameter `mantis.jobmaster.autoscale.metric` in the format `{group}::{name}::{aggregation}` (see example below). |
| `AutoscalerManagerEvent`   | Custom event defined by an implementation of `JobAutoscalerManager`. This allows event-based runtime control over stage workers. |
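
For illustration only (reusing names from the table above, not values from this change): a user-defined metric could be configured by setting the job parameter `mantis.jobmaster.autoscale.metric` to `ResourceUsage::cpuPctUsageCurr::AVG`.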

Each strategy has the following parameters:

@@ -53,21 +53,31 @@ scaling action, the cooldown will prevent subsequent strategies from scaling for
best to use the data drop strategy in conjunction with another strategy that provides the
scale-down trigger.

### Failover-Aware Strategy
### AutoscalerManagerEvent Strategy
This is a custom strategy to set a target worker size at runtime.
The strategy uses `getCurrentValue` from `JobAutoscalerManager` to determine the target worker size.
For a value in the range `[0.0, 100.0]`, the autoscaler scales the stage linearly between its min and max worker counts.
All other values are ignored. The default implementation returns `-1.0` for `currentValue`, which makes event-based scaling a no-op.
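For example (illustrative numbers): a `currentValue` of `50.0` on a stage with `min=2` and `max=12` workers resolves to `2 + round((12 - 2) * 50.0 / 100.0) = 7` workers.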

Example #1: Use case during region failover in a multi-region setup
During a failover event in a multi-region setup, all incoming traffic is moved away from the failed-over
region (the `evacuee`) to the other region(s) (the `saviors`) to mitigate the issue faster.

For autoscaling mantis jobs, it's sometimes necessary to be failover aware because:
For some autoscaling Mantis jobs, it is necessary to be failover-aware in order to:

1. Scale down in evacuee regions — `allowScaleDownDuringEvacuated`.
1. Prevent scale down in evacuee regions — Use `allowAutoScaleManager`.
After an extended period in the evacuated state, the jobs in that region will have been scaled down
to their lowest values because there is no incoming traffic and resource utilization is low.
When traffic is restored to that region, the job sees a large surge of incoming requests that can
overwhelm the job in that region.
2. Scale up in savior regions — Use `FailoverAware` in strategies.
Mitigation: Use `allowAutoScaleManager` to enable `JobAutoscalerManager` for the stage, and provide a custom
implementation of `JobAutoscalerManager` that returns `false` for `isScaleDownEnabled` in the evacuee region.
2. Scale up in savior regions — Use `AutoscalerManagerEvent` in strategies.
When traffic is moved to the savior regions, the jobs in those regions should be scaled up to handle the
additional load. Currently, this strategy scales the configured stages up to the max #workers configured in
`StageScalingPolicy`. It does not work with PID-control-based strategies.
Usage: Provide a custom `JobAutoscalerManager` implementation that returns `100.0` in the savior region to
scale the stage's numWorkers to max (see the sketch below).
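
A minimal sketch of such a manager (illustrative only; the region-status helpers are hypothetical placeholders for whatever failover-status source you use):

```java
// Illustrative sketch of a failover-aware JobAutoscalerManager.
public class FailoverAwareAutoscalerManager implements JobAutoscalerManager {

    @Override
    public boolean isScaleUpEnabled() {
        return true;
    }

    @Override
    public boolean isScaleDownEnabled() {
        // Evacuee region: block scale-down so the stage keeps enough workers
        // to absorb the surge when traffic is restored.
        return !currentRegionIsEvacuee();
    }

    @Override
    public double getCurrentValue() {
        // Savior region: 100.0 maps to the stage's max workers in StageScalingPolicy.
        // -1.0 (the default) is ignored, so event-based scaling stays a no-op elsewhere.
        return currentRegionIsSavior() ? 100.0 : -1.0;
    }

    // Hypothetical helpers; replace with a real failover-status lookup.
    private boolean currentRegionIsEvacuee() {
        return false;
    }

    private boolean currentRegionIsSavior() {
        return false;
    }
}
```

As described above, `allowAutoScaleManager` on the stage's `StageScalingPolicy` controls whether the manager's `isScaleUpEnabled`/`isScaleDownEnabled` gates are consulted.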

## PID Control Based Strategies

@@ -326,24 +326,35 @@ private void setOutstandingScalingRequest(final Subscription subscription) {
inProgressScalingSubscription.compareAndSet(null, subscription);
}

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers) {
private int getDesiredWorkers(StageScalingPolicy scalingPolicy, Event event) {
final int maxWorkersForStage = scalingPolicy.getMax();
final int minWorkersForStage = scalingPolicy.getMin();
return minWorkersForStage + (int) Math.round((maxWorkersForStage - minWorkersForStage) * event.getEffectiveValue() / 100.0);
}
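// Worked example for getDesiredWorkers above (illustrative numbers): with min = 2, max = 12 and
// event.getEffectiveValue() = 50.0, it returns 2 + round((12 - 2) * 50.0 / 100.0) = 7 workers.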

public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, Event event) {
final int desiredWorkers;
if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) {
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment);
return numCurrentWorkers;
}

if (numCurrentWorkers < 0 || increment < 1) {
logger.error("current number of workers({}) not known or increment({}) < 1, will not scale up", numCurrentWorkers, increment);
return numCurrentWorkers;
} else if (stageSchedulingInfo.getScalingPolicy().isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) {
} else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) {
logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId);
return numCurrentWorkers;
} else if (event.getType() == ScalingReason.AutoscalerManagerEvent) {
desiredWorkers = getDesiredWorkers(scalingPolicy, event);
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers);
} else {
final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax();
final int maxWorkersForStage = scalingPolicy.getMax();
desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage);
return desiredWorkers;
}
return desiredWorkers;

}

public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) {
@@ -364,7 +375,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers,
setOutstandingScalingRequest(subscription);
}

public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers) {
public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, Event event) {
final int desiredWorkers;
final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy();
if (!scalingPolicy.isEnabled()) {
@@ -377,9 +388,12 @@ public int getDesiredWorkersForScaleDown(final int decrement, final int numCurre
} else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) {
logger.warn("Scaledown is disabled for all autoscaling strategy, not scaling down stage {} of job {}", stage, jobId);
return numCurrentWorkers;
} else if (event.getType() == ScalingReason.AutoscalerManagerEvent) {
desiredWorkers = getDesiredWorkers(scalingPolicy, event);
logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers);
} else {
int min = scalingPolicy.getMin();
desiredWorkers = Math.max(numCurrentWorkers - decrement, min);
int min = scalingPolicy.getMin();
desiredWorkers = Math.max(numCurrentWorkers - decrement, min);
}
return desiredWorkers;
}
@@ -457,16 +471,15 @@ public void onNext(Event event) {
}
stats.add(effectiveValue);
if (lastScaledAt < (System.currentTimeMillis() - coolDownSecs * 1000)) {
logger.info(jobId + ", stage " + stage + ": eff=" +
String.format(PercentNumberFormat, effectiveValue) + ", thresh=" + strategy.getScaleUpAbovePct());
logger.info("{}, stage {}, eventType {}: eff={}, thresh={}", jobId, stage, event.getType(),
String.format(PercentNumberFormat, effectiveValue), strategy.getScaleUpAbovePct());
if (stats.getHighThreshTriggered()) {
logger.info("Attempting to scale up stage " + stage + " of job " + jobId + " by " +
scalingPolicy.getIncrement() + " workers, because " +
event.type + " exceeded scaleUpThreshold of " +
String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()) + " " +
stats.getCurrentHighCount() + " times");
logger.info("Attempting to scale up stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times",
stage, jobId, scalingPolicy.getIncrement(), event.getType(),
String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()),
stats.getCurrentHighCount());
final int numCurrWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers);
final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, event);
if (desiredWorkers > numCurrWorkers) {
scaler.scaleUpStage(numCurrWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
@@ -477,12 +490,11 @@ public void onNext(Event event) {
logger.debug("scale up NOOP: desiredWorkers same as current workers");
}
} else if (stats.getLowThreshTriggered()) {
logger.info("Attempting to scale down stage " + stage + " of job " + jobId + " by " +
scalingPolicy.getDecrement() + " workers because " + event.getType() +
" is below scaleDownThreshold of " + strategy.getScaleDownBelowPct() +
" " + stats.getCurrentLowCount() + " times");
logger.info("Attempting to scale down stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times",
stage, jobId, scalingPolicy.getDecrement(), event.getType(),
strategy.getScaleDownBelowPct(), stats.getCurrentLowCount());
final int numCurrentWorkers = event.getNumWorkers();
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers);
final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, event);
if (desiredWorkers < numCurrentWorkers) {
scaler.scaleDownStage(numCurrentWorkers, desiredWorkers, event.getType() + " with value " +
String.format(PercentNumberFormat, effectiveValue) +
@@ -42,6 +42,15 @@ default boolean isScaleDownEnabled() {
return true;
}

/**
* Get the current fractional value to set size for stage numWorkers.
* Valid values are [0.0, 100.0] which set numWorkers from [min, max].
* All other values are ignored for scaling decisions.
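* The default implementation returns -1.0, which falls outside this range and therefore makes event-based scaling a no-op.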
*/
default double getCurrentValue() {
[Review comment from Collaborator]: can you add a document here about why it's -1?

return -1.0;
}

/**
* Noop implementation of {@link JobAutoscalerManager} that always returns true
* for isScaleUpEnabled, isScaleDownEnabled
@@ -279,6 +279,7 @@ public void call() {
// get the aggregate metric values by metric group for all workers in stage
Map<String, GaugeData> allWorkerAggregates = getAggregates(listofAggregates);
logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString());
maybeEmitAutoscalerManagerEvent(numWorkers);

for (Map.Entry<String, Set<String>> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
final String metricGrp = userDefinedMetric.getKey();
@@ -416,6 +417,19 @@ public void onNext(MetricData metricData) {
};
}

private void maybeEmitAutoscalerManagerEvent(int numWorkers) {
final double currentValue = jobAutoscalerManager.getCurrentValue();
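// Only values in [0.0, 100.0] are meaningful: the autoscaler maps them linearly onto the stage's
// [min, max] worker range. The default getCurrentValue() of -1.0 falls outside this range, so no
// AutoscalerManagerEvent is emitted unless a custom JobAutoscalerManager opts in.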
if (currentValue >= 0.0 && currentValue <= 100.0) {
[Review comment from Collaborator]: can you add an inline doc about this logic? why is it 0 to 100?

jobAutoScaleObserver.onNext(
new JobAutoScaler.Event(
StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, stage,
currentValue,
currentValue,
numWorkers)
);
}
}

private void addScalerEventForSourceJobDrops(int numWorkers) {
double sourceJobDrops = 0;
boolean hasSourceJobDropsMetric = false;
@@ -36,6 +36,7 @@
import io.mantisrx.server.core.stats.MetricStringConstants;
import io.mantisrx.server.master.client.MantisMasterClientApi;
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -82,6 +83,7 @@ public void testDropDataMetricTriggersAutoScale() throws InterruptedException {

final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig();

final List<JobAutoScaler.Event> events = new ArrayList<>();
final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer<JobAutoScaler.Event>() {
@Override
public void onCompleted() {
@@ -96,8 +98,11 @@ public void onError(Throwable e) {
@Override
public void onNext(JobAutoScaler.Event event) {
logger.info("got auto scale event {}", event);
JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent, dropPercent, 1);
assertEquals(expected, event);
long count = latch.getCount();
if (count == 1) {
JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent, dropPercent, 1);
assertEquals(expected, event);
}
latch.countDown();
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);
@@ -108,6 +113,7 @@ public void onNext(JobAutoScaler.Event event) {
metricDataObserver.onNext(new MetricData(new String(jobId), stage, workerIdx, workerNum, DATA_DROP_METRIC_GROUP, gauges));

assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS));

}

@Test
@@ -166,13 +172,12 @@ public void onNext(JobAutoScaler.Event event) {
if (count == 2) {
JobAutoScaler.Event expected1 = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.UserDefined, stage, metricValue * 3 / 4, metricValue * 3 / 4, numWorkers);
assertEquals(expected1, event);
latch.countDown();
}
if (count == 1) {
JobAutoScaler.Event expected2 = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.KafkaLag, stage, kafkaLag, kafkaLag, numWorkers);
assertEquals(expected2, event);
latch.countDown();
}
latch.countDown();
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);

@@ -262,8 +267,11 @@ public void onError(Throwable e) {
@Override
public void onNext(JobAutoScaler.Event event) {
logger.info("got auto scale event {}", event);
JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent / numWorkers, dropPercent / numWorkers, numWorkers);
assertEquals(expected, event);
long count = autoScaleLatch.getCount();
if (count == 1) {
JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent / numWorkers, dropPercent / numWorkers, numWorkers);
assertEquals(expected, event);
}
autoScaleLatch.countDown();
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);