Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix inconsistent workflow status caused by upload payload exception d…
Browse files Browse the repository at this point in the history
…uring workflow termination (#2491)

* Fix inconsistent workflow status caused by upload payload exception during workflow termination

* Code refactor

* Default input and output data of task and workflow to empty map to avoid NullPointerException

* Refactor to reset to new map
  • Loading branch information
jxu-nflx authored Oct 7, 2021
1 parent ef6b65d commit ab746d1
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ public Map<String, Object> getInputData() {
}

public void setInputData(Map<String, Object> inputData) {
if (inputData == null) {
inputData = new HashMap<>();
}
this.inputData = inputData;
}

Expand Down Expand Up @@ -583,6 +586,9 @@ public Map<String, Object> getOutputData() {
* @param outputData the outputData to set
*/
public void setOutputData(Map<String, Object> outputData) {
if (outputData == null) {
outputData = new HashMap<>();
}
this.outputData = outputData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ public Map<String, Object> getInput() {
* @param input the input to set
*/
public void setInput(Map<String, Object> input) {
if (input == null) {
input = new HashMap<>();
}
this.input = input;
}

Expand Down Expand Up @@ -241,6 +244,9 @@ public Map<String, Object> getOutput() {
* @param output the output to set
*/
public void setOutput(Map<String, Object> output) {
if (output == null) {
output = new HashMap<>();
}
this.output = output;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,14 @@ public Workflow terminateWorkflow(Workflow workflow, String reason, String failu
if (workflow.getWorkflowDefinition() == null) {
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);
}
deciderService.updateWorkflowOutput(workflow, null);

try {
deciderService.updateWorkflowOutput(workflow, null);
} catch (Exception e) {
// catch any failure in this step and continue the execution of terminating workflow
LOGGER.error("Failed to update output data for workflow: {}", workflow.getWorkflowId(), e);
Monitors.error(CLASS_NAME, "terminateWorkflow");
}

// update the failed reference task names
workflow.getFailedReferenceTaskNames().addAll(workflow.getTasks().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,34 +132,34 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
String errorMsg = String.format(
"The output payload size: %dB of workflow: %s is greater than the permissible limit: %dKB",
payloadSize, ((Workflow) entity).getWorkflowId(), maxThreshold);
failWorkflow(errorMsg);
failWorkflow(((Workflow) entity), payloadType, errorMsg);
}
} else if (payloadSize > threshold * 1024) {
switch (payloadType) {
case TASK_INPUT:
((Task) entity).setInputData(null);
((Task) entity).setInputData(new HashMap<>());
((Task) entity).setExternalInputPayloadStoragePath(
uploadHelper(payloadBytes, payloadSize, PayloadType.TASK_INPUT));
Monitors
.recordExternalPayloadStorageUsage(((Task) entity).getTaskDefName(),
ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.TASK_INPUT.toString());
break;
case TASK_OUTPUT:
((Task) entity).setOutputData(null);
((Task) entity).setOutputData(new HashMap<>());
((Task) entity).setExternalOutputPayloadStoragePath(
uploadHelper(payloadBytes, payloadSize, PayloadType.TASK_OUTPUT));
Monitors.recordExternalPayloadStorageUsage(((Task) entity).getTaskDefName(),
ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.TASK_OUTPUT.toString());
break;
case WORKFLOW_INPUT:
((Workflow) entity).setInput(null);
((Workflow) entity).setInput(new HashMap<>());
((Workflow) entity).setExternalInputPayloadStoragePath(
uploadHelper(payloadBytes, payloadSize, PayloadType.WORKFLOW_INPUT));
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowName(),
ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_INPUT.toString());
break;
case WORKFLOW_OUTPUT:
((Workflow) entity).setOutput(null);
((Workflow) entity).setOutput(new HashMap<>());
((Workflow) entity).setExternalOutputPayloadStoragePath(
uploadHelper(payloadBytes, payloadSize, PayloadType.WORKFLOW_OUTPUT));
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowName(),
Expand Down Expand Up @@ -187,15 +187,21 @@ void failTask(Task task, PayloadType payloadType, String errorMsg) {
task.setReasonForIncompletion(errorMsg);
task.setStatus(Task.Status.FAILED_WITH_TERMINAL_ERROR);
if (payloadType == PayloadType.TASK_INPUT) {
task.setInputData(null);
task.setInputData(new HashMap<>());
} else {
task.setOutputData(null);
task.setOutputData(new HashMap<>());
}
throw new TerminateWorkflowException(errorMsg, Workflow.WorkflowStatus.FAILED, task);
}

private void failWorkflow(String errorMsg) {
@VisibleForTesting
void failWorkflow(Workflow workflow, PayloadType payloadType, String errorMsg) {
LOGGER.error(errorMsg);
if (payloadType == PayloadType.WORKFLOW_INPUT) {
workflow.setInput(new HashMap<>());
} else {
workflow.setOutput(new HashMap<>());
}
throw new TerminateWorkflowException(errorMsg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
Expand Down Expand Up @@ -146,6 +147,7 @@ public class TestWorkflowExecutor {
private QueueDAO queueDAO;
private WorkflowStatusListener workflowStatusListener;
private ExecutionLockService executionLockService;
private ExternalPayloadStorageUtils externalPayloadStorageUtils;

@Configuration
@ComponentScan(basePackageClasses = {Evaluator.class}) // load all Evaluator beans.
Expand Down Expand Up @@ -220,7 +222,7 @@ public void init() {
metadataDAO = mock(MetadataDAO.class);
queueDAO = mock(QueueDAO.class);
workflowStatusListener = mock(WorkflowStatusListener.class);
ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
executionLockService = mock(ExecutionLockService.class);
ParametersUtils parametersUtils = new ParametersUtils(objectMapper);
Map<TaskType, TaskMapper> taskMappers = new HashMap<>();
Expand Down Expand Up @@ -499,6 +501,51 @@ public void testTerminateWorkflow() {
verify(workflowStatusListener, times(1)).onWorkflowFinalizedIfEnabled(any(Workflow.class));
}

@Test
public void testUploadOutputFailuresDuringTerminateWorkflow() {
WorkflowDef def = new WorkflowDef();
def.setName("test");
def.setWorkflowStatusListenerEnabled(true);

Workflow workflow = new Workflow();
workflow.setWorkflowDefinition(def);
workflow.setWorkflowId("1");
workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
workflow.setOwnerApp("junit_test");
workflow.setStartTime(10L);
workflow.setEndTime(100L);
workflow.setOutput(Collections.EMPTY_MAP);

List<Task> tasks = new LinkedList<>();

Task task = new Task();
task.setScheduledTime(1L);
task.setSeq(1);
task.setTaskId(UUID.randomUUID().toString());
task.setReferenceTaskName("t1");
task.setWorkflowInstanceId(workflow.getWorkflowId());
task.setTaskDefName("task1");
task.setStatus(Status.IN_PROGRESS);

tasks.add(task);
workflow.setTasks(tasks);

when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())).thenReturn(workflow);

AtomicInteger updateWorkflowCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
updateWorkflowCalledCounter.incrementAndGet();
return null;
}).when(executionDAOFacade).updateWorkflow(any());

doThrow(new RuntimeException("any exception")).when(externalPayloadStorageUtils).verifyAndUpload(workflow, ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT);

workflowExecutor.terminateWorkflow(workflow.getWorkflowId(), "reason");
assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus());
assertEquals(1, updateWorkflowCalledCounter.get());
verify(workflowStatusListener, times(1)).onWorkflowTerminatedIfEnabled(any(Workflow.class));
}

@Test
@SuppressWarnings("unchecked")
public void testQueueExceptionsIgnoredDuringTerminateWorkflow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testUploadTaskPayload() throws IOException {
Task task = new Task();
task.setInputData(payload);
externalPayloadStorageUtils.verifyAndUpload(task, ExternalPayloadStorage.PayloadType.TASK_INPUT);
assertNull(task.getInputData());
assertTrue(task.getInputData().isEmpty());
assertEquals(1, uploadCount.get());
assertNotNull(task.getExternalInputPayloadStoragePath());
}
Expand Down Expand Up @@ -148,7 +148,7 @@ public void testUploadWorkflowPayload() throws IOException {
workflow.setOutput(payload);
workflow.setWorkflowDefinition(def);
externalPayloadStorageUtils.verifyAndUpload(workflow, ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT);
assertNull(workflow.getOutput());
assertTrue(workflow.getOutput().isEmpty());
assertEquals(1, uploadCount.get());
assertNotNull(workflow.getExternalOutputPayloadStoragePath());
}
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testFailTaskWithInputPayload() {
expectedException.expect(TerminateWorkflowException.class);
externalPayloadStorageUtils.failTask(task, ExternalPayloadStorage.PayloadType.TASK_INPUT, "error");
assertNotNull(task);
assertNull(task.getInputData());
assertTrue(task.getInputData().isEmpty());
}

@Test
Expand All @@ -190,6 +190,30 @@ public void testFailTaskWithOutputPayload() {
expectedException.expect(TerminateWorkflowException.class);
externalPayloadStorageUtils.failTask(task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT, "error");
assertNotNull(task);
assertNull(task.getOutputData());
assertTrue(task.getOutputData().isEmpty());
}

@Test
public void testFailWorkflowWithInputPayload() {
Workflow workflow = new Workflow();
workflow.setInput(new HashMap<>());

expectedException.expect(TerminateWorkflowException.class);
externalPayloadStorageUtils.failWorkflow(workflow, ExternalPayloadStorage.PayloadType.TASK_INPUT, "error");
assertNotNull(workflow);
assertTrue(workflow.getInput().isEmpty());
assertEquals(Workflow.WorkflowStatus.FAILED, workflow.getStatus());
}

@Test
public void testFailWorkflowWithOutputPayload() {
Workflow workflow = new Workflow();
workflow.setOutput(new HashMap<>());

expectedException.expect(TerminateWorkflowException.class);
externalPayloadStorageUtils.failWorkflow(workflow, ExternalPayloadStorage.PayloadType.TASK_OUTPUT, "error");
assertNotNull(workflow);
assertTrue(workflow.getOutput().isEmpty());
assertEquals(Workflow.WorkflowStatus.FAILED, workflow.getStatus());
}
}

0 comments on commit ab746d1

Please sign in to comment.