Skip to content

Commit

Permalink
allow failures setting nextWorkerNumberToUse (#626)
Browse files Browse the repository at this point in the history
Issue:
Sometimes the underlying store isn't available for the extensive writes made to update nextWorkerNumberToUse. The resulting exception cascades upstream, preventing further writes and worker updates from JobActor.
To handle it, we tolerate some (consecutive) write failures for this call before propagating the exception upstream.

Invariant:
This is fine from a constraints perspective. Here's why:
On a new Mantis master leader election, job and worker state is read from the DB. If the worker count was synced correctly, there is no issue.
If it was not, and there are more workers than the workerMax stored in the DB, those extra workers will be killed when the TEs send their heartbeats.
If it was not, and the workerMax in the DB is higher than the actual number of workers, we'll schedule those workers on leader election.
  • Loading branch information
hmitnflx authored Apr 11, 2024
1 parent 0332745 commit 8effc98
Showing 1 changed file with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,8 @@ static long getHeartbeatIntervalSecs(final IMantisJobMetadata mjmd) {
static class WorkerNumberGenerator {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkerNumberGenerator.class);
private static final int MAX_ATTEMPTS = 10;
private static final long SLEEP_DURATION_MS = Duration.ofSeconds(2).toMillis();
private static final int DEFAULT_INCREMENT_STEP = 10;
private final int incrementStep;
private int lastUsed;
Expand Down Expand Up @@ -1208,15 +1210,32 @@ static class WorkerNumberGenerator {

/**
 * Raises the worker-number limit by {@code incrementStep} and persists the new limit via
 * {@code setNextWorkerNumberWithRetries}; {@code currLimit} is only updated after the
 * persist call returns without throwing.
 *
 * <p>NOTE(review): this span is a rendered diff with the +/- markers stripped. The
 * {@code currLimit += incrementStep} / direct {@code setNextWorkerNumberToUse(currLimit, ...)}
 * pair and the first {@code LOGGER.error} call look like the removed pre-change lines --
 * confirm against the repository before treating this text as the current method body.
 *
 * @param mantisJobMetaData job metadata on which the next worker number is set
 * @param jobStore          store handed through to the metadata setter
 * @throws RuntimeException wrapping any exception raised while persisting; {@code hasErrored}
 *                          is set to {@code true} before it is thrown
 */
private void advance(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore) {
try {
currLimit += incrementStep;

mantisJobMetaData.setNextWorkerNumberToUse(currLimit, jobStore);
// Compute the candidate limit first so currLimit stays untouched if persisting fails.
final int value = currLimit + incrementStep;
// If store operations fail, extraneous workers will be killed since currLimit would be lower
setNextWorkerNumberWithRetries(mantisJobMetaData, jobStore, value);
currLimit = value;
} catch (Exception e) {
// Record the failure on the generator before surfacing it to the caller.
hasErrored = true;
LOGGER.error("Exception setting next Worker number to use ", e);
LOGGER.error("Exception setting nextWorkerNumberToUse after {} consecutive attempts", MAX_ATTEMPTS, e);
throw new RuntimeException("Unexpected error setting next worker number to use", e);
}
}
/**
 * Persists {@code value} as the next worker number to use, retrying on failure.
 *
 * <p>Calls {@code setNextWorkerNumberToUse} up to {@code MAX_ATTEMPTS} times and returns as
 * soon as one call succeeds. The thread sleeps {@code SLEEP_DURATION_MS} between consecutive
 * attempts, but not after the final one, so a total failure is reported without an extra
 * delay.
 *
 * @param mantisJobMetaData job metadata on which the next worker number is set
 * @param jobStore          store handed through to {@code setNextWorkerNumberToUse}
 * @param value             worker number to persist
 * @throws InterruptedException if the thread is interrupted while waiting between attempts;
 *                              the interrupt flag is restored before rethrowing and the last
 *                              store failure is attached as a suppressed exception
 * @throws Exception            the exception raised by the last failed attempt once all
 *                              attempts are exhausted
 */
private void setNextWorkerNumberWithRetries(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore, int value) throws Exception {
    Exception lastFailure = null;
    // Attempts are counted from 1 so the log reads "attempt 1/10" ... "attempt 10/10".
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
        try {
            mantisJobMetaData.setNextWorkerNumberToUse(value, jobStore);
            return;
        } catch (Exception e) {
            LOGGER.warn("Failed to setNextWorkerNumberToUse to {} (attempt {}/{})", value, attempt, MAX_ATTEMPTS, e);
            lastFailure = e;
        }
        // Back off only when another attempt will follow; sleeping after the last
        // failure would just delay the error reaching the caller.
        if (attempt < MAX_ATTEMPTS) {
            try {
                Thread.sleep(SLEEP_DURATION_MS);
            } catch (InterruptedException interrupted) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                interrupted.addSuppressed(lastFailure);
                throw interrupted;
            }
        }
    }
    throw lastFailure;
}

/**
* Get the next unused worker number.
Expand Down Expand Up @@ -2020,8 +2039,7 @@ public void processEvent(WorkerEvent event, JobState jobState) {
+ "worker",
event.getWorkerId(),
currentWorkerNum);
}
else if (currentWorkerNum < eventWorkerNum) {
} else if (currentWorkerNum < eventWorkerNum) {
// this case should not happen as new worker assignment should update state and persist first.
LOGGER.error(
"[Corrupted state] Newer worker num received: {}, Current stage worker: {}",
Expand Down

0 comments on commit 8effc98

Please sign in to comment.