Skip to content

Commit

Permalink
Cleaning up logic related to when task executor is considered as registered (#491)
Browse files Browse the repository at this point in the history

* Updating deps

* Cleaning up logic related to when task executors are considered registered

* Adding comments

* Clearing the registered flag only once consecutive heartbeat failures exceed the tolerated limit

---------

Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
  • Loading branch information
sundargates and sundargates authored Jul 25, 2023
1 parent 48d8166 commit 08917f7
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 7 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ ext.libraries = [
],
hadoopCommon : "org.apache.hadoop:hadoop-common:${versions.hadoop}",
hadoopS3 : "org.apache.hadoop:hadoop-aws:${versions.hadoop}",
httpClient : "org.apache.httpcomponents:httpclient:4.5.14",
jsr305 : "com.google.code.findbugs:jsr305:${versions.jsr305}", // For Nonnull annotation
junitJupiter : [
"org.junit.jupiter:junit-jupiter-api:${versions.junit5}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ apply plugin: 'mantis'
// Applied to every configuration of this build script: pin the dependencies
// listed below to one exact version during dependency resolution.
configurations.all {
resolutionStrategy {
// each force entry names a group:artifact:version coordinate to resolve to
force "com.google.guava:guava:31.1-jre"
force "org.apache.httpcomponents:httpclient:4.5.9"
}
}

Expand Down
1 change: 0 additions & 1 deletion mantis-examples/mantis-examples-wordcount/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ apply plugin: 'mantis'
// Applied to every configuration of this build script: pin the dependencies
// listed below to one exact version during dependency resolution.
configurations.all {
resolutionStrategy {
// each force entry names a group:artifact:version coordinate to resolve to
force "com.google.guava:guava:31.1-jre"
force "org.apache.httpcomponents:httpclient:4.5.9"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ static class ResourceManagerGatewayCxn extends AbstractScheduledService {
private final int tolerableConsecutiveHeartbeatFailures;

private int numFailedHeartbeats = 0;
// flag representing if the task executor has been registered with the resource manager
@Getter
private volatile boolean registered = false;

@Override
protected String serviceName() {
Expand All @@ -384,6 +387,7 @@ public void startUp() throws Exception {
gateway
.registerTaskExecutor(taskExecutorRegistration)
.get(heartBeatTimeout.getSize(), heartBeatTimeout.getUnit());
registered = true;
} catch (Exception e) {
// the registration may or may not have succeeded. Since we don't know let's just
// do the disconnection just to be safe.
Expand Down Expand Up @@ -413,13 +417,15 @@ public void runOneIteration() throws Exception {
})
.get(heartBeatTimeout.getSize(), heartBeatTimeout.getUnit());

// the heartbeat was successful, let's reset the counter.
// the heartbeat was successful, let's reset the counter and set the registered flag
numFailedHeartbeats = 0;
registered = true;
} catch (Exception e) {
log.error("Failed to send heartbeat to gateway {}", gateway, e);
// increase the number of failed heartbeats by 1
// increase the number of failed heartbeats by 1; the registered flag is cleared only once the tolerable count is exceeded
numFailedHeartbeats += 1;
if (numFailedHeartbeats > tolerableConsecutiveHeartbeatFailures) {
registered = false;
throw e;
} else {
log.info("Ignoring heartbeat failure to gateway {} due to failed heartbeats {} <= {}",
Expand All @@ -430,6 +436,7 @@ public void runOneIteration() throws Exception {

@Override
public void shutDown() throws Exception {
registered = false;
gateway
.disconnectTaskExecutor(
new TaskExecutorDisconnection(taskExecutorRegistration.getTaskExecutorID(),
Expand Down Expand Up @@ -658,7 +665,7 @@ public CompletableFuture<String> requestThreadDump() {

/**
 * Reports whether this task executor currently counts as registered.
 *
 * <p>The result is {@code true} only when a resource manager connection object exists AND that
 * connection's own {@code isRegistered()} flag is set. Merely holding a connection object is
 * not sufficient.
 *
 * @return the future produced by {@code callAsync} for the registration check, using
 *     {@code DEFAULT_TIMEOUT}
 */
@Override
public CompletableFuture<Boolean> isRegistered() {
    return callAsync(
        () -> this.currentResourceManagerCxn != null
            && this.currentResourceManagerCxn.isRegistered(),
        DEFAULT_TIMEOUT);
}

CompletableFuture<Boolean> isRegistered(Time timeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ protected void shutDown() throws Exception {
.get();
}

/**
 * Exposes the task executor held by this instance.
 *
 * @return the current value of the {@code taskExecutor} field
 */
public TaskExecutor getTaskExecutor() {
    return taskExecutor;
}

/**
 * Creates a new builder from the given worker configuration.
 *
 * @param workerConfiguration configuration passed to the builder's constructor
 * @return a freshly constructed {@code TaskExecutorStarterBuilder}
 */
public static TaskExecutorStarterBuilder builder(WorkerConfiguration workerConfiguration) {
    final TaskExecutorStarterBuilder starterBuilder =
        new TaskExecutorStarterBuilder(workerConfiguration);
    return starterBuilder;
}
Expand Down
3 changes: 1 addition & 2 deletions mantis-server/mantis-server-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ apply plugin: 'eu.appsatori.fatjar'
// Extra properties holding dependency version strings for this build script.
ext {
// '1.3.+' is a dynamic version selector (any 1.3.x release)
mantisRxControlVersion = '1.3.+'
mesosVersion = '1.7.2'
httpComponentsVersion = '4.5.6'
}

dependencies {
Expand All @@ -36,7 +35,7 @@ dependencies {
exclude group: 'com.github.spullara.cli-parser', module: 'cli-parser'
exclude group: 'org.pentaho.pentaho-commons', module: 'pentaho-package-manager'
}
implementation "org.apache.httpcomponents:httpclient:$httpComponentsVersion"
implementation libraries.httpClient
implementation "io.mantisrx:mantis-rxcontrol:$mantisRxControlVersion"
implementation "com.yahoo.datasketches:sketches-core:0.9.1"

Expand Down

0 comments on commit 08917f7

Please sign in to comment.