Skip to content

Commit

Permalink
Cleanup logic from handoff API (apache#16457)
Browse files Browse the repository at this point in the history
* Cleanup logic from handoff API

* Fix test

* Fix checkstyle

* Update docs
  • Loading branch information
George Shiqi Wu authored May 16, 2024
1 parent 435b58f commit ed9881d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 26 deletions.
12 changes: 1 addition & 11 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3594,17 +3594,7 @@ Content-Type: application/json

<details>
<summary>View the response</summary>

```json
{
"id": "social_media",
"taskGroupIds": [
1,
2,
3
]
}
```
(empty response)
</details>

### Shut down a supervisor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull fin
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
return Response.ok().build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public class TaskGroup
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action

boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly
boolean handoffEarly = false; // set by SupervisorManager.handoffTaskGroupsEarly

TaskGroup(
int groupId,
Expand Down Expand Up @@ -268,14 +268,14 @@ Set<String> taskIds()
return tasks.keySet();
}

void setShutdownEarly()
void setHandoffEarly()
{
shutdownEarly = true;
handoffEarly = true;
}

Boolean getShutdownEarly()
Boolean getHandoffEarly()
{
return shutdownEarly;
return handoffEarly;
}

@VisibleForTesting
Expand Down Expand Up @@ -690,8 +690,8 @@ public void handle()
log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
continue;
}

taskGroup.setShutdownEarly();
log.info("Task group [%d] for supervisor [%s] will handoff early.", taskGroupId, supervisorId);
taskGroup.setHandoffEarly();
}
}

Expand Down Expand Up @@ -3194,15 +3194,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getHandoffEarly()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
// If handoffEarly has been set, ignore stopTaskCount since this is a manual operator action.
if (pendingCompletionTaskGroups.values()
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
< ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
< ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
Expand Down Expand Up @@ -95,9 +94,13 @@ default Boolean isHealthy()

int getActiveTaskGroupsCount();

/** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
/**
* Marks the given task groups as ready for segment hand-off irrespective of the task run times.
* In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
* taskGroupIds that are not valid or not actively reading are simply ignored.
*/
default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");
throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.indexing;

import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
Expand Down Expand Up @@ -97,7 +96,7 @@ public void testNoppSupervisorStopTaskEarlyDoNothing()
{
NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
Supervisor noOpSupervisor = expectedSpec.createSupervisor();
Assert.assertThrows(NotImplementedException.class,
Assert.assertThrows(UnsupportedOperationException.class,
() -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
);
}
Expand Down

0 comments on commit ed9881d

Please sign in to comment.