Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix rerunWorkflow places synchronous system tasks in the queue (#2494)
Browse files Browse the repository at this point in the history
* Fix rerunWorkflow places synchronous system tasks in the queue
* Refactor test spec
* Code refactor
- clear task output on rerun instead of set it to null
- refactor outputData checks for JsonJQTransformSpec
  • Loading branch information
jxu-nflx authored Oct 6, 2021
1 parent 49f8882 commit ef6b65d
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1639,7 +1639,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
rerunFromTask.setStartTime(0);
rerunFromTask.setUpdateTime(0);
rerunFromTask.setEndTime(0);
rerunFromTask.setOutputData(null);
rerunFromTask.getOutputData().clear();
rerunFromTask.setRetried(false);
rerunFromTask.setExecuted(false);
rerunFromTask.setExternalOutputPayloadStoragePath(null);
Expand All @@ -1648,12 +1648,19 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
rerunFromTask.setStatus(IN_PROGRESS);
rerunFromTask.setStartTime(System.currentTimeMillis());
} else {
// Set the task to rerun as SCHEDULED
rerunFromTask.setStatus(SCHEDULED);
if (taskInput != null) {
rerunFromTask.setInputData(taskInput);
}
addTaskToQueue(rerunFromTask);
if (systemTaskRegistry.isSystemTask(rerunFromTask.getTaskType()) &&
!systemTaskRegistry.get(rerunFromTask.getTaskType()).isAsync()) {
// Start the synchronized system task directly
deciderService.populateTaskData(rerunFromTask);
systemTaskRegistry.get(rerunFromTask.getTaskType()).start(workflow, rerunFromTask, this);
} else {
// Set the task to rerun as SCHEDULED
rerunFromTask.setStatus(SCHEDULED);
addTaskToQueue(rerunFromTask);
}
}
executionDAOFacade.updateTask(rerunFromTask);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import static com.netflix.conductor.common.metadata.tasks.TaskType.SIMPLE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SWITCH;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JSON_JQ_TRANSFORM;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_LAMBDA;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
Expand Down Expand Up @@ -180,6 +181,21 @@ public WorkflowSystemTask http2() {
return new WorkflowSystemTaskStub("HTTP2");
}

@Bean(TASK_TYPE_JSON_JQ_TRANSFORM)
public WorkflowSystemTask jsonBean() {
return new WorkflowSystemTaskStub("JSON_JQ_TRANSFORM") {
@Override
public boolean isAsync() {
return false;
}

@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task.setStatus(Task.Status.COMPLETED);
}
};
}

@Bean
public SystemTaskRegistry systemTaskRegistry(Set<WorkflowSystemTask> tasks) {
return new SystemTaskRegistry(tasks);
Expand Down Expand Up @@ -1324,6 +1340,59 @@ public void testRerunWorkflowWithTaskId() {
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
}

@Test
public void testRerunWorkflowWithSyncSystemTaskId() {
//setup
String workflowId = IDGenerator.generate();

Task task1 = new Task();
task1.setTaskType(TaskType.SIMPLE.name());
task1.setTaskDefName("task1");
task1.setReferenceTaskName("task1_ref");
task1.setWorkflowInstanceId(workflowId);
task1.setScheduledTime(System.currentTimeMillis());
task1.setTaskId(IDGenerator.generate());
task1.setStatus(Status.COMPLETED);
task1.setWorkflowTask(new WorkflowTask());
task1.setOutputData(new HashMap<>());

Task task2 = new Task();
task2.setTaskType(TaskType.JSON_JQ_TRANSFORM.name());
task2.setReferenceTaskName("task2_ref");
task2.setWorkflowInstanceId(workflowId);
task2.setScheduledTime(System.currentTimeMillis());
task2.setTaskId("system-task-id");
task2.setStatus(Status.FAILED);

Workflow workflow = new Workflow();
workflow.setWorkflowId(workflowId);
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("workflow");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRerunWorkflowId");
workflow.setStatus(WorkflowStatus.FAILED);
workflow.setReasonForIncompletion("task2 failed");
workflow.setFailedReferenceTaskNames(new HashSet<String>() {{
add("task2_ref");
}});
workflow.getTasks().addAll(Arrays.asList(task1, task2));
//end of setup

//when:
when(executionDAOFacade.getWorkflowById(workflow.getWorkflowId(), true)).thenReturn(workflow);
RerunWorkflowRequest rerunWorkflowRequest = new RerunWorkflowRequest();
rerunWorkflowRequest.setReRunFromWorkflowId(workflow.getWorkflowId());
rerunWorkflowRequest.setReRunFromTaskId(task2.getTaskId());
workflowExecutor.rerun(rerunWorkflowRequest);

//then:
assertEquals(Status.COMPLETED, task2.getStatus());
assertEquals(Workflow.WorkflowStatus.RUNNING, workflow.getStatus());
assertNull(workflow.getReasonForIncompletion());
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
}

@Test
public void testRerunSubWorkflowWithTaskId() {
//setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.test.integration

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.test.base.AbstractSpecification
import spock.lang.Shared
Expand Down Expand Up @@ -54,7 +55,7 @@ class JsonJQTransformSpec extends AbstractSpecification {
tasks.size() == 1
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].outputData as String == "[result:[out:[a, b, c, d]], resultList:[[out:[a, b, c, d]]]]"
tasks[0].outputData.containsKey("result") && tasks[0].outputData.containsKey("resultList")
}
}

Expand All @@ -81,7 +82,69 @@ class JsonJQTransformSpec extends AbstractSpecification {
tasks.size() == 1
tasks[0].status == Task.Status.FAILED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].reasonForIncompletion as String == "Cannot index string with string \"array\""
tasks[0].reasonForIncompletion == 'Cannot index string with string \"array\"'
}
}

/**
* Given the following invalid input JSON
*{* "in1": "a",
* "in2": "b"
*}* using the same query from the success test, jq will try to get in1['array']
* and fail since 'in1' is a string.
*
* Re-run failed system task with the following valid input JSON will fix the workflow
*{* "in1": {* "array": [ "a", "b" ]
*},
* "in2": {* "array": [ "c", "d" ]
*}*}* expect the workflow task to transform to following result:
*{* out: [ "a", "b", "c", "d" ]
*}
*/
def "Test rerun workflow with failed json jq transform task"() {
given: "workflow input"
def invalidInput = new HashMap()
invalidInput['in1'] = "a"
invalidInput['in2'] = "b"

def validInput = new HashMap()
def input = new HashMap()
input['in1'] = new HashMap()
input['in1']['array'] = ["a", "b"]
input['in2'] = new HashMap()
input['in2']['array'] = ["c", "d"]
validInput['input'] = input
validInput['queryExpression'] = '.input as $_ | { out: ($_.in1.array + $_.in2.array) }'

when: "workflow which has the json jq transform task started"
def workflowInstanceId = workflowExecutor.startWorkflow(JSON_JQ_TRANSFORM_WF, 1,
'', invalidInput, null, null, null)

then: "verify that the workflow and task failed with expected error"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 1
tasks[0].status == Task.Status.FAILED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].reasonForIncompletion == 'Cannot index string with string \"array\"'
}

when: "workflow which has the json jq transform task reran"
def reRunWorkflowRequest = new RerunWorkflowRequest()
reRunWorkflowRequest.reRunFromWorkflowId = workflowInstanceId
def reRunTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[0].taskId
reRunWorkflowRequest.reRunFromTaskId = reRunTaskId
reRunWorkflowRequest.taskInput = validInput

workflowExecutor.rerun(reRunWorkflowRequest)

then: "verify that the workflow and task are completed with expected output"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 1
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].outputData.containsKey("result") && tasks[0].outputData.containsKey("resultList")
}
}
}

0 comments on commit ef6b65d

Please sign in to comment.