From 8effc98d9fa9e24f4c170889826f40355c083e1c Mon Sep 17 00:00:00 2001 From: hmitnflx <100323213+hmitnflx@users.noreply.github.com> Date: Thu, 11 Apr 2024 13:56:20 -0700 Subject: [PATCH] allow failures setting nextWorkerNumberToUse (#626) Issue: Sometimes, the underlying store isn't available for the extensive writes made to update nextWorkerNumberToUse. The resulting exception cascades upstream, preventing further writes and updates to workers from JobActor. To handle this, we tolerate some (consecutive) write failures for this call before propagating the exception upstream. Invariant: This is fine from a constraints perspective. Here's why: On a new Mantis master leader election, job and worker state is read from the DB. If the worker count was synced correctly, there is no issue. If it was not, and there are more workers than the workerMax stored in the DB, those extra workers will be killed when the TEs send heartbeats. If it was not, and the workerMax in the DB is higher than the actual number of workers, we'll schedule those workers on leader election. 
--- .../master/jobcluster/job/JobActor.java | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index 3835c65e1..06f0c54db 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -1175,6 +1175,8 @@ static long getHeartbeatIntervalSecs(final IMantisJobMetadata mjmd) { static class WorkerNumberGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(WorkerNumberGenerator.class); + private static final int MAX_ATTEMPTS = 10; + private static final long SLEEP_DURATION_MS = Duration.ofSeconds(2).toMillis(); private static final int DEFAULT_INCREMENT_STEP = 10; private final int incrementStep; private int lastUsed; @@ -1208,15 +1210,32 @@ static class WorkerNumberGenerator { private void advance(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore) { try { - currLimit += incrementStep; - - mantisJobMetaData.setNextWorkerNumberToUse(currLimit, jobStore); + final int value = currLimit + incrementStep; + // If store operations fail, extraneous workers will be killed since currLimit would be lower + setNextWorkerNumberWithRetries(mantisJobMetaData, jobStore, value); + currLimit = value; } catch (Exception e) { hasErrored = true; - LOGGER.error("Exception setting next Worker number to use ", e); + LOGGER.error("Exception setting nextWorkerNumberToUse after {} consecutive attempts", MAX_ATTEMPTS, e); throw new RuntimeException("Unexpected error setting next worker number to use", e); } } + private void setNextWorkerNumberWithRetries(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore, int value) 
throws Exception { + int attempts = 0; + Exception exception = null; + while (attempts < MAX_ATTEMPTS) { + try { + mantisJobMetaData.setNextWorkerNumberToUse(value, jobStore); + return; + } catch (Exception e) { + LOGGER.warn("Failed to setNextWorkerNumberToUse to {} (attempt {}/{})", value, attempts, MAX_ATTEMPTS, e); + exception = e; + } + Thread.sleep(SLEEP_DURATION_MS); + attempts++; + } + throw exception; + } /** * Get the next unused worker number. @@ -2020,8 +2039,7 @@ public void processEvent(WorkerEvent event, JobState jobState) { + "worker", event.getWorkerId(), currentWorkerNum); - } - else if (currentWorkerNum < eventWorkerNum) { + } else if (currentWorkerNum < eventWorkerNum) { // this case should not happen as new worker assignment should update state and persist first. LOGGER.error( "[Corrupted state] Newer worker num received: {}, Current stage worker: {}",