From 08917f757bd6f3a73f1f19a5ac1ea7c8826b53ef Mon Sep 17 00:00:00 2001 From: sundargates Date: Tue, 25 Jul 2023 10:03:35 -0700 Subject: [PATCH] Cleaning up logic related to when task executor is considered as registered (#491) * Updating deps * Cleaning up logic related to when task executors are considered registered * Adding comments * Clearing registered only when heartbeats fail successfully --------- Co-authored-by: Sundaram Ananthanarayanan --- build.gradle | 1 + .../mantis-examples-twitter-sample/build.gradle | 1 - .../mantis-examples-wordcount/build.gradle | 1 - .../java/io/mantisrx/server/agent/TaskExecutor.java | 13 ++++++++++--- .../mantisrx/server/agent/TaskExecutorStarter.java | 4 ++++ mantis-server/mantis-server-worker/build.gradle | 3 +-- 6 files changed, 16 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 0786077a2..ffc9540ae 100644 --- a/build.gradle +++ b/build.gradle @@ -65,6 +65,7 @@ ext.libraries = [ ], hadoopCommon : "org.apache.hadoop:hadoop-common:${versions.hadoop}", hadoopS3 : "org.apache.hadoop:hadoop-aws:${versions.hadoop}", + httpClient : "org.apache.httpcomponents:httpclient:4.5.14", jsr305 : "com.google.code.findbugs:jsr305:${versions.jsr305}", // For Nonnull annotation junitJupiter : [ "org.junit.jupiter:junit-jupiter-api:${versions.junit5}", diff --git a/mantis-examples/mantis-examples-twitter-sample/build.gradle b/mantis-examples/mantis-examples-twitter-sample/build.gradle index 2a4ec68ba..e37a78a81 100644 --- a/mantis-examples/mantis-examples-twitter-sample/build.gradle +++ b/mantis-examples/mantis-examples-twitter-sample/build.gradle @@ -4,7 +4,6 @@ apply plugin: 'mantis' configurations.all { resolutionStrategy { force "com.google.guava:guava:31.1-jre" - force "org.apache.httpcomponents:httpclient:4.5.9" } } diff --git a/mantis-examples/mantis-examples-wordcount/build.gradle b/mantis-examples/mantis-examples-wordcount/build.gradle index 5d12e0812..6e54eb8bc 100644 --- a/mantis-examples/mantis-examples-wordcount/build.gradle +++ b/mantis-examples/mantis-examples-wordcount/build.gradle @@ -4,7 +4,6 @@ apply plugin: 'mantis' configurations.all { resolutionStrategy { force "com.google.guava:guava:31.1-jre" - force "org.apache.httpcomponents:httpclient:4.5.9" } } diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java index 99b15304d..f19006bc3 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java @@ -363,6 +363,9 @@ static class ResourceManagerGatewayCxn extends AbstractScheduledService { private final int tolerableConsecutiveHeartbeatFailures; private int numFailedHeartbeats = 0; + // flag representing if the task executor has been registered with the resource manager + @Getter + private volatile boolean registered = false; @Override protected String serviceName() { @@ -384,6 +387,7 @@ public void startUp() throws Exception { gateway .registerTaskExecutor(taskExecutorRegistration) .get(heartBeatTimeout.getSize(), heartBeatTimeout.getUnit()); + registered = true; } catch (Exception e) { // the registration may or may not have succeeded. Since we don't know let's just // do the disconnection just to be safe. @@ -413,13 +417,15 @@ public void runOneIteration() throws Exception { }) .get(heartBeatTimeout.getSize(), heartBeatTimeout.getUnit()); - // the heartbeat was successful, let's reset the counter. + // the heartbeat was successful, let's reset the counter and set the registered flag numFailedHeartbeats = 0; + registered = true; } catch (Exception e) { log.error("Failed to send heartbeat to gateway {}", gateway, e); - // increase the number of failed heartbeats by 1 + // increase the number of failed heartbeats by 1 and clear the registered flag numFailedHeartbeats += 1; if (numFailedHeartbeats > tolerableConsecutiveHeartbeatFailures) { + registered = false; throw e; } else { log.info("Ignoring heartbeat failure to gateway {} due to failed heartbeats {} <= {}", @@ -430,6 +436,7 @@ public void runOneIteration() throws Exception { @Override public void shutDown() throws Exception { + registered = false; gateway .disconnectTaskExecutor( new TaskExecutorDisconnection(taskExecutorRegistration.getTaskExecutorID(), @@ -658,7 +665,7 @@ public CompletableFuture requestThreadDump() { @Override public CompletableFuture isRegistered() { - return callAsync(() -> this.currentResourceManagerCxn != null, DEFAULT_TIMEOUT); + return callAsync(() -> this.currentResourceManagerCxn != null && this.currentResourceManagerCxn.isRegistered(), DEFAULT_TIMEOUT); } CompletableFuture isRegistered(Time timeout) { diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java index f5622ab7f..cf5017e94 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java @@ -81,6 +81,10 @@ protected void shutDown() throws Exception { .get(); } + public TaskExecutor getTaskExecutor() { + return this.taskExecutor; + } + public static TaskExecutorStarterBuilder builder(WorkerConfiguration workerConfiguration) { return new TaskExecutorStarterBuilder(workerConfiguration); } diff --git a/mantis-server/mantis-server-worker/build.gradle b/mantis-server/mantis-server-worker/build.gradle index f635acd61..16e53a859 100644 --- a/mantis-server/mantis-server-worker/build.gradle +++ b/mantis-server/mantis-server-worker/build.gradle @@ -19,7 +19,6 @@ apply plugin: 'eu.appsatori.fatjar' ext { mantisRxControlVersion = '1.3.+' mesosVersion = '1.7.2' - httpComponentsVersion = '4.5.6' } dependencies { @@ -36,7 +35,7 @@ dependencies { exclude group: 'com.github.spullara.cli-parser', module: 'cli-parser' exclude group: 'org.pentaho.pentaho-commons', module: 'pentaho-package-manager' } - implementation "org.apache.httpcomponents:httpclient:$httpComponentsVersion" + implementation libraries.httpClient implementation "io.mantisrx:mantis-rxcontrol:$mantisRxControlVersion" implementation "com.yahoo.datasketches:sketches-core:0.9.1"