Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SLA Max: Config to apply to Accepted + Launched jobs. #713

Merged
merged 8 commits into from
Sep 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
* limitations under the License.
*/

plugins {
id 'org.gradle.test-retry' version '1.0.0'
}


ext {
akkaVersion = '2.6.15'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
* limitations under the License.
*/

plugins {
id 'org.gradle.test-retry' version '1.0.0'
}

apply plugin: 'application'
apply plugin: 'com.bmuschko.docker-java-application'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public class JobClustersManagerActor extends AbstractActorWithTimers implements
private final Counter numJobClusterInitFailures;
private final Counter numJobClusterInitSuccesses;
private Receive initializedBehavior;
public static Props props(final MantisJobStore jobStore, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) {
return Props.create(JobClustersManagerActor.class, jobStore, eventPublisher, costsCalculator)
public static Props props(final MantisJobStore jobStore, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator, int slaHeadroomForAcceptedJobs) {
return Props.create(JobClustersManagerActor.class, jobStore, eventPublisher, costsCalculator, slaHeadroomForAcceptedJobs)
.withMailbox("akka.actor.metered-mailbox");
}

Expand All @@ -152,11 +152,14 @@ public static Props props(final MantisJobStore jobStore, final LifecycleEventPub

JobClusterInfoManager jobClusterInfoManager;

private final int slaHeadroomForAcceptedJobs;

private ActorRef jobListHelperActor;
public JobClustersManagerActor(final MantisJobStore store, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) {
public JobClustersManagerActor(final MantisJobStore store, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator, int slaHeadroomForAcceptedJobs) {
this.jobStore = store;
this.eventPublisher = eventPublisher;
this.costsCalculator = costsCalculator;
this.slaHeadroomForAcceptedJobs = slaHeadroomForAcceptedJobs;

MetricGroupId metricGroupId = getMetricGroupId();
Metrics m = new Metrics.Builder()
Expand Down Expand Up @@ -857,9 +860,10 @@ Optional<JobClusterInfo> createClusterActorAndRegister(IJobClusterDefinition job
logger.error("Cannot create actor for cluster with invalid name {}", clusterName);
return empty();
}

ActorRef jobClusterActor =
getContext().actorOf(
JobClusterActor.props(clusterName, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator),
JobClusterActor.props(clusterName, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator, slaHeadroomForAcceptedJobs),
"JobClusterActor-" + clusterName);
getContext().watch(jobClusterActor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ public static Props props(
final MantisJobStore jobStore,
final MantisSchedulerFactory mantisSchedulerFactory,
final LifecycleEventPublisher eventPublisher,
final CostsCalculator costsCalculator) {
return Props.create(JobClusterActor.class, name, jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator);
final CostsCalculator costsCalculator,
final int slaHeadroomForAcceptedJobs) {
return Props.create(JobClusterActor.class, name, jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator, slaHeadroomForAcceptedJobs);
}

private final Receive initializedBehavior;
Expand All @@ -222,17 +223,21 @@ public static Props props(
private final JobDefinitionResolver jobDefinitionResolver = new JobDefinitionResolver();
private final Metrics metrics;

private final int slaHeadroomForAcceptedJobs;


public JobClusterActor(
final String name,
final MantisJobStore jobStore,
final MantisSchedulerFactory schedulerFactory,
final LifecycleEventPublisher eventPublisher,
final CostsCalculator costsCalculator) {
final CostsCalculator costsCalculator,
final int slaHeadroomForAcceptedJobs) {
this.name = name;
this.jobStore = jobStore;
this.mantisSchedulerFactory = schedulerFactory;
this.eventPublisher = eventPublisher;
this.slaHeadroomForAcceptedJobs = slaHeadroomForAcceptedJobs;

this.jobManager = new JobManager(name, getContext(), mantisSchedulerFactory, eventPublisher, jobStore, costsCalculator);

Expand Down Expand Up @@ -1960,7 +1965,6 @@ public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) {

List<JobInfo> jobsStuckInTerminatingList = jobManager.getJobsStuckInTerminating(now, getExpireAcceptedDelayMs());


if(!slaEnforcer.hasSLA()) {
return;
}
Expand Down Expand Up @@ -1988,7 +1992,7 @@ public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) {
listOfJobs.addAll(jobManager.getActiveJobsList());
listOfJobs.addAll(jobManager.getAcceptedJobsList());

List<JobId> jobsToKill = slaEnforcer.enforceSLAMax(Collections.unmodifiableList(listOfJobs));
List<JobId> jobsToKill = slaEnforcer.enforceSLAMax(Collections.unmodifiableList(listOfJobs), this.slaHeadroomForAcceptedJobs);

for (JobId jobId : jobsToKill) {
logger.info("Request termination for job {}", jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SLAEnforcer {
private static final Logger logger = LoggerFactory.getLogger(SLAEnforcer.class);
private final Optional<SLA> sla;

// Comparator sorts JobInfo by job number in ascending order.
private final Comparator<JobInfo> comparator = (o1, o2) -> {
if (o2 == null)
return -1;
Expand Down Expand Up @@ -66,14 +71,26 @@ public int enforceSLAMin(int activeJobsCount, int acceptedJobsCount) {
return 0;
}

/**
* Walk the set of jobs in descending order (newest jobs first) track no. of running jobs. Once this
* count equals slamax mark the rest of them for deletion.
*
* @param list A sorted (by job number) set of jobs in either running or accepted state
* @return A list of jobs ids that need to be terminated.
*/
public List<JobId> enforceSLAMax(List<JobInfo> list) {
return enforceSLAMax(list, 0);
}

/**
* Walk the set of jobs in descending order (newest jobs first) track no. of running jobs. Once this
* count equals slamax mark the rest of them for deletion.
*
* @param list A sorted (by job number) set of jobs in either running or accepted state
* @return
* @param slaMaxAcceptedJobAllowance The number of accepted jobs above the SLA max allowed. 0 for unlimited.
* @return A list of jobs ids that need to be terminated.
*/
public List<JobId> enforceSLAMax(List<JobInfo> list) {
public List<JobId> enforceSLAMax(List<JobInfo> list, int slaMaxAcceptedJobAllowance) {
Preconditions.checkNotNull(list, "runningOrAcceptedJobSet is null");

List<JobId> jobsToDelete = Lists.newArrayList();
Expand Down Expand Up @@ -104,6 +121,15 @@ public List<JobId> enforceSLAMax(List<JobInfo> list) {
}
}

int headroom = slaMax - activeJobCount; // If we're under SLA max we account for this in our allowance.
if (slaMaxAcceptedJobAllowance > 0) {
list.stream()
.filter(job -> job.state == JobState.Accepted)
.sorted(comparator.reversed())
.skip(headroom + slaMaxAcceptedJobAllowance)
.forEach(job -> jobsToDelete.add(job.jobId));
}

return jobsToDelete;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public MasterMain(

storageProvider = new KeyValueBasedPersistenceProvider(this.config.getStorageProvider(), lifecycleEventPublisher);
final MantisJobStore mantisJobStore = new MantisJobStore(storageProvider);
final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher, config.getJobCostsCalculator()), "JobClustersManager");

final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher, config.getJobCostsCalculator(), config.getSlaMaxHeadroomForAccepted()), "JobClustersManager");
final JobMessageRouter jobMessageRouter = new JobMessageRouterImpl(jobClusterManagerActor);

// Beginning of new stuff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ default Duration getSchedulerIntervalBetweenRetries() {
@Default("")
String getSchedulingConstraintsString();

@Config("mantis.sla.headroomForAcceptedJobs")
@Default("3")
int getSlaMaxHeadroomForAccepted();

default Duration getHeartbeatInterval() {
return Duration.ofMillis(getHeartbeatIntervalInMs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public void setup() throws Exception {
JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(new FileBasedStore(temporaryFolder.newFolder("test")))),
lifecycleEventPublisher,
CostsCalculator.noop()),
CostsCalculator.noop(),
0),
"jobClustersManager");
MantisSchedulerFactory fakeSchedulerFactory = mock(MantisSchedulerFactory.class);
MantisScheduler fakeScheduler = new FakeMantisScheduler(jobClustersManagerActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public static void setup() throws Exception {
JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(true)),
lifecycleEventPublisher,
CostsCalculator.noop()),
CostsCalculator.noop(),
0),
"jobClustersManager");

MantisSchedulerFactory fakeSchedulerFactory = mock(MantisSchedulerFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ public static void setup() throws Exception {
JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(true)),
lifecycleEventPublisher,
CostsCalculator.noop()),
CostsCalculator.noop(),
0),
"jobClustersManager");

MantisSchedulerFactory fakeSchedulerFactory = mock(MantisSchedulerFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void setup() throws Exception {
final LifecycleEventPublisher lifecycleEventPublisher = new LifecycleEventPublisherImpl(new AuditEventSubscriberLoggingImpl(), new StatusEventSubscriberLoggingImpl(), new WorkerEventSubscriberLoggingImpl());

ActorRef jobClustersManagerActor = system.actorOf(JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(true)), lifecycleEventPublisher, CostsCalculator.noop()), "jobClustersManager");
new MantisJobStore(new FileBasedPersistenceProvider(true)), lifecycleEventPublisher, CostsCalculator.noop(), 0), "jobClustersManager");

MantisSchedulerFactory fakeSchedulerFactory = mock(MantisSchedulerFactory.class);
MantisScheduler fakeScheduler = new FakeMantisScheduler(jobClustersManagerActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public static void setup() throws Exception {
JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(stateDirectory, true)),
lifecycleEventPublisher,
CostsCalculator.noop()),
CostsCalculator.noop(),
0),
"jobClustersManager");
MantisSchedulerFactory mantisSchedulerFactory = mock(MantisSchedulerFactory.class);
MantisScheduler fakeScheduler = new FakeMantisScheduler(jobClustersManagerActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public static void setup() throws Exception {
JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(true)),
lifecycleEventPublisher,
CostsCalculator.noop()),
CostsCalculator.noop(),
0),
"jobClustersManager");

MantisSchedulerFactory fakeSchedulerFactory = mock(MantisSchedulerFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public static void setup() throws Exception {
JobClustersManagerActor.props(
new MantisJobStore(new FileBasedPersistenceProvider(true)),
lifecycleEventPublisher,
CostsCalculator.noop()),
CostsCalculator.noop(),
0),
"jobClustersManager");

IMantisPersistenceProvider simpleCachedFileStorageProvider = new FileBasedPersistenceProvider(new FileBasedStore());
Expand Down
Loading
Loading