Skip to content

Commit

Permalink
add jobVersion metadata to ExecuteStageRequest (#689)
Browse files Browse the repository at this point in the history
* add jobVersion metadata to ExecuteStageRequest
  • Loading branch information
sarahwada-stripe authored Jul 17, 2024
1 parent 3431246 commit c0d0370
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ExecuteStageRequest implements Serializable {
@Nullable
private final String nameOfJobProviderClass;
private final String user;
private final String jobVersion;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -102,7 +103,8 @@ public ExecuteStageRequest(
@JsonProperty("minRuntimeSecs") long minRuntimeSecs,
@JsonProperty("workerPorts") WorkerPorts workerPorts,
@JsonProperty("nameOfJobProviderClass") Optional<String> nameOfJobProviderClass,
@JsonProperty("user") String user) {
@JsonProperty("user") String user,
@JsonProperty("jobVersion") String jobVersion) {
this.jobName = jobName;
this.jobId = jobId;
this.workerIndex = workerIndex;
Expand All @@ -128,6 +130,7 @@ public ExecuteStageRequest(
this.subscriptionTimeoutSecs = subscriptionTimeoutSecs;
this.minRuntimeSecs = minRuntimeSecs;
this.workerPorts = workerPorts;
this.jobVersion = jobVersion;
}

public boolean getHasJobMaster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class JobMetadata {

private final String jobId;
private final URL jobJarUrl;
private final String jobVersion;
private final int totalStages;
private final String user;
private final SchedulingInfo schedulingInfo;
Expand All @@ -37,6 +38,7 @@ public class JobMetadata {

public JobMetadata(final String jobId,
final URL jobJarUrl,
final String jobVersion,
final int totalStages,
final String user,
final SchedulingInfo schedulingInfo,
Expand All @@ -46,6 +48,7 @@ public JobMetadata(final String jobId,
final long minRuntimeSecs) {
this.jobId = jobId;
this.jobJarUrl = jobJarUrl;
this.jobVersion = jobVersion;
this.totalStages = totalStages;
this.user = user;
this.schedulingInfo = schedulingInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void setup() throws Exception {
1L,
new WorkerPorts(2, 3, 4, 5, 6),
java.util.Optional.of("className"),
"user1");
"user1",
"111");
example2 = new ExecuteStageRequest("jobName", "jobId-0", 0, 1,
new URL("http://datamesh/whatever"),
1, 1,
Expand All @@ -70,7 +71,8 @@ public void setup() throws Exception {
1L,
new WorkerPorts(2, 3, 4, 5, 6),
java.util.Optional.empty(),
"user1");
"user1",
"111");
}

@Test
Expand Down Expand Up @@ -148,6 +150,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJackson(
" },\n" +
" \"nameOfJobProviderClass\": \"className\",\n" +
" \"user\": \"user1\",\n" +
" \"jobVersion\": \"111\",\n" +
" \"hasJobMaster\": false,\n" +
" \"jobId\": \"jobId-0\",\n" +
" \"workerId\":\n" +
Expand Down Expand Up @@ -231,6 +234,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJacksonW
" },\n" +
" \"nameOfJobProviderClass\": null,\n" +
" \"user\": \"user1\",\n" +
" \"jobVersion\": \"111\",\n" +
" \"hasJobMaster\": false,\n" +
" \"jobId\": \"jobId-0\",\n" +
" \"workerId\":\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testGetJobArtifact() throws Exception {
Lists.newArrayList(),
Lists.newArrayList()).build();
JobMetadata jobMetadata = new JobMetadata(
"testId", new URL("http://artifact.zip"),1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0);
"testId", new URL("http://artifact.zip"), "111", 1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0);
assertEquals(jobMetadata.getJobArtifact(), ArtifactID.of("artifact.zip"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,7 @@ private ScheduleRequest createSchedulingRequest(
JobMetadata jobMetadata = new JobMetadata(
mantisJobMetaData.getJobId().getId(),
mantisJobMetaData.getJobJarUrl(),
mantisJobMetaData.getJobDefinition().getVersion(),
mantisJobMetaData.getTotalStages(),
mantisJobMetaData.getUser(),
mantisJobMetaData.getSchedulingInfo(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public ExecuteStageRequest of(
scheduleRequest.getJobMetadata().getMinRuntimeSecs() - (System.currentTimeMillis() - scheduleRequest.getJobMetadata().getMinRuntimeSecs()),
matchedTaskExecutorInfo.getWorkerPorts(),
Optional.empty(),
scheduleRequest.getJobMetadata().getUser());
scheduleRequest.getJobMetadata().getUser(),
scheduleRequest.getJobMetadata().getJobVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static ScheduleRequest createFakeScheduleRequest(final WorkerId workerId,
stageNum,
new JobMetadata(mantisJobMetadata.getJobId().getId(),
mantisJobMetadata.getJobJarUrl(),
mantisJobMetadata.getJobDefinition().getVersion(),
mantisJobMetadata.getTotalStages(),
mantisJobMetadata.getUser(),
mantisJobMetadata.getSchedulingInfo(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void testJobSubmitPerpetual() {
verify(schedulerMock,times(1)).scheduleWorkers(any());

JobMetadata jobMetadata = new JobMetadata(jobId, new URL("http://myart" +
""),1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0);
""),"111", 1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0);
ScheduleRequest scheduleRequest = new ScheduleRequest(
workerId, 1, jobMetadata,MantisJobDurationType.Perpetual, SchedulingConstraints.of(machineDefinition),0);
BatchScheduleRequest expectedRequest = new BatchScheduleRequest(Collections.singletonList(scheduleRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ void initialize(
UserCodeClassLoader userCodeClassLoader);

String getWorkerId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public void testTaskExecutorEndToEndWithASingleStageJobByLoadingFromClassLoader(
1L,
new WorkerPorts(2, 3, 4, 5, 6),
Optional.of(SineFunctionJobProvider.class.getName()),
"user")), Time.seconds(1));
"user",
"111")), Time.seconds(1));
wait.get();
Assert.assertTrue(startedSignal.await(5, TimeUnit.SECONDS));
Subscription subscription = HttpSources.source(HttpClientFactories.sseClientFactory(),
Expand Down

0 comments on commit c0d0370

Please sign in to comment.