Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2360 from Netflix/subworkflow_start_transient_fai…
Browse files Browse the repository at this point in the history
…lures

SubWorkflow task will be auto-retried on BACKEND_ERROR
  • Loading branch information
apanicker-nflx authored Jul 16, 2021
2 parents 518995d + a1444ca commit 586dc3e
Show file tree
Hide file tree
Showing 27 changed files with 823 additions and 338 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,6 @@ public long getQueueWaitTime() {
return 0L;
}

public void setQueueWaitTime(long t) {

}

/**
* @return True if the task has been retried after failure
*/
Expand Down Expand Up @@ -446,6 +442,9 @@ public void setPollCount(int pollCount) {
this.pollCount = pollCount;
}

public void incrementPollCount() {
++this.pollCount;
}

public boolean isCallbackFromWorker() {
return callbackFromWorker;
Expand Down
11 changes: 11 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
apply plugin: 'groovy'

dependencies {
implementation project(':conductor-common')
Expand Down Expand Up @@ -46,4 +47,14 @@ dependencies {

testImplementation 'org.springframework.boot:spring-boot-starter-validation'
testImplementation project(':conductor-common').sourceSets.test.output

testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}"
testImplementation "org.spockframework:spock-core:${revSpock}"
testImplementation "org.spockframework:spock-spring:${revSpock}"
}

test {
testLogging {
exceptionFormat = 'full'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.netflix.conductor.core.execution;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class AsyncSystemTaskExecutor {

private final ExecutionDAOFacade executionDAOFacade;
private final QueueDAO queueDAO;
private final MetadataDAO metadataDAO;
private final long queueTaskMessagePostponeSecs;
private final long systemTaskCallbackTime;
private final WorkflowExecutor workflowExecutor;
private final DeciderService deciderService;

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSystemTaskExecutor.class);

public AsyncSystemTaskExecutor(ExecutionDAOFacade executionDAOFacade, QueueDAO queueDAO, MetadataDAO metadataDAO, ConductorProperties conductorProperties, WorkflowExecutor workflowExecutor, DeciderService deciderService) {
this.executionDAOFacade = executionDAOFacade;
this.queueDAO = queueDAO;
this.metadataDAO = metadataDAO;
this.workflowExecutor = workflowExecutor;
this.deciderService = deciderService;
this.systemTaskCallbackTime = conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds();
this.queueTaskMessagePostponeSecs = conductorProperties.getTaskExecutionPostponeDuration().getSeconds();
}

/**
* Executes and persists the results of an async {@link WorkflowSystemTask}.
*
* @param systemTask The {@link WorkflowSystemTask} to be executed.
* @param taskId The id of the {@link Task} object.
*/
public void execute(WorkflowSystemTask systemTask, String taskId) {
Task task = loadTaskQuietly(taskId);
if (task == null) {
LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask);
return;
}

LOGGER.debug("Task: {} fetched from execution DAO for taskId: {}", task, taskId);
String queueName = QueueUtils.getQueueName(task);
if (task.getStatus().isTerminal()) {
//Tune the SystemTaskWorkerCoordinator's queues - if the queue size is very big this can happen!
LOGGER.info("Task {}/{} was already completed.", task.getTaskType(), task.getTaskId());
queueDAO.remove(queueName, task.getTaskId());
return;
}

if (task.getStatus().equals(SCHEDULED)) {
if (executionDAOFacade.exceedsInProgressLimit(task)) {
//TODO: add a metric to record this
LOGGER.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName());
postponeQuietly(queueName, task);
return;
}
if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, metadataDAO.getTaskDef(task.getTaskDefName()))) {
LOGGER.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(),
task.getRateLimitPerFrequency());
postponeQuietly(queueName, task);
return;
}
}

boolean hasTaskExecutionCompleted = false;
String workflowId = task.getWorkflowInstanceId();
// if we are here the Task object is updated and needs to be persisted regardless of an exception
try {
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);

if (workflow.getStatus().isTerminal()) {
LOGGER.info("Workflow {} has been completed for {}/{}", workflow.toShortString(),
systemTask,
task.getTaskId());
if (!task.getStatus().isTerminal()) {
task.setStatus(CANCELED);
task.setReasonForIncompletion(String.format("Workflow is in %s state", workflow.getStatus().toString()));
}
queueDAO.remove(queueName, task.getTaskId());
return;
}

LOGGER.debug("Executing {}/{} in {} state", task.getTaskType(), task.getTaskId(), task.getStatus());

// load task data (input/output) from external storage, if necessary
deciderService.populateTaskData(task);

boolean isTaskAsyncComplete = systemTask.isAsyncComplete(task);
if (task.getStatus() == SCHEDULED || !isTaskAsyncComplete) {
task.incrementPollCount();
}

if (task.getStatus() == SCHEDULED) {
task.setStartTime(System.currentTimeMillis());
Monitors.recordQueueWaitTime(task.getTaskDefName(), task.getQueueWaitTime());
systemTask.start(workflow, task, workflowExecutor);
} else if (task.getStatus() == IN_PROGRESS) {
systemTask.execute(workflow, task, workflowExecutor);
}

if (task.getOutputData() != null && !task.getOutputData().isEmpty()) {
deciderService.externalizeTaskData(task);
}

// Update message in Task queue based on Task status
// Remove asyncComplete system tasks from the queue that are not in SCHEDULED state
if (isTaskAsyncComplete && task.getStatus() != SCHEDULED) {
queueDAO.remove(queueName, task.getTaskId());
hasTaskExecutionCompleted = true;
} else if (task.getStatus().isTerminal()) {
task.setEndTime(System.currentTimeMillis());
queueDAO.remove(queueName, task.getTaskId());
hasTaskExecutionCompleted = true;
LOGGER.debug("{} removed from queue: {}", task, queueName);
} else {
task.setCallbackAfterSeconds(systemTaskCallbackTime);
queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), systemTaskCallbackTime);
LOGGER.debug("{} postponed in queue: {}", task, queueName);
}

LOGGER.debug("Finished execution of {}/{}-{}", systemTask, task.getTaskId(), task.getStatus());
} catch (Exception e) {
Monitors.error(AsyncSystemTaskExecutor.class.getSimpleName(), "executeSystemTask");
LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
} finally {
executionDAOFacade.updateTask(task);
// if the current task execution has completed, then the workflow needs to be evaluated
if(hasTaskExecutionCompleted) {
workflowExecutor.decide(workflowId);
}
}
}

private void postponeQuietly(String queueName, Task task) {
try {
queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), queueTaskMessagePostponeSecs);
} catch (Exception e) {
LOGGER.error("Error postponing task: {} in queue: {}", task.getTaskId(), queueName);
}
}

private Task loadTaskQuietly(String taskId) {
try {
return executionDAOFacade.getTaskById(taskId);
} catch (Exception e) {
return null;
}
}
}
Loading

0 comments on commit 586dc3e

Please sign in to comment.