From 2c5bfd4edd8c9a075e2f3d01023f916a37bc34b3 Mon Sep 17 00:00:00 2001
From: hmitnflx <100323213+hmitnflx@users.noreply.github.com>
Date: Thu, 4 Jul 2024 11:11:49 -0700
Subject: [PATCH] Update autoscaler strategies to utilize JobAutoScalerManager
 (#673)

* Update docs

* review comments
---
 docs/docs/operate/autoscalingstrategies.md    |  85 +++++++++-----
 .../runtime/descriptor/SchedulingInfo.java    |   4 +-
 .../descriptor/StageScalingPolicy.java        | 104 +++---------------
 .../descriptor/SchedulingInfoTest.java        |  12 +-
 .../descriptor/StageScalingPolicyTest.java    |  22 +++-
 .../master/events/WorkerRegistryV2Test.java   |   6 +-
 .../master/jobcluster/JobClusterAkkaTest.java |   2 +-
 .../jobcluster/job/JobScaleUpDownTests.java   |  18 +--
 .../jobcluster/job/JobTestLifecycle.java      |   4 +-
 .../master/domain/DataFormatAdapterTest.java  |   2 +-
 .../jobmaster/AutoScaleMetricsConfig.java     |   3 +-
 .../worker/jobmaster/JobAutoScaler.java       |  56 ++++++----
 .../worker/jobmaster/JobMasterService.java    |  22 ++--
 .../worker/jobmaster/WorkerMetricHandler.java |  73 ++++++------
 .../worker/jobmaster/JobAutoScalerTest.java   |  76 ++++++++++++-
 .../jobmaster/WorkerMetricHandlerTest.java    |   8 +-
 .../RpsClutchConfigurationSelectorTest.java   |   8 +-
 .../config/WorkerConfigurationWritable.java   |   1 -
 .../mantisrx/server/agent/TaskExecutor.java   |   3 +-
 .../server/agent/TaskExecutorStarter.java     |   4 +-
 20 files changed, 287 insertions(+), 226 deletions(-)

diff --git a/docs/docs/operate/autoscalingstrategies.md b/docs/docs/operate/autoscalingstrategies.md
index ff4d9aa7b..7d98d4d37 100644
--- a/docs/docs/operate/autoscalingstrategies.md
+++ b/docs/docs/operate/autoscalingstrategies.md
@@ -3,25 +3,42 @@
 many workers to scale up/down a stage. They fall into 2 main categories. Rule based strategies
 scale up/down when a certain threshold is reached. PID control based strategies pick a resource utilization
 level and scale up/down dynamically to maintain that level.
 
-## Rule Based Strategy
+A stage's autoscaling policy is composed of the following configurations:
+
+| Name                    | Type      | Description                                                                                                                          |
+|-------------------------|-----------|--------------------------------------------------------------------------------------------------------------------------------------|
+| `stage`                 | `int`     | Index of the stage (`1`-based since `0` is reserved for the job master).                                                             |
+| `min`                   | `int`     | Minimum number of workers in the stage.                                                                                              |
+| `max`                   | `int`     | Maximum number of workers in the stage.                                                                                              |
+| `increment`[1]          | `int`     | Number of workers to add when scaling up.                                                                                            |
+| `decrement`[1]          | `int`     | Number of workers to remove when scaling down.                                                                                       |
+| `coolDownSecs`          | `int`     | Number of seconds to wait after a scaling operation has completed before beginning another scaling operation.                        |
+| `strategies`            | `Map`     | Map of strategies to use for autoscaling and their configurations.                                                                   |
+| `allowAutoScaleManager` | `boolean` | Toggles an autoscaler manager that allows or disallows scale ups and scale downs when certain criteria are met. Defaults to `false`. |
+
+Note:
+[1] increment/decrement are not used for Clutch strategies. Clutch strategies use a PID controller to determine the number of workers to scale up/down.
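To make these fields concrete, here is a minimal sketch of constructing such a policy with the `StageScalingPolicy` constructor as extended by this patch; the argument order follows the tests further down, and the threshold and rolling-count values are illustrative only:

```java
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageScalingPolicy.RollingCount;
import io.mantisrx.runtime.descriptor.StageScalingPolicy.ScalingReason;
import io.mantisrx.runtime.descriptor.StageScalingPolicy.Strategy;
import java.util.Collections;
import java.util.Map;

public class ScalingPolicyExample {
    public static StageScalingPolicy cpuPolicy() {
        // CPU rule: scale down below 30% usage, up above 75%, but only when
        // 6 of the last 10 observations cross the threshold.
        Map<ScalingReason, Strategy> strategies = Collections.singletonMap(
                ScalingReason.CPU,
                new Strategy(ScalingReason.CPU, 30.0, 75.0, new RollingCount(6, 10)));
        return new StageScalingPolicy(
                1,          // stage (1-based; 0 is the job master)
                1,          // min workers
                10,         // max workers
                1,          // increment per scale up
                1,          // decrement per scale down
                300,        // coolDownSecs
                strategies,
                true);      // allowAutoScaleManager: consult JobAutoscalerManager gates
    }
}
```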
+
+## Rule Based Strategies
 
 Rule based strategies can be defined for the following resources:
 
-| Resource | Metric |
-| --------------- | ------- |
-| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` |
-| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` |
-| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` |
-| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` |
-| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` |
-| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` |
-| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` |
-| `UserDefined` | Metric is defined by user with job parameter `mantis.jobmaster.autoscale.metric` in this format `{group}::{name}::{aggregation}`. |
+| Resource         | Metric                                                                                                                                |
+|------------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| `CPU`            | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG`                                                                     |
+| `Memory`         | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG`                                                                     |
+| `Network`        | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG`                                                                    |
+| `JVMMemory`      | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG`                                                                  |
+| `DataDrop`       | group: `DataDrop` name: `dropCount` aggregation: `AVG`                                                                                |
+| `KafkaLag`       | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX`                                                    |
+| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG`                                              |
+| `UserDefined`    | Metric is defined by the user with the job parameter `mantis.jobmaster.autoscale.metric` in the format `{group}::{name}::{aggregation}`. |
+| `FailoverAware`  | Metric is defined by an implementation of `FailoverStatusClient`. This is used to control autoscaling during failover events. (See #failover-aware)  |
 
 Each strategy has the following parameters:
 
-| Name | Description |
-| --------------- | ------------ |
+| Name                          | Description |
+|-------------------------------|-------------|
 | `Scale down below percentage` | When the aggregated value for all workers falls below this value, the stage will scale down. It will scale down by the decrement value specified in the policy. For data drop, this is calculated as the number of data items dropped divided by the total number of data items (dropped + processed). For CPU, memory, etc., it is calculated as a percentage of the resource allocated when you defined the worker. |
 | `Scale up above percentage`   | When the aggregated value for all workers rises above this value, the stage will scale up. |
 | `Rolling count`               | This value helps to keep jitter out of the autoscaling process. Instead of scaling immediately the first time values fall outside of the scale-down and scale-up percentage thresholds you define, Mantis will wait until the thresholds are exceeded a certain number of times within a certain window. For example, a rolling count of "6 of 10" means the stage will be scaled down only if six or more of ten consecutive observations fall below the scale-down threshold. |
@@ -36,7 +53,23 @@ scaling action, the cooldown will prevent subsequent strategies from scaling for
 best to use the data drop strategy in conjunction with another strategy that provides
 the scale-down trigger.
 
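Tying the table to the wire format, the sketch below shows the JSON shape of a rule based policy and how it deserializes, mirroring the `testDeserialization` cases added to `StageScalingPolicyTest` later in this patch. Plain Jackson stands in here for the project's own `JsonSerializer` (an assumption for self-containedness), which works because `StageScalingPolicy` carries `@JsonCreator`/`@JsonProperty` annotations:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;

public class PolicyJsonExample {
    public static void main(String[] args) throws Exception {
        // A CPU rule as submitted with a job; field names follow the
        // deserialization tests in StageScalingPolicyTest below.
        String json = "{\"stage\":1,\"min\":1,\"max\":5,\"increment\":1,\"decrement\":1,"
                + "\"coolDownSecs\":600,"
                + "\"strategies\":{\"CPU\":{\"reason\":\"CPU\","
                + "\"scaleDownBelowPct\":50,\"scaleUpAbovePct\":75}},"
                + "\"enabled\":true}";
        StageScalingPolicy policy = new ObjectMapper().readValue(json, StageScalingPolicy.class);
        // Per those tests: a strategy without a rollingCount defaults to 1-of-1,
        // and a missing allowAutoScaleManager resolves to false.
        System.out.println(policy);
    }
}
```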
-## PID Control Based Strategy
+### Failover-Aware Strategy
+During a failover event in a multi-region setup, all incoming traffic is moved away from the failed-over
+region (the `evacuee`) to the other region(s) (the `savior(s)`) to mitigate the issue faster.
+
+For autoscaling Mantis jobs, it is sometimes necessary to be failover-aware because:
+
+1. Scale down in evacuee regions (`allowScaleDownDuringEvacuated`).
+   After an extended period in the evacuated state, the jobs in that region will have been
+   scaled down to their lowest values because there is no incoming traffic and resource utilization is low.
+   When traffic is restored to that region, those jobs will see a big surge in incoming requests
+   that can overwhelm them.
+2. Scale up in savior regions (use `FailoverAware` in `strategies`).
+   When traffic is moved to the savior regions, the jobs in those regions should be scaled up to handle the
+   additional load. Currently, the failover-aware strategy scales the configured stages up to the max number
+   of workers configured in `StageScalingPolicy`. It does not work with PID control based strategies.
+
+## PID Control Based Strategies
 
 A PID control system uses a continuous feedback loop to maintain a signal at a target level (set point).
 Mantis offers variations of this strategy that operate on different signals. Additionally, they try to learn the appropriate target over time
@@ -84,18 +117,18 @@ JSON in the job parameter `mantis.jobmaster.clutch.config`. Example:
 }
 ```
 
-| Field | Description |
-| --------------- | ------------ |
-| `minSize` | Minimum number of workers in the stage. It will not scale down below this number. |
-| `maxSize` | Maximum number of workers in the stage. It will not scale up above this number. |
-| `cooldownSeconds` | This indicates how many seconds to wait after a scaling operation has been completed before beginning another scaling operation. |
-| `maxAdjustment` | Optional. The maximum number of workers to scale up/down in a single operation. |
-| `rps` | Expected RPS per worker. Must be > 0. |
-| `cpu`, `memory`, `network` | Configure PID controller for each resource. |
-| `setPoint` | Target set point for the resource. This is expressed as a percentage of allocated resource to the worker. For example, `60.0` on `network` means network bytes should be 60% of the network limit on machine definition. |
-| `rope` | Lower and upper buffer around the set point. Metric values within this buffer are assumed to be at set point, and thus contributes an error of 0 to the PID controller. |
-| `kp` | Multiplier for the proportional term of the PID controller. This will affect the size of scaling actions. |
-| `kd` | Multiplier for the derivative term of the PID controller. This will affect the size of scaling actions. |
+| Field                      | Description |
+|----------------------------|-------------|
+| `minSize`                  | Minimum number of workers in the stage. It will not scale down below this number. |
+| `maxSize`                  | Maximum number of workers in the stage. It will not scale up above this number. |
+| `cooldownSeconds`          | This indicates how many seconds to wait after a scaling operation has been completed before beginning another scaling operation. |
+| `maxAdjustment`            | Optional. The maximum number of workers to scale up/down in a single operation. |
+| `rps`                      | Expected RPS per worker. 
Must be > 0. | +| `cpu`, `memory`, `network` | Configure PID controller for each resource. | +| `setPoint` | Target set point for the resource. This is expressed as a percentage of allocated resource to the worker. For example, `60.0` on `network` means network bytes should be 60% of the network limit on machine definition. | +| `rope` | Lower and upper buffer around the set point. Metric values within this buffer are assumed to be at set point, and thus contributes an error of 0 to the PID controller. | +| `kp` | Multiplier for the proportional term of the PID controller. This will affect the size of scaling actions. | +| `kd` | Multiplier for the derivative term of the PID controller. This will affect the size of scaling actions. | ### Clutch RPS diff --git a/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/SchedulingInfo.java b/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/SchedulingInfo.java index 73c6950a7..c0efa2c34 100644 --- a/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/SchedulingInfo.java +++ b/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/SchedulingInfo.java @@ -132,7 +132,7 @@ public Builder multiWorkerScalableStageWithConstraints(int numberOfWorkers, Mach List hardConstraints, List softConstraints, StageScalingPolicy scalingPolicy) { StageScalingPolicy ssp = new StageScalingPolicy(currentStage, scalingPolicy.getMin(), scalingPolicy.getMax(), - scalingPolicy.getIncrement(), scalingPolicy.getDecrement(), scalingPolicy.getCoolDownSecs(), scalingPolicy.getStrategies()); + scalingPolicy.getIncrement(), scalingPolicy.getDecrement(), scalingPolicy.getCoolDownSecs(), scalingPolicy.getStrategies(), scalingPolicy.isAllowAutoScaleManager()); return this.addStage( StageSchedulingInfo.builder() .numberOfInstances(numberOfWorkers) @@ -148,7 +148,7 @@ public Builder multiWorkerScalableStageWithConstraints(int numberOfWorkers, Mach List hardConstraints, List softConstraints, StageScalingPolicy scalingPolicy, Map containerAttributes) { StageScalingPolicy ssp = new StageScalingPolicy(currentStage, scalingPolicy.getMin(), scalingPolicy.getMax(), - scalingPolicy.getIncrement(), scalingPolicy.getDecrement(), scalingPolicy.getCoolDownSecs(), scalingPolicy.getStrategies()); + scalingPolicy.getIncrement(), scalingPolicy.getDecrement(), scalingPolicy.getCoolDownSecs(), scalingPolicy.getStrategies(), scalingPolicy.isAllowAutoScaleManager()); return this.addStage( StageSchedulingInfo.builder() .numberOfInstances(numberOfWorkers) diff --git a/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java b/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java index 41f9518aa..f537968f4 100644 --- a/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java +++ b/mantis-common/src/main/java/io/mantisrx/runtime/descriptor/StageScalingPolicy.java @@ -28,6 +28,9 @@ import lombok.ToString; +@Getter +@ToString +@EqualsAndHashCode public class StageScalingPolicy implements Serializable { private static final long serialVersionUID = 1L; @@ -39,13 +42,19 @@ public class StageScalingPolicy implements Serializable { private final int decrement; private final long coolDownSecs; private final Map strategies; + /** + * Controls whether AutoScaleManager is enabled or disabled + */ + private final boolean allowAutoScaleManager; + @JsonCreator @JsonIgnoreProperties(ignoreUnknown = true) public StageScalingPolicy(@JsonProperty("stage") int stage, @JsonProperty("min") int min, @JsonProperty("max") int 
max, @JsonProperty("increment") int increment, @JsonProperty("decrement") int decrement, @JsonProperty("coolDownSecs") long coolDownSecs, - @JsonProperty("strategies") Map strategies) { + @JsonProperty("strategies") Map strategies, + @JsonProperty(value = "allowAutoScaleManager", defaultValue = "false") Boolean allowAutoScaleManager) { this.stage = stage; this.min = min; this.max = Math.max(max, min); @@ -54,100 +63,14 @@ public StageScalingPolicy(@JsonProperty("stage") int stage, this.decrement = Math.max(decrement, 1); this.coolDownSecs = coolDownSecs; this.strategies = strategies == null ? new HashMap() : new HashMap<>(strategies); - } - - public int getStage() { - return stage; - } - - public int getMin() { - return min; - } - - public int getMax() { - return max; - } - - public boolean isEnabled() { - return enabled; - } - - public int getIncrement() { - return increment; - } - - public int getDecrement() { - return decrement; - } - - public long getCoolDownSecs() { - return coolDownSecs; + // `defaultValue` is for documentation purpose only, use `Boolean` to determine if the field is missing on `null` + this.allowAutoScaleManager = allowAutoScaleManager == Boolean.TRUE; } public Map getStrategies() { return Collections.unmodifiableMap(strategies); } - @Override - public String toString() { - return "StageScalingPolicy{" + - "stage=" + stage + - ", min=" + min + - ", max=" + max + - ", enabled=" + enabled + - ", increment=" + increment + - ", decrement=" + decrement + - ", coolDownSecs=" + coolDownSecs + - ", strategies=" + strategies + - '}'; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (coolDownSecs ^ (coolDownSecs >>> 32)); - result = prime * result + decrement; - result = prime * result + (enabled ? 1231 : 1237); - result = prime * result + increment; - result = prime * result + max; - result = prime * result + min; - result = prime * result + stage; - result = prime * result + ((strategies == null) ? 
0 : strategies.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - StageScalingPolicy other = (StageScalingPolicy) obj; - if (coolDownSecs != other.coolDownSecs) - return false; - if (decrement != other.decrement) - return false; - if (enabled != other.enabled) - return false; - if (increment != other.increment) - return false; - if (max != other.max) - return false; - if (min != other.min) - return false; - if (stage != other.stage) - return false; - if (strategies == null) { - if (other.strategies != null) - return false; - } else if (!strategies.equals(other.strategies)) - return false; - return true; - } - public enum ScalingReason { CPU, @Deprecated @@ -162,7 +85,8 @@ public enum ScalingReason { ClutchRps, RPS, JVMMemory, - SourceJobDrop + SourceJobDrop, + AutoscalerManagerEvent } @Getter diff --git a/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/SchedulingInfoTest.java b/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/SchedulingInfoTest.java index 50e3b13e3..5f9079a9a 100644 --- a/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/SchedulingInfoTest.java +++ b/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/SchedulingInfoTest.java @@ -153,13 +153,13 @@ public void testSerialization() throws Exception { 2, new MachineDefinition(1, 1.24, 0.0, 1, 1), null, null, - new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap) + new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap, false) ) .multiWorkerScalableStageWithConstraints( 3, new MachineDefinition(1, 1.24, 0.0, 1, 1), null, null, - new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap) + new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap, true) ); JsonSerializer serializer = new JsonSerializer(); @@ -204,6 +204,7 @@ public void testSerialization() throws Exception { " }" + " }" + " }," + + " \"allowAutoScaleManager\": false," + " \"enabled\": true" + " }," + " \"scalable\": true" + @@ -245,6 +246,7 @@ public void testSerialization() throws Exception { " }" + " }" + " }," + + " \"allowAutoScaleManager\": true," + " \"enabled\": true" + " }," + " \"scalable\": true" + @@ -264,14 +266,14 @@ public void testSerializationWithSkuId() throws Exception { 2, new MachineDefinition(1, 1.24, 0.0, 1, 1), null, null, - new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap), + new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap, false), ImmutableMap.of("containerSkuID", "sku1") ) .multiWorkerScalableStageWithConstraints( 3, new MachineDefinition(1, 1.24, 0.0, 1, 1), null, null, - new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap), + new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap, false), ImmutableMap.of("containerSkuID", "sku2") ); @@ -317,6 +319,7 @@ public void testSerializationWithSkuId() throws Exception { " }" + " }" + " }," + + " \"allowAutoScaleManager\": false," + " \"enabled\": true" + " }," + " \"scalable\": true," + @@ -359,6 +362,7 @@ public void testSerializationWithSkuId() throws Exception { " }" + " }" + " }," + + " \"allowAutoScaleManager\": false," + " \"enabled\": true" + " }," + " \"scalable\": true," + diff --git a/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/StageScalingPolicyTest.java b/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/StageScalingPolicyTest.java index 1833479d2..a584f4541 100644 --- a/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/StageScalingPolicyTest.java +++ 
b/mantis-common/src/test/java/io/mantisrx/runtime/descriptor/StageScalingPolicyTest.java @@ -35,7 +35,7 @@ public void testSerialization() throws Exception { Map smap = new HashMap<>(); smap.put(ScalingReason.CPU, new Strategy(ScalingReason.CPU, 0.5, 0.75, null)); smap.put(ScalingReason.DataDrop, new Strategy(ScalingReason.DataDrop, 0.0, 2.0, null)); - StageScalingPolicy policy = new StageScalingPolicy(1, 1, 2, 1, 1, 60, smap); + StageScalingPolicy policy = new StageScalingPolicy(1, 1, 2, 1, 1, 60, smap, false); final String expected = "{\n" + " \"stage\": 1,\n" + @@ -80,21 +80,35 @@ public void testSerialization() throws Exception { public void testDeserialization() throws Exception { String json1 = "{\"stage\":1,\"min\":1,\"max\":2,\"increment\":1,\"decrement\":1,\"strategies\":{},\"enabled\":false}"; StageScalingPolicy actual = serializer.fromJSON(json1, StageScalingPolicy.class); - StageScalingPolicy expected = new StageScalingPolicy(1, 1, 2, 1, 1, 0, null); + StageScalingPolicy expected = new StageScalingPolicy(1, 1, 2, 1, 1, 0, null, false); assertEquals(expected, actual); String json2 = "{\"stage\":1,\"min\":1,\"max\":5,\"increment\":1,\"decrement\":1,\"coolDownSecs\":600,\"strategies\":{\"CPU\":{\"reason\":\"CPU\",\"scaleDownBelowPct\":50,\"scaleUpAbovePct\":75}},\"enabled\":true}"; actual = serializer.fromJSON(json2, StageScalingPolicy.class); Map smap = new HashMap<>(); smap.put(ScalingReason.CPU, new Strategy(ScalingReason.CPU, 50, 75.0, new RollingCount(1, 1))); - expected = new StageScalingPolicy(1, 1, 5, 1, 1, 600, smap); + expected = new StageScalingPolicy(1, 1, 5, 1, 1, 600, smap, false); assertEquals(expected, actual); String json3 = "{\"stage\":1,\"min\":1,\"max\":3,\"increment\":1,\"decrement\":1,\"coolDownSecs\":0,\"strategies\":{\"Memory\":{\"reason\":\"Memory\",\"scaleDownBelowPct\":65,\"scaleUpAbovePct\":80,\"rollingCount\":{\"count\":6,\"of\":10}}},\"enabled\":true}"; actual = serializer.fromJSON(json3, StageScalingPolicy.class); smap = new HashMap<>(); smap.put(ScalingReason.Memory, new Strategy(ScalingReason.Memory, 65, 80.0, new RollingCount(6, 10))); - expected = new StageScalingPolicy(1, 1, 3, 1, 1, 0, smap); + expected = new StageScalingPolicy(1, 1, 3, 1, 1, 0, smap, false); + assertEquals(expected, actual); + + String json4 = "{\"stage\":1,\"min\":1,\"max\":3,\"increment\":1,\"decrement\":1,\"coolDownSecs\":0,\"strategies\":{\"Memory\":{\"reason\":\"Memory\",\"scaleDownBelowPct\":65,\"scaleUpAbovePct\":80,\"rollingCount\":{\"count\":6,\"of\":10}}},\"allowAutoScaleManager\":false,\"enabled\":true}"; + actual = serializer.fromJSON(json4, StageScalingPolicy.class); + smap = new HashMap<>(); + smap.put(ScalingReason.Memory, new Strategy(ScalingReason.Memory, 65, 80.0, new RollingCount(6, 10))); + expected = new StageScalingPolicy(1, 1, 3, 1, 1, 0, smap, false); + assertEquals(expected, actual); + + String json5 = "{\"stage\":1,\"min\":1,\"max\":3,\"increment\":1,\"decrement\":1,\"coolDownSecs\":0,\"strategies\":{\"Memory\":{\"reason\":\"Memory\",\"scaleDownBelowPct\":65,\"scaleUpAbovePct\":80,\"rollingCount\":{\"count\":6,\"of\":10}}},\"allowAutoScaleManager\":true,\"enabled\":true}"; + actual = serializer.fromJSON(json5, StageScalingPolicy.class); + smap = new HashMap<>(); + smap.put(ScalingReason.Memory, new Strategy(ScalingReason.Memory, 65, 80.0, new RollingCount(6, 10))); + expected = new StageScalingPolicy(1, 1, 3, 1, 1, 0, smap, true); assertEquals(expected, actual); } } diff --git 
a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/events/WorkerRegistryV2Test.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/events/WorkerRegistryV2Test.java index dc118a511..1ba8229dd 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/events/WorkerRegistryV2Test.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/events/WorkerRegistryV2Test.java @@ -177,7 +177,7 @@ public void testJobScaleUp() throws Exception, InvalidJobException, io.mantisrx. new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobScaleUp"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -229,7 +229,7 @@ public void testJobScaleDown() throws Exception { new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobScaleDown"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -285,7 +285,7 @@ public void testJobShutdown() { new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobShutdown"; MantisScheduler schedulerMock = mock(MantisScheduler.class); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index 0396bf89e..1ecf2008d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -2416,7 +2416,7 @@ public void testScaleStage() { smap.put(StageScalingPolicy.ScalingReason.DataDrop, new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.DataDrop, 0.0, 2.0, null)); SchedulingInfo SINGLE_WORKER_SCHED_INFO = new SchedulingInfo.Builder().numberOfStages(1) - .multiWorkerScalableStageWithConstraints(1,DEFAULT_MACHINE_DEFINITION,Lists.newArrayList(),Lists.newArrayList(),new StageScalingPolicy(1,1,10,1,1,1, smap)).build(); + .multiWorkerScalableStageWithConstraints(1,DEFAULT_MACHINE_DEFINITION,Lists.newArrayList(),Lists.newArrayList(),new StageScalingPolicy(1,1,10,1,1,1, smap, true)).build(); final JobDefinition jobDefn = createJob(clusterName, 1, MantisJobDurationType.Transient, "USER_TYPE", SINGLE_WORKER_SCHED_INFO, Lists.newArrayList()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java index 39d768d35..de434a776 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java @@ -118,7 +118,7 @@ public void testJobScaleUp() throws 
Exception, InvalidJobException, io.mantisrx. new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobScaleUp"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -161,7 +161,7 @@ public void testJobScaleDown() throws Exception, InvalidJobException, io.mantisr new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobScaleUp"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -219,7 +219,7 @@ public void testSchedulingInfo() throws Exception { new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testSchedulingInfo"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -500,7 +500,7 @@ public void testJobScaleUpFailsIfNoScaleStrategy() throws Exception { new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap)) + new StageScalingPolicy(1, 0, 10, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobScaleUpFailsIfNoScaleStrategy"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -543,7 +543,7 @@ public void testJobScaleUpFailsIfMinEqualsMax() throws Exception { new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList(), - new StageScalingPolicy(1, 1, 1, 1, 1, 0, smap)) + new StageScalingPolicy(1, 1, 1, 1, 1, 0, smap, true)) .build(); String clusterName = "testJobScaleUpFailsIfNoScaleStrategy"; MantisScheduler schedulerMock = mock(MantisScheduler.class); @@ -583,7 +583,7 @@ public void stageScalingPolicyTest() { long cooldownsecs = 300; Map smap = new HashMap<>(); smap.put(ScalingReason.CPU, new Strategy(ScalingReason.CPU, 0.5, 0.75, null)); - StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap); + StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap, true); assertTrue(ssp.isEnabled()); } @@ -598,7 +598,7 @@ public void stageScalingPolicyNoStrategyTest() { long cooldownsecs = 300; Map smap = new HashMap<>(); - StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap); + StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap, true); assertFalse(ssp.isEnabled()); } @@ -613,7 +613,7 @@ public void stageScalingPolicyMinEqMaxTest() { long cooldownsecs = 300; Map smap = new HashMap<>(); smap.put(ScalingReason.CPU, new Strategy(ScalingReason.CPU, 0.5, 0.75, null)); - StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap); + StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap, true); assertFalse(ssp.isEnabled()); } @@ -628,7 +628,7 @@ public void stageScalingPolicyMinGreaterThanMaxTest() { long cooldownsecs = 300; Map smap = new HashMap<>(); smap.put(ScalingReason.CPU, new Strategy(ScalingReason.CPU, 0.5, 0.75, null)); - StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, 
smap); + StageScalingPolicy ssp = new StageScalingPolicy(stageNo, min, max, increment, decrement, cooldownsecs, smap, true); assertTrue(ssp.isEnabled()); // max will be set equal to min diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java index 8404c4214..1aaba8e12 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java @@ -479,13 +479,13 @@ public void testJobSubmitWithMultipleStagesAndWorkers() { 2, new MachineDefinition(1, 1.24, 0.0, 1, 1), null, null, - new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap) + new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap, true) ) .multiWorkerScalableStageWithConstraints( 3, new MachineDefinition(1, 1.24, 0.0, 1, 1), null, null, - new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap) + new StageScalingPolicy(1, 1, 3, 1, 1, 60, smap, true) ); SchedulingInfo sInfo = builder.build(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/domain/DataFormatAdapterTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/domain/DataFormatAdapterTest.java index d4f455d7f..c774f947c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/domain/DataFormatAdapterTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/domain/DataFormatAdapterTest.java @@ -779,7 +779,7 @@ public void convertMantisStageMetaTest() { int increment = 1; int decrement = 1; int coolDownSecs = 300; - StageScalingPolicy stageScalingPolicy = new StageScalingPolicy(stageNo, min, max, increment, decrement, coolDownSecs, smap); + StageScalingPolicy stageScalingPolicy = new StageScalingPolicy(stageNo, min, max, increment, decrement, coolDownSecs, smap, true); List softConstraintsList = new ArrayList<>(); softConstraintsList.add(JobConstraints.ExclusiveHost); List hardConstraintsList = new ArrayList<>(); diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java index b03638a8b..cbfccbabd 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/AutoScaleMetricsConfig.java @@ -142,8 +142,7 @@ public Map> getUserDefinedMetrics() { final Map> metrics = new HashMap<>(); for (Map.Entry> entry : userDefinedAutoScaleMetrics.entrySet()) { - metrics.put(entry.getKey(), - entry.getValue().keySet()); + metrics.put(entry.getKey(), entry.getValue().keySet()); } return metrics; diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java index 9edd011c2..c53d0f11b 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java @@ -117,7 +117,7 @@ void start() { 
logger.info("onOverflow triggered, dropping old events"); }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .doOnRequest(x -> logger.info("Scaler requested {} metrics.", x)) - .groupBy(event -> event.getStage()) + .groupBy(Event::getStage) .flatMap(go -> { Integer stage = Optional.ofNullable(go.getKey()).orElse(-1); @@ -142,12 +142,13 @@ void start() { boolean useClutchRps = false; boolean useClutchExperimental = false; + final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); // Determine which type of scaler to use. - if (stageSchedulingInfo.getScalingPolicy() != null) { - minSize = stageSchedulingInfo.getScalingPolicy().getMin(); - maxSize = stageSchedulingInfo.getScalingPolicy().getMax(); - if (stageSchedulingInfo.getScalingPolicy().getStrategies() != null) { - Set reasons = stageSchedulingInfo.getScalingPolicy().getStrategies() + if (scalingPolicy != null) { + minSize = scalingPolicy.getMin(); + maxSize = scalingPolicy.getMax(); + if (scalingPolicy.getStrategies() != null) { + Set reasons = scalingPolicy.getStrategies() .values() .stream() .map(StageScalingPolicy.Strategy::getReason) @@ -189,14 +190,13 @@ void start() { MantisStageActuator actuator = new MantisStageActuator(initialSize, scaler); Observable.Transformer transformToClutchEvent = - obs -> obs.map(event -> this.mantisEventToClutchEvent(event)) + obs -> obs.map(this::mantisEventToClutchEvent) .filter(event -> event.metric != null); Observable workerCounts = context.getWorkerMapObservable() .map(x -> x.getWorkersForStage(go.getKey()).size()) .distinctUntilChanged() .throttleLast(5, TimeUnit.SECONDS); - // Create the scaler. if (useClutchRps) { logger.info("Using clutch rps scaler, job: {}, stage: {} ", jobId, stage); ClutchRpsPIDConfig rpsConfig = Option.of(config).flatMap(ClutchConfiguration::getRpsConfig).getOrNull(); @@ -248,13 +248,11 @@ void start() { } }) .doOnCompleted(() -> logger.info("onComplete on JobAutoScaler subject")) - .doOnError(t -> logger.error("got onError in JobAutoScaler", t)) - .doOnSubscribe(() -> logger.info("onSubscribe JobAutoScaler")) - .doOnUnsubscribe(() -> { - logger.info("Unsubscribing for JobAutoScaler of job " + jobId); - }) + .doOnError(t -> logger.error("got onError in JobAutoScaler", t)) + .doOnSubscribe(() -> logger.info("onSubscribe JobAutoScaler")) + .doOnUnsubscribe(() -> logger.info("Unsubscribing for JobAutoScaler of job {}", jobId)) .retry() - .subscribe(); + .subscribe(); } /** @@ -331,12 +329,16 @@ private void setOutstandingScalingRequest(final Subscription subscription) { public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers) { final int desiredWorkers; if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) { - logger.warn("Job " + jobId + " stage " + stage + " is not scalable, can't increment #workers by " + increment); + logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment); return numCurrentWorkers; } + if (numCurrentWorkers < 0 || increment < 1) { logger.error("current number of workers({}) not known or increment({}) < 1, will not scale up", numCurrentWorkers, increment); return numCurrentWorkers; + } else if (stageSchedulingInfo.getScalingPolicy().isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) { + logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId); + return numCurrentWorkers; } else { final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax(); desiredWorkers = 
Math.min(numCurrentWorkers + increment, maxWorkersForStage); @@ -347,6 +349,11 @@ public int getDesiredWorkersForScaleUp(final int increment, final int numCurrent public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) { logger.info("scaleUpStage incrementing number of workers from {} to {}", numCurrentWorkers, desiredWorkers); cancelOutstandingScalingRequest(); + StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); + if (scalingPolicy != null && scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) { + logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId); + return; + } final Subscription subscription = masterClientApi.scaleJobStage(jobId, stage, desiredWorkers, reason) .retryWhen(retryLogic) .onErrorResumeNext(throwable -> { @@ -359,15 +366,19 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers) { final int desiredWorkers; - if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) { - logger.warn("Job " + jobId + " stage " + stage + " is not scalable, can't decrement #workers by " + decrement); + final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); + if (!scalingPolicy.isEnabled()) { + logger.warn("Job {} stage {} is not scalable, can't decrement #workers by {}", jobId, stage, decrement); return numCurrentWorkers; } if (numCurrentWorkers < 0 || decrement < 1) { logger.error("current number of workers({}) not known or decrement({}) < 1, will not scale down", numCurrentWorkers, decrement); return numCurrentWorkers; + } else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) { + logger.warn("Scaledown is disabled for all autoscaling strategy, not scaling down stage {} of job {}", stage, jobId); + return numCurrentWorkers; } else { - int min = stageSchedulingInfo.getScalingPolicy().getMin(); + int min = scalingPolicy.getMin(); desiredWorkers = Math.max(numCurrentWorkers - decrement, min); } return desiredWorkers; @@ -376,6 +387,11 @@ public int getDesiredWorkersForScaleDown(final int decrement, final int numCurre public void scaleDownStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) { logger.info("scaleDownStage decrementing number of workers from {} to {}", numCurrentWorkers, desiredWorkers); cancelOutstandingScalingRequest(); + final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); + if (scalingPolicy != null && scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) { + logger.warn("Scaledown is disabled for all autoscaling strategy. For stage {} of job {}", stage, jobId); + return; + } final Subscription subscription = masterClientApi.scaleJobStage(jobId, stage, desiredWorkers, reason) .retryWhen(retryLogic) .onErrorResumeNext(throwable -> { @@ -428,8 +444,8 @@ public void onNext(Event event) { final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); long coolDownSecs = scalingPolicy == null ? 
Long.MAX_VALUE : scalingPolicy.getCoolDownSecs(); boolean scalable = stageSchedulingInfo.getScalable() && scalingPolicy != null && scalingPolicy.isEnabled(); - logger.debug("Will check for autoscaling job " + jobId + " stage " + stage + " due to event: " + event); - if (scalable && scalingPolicy != null) { + logger.debug("Will check for autoscaling job {} stage {} due to event: {}", jobId, stage, event); + if (scalable) { final StageScalingPolicy.Strategy strategy = scalingPolicy.getStrategies().get(event.getType()); if (strategy != null) { double effectiveValue = event.getEffectiveValue(); diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java index bfcb76a73..902b1a0a8 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobMasterService.java @@ -75,7 +75,7 @@ public JobMasterService(final String jobId, this.autoScaleMetricsConfig = autoScaleMetricsConfig; this.masterClientApi = masterClientApi; this.jobAutoScaler = new JobAutoScaler(jobId, schedInfo, masterClientApi, context, jobAutoscalerManager); - this.metricObserver = new WorkerMetricHandler(jobId, jobAutoScaler.getObserver(), masterClientApi, autoScaleMetricsConfig).initAndGetMetricDataObserver(); + this.metricObserver = new WorkerMetricHandler(jobId, jobAutoScaler.getObserver(), masterClientApi, autoScaleMetricsConfig, jobAutoscalerManager).initAndGetMetricDataObserver(); this.observableOnCompleteCallback = observableOnCompleteCallback; this.observableOnErrorCallback = observableOnErrorCallback; this.observableOnTerminateCallback = observableOnTerminateCallback; @@ -98,7 +98,7 @@ private Measurements handleMetricEvent(final String ev) { stage = 1; if (gauges.isEmpty()) { gauges = measurements.getCounters().stream().map(counter -> - new GaugeMeasurement(counter.getEvent(), counter.getCount())).collect(Collectors.toList()); + new GaugeMeasurement(counter.getEvent(), counter.getCount())).collect(Collectors.toList()); } } metricObserver.onNext(new MetricData(jobId, stage, workerIdx, workerNum, measurements.getName(), gauges)); @@ -123,33 +123,33 @@ public void start() { Observable> metrics = workerMetricSubscription.getMetricsClient().getResults(); boolean isSourceJobMetricEnabled = (boolean) context.getParameters().get( - SystemParameters.JOB_MASTER_AUTOSCALE_SOURCEJOB_METRIC_PARAM, false); + SystemParameters.JOB_MASTER_AUTOSCALE_SOURCEJOB_METRIC_PARAM, false); if (isSourceJobMetricEnabled) { metrics = metrics.mergeWith(getSourceJobMetrics()); } subscription = Observable.merge(metrics) - .map(event -> handleMetricEvent(event.getEventAsString())) - .doOnTerminate(observableOnTerminateCallback) - .doOnCompleted(observableOnCompleteCallback) - .doOnError(observableOnErrorCallback) - .subscribe(); + .map(event -> handleMetricEvent(event.getEventAsString())) + .doOnTerminate(observableOnTerminateCallback) + .doOnCompleted(observableOnCompleteCallback) + .doOnError(observableOnErrorCallback) + .subscribe(); } protected Observable> getSourceJobMetrics() { List targetInfos = SourceJobParameters.parseTargetInfo( - (String) context.getParameters().get(SystemParameters.JOB_MASTER_AUTOSCALE_SOURCEJOB_TARGET_PARAM, "{}")); + (String) context.getParameters().get(SystemParameters.JOB_MASTER_AUTOSCALE_SOURCEJOB_TARGET_PARAM, "{}")); if (targetInfos.isEmpty()) { 
targetInfos = SourceJobParameters.parseInputParameters(context); } targetInfos = SourceJobParameters.enforceClientIdConsistency(targetInfos, jobId); String additionalDropMetricPatterns = - (String) context.getParameters().get(SystemParameters.JOB_MASTER_AUTOSCALE_SOURCEJOB_DROP_METRIC_PATTERNS_PARAM, ""); + (String) context.getParameters().get(SystemParameters.JOB_MASTER_AUTOSCALE_SOURCEJOB_DROP_METRIC_PATTERNS_PARAM, ""); autoScaleMetricsConfig.addSourceJobDropMetrics(additionalDropMetricPatterns); SourceJobWorkerMetricsSubscription sourceSub = new SourceJobWorkerMetricsSubscription( - targetInfos, masterClientApi, workerMetricsClient, autoScaleMetricsConfig); + targetInfos, masterClientApi, workerMetricsClient, autoScaleMetricsConfig); return sourceSub.getResults(); } diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java index b9bf9bfea..de31172e8 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java @@ -71,16 +71,19 @@ return -1; } }; + private final JobAutoscalerManager jobAutoscalerManager; public WorkerMetricHandler(final String jobId, final Observer jobAutoScaleObserver, final MantisMasterGateway masterClientApi, - final AutoScaleMetricsConfig autoScaleMetricsConfig) { + final AutoScaleMetricsConfig autoScaleMetricsConfig, + final JobAutoscalerManager jobAutoscalerManager) { this.jobId = jobId; this.jobAutoScaleObserver = jobAutoScaleObserver; this.masterClientApi = masterClientApi; this.autoScaleMetricsConfig = autoScaleMetricsConfig; this.metricAggregator = new MetricAggregator(autoScaleMetricsConfig); + this.jobAutoscalerManager = jobAutoscalerManager; } public Observer initAndGetMetricDataObserver() { @@ -94,10 +97,9 @@ private Map getAggregates(List> dataPo for (Map datapoint : dataPointsList) { for (Map.Entry gauge : datapoint.entrySet()) { - if (!transformed.containsKey(gauge.getKey())) { - transformed.put(gauge.getKey(), new ArrayList<>()); - } - transformed.get(gauge.getKey()).add(gauge.getValue()); + transformed + .computeIfAbsent(gauge.getKey(), (k) -> new ArrayList<>()) + .add(gauge.getValue()); } } @@ -276,8 +278,7 @@ public void call() { final int numWorkers = numStageWorkersFn.call(stage); // get the aggregate metric values by metric group for all workers in stage Map allWorkerAggregates = getAggregates(listofAggregates); - logger.info("Job stage " + stage + " avgResUsage from " + - workersMap.size() + " workers: " + allWorkerAggregates.toString()); + logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString()); for (Map.Entry> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) { final String metricGrp = userDefinedMetric.getKey(); @@ -387,33 +388,7 @@ public void call() { } } - double sourceJobDrops = 0; - boolean hasSourceJobDropsMetric = false; - Map sourceMetricsRecent = sourceJobMetricsRecent.asMap(); - for (Map.Entry worker : sourceJobWorkersMap.entrySet()) { - Map metricGroups = metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp()); - for (Map.Entry group : metricGroups.entrySet()) { - String metricKey = worker.getKey() + ":" + group.getKey(); - for (Map.Entry gauge : group.getValue().getGauges().entrySet()) { - if 
(sourceMetricsRecent.containsKey(metricKey) && - autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) { - sourceJobDrops += gauge.getValue(); - hasSourceJobDropsMetric = true; - } - } - } - } - if (hasSourceJobDropsMetric) { - logger.info("Job stage {}, source job drop metrics: {}", stage, sourceJobDrops); - // Divide by 6 to account for 6 second reset by Atlas on counter metric. - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event( - StageScalingPolicy.ScalingReason.SourceJobDrop, - stage, - sourceJobDrops / 6.0 / numWorkers, - sourceJobDrops / 6.0 / numWorkers, - numWorkers)); - } + addScalerEventForSourceJobDrops(numWorkers); } }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS )); @@ -440,6 +415,36 @@ public void onNext(MetricData metricData) { } }; } + + private void addScalerEventForSourceJobDrops(int numWorkers) { + double sourceJobDrops = 0; + boolean hasSourceJobDropsMetric = false; + Map sourceMetricsRecent = sourceJobMetricsRecent.asMap(); + for (Map.Entry worker : sourceJobWorkersMap.entrySet()) { + Map metricGroups = metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp()); + for (Map.Entry group : metricGroups.entrySet()) { + String metricKey = worker.getKey() + ":" + group.getKey(); + for (Map.Entry gauge : group.getValue().getGauges().entrySet()) { + if (sourceMetricsRecent.containsKey(metricKey) && + autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) { + sourceJobDrops += gauge.getValue(); + hasSourceJobDropsMetric = true; + } + } + } + } + if (hasSourceJobDropsMetric) { + logger.info("Job stage {}, source job drop metrics: {}", stage, sourceJobDrops); + // Divide by 6 to account for 6 second reset by Atlas on counter metric. + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event( + StageScalingPolicy.ScalingReason.SourceJobDrop, + stage, + sourceJobDrops / 6.0 / numWorkers, + sourceJobDrops / 6.0 / numWorkers, + numWorkers)); + } + } } private void start() { diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java index e4a661ff4..a92ef883e 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/JobAutoScalerTest.java @@ -78,7 +78,7 @@ public void testScaleUp() throws InterruptedException { .machineDefinition(new MachineDefinition(2, workerMemoryMB, 200, 1024, 2)) .scalingPolicy(new StageScalingPolicy(scalingStageNum, min, max, increment, decrement, coolDownSec, Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, - new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))))) + new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))), false)) .scalable(true) .build(); @@ -143,8 +143,12 @@ public void testScalingResiliency() throws InterruptedException { .numberOfInstances(numStage1Workers) .machineDefinition(new MachineDefinition(2, workerMemoryMB, 200, 1024, 2)) .scalingPolicy(new StageScalingPolicy(scalingStageNum, min, max, increment, decrement, coolDownSec, - Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, - new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, 
scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))))) + new HashMap() { + { + put(StageScalingPolicy.ScalingReason.Memory, new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))); + put(StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, 0.0, 0.0, new StageScalingPolicy.RollingCount(1, 2))); + } + }, false)) .scalable(true) .build(); @@ -201,7 +205,7 @@ public void testScaleDown() throws InterruptedException { .machineDefinition(new MachineDefinition(2, workerMemoryMB, 200, 1024, 2)) .scalingPolicy(new StageScalingPolicy(scalingStageNum, min, max, increment, decrement, coolDownSec, Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, - new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))))) + new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))), false)) .scalable(true) .build(); @@ -234,6 +238,66 @@ public void testScaleDown() throws InterruptedException { } } + @Test + public void testScaleDownManagerDisabled() throws InterruptedException { + final String jobId = "test-job-1"; + final int coolDownSec = 2; + final int scalingStageNum = 1; + final MantisMasterClientApi mockMasterClientApi = mock(MantisMasterClientApi.class); + final Map schedulingInfoMap = new HashMap<>(); + final int numStage1Workers = 2; + final int increment = 1; + final int decrement = 1; + final int min = 1; + final int max = 5; + final double scaleUpAbovePct = 45.0; + final double scaleDownBelowPct = 15.0; + final double workerMemoryMB = 512.0; + + final StageSchedulingInfo stage1SchedInfo = StageSchedulingInfo.builder() + .numberOfInstances(numStage1Workers) + .machineDefinition(new MachineDefinition(2, workerMemoryMB, 200, 1024, 2)) + .scalingPolicy(new StageScalingPolicy(scalingStageNum, min, max, increment, decrement, coolDownSec, + Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, + new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))), true)) + .scalable(true) + .build(); + + schedulingInfoMap.put(scalingStageNum, stage1SchedInfo); + + when(mockMasterClientApi.scaleJobStage(eq(jobId), eq(scalingStageNum), eq(numStage1Workers - decrement), anyString())).thenReturn(Observable.just(true)); + + Context context = mock(Context.class); + when(context.getWorkerMapObservable()).thenReturn(Observable.empty()); + + JobAutoscalerManager scalarManager = new JobAutoscalerManager() { + @Override + public boolean isScaleDownEnabled() { + return false; + } + }; + final JobAutoScaler jobAutoScaler = new JobAutoScaler(jobId, new SchedulingInfo(schedulingInfoMap), mockMasterClientApi, context, scalarManager); + jobAutoScaler.start(); + final Observer jobAutoScalerObserver = jobAutoScaler.getObserver(); + + // should trigger a scale down (below 15% scaleDown threshold) + jobAutoScalerObserver.onNext(new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Memory, scalingStageNum, workerMemoryMB * (scaleDownBelowPct / 100.0 - 0.01), (scaleDownBelowPct / 100.0 - 0.01) * 100.0, numStage1Workers)); + + verify(mockMasterClientApi, timeout(1000).times(0)).scaleJobStage(jobId, scalingStageNum, 
numStage1Workers - decrement, String.format("Memory with value %1$,.2f is below scaleDown threshold of %2$,.1f", (scaleDownBelowPct / 100.0 - 0.01) * 100.0, scaleDownBelowPct)); + + // should *not* trigger a scale down before cooldown period (below 15% scaleDown threshold) + jobAutoScalerObserver.onNext(new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Memory, scalingStageNum, workerMemoryMB * (scaleDownBelowPct / 100.0 - 0.01), scaleDownBelowPct - 0.01, numStage1Workers - decrement)); + jobAutoScalerObserver.onNext(new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Memory, scalingStageNum, workerMemoryMB * (scaleDownBelowPct / 100.0 - 0.01), scaleDownBelowPct - 0.01, numStage1Workers - decrement)); + + Thread.sleep(coolDownSec * 1000); + + if (numStage1Workers - decrement == min) { + // should not trigger a scale down after cooldown period if numWorkers=min (below 15% scaleDown threshold) + jobAutoScalerObserver.onNext(new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.Memory, scalingStageNum, workerMemoryMB * (scaleDownBelowPct / 100.0 - 0.01), scaleDownBelowPct - 0.01, numStage1Workers - decrement)); + verifyNoMoreInteractions(mockMasterClientApi); + } + } + @Test public void testScaleDownNotLessThanMin() throws InterruptedException { final String jobId = "test-job-1"; @@ -255,7 +319,7 @@ public void testScaleDownNotLessThanMin() throws InterruptedException { .numberOfInstances(numStage1Workers).machineDefinition(new MachineDefinition(2, workerMemoryMB, 200, 1024, 2)) .scalingPolicy(new StageScalingPolicy(scalingStageNum, min, max, increment, decrement, coolDownSec, Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, - new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))))) + new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, scaleDownBelowPct, scaleUpAbovePct, new StageScalingPolicy.RollingCount(1, 2))), false)) .scalable(true) .build(); @@ -301,7 +365,7 @@ public void testScaleUpOnDifferentScalingReasons() throws InterruptedException { .machineDefinition(new MachineDefinition(2, workerMemoryMB, 200, 1024, 2)) .scalingPolicy(new StageScalingPolicy(scalingStageNum, min, max, increment, decrement, coolDownSec, Collections.singletonMap(scalingReason, - new StageScalingPolicy.Strategy(scalingReason, scaleDownBelow, scaleUpAbove, new StageScalingPolicy.RollingCount(1, 2))))) + new StageScalingPolicy.Strategy(scalingReason, scaleDownBelow, scaleUpAbove, new StageScalingPolicy.RollingCount(1, 2))), false)) .scalable(true) .build(); diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java index cec9ba223..c784db21d 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java @@ -100,7 +100,7 @@ public void onNext(JobAutoScaler.Event event) { assertEquals(expected, event); latch.countDown(); } - }, mockMasterClientApi, aggregationConfig); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); @@ -174,7 +174,7 @@ public void onNext(JobAutoScaler.Event event) { latch.countDown(); } } - }, mockMasterClientApi, 
aggregationConfig); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); @@ -266,7 +266,7 @@ public void onNext(JobAutoScaler.Event event) { assertEquals(expected, event); autoScaleLatch.countDown(); } - }, mockMasterClientApi, aggregationConfig); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); @@ -322,7 +322,7 @@ public void onNext(JobAutoScaler.Event event) { latch.countDown(); } } - }, mockMasterClientApi, aggregationConfig); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelectorTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelectorTest.java index 4bff90487..adebda650 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelectorTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/clutch/rps/RpsClutchConfigurationSelectorTest.java @@ -70,7 +70,7 @@ public void testScalingPolicyFallback() { rpsSketch.update(100); Map sketches = ImmutableMap.of(Clutch.Metric.RPS, rpsSketch); - StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null); + StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null, true); StageSchedulingInfo schedulingInfo = StageSchedulingInfo.builder() .numberOfInstances(3) @@ -99,7 +99,7 @@ public void testSetPointQuantile() { } Map sketches = ImmutableMap.of(Clutch.Metric.RPS, rpsSketch); - StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null); + StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null, true); StageSchedulingInfo schedulingInfo = StageSchedulingInfo.builder() .numberOfInstances(3) @@ -123,7 +123,7 @@ public void testReturnSameConfigIfSetPointWithin5Percent() { } Map sketches = ImmutableMap.of(Clutch.Metric.RPS, rpsSketch); - StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null); + StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null, true); StageSchedulingInfo schedulingInfo = StageSchedulingInfo.builder() .numberOfInstances(3) @@ -162,7 +162,7 @@ public void testSetPointDriftAdjust() { } Map sketches = ImmutableMap.of(Clutch.Metric.RPS, rpsSketch); - StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null); + StageScalingPolicy scalingPolicy = new StageScalingPolicy(1, 2, 9, 0, 0, 400L, null, true); StageSchedulingInfo schedulingInfo = StageSchedulingInfo.builder() .numberOfInstances(3) diff --git a/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/config/WorkerConfigurationWritable.java b/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/config/WorkerConfigurationWritable.java index 38055cdc8..a0356548c 100644 --- a/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/config/WorkerConfigurationWritable.java +++ b/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/config/WorkerConfigurationWritable.java @@ -79,7 +79,6 @@ public class WorkerConfigurationWritable implements 
WorkerConfiguration { String metricsCollectorClass; String jobAutoscalerManagerClassName; - @JsonIgnore MetricsPublisher metricsPublisher; diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java index 386f14f20..b655c21a6 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java @@ -132,7 +132,8 @@ public TaskExecutor( new DefaultMantisPropertiesLoader(System.getProperties()), highAvailabilityServices, classLoaderHandle, - null); + null + ); } public TaskExecutor( diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java index 26a44d25c..9d863dac4 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java @@ -110,6 +110,7 @@ public static class TaskExecutorStarterBuilder { private final List> listeners = new ArrayList<>(); + private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) { this.workerConfiguration = workerConfiguration; this.configuration = new Configuration(); @@ -194,7 +195,8 @@ public TaskExecutorStarter build() throws Exception { Preconditions.checkNotNull(propertiesLoader, "propertiesLoader for TaskExecutor is null"), highAvailabilityServices, getClassLoaderHandle(), - this.taskFactory); + this.taskFactory + ); for (Tuple2 listener : listeners) { taskExecutor.addListener(listener._1(), listener._2());
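Finally, to illustrate the manager these changes plug in: below is a minimal sketch of a custom `JobAutoscalerManager`, modeled on the anonymous implementation in `testScaleDownManagerDisabled` above. The class name and the evacuation flag are hypothetical, and the import path is assumed from this patch's usage; a worker would be pointed at such a class via the `jobAutoscalerManagerClassName` setting seen in `WorkerConfigurationWritable`:

```java
import io.mantisrx.server.worker.jobmaster.JobAutoscalerManager; // package assumed from this patch

/**
 * Hypothetical manager that blocks scale downs while the local region is
 * evacuated, so stages with allowAutoScaleManager=true keep their workers
 * warm for when traffic is restored.
 */
public class FailoverAwareAutoscalerManager implements JobAutoscalerManager {

    // Hypothetical flag; in practice this would be fed by a FailoverStatusClient.
    private volatile boolean regionEvacuated = false;

    @Override
    public boolean isScaleUpEnabled() {
        return true; // this sketch never blocks scale ups
    }

    @Override
    public boolean isScaleDownEnabled() {
        return !regionEvacuated;
    }

    public void setRegionEvacuated(boolean evacuated) {
        this.regionEvacuated = evacuated;
    }
}
```

As in the test above, only the gates a deployment cares about need overriding; the remaining methods fall back to the interface defaults that `JobAutoscalerManager.DEFAULT` provides.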