diff --git a/docs/docs/operate/autoscalingstrategies.md b/docs/docs/operate/autoscalingstrategies.md index b7f6bc4e8..d19696269 100644 --- a/docs/docs/operate/autoscalingstrategies.md +++ b/docs/docs/operate/autoscalingstrategies.md @@ -56,7 +56,7 @@ scaling action, the cooldown will prevent subsequent strategies from scaling for ### AutoscalerManagerEvent Strategy This is a custom strategy to set a target worker size at runtime. The strategy uses `getCurrentValue` from `JobAutoScalerManager` to determine the target worker size. -For a non-negative value `[0.0, 1.0]`, the autoscaler will scale the stage from min to max. +For a non-negative value `[0.0, 100.0]`, the autoscaler will scale the stage from min to max. All other values are ignored. Default implementation returns a -1.0 for `currentValue` meaning a no-op for event based scaling. Example #1: Use-case during region failover in a multi-region setup diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java index 7b483d0d8..ee1555e45 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java @@ -326,9 +326,16 @@ private void setOutstandingScalingRequest(final Subscription subscription) { inProgressScalingSubscription.compareAndSet(null, subscription); } - public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, ScalingReason reason) { + private int getDesiredWorkers(StageScalingPolicy scalingPolicy, Event event) { + final int maxWorkersForStage = scalingPolicy.getMax(); + final int minWorkersForStage = scalingPolicy.getMin(); + return minWorkersForStage + (int) Math.round((maxWorkersForStage - minWorkersForStage) * event.getEffectiveValue() / 100.0); + } + + public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, Event event) { final int desiredWorkers; - if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) { + final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); + if (!scalingPolicy.isEnabled()) { logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment); return numCurrentWorkers; } @@ -336,19 +343,18 @@ public int getDesiredWorkersForScaleUp(final int increment, final int numCurrent if (numCurrentWorkers < 0 || increment < 1) { logger.error("current number of workers({}) not known or increment({}) < 1, will not scale up", numCurrentWorkers, increment); return numCurrentWorkers; - } else if (stageSchedulingInfo.getScalingPolicy().isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) { + } else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) { logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId); return numCurrentWorkers; + } else if (event.getType() == ScalingReason.AutoscalerManagerEvent) { + desiredWorkers = getDesiredWorkers(scalingPolicy, event); + logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers); } else { - final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax(); - if (reason == ScalingReason.AutoscalerManagerEvent) { - logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to maxWorkersForStage {}", stage, jobId, maxWorkersForStage); - desiredWorkers = maxWorkersForStage; - } else { - desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage); - } - return desiredWorkers; + final int maxWorkersForStage = scalingPolicy.getMax(); + desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage); } + return desiredWorkers; + } public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) { @@ -369,7 +375,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, setOutstandingScalingRequest(subscription); } - public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, ScalingReason reason) { + public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, Event event) { final int desiredWorkers; final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); if (!scalingPolicy.isEnabled()) { @@ -382,9 +388,12 @@ public int getDesiredWorkersForScaleDown(final int decrement, final int numCurre } else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) { logger.warn("Scaledown is disabled for all autoscaling strategy, not scaling down stage {} of job {}", stage, jobId); return numCurrentWorkers; + } else if (event.getType() == ScalingReason.AutoscalerManagerEvent) { + desiredWorkers = getDesiredWorkers(scalingPolicy, event); + logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers); } else { - int min = scalingPolicy.getMin(); - desiredWorkers = Math.max(numCurrentWorkers - decrement, min); + int min = scalingPolicy.getMin(); + desiredWorkers = Math.max(numCurrentWorkers - decrement, min); } return desiredWorkers; } @@ -470,7 +479,7 @@ public void onNext(Event event) { String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()), stats.getCurrentHighCount()); final int numCurrWorkers = event.getNumWorkers(); - final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, strategy.getReason()); + final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, event); if (desiredWorkers > numCurrWorkers) { scaler.scaleUpStage(numCurrWorkers, desiredWorkers, event.getType() + " with value " + String.format(PercentNumberFormat, effectiveValue) + @@ -485,7 +494,7 @@ public void onNext(Event event) { stage, jobId, scalingPolicy.getDecrement(), event.getType(), strategy.getScaleDownBelowPct(), stats.getCurrentLowCount()); final int numCurrentWorkers = event.getNumWorkers(); - final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, strategy.getReason()); + final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, event); if (desiredWorkers < numCurrentWorkers) { scaler.scaleDownStage(numCurrentWorkers, desiredWorkers, event.getType() + " with value " + String.format(PercentNumberFormat, effectiveValue) + diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java index 225406534..b99aabc4e 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java @@ -44,7 +44,7 @@ default boolean isScaleDownEnabled() { /** * Get the current fractional value to set size for stage numWorkers. - * Valid values are [0.0, 1.0] which set numWorkers from [min, max]. + * Valid values are [0.0, 100.0] which set numWorkers from [min, max]. * All other values are ignored for scaling decisions. */ default double getCurrentValue() { diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java index b48cc8606..64416e630 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java @@ -279,13 +279,7 @@ public void call() { // get the aggregate metric values by metric group for all workers in stage Map allWorkerAggregates = getAggregates(listofAggregates); logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString()); - jobAutoScaleObserver.onNext( - new JobAutoScaler.Event( - StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, stage, - jobAutoscalerManager.getCurrentValue(), - jobAutoscalerManager.getCurrentValue(), - numWorkers) - ); + maybeEmitAutoscalerManagerEvent(numWorkers); for (Map.Entry> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) { final String metricGrp = userDefinedMetric.getKey(); @@ -423,6 +417,19 @@ public void onNext(MetricData metricData) { }; } + private void maybeEmitAutoscalerManagerEvent(int numWorkers) { + final double currentValue = jobAutoscalerManager.getCurrentValue(); + if (currentValue >= 0.0 && currentValue <= 100.0) { + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event( + StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, stage, + currentValue, + currentValue, + numWorkers) + ); + } + } + private void addScalerEventForSourceJobDrops(int numWorkers) { double sourceJobDrops = 0; boolean hasSourceJobDropsMetric = false; diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java index 494381f83..7a53f2471 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java @@ -37,7 +37,6 @@ import io.mantisrx.server.core.stats.MetricStringConstants; import io.mantisrx.server.master.client.MantisMasterClientApi; import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections;