Skip to content

Commit

Permalink
Handle TE gateway connection error cases (#554)
Browse files Browse the repository at this point in the history
* Handle TE gateway reconnect

* comments
  • Loading branch information
Andyz26 authored Sep 13, 2023
1 parent eafa3b9 commit 731f3db
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ CompletableFuture<TaskExecutorID> getTaskExecutorFor(
*/
CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(TaskExecutorID taskExecutorID);

/**
 * Requests that the connection to the given task executor be re-established and
 * returns the resulting gateway.
 *
 * @param taskExecutorID ID of the task executor to reconnect to
 * @return a future that yields the task executor's gateway; implementations are
 *     expected to complete it exceptionally when the reconnect cannot be performed
 *     (NOTE(review): exact failure types depend on the implementation - confirm)
 */
CompletableFuture<TaskExecutorGateway> reconnectTaskExecutorGateway(TaskExecutorID taskExecutorID);

/**
 * Asynchronously fetches the {@link TaskExecutorRegistration} for a host name.
 *
 * @param hostName host name used for the lookup
 *     (NOTE(review): matching rules are not visible here - confirm against the implementation)
 * @return a future yielding the registration information
 */
CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public Receive createReceive() {
.match(ResourceOverviewRequest.class, this::onResourceOverviewRequest)
.match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest)
.match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest)
.match(TaskExecutorGatewayReconnectRequest.class, this::onTaskExecutorGatewayReconnectRequest)
.match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest)
.match(CheckDisabledTaskExecutors.class, this::findAndMarkDisabledTaskExecutors)
.match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry)
Expand Down Expand Up @@ -349,13 +350,15 @@ private void onAssignedTaskExecutorRequest(GetAssignedTaskExecutorRequest reques
private void onTaskExecutorGatewayRequest(TaskExecutorGatewayRequest request) {
TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID());
if (state == null) {
sender().tell(new Exception(), self());
sender().tell(new NullPointerException("Null TaskExecutorState for: " + request.getTaskExecutorID()), self());
} else {
try {
if (state.isRegistered()) {
sender().tell(state.getGateway(), self());
} else {
sender().tell(new Status.Failure(new Exception("")), self());
sender().tell(
new Status.Failure(new IllegalStateException("Unregistered TaskExecutor: " + request.getTaskExecutorID())),
self());
}
} catch (Exception e) {
metrics.incrementCounter(
Expand All @@ -382,6 +385,36 @@ private void onTaskExecutorGatewayRequest(TaskExecutorGatewayRequest request) {
}
}

/**
 * Handles a {@link TaskExecutorGatewayReconnectRequest} by asking the task executor's
 * state to reconnect and replying to the sender with the outcome.
 *
 * <p>Replies sent to {@code sender()}:
 * <ul>
 *   <li>no state tracked for the executor: {@code Status.Failure} wrapping a
 *       {@link NullPointerException};</li>
 *   <li>state present but not registered: {@code Status.Failure} wrapping an
 *       {@link IllegalStateException};</li>
 *   <li>registered: the value produced by {@code state.reconnect().join()};</li>
 *   <li>any exception raised while doing the above: the
 *       {@code TE_RECONNECTION_FAILURE} counter is incremented (tagged with the resource
 *       cluster and task executor ids) and a {@code Status.Failure} wrapping a
 *       {@code ConnectionFailedException} is sent.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code join()} waits for the reconnect result on the thread that runs
 * this handler - presumably the actor's message thread; confirm that is acceptable.
 *
 * @param request identifies the task executor whose gateway should be reconnected
 */
private void onTaskExecutorGatewayReconnectRequest(TaskExecutorGatewayReconnectRequest request) {
    log.info("Requesting to reconnect to TaskExecutor: {}", request);

    final TaskExecutorState executorState = this.executorStateManager.get(request.getTaskExecutorID());
    if (executorState == null) {
        // Nothing is tracked for this executor, so there is no gateway to reconnect.
        final NullPointerException missingState =
            new NullPointerException("Null TaskExecutor state: " + request.getTaskExecutorID());
        sender().tell(new Status.Failure(missingState), self());
        return;
    }

    try {
        if (!executorState.isRegistered()) {
            final IllegalStateException notRegistered =
                new IllegalStateException("Unregistered TaskExecutor: " + request.getTaskExecutorID());
            sender().tell(new Status.Failure(notRegistered), self());
            return;
        }

        // Wait for the reconnect attempt and hand its result back to the requester.
        sender().tell(executorState.reconnect().join(), self());
    } catch (Exception e) {
        // Record the failure per cluster / task executor before reporting it to the caller.
        final TagList failureTags = TagList.create(ImmutableMap.of(
            "resourceCluster",
            clusterID.getResourceID(),
            "taskExecutor",
            request.getTaskExecutorID().getResourceId()));
        metrics.incrementCounter(ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE, failureTags);
        sender().tell(new Status.Failure(new ConnectionFailedException(e)), self());
    }
}

// custom equals function to check if the existing set already has the request under consideration.
private boolean addNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest newRequest) {
if (newRequest.isRequestByAttributes()) {
Expand Down Expand Up @@ -868,6 +901,13 @@ static class TaskExecutorGatewayRequest {
ClusterID clusterID;
}

/**
 * Message asking the resource cluster actor to reconnect the gateway of a task executor.
 *
 * <p>Field order matters: callers construct this message positionally as
 * {@code (taskExecutorID, clusterID)}.
 */
@Value
static class TaskExecutorGatewayReconnectRequest {
// Task executor whose state is looked up and asked to reconnect.
TaskExecutorID taskExecutorID;

// Cluster the request targets; read via getClusterID() to pick the actor the request is forwarded to.
ClusterID clusterID;
}

@Value
static class GetRegisteredTaskExecutorsRequest implements HasAttributes {
ClusterID clusterID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList;
Expand Down Expand Up @@ -202,6 +203,17 @@ public CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(
.toCompletableFuture();
}

/**
 * Sends a {@link TaskExecutorGatewayReconnectRequest} for this cluster to the resource
 * cluster manager actor and exposes the reply as a future.
 *
 * @param taskExecutorID ID of the task executor to reconnect to
 * @return a future holding the actor's reply cast to {@link TaskExecutorGateway}; the ask
 *     is bounded by {@code askTimeout}
 */
@Override
public CompletableFuture<TaskExecutorGateway> reconnectTaskExecutorGateway(TaskExecutorID taskExecutorID) {
    final TaskExecutorGatewayReconnectRequest reconnectRequest =
        new TaskExecutorGatewayReconnectRequest(taskExecutorID, clusterID);

    return Patterns
        .ask(resourceClusterManagerActor, reconnectRequest, askTimeout)
        .thenApply(TaskExecutorGateway.class::cast)
        .toCompletableFuture();
}

@Override
public CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest;
Expand Down Expand Up @@ -143,6 +144,8 @@ public Receive createReceive() {
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorGatewayRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorGatewayReconnectRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(DisableTaskExecutorsRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(AddNewJobArtifactsToCacheRequest.class, req ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event)
} catch (Exception e) {
// we are not able to get the gateway, which either means the node is not great or some transient network issue
// we will retry the request
log.error(
log.warn(
"Failed to establish connection with the task executor {}; Resubmitting the request",
event.getTaskExecutorID(), e);
connectionFailures.increment();
Expand Down Expand Up @@ -188,25 +188,29 @@ private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent event) {

private void onSubmittedScheduleRequestEvent(SubmittedScheduleRequestEvent event) {
final TaskExecutorID taskExecutorID = event.getTaskExecutorID();
final TaskExecutorRegistration info = resourceCluster.getTaskExecutorInfo(taskExecutorID)
.join();
boolean success =
jobMessageRouter.routeWorkerEvent(new WorkerLaunched(
event.getEvent().getRequest().getWorkerId(),
event.getEvent().getRequest().getStageNum(),
info.getHostname(),
taskExecutorID.getResourceId(),
Optional.ofNullable(info.getClusterID().getResourceID()),
Optional.of(info.getClusterID()),
info.getWorkerPorts()));
final Duration latency =
Duration.between(event.getEvent().getEventTime(), Clock.systemDefaultZone().instant());
schedulingLatency.record(latency.toNanos(), TimeUnit.NANOSECONDS);

if (!success) {
log.error(
"Routing message to jobMessageRouter was never expected to fail but it has failed to event {}",
event);
try {
final TaskExecutorRegistration info = resourceCluster.getTaskExecutorInfo(taskExecutorID)
.join();
boolean success =
jobMessageRouter.routeWorkerEvent(new WorkerLaunched(
event.getEvent().getRequest().getWorkerId(),
event.getEvent().getRequest().getStageNum(),
info.getHostname(),
taskExecutorID.getResourceId(),
Optional.ofNullable(info.getClusterID().getResourceID()),
Optional.of(info.getClusterID()),
info.getWorkerPorts()));
final Duration latency =
Duration.between(event.getEvent().getEventTime(), Clock.systemDefaultZone().instant());
schedulingLatency.record(latency.toNanos(), TimeUnit.NANOSECONDS);

if (!success) {
log.error(
"Routing message to jobMessageRouter was never expected to fail but it has failed to event {}",
event);
}
} catch (Exception ex) {
log.warn("Failed to route message due to error in getting TaskExecutor info: {}", taskExecutorID, ex);
}
}

Expand All @@ -216,6 +220,15 @@ private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestE
event.getScheduleRequestEvent().getRequest().getWorkerId(),
event.getScheduleRequestEvent().getRequest().getStageNum(),
Throwables.getStackTraceAsString(event.throwable)));

try {
resourceCluster.reconnectTaskExecutorGateway(event.getTaskExecutorID()).join();
} catch (Exception e) {
log.warn(
"Failed to establish re-connection with the task executor {} on failed schedule request",
event.getTaskExecutorID(), e);
connectionFailures.increment();
}
}

private void onCancelRequestEvent(CancelRequestEvent event) {
Expand Down

0 comments on commit 731f3db

Please sign in to comment.