From 0b7e629085f985bbed0dab474ae5a43ac3e37cb0 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Wed, 29 Nov 2023 17:20:59 -0800 Subject: [PATCH] Fix losing job on control plane restart (#594) * patch lost job on restart * ut * rename * comment --- .../KeyValueBasedPersistenceProvider.java | 28 ++++--- .../store/MantisJobMetadataWritable.java | 22 ++++- .../store/MantisStageMetadataWritable.java | 54 ++++++------ .../KeyValueBasedPersistenceProviderTest.java | 84 +++++++++++++++++++ 4 files changed, 148 insertions(+), 40 deletions(-) create mode 100644 mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProviderTest.java diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java index d1d54306e..882847629 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java @@ -41,6 +41,7 @@ import io.mantisrx.server.master.store.MantisJobMetadataWritable; import io.mantisrx.server.master.store.MantisStageMetadata; import io.mantisrx.server.master.store.MantisStageMetadataWritable; +import io.mantisrx.server.master.store.MantisWorkerMetadata; import io.mantisrx.server.master.store.MantisWorkerMetadataWritable; import io.mantisrx.server.master.store.NamedJob; import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; @@ -49,7 +50,6 @@ import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import io.mantisrx.shaded.com.google.common.base.Preconditions; import io.mantisrx.shaded.com.google.common.collect.ImmutableList; import io.mantisrx.shaded.com.google.common.collect.Lists; import java.io.IOException; @@ -129,7 +129,9 @@ public class KeyValueBasedPersistenceProvider implements IMantisPersistenceProvi private final KeyValueStore kvStore; private final LifecycleEventPublisher eventPublisher; private final Counter noWorkersFoundCounter; + private final Counter staleWorkersFoundCounter; private final Counter workersFoundCounter; + private final Counter failedToLoadJobCounter; public KeyValueBasedPersistenceProvider(KeyValueStore kvStore, LifecycleEventPublisher eventPublisher) { this.kvStore = kvStore; @@ -137,12 +139,16 @@ public KeyValueBasedPersistenceProvider(KeyValueStore kvStore, LifecycleEventPub Metrics m = new Metrics.Builder() .id("storage") .addCounter("noWorkersFound") + .addCounter("staleWorkerFound") .addCounter("workersFound") + .addCounter("failedToLoadJobCount") .build(); m = MetricsRegistry.getInstance().registerAndGet(m); this.noWorkersFoundCounter = m.getCounter("noWorkersFound"); + this.staleWorkersFoundCounter = m.getCounter("staleWorkerFound"); this.workersFoundCounter = m.getCounter("workersFound"); + this.failedToLoadJobCounter = m.getCounter("failedToLoadJobCount"); } protected String getJobMetadataFieldName() { @@ -394,18 +400,20 @@ public List loadAllJobs() throws IOException { workersFoundCounter.increment(); for (MantisWorkerMetadataWritable workerMeta : workersByJobId.get(jobId)) { - Preconditions.checkState( - jobMeta.addWorkerMedata(workerMeta.getStageNum(), workerMeta, null), - "JobID=%s stage=%d workerIdx=%d has existing worker, existing=%s, new=%s", - workerMeta.getJobId(), - workerMeta.getStageNum(), - workerMeta.getWorkerIndex(), - jobMeta.getWorkerByIndex(workerMeta.getStageNum(), workerMeta.getWorkerIndex()).getWorkerId(), - workerMeta.getWorkerId()); + // If there are duplicate workers on the same stage index, only attach the one with latest + // worker number. The stale workers will not present in the stage metadata thus gets terminated + // when it sends heartbeats to its JobActor. + MantisWorkerMetadata existedWorker = + jobMeta.tryAddOrReplaceWorker(workerMeta.getStageNum(), workerMeta); + if (existedWorker != null) { + logger.error("Encountered duplicate worker {} when adding {}", existedWorker, workerMeta); + staleWorkersFoundCounter.increment(); + } } jobMetas.add(DataFormatAdapter.convertMantisJobWriteableToMantisJobMetadata(jobMeta, eventPublisher)); } catch (Exception e) { - logger.warn("Exception loading job {}", jobId, e); + logger.error("Exception loading job {}", jobId, e); + this.failedToLoadJobCounter.increment(); } } // need to load all workers for the jobMeta and then ensure they are added to jobMetas! diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisJobMetadataWritable.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisJobMetadataWritable.java index 997725051..151842b09 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisJobMetadataWritable.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisJobMetadataWritable.java @@ -177,11 +177,18 @@ public boolean addJobStageIfAbsent(MantisStageMetadataWritable msmd) { return stageMetadataMap.putIfAbsent(msmd.getStageNum(), msmd) == null; } - public boolean addWorkerMedata(int stageNum, MantisWorkerMetadata workerMetadata, MantisWorkerMetadata replacedWorker) - throws InvalidJobException { + /** + * Add the given MantisWorkerMetadata instance to the corresponding stage. + * If the stage worker index already exists, replace it only when the given worker has higher worker number. + * @param stageNum target stage number. + * @param workerMetadata new worker metadata instance. + * @return null if the given worker metadata is added to this job. Otherwise, return the existing worker with + * newer number. + */ + public MantisWorkerMetadata tryAddOrReplaceWorker(int stageNum, MantisWorkerMetadata workerMetadata) { final boolean result = stageMetadataMap.get(stageNum) - .replaceWorkerIndex(workerMetadata, replacedWorker); + .replaceWorkerIndex(workerMetadata); if (result) { Integer integer = workerNumberToStageMap.put(workerMetadata.getWorkerNumber(), stageNum); @@ -189,8 +196,15 @@ public boolean addWorkerMedata(int stageNum, MantisWorkerMetadata workerMetadata logger.error(String.format("Unexpected to put worker number mapping from %d to stage %d for job %s, prev mapping to stage %d", workerMetadata.getWorkerNumber(), stageNum, workerMetadata.getJobId(), integer)); } + return null; + } else { + try { + return stageMetadataMap.get(stageNum).getWorkerByIndex(workerMetadata.getWorkerIndex()); + } catch (InvalidJobException e) { + logger.error("Failed to fetch existing worker when new worker got rejected: {}", workerMetadata, e); + throw new RuntimeException("Failed to fetch existing worker when new worker got rejected", e); + } } - return result; } @JsonIgnore diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisStageMetadataWritable.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisStageMetadataWritable.java index 278f00af3..c26ffffbb 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisStageMetadataWritable.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/store/MantisStageMetadataWritable.java @@ -208,35 +208,37 @@ Collection removeArchiveableWorkers() { return removedWorkers; } - public boolean replaceWorkerIndex(MantisWorkerMetadata newWorker, MantisWorkerMetadata oldWorker) - throws InvalidJobException { + /** + * Use the given new worker to add or replace the target index. + * If the stage worker index already exists, replace it only when the given worker has higher worker number. + * @param newWorker new worker metadata instance. + * @return true if the new worker is used in this stage. + */ + public boolean replaceWorkerIndex(MantisWorkerMetadata newWorker) { int index = newWorker.getWorkerIndex(); boolean result = true; - if (!MantisJobState.isErrorState(newWorker.getState())) { - if (oldWorker == null) { - if (workerByIndexMetadataSet.putIfAbsent(index, newWorker) != null) { - result = false; - } - } else { - if (oldWorker.getWorkerIndex() != index) { - throw new InvalidJobException(newWorker.getJobId(), stageNum, oldWorker.getWorkerIndex()); - } - MantisWorkerMetadata mwmd = workerByIndexMetadataSet.put(index, newWorker); - if (mwmd.getWorkerNumber() != oldWorker.getWorkerNumber()) { - workerByIndexMetadataSet.put(index, mwmd); - result = false; - logger.info("Did not replace worker " + oldWorker.getWorkerNumber() + " with " + - newWorker.getWorkerNumber() + " for index " + newWorker.getWorkerIndex() + " of job " + - jobId + ", different worker " + mwmd.getWorkerNumber() + " exists already"); - } - // else - // logger.info("Replaced worker " + oldWorker.getWorkerNumber() + " with " + newWorker.getWorkerNumber() + - // " for index " + newWorker.getWorkerIndex() + " of job " + jobId); + + if (workerByIndexMetadataSet.containsKey(index)) { + MantisWorkerMetadata existingWorker = workerByIndexMetadataSet.get(index); + int existingWorkerNum = existingWorker.getWorkerNumber(); + if (existingWorkerNum >= newWorker.getWorkerNumber()) { + logger.warn("Encounter stale worker: {} when newer worker exist: {}, ignore.", newWorker, + existingWorker); + result = false; + } + else { + logger.warn("Replace stale worker {} with {}.", existingWorker, newWorker); + addNewWorker(newWorker); } - } else if (oldWorker != null) - result = false; - if (result) - workerByNumberMetadataSet.put(newWorker.getWorkerNumber(), newWorker); + } else { + addNewWorker(newWorker); + } + return result; } + + private void addNewWorker(MantisWorkerMetadata newWorker) { + workerByIndexMetadataSet.put(newWorker.getWorkerIndex(), newWorker); + workerByNumberMetadataSet.put(newWorker.getWorkerNumber(), newWorker); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProviderTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProviderTest.java new file mode 100644 index 000000000..0192663fd --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProviderTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2023 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.server.master.persistence; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import io.mantisrx.master.events.LifecycleEventPublisher; +import io.mantisrx.master.jobcluster.job.IMantisJobMetadata; +import io.mantisrx.server.master.persistence.exceptions.InvalidJobException; +import io.mantisrx.server.master.store.KeyValueStore; +import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.List; +import org.junit.Test; +import org.mockito.Mockito; + +public class KeyValueBasedPersistenceProviderTest { + + @Test + public void testLoadJobWithDupeWorker() throws IOException, InvalidJobException { + KeyValueStore kvStore = Mockito.mock(KeyValueStore.class); + when(kvStore.getAllRows("MantisWorkers")) + .thenReturn(ImmutableMap.of( + "testjob-1-1000", ImmutableMap.of( + "1-0-2", + "{\"workerIndex\":0,\"workerNumber\":2,\"jobId\":\"testjob-1\",\"stageNum\":1," + + "\"numberOfPorts\":5,\"metricsPort\":1051,\"consolePort\":1053,\"debugPort\":1052," + + "\"customPort\":1054,\"ports\":[1055]}"), + "testjob-1-2000", ImmutableMap.of( + "1-0-1", + "{\"workerIndex\":0,\"workerNumber\":1,\"jobId\":\"testjob-1\",\"stageNum\":1," + + "\"numberOfPorts\":5,\"metricsPort\":1051,\"consolePort\":1053,\"debugPort\":1052," + + "\"customPort\":1054,\"ports\":[1055]}") + )); + + when(kvStore.getAllRows("MantisJobStageData")) + .thenReturn(ImmutableMap.of( + "testjob-1", + ImmutableMap.of( + "stageMetadata-1", + "{\"jobId\":\"testjob-1\",\"stageNum\":1,\"numStages\":1,\"numWorkers\":1," + + "\"machineDefinition\":{\"cpuCores\":2.0,\"memoryMB\":5000.0,\"networkMbps\":1000.0," + + "\"diskMB\":8192.0,\"numPorts\":1}}", + "jobMetadata", + "{\"jobId\":\"testjob-1\",\"name\":\"testjob\",\"user\":\"mantisoss\"," + + "\"submittedAt\":1691103290000,\"startedAt\":1691103300002,\"jarUrl\":\"http://testjob1.zip\"," + + "\"numStages\":1,\"state\":\"Launched\",\"parameters\":[],\"nextWorkerNumberToUse\":3," + + "\"sla\":{\"runtimeLimitSecs\":0,\"minRuntimeSecs\":0,\"slaType\":\"Lossy\"," + + "\"durationType\":\"Perpetual\",\"userProvidedType\":\"\"}" + + "}" + ))); + + LifecycleEventPublisher eventPublisher = Mockito.mock(LifecycleEventPublisher.class); + + KeyValueBasedPersistenceProvider kvProvider = + new KeyValueBasedPersistenceProvider(kvStore, eventPublisher); + + // loading two workers on same index should retain the one with newer worker number. + List iMantisJobMetadata = kvProvider.loadAllJobs(); + assertNotNull(iMantisJobMetadata); + assertEquals(1, iMantisJobMetadata.size()); + assertEquals("testjob-1", iMantisJobMetadata.get(0).getJobId().getId()); + assertTrue(iMantisJobMetadata.get(0).getStageMetadata(1).isPresent()); + assertEquals(2, + iMantisJobMetadata.get(0).getStageMetadata(1).get().getWorkerByIndex(0).getMetadata().getWorkerNumber()); + } +}