From c1c7f34a1cdf2fea6a84575eb6f6b94eb3dc71a7 Mon Sep 17 00:00:00 2001 From: hmitnflx <100323213+hmitnflx@users.noreply.github.com> Date: Wed, 10 Jul 2024 16:19:49 -0700 Subject: [PATCH] Emit failover events into the job autoscaler events to allow autoscaling decisions (#676) * Use service locator instead of reflection for initializing JobAutoscalerManager --- docs/docs/operate/autoscalingstrategies.md | 40 ++++++++------ ...WorkerExecutionOperationsNetworkStage.java | 18 ++----- .../worker/jobmaster/JobAutoScaler.java | 54 +++++++++++-------- .../jobmaster/JobAutoscalerManager.java | 9 ++++ .../worker/jobmaster/WorkerMetricHandler.java | 16 ++++++ .../jobmaster/WorkerMetricHandlerTest.java | 20 ++++--- 6 files changed, 101 insertions(+), 56 deletions(-) diff --git a/docs/docs/operate/autoscalingstrategies.md b/docs/docs/operate/autoscalingstrategies.md index 7d98d4d37..d19696269 100644 --- a/docs/docs/operate/autoscalingstrategies.md +++ b/docs/docs/operate/autoscalingstrategies.md @@ -23,17 +23,17 @@ Note: Rule based strategy can be defined for the following resources: -| Resource | Metric | -|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| -| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` | -| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` | -| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` | -| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` | -| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` | -| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` | -| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` | -| `UserDefined` | Metric is defined by user with job parameter `mantis.jobmaster.autoscale.metric` in this format `{group}::{name}::{aggregation}`. | -| `FailoverAware` | Metric is defined by an implementation of `FailoverStatusClient`. This is used to control autoscaling during failover events. (See #failover-aware) | +| Resource | Metric | +|----------------------------|----------------------------------------------------------------------------------------------------------------------------------| +| `CPU` | group: `ResourceUsage` name: `cpuPctUsageCurr` aggregation: `AVG` | +| `Memory` | group: `ResourceUsage` name: `totMemUsageCurr` aggregation: `AVG` | +| `Network` | group: `ResourceUsage` name: `nwBytesUsageCurr` aggregation: `AVG` | +| `JVMMemory` | group: `ResourceUsage` name: `jvmMemoryUsedBytes` aggregation: `AVG` | +| `DataDrop` | group: `DataDrop` name: `dropCount` aggregation: `AVG` | +| `KafkaLag` | group: `consumer-fetch-manager-metrics` name: `records-lag-max` aggregation: `MAX` | +| `KafkaProcessed` | group: `consumer-fetch-manager-metrics` name: `records-consumed-rate` aggregation: `AVG` | +| `UserDefined` | Metric is defined by user with job parameter `mantis.jobmaster.autoscale.metric` in this format `{group}::{name}::{aggregation}`. | +| `AutoscalerManagerEvent` | Custom event defined by an implementation of `JobAutoscalerManager`. This allows event-based runtime control over stage workers. |
Each strategy has the following parameters: @@ -53,21 +53,31 @@ scaling action, the cooldown will prevent subsequent strategies from scaling for best to use the data drop strategy in conjunction with another strategy that provides the scale-down trigger. -### Failover-Aware Strategy +### AutoscalerManagerEvent Strategy +This is a custom strategy that sets a target worker size at runtime. +The strategy uses `getCurrentValue` from `JobAutoscalerManager` to determine the target worker size. +For a value in the range `[0.0, 100.0]`, the autoscaler maps it linearly onto the stage's `[min, max]` worker range. +All other values are ignored. The default implementation returns `-1.0` for `getCurrentValue`, making event-based scaling a no-op. + +Example: use case during region failover in a multi-region setup During a failover event in a multi-region setup, all incoming traffic is moved away from the failed over region—`evacuee`—to other region(s)—`savior(s)` to mitigate the issue faster. -For autoscaling mantis jobs, it's sometimes necessary to be failover aware because: +For some autoscaling Mantis jobs, it's necessary to be failover aware to: -1. Scale down in evacuee regions — `allowScaleDownDuringEvacuated`. +1. Prevent scale down in evacuee regions — Use `allowAutoScaleManager`. After an extended period of evacuated state for the region, the jobs in that region would have been scaled down to lowest values because of no incoming traffic and low resource utilization. When the traffic is restored in that region, the job will see a big surge in incoming requests overwhelming the job in that region. -2. Scale up in savior regions — Use `FailoverAware` in strategies. + Mitigation: Use `allowAutoScaleManager` to enable `JobAutoscalerManager` for the stage. Provide a custom implementation of + `JobAutoscalerManager` that returns `false` for `isScaleDownEnabled` in the evacuee region. +2. Scale up in savior regions — Use `AutoscalerManagerEvent` in strategies. When the traffic is moved to savior regions, the jobs in those regions should be scaled up to handle the additional load. Currently, failover aware strategy will scale up the configured stages to max #workers in configured in `StageScalingPolicy`. It also doesn't work with PID Control Based Strategy(ies). + Usage: Provide a custom `JobAutoscalerManager` implementation that returns `100.0` in the savior region to scale the stage's + numWorkers to max.
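To make the AutoscalerManagerEvent section above concrete, here is a minimal sketch of a custom `JobAutoscalerManager` (editorial illustration only, not part of this patch); the `FailoverStatus` helper and its `isEvacuee`/`isSavior` methods are hypothetical stand-ins for whatever failover-state source a deployment actually has:

```java
package com.example.mantis.autoscaler; // hypothetical package

import io.mantisrx.server.worker.jobmaster.JobAutoscalerManager;

/**
 * Illustrative failover-aware JobAutoscalerManager: blocks scale-down in an
 * evacuee region and pins the stage to max workers in a savior region.
 */
public class FailoverAwareAutoscalerManager implements JobAutoscalerManager {

    /** Hypothetical source of the current region's failover state. */
    public interface FailoverStatus {
        boolean isEvacuee();
        boolean isSavior();
    }

    private final FailoverStatus failoverStatus;

    public FailoverAwareAutoscalerManager(FailoverStatus failoverStatus) {
        this.failoverStatus = failoverStatus;
    }

    @Override
    public boolean isScaleDownEnabled() {
        // Evacuee region: keep workers around so restored traffic doesn't overwhelm the job.
        return !failoverStatus.isEvacuee();
    }

    @Override
    public double getCurrentValue() {
        // Savior region: 100.0 maps the stage to its max workers;
        // -1.0 keeps event-based scaling a no-op everywhere else.
        return failoverStatus.isSavior() ? 100.0 : -1.0;
    }
}
```

Per the worker changes later in this patch, the implementation is resolved through the service locator (`serviceLocator.service(JobAutoscalerManager.class)`) and falls back to `JobAutoscalerManager.DEFAULT` when none is registered.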
## PID Control Based Strategies diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java index a6c1ada89..6056dfa12 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java @@ -66,8 +66,6 @@ import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector; import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -78,7 +76,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -416,7 +413,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException { logger.info("param {} is null or empty", JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM); } - JobAutoscalerManager jobAutoscalerManager = getJobAutoscalerManagerInstance(config); + JobAutoscalerManager jobAutoscalerManager = getJobAutoscalerManagerInstance(serviceLocator); JobMasterService jobMasterService = new JobMasterService(rw.getJobId(), rw.getSchedulingInfo(), workerMetricsClient, autoScaleMetricsConfig, mantisMasterApi, rw.getContext(), rw.getOnCompleteCallback(), rw.getOnErrorCallback(), rw.getOnTerminateCallback(), jobAutoscalerManager); jobMasterService.start(); @@ -482,16 +479,9 @@ remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.ge } } - private JobAutoscalerManager getJobAutoscalerManagerInstance(WorkerConfiguration config) { - try { - Class jobAutoscalerManagerClass = Class.forName(config.getJobAutoscalerManagerClassName()); - logger.info("Picking {} jobAutoscalerManager", jobAutoscalerManagerClass.getName()); - Method managerClassFactory = jobAutoscalerManagerClass.getMethod("valueOf", Properties.class); - return (JobAutoscalerManager) managerClassFactory.invoke(null, System.getProperties()); - } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - logger.warn("Couldnt instantiate jobAutoscalerManager from class {} because ", config.getJobAutoscalerManagerClassName(), e); - return JobAutoscalerManager.DEFAULT; - } + private JobAutoscalerManager getJobAutoscalerManagerInstance(ServiceLocator serviceLocator) { + final JobAutoscalerManager autoscalerManager = serviceLocator.service(JobAutoscalerManager.class); + return Optional.ofNullable(autoscalerManager).orElse(JobAutoscalerManager.DEFAULT); } private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageRequest) { diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java index c53d0f11b..ee1555e45 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoScaler.java @@ -326,9 +326,16 @@ private void setOutstandingScalingRequest(final Subscription subscription) { 
inProgressScalingSubscription.compareAndSet(null, subscription); } - public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers) { + private int getDesiredWorkers(StageScalingPolicy scalingPolicy, Event event) { + final int maxWorkersForStage = scalingPolicy.getMax(); + final int minWorkersForStage = scalingPolicy.getMin(); + return minWorkersForStage + (int) Math.round((maxWorkersForStage - minWorkersForStage) * event.getEffectiveValue() / 100.0); + } + + public int getDesiredWorkersForScaleUp(final int increment, final int numCurrentWorkers, Event event) { final int desiredWorkers; - if (!stageSchedulingInfo.getScalingPolicy().isEnabled()) { + final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); + if (!scalingPolicy.isEnabled()) { logger.warn("Job {} stage {} is not scalable, can't increment #workers by {}", jobId, stage, increment); return numCurrentWorkers; } @@ -336,14 +343,18 @@ public int getDesiredWorkersForScaleUp(final int increment, final int numCurrent if (numCurrentWorkers < 0 || increment < 1) { logger.error("current number of workers({}) not known or increment({}) < 1, will not scale up", numCurrentWorkers, increment); return numCurrentWorkers; - } else if (stageSchedulingInfo.getScalingPolicy().isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) { + } else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleUpEnabled()) { logger.warn("Scaleup is disabled for all autoscaling strategy, not scaling up stage {} of job {}", stage, jobId); return numCurrentWorkers; + } else if (event.getType() == ScalingReason.AutoscalerManagerEvent) { + desiredWorkers = getDesiredWorkers(scalingPolicy, event); + logger.info("AutoscalerManagerEvent scaling up stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers); } else { - final int maxWorkersForStage = stageSchedulingInfo.getScalingPolicy().getMax(); + final int maxWorkersForStage = scalingPolicy.getMax(); desiredWorkers = Math.min(numCurrentWorkers + increment, maxWorkersForStage); - return desiredWorkers; } + return desiredWorkers; + } public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, final String reason) { @@ -364,7 +375,7 @@ public void scaleUpStage(final int numCurrentWorkers, final int desiredWorkers, setOutstandingScalingRequest(subscription); } - public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers) { + public int getDesiredWorkersForScaleDown(final int decrement, final int numCurrentWorkers, Event event) { final int desiredWorkers; final StageScalingPolicy scalingPolicy = stageSchedulingInfo.getScalingPolicy(); if (!scalingPolicy.isEnabled()) { @@ -377,9 +388,12 @@ public int getDesiredWorkersForScaleDown(final int decrement, final int numCurre } else if (scalingPolicy.isAllowAutoScaleManager() && !jobAutoscalerManager.isScaleDownEnabled()) { logger.warn("Scaledown is disabled for all autoscaling strategy, not scaling down stage {} of job {}", stage, jobId); return numCurrentWorkers; + } else if (event.getType() == ScalingReason.AutoscalerManagerEvent) { + desiredWorkers = getDesiredWorkers(scalingPolicy, event); + logger.info("AutoscalerManagerEvent scaling down stage {} of job {} to desiredWorkers {}", stage, jobId, desiredWorkers); } else { - int min = scalingPolicy.getMin(); - desiredWorkers = Math.max(numCurrentWorkers - decrement, min); + int min = scalingPolicy.getMin(); + desiredWorkers = Math.max(numCurrentWorkers - decrement, min); } return
desiredWorkers; } @@ -457,16 +471,15 @@ public void onNext(Event event) { } stats.add(effectiveValue); if (lastScaledAt < (System.currentTimeMillis() - coolDownSecs * 1000)) { - logger.info(jobId + ", stage " + stage + ": eff=" + - String.format(PercentNumberFormat, effectiveValue) + ", thresh=" + strategy.getScaleUpAbovePct()); + logger.info("{}, stage {}, eventType {}: eff={}, thresh={}", jobId, stage, event.getType(), + String.format(PercentNumberFormat, effectiveValue), strategy.getScaleUpAbovePct()); if (stats.getHighThreshTriggered()) { - logger.info("Attempting to scale up stage " + stage + " of job " + jobId + " by " + - scalingPolicy.getIncrement() + " workers, because " + - event.type + " exceeded scaleUpThreshold of " + - String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()) + " " + - stats.getCurrentHighCount() + " times"); + logger.info("Attempting to scale up stage {} of job {} by {} workers, because {} exceeded scaleUpThreshold of {} {} times", + stage, jobId, scalingPolicy.getIncrement(), event.getType(), + String.format(PercentNumberFormat, strategy.getScaleUpAbovePct()), + stats.getCurrentHighCount()); final int numCurrWorkers = event.getNumWorkers(); - final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers); + final int desiredWorkers = scaler.getDesiredWorkersForScaleUp(scalingPolicy.getIncrement(), numCurrWorkers, event); if (desiredWorkers > numCurrWorkers) { scaler.scaleUpStage(numCurrWorkers, desiredWorkers, event.getType() + " with value " + String.format(PercentNumberFormat, effectiveValue) + @@ -477,12 +490,11 @@ public void onNext(Event event) { logger.debug("scale up NOOP: desiredWorkers same as current workers"); } } else if (stats.getLowThreshTriggered()) { - logger.info("Attempting to scale down stage " + stage + " of job " + jobId + " by " + - scalingPolicy.getDecrement() + " workers because " + event.getType() + - " is below scaleDownThreshold of " + strategy.getScaleDownBelowPct() + - " " + stats.getCurrentLowCount() + " times"); + logger.info("Attempting to scale down stage {} of job {} by {} workers, because {} is below scaleDownThreshold of {} {} times", + stage, jobId, scalingPolicy.getDecrement(), event.getType(), + strategy.getScaleDownBelowPct(), stats.getCurrentLowCount()); final int numCurrentWorkers = event.getNumWorkers(); - final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers); + final int desiredWorkers = scaler.getDesiredWorkersForScaleDown(scalingPolicy.getDecrement(), numCurrentWorkers, event); if (desiredWorkers < numCurrentWorkers) { scaler.scaleDownStage(numCurrentWorkers, desiredWorkers, event.getType() + " with value " + String.format(PercentNumberFormat, effectiveValue) + diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java index 278c879c7..b99aabc4e 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/JobAutoscalerManager.java @@ -42,6 +42,15 @@ default boolean isScaleDownEnabled() { return true; } + /** + * Get the current target percentage used to set the stage's numWorkers. + * Valid values are [0.0, 100.0], which map linearly onto [min, max] workers. + * All other values are ignored for scaling decisions.
+ */ + default double getCurrentValue() { + return -1.0; + } + /** * Noop implementation of {@link JobAutoscalerManager} that always returns true * for isScaleUpEnabled, isScaleDownEnabled diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java index de31172e8..658bc8378 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java @@ -279,6 +279,7 @@ public void call() { // get the aggregate metric values by metric group for all workers in stage Map allWorkerAggregates = getAggregates(listofAggregates); logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString()); + maybeEmitAutoscalerManagerEvent(numWorkers); for (Map.Entry> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) { final String metricGrp = userDefinedMetric.getKey(); @@ -416,6 +417,21 @@ public void onNext(MetricData metricData) { }; } + private void maybeEmitAutoscalerManagerEvent(int numWorkers) { + final double currentValue = jobAutoscalerManager.getCurrentValue(); + // The effective value is a pct value and hence ranges from [0, 100]. + // Ignore all other values to disable autoscaling for custom events. + if (currentValue >= 0.0 && currentValue <= 100.0) { + jobAutoScaleObserver.onNext( + new JobAutoScaler.Event( + StageScalingPolicy.ScalingReason.AutoscalerManagerEvent, stage, + currentValue, + currentValue, + numWorkers) + ); + } + } + private void addScalerEventForSourceJobDrops(int numWorkers) { double sourceJobDrops = 0; boolean hasSourceJobDropsMetric = false; diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java index c784db21d..cfa81ce71 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java @@ -36,6 +36,7 @@ import io.mantisrx.server.core.stats.MetricStringConstants; import io.mantisrx.server.master.client.MantisMasterClientApi; import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -82,6 +83,7 @@ public void testDropDataMetricTriggersAutoScale() throws InterruptedException { final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(); + final List events = new ArrayList<>(); final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer() { @Override public void onCompleted() { @@ -96,8 +98,11 @@ public void onError(Throwable e) { @Override public void onNext(JobAutoScaler.Event event) { logger.info("got auto scale event {}", event); - JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent, dropPercent, 1); - assertEquals(expected, event); + long count = latch.getCount(); + if (count == 1) { + JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent, dropPercent, 1); + assertEquals(expected, event); + } 
latch.countDown(); } }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); @@ -108,6 +113,7 @@ public void onNext(JobAutoScaler.Event event) { metricDataObserver.onNext(new MetricData(new String(jobId), stage, workerIdx, workerNum, DATA_DROP_METRIC_GROUP, gauges)); assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); + } @Test @@ -166,13 +172,12 @@ public void onNext(JobAutoScaler.Event event) { if (count == 2) { JobAutoScaler.Event expected1 = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.UserDefined, stage, metricValue * 3 / 4, metricValue * 3 / 4, numWorkers); assertEquals(expected1, event); - latch.countDown(); } if (count == 1) { JobAutoScaler.Event expected2 = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.KafkaLag, stage, kafkaLag, kafkaLag, numWorkers); assertEquals(expected2, event); - latch.countDown(); } + latch.countDown(); } }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); @@ -262,8 +267,11 @@ public void onError(Throwable e) { @Override public void onNext(JobAutoScaler.Event event) { logger.info("got auto scale event {}", event); - JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent / numWorkers, dropPercent / numWorkers, numWorkers); - assertEquals(expected, event); + long count = autoScaleLatch.getCount(); + if (count == 1) { + JobAutoScaler.Event expected = new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.DataDrop, stage, dropPercent / numWorkers, dropPercent / numWorkers, numWorkers); + assertEquals(expected, event); + } autoScaleLatch.countDown(); } }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);
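As a quick sanity check of the `[0.0, 100.0]` contract introduced above (a standalone sketch, not part of the patch), the following mirrors the arithmetic in `getDesiredWorkers` so the mapping from `getCurrentValue` to a worker count is easy to verify:

```java
// Standalone example: reproduces the min/max mapping used by JobAutoScaler.getDesiredWorkers.
public final class DesiredWorkersExample {

    static int desiredWorkers(int min, int max, double eventValue) {
        // Mirrors: min + round((max - min) * effectiveValue / 100.0)
        return min + (int) Math.round((max - min) * eventValue / 100.0);
    }

    public static void main(String[] args) {
        // With a stage scaling policy of min=2, max=10:
        System.out.println(desiredWorkers(2, 10, 0.0));   // 2  -> scaled to min
        System.out.println(desiredWorkers(2, 10, 50.0));  // 6  -> halfway between min and max
        System.out.println(desiredWorkers(2, 10, 100.0)); // 10 -> scaled to max
    }
}
```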