diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterGatewayClient.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterGatewayClient.java index fc719321f..d9718d542 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterGatewayClient.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterGatewayClient.java @@ -90,13 +90,7 @@ private CompletableFuture performAction(String action, Object body) { return client.executeRequest(request).toCompletableFuture().thenCompose(response -> { if (response.getStatusCode() == 200) { return CompletableFuture.completedFuture(Ack.getInstance()); - } - else if (response.getStatusCode() == 429) { - log.warn("request was throttled on control plane side: {}", request); - return CompletableFutures.exceptionallyCompletedFuture( - new RequestThrottledException("request was throttled on control plane side: " + request)); - } - else { + } else { try { log.error("failed request {} with response {}", request, response.getResponseBody()); return CompletableFutures.exceptionallyCompletedFuture( diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/RequestThrottledException.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/RequestThrottledException.java deleted file mode 100644 index 1a1f3ac26..000000000 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/RequestThrottledException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2023 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.mantisrx.server.master.resourcecluster; - -/** - * This exception will be thrown when the request is throttled. - */ -public class RequestThrottledException extends Exception { - public RequestThrottledException(String msg) { - super(msg); - } -} diff --git a/mantis-control-plane/mantis-control-plane-server/build.gradle b/mantis-control-plane/mantis-control-plane-server/build.gradle index e8a7a2b72..2b6c9c423 100644 --- a/mantis-control-plane/mantis-control-plane-server/build.gradle +++ b/mantis-control-plane/mantis-control-plane-server/build.gradle @@ -48,7 +48,6 @@ dependencies { api "org.skife.config:config-magic:$configMagicVersion" implementation libraries.vavr - implementation libraries.spotifyFutures // todo: separate worker entrypoint and move this to testImplementation instead. implementation libraries.spectatorApi diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java index e590f0545..fea2b7cb2 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java @@ -42,7 +42,6 @@ import io.mantisrx.master.api.akka.route.Jackson; import io.mantisrx.master.api.akka.route.MasterApiMetrics; import io.mantisrx.master.jobcluster.proto.BaseResponse; -import io.mantisrx.server.master.resourcecluster.RequestThrottledException; import io.mantisrx.server.master.resourcecluster.TaskExecutorNotFoundException; import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.JsonNodeFactory; import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.ObjectNode; @@ -330,11 +329,6 @@ protected Route withFuture(CompletableFuture tFuture) { return complete(StatusCodes.NOT_FOUND); } - if (throwable instanceof RequestThrottledException) { - MasterApiMetrics.getInstance().incrementResp4xx(); - return complete(StatusCodes.TOO_MANY_REQUESTS); - } - if (throwable instanceof AskTimeoutException) { MasterApiMetrics.getInstance().incrementAskTimeOutCount(); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java index 22ded04e7..4bfa1ace1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java @@ -96,16 +96,19 @@ private Route registerTaskExecutor(ClusterID clusterID) { "POST /api/v1/resourceClusters/{}/actions/registerTaskExecutor called {}", clusterID, request); + return withFuture(gateway.getClusterFor(clusterID).registerTaskExecutor(request)); }); } + private Route heartbeatFromTaskExecutor(ClusterID clusterID) { return entity(Jackson.unmarshaller(TaskExecutorHeartbeat.class), request -> { log.debug( "POST /api/v1/resourceClusters/{}/actions/heartbeatFromTaskExecutor called {}", clusterID.getResourceID(), request); + return withFuture(gateway.getClusterFor(clusterID).heartBeatFromTaskExecutor(request)); }); } @@ -116,6 +119,7 @@ private Route disconnectTaskExecutor(ClusterID clusterID) { "POST /api/v1/resourceClusters/{}/actions/disconnectTaskExecutor called {}", clusterID.getResourceID(), request); + return withFuture(gateway.getClusterFor(clusterID).disconnectTaskExecutor(request)); }); } @@ -134,4 +138,5 @@ private Route notifyTaskExecutorStatusChange(ClusterID clusterID) { private ClusterID getClusterID(String clusterName) { return ClusterID.of(clusterName); } + } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java index 62f5dda34..6bfa90a90 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java @@ -59,9 +59,6 @@ interface ExecutorStateManager { @Nullable TaskExecutorState get(TaskExecutorID taskExecutorID); - @Nullable - TaskExecutorState getIncludeArchived(TaskExecutorID taskExecutorID); - @Nullable TaskExecutorState archive(TaskExecutorID taskExecutorID); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index e471c3611..ac63baa30 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ -167,11 +167,6 @@ public List getIdleInstanceList(GetClusterIdleInstancesRequest r @Override public TaskExecutorState get(TaskExecutorID taskExecutorID) { - return this.taskExecutorStateMap.get(taskExecutorID); - } - - @Override - public TaskExecutorState getIncludeArchived(TaskExecutorID taskExecutorID) { if (this.taskExecutorStateMap.containsKey(taskExecutorID)) { return this.taskExecutorStateMap.get(taskExecutorID); } @@ -232,25 +227,12 @@ public Optional> findBestFit(TaskExecuto SortedMap> targetMap = this.executorByCores.tailMap(request.getAllocationRequest().getMachineDefinition().getCpuCores()); - if (targetMap.isEmpty()) { + if (targetMap.size() < 1) { log.warn("Cannot find any executor for request: {}", request); return Optional.empty(); } Double targetCoreCount = targetMap.firstKey(); - log.debug("Applying assignmentReq: {} to {} cores.", request, targetCoreCount); - - Double requestedCoreCount = request.getAllocationRequest().getMachineDefinition().getCpuCores(); - if (Math.abs(targetCoreCount - requestedCoreCount) > 1E-10) { - // this mismatch should not happen in production and indicates TE registration/spec problem. - log.warn("Requested core count mismatched. requested: {}, found: {} for {}", requestedCoreCount, - targetCoreCount, - request); - } - - if (this.executorByCores.get(targetCoreCount).isEmpty()) { - log.warn("No available TE found for core count: {}, request: {}", targetCoreCount, request); - return Optional.empty(); - } + log.trace("Applying assignmentReq: {} to {} cores.", request, targetCoreCount); return this.executorByCores.get(targetCoreCount) .descendingSet() diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index a86e45234..0ed814d68 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -309,14 +309,7 @@ private void getActiveJobs(GetActiveJobsRequest req) { private void onTaskExecutorInfoRequest(TaskExecutorInfoRequest request) { if (request.getTaskExecutorID() != null) { - TaskExecutorState state = - this.executorStateManager.getIncludeArchived(request.getTaskExecutorID()); - if (state != null && state.getRegistration() != null) { - sender().tell(state.getRegistration(), self()); - } else { - sender().tell(new Status.Failure(new Exception(String.format("No task executor state for %s", - request.getTaskExecutorID()))), self()); - } + sender().tell(this.executorStateManager.get(request.getTaskExecutorID()).getRegistration(), self()); } else { Optional taskExecutorRegistration = this.executorStateManager @@ -630,11 +623,8 @@ private void onTaskExecutorAssignmentRequest(TaskExecutorAssignmentRequest reque private void onTaskExecutorAssignmentTimeout(TaskExecutorAssignmentTimeout request) { TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID()); - if (state == null) { - log.error("TaskExecutor lost during task assignment: {}", request); - } - else if (state.isRunningTask()) { - log.debug("TaskExecutor {} entered running state already; no need to act", request.getTaskExecutorID()); + if (state.isRunningTask()) { + log.debug("TaskExecutor {} entered running state alraedy; no need to act", request.getTaskExecutorID()); } else { try { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 10bf6c684..875906d41 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -67,9 +67,8 @@ public ResourceClusterAkkaImpl( ActorRef resourceClusterManagerActor, Duration askTimeout, ClusterID clusterID, - ResourceClusterTaskExecutorMapper mapper, - int rateLimitPerSecond) { - super(resourceClusterManagerActor, askTimeout, mapper, rateLimitPerSecond); + ResourceClusterTaskExecutorMapper mapper) { + super(resourceClusterManagerActor, askTimeout, mapper); this.clusterID = clusterID; this.mapper = mapper; } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java index d1aefa1dd..9d4b07b3c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java @@ -18,24 +18,21 @@ import akka.actor.ActorRef; import akka.pattern.Patterns; -import com.spotify.futures.CompletableFutures; import io.mantisrx.common.Ack; import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; -import io.mantisrx.server.master.resourcecluster.RequestThrottledException; import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway; import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper; import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection; import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat; import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration; import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange; -import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import lombok.RequiredArgsConstructor; +@RequiredArgsConstructor class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { protected final ActorRef resourceClusterManagerActor; protected final Duration askTimeout; @@ -43,20 +40,16 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { private final Counter registrationCounter; private final Counter heartbeatCounter; private final Counter disconnectionCounter; - private final Counter throttledCounter; - protected final RateLimiter rateLimiter; ResourceClusterGatewayAkkaImpl( ActorRef resourceClusterManagerActor, Duration askTimeout, - ResourceClusterTaskExecutorMapper mapper, - int maxConcurrentRequestCount) { + ResourceClusterTaskExecutorMapper mapper) { this.resourceClusterManagerActor = resourceClusterManagerActor; this.askTimeout = askTimeout; this.mapper = mapper; - this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount); Metrics m = new Metrics.Builder() .id("ResourceClusterGatewayAkkaImpl") .addCounter("registrationCounter") @@ -68,46 +61,23 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { this.registrationCounter = metrics.getCounter("registrationCounter"); this.heartbeatCounter = metrics.getCounter("heartbeatCounter"); this.disconnectionCounter = metrics.getCounter("disconnectionCounter"); - this.throttledCounter = metrics.getCounter("throttledCounter"); - } - - private Function> withThrottle(Function> func) { - return in -> { - if (rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { - return func.apply(in); - } else { - this.throttledCounter.increment(); - return CompletableFutures.exceptionallyCompletedFuture( - new RequestThrottledException("Throttled req: " + in.getClass().getSimpleName()) - ); - } - }; } @Override public CompletableFuture registerTaskExecutor(TaskExecutorRegistration registration) { - return withThrottle(this::registerTaskExecutorImpl).apply(registration); - } - - private CompletableFuture registerTaskExecutorImpl(TaskExecutorRegistration registration) { this.registrationCounter.increment(); return Patterns .ask(resourceClusterManagerActor, registration, askTimeout) .thenApply(Ack.class::cast) .toCompletableFuture() .whenComplete((dontCare, throwable) -> - mapper.onTaskExecutorDiscovered( - registration.getClusterID(), - registration.getTaskExecutorID())); + mapper.onTaskExecutorDiscovered( + registration.getClusterID(), + registration.getTaskExecutorID())); } @Override public CompletableFuture heartBeatFromTaskExecutor(TaskExecutorHeartbeat heartbeat) { - return withThrottle(this::heartBeatFromTaskExecutorImpl).apply(heartbeat); - } - - private CompletableFuture heartBeatFromTaskExecutorImpl(TaskExecutorHeartbeat heartbeat) { - this.heartbeatCounter.increment(); return Patterns .ask(resourceClusterManagerActor, heartbeat, askTimeout) @@ -127,15 +97,10 @@ public CompletableFuture notifyTaskExecutorStatusChange(TaskExecutorStatusC @Override public CompletableFuture disconnectTaskExecutor( TaskExecutorDisconnection taskExecutorDisconnection) { - this.disconnectionCounter.increment(); - return withThrottle(this::disconnectTaskExecutorImpl).apply(taskExecutorDisconnection); - } - - CompletableFuture disconnectTaskExecutorImpl( - TaskExecutorDisconnection taskExecutorDisconnection) { - return + this.disconnectionCounter.increment(); + return Patterns.ask(resourceClusterManagerActor, taskExecutorDisconnection, askTimeout) - .thenApply(Ack.class::cast) - .toCompletableFuture(); + .thenApply(Ack.class::cast) + .toCompletableFuture(); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java index 637640544..244a8071b 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java @@ -48,7 +48,6 @@ public class ResourceClustersAkkaImpl implements ResourceClusters { private final ActorRef resourceClustersManagerActor; private final Duration askTimeout; private final ResourceClusterTaskExecutorMapper mapper; - private final int rateLimitPerSecond; private final ConcurrentMap cache = new ConcurrentHashMap<>(); @@ -61,8 +60,7 @@ public ResourceCluster getClusterFor(ClusterID clusterID) { resourceClustersManagerActor, askTimeout, clusterID, - mapper, - rateLimitPerSecond)); + mapper)); return cache.get(clusterID); } @@ -94,7 +92,6 @@ public static ResourceClusters load( final Duration askTimeout = java.time.Duration.ofMillis( ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs()); - final int rateLimitPerSecond = masterConfiguration.getResourceClusterActionsPermitsPerSecond(); - return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper, rateLimitPerSecond); + return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java index ce8042268..05643d52f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java @@ -378,10 +378,6 @@ default Duration getSchedulerIntervalBetweenRetries() { @Default("") String getJobClustersWithArtifactCachingEnabled(); - // rate limit actions on resource cluster actor to control backlog. - @Config("mantis.master.resource.cluster.actions.permitsPerSecond") - @Default("300") - int getResourceClusterActionsPermitsPerSecond(); default Duration getHeartbeatInterval() { return Duration.ofMillis(getHeartbeatIntervalInMs()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java index 5020924b6..2464f7a2f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java @@ -200,8 +200,7 @@ public void setupActor() { resourceClusterActor, Duration.ofSeconds(1), CLUSTER_ID, - mapper, - 100); + mapper); } @Test diff --git a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java index 1ea119ad8..53368b566 100644 --- a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java +++ b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java @@ -57,7 +57,6 @@ import io.mantisrx.server.master.client.HighAvailabilityServices; import io.mantisrx.server.master.client.MantisMasterGateway; import io.mantisrx.server.master.client.ResourceLeaderConnection; -import io.mantisrx.server.master.resourcecluster.RequestThrottledException; import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway; import io.mantisrx.server.master.resourcecluster.TaskExecutorReport; import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange; @@ -410,7 +409,7 @@ private static ResourceClusterGateway getHealthyGateway(String name) { return gateway; } - private static ResourceClusterGateway getUnhealthyGateway(String name) throws RequestThrottledException { + private static ResourceClusterGateway getUnhealthyGateway(String name) { ResourceClusterGateway gateway = mock(ResourceClusterGateway.class); when(gateway.registerTaskExecutor(any())).thenReturn( CompletableFutures.exceptionallyCompletedFuture(new UnknownError("error"))); diff --git a/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java b/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java index 35aac98bb..e8bfa2be4 100644 --- a/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java +++ b/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java @@ -177,7 +177,7 @@ public void helloWorld() throws Exception { controlPlanePort, 5, Duration.ofSeconds(2).toMillis())) { - fail("Failed to start job worker."); + fail("Failed to start job worker."); } // test sse