From 44dd1496391e141411c5fd9c9ec3bbebc5e847a5 Mon Sep 17 00:00:00 2001 From: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com> Date: Mon, 12 Jul 2021 23:21:24 -0700 Subject: [PATCH] on transient errors (BACKEND_ERROR), task state is set as SCHEDULED which allows the task to be retried --- .../conductor/common/metadata/tasks/Task.java | 7 +- .../core/execution/WorkflowExecutor.java | 92 +++++++++++-------- .../core/execution/tasks/SubWorkflow.java | 19 +++- .../conductor/service/ExecutionService.java | 2 +- .../core/execution/tasks/TestSubWorkflow.java | 53 +++++++++++ 5 files changed, 124 insertions(+), 49 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java index 26abc8f7c1..4f87c48048 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java @@ -402,10 +402,6 @@ public long getQueueWaitTime() { return 0L; } - public void setQueueWaitTime(long t) { - - } - /** * @return True if the task has been retried after failure */ @@ -446,6 +442,9 @@ public void setPollCount(int pollCount) { this.pollCount = pollCount; } + public void incrementPollCount() { + ++this.pollCount; + } public boolean isCallbackFromWorker() { return callbackFromWorker; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index f362dbb167..472a9fede8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -12,22 +12,6 @@ */ package com.netflix.conductor.core.execution; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED_WITH_TERMINAL_ERROR; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED; -import static com.netflix.conductor.common.metadata.tasks.Task.Status.valueOf; -import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC; -import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN; -import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW; -import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE; -import static com.netflix.conductor.core.exception.ApplicationException.Code.BACKEND_ERROR; -import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT; -import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT; -import static com.netflix.conductor.core.exception.ApplicationException.Code.NOT_FOUND; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.netflix.conductor.annotations.Trace; @@ -64,6 +48,11 @@ import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.service.ExecutionLockService; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -76,10 +65,22 @@ import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; + +import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED_WITH_TERMINAL_ERROR; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.valueOf; +import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC; +import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN; +import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW; +import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE; +import static com.netflix.conductor.core.exception.ApplicationException.Code.BACKEND_ERROR; +import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT; +import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT; +import static com.netflix.conductor.core.exception.ApplicationException.Code.NOT_FOUND; /** * Workflow services provider interface @@ -1434,38 +1435,51 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, long LOGGER.debug("Executing {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus()); if (task.getStatus() == SCHEDULED || !systemTask.isAsyncComplete(task)) { - task.setPollCount(task.getPollCount() + 1); + task.incrementPollCount(); executionDAOFacade.updateTask(task); } + // load task data (input/output) from external storage, if necessary deciderService.populateTaskData(task); - // Stop polling for asyncComplete system tasks that are not in SCHEDULED state - if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) { - queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); - return; + if (task.getStatus() == SCHEDULED) { + systemTask.start(workflow, task, this); + } else if (task.getStatus() == IN_PROGRESS) { + systemTask.execute(workflow, task, this); } - switch (task.getStatus()) { - case SCHEDULED: - systemTask.start(workflow, task, this); - break; - - case IN_PROGRESS: - systemTask.execute(workflow, task, this); - break; - default: - break; + if (task.getOutputData() != null && !task.getOutputData().isEmpty()) { + deciderService.externalizeTaskData(task); } - if (!task.getStatus().isTerminal()) { + // Update message in Task queue based on Task status + // Stop polling for asyncComplete system tasks that are not in SCHEDULED state + if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) { + queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); + } + else if(task.getStatus().isTerminal()) { + task.setEndTime(System.currentTimeMillis()); + queueDAO.remove(queueName, task.getTaskId()); + LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, queueName, + task.getStatus()); + } else { task.setCallbackAfterSeconds(callbackTime); + new RetryUtil<>().retryOnException(() -> { + // postpone based on callbackTime + queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), callbackTime); + LOGGER.debug( + "Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", + task, queueName, task.getStatus(), callbackTime); + return null; + }, null, null, 2, "Postponing Task message in queue for taskId: " + task.getTaskId(), "postponeTaskMessage"); } - updateTask(new TaskResult(task)); - LOGGER.debug("Done Executing {}/{}-{} output={}", task.getTaskType(), task.getTaskId(), task.getStatus(), - task.getOutputData().toString()); + new RetryUtil<>().retryOnException(() -> { + executionDAOFacade.updateTask(task); + return null; + }, null, null, 2, "Updating Task with taskId: " + task.getTaskId(), "updateTask"); + LOGGER.debug("Finished execution of {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus()); } catch (Exception e) { Monitors.error(CLASS_NAME, "executeSystemTask"); LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java index c9161dfee1..5803f0a3b2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java @@ -42,10 +42,9 @@ public SubWorkflow(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings("unchecked") @Override public void start(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) { - Map input = task.getInputData(); String name = input.get("subWorkflowName").toString(); int version = (int) input.get("subWorkflowVersion"); @@ -54,13 +53,15 @@ public void start(Workflow workflow, Task task, WorkflowExecutor workflowExecuto if (input.get("subWorkflowDefinition") != null) { // convert the value back to workflow definition object workflowDefinition = objectMapper.convertValue(input.get("subWorkflowDefinition"), WorkflowDef.class); + name = workflowDefinition.getName(); } - Map taskToDomain = workflow.getTaskToDomain(); + Map taskToDomain = workflow.getTaskToDomain(); if (input.get("subWorkflowTaskToDomain") instanceof Map) { - taskToDomain = (Map) input.get("subWorkflowTaskToDomain"); + taskToDomain = (Map) input.get("subWorkflowTaskToDomain"); } - Map wfInput = (Map) input.get("workflowInput"); + + var wfInput = (Map) input.get("workflowInput"); if (wfInput == null || wfInput.isEmpty()) { wfInput = input; } @@ -99,6 +100,14 @@ public void start(Workflow workflow, Task task, WorkflowExecutor workflowExecuto // Set task status based on current sub-workflow status, as the status can change in recursion by the time we update here. Workflow subWorkflow = workflowExecutor.getWorkflow(subWorkflowId, false); updateTaskStatus(subWorkflow, task); + } catch (ApplicationException ae) { + if (ae.isRetryable()) { + LOGGER.info("A transient backend error happened when task {} tried to start sub workflow.", task.getTaskId()); + } else { + task.setStatus(Status.FAILED); + task.setReasonForIncompletion(ae.getMessage()); + LOGGER.error("Error starting sub workflow: {} from workflow: {}", name, workflow.getWorkflowId(), ae); + } } catch (Exception e) { task.setStatus(Status.FAILED); task.setReasonForIncompletion(e.getMessage()); diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 4df4f84e08..7e419dbb9c 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -153,7 +153,7 @@ public List poll(String taskType, String workerId, String domain, int coun } task.setCallbackAfterSeconds(0); // reset callbackAfterSeconds when giving the task to the worker task.setWorkerId(workerId); - task.setPollCount(task.getPollCount() + 1); + task.incrementPollCount(); executionDAOFacade.updateTask(task); tasks.add(task); } catch (Exception e) { diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java index d3f513a377..6a8b282905 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java @@ -17,6 +17,7 @@ import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.core.execution.WorkflowExecutor; import org.junit.Before; import org.junit.Test; @@ -94,6 +95,58 @@ public void testStartSubWorkflow() { assertEquals(Task.Status.COMPLETED, task.getStatus()); } + @Test + public void testStartSubWorkflowQueueFailure() { + WorkflowDef workflowDef = new WorkflowDef(); + Workflow workflowInstance = new Workflow(); + workflowInstance.setWorkflowDefinition(workflowDef); + + Task task = new Task(); + task.setOutputData(new HashMap<>()); + task.setStatus(Task.Status.SCHEDULED); + + Map inputData = new HashMap<>(); + inputData.put("subWorkflowName", "UnitWorkFlow"); + inputData.put("subWorkflowVersion", 3); + task.setInputData(inputData); + + when(workflowExecutor + .startWorkflow(eq("UnitWorkFlow"), eq(3), eq(inputData), eq(null), any(), any(), any(), eq(null), any())) + .thenThrow(new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "QueueDAO failure")); + + subWorkflow.start(workflowInstance, task, workflowExecutor); + assertNull("subWorkflowId should be null", task.getSubWorkflowId()); + assertEquals(Task.Status.SCHEDULED, task.getStatus()); + assertTrue("Output data should be empty", task.getOutputData().isEmpty()); + } + + @Test + public void testStartSubWorkflowStartError() { + WorkflowDef workflowDef = new WorkflowDef(); + Workflow workflowInstance = new Workflow(); + workflowInstance.setWorkflowDefinition(workflowDef); + + Task task = new Task(); + task.setOutputData(new HashMap<>()); + task.setStatus(Task.Status.SCHEDULED); + + Map inputData = new HashMap<>(); + inputData.put("subWorkflowName", "UnitWorkFlow"); + inputData.put("subWorkflowVersion", 3); + task.setInputData(inputData); + + String failureReason = "non transient failure"; + when(workflowExecutor + .startWorkflow(eq("UnitWorkFlow"), eq(3), eq(inputData), eq(null), any(), any(), any(), eq(null), any())) + .thenThrow(new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, failureReason)); + + subWorkflow.start(workflowInstance, task, workflowExecutor); + assertNull("subWorkflowId should be null", task.getSubWorkflowId()); + assertEquals(Task.Status.FAILED, task.getStatus()); + assertEquals(failureReason, task.getReasonForIncompletion()); + assertTrue("Output data should be empty", task.getOutputData().isEmpty()); + } + @Test public void testStartSubWorkflowWithEmptyWorkflowInput() { WorkflowDef workflowDef = new WorkflowDef();