From 22b0a285bc8580e5cee86dad8758dda4effdfef7 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Mon, 29 Apr 2024 22:57:17 -0700 Subject: [PATCH] Add GetDisabledTaskExecutors Api + scaler handling on disabled TEs (#664) * add getDisabledTE api * Add rcmActor fwd * comment * include disabled TEs in usage --- .../resourcecluster/ResourceCluster.java | 2 + ...esourceClustersNonLeaderRedirectRoute.java | 7 +++ .../DisableTaskExecutorsRequest.java | 2 +- .../resourcecluster/ExecutorStateManager.java | 2 +- .../ExecutorStateManagerImpl.java | 7 +-- .../ResourceClusterAkkaImpl.java | 11 +++++ .../ResourceClustersManagerActor.java | 2 + ...ourceClusterActorClusterUsageAkkaTest.java | 46 +++++++++++++++++++ 8 files changed, 71 insertions(+), 8 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java index ec4a55a74..72fff4f73 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java @@ -85,6 +85,8 @@ default CompletableFuture> getBusyTaskExecutors() { CompletableFuture> getBusyTaskExecutors(Map attributes); + CompletableFuture> getDisabledTaskExecutors(Map attributes); + default CompletableFuture> getUnregisteredTaskExecutors() { return getUnregisteredTaskExecutors(Collections.emptyMap()); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java index 
c92e516ff..7cc50b625 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java @@ -78,6 +78,7 @@ * /api/v1/resourceClusters/{}/getResourceOverview (GET) * /api/v1/resourceClusters/{}/getRegisteredTaskExecutors (GET) * /api/v1/resourceClusters/{}/getBusyTaskExecutors (GET) + * /api/v1/resourceClusters/{}/getDisabledTaskExecutors (GET) * /api/v1/resourceClusters/{}/getAvailableTaskExecutors (GET) * /api/v1/resourceClusters/{}/getUnregisteredTaskExecutors (GET) * /api/v1/resourceClusters/{}/scaleSku (POST) @@ -216,6 +217,12 @@ protected Route constructRoutes() { (clusterName) -> pathEndOrSingleSlash(() -> concat( get(() -> mkTaskExecutorsRoute(getClusterID(clusterName), (rc, req) -> rc.getBusyTaskExecutors(req.getAttributes()))))) ), + // /{}/getDisabledTaskExecutors + path( + PathMatchers.segment().slash("getDisabledTaskExecutors"), + (clusterName) -> pathEndOrSingleSlash(() -> concat( + get(() -> mkTaskExecutorsRoute(getClusterID(clusterName), (rc, req) -> rc.getDisabledTaskExecutors(req.getAttributes()))))) + ), // /{}/getAvailableTaskExecutors path( PathMatchers.segment().slash("getAvailableTaskExecutors"), diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/DisableTaskExecutorsRequest.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/DisableTaskExecutorsRequest.java index 22362e7b7..ae4467139 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/DisableTaskExecutorsRequest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/DisableTaskExecutorsRequest.java @@ -41,7 +41,7 @@ public class 
DisableTaskExecutorsRequest { Optional taskExecutorID; boolean isRequestByAttributes() { - return attributes.size() > 0; + return attributes != null && !attributes.isEmpty(); /* null-safe: callers may pass no attribute map at all (e.g. when disabling a single TE by ID) */ } boolean isExpired(Instant now) { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java index 1f899d13b..3e1bce7d9 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java @@ -101,7 +101,7 @@ Optional> findFirst( e -> e.getValue().isAvailable(); Predicate> isDisabled = - e -> e.getValue().isDisabled(); + e -> e.getValue().isDisabled() && e.getValue().isRegistered(); /* only registered TEs match the disabled filter */ Predicate> isAssigned = e -> e.getValue().isAssigned(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index a98d57dab..fc80491c2 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ -421,11 +421,6 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) { return; } - // do not count the disabled TEs. 
- if (value.isDisabled()) { - return; - } - Optional groupKeyO = req.getGroupKeyFunc().apply(value.getRegistration()); @@ -437,7 +432,7 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) { String groupKey = groupKeyO.get(); Pair kvState = Pair.of( - value.isAvailable() ? 1 : 0, + value.isAvailable() && !value.isDisabled() ? 1 : 0, /* a disabled TE contributes 0 to this first (available) count but is still counted by the isRegistered term below */ value.isRegistered() ? 1 : 0); if (usageByGroupKey.containsKey(groupKey)) { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 0619fed4a..93cf4e1f5 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -26,6 +26,7 @@ import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetDisabledTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest; @@ -118,6 +119,16 @@ public CompletableFuture> getBusyTaskExecutors(Map l.getTaskExecutors()); } + @Override + public CompletableFuture> getDisabledTaskExecutors(Map attributes) { + return Patterns.ask( + resourceClusterManagerActor, + new GetDisabledTaskExecutorsRequest(clusterID, attributes), askTimeout) + 
.thenApply(TaskExecutorsList.class::cast) + .toCompletableFuture() + .thenApply(l -> l.getTaskExecutors()); + } + @Override public CompletableFuture> getUnregisteredTaskExecutors(Map attributes) { return Patterns.ask( diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index a7035699c..ae22dab24 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -27,6 +27,7 @@ import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetDisabledTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest; @@ -122,6 +123,7 @@ public Receive createReceive() { .match(GetRegisteredTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) .match(GetBusyTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) + .match(GetDisabledTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) .match(GetAvailableTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) 
.match(GetUnregisteredTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) .match(GetTaskExecutorStatusRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java index 9079fec5b..5c47eb9ce 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java @@ -54,6 +54,7 @@ import io.mantisrx.shaded.com.google.common.collect.ImmutableSet; import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.Objects; @@ -283,6 +284,43 @@ public void testGetTaskExecutorsUsage_WithSizeName() throws Exception { assertIdleAndTotalCount(usageRes, "SKU4-JDK17", 2, 2); } + @Test + public void testGetTaskExecutorsUsage_WithDisabledTEs() throws Exception { + registerTE(TaskExecutorRegistration.builder() + .taskExecutorID(TaskExecutorID.of("TE4")) + .clusterID(CLUSTER_ID) + .taskExecutorAddress(TASK_EXECUTOR_ADDRESS) + .hostname(HOST_NAME) + .workerPorts(WORKER_PORTS) + .machineDefinition(MACHINE_DEFINITION_1) + .taskExecutorAttributes( + ImmutableMap.of( + WorkerConstants.WORKER_CONTAINER_DEFINITION_ID, CONTAINER_DEF_ID_1.getResourceID(), + "MANTIS_SCHEDULING_ATTRIBUTE_JDK", "17")) + .build()); + registerTE(TaskExecutorRegistration.builder() + .taskExecutorID(TaskExecutorID.of("TE5")) + .clusterID(CLUSTER_ID) + .taskExecutorAddress(TASK_EXECUTOR_ADDRESS) + .hostname(HOST_NAME) + .workerPorts(WORKER_PORTS) + 
.machineDefinition(MACHINE_DEFINITION_1) + .taskExecutorAttributes( + ImmutableMap.of( + WorkerConstants.WORKER_CONTAINER_DEFINITION_ID, CONTAINER_DEF_ID_1.getResourceID(), + "MANTIS_SCHEDULING_ATTRIBUTE_JDK", "17")) + .build()); + disableTE(TaskExecutorID.of("TE5")); + + // Test get cluster usage + TestKit probe = new TestKit(actorSystem); + resourceClusterActor.tell(new GetClusterUsageRequest(CLUSTER_ID, ResourceClusterScalerActor.groupKeyFromTaskExecutorDefinitionIdFunc), + probe.getRef()); + GetClusterUsageResponse usageRes = probe.expectMsgClass(GetClusterUsageResponse.class); + assertEquals(3, usageRes.getUsages().size()); + assertIdleAndTotalCount(usageRes, "SKU1-JDK17", 2, 3); + } + @Test public void testGetTaskExecutorsUsage_WithSizeNameAndAllocation() throws Exception { // registering 3 Task Executors with sizeName @@ -415,6 +453,14 @@ private void registerTE(TaskExecutorRegistration registration) throws Exception TaskExecutorReport.available())).get()); } + private void disableTE(TaskExecutorID taskExecutorID) throws Exception { + assertEquals(Ack.getInstance(), resourceCluster.disableTaskExecutorsFor( + null, + Instant.now().plus(Duration.ofHours(1)), + Optional.of(taskExecutorID)) + .get()); + } + private static void assertIdleAndTotalCount(GetClusterUsageResponse usageRes, String skuID, int idleCount, int totalCount) { assertEquals(1, usageRes.getUsages().stream() .filter(usage -> Objects.equals(usage.getUsageGroupKey(), skuID)).count());