Skip to content

Commit

Permalink
Add GetDisabledTaskExecutors Api + scaler handling on disabled TEs (#664
Browse files Browse the repository at this point in the history
)

* add getDisabledTE api

* Add rcmActor fwd

* comment

* include disabled TEs in usage
  • Loading branch information
Andyz26 authored Apr 30, 2024
1 parent 19082dc commit 22b0a28
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ default CompletableFuture<List<TaskExecutorID>> getBusyTaskExecutors() {

CompletableFuture<List<TaskExecutorID>> getBusyTaskExecutors(Map<String, String> attributes);

CompletableFuture<List<TaskExecutorID>> getDisabledTaskExecutors(Map<String, String> attributes);

default CompletableFuture<List<TaskExecutorID>> getUnregisteredTaskExecutors() {
return getUnregisteredTaskExecutors(Collections.emptyMap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
* /api/v1/resourceClusters/{}/getResourceOverview (GET)
* /api/v1/resourceClusters/{}/getRegisteredTaskExecutors (GET)
* /api/v1/resourceClusters/{}/getBusyTaskExecutors (GET)
* /api/v1/resourceClusters/{}/getDisabledTaskExecutors (GET)
* /api/v1/resourceClusters/{}/getAvailableTaskExecutors (GET)
* /api/v1/resourceClusters/{}/getUnregisteredTaskExecutors (GET)
* /api/v1/resourceClusters/{}/scaleSku (POST)
Expand Down Expand Up @@ -216,6 +217,12 @@ protected Route constructRoutes() {
(clusterName) -> pathEndOrSingleSlash(() -> concat(
get(() -> mkTaskExecutorsRoute(getClusterID(clusterName), (rc, req) -> rc.getBusyTaskExecutors(req.getAttributes())))))
),
// /{}/getDisabledTaskExecutors
path(
PathMatchers.segment().slash("getDisabledTaskExecutors"),
(clusterName) -> pathEndOrSingleSlash(() -> concat(
get(() -> mkTaskExecutorsRoute(getClusterID(clusterName), (rc, req) -> rc.getDisabledTaskExecutors(req.getAttributes())))))
),
// /{}/getAvailableTaskExecutors
path(
PathMatchers.segment().slash("getAvailableTaskExecutors"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class DisableTaskExecutorsRequest {
Optional<TaskExecutorID> taskExecutorID;

boolean isRequestByAttributes() {
return attributes.size() > 0;
return attributes != null && attributes.size() > 0;
}

boolean isExpired(Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
e -> e.getValue().isAvailable();

Predicate<Entry<TaskExecutorID, TaskExecutorState>> isDisabled =
e -> e.getValue().isDisabled();
e -> e.getValue().isDisabled() && e.getValue().isRegistered();

Predicate<Entry<TaskExecutorID, TaskExecutorState>> isAssigned =
e -> e.getValue().isAssigned();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,6 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
return;
}

// do not count the disabled TEs.
if (value.isDisabled()) {
return;
}

Optional<String> groupKeyO =
req.getGroupKeyFunc().apply(value.getRegistration());

Expand All @@ -437,7 +432,7 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
String groupKey = groupKeyO.get();

Pair<Integer, Integer> kvState = Pair.of(
value.isAvailable() ? 1 : 0,
value.isAvailable() && !value.isDisabled() ? 1 : 0,
value.isRegistered() ? 1 : 0);

if (usageByGroupKey.containsKey(groupKey)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetDisabledTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest;
Expand Down Expand Up @@ -118,6 +119,16 @@ public CompletableFuture<List<TaskExecutorID>> getBusyTaskExecutors(Map<String,
.thenApply(l -> l.getTaskExecutors());
}

@Override
public CompletableFuture<List<TaskExecutorID>> getDisabledTaskExecutors(Map<String, String> attributes) {
return Patterns.ask(
resourceClusterManagerActor,
new GetDisabledTaskExecutorsRequest(clusterID, attributes), askTimeout)
.thenApply(TaskExecutorsList.class::cast)
.toCompletableFuture()
.thenApply(l -> l.getTaskExecutors());
}

@Override
public CompletableFuture<List<TaskExecutorID>> getUnregisteredTaskExecutors(Map<String, String> attributes) {
return Patterns.ask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetDisabledTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest;
Expand Down Expand Up @@ -122,6 +123,7 @@ public Receive createReceive() {

.match(GetRegisteredTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context()))
.match(GetBusyTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context()))
.match(GetDisabledTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context()))
.match(GetAvailableTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context()))
.match(GetUnregisteredTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context()))
.match(GetTaskExecutorStatusRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.mantisrx.shaded.com.google.common.collect.ImmutableSet;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
Expand Down Expand Up @@ -283,6 +284,43 @@ public void testGetTaskExecutorsUsage_WithSizeName() throws Exception {
assertIdleAndTotalCount(usageRes, "SKU4-JDK17", 2, 2);
}

@Test
public void testGetTaskExecutorsUsage_WithDisabledTEs() throws Exception {
registerTE(TaskExecutorRegistration.builder()
.taskExecutorID(TaskExecutorID.of("TE4"))
.clusterID(CLUSTER_ID)
.taskExecutorAddress(TASK_EXECUTOR_ADDRESS)
.hostname(HOST_NAME)
.workerPorts(WORKER_PORTS)
.machineDefinition(MACHINE_DEFINITION_1)
.taskExecutorAttributes(
ImmutableMap.of(
WorkerConstants.WORKER_CONTAINER_DEFINITION_ID, CONTAINER_DEF_ID_1.getResourceID(),
"MANTIS_SCHEDULING_ATTRIBUTE_JDK", "17"))
.build());
registerTE(TaskExecutorRegistration.builder()
.taskExecutorID(TaskExecutorID.of("TE5"))
.clusterID(CLUSTER_ID)
.taskExecutorAddress(TASK_EXECUTOR_ADDRESS)
.hostname(HOST_NAME)
.workerPorts(WORKER_PORTS)
.machineDefinition(MACHINE_DEFINITION_1)
.taskExecutorAttributes(
ImmutableMap.of(
WorkerConstants.WORKER_CONTAINER_DEFINITION_ID, CONTAINER_DEF_ID_1.getResourceID(),
"MANTIS_SCHEDULING_ATTRIBUTE_JDK", "17"))
.build());
disableTE(TaskExecutorID.of("TE5"));

// Test get cluster usage
TestKit probe = new TestKit(actorSystem);
resourceClusterActor.tell(new GetClusterUsageRequest(CLUSTER_ID, ResourceClusterScalerActor.groupKeyFromTaskExecutorDefinitionIdFunc),
probe.getRef());
GetClusterUsageResponse usageRes = probe.expectMsgClass(GetClusterUsageResponse.class);
assertEquals(3, usageRes.getUsages().size());
assertIdleAndTotalCount(usageRes, "SKU1-JDK17", 2, 3);
}

@Test
public void testGetTaskExecutorsUsage_WithSizeNameAndAllocation() throws Exception {
// registering 3 Task Executors with sizeName
Expand Down Expand Up @@ -415,6 +453,14 @@ private void registerTE(TaskExecutorRegistration registration) throws Exception
TaskExecutorReport.available())).get());
}

private void disableTE(TaskExecutorID taskExecutorID) throws Exception {
assertEquals(Ack.getInstance(), resourceCluster.disableTaskExecutorsFor(
null,
Instant.now().plus(Duration.ofHours(1)),
Optional.of(taskExecutorID))
.get());
}

private static void assertIdleAndTotalCount(GetClusterUsageResponse usageRes, String skuID, int idleCount, int totalCount) {
assertEquals(1, usageRes.getUsages().stream()
.filter(usage -> Objects.equals(usage.getUsageGroupKey(), skuID)).count());
Expand Down

0 comments on commit 22b0a28

Please sign in to comment.