Revert "Throttle RC actions (#530)"
This reverts commit e6bd5d4.
Andyz26 committed Aug 29, 2023
1 parent 887702c commit 4c8ef73
Showing 15 changed files with 28 additions and 138 deletions.
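Context for the diff below: the change being reverted had added request throttling to the resource-cluster gateway — a Guava RateLimiter in ResourceClusterGatewayAkkaImpl gating the register/heartbeat/disconnect actions, a throttledCounter metric, a RequestThrottledException, an HTTP-layer mapping of that exception to 429 (StatusCodes.TOO_MANY_REQUESTS), client-side handling of 429 responses, and the mantis.master.resource.cluster.actions.permitsPerSecond config knob (default 300). The following is a minimal sketch of that pattern, reconstructed from the deleted lines in this diff; the class name is invented for illustration, and a plain RuntimeException stands in for the project's RequestThrottledException.

// Minimal sketch (not Mantis source) of the throttling pattern this commit removes.
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ThrottledGatewaySketch { // illustrative name, not in the diff
    // Permits per second; the reverted config default was 300
    // (mantis.master.resource.cluster.actions.permitsPerSecond).
    private final RateLimiter rateLimiter = RateLimiter.create(300);

    // Wraps an async gateway call: wait up to 1s for a permit, otherwise fail the
    // future. The reverted code failed with RequestThrottledException, which the
    // API layer translated into HTTP 429 (StatusCodes.TOO_MANY_REQUESTS).
    <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> func) {
        return in -> {
            if (rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
                return func.apply(in);
            }
            CompletableFuture<Out> throttled = new CompletableFuture<>();
            throttled.completeExceptionally(
                new RuntimeException("Throttled req: " + in.getClass().getSimpleName()));
            return throttled;
        };
    }
}

Because tryAcquire(1, TimeUnit.SECONDS) waits up to a second for a permit before rejecting, short bursts above the limit were smoothed rather than failed immediately; only sustained overload surfaced as 429s.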
@@ -90,13 +90,7 @@ private CompletableFuture<Ack> performAction(String action, Object body) {
         return client.executeRequest(request).toCompletableFuture().thenCompose(response -> {
             if (response.getStatusCode() == 200) {
                 return CompletableFuture.completedFuture(Ack.getInstance());
-            }
-            else if (response.getStatusCode() == 429) {
-                log.warn("request was throttled on control plane side: {}", request);
-                return CompletableFutures.exceptionallyCompletedFuture(
-                    new RequestThrottledException("request was throttled on control plane side: " + request));
-            }
-            else {
+            } else {
                 try {
                     log.error("failed request {} with response {}", request, response.getResponseBody());
                     return CompletableFutures.exceptionallyCompletedFuture(

This file was deleted.
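The capture does not show the deleted file's path. Judging from the RequestThrottledException imports removed throughout this diff, it was most likely that exception class; a plausible shape, offered purely as an assumption:

// Assumed reconstruction of the deleted class; inferred from the removed
// imports of io.mantisrx.server.master.resourcecluster.RequestThrottledException,
// not copied from the repository.
public class RequestThrottledException extends Exception {
    public RequestThrottledException(String message) {
        super(message);
    }
}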

@@ -48,7 +48,6 @@ dependencies {
     api "org.skife.config:config-magic:$configMagicVersion"

     implementation libraries.vavr
-    implementation libraries.spotifyFutures

     // todo: separate worker entrypoint and move this to testImplementation instead.
     implementation libraries.spectatorApi
@@ -42,7 +42,6 @@
 import io.mantisrx.master.api.akka.route.Jackson;
 import io.mantisrx.master.api.akka.route.MasterApiMetrics;
 import io.mantisrx.master.jobcluster.proto.BaseResponse;
-import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorNotFoundException;
 import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -330,11 +329,6 @@ protected <T> Route withFuture(CompletableFuture<T> tFuture) {
             return complete(StatusCodes.NOT_FOUND);
         }

-        if (throwable instanceof RequestThrottledException) {
-            MasterApiMetrics.getInstance().incrementResp4xx();
-            return complete(StatusCodes.TOO_MANY_REQUESTS);
-        }
-
         if (throwable instanceof AskTimeoutException) {
             MasterApiMetrics.getInstance().incrementAskTimeOutCount();
         }
@@ -96,16 +96,19 @@ private Route registerTaskExecutor(ClusterID clusterID) {
                 "POST /api/v1/resourceClusters/{}/actions/registerTaskExecutor called {}",
                 clusterID,
                 request);
+
             return withFuture(gateway.getClusterFor(clusterID).registerTaskExecutor(request));
         });
     }

+
     private Route heartbeatFromTaskExecutor(ClusterID clusterID) {
         return entity(Jackson.unmarshaller(TaskExecutorHeartbeat.class), request -> {
             log.debug(
                 "POST /api/v1/resourceClusters/{}/actions/heartbeatFromTaskExecutor called {}",
                 clusterID.getResourceID(),
                 request);
+
             return withFuture(gateway.getClusterFor(clusterID).heartBeatFromTaskExecutor(request));
         });
     }
@@ -116,6 +119,7 @@ private Route disconnectTaskExecutor(ClusterID clusterID) {
                 "POST /api/v1/resourceClusters/{}/actions/disconnectTaskExecutor called {}",
                 clusterID.getResourceID(),
                 request);
+
             return withFuture(gateway.getClusterFor(clusterID).disconnectTaskExecutor(request));
         });
     }
@@ -134,4 +138,5 @@ private Route notifyTaskExecutorStatusChange(ClusterID clusterID) {
     private ClusterID getClusterID(String clusterName) {
         return ClusterID.of(clusterName);
     }
+
 }
@@ -59,9 +59,6 @@ interface ExecutorStateManager {
     @Nullable
     TaskExecutorState get(TaskExecutorID taskExecutorID);

-    @Nullable
-    TaskExecutorState getIncludeArchived(TaskExecutorID taskExecutorID);
-
     @Nullable
     TaskExecutorState archive(TaskExecutorID taskExecutorID);

@@ -167,11 +167,6 @@ public List<TaskExecutorID> getIdleInstanceList(GetClusterIdleInstancesRequest r

     @Override
     public TaskExecutorState get(TaskExecutorID taskExecutorID) {
-        return this.taskExecutorStateMap.get(taskExecutorID);
-    }
-
-    @Override
-    public TaskExecutorState getIncludeArchived(TaskExecutorID taskExecutorID) {
         if (this.taskExecutorStateMap.containsKey(taskExecutorID)) {
             return this.taskExecutorStateMap.get(taskExecutorID);
         }
@@ -232,25 +227,12 @@ public Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecuto
         SortedMap<Double, NavigableSet<TaskExecutorHolder>> targetMap =
             this.executorByCores.tailMap(request.getAllocationRequest().getMachineDefinition().getCpuCores());

-        if (targetMap.isEmpty()) {
+        if (targetMap.size() < 1) {
             log.warn("Cannot find any executor for request: {}", request);
             return Optional.empty();
         }
         Double targetCoreCount = targetMap.firstKey();
-        log.debug("Applying assignmentReq: {} to {} cores.", request, targetCoreCount);
-
-        Double requestedCoreCount = request.getAllocationRequest().getMachineDefinition().getCpuCores();
-        if (Math.abs(targetCoreCount - requestedCoreCount) > 1E-10) {
-            // this mismatch should not happen in production and indicates TE registration/spec problem.
-            log.warn("Requested core count mismatched. requested: {}, found: {} for {}", requestedCoreCount,
-                targetCoreCount,
-                request);
-        }
-
-        if (this.executorByCores.get(targetCoreCount).isEmpty()) {
-            log.warn("No available TE found for core count: {}, request: {}", targetCoreCount, request);
-            return Optional.empty();
-        }
+        log.trace("Applying assignmentReq: {} to {} cores.", request, targetCoreCount);

         return this.executorByCores.get(targetCoreCount)
             .descendingSet()
@@ -309,14 +309,7 @@ private void getActiveJobs(GetActiveJobsRequest req) {

     private void onTaskExecutorInfoRequest(TaskExecutorInfoRequest request) {
         if (request.getTaskExecutorID() != null) {
-            TaskExecutorState state =
-                this.executorStateManager.getIncludeArchived(request.getTaskExecutorID());
-            if (state != null && state.getRegistration() != null) {
-                sender().tell(state.getRegistration(), self());
-            } else {
-                sender().tell(new Status.Failure(new Exception(String.format("No task executor state for %s",
-                    request.getTaskExecutorID()))), self());
-            }
+            sender().tell(this.executorStateManager.get(request.getTaskExecutorID()).getRegistration(), self());
         } else {
             Optional<TaskExecutorRegistration> taskExecutorRegistration =
                 this.executorStateManager
@@ -630,11 +623,8 @@ private void onTaskExecutorAssignmentRequest(TaskExecutorAssignmentRequest reque

     private void onTaskExecutorAssignmentTimeout(TaskExecutorAssignmentTimeout request) {
         TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID());
-        if (state == null) {
-            log.error("TaskExecutor lost during task assignment: {}", request);
-        }
-        else if (state.isRunningTask()) {
-            log.debug("TaskExecutor {} entered running state already; no need to act", request.getTaskExecutorID());
+        if (state.isRunningTask()) {
+            log.debug("TaskExecutor {} entered running state already; no need to act", request.getTaskExecutorID());
         } else {
             try
             {
@@ -67,9 +67,8 @@ public ResourceClusterAkkaImpl(
         ActorRef resourceClusterManagerActor,
         Duration askTimeout,
         ClusterID clusterID,
-        ResourceClusterTaskExecutorMapper mapper,
-        int rateLimitPerSecond) {
-        super(resourceClusterManagerActor, askTimeout, mapper, rateLimitPerSecond);
+        ResourceClusterTaskExecutorMapper mapper) {
+        super(resourceClusterManagerActor, askTimeout, mapper);
         this.clusterID = clusterID;
         this.mapper = mapper;
     }
@@ -18,45 +18,38 @@

 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
-import com.spotify.futures.CompletableFutures;
 import io.mantisrx.common.Ack;
 import io.mantisrx.common.metrics.Counter;
 import io.mantisrx.common.metrics.Metrics;
 import io.mantisrx.common.metrics.MetricsRegistry;
-import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
 import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
 import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
-import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import lombok.RequiredArgsConstructor;

 @RequiredArgsConstructor
 class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
     protected final ActorRef resourceClusterManagerActor;
     protected final Duration askTimeout;
     private final ResourceClusterTaskExecutorMapper mapper;
     private final Counter registrationCounter;
     private final Counter heartbeatCounter;
     private final Counter disconnectionCounter;
-    private final Counter throttledCounter;
-
-    protected final RateLimiter rateLimiter;

     ResourceClusterGatewayAkkaImpl(
         ActorRef resourceClusterManagerActor,
         Duration askTimeout,
-        ResourceClusterTaskExecutorMapper mapper,
-        int maxConcurrentRequestCount) {
+        ResourceClusterTaskExecutorMapper mapper) {
         this.resourceClusterManagerActor = resourceClusterManagerActor;
         this.askTimeout = askTimeout;
         this.mapper = mapper;

-        this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount);
         Metrics m = new Metrics.Builder()
             .id("ResourceClusterGatewayAkkaImpl")
             .addCounter("registrationCounter")
@@ -68,46 +61,23 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
         this.registrationCounter = metrics.getCounter("registrationCounter");
         this.heartbeatCounter = metrics.getCounter("heartbeatCounter");
         this.disconnectionCounter = metrics.getCounter("disconnectionCounter");
-        this.throttledCounter = metrics.getCounter("throttledCounter");
     }

-    private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> func) {
-        return in -> {
-            if (rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
-                return func.apply(in);
-            } else {
-                this.throttledCounter.increment();
-                return CompletableFutures.exceptionallyCompletedFuture(
-                    new RequestThrottledException("Throttled req: " + in.getClass().getSimpleName())
-                );
-            }
-        };
-    }
-
     @Override
     public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration registration) {
-        return withThrottle(this::registerTaskExecutorImpl).apply(registration);
-    }
-
-    private CompletableFuture<Ack> registerTaskExecutorImpl(TaskExecutorRegistration registration) {
         this.registrationCounter.increment();
         return Patterns
             .ask(resourceClusterManagerActor, registration, askTimeout)
             .thenApply(Ack.class::cast)
             .toCompletableFuture()
             .whenComplete((dontCare, throwable) ->
-                    mapper.onTaskExecutorDiscovered(
-                        registration.getClusterID(),
-                        registration.getTaskExecutorID()));
+                mapper.onTaskExecutorDiscovered(
+                    registration.getClusterID(),
+                    registration.getTaskExecutorID()));
     }

     @Override
     public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat heartbeat) {
-        return withThrottle(this::heartBeatFromTaskExecutorImpl).apply(heartbeat);
-    }
-
-    private CompletableFuture<Ack> heartBeatFromTaskExecutorImpl(TaskExecutorHeartbeat heartbeat) {
         this.heartbeatCounter.increment();
         return
             Patterns
                 .ask(resourceClusterManagerActor, heartbeat, askTimeout)
@@ -127,15 +97,10 @@ public CompletableFuture<Ack> notifyTaskExecutorStatusChange(TaskExecutorStatusC
     @Override
     public CompletableFuture<Ack> disconnectTaskExecutor(
         TaskExecutorDisconnection taskExecutorDisconnection) {
-        this.disconnectionCounter.increment();
-        return withThrottle(this::disconnectTaskExecutorImpl).apply(taskExecutorDisconnection);
-    }
-
-    CompletableFuture<Ack> disconnectTaskExecutorImpl(
-        TaskExecutorDisconnection taskExecutorDisconnection) {
-        return
+        this.disconnectionCounter.increment();
+        return
             Patterns.ask(resourceClusterManagerActor, taskExecutorDisconnection, askTimeout)
-            .thenApply(Ack.class::cast)
-            .toCompletableFuture();
+                .thenApply(Ack.class::cast)
+                .toCompletableFuture();
     }
 }
@@ -48,7 +48,6 @@ public class ResourceClustersAkkaImpl implements ResourceClusters {
     private final ActorRef resourceClustersManagerActor;
     private final Duration askTimeout;
     private final ResourceClusterTaskExecutorMapper mapper;
-    private final int rateLimitPerSecond;
     private final ConcurrentMap<ClusterID, ResourceCluster> cache =
         new ConcurrentHashMap<>();

@@ -61,8 +60,7 @@ public ResourceCluster getClusterFor(ClusterID clusterID) {
                 resourceClustersManagerActor,
                 askTimeout,
                 clusterID,
-                mapper,
-                rateLimitPerSecond));
+                mapper));
         return cache.get(clusterID);
     }

@@ -94,7 +92,6 @@ public static ResourceClusters load(

         final Duration askTimeout = java.time.Duration.ofMillis(
             ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs());
-        final int rateLimitPerSecond = masterConfiguration.getResourceClusterActionsPermitsPerSecond();
-        return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper, rateLimitPerSecond);
+        return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper);
     }
 }
@@ -378,10 +378,6 @@ default Duration getSchedulerIntervalBetweenRetries() {
     @Default("")
     String getJobClustersWithArtifactCachingEnabled();

-    // rate limit actions on resource cluster actor to control backlog.
-    @Config("mantis.master.resource.cluster.actions.permitsPerSecond")
-    @Default("300")
-    int getResourceClusterActionsPermitsPerSecond();

     default Duration getHeartbeatInterval() {
         return Duration.ofMillis(getHeartbeatIntervalInMs());
@@ -200,8 +200,7 @@ public void setupActor() {
             resourceClusterActor,
             Duration.ofSeconds(1),
             CLUSTER_ID,
-            mapper,
-            100);
+            mapper);
     }

     @Test
@@ -57,7 +57,6 @@
 import io.mantisrx.server.master.client.HighAvailabilityServices;
 import io.mantisrx.server.master.client.MantisMasterGateway;
 import io.mantisrx.server.master.client.ResourceLeaderConnection;
-import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
 import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorReport;
 import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
@@ -410,7 +409,7 @@ private static ResourceClusterGateway getHealthyGateway(String name) {
         return gateway;
     }

-    private static ResourceClusterGateway getUnhealthyGateway(String name) throws RequestThrottledException {
+    private static ResourceClusterGateway getUnhealthyGateway(String name) {
         ResourceClusterGateway gateway = mock(ResourceClusterGateway.class);
         when(gateway.registerTaskExecutor(any())).thenReturn(
             CompletableFutures.exceptionallyCompletedFuture(new UnknownError("error")));
@@ -177,7 +177,7 @@ public void helloWorld() throws Exception {
                 controlPlanePort,
                 5,
                 Duration.ofSeconds(2).toMillis())) {
-        fail("Failed to start job worker.");
+            fail("Failed to start job worker.");
         }

         // test sse
