Update autoscaler strategies to utilize JobAutoScalerManager (#673)
* Update docs
* review comments
hmitnflx authored Jul 4, 2024
1 parent 2b05cdf commit 2c5bfd4
Showing 20 changed files with 287 additions and 226 deletions.
85 changes: 59 additions & 26 deletions docs/docs/operate/autoscalingstrategies.md
@@ -3,25 +3,42 @@
many workers to scale up/down a stage. They fall into two main categories: rule based strategies pick a metric and
scale up/down when a certain threshold is reached, while PID control based strategies pick a resource utilization
level and scale up/down dynamically to maintain that level.

A stage's autoscaling policy is composed of the following configurations:

| Name | Type | Description |
|-------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------|
| `stage`                 | `int`                   | Index of the stage (`1`-based, since `0` is reserved for the job master).                                                  |
| `min` | `int` | Minimum number of workers in the stage. |
| `max` | `int` | Maximum number of workers in the stage. |
| `increment`[1] | `int` | Number of workers to add when scaling up. |
| `decrement`[1] | `int` | Number of workers to remove when scaling down. |
| `coolDownSecs` | `int` | Number of seconds to wait after a scaling operation has been completed before beginning another scaling operation. |
| `strategies`            | `Map<Reason, Strategy>` | Map of strategies to use for autoscaling, keyed by scaling reason, with their configurations.                              |
| `allowAutoScaleManager` | `boolean`               | Toggles an autoscaler manager that allows or disallows scale-ups and scale-downs when certain criteria are met. Default is `false`. |

Note:
[1] `increment`/`decrement` are not used for Clutch strategies, which use a PID controller to determine the number of workers to scale up/down.
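
For illustration, a complete policy for one stage might look like the sketch below. The top-level fields mirror the table above; the per-strategy field names (`scaleDownBelowPct`, `scaleUpAbovePct`, `rollingCount`) are assumptions based on the strategy parameters described later, not a verbatim schema:

```json
{
  "stage": 1,
  "min": 2,
  "max": 10,
  "increment": 2,
  "decrement": 1,
  "coolDownSecs": 300,
  "strategies": {
    "Network": {
      "scaleDownBelowPct": 30.0,
      "scaleUpAbovePct": 70.0,
      "rollingCount": { "count": 6, "of": 10 }
    }
  },
  "allowAutoScaleManager": false
}
```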

## Rule Based Strategies

A rule based strategy can be defined for the following resources:

| Resource | Metric |
|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` |
| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` |
| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` |
| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` |
| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` |
| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` |
| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` |
| `UserDefined`    | Metric is defined by the user via the job parameter `mantis.jobmaster.autoscale.metric` in the format `{group}::{name}::{aggregation}`.              |
| `FailoverAware`  | Metric is defined by an implementation of `FailoverStatusClient`. This is used to control autoscaling during failover events (see [Failover-Aware Strategy](#failover-aware-strategy)). |

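For example, to autoscale on a custom metric emitted by the job, set the parameter in exactly that format. The group and name below (`myMetricGroup`, `processedPerSecond`) are hypothetical:

```
mantis.jobmaster.autoscale.metric=myMetricGroup::processedPerSecond::AVG
```
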
Each strategy has the following parameters:

| Name                          | Description |
|-------------------------------|-------------|
| `Scale down below percentage` | When the aggregated value across all workers falls below this value, the stage will scale down by the decrement value specified in the policy. For data drop, this is calculated as the number of data items dropped divided by the total number of data items (dropped + processed). For CPU, memory, etc., it is calculated as a percentage of the resource allocated in the worker's machine definition. |
| `Scale up above percentage` | When the aggregated value for all workers rises above this value, the stage will scale up. |
| `Rolling count` | This value helps to keep jitter out of the autoscaling process. Instead of scaling immediately the first time values fall outside of the scale-down and scale-up percentage thresholds you define, Mantis will wait until the thresholds are exceeded a certain number of times within a certain window. For example, a rolling count of “6 of 10” means that only if in ten consecutive observations six or more of the observations fall below the scale-down threshold will the stage be scaled down. |
@@ -36,7 +53,23 @@
scaling action, the cooldown will prevent subsequent strategies from scaling for a while. It is
best to use the data drop strategy in conjunction with another strategy that provides the
scale-down trigger.
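
As a sketch of that recommendation, the `strategies` map below pairs `DataDrop` (the scale-up trigger) with `Network` (the scale-down trigger); as above, the per-strategy field names are illustrative:

```json
"strategies": {
  "DataDrop": {
    "scaleUpAbovePct": 2.0,
    "scaleDownBelowPct": 0.0,
    "rollingCount": { "count": 6, "of": 10 }
  },
  "Network": {
    "scaleUpAbovePct": 70.0,
    "scaleDownBelowPct": 30.0,
    "rollingCount": { "count": 6, "of": 10 }
  }
}
```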

### Failover-Aware Strategy
During a failover event in a multi-region setup, all incoming traffic is moved away from the failed-over
region (the "evacuee") to the other region(s) (the "saviors") to mitigate the issue faster.

When autoscaling Mantis jobs, it is sometimes necessary to be failover-aware, for two reasons:

1. Scale down in evacuee regions (`allowScaleDownDuringEvacuated`).
   After an extended period in the evacuated state, the jobs in that region will have been
   scaled down to their lowest values because of no incoming traffic and low resource utilization.
   When traffic is restored to that region, the job will see a big surge in incoming requests, overwhelming
   the job in that region.
2. Scale up in savior regions (use `FailoverAware` in `strategies`).
   When traffic is moved to the savior regions, the jobs in those regions should be scaled up to handle the
   additional load. Currently, the failover-aware strategy scales the configured stages up to the maximum number
   of workers configured in `StageScalingPolicy`, as sketched below. It does not work with PID control based
   strategies.
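
A hedged sketch of a policy that opts into both behaviors follows; the empty `FailoverAware` body is an assumption, since this strategy is driven by failover status rather than by metric thresholds:

```json
{
  "stage": 1,
  "min": 3,
  "max": 12,
  "coolDownSecs": 300,
  "allowAutoScaleManager": true,
  "strategies": {
    "FailoverAware": {}
  }
}
```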

## PID Control Based Strategies

A PID control system uses a continuous feedback loop to maintain a signal at a target level (set point). Mantis offers variations of
this strategy that operate on different signals. Additionally, they try to learn the appropriate target over time
@@ -84,18 +117,18 @@
JSON in the job parameter `mantis.jobmaster.clutch.config`. Example (see the sketch after the field table below):

| Field | Description |
|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `minSize` | Minimum number of workers in the stage. It will not scale down below this number. |
| `maxSize` | Maximum number of workers in the stage. It will not scale up above this number. |
| `cooldownSeconds` | This indicates how many seconds to wait after a scaling operation has been completed before beginning another scaling operation. |
| `maxAdjustment` | Optional. The maximum number of workers to scale up/down in a single operation. |
| `rps` | Expected RPS per worker. Must be > 0. |
| `cpu`, `memory`, `network` | Configure PID controller for each resource. |
| `setPoint`                 | Target set point for the resource. This is expressed as a percentage of the resource allocated to the worker. For example, `60.0` on `network` means network bytes should be 60% of the network limit in the machine definition. |
| `rope`                     | Lower and upper buffer around the set point. Metric values within this buffer are assumed to be at the set point, and thus contribute an error of 0 to the PID controller.                                                       |
| `kp` | Multiplier for the proportional term of the PID controller. This will affect the size of scaling actions. |
| `kd` | Multiplier for the derivative term of the PID controller. This will affect the size of scaling actions. |
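
The body of the example above is collapsed in this view, so here is a hedged reconstruction based on the field table; the values are illustrative, and `rope` is omitted because its exact JSON shape is not visible here:

```json
{
  "minSize": 3,
  "maxSize": 25,
  "cooldownSeconds": 360,
  "maxAdjustment": 5,
  "rps": 1000,
  "network": {
    "setPoint": 60.0,
    "kp": 0.01,
    "kd": 0.01
  }
}
```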

### Clutch RPS

@@ -132,7 +132,7 @@ public Builder multiWorkerScalableStageWithConstraints(int numberOfWorkers, Mach
List<JobConstraints> hardConstraints, List<JobConstraints> softConstraints,
StageScalingPolicy scalingPolicy) {
StageScalingPolicy ssp = new StageScalingPolicy(currentStage, scalingPolicy.getMin(), scalingPolicy.getMax(),
scalingPolicy.getIncrement(), scalingPolicy.getDecrement(), scalingPolicy.getCoolDownSecs(), scalingPolicy.getStrategies(), scalingPolicy.isAllowAutoScaleManager());
return this.addStage(
StageSchedulingInfo.builder()
.numberOfInstances(numberOfWorkers)
@@ -148,7 +148,7 @@ public Builder multiWorkerScalableStageWithConstraints(int numberOfWorkers, Mach
List<JobConstraints> hardConstraints, List<JobConstraints> softConstraints,
StageScalingPolicy scalingPolicy, Map<String, String> containerAttributes) {
StageScalingPolicy ssp = new StageScalingPolicy(currentStage, scalingPolicy.getMin(), scalingPolicy.getMax(),
scalingPolicy.getIncrement(), scalingPolicy.getDecrement(), scalingPolicy.getCoolDownSecs(), scalingPolicy.getStrategies(), scalingPolicy.isAllowAutoScaleManager());
return this.addStage(
StageSchedulingInfo.builder()
.numberOfInstances(numberOfWorkers)
@@ -28,6 +28,9 @@
import lombok.ToString;


@Getter
@ToString
@EqualsAndHashCode
public class StageScalingPolicy implements Serializable {

private static final long serialVersionUID = 1L;
@@ -39,13 +42,19 @@
private final int decrement;
private final long coolDownSecs;
private final Map<ScalingReason, Strategy> strategies;
/**
* Controls whether AutoScaleManager is enabled or disabled
*/
private final boolean allowAutoScaleManager;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
public StageScalingPolicy(@JsonProperty("stage") int stage,
@JsonProperty("min") int min, @JsonProperty("max") int max,
@JsonProperty("increment") int increment, @JsonProperty("decrement") int decrement,
@JsonProperty("coolDownSecs") long coolDownSecs,
@JsonProperty("strategies") Map<ScalingReason, Strategy> strategies) {
@JsonProperty("strategies") Map<ScalingReason, Strategy> strategies,
@JsonProperty(value = "allowAutoScaleManager", defaultValue = "false") Boolean allowAutoScaleManager) {
this.stage = stage;
this.min = min;
this.max = Math.max(max, min);
@@ -54,100 +63,14 @@ public StageScalingPolicy(@JsonProperty("stage") int stage,
this.decrement = Math.max(decrement, 1);
this.coolDownSecs = coolDownSecs;
this.strategies = strategies == null ? new HashMap<ScalingReason, Strategy>() : new HashMap<>(strategies);
// `defaultValue` is for documentation purpose only, use `Boolean` to determine if the field is missing on `null`
this.allowAutoScaleManager = allowAutoScaleManager == Boolean.TRUE;
}

public Map<ScalingReason, Strategy> getStrategies() {
return Collections.unmodifiableMap(strategies);
}

public enum ScalingReason {
CPU,
@Deprecated
@@ -162,7 +85,8 @@ public enum ScalingReason {
ClutchRps,
RPS,
JVMMemory,
SourceJobDrop,
AutoscalerManagerEvent
}

@Getter
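
To show how the new flag flows through the updated constructor, here is a minimal sketch; the package in the import is assumed, and the strategies map is left empty for brevity:

```java
import io.mantisrx.runtime.descriptor.StageScalingPolicy; // package assumed for this sketch

import java.util.Collections;

public class ScalingPolicyExample {
    public static void main(String[] args) {
        // Stage 1 policy: 2-10 workers, scale by +2/-1 per action, 5-minute cooldown.
        StageScalingPolicy policy = new StageScalingPolicy(
                1,                      // stage (1-based; 0 is reserved for the job master)
                2,                      // min workers
                10,                     // max workers
                2,                      // increment per scale-up
                1,                      // decrement per scale-down
                300,                    // coolDownSecs
                Collections.emptyMap(), // strategies, normally keyed by ScalingReason
                true                    // allowAutoScaleManager; a null here deserializes to false
        );
        // Lombok's @Getter generates isAllowAutoScaleManager() for the boolean field.
        System.out.println(policy.isAllowAutoScaleManager()); // prints "true"
    }
}
```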