Throttle RC actions (#530)

* Throttle RC actions

* Improve scheduling error

* fix npe

* comments

* cleanup
Andyz26 authored Aug 21, 2023
1 parent cafbe81 commit 6aa4546
Showing 16 changed files with 146 additions and 31 deletions.
@@ -91,7 +91,13 @@ private CompletableFuture<Ack> performAction(String action, Object body) {
return client.executeRequest(request).toCompletableFuture().thenCompose(response -> {
if (response.getStatusCode() == 200) {
return CompletableFuture.completedFuture(Ack.getInstance());
} else {
}
else if (response.getStatusCode() == 429) {
log.warn("request was throttled on control plane side: {}", request);
return CompletableFutures.exceptionallyCompletedFuture(
new RequestThrottledException("request was throttled on control plane side: " + request));
}
else {
try {
log.error("failed request {} with response {}", request, response.getResponseBody());
return CompletableFutures.exceptionallyCompletedFuture(
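Note: the client above fails the future with RequestThrottledException on a 429, so callers can react to throttling explicitly. As a hedged illustration (not part of this commit), a caller could retry once after a short backoff; the action supplier, delay, and scheduler below are illustrative stand-ins, not Mantis APIs.

import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ThrottleRetrySketch {
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor();

    // Runs the action; if it fails with RequestThrottledException, retries it
    // exactly once after delayMillis. All other failures propagate unchanged.
    static <T> CompletableFuture<T> withOneRetry(
            Supplier<CompletableFuture<T>> action, long delayMillis) {
        return action.get()
            .thenApply(CompletableFuture::completedFuture)
            .exceptionally(throwable -> {
                Throwable cause = throwable instanceof CompletionException
                    ? throwable.getCause() : throwable;
                if (!(cause instanceof RequestThrottledException)) {
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(cause);
                    return failed;
                }
                // Throttled: back off once, then re-issue the request.
                CompletableFuture<T> retried = new CompletableFuture<>();
                SCHEDULER.schedule(() -> action.get().whenComplete((r, t) -> {
                    if (t != null) {
                        retried.completeExceptionally(t);
                    } else {
                        retried.complete(r);
                    }
                }), delayMillis, TimeUnit.MILLISECONDS);
                return retried;
            })
            .thenCompose(future -> future);
    }
}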
@@ -0,0 +1,26 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.server.master.resourcecluster;

/**
* This exception will be thrown when the request is throttled.
*/
public class RequestThrottledException extends Exception {
public RequestThrottledException(String msg) {
super(msg);
}
}
@@ -48,6 +48,7 @@ dependencies {
api "org.skife.config:config-magic:$configMagicVersion"

implementation libraries.vavr
implementation libraries.spotifyFutures

// todo: separate worker entrypoint and move this to testImplementation instead.
implementation libraries.spectatorApi
@@ -42,6 +42,7 @@
import io.mantisrx.master.api.akka.route.Jackson;
import io.mantisrx.master.api.akka.route.MasterApiMetrics;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.TaskExecutorNotFoundException;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -327,6 +328,11 @@ protected <T> Route withFuture(CompletableFuture<T> tFuture) {
if (throwable instanceof TaskExecutorNotFoundException) {
return complete(StatusCodes.NOT_FOUND);
}

if (throwable instanceof RequestThrottledException) {
return complete(StatusCodes.TOO_MANY_REQUESTS);
}

return complete(StatusCodes.INTERNAL_SERVER_ERROR, throwable, Jackson.marshaller());
},
r -> complete(StatusCodes.OK, r, Jackson.marshaller())));
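For reference, a minimal self-contained version of this future-to-route translation in the Akka HTTP Java DSL could look like the sketch below. The shape is an assumption for illustration only; the real withFuture also marshals response bodies with Jackson and handles TaskExecutorNotFoundException as shown above.

import static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.onComplete;

import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.server.Route;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import java.util.concurrent.CompletableFuture;

class WithFutureSketch {
    static <T> Route withFutureSketch(CompletableFuture<T> tFuture) {
        return onComplete(tFuture, result -> {
            if (result.isSuccess()) {
                return complete(StatusCodes.OK);
            }
            Throwable throwable = result.failed().get();
            if (throwable instanceof RequestThrottledException) {
                // Surfaces the server-side throttle as HTTP 429.
                return complete(StatusCodes.TOO_MANY_REQUESTS);
            }
            return complete(StatusCodes.INTERNAL_SERVER_ERROR);
        });
    }
}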
@@ -96,19 +96,16 @@ private Route registerTaskExecutor(ClusterID clusterID) {
"POST /api/v1/resourceClusters/{}/actions/registerTaskExecutor called {}",
clusterID,
request);

return withFuture(gateway.getClusterFor(clusterID).registerTaskExecutor(request));
});
}


private Route heartbeatFromTaskExecutor(ClusterID clusterID) {
return entity(Jackson.unmarshaller(TaskExecutorHeartbeat.class), request -> {
log.debug(
"POST /api/v1/resourceClusters/{}/actions/heartbeatFromTaskExecutor called {}",
clusterID.getResourceID(),
request);

return withFuture(gateway.getClusterFor(clusterID).heartBeatFromTaskExecutor(request));
});
}
@@ -119,7 +116,6 @@ private Route disconnectTaskExecutor(ClusterID clusterID) {
"POST /api/v1/resourceClusters/{}/actions/disconnectTaskExecutor called {}",
clusterID.getResourceID(),
request);

return withFuture(gateway.getClusterFor(clusterID).disconnectTaskExecutor(request));
});
}
@@ -138,5 +134,4 @@ private Route notifyTaskExecutorStatusChange(ClusterID clusterID) {
private ClusterID getClusterID(String clusterName) {
return ClusterID.of(clusterName);
}

}
@@ -59,6 +59,9 @@ interface ExecutorStateManager {
@Nullable
TaskExecutorState get(TaskExecutorID taskExecutorID);

@Nullable
TaskExecutorState getIncludeArchived(TaskExecutorID taskExecutorID);

@Nullable
TaskExecutorState archive(TaskExecutorID taskExecutorID);

@@ -167,6 +167,11 @@ public List<TaskExecutorID> getIdleInstanceList(GetClusterIdleInstancesRequest r

@Override
public TaskExecutorState get(TaskExecutorID taskExecutorID) {
return this.taskExecutorStateMap.get(taskExecutorID);
}

@Override
public TaskExecutorState getIncludeArchived(TaskExecutorID taskExecutorID) {
if (this.taskExecutorStateMap.containsKey(taskExecutorID)) {
return this.taskExecutorStateMap.get(taskExecutorID);
}
@@ -227,12 +232,25 @@ public Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecuto
SortedMap<Double, NavigableSet<TaskExecutorHolder>> targetMap =
this.executorByCores.tailMap(request.getAllocationRequest().getMachineDefinition().getCpuCores());

if (targetMap.size() < 1) {
if (targetMap.isEmpty()) {
log.warn("Cannot find any executor for request: {}", request);
return Optional.empty();
}
Double targetCoreCount = targetMap.firstKey();
log.trace("Applying assignmentReq: {} to {} cores.", request, targetCoreCount);
log.debug("Applying assignmentReq: {} to {} cores.", request, targetCoreCount);

Double requestedCoreCount = request.getAllocationRequest().getMachineDefinition().getCpuCores();
if (Math.abs(targetCoreCount - requestedCoreCount) > 1E-10) {
// this mismatch should not happen in production and indicates TE registration/spec problem.
log.warn("Requested core count mismatched. requested: {}, found: {} for {}", requestedCoreCount,
targetCoreCount,
request);
}

if (this.executorByCores.get(targetCoreCount).isEmpty()) {
log.warn("No available TE found for core count: {}, request: {}", targetCoreCount, request);
return Optional.empty();
}

return this.executorByCores.get(targetCoreCount)
.descendingSet()
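The lookup above relies on NavigableMap semantics: tailMap returns every bucket with at least the requested core count, and firstKey picks the smallest such bucket, so an exact match is the expected case and a mismatch is only logged. A minimal JDK-only sketch of that behavior, with names simplified from the diff:

import java.util.SortedMap;
import java.util.TreeMap;

public class CoreLookupSketch {
    public static void main(String[] args) {
        // Buckets of task executors keyed by CPU core count.
        TreeMap<Double, String> executorByCores = new TreeMap<>();
        executorByCores.put(2.0, "2-core TEs");
        executorByCores.put(4.0, "4-core TEs");

        double requestedCoreCount = 2.0;
        // Every bucket that can fit the request (>= requested cores).
        SortedMap<Double, String> targetMap = executorByCores.tailMap(requestedCoreCount);
        if (targetMap.isEmpty()) {
            System.out.println("cannot find any executor for the request");
            return;
        }
        // Smallest bucket that fits; anything else hints at a TE spec problem.
        double targetCoreCount = targetMap.firstKey();
        if (Math.abs(targetCoreCount - requestedCoreCount) > 1E-10) {
            System.out.println("warning: requested " + requestedCoreCount
                + " cores but nearest bucket holds " + targetCoreCount);
        }
        System.out.println("assigning from: " + targetMap.get(targetCoreCount));
    }
}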
@@ -308,7 +308,14 @@ private void getActiveJobs(GetActiveJobsRequest req) {

private void onTaskExecutorInfoRequest(TaskExecutorInfoRequest request) {
if (request.getTaskExecutorID() != null) {
sender().tell(this.executorStateManager.get(request.getTaskExecutorID()).getRegistration(), self());
TaskExecutorState state =
this.executorStateManager.getIncludeArchived(request.getTaskExecutorID());
if (state != null && state.getRegistration() != null) {
sender().tell(state.getRegistration(), self());
} else {
sender().tell(new Status.Failure(new Exception(String.format("No task executor state for %s",
request.getTaskExecutorID()))), self());
}
} else {
Optional<TaskExecutorRegistration> taskExecutorRegistration =
this.executorStateManager
@@ -546,6 +553,8 @@ private void onHeartbeat(TaskExecutorHeartbeat heartbeat) {
setupTaskExecutorStateIfNecessary(heartbeat.getTaskExecutorID());
try {
final TaskExecutorID taskExecutorID = heartbeat.getTaskExecutorID();

// todo: metrics: RC actor mailbox, TE heartbeat, no resource log
final TaskExecutorState state = this.executorStateManager.get(taskExecutorID);
boolean stateChange = state.onHeartbeat(heartbeat);
if (stateChange) {
@@ -622,8 +631,11 @@ private void onTaskExecutorAssignmentRequest(TaskExecutorAssignmentRequest reque

private void onTaskExecutorAssignmentTimeout(TaskExecutorAssignmentTimeout request) {
TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID());
if (state.isRunningTask()) {
log.debug("TaskExecutor {} entered running state alraedy; no need to act", request.getTaskExecutorID());
if (state == null) {
log.error("TaskExecutor lost during task assignment: {}", request);
}
else if (state.isRunningTask()) {
log.debug("TaskExecutor {} entered running state already; no need to act", request.getTaskExecutorID());
} else {
try
{
@@ -67,8 +67,9 @@ public ResourceClusterAkkaImpl(
ActorRef resourceClusterManagerActor,
Duration askTimeout,
ClusterID clusterID,
ResourceClusterTaskExecutorMapper mapper) {
super(resourceClusterManagerActor, askTimeout, mapper);
ResourceClusterTaskExecutorMapper mapper,
int rateLimitPerSecond) {
super(resourceClusterManagerActor, askTimeout, mapper, rateLimitPerSecond);
this.clusterID = clusterID;
this.mapper = mapper;
}
@@ -18,38 +18,73 @@

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

@RequiredArgsConstructor
class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
protected final ActorRef resourceClusterManagerActor;
protected final Duration askTimeout;
private final ResourceClusterTaskExecutorMapper mapper;
protected final RateLimiter rateLimiter;

ResourceClusterGatewayAkkaImpl(
ActorRef resourceClusterManagerActor,
Duration askTimeout,
ResourceClusterTaskExecutorMapper mapper,
int maxConcurrentRequestCount) {
this.resourceClusterManagerActor = resourceClusterManagerActor;
this.askTimeout = askTimeout;
this.mapper = mapper;

this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount);
}

private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> func) {
return in -> {
if (rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
return func.apply(in);
} else {
return CompletableFutures.exceptionallyCompletedFuture(
new RequestThrottledException("Throttled req: " + in.getClass().getSimpleName())
);
}
};
}

@Override
public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration registration) {
return
Patterns
return withThrottle(this::registerTaskExecutorImpl).apply(registration);
}

private CompletableFuture<Ack> registerTaskExecutorImpl(TaskExecutorRegistration registration) {
return Patterns
.ask(resourceClusterManagerActor, registration, askTimeout)
.thenApply(Ack.class::cast)
.toCompletableFuture()
.whenComplete((dontCare, throwable) ->
mapper.onTaskExecutorDiscovered(
registration.getClusterID(),
registration.getTaskExecutorID()));
mapper.onTaskExecutorDiscovered(
registration.getClusterID(),
registration.getTaskExecutorID()));
}

@Override
public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat heartbeat) {
return withThrottle(this::heartBeatFromTaskExecutorImpl).apply(heartbeat);
}

private CompletableFuture<Ack> heartBeatFromTaskExecutorImpl(TaskExecutorHeartbeat heartbeat) {
return
Patterns
.ask(resourceClusterManagerActor, heartbeat, askTimeout)
@@ -69,9 +104,14 @@ public CompletableFuture<Ack> notifyTaskExecutorStatusChange(TaskExecutorStatusC
@Override
public CompletableFuture<Ack> disconnectTaskExecutor(
TaskExecutorDisconnection taskExecutorDisconnection) {
return withThrottle(this::disconnectTaskExecutorImpl).apply(taskExecutorDisconnection);
}

CompletableFuture<Ack> disconnectTaskExecutorImpl(
TaskExecutorDisconnection taskExecutorDisconnection) {
return
Patterns.ask(resourceClusterManagerActor, taskExecutorDisconnection, askTimeout)
.thenApply(Ack.class::cast)
.toCompletableFuture();
Patterns.ask(resourceClusterManagerActor, taskExecutorDisconnection, askTimeout)
.thenApply(Ack.class::cast)
.toCompletableFuture();
}
}
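The throttle wrapper is built on Guava's RateLimiter (the shaded io.mantisrx import wraps the same class). Permits refill at the configured rate, and tryAcquire waits up to the given timeout for a permit, rejecting immediately once the wait would exceed it; in the gateway that rejection branch becomes a RequestThrottledException. A standalone sketch of the mechanics, using a short timeout so the rejection branch is actually observable in a quick run:

import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class ThrottleSketch {
    public static void main(String[] args) {
        // 2 permits per second here; the gateway uses rateLimitPerSecond.
        RateLimiter rateLimiter = RateLimiter.create(2);

        for (int i = 0; i < 5; i++) {
            // The gateway waits up to 1 second; 100ms keeps the demo visible.
            if (rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                System.out.println("request " + i + " admitted");
            } else {
                // In ResourceClusterGatewayAkkaImpl this branch completes the
                // future exceptionally with RequestThrottledException.
                System.out.println("request " + i + " throttled");
            }
        }
    }
}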
@@ -48,6 +48,7 @@ public class ResourceClustersAkkaImpl implements ResourceClusters {
private final ActorRef resourceClustersManagerActor;
private final Duration askTimeout;
private final ResourceClusterTaskExecutorMapper mapper;
private final int rateLimitPerSecond;
private final ConcurrentMap<ClusterID, ResourceCluster> cache =
new ConcurrentHashMap<>();

@@ -60,7 +61,8 @@ public ResourceCluster getClusterFor(ClusterID clusterID) {
resourceClustersManagerActor,
askTimeout,
clusterID,
mapper));
mapper,
rateLimitPerSecond));
return cache.get(clusterID);
}

@@ -92,6 +94,7 @@ public static ResourceClusters load(

final Duration askTimeout = java.time.Duration.ofMillis(
ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs());
return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper);
final int rateLimitPerSecond = masterConfiguration.getResourceClusterActionsPermitsPerSecond();
return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper, rateLimitPerSecond);
}
}
@@ -378,6 +378,10 @@ default Duration getSchedulerIntervalBetweenRetries() {
@Default("")
String getJobClustersWithArtifactCachingEnabled();

// rate limit actions on resource cluster actor to control backlog.
@Config("mantis.master.resource.cluster.actions.permitsPerSecond")
@Default("300")
int getResourceClusterActionsPermitsPerSecond();

default Duration getHeartbeatInterval() {
return Duration.ofMillis(getHeartbeatIntervalInMs());
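With the @Default annotation above, the limiter admits 300 actions per second unless overridden. Operators can tune this through the master configuration; for example, a hypothetical properties-file override:

mantis.master.resource.cluster.actions.permitsPerSecond=500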
@@ -200,7 +200,8 @@ public void setupActor() {
resourceClusterActor,
Duration.ofSeconds(1),
CLUSTER_ID,
mapper);
mapper,
100);
}

@Test
@@ -411,9 +411,7 @@ public void runOneIteration() throws Exception {
currentReportSupplier.apply(timeout)
.thenComposeAsync(report -> {
log.debug("Sending heartbeat to resource manager {} with report {}", gateway, report);
return gateway.heartBeatFromTaskExecutor(
new TaskExecutorHeartbeat(taskExecutorRegistration.getTaskExecutorID(),
taskExecutorRegistration.getClusterID(), report));
return gateway.heartBeatFromTaskExecutor(new TaskExecutorHeartbeat(taskExecutorRegistration.getTaskExecutorID(), taskExecutorRegistration.getClusterID(), report));
})
.get(heartBeatTimeout.getSize(), heartBeatTimeout.getUnit());

@@ -57,6 +57,7 @@
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.ResourceLeaderConnection;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.TaskExecutorReport;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
@@ -406,7 +407,7 @@ private static ResourceClusterGateway getHealthyGateway(String name) {
return gateway;
}

private static ResourceClusterGateway getUnhealthyGateway(String name) {
private static ResourceClusterGateway getUnhealthyGateway(String name) throws RequestThrottledException {
ResourceClusterGateway gateway = mock(ResourceClusterGateway.class);
when(gateway.registerTaskExecutor(any())).thenReturn(
CompletableFutures.exceptionallyCompletedFuture(new UnknownError("error")));
@@ -162,7 +162,7 @@ public void helloWorld() throws Exception {
controlPlanePort,
5,
Duration.ofSeconds(2).toMillis())) {
fail("Failed to start job worker.");
fail("Failed to start job worker.");
}

// test sse
