Skip to content

Commit

Permalink
Fix losing job on control plane restart (#594)
Browse files Browse the repository at this point in the history
* patch lost job on restart

* ut

* rename

* comment
  • Loading branch information
Andyz26 authored Nov 30, 2023
1 parent fe6604d commit 0b7e629
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.mantisrx.server.master.store.MantisJobMetadataWritable;
import io.mantisrx.server.master.store.MantisStageMetadata;
import io.mantisrx.server.master.store.MantisStageMetadataWritable;
import io.mantisrx.server.master.store.MantisWorkerMetadata;
import io.mantisrx.server.master.store.MantisWorkerMetadataWritable;
import io.mantisrx.server.master.store.NamedJob;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -49,7 +50,6 @@
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
Expand Down Expand Up @@ -129,20 +129,26 @@ public class KeyValueBasedPersistenceProvider implements IMantisPersistenceProvi
// Backing key-value store; injected through the constructor.
private final KeyValueStore kvStore;
// Publisher handed to the job metadata objects built while loading jobs.
private final LifecycleEventPublisher eventPublisher;
// Registered as "noWorkersFound"; presumably counts jobs loaded without any worker rows
// (the increment site is outside this view - confirm).
private final Counter noWorkersFoundCounter;
// Registered as "staleWorkerFound"; incremented when a loaded worker is rejected because
// another worker already occupies the same stage index.
private final Counter staleWorkersFoundCounter;
// Registered as "workersFound"; incremented before a job's workers are attached to it.
private final Counter workersFoundCounter;
// Registered as "failedToLoadJobCount"; incremented when loading a job throws.
private final Counter failedToLoadJobCounter;

/**
 * Creates a persistence provider on top of the given key-value store and registers the
 * "storage" metrics group with its four counters.
 *
 * @param kvStore key-value store used for all reads and writes.
 * @param eventPublisher lifecycle event publisher kept for later use by this provider.
 */
public KeyValueBasedPersistenceProvider(KeyValueStore kvStore, LifecycleEventPublisher eventPublisher) {
    this.kvStore = kvStore;
    this.eventPublisher = eventPublisher;

    // Each counter name is declared once so registration and lookup cannot drift apart.
    final String noWorkersFound = "noWorkersFound";
    final String staleWorkerFound = "staleWorkerFound";
    final String workersFound = "workersFound";
    final String failedToLoadJobCount = "failedToLoadJobCount";

    // Use the instance returned by the registry rather than the locally built one.
    final Metrics storageMetrics = MetricsRegistry.getInstance().registerAndGet(
        new Metrics.Builder()
            .id("storage")
            .addCounter(noWorkersFound)
            .addCounter(staleWorkerFound)
            .addCounter(workersFound)
            .addCounter(failedToLoadJobCount)
            .build());

    this.noWorkersFoundCounter = storageMetrics.getCounter(noWorkersFound);
    this.staleWorkersFoundCounter = storageMetrics.getCounter(staleWorkerFound);
    this.workersFoundCounter = storageMetrics.getCounter(workersFound);
    this.failedToLoadJobCounter = storageMetrics.getCounter(failedToLoadJobCount);
}

protected String getJobMetadataFieldName() {
Expand Down Expand Up @@ -394,18 +400,20 @@ public List<IMantisJobMetadata> loadAllJobs() throws IOException {

workersFoundCounter.increment();
for (MantisWorkerMetadataWritable workerMeta : workersByJobId.get(jobId)) {
Preconditions.checkState(
jobMeta.addWorkerMedata(workerMeta.getStageNum(), workerMeta, null),
"JobID=%s stage=%d workerIdx=%d has existing worker, existing=%s, new=%s",
workerMeta.getJobId(),
workerMeta.getStageNum(),
workerMeta.getWorkerIndex(),
jobMeta.getWorkerByIndex(workerMeta.getStageNum(), workerMeta.getWorkerIndex()).getWorkerId(),
workerMeta.getWorkerId());
// If there are duplicate workers on the same stage index, only attach the one with the latest
// worker number. The stale workers will not be present in the stage metadata and thus get
// terminated when they send heartbeats to their JobActor.
MantisWorkerMetadata existedWorker =
jobMeta.tryAddOrReplaceWorker(workerMeta.getStageNum(), workerMeta);
if (existedWorker != null) {
logger.error("Encountered duplicate worker {} when adding {}", existedWorker, workerMeta);
staleWorkersFoundCounter.increment();
}
}
jobMetas.add(DataFormatAdapter.convertMantisJobWriteableToMantisJobMetadata(jobMeta, eventPublisher));
} catch (Exception e) {
logger.warn("Exception loading job {}", jobId, e);
logger.error("Exception loading job {}", jobId, e);
this.failedToLoadJobCounter.increment();
}
}
// need to load all workers for the jobMeta and then ensure they are added to jobMetas!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,34 @@ public boolean addJobStageIfAbsent(MantisStageMetadataWritable msmd) {
return stageMetadataMap.putIfAbsent(msmd.getStageNum(), msmd) == null;
}

public boolean addWorkerMedata(int stageNum, MantisWorkerMetadata workerMetadata, MantisWorkerMetadata replacedWorker)
throws InvalidJobException {
/**
* Adds the given MantisWorkerMetadata instance to the corresponding stage.
* If the stage worker index is already occupied, the existing worker is replaced only when the given worker
* has a higher worker number.
* @param stageNum target stage number.
* @param workerMetadata new worker metadata instance.
* @return null if the given worker metadata was added to this job; otherwise the existing worker that was
* kept because its worker number is not lower than the given worker's.
*/
public MantisWorkerMetadata tryAddOrReplaceWorker(int stageNum, MantisWorkerMetadata workerMetadata) {
final boolean result =
stageMetadataMap.get(stageNum)
.replaceWorkerIndex(workerMetadata, replacedWorker);
.replaceWorkerIndex(workerMetadata);

if (result) {
Integer integer = workerNumberToStageMap.put(workerMetadata.getWorkerNumber(), stageNum);
if (integer != null && integer != stageNum) {
logger.error(String.format("Unexpected to put worker number mapping from %d to stage %d for job %s, prev mapping to stage %d",
workerMetadata.getWorkerNumber(), stageNum, workerMetadata.getJobId(), integer));
}
return null;
} else {
try {
return stageMetadataMap.get(stageNum).getWorkerByIndex(workerMetadata.getWorkerIndex());
} catch (InvalidJobException e) {
logger.error("Failed to fetch existing worker when new worker got rejected: {}", workerMetadata, e);
throw new RuntimeException("Failed to fetch existing worker when new worker got rejected", e);
}
}
return result;
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,35 +208,37 @@ Collection<MantisWorkerMetadataWritable> removeArchiveableWorkers() {
return removedWorkers;
}

public boolean replaceWorkerIndex(MantisWorkerMetadata newWorker, MantisWorkerMetadata oldWorker)
throws InvalidJobException {
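/**
* Adds the given worker at its worker index, or replaces the worker currently stored there.
* If the stage worker index is already occupied, the existing worker is replaced only when the given worker
* has a higher worker number.
* @param newWorker new worker metadata instance.
* @return false if the index is already held by a worker with the same or a higher worker number, in which
* case the new worker is ignored; true otherwise.
*/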
public boolean replaceWorkerIndex(MantisWorkerMetadata newWorker) {
int index = newWorker.getWorkerIndex();
boolean result = true;
if (!MantisJobState.isErrorState(newWorker.getState())) {
if (oldWorker == null) {
if (workerByIndexMetadataSet.putIfAbsent(index, newWorker) != null) {
result = false;
}
} else {
if (oldWorker.getWorkerIndex() != index) {
throw new InvalidJobException(newWorker.getJobId(), stageNum, oldWorker.getWorkerIndex());
}
MantisWorkerMetadata mwmd = workerByIndexMetadataSet.put(index, newWorker);
if (mwmd.getWorkerNumber() != oldWorker.getWorkerNumber()) {
workerByIndexMetadataSet.put(index, mwmd);
result = false;
logger.info("Did not replace worker " + oldWorker.getWorkerNumber() + " with " +
newWorker.getWorkerNumber() + " for index " + newWorker.getWorkerIndex() + " of job " +
jobId + ", different worker " + mwmd.getWorkerNumber() + " exists already");
}
// else
// logger.info("Replaced worker " + oldWorker.getWorkerNumber() + " with " + newWorker.getWorkerNumber() +
// " for index " + newWorker.getWorkerIndex() + " of job " + jobId);

if (workerByIndexMetadataSet.containsKey(index)) {
MantisWorkerMetadata existingWorker = workerByIndexMetadataSet.get(index);
int existingWorkerNum = existingWorker.getWorkerNumber();
if (existingWorkerNum >= newWorker.getWorkerNumber()) {
logger.warn("Encounter stale worker: {} when newer worker exist: {}, ignore.", newWorker,
existingWorker);
result = false;
}
else {
logger.warn("Replace stale worker {} with {}.", existingWorker, newWorker);
addNewWorker(newWorker);
}
} else if (oldWorker != null)
result = false;
if (result)
workerByNumberMetadataSet.put(newWorker.getWorkerNumber(), newWorker);
} else {
addNewWorker(newWorker);
}

return result;
}

/**
 * Stores the given worker in both lookup maps of this stage: once under its worker index
 * and once under its worker number.
 *
 * @param newWorker worker metadata to store.
 */
private void addNewWorker(MantisWorkerMetadata newWorker) {
    final int workerIndex = newWorker.getWorkerIndex();
    final int workerNumber = newWorker.getWorkerNumber();
    workerByIndexMetadataSet.put(workerIndex, newWorker);
    workerByNumberMetadataSet.put(workerNumber, newWorker);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.server.master.persistence;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.server.master.persistence.exceptions.InvalidJobException;
import io.mantisrx.server.master.store.KeyValueStore;
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for loading jobs through {@link KeyValueBasedPersistenceProvider}.
 */
public class KeyValueBasedPersistenceProviderTest {

    /** Stored stage-1 metadata for job {@code testjob-1}. */
    private static final String STAGE_METADATA_JSON =
        "{\"jobId\":\"testjob-1\",\"stageNum\":1,\"numStages\":1,\"numWorkers\":1,"
            + "\"machineDefinition\":{\"cpuCores\":2.0,\"memoryMB\":5000.0,\"networkMbps\":1000.0,"
            + "\"diskMB\":8192.0,\"numPorts\":1}}";

    /** Stored job-level metadata for job {@code testjob-1}. */
    private static final String JOB_METADATA_JSON =
        "{\"jobId\":\"testjob-1\",\"name\":\"testjob\",\"user\":\"mantisoss\","
            + "\"submittedAt\":1691103290000,\"startedAt\":1691103300002,\"jarUrl\":\"http://testjob1.zip\","
            + "\"numStages\":1,\"state\":\"Launched\",\"parameters\":[],\"nextWorkerNumberToUse\":3,"
            + "\"sla\":{\"runtimeLimitSecs\":0,\"minRuntimeSecs\":0,\"slaType\":\"Lossy\","
            + "\"durationType\":\"Perpetual\",\"userProvidedType\":\"\"}"
            + "}";

    /**
     * Builds the stored JSON of a stage-1, index-0 worker of {@code testjob-1}.
     *
     * @param workerNumber worker number to embed in the document.
     * @return worker metadata JSON.
     */
    private static String workerJson(int workerNumber) {
        return "{\"workerIndex\":0,\"workerNumber\":" + workerNumber + ",\"jobId\":\"testjob-1\",\"stageNum\":1,"
            + "\"numberOfPorts\":5,\"metricsPort\":1051,\"consolePort\":1053,\"debugPort\":1052,"
            + "\"customPort\":1054,\"ports\":[1055]}";
    }

    /**
     * Two workers stored for the same stage index must collapse to the one with the higher
     * worker number when all jobs are loaded.
     */
    @Test
    public void testLoadJobWithDupeWorker() throws IOException, InvalidJobException {
        final KeyValueStore store = Mockito.mock(KeyValueStore.class);

        // Worker number 2 is listed ahead of worker number 1; both claim stage 1, index 0.
        when(store.getAllRows("MantisWorkers"))
            .thenReturn(ImmutableMap.of(
                "testjob-1-1000", ImmutableMap.of("1-0-2", workerJson(2)),
                "testjob-1-2000", ImmutableMap.of("1-0-1", workerJson(1))));

        when(store.getAllRows("MantisJobStageData"))
            .thenReturn(ImmutableMap.of(
                "testjob-1",
                ImmutableMap.of(
                    "stageMetadata-1", STAGE_METADATA_JSON,
                    "jobMetadata", JOB_METADATA_JSON)));

        final LifecycleEventPublisher publisher = Mockito.mock(LifecycleEventPublisher.class);
        final KeyValueBasedPersistenceProvider provider =
            new KeyValueBasedPersistenceProvider(store, publisher);

        // loading two workers on same index should retain the one with newer worker number.
        final List<IMantisJobMetadata> loadedJobs = provider.loadAllJobs();
        assertNotNull(loadedJobs);
        assertEquals(1, loadedJobs.size());

        final IMantisJobMetadata job = loadedJobs.get(0);
        assertEquals("testjob-1", job.getJobId().getId());
        assertTrue(job.getStageMetadata(1).isPresent());
        assertEquals(2,
            job.getStageMetadata(1).get().getWorkerByIndex(0).getMetadata().getWorkerNumber());
    }
}

0 comments on commit 0b7e629

Please sign in to comment.