Skip to content

Commit

Permalink
Handling failure to initialize job clusters better (#598)
Browse files Browse the repository at this point in the history
* Handling failure to initialize job clusters better

* Running testcontainers in a separate build

* Increase the number of retries

* When end instant is not set in mantis metadata use the default value

* Disabling findbugs

* One more bug

* Fixing the log line

---------

Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
  • Loading branch information
sundargates and sundargates authored Dec 11, 2023
1 parent c5c2a9d commit b3f3621
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 30 deletions.
45 changes: 44 additions & 1 deletion .github/workflows/nebula-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
restore-keys: |
- ${{ runner.os }}-gradlewrapper-
- name: Build with Gradle
run: ./gradlew --info --stacktrace build --warning-mode=all
run: ./gradlew --info --stacktrace build -x mantis-testcontainers:test --warning-mode=all
env:
CI_NAME: github_actions
CI_BUILD_NUMBER: ${{ github.sha }}
Expand All @@ -59,3 +59,46 @@ jobs:
with:
name: Event File
path: ${{ github.event_path }}

# Separate job that runs only the mantis-testcontainers tests.
integration-test:
runs-on: ubuntu-latest
strategy:
matrix:
# test against JDK 8
java: [ 8 ]
name: Integration Tests with Java ${{ matrix.java }}
steps:
- uses: actions/checkout@v1
- name: Setup jdk
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
# Cache ~/.gradle/caches, keyed on the hash of the dependency lockfiles.
- uses: actions/cache@v1
id: gradle-cache
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/gradle/dependency-locks/*.lockfile') }}
restore-keys: |
- ${{ runner.os }}-gradle-
# Cache ~/.gradle/wrapper, keyed on the hash of the Gradle wrapper files.
- uses: actions/cache@v1
id: gradle-wrapper-cache
with:
path: ~/.gradle/wrapper
key: ${{ runner.os }}-gradlewrapper-${{ hashFiles('gradle/wrapper/*') }}
restore-keys: |
- ${{ runner.os }}-gradlewrapper-
# Despite the step name, this invokes only the mantis-testcontainers:test task.
- name: Build with Gradle
run: ./gradlew --info --stacktrace mantis-testcontainers:test --warning-mode=all
env:
CI_NAME: github_actions
CI_BUILD_NUMBER: ${{ github.sha }}
CI_BUILD_URL: 'https://github.com/${{ github.repository }}'
CI_BRANCH: ${{ github.ref }}
COVERALLS_REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# always() makes this step run even if an earlier step failed, so the result XML is still uploaded.
- name: Upload Test Results
uses: actions/upload-artifact@v3
if: always()
with:
# NOTE(review): the artifact is named "Unit Test Results" although this job runs the integration tests; confirm the name is intentional.
name: Unit Test Results
path: "**/build/test-results/**/*.xml"

Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ private void initialize(JobClustersManagerInitialize initMsg) {

List<CompletedJob> completedJobsList = Lists.newArrayList();
JobClusterProto.InitializeJobClusterRequest req = new JobClusterProto.InitializeJobClusterRequest((JobClusterDefinitionImpl) jobClusterMeta.getJobClusterDefinition(),
jobClusterMeta.isDisabled(), jobClusterMeta.getLastJobCount(), jobList, completedJobsList, "system", getSelf(), false);
jobClusterMeta.isDisabled(), jobClusterMeta.getLastJobCount(), jobList,
"system", getSelf(), false);
return jobClusterInfoManager.initializeCluster(jobClusterInfo, req, t);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -204,8 +205,8 @@ private CompletedJobEntry fetchJobId(JobId jId) {
/**
* If job data exists in cache return it else call getArchiveJob
*
* @param jId
* @return
* @param jId job id
* @return job metadata if found else empty
*/
@Override
public Optional<IMantisJobMetadata> getJobMetadata(JobId jId) throws IOException {
Expand Down Expand Up @@ -253,7 +254,7 @@ private CompletedJob fromMantisJobMetadata(IMantisJobMetadata jobMetadata) {
jobMetadata.getJobDefinition().getVersion(),
jobMetadata.getState(),
jobMetadata.getSubmittedAtInstant().toEpochMilli(),
jobMetadata.getEndedAtInstant().get().toEpochMilli(),
jobMetadata.getEndedAtInstant().orElse(Instant.ofEpochMilli(0l)).toEpochMilli(),
jobMetadata.getUser(),
jobMetadata.getLabels());
}
Expand Down Expand Up @@ -324,9 +325,9 @@ private void addCompletedJobToCache(CompletedJob completedJob,
/**
* Bulk add completed jobs to cache
*
* @param completedJobsList
* @param completedJobsList list of completed jobs
*/
private void addCompletedJobsToCache(List<CompletedJob> completedJobsList) throws IOException {
private void addCompletedJobsToCache(List<CompletedJob> completedJobsList) {
if (!completedJobsList.isEmpty()) {
Map<JobId, CompletedJobEntry> cache = completedJobsList.stream()
.flatMap(compJob -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,15 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
if(jobClusterMetadata.isDisabled()) {
logger.info("Cluster {} initialized but is Disabled", jobClusterMetadata
.getJobClusterDefinition().getName());
jobManager.initialize();
try {
jobManager.initialize();
} catch (Exception e) {
sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
String.format("JobCluster %s initialization failed",
initReq.jobClusterDefinition.getName()),
initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
}

int count = 50;
if(!initReq.jobList.isEmpty()) {
logger.info("Cluster {} is disabled however it has {} active/accepted jobs",
Expand Down Expand Up @@ -783,7 +791,14 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
initRunningJobs(initReq, sender);

logger.info("Job expiry check frequency set to {}", expireFrequency);
jobManager.initialize();
try {
jobManager.initialize();
} catch (Exception e) {
sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
String.format("JobCluster %s initialization failed",
initReq.jobClusterDefinition.getName()),
initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
}
}

}
Expand All @@ -798,14 +813,11 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
*/

private void initRunningJobs(JobClusterProto.InitializeJobClusterRequest initReq, ActorRef sender) {
List<CompletedJob> completedJobsList = initReq.completedJobsList;
List<IMantisJobMetadata> jobList = initReq.jobList;

logger.info("In _initJobs for cluster {}: {} activeJobs and {} completedJobs", name, jobList.size(),
completedJobsList.size());
logger.info("In _initJobs for cluster {}: {} activeJobs", name, jobList.size());
if (logger.isDebugEnabled()) {
logger.debug("In _initJobs for cluster {} activeJobs -> {} and completedJobs -> {}", name, jobList,
completedJobsList);
logger.debug("In _initJobs for cluster {} activeJobs -> {}", name, jobList);
}

Observable.from(jobList)
Expand Down Expand Up @@ -2609,14 +2621,10 @@ final static class JobManager {
this.costsCalculator = costsCalculator;
}

void initialize() {
try {
logger.debug("Loading completed jobs for cluster {}", name);
completedJobStore.initialize();
logger.debug("Initialized completed job store for cluster {}", name);
} catch (IOException e) {
logger.error("Could not initialize completed job store for cluster {}", name, e);
}
void initialize() throws IOException {
logger.debug("Loading completed jobs for cluster {}", name);
completedJobStore.initialize();
logger.debug("Initialized completed job store for cluster {}", name);
}

public void onJobClusterDeletion() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl.CompletedJob;
import io.mantisrx.server.master.domain.JobDefinition;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.shaded.com.google.common.collect.Lists;
Expand All @@ -48,20 +47,19 @@ public static final class InitializeJobClusterRequest extends BaseRequest {
public final long lastJobNumber;
public final boolean createInStore;
public final List<IMantisJobMetadata> jobList;
public final List<CompletedJob> completedJobsList;

/**
* Invoked directly during bootstrap
* @param jobClusterDefinition
* @param isDisabled
* @param lastJobNumber
* @param jobList
* @param completedJobsList
* @param user
* @param requestor
* @param createInStore
*/
public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, boolean isDisabled, long lastJobNumber,
List<IMantisJobMetadata> jobList, List<CompletedJob> completedJobsList, String user, ActorRef requestor, boolean createInStore) {
List<IMantisJobMetadata> jobList, String user, ActorRef requestor, boolean createInStore) {
super();
Preconditions.checkNotNull(jobClusterDefinition, "JobClusterDefn cannot be null");
this.jobClusterDefinition = jobClusterDefinition;
Expand All @@ -71,7 +69,6 @@ public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefi
this.isDisabled = isDisabled;
this.lastJobNumber = lastJobNumber;
this.jobList = jobList;
this.completedJobsList = completedJobsList;

}
/**
Expand All @@ -81,7 +78,7 @@ public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefi
* @param requestor
*/
public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, String user, ActorRef requestor) {
this(jobClusterDefinition, false, 0, Lists.newArrayList(), Lists.newArrayList(), user, requestor, true);
this(jobClusterDefinition, false, 0, Lists.newArrayList(), user, requestor, true);

}

Expand All @@ -95,7 +92,6 @@ public String toString() {
", lastJobNumber=" + lastJobNumber +
", createInStore=" + createInStore +
", jobList=" + jobList +
", completedJobsList=" + completedJobsList +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void testQuickSubmitJob() throws IOException, InterruptedException {
if (!ensureJobWorkerStarted(
controlPlaneHost,
controlPlanePort,
5,
10,
Duration.ofSeconds(2).toMillis())) {
fail("Failed to start job worker.");
}
Expand Down

0 comments on commit b3f3621

Please sign in to comment.