On transient errors (BACKEND_ERROR), task state is set to SCHEDULED, which allows the task to be retried.
aravindanr authored and apanicker-nflx committed Jul 16, 2021
1 parent 518995d commit 44dd149
Showing 5 changed files with 124 additions and 49 deletions.
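
For readers skimming the diff below, here is a minimal, self-contained sketch of the behavior this commit introduces. The class and method names (RetrySemanticsSketch, SketchException, SketchTask) are hypothetical stand-ins, not the actual Conductor classes: when starting a sub-workflow fails with a retryable error such as ApplicationException.Code.BACKEND_ERROR, the task is left in SCHEDULED so it can be picked up and retried; non-retryable errors still fail the task.

// Hypothetical stand-ins, not the real Conductor classes, illustrating the retry semantics.
public class RetrySemanticsSketch {

    enum Status { SCHEDULED, IN_PROGRESS, FAILED }

    // stand-in for ApplicationException: carries a "retryable" flag,
    // the way Code.BACKEND_ERROR is treated as transient
    static class SketchException extends RuntimeException {
        private final boolean retryable;
        SketchException(String message, boolean retryable) {
            super(message);
            this.retryable = retryable;
        }
        boolean isRetryable() { return retryable; }
    }

    static class SketchTask {
        Status status = Status.SCHEDULED;
        String reasonForIncompletion;
    }

    // try to start the sub-workflow; on a transient failure the task stays SCHEDULED
    static void startSubWorkflow(SketchTask task, Runnable startAction) {
        try {
            startAction.run();
            task.status = Status.IN_PROGRESS;
        } catch (SketchException e) {
            if (e.isRetryable()) {
                // transient backend error: keep the task SCHEDULED so it is retried later
                System.out.println("transient error, task remains " + task.status);
            } else {
                task.status = Status.FAILED;
                task.reasonForIncompletion = e.getMessage();
            }
        }
    }

    public static void main(String[] args) {
        SketchTask retried = new SketchTask();
        startSubWorkflow(retried, () -> { throw new SketchException("QueueDAO failure", true); });
        System.out.println(retried.status);   // SCHEDULED -> eligible for retry

        SketchTask failed = new SketchTask();
        startSubWorkflow(failed, () -> { throw new SketchException("bad request", false); });
        System.out.println(failed.status);    // FAILED
    }
}

The SubWorkflow change and the two new SubWorkflowTest cases later in this diff exercise exactly these two branches.
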
@@ -402,10 +402,6 @@ public long getQueueWaitTime() {
return 0L;
}

public void setQueueWaitTime(long t) {

}

/**
* @return True if the task has been retried after failure
*/
@@ -446,6 +442,9 @@ public void setPollCount(int pollCount) {
this.pollCount = pollCount;
}

public void incrementPollCount() {
++this.pollCount;
}

public boolean isCallbackFromWorker() {
return callbackFromWorker;
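
As a side note on the Task class change above: the new incrementPollCount() helper replaces the read-modify-write that call sites used to do with getPollCount()/setPollCount(). A minimal sketch (hypothetical PollCountSketch class, not the real com.netflix.conductor Task) of the before/after call-site pattern:

// Hypothetical PollCountSketch, not the real Conductor Task class.
public class PollCountSketch {

    static class Task {
        private int pollCount;

        public int getPollCount() { return pollCount; }
        public void setPollCount(int pollCount) { this.pollCount = pollCount; }

        // helper added by this commit: the increment lives in one place
        public void incrementPollCount() { ++this.pollCount; }
    }

    public static void main(String[] args) {
        Task task = new Task();
        task.setPollCount(task.getPollCount() + 1); // old call-site pattern
        task.incrementPollCount();                  // new call-site pattern
        System.out.println(task.getPollCount());    // prints 2
    }
}

The two call sites changed in this commit (executeSystemTask and the worker poll() method) are switched over to the new helper.
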
@@ -12,22 +12,6 @@
*/
package com.netflix.conductor.core.execution;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED_WITH_TERMINAL_ERROR;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.valueOf;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
import static com.netflix.conductor.core.exception.ApplicationException.Code.BACKEND_ERROR;
import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT;
import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.core.exception.ApplicationException.Code.NOT_FOUND;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.netflix.conductor.annotations.Trace;
@@ -64,6 +48,11 @@
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.ExecutionLockService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -76,10 +65,22 @@
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED_WITH_TERMINAL_ERROR;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.valueOf;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
import static com.netflix.conductor.core.exception.ApplicationException.Code.BACKEND_ERROR;
import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT;
import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.core.exception.ApplicationException.Code.NOT_FOUND;

/**
* Workflow services provider interface
@@ -1434,38 +1435,51 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, long

LOGGER.debug("Executing {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus());
if (task.getStatus() == SCHEDULED || !systemTask.isAsyncComplete(task)) {
task.setPollCount(task.getPollCount() + 1);
task.incrementPollCount();
executionDAOFacade.updateTask(task);
}

// load task data (input/output) from external storage, if necessary
deciderService.populateTaskData(task);

// Stop polling for asyncComplete system tasks that are not in SCHEDULED state
if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
return;
if (task.getStatus() == SCHEDULED) {
systemTask.start(workflow, task, this);
} else if (task.getStatus() == IN_PROGRESS) {
systemTask.execute(workflow, task, this);
}

switch (task.getStatus()) {
case SCHEDULED:
systemTask.start(workflow, task, this);
break;

case IN_PROGRESS:
systemTask.execute(workflow, task, this);
break;
default:
break;
if (task.getOutputData() != null && !task.getOutputData().isEmpty()) {
deciderService.externalizeTaskData(task);
}

if (!task.getStatus().isTerminal()) {
// Update message in Task queue based on Task status
// Stop polling for asyncComplete system tasks that are not in SCHEDULED state
if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
}
else if(task.getStatus().isTerminal()) {
task.setEndTime(System.currentTimeMillis());
queueDAO.remove(queueName, task.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, queueName,
task.getStatus());
} else {
task.setCallbackAfterSeconds(callbackTime);
new RetryUtil<>().retryOnException(() -> {
// postpone based on callbackTime
queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), callbackTime);
LOGGER.debug(
"Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}",
task, queueName, task.getStatus(), callbackTime);
return null;
}, null, null, 2, "Postponing Task message in queue for taskId: " + task.getTaskId(), "postponeTaskMessage");
}

updateTask(new TaskResult(task));
LOGGER.debug("Done Executing {}/{}-{} output={}", task.getTaskType(), task.getTaskId(), task.getStatus(),
task.getOutputData().toString());
new RetryUtil<>().retryOnException(() -> {
executionDAOFacade.updateTask(task);
return null;
}, null, null, 2, "Updating Task with taskId: " + task.getTaskId(), "updateTask");

LOGGER.debug("Finished execution of {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus());
} catch (Exception e) {
Monitors.error(CLASS_NAME, "executeSystemTask");
LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
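
To make the rearranged control flow in executeSystemTask above easier to follow, here is a minimal sketch (hypothetical QueueOutcomeSketch and SketchTask names, not the actual WorkflowExecutor or QueueDAO code) of the three queue outcomes after a single execution pass: asyncComplete tasks that have left SCHEDULED stop being polled, terminal tasks are removed from the queue, and everything else is postponed by the callback interval.

// Hypothetical QueueOutcomeSketch, not the real WorkflowExecutor/QueueDAO code.
public class QueueOutcomeSketch {

    enum Status {
        SCHEDULED, IN_PROGRESS, COMPLETED, FAILED;
        boolean isTerminal() { return this == COMPLETED || this == FAILED; }
    }

    static class SketchTask {
        Status status;
        boolean asyncComplete;    // system task completes via an external callback
        long endTime;
        long callbackAfterSeconds;
        SketchTask(Status status, boolean asyncComplete) {
            this.status = status;
            this.asyncComplete = asyncComplete;
        }
    }

    // decide what happens to the task's queue message after one execution pass
    static String settleQueueMessage(SketchTask task, long callbackTime) {
        if (task.asyncComplete && task.status != Status.SCHEDULED) {
            return "remove: wait for the external completion callback";
        } else if (task.status.isTerminal()) {
            task.endTime = System.currentTimeMillis();   // stamp end time, nothing left to do
            return "remove: task finished";
        } else {
            task.callbackAfterSeconds = callbackTime;    // will be executed again later
            return "postpone by " + callbackTime + "s";
        }
    }

    public static void main(String[] args) {
        System.out.println(settleQueueMessage(new SketchTask(Status.IN_PROGRESS, true), 30));
        System.out.println(settleQueueMessage(new SketchTask(Status.COMPLETED, false), 30));
        System.out.println(settleQueueMessage(new SketchTask(Status.IN_PROGRESS, false), 30));
    }
}

The hunk above also wraps the final executionDAOFacade.updateTask call in RetryUtil, so a transient DAO failure gets another attempt before the surrounding catch block logs the error.
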
@@ -42,10 +42,9 @@ public SubWorkflow(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings("unchecked")
@Override
public void start(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) {

Map<String, Object> input = task.getInputData();
String name = input.get("subWorkflowName").toString();
int version = (int) input.get("subWorkflowVersion");
@@ -54,13 +53,15 @@ public void start(Workflow workflow, Task task, WorkflowExecuto
if (input.get("subWorkflowDefinition") != null) {
// convert the value back to workflow definition object
workflowDefinition = objectMapper.convertValue(input.get("subWorkflowDefinition"), WorkflowDef.class);
name = workflowDefinition.getName();
}

Map taskToDomain = workflow.getTaskToDomain();
Map<String, String> taskToDomain = workflow.getTaskToDomain();
if (input.get("subWorkflowTaskToDomain") instanceof Map) {
taskToDomain = (Map) input.get("subWorkflowTaskToDomain");
taskToDomain = (Map<String, String>) input.get("subWorkflowTaskToDomain");
}
Map<String, Object> wfInput = (Map<String, Object>) input.get("workflowInput");

var wfInput = (Map<String, Object>) input.get("workflowInput");
if (wfInput == null || wfInput.isEmpty()) {
wfInput = input;
}
@@ -99,6 +100,14 @@ public void start(Workflow workflow, Task task, WorkflowExecuto
// Set task status based on current sub-workflow status, as the status can change in recursion by the time we update here.
Workflow subWorkflow = workflowExecutor.getWorkflow(subWorkflowId, false);
updateTaskStatus(subWorkflow, task);
} catch (ApplicationException ae) {
if (ae.isRetryable()) {
LOGGER.info("A transient backend error happened when task {} tried to start sub workflow.", task.getTaskId());
} else {
task.setStatus(Status.FAILED);
task.setReasonForIncompletion(ae.getMessage());
LOGGER.error("Error starting sub workflow: {} from workflow: {}", name, workflow.getWorkflowId(), ae);
}
} catch (Exception e) {
task.setStatus(Status.FAILED);
task.setReasonForIncompletion(e.getMessage());
@@ -153,7 +153,7 @@ public List<Task> poll(String taskType, String workerId, String domain, int coun
}
task.setCallbackAfterSeconds(0); // reset callbackAfterSeconds when giving the task to the worker
task.setWorkerId(workerId);
task.setPollCount(task.getPollCount() + 1);
task.incrementPollCount();
executionDAOFacade.updateTask(task);
tasks.add(task);
} catch (Exception e) {
@@ -17,6 +17,7 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import org.junit.Before;
import org.junit.Test;
@@ -94,6 +95,58 @@ public void testStartSubWorkflow() {
assertEquals(Task.Status.COMPLETED, task.getStatus());
}

@Test
public void testStartSubWorkflowQueueFailure() {
WorkflowDef workflowDef = new WorkflowDef();
Workflow workflowInstance = new Workflow();
workflowInstance.setWorkflowDefinition(workflowDef);

Task task = new Task();
task.setOutputData(new HashMap<>());
task.setStatus(Task.Status.SCHEDULED);

Map<String, Object> inputData = new HashMap<>();
inputData.put("subWorkflowName", "UnitWorkFlow");
inputData.put("subWorkflowVersion", 3);
task.setInputData(inputData);

when(workflowExecutor
.startWorkflow(eq("UnitWorkFlow"), eq(3), eq(inputData), eq(null), any(), any(), any(), eq(null), any()))
.thenThrow(new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "QueueDAO failure"));

subWorkflow.start(workflowInstance, task, workflowExecutor);
assertNull("subWorkflowId should be null", task.getSubWorkflowId());
assertEquals(Task.Status.SCHEDULED, task.getStatus());
assertTrue("Output data should be empty", task.getOutputData().isEmpty());
}

@Test
public void testStartSubWorkflowStartError() {
WorkflowDef workflowDef = new WorkflowDef();
Workflow workflowInstance = new Workflow();
workflowInstance.setWorkflowDefinition(workflowDef);

Task task = new Task();
task.setOutputData(new HashMap<>());
task.setStatus(Task.Status.SCHEDULED);

Map<String, Object> inputData = new HashMap<>();
inputData.put("subWorkflowName", "UnitWorkFlow");
inputData.put("subWorkflowVersion", 3);
task.setInputData(inputData);

String failureReason = "non transient failure";
when(workflowExecutor
.startWorkflow(eq("UnitWorkFlow"), eq(3), eq(inputData), eq(null), any(), any(), any(), eq(null), any()))
.thenThrow(new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, failureReason));

subWorkflow.start(workflowInstance, task, workflowExecutor);
assertNull("subWorkflowId should be null", task.getSubWorkflowId());
assertEquals(Task.Status.FAILED, task.getStatus());
assertEquals(failureReason, task.getReasonForIncompletion());
assertTrue("Output data should be empty", task.getOutputData().isEmpty());
}

@Test
public void testStartSubWorkflowWithEmptyWorkflowInput() {
WorkflowDef workflowDef = new WorkflowDef();
