diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java index ef696d107..349e6ca3a 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java @@ -80,6 +80,7 @@ public class ExecuteStageRequest implements Serializable { @Nullable private final String nameOfJobProviderClass; private final String user; + private final String jobVersion; @JsonCreator @JsonIgnoreProperties(ignoreUnknown = true) @@ -102,7 +103,8 @@ public ExecuteStageRequest( @JsonProperty("minRuntimeSecs") long minRuntimeSecs, @JsonProperty("workerPorts") WorkerPorts workerPorts, @JsonProperty("nameOfJobProviderClass") Optional nameOfJobProviderClass, - @JsonProperty("user") String user) { + @JsonProperty("user") String user, + @JsonProperty("jobVersion") String jobVersion) { this.jobName = jobName; this.jobId = jobId; this.workerIndex = workerIndex; @@ -128,6 +130,7 @@ public ExecuteStageRequest( this.subscriptionTimeoutSecs = subscriptionTimeoutSecs; this.minRuntimeSecs = minRuntimeSecs; this.workerPorts = workerPorts; + this.jobVersion = jobVersion; } public boolean getHasJobMaster() { diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java index b4aa67eed..dcb86a527 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java @@ -27,6 +27,7 @@ public class JobMetadata { private final String jobId; private final URL jobJarUrl; + private final String jobVersion; private final int totalStages; private final String user; private final SchedulingInfo schedulingInfo; @@ -37,6 +38,7 @@ public class JobMetadata { public JobMetadata(final String jobId, final URL jobJarUrl, + final String jobVersion, final int totalStages, final String user, final SchedulingInfo schedulingInfo, @@ -46,6 +48,7 @@ public JobMetadata(final String jobId, final long minRuntimeSecs) { this.jobId = jobId; this.jobJarUrl = jobJarUrl; + this.jobVersion = jobVersion; this.totalStages = totalStages; this.user = user; this.schedulingInfo = schedulingInfo; diff --git a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java index 2eb1f22e2..81b5bd85d 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java +++ b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java @@ -55,7 +55,8 @@ public void setup() throws Exception { 1L, new WorkerPorts(2, 3, 4, 5, 6), java.util.Optional.of("className"), - "user1"); + "user1", + "111"); example2 = new ExecuteStageRequest("jobName", "jobId-0", 0, 1, new URL("http://datamesh/whatever"), 1, 1, @@ -70,7 +71,8 @@ public void setup() throws Exception { 1L, new WorkerPorts(2, 3, 4, 5, 6), java.util.Optional.empty(), - "user1"); + "user1", + "111"); } @Test @@ -148,6 +150,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJackson( " },\n" + " \"nameOfJobProviderClass\": \"className\",\n" + " \"user\": \"user1\",\n" + + " \"jobVersion\": \"111\",\n" + " \"hasJobMaster\": false,\n" + " \"jobId\": \"jobId-0\",\n" + " \"workerId\":\n" + @@ -231,6 +234,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJacksonW " },\n" + " \"nameOfJobProviderClass\": null,\n" + " \"user\": \"user1\",\n" + + " \"jobVersion\": \"111\",\n" + " \"hasJobMaster\": false,\n" + " \"jobId\": \"jobId-0\",\n" + " \"workerId\":\n" + diff --git a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java index ce245323c..979db516e 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java +++ b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java @@ -34,7 +34,7 @@ public void testGetJobArtifact() throws Exception { Lists.newArrayList(), Lists.newArrayList()).build(); JobMetadata jobMetadata = new JobMetadata( - "testId", new URL("http://artifact.zip"),1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0); + "testId", new URL("http://artifact.zip"), "111", 1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0); assertEquals(jobMetadata.getJobArtifact(), ArtifactID.of("artifact.zip")); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index 8fee1981f..0f77e9cf3 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -1630,6 +1630,7 @@ private ScheduleRequest createSchedulingRequest( JobMetadata jobMetadata = new JobMetadata( mantisJobMetaData.getJobId().getId(), mantisJobMetaData.getJobJarUrl(), + mantisJobMetaData.getJobDefinition().getVersion(), mantisJobMetaData.getTotalStages(), mantisJobMetaData.getUser(), mantisJobMetaData.getSchedulingInfo(), diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java index 6b4ded76d..960e41e60 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java @@ -49,6 +49,7 @@ public ExecuteStageRequest of( scheduleRequest.getJobMetadata().getMinRuntimeSecs() - (System.currentTimeMillis() - scheduleRequest.getJobMetadata().getMinRuntimeSecs()), matchedTaskExecutorInfo.getWorkerPorts(), Optional.empty(), - scheduleRequest.getJobMetadata().getUser()); + scheduleRequest.getJobMetadata().getUser(), + scheduleRequest.getJobMetadata().getJobVersion()); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java index 5379d4862..5982d1fad 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java @@ -62,6 +62,7 @@ public static ScheduleRequest createFakeScheduleRequest(final WorkerId workerId, stageNum, new JobMetadata(mantisJobMetadata.getJobId().getId(), mantisJobMetadata.getJobJarUrl(), + mantisJobMetadata.getJobDefinition().getVersion(), mantisJobMetadata.getTotalStages(), mantisJobMetadata.getUser(), mantisJobMetadata.getSchedulingInfo(), diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java index 1aaba8e12..588299c8f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java @@ -305,7 +305,7 @@ public void testJobSubmitPerpetual() { verify(schedulerMock,times(1)).scheduleWorkers(any()); JobMetadata jobMetadata = new JobMetadata(jobId, new URL("http://myart" + - ""),1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0); + ""),"111", 1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0); ScheduleRequest scheduleRequest = new ScheduleRequest( workerId, 1, jobMetadata,MantisJobDurationType.Perpetual, SchedulingConstraints.of(machineDefinition),0); BatchScheduleRequest expectedRequest = new BatchScheduleRequest(Collections.singletonList(scheduleRequest)); diff --git a/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java b/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java index ee49dead0..d41557d12 100644 --- a/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java +++ b/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java @@ -33,4 +33,5 @@ void initialize( UserCodeClassLoader userCodeClassLoader); String getWorkerId(); + } diff --git a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java index e02e9f318..9829ed0ad 100644 --- a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java +++ b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java @@ -218,7 +218,8 @@ public void testTaskExecutorEndToEndWithASingleStageJobByLoadingFromClassLoader( 1L, new WorkerPorts(2, 3, 4, 5, 6), Optional.of(SineFunctionJobProvider.class.getName()), - "user")), Time.seconds(1)); + "user", + "111")), Time.seconds(1)); wait.get(); Assert.assertTrue(startedSignal.await(5, TimeUnit.SECONDS)); Subscription subscription = HttpSources.source(HttpClientFactories.sseClientFactory(),