Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Extracted WorkflowExecutor.executeSystemTask into its own class Async…
Browse files Browse the repository at this point in the history
…SystemTaskExecutor
  • Loading branch information
aravindanr authored and apanicker-nflx committed Jul 16, 2021
1 parent 44dd149 commit a1444ca
Show file tree
Hide file tree
Showing 23 changed files with 731 additions and 321 deletions.
11 changes: 11 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
apply plugin: 'groovy'

dependencies {
implementation project(':conductor-common')
Expand Down Expand Up @@ -46,4 +47,14 @@ dependencies {

testImplementation 'org.springframework.boot:spring-boot-starter-validation'
testImplementation project(':conductor-common').sourceSets.test.output

testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}"
testImplementation "org.spockframework:spock-core:${revSpock}"
testImplementation "org.spockframework:spock-spring:${revSpock}"
}

// Configures logging for this project's Gradle `test` task.
test {
testLogging {
// Report test-failure exceptions using the 'full' format
// (presumably the complete stack trace rather than a shortened one - confirm against the Gradle TestLogging docs).
exceptionFormat = 'full'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.netflix.conductor.core.execution;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Runs a single execution step of an asynchronous {@link WorkflowSystemTask} and persists the outcome.
 * <p>
 * For each invocation the executor loads the {@link Task}, applies the concurrency and rate limit checks for
 * tasks that are still {@code SCHEDULED}, starts or executes the system task, updates the task's message in its
 * queue (remove or postpone), persists the task and, when the step completed the task, asks the
 * {@link WorkflowExecutor} to re-evaluate the owning workflow.
 */
@Component
public class AsyncSystemTaskExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSystemTaskExecutor.class);

    private final ExecutionDAOFacade executionDAOFacade;
    private final QueueDAO queueDAO;
    private final MetadataDAO metadataDAO;
    /** Delay, in seconds, applied to the queue message when a task is held back by a concurrency/rate limit. */
    private final long queueTaskMessagePostponeSecs;
    /** Delay, in seconds, before a still-running system task is made available in its queue again. */
    private final long systemTaskCallbackTime;
    private final WorkflowExecutor workflowExecutor;
    private final DeciderService deciderService;

    /**
     * Creates the executor.
     *
     * @param executionDAOFacade  used to load and persist tasks and workflows, and to evaluate execution limits
     * @param queueDAO            used to remove or postpone the task's queue message
     * @param metadataDAO         used to look up the task definition for the rate limit check
     * @param conductorProperties source of the system task callback duration and the task postpone duration
     * @param workflowExecutor    passed to the system task on start/execute and used to re-evaluate the workflow
     * @param deciderService      used to populate and externalize the task's input/output data
     */
    public AsyncSystemTaskExecutor(ExecutionDAOFacade executionDAOFacade, QueueDAO queueDAO, MetadataDAO metadataDAO, ConductorProperties conductorProperties, WorkflowExecutor workflowExecutor, DeciderService deciderService) {
        this.executionDAOFacade = executionDAOFacade;
        this.queueDAO = queueDAO;
        this.metadataDAO = metadataDAO;
        this.workflowExecutor = workflowExecutor;
        this.deciderService = deciderService;
        this.systemTaskCallbackTime = conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds();
        this.queueTaskMessagePostponeSecs = conductorProperties.getTaskExecutionPostponeDuration().getSeconds();
    }

    /**
     * Executes and persists the results of an async {@link WorkflowSystemTask}.
     * <p>
     * The method returns without executing anything when the task cannot be loaded, when the task is already
     * terminal (its queue message is removed), or when a {@code SCHEDULED} task exceeds its in-progress or rate
     * limit (its queue message is postponed). Once execution is attempted, the task is always persisted in the
     * {@code finally} block, even if the system task threw; exceptions raised while executing are recorded and
     * logged but not rethrown.
     *
     * @param systemTask The {@link WorkflowSystemTask} to be executed.
     * @param taskId     The id of the {@link Task} object.
     */
    public void execute(WorkflowSystemTask systemTask, String taskId) {
        Task task = loadTaskQuietly(taskId);
        if (task == null) {
            LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask);
            return;
        }

        LOGGER.debug("Task: {} fetched from execution DAO for taskId: {}", task, taskId);
        String queueName = QueueUtils.getQueueName(task);
        if (task.getStatus().isTerminal()) {
            //Tune the SystemTaskWorkerCoordinator's queues - if the queue size is very big this can happen!
            LOGGER.info("Task {}/{} was already completed.", task.getTaskType(), task.getTaskId());
            queueDAO.remove(queueName, task.getTaskId());
            return;
        }

        // Limits are only enforced before the task has been started.
        if (task.getStatus() == SCHEDULED) {
            if (executionDAOFacade.exceedsInProgressLimit(task)) {
                //TODO: add a metric to record this
                LOGGER.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName());
                postponeQuietly(queueName, task);
                return;
            }
            if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, metadataDAO.getTaskDef(task.getTaskDefName()))) {
                LOGGER.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(),
                    task.getRateLimitPerFrequency());
                postponeQuietly(queueName, task);
                return;
            }
        }

        boolean hasTaskExecutionCompleted = false;
        String workflowId = task.getWorkflowInstanceId();
        // if we are here the Task object is updated and needs to be persisted regardless of an exception
        try {
            Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);

            if (workflow.getStatus().isTerminal()) {
                LOGGER.info("Workflow {} has been completed for {}/{}", workflow.toShortString(),
                    systemTask,
                    task.getTaskId());
                if (!task.getStatus().isTerminal()) {
                    task.setStatus(CANCELED);
                    task.setReasonForIncompletion(String.format("Workflow is in %s state", workflow.getStatus().toString()));
                }
                queueDAO.remove(queueName, task.getTaskId());
                // the finally block below still persists the (possibly canceled) task
                return;
            }

            LOGGER.debug("Executing {}/{} in {} state", task.getTaskType(), task.getTaskId(), task.getStatus());

            // load task data (input/output) from external storage, if necessary
            deciderService.populateTaskData(task);

            // evaluated once, before start/execute gets a chance to mutate the task
            boolean isTaskAsyncComplete = systemTask.isAsyncComplete(task);
            if (task.getStatus() == SCHEDULED || !isTaskAsyncComplete) {
                task.incrementPollCount();
            }

            if (task.getStatus() == SCHEDULED) {
                task.setStartTime(System.currentTimeMillis());
                Monitors.recordQueueWaitTime(task.getTaskDefName(), task.getQueueWaitTime());
                systemTask.start(workflow, task, workflowExecutor);
            } else if (task.getStatus() == IN_PROGRESS) {
                systemTask.execute(workflow, task, workflowExecutor);
            }

            if (task.getOutputData() != null && !task.getOutputData().isEmpty()) {
                deciderService.externalizeTaskData(task);
            }

            // Update message in Task queue based on Task status
            // Remove asyncComplete system tasks from the queue that are not in SCHEDULED state
            if (isTaskAsyncComplete && task.getStatus() != SCHEDULED) {
                queueDAO.remove(queueName, task.getTaskId());
                hasTaskExecutionCompleted = true;
            } else if (task.getStatus().isTerminal()) {
                task.setEndTime(System.currentTimeMillis());
                queueDAO.remove(queueName, task.getTaskId());
                hasTaskExecutionCompleted = true;
                LOGGER.debug("{} removed from queue: {}", task, queueName);
            } else {
                task.setCallbackAfterSeconds(systemTaskCallbackTime);
                queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), systemTaskCallbackTime);
                LOGGER.debug("{} postponed in queue: {}", task, queueName);
            }

            LOGGER.debug("Finished execution of {}/{}-{}", systemTask, task.getTaskId(), task.getStatus());
        } catch (Exception e) {
            Monitors.error(AsyncSystemTaskExecutor.class.getSimpleName(), "executeSystemTask");
            LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
        } finally {
            executionDAOFacade.updateTask(task);
            // if the current task execution has completed, then the workflow needs to be evaluated
            if (hasTaskExecutionCompleted) {
                workflowExecutor.decide(workflowId);
            }
        }
    }

    /**
     * Postpones the task's message in the given queue by the configured postpone duration. This is best-effort:
     * a failure is logged, including its cause, and is not propagated to the caller.
     *
     * @param queueName name of the queue holding the task's message
     * @param task      the task whose message is postponed
     */
    private void postponeQuietly(String queueName, Task task) {
        try {
            queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), queueTaskMessagePostponeSecs);
        } catch (Exception e) {
            // pass the exception as the last argument so the cause and stack trace are not lost
            LOGGER.error("Error postponing task: {} in queue: {}", task.getTaskId(), queueName, e);
        }
    }

    /**
     * Loads a task by id without propagating lookup failures.
     *
     * @param taskId the id of the task to load
     * @return the value returned by the execution DAO facade, or {@code null} if the lookup threw an exception
     */
    private Task loadTaskQuietly(String taskId) {
        try {
            return executionDAOFacade.getTaskById(taskId);
        } catch (Exception e) {
            // record the failure so a datastore error can be told apart from a task that does not exist
            LOGGER.error("Error loading task: {}", taskId, e);
            return null;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public class WorkflowExecutor {
private final SystemTaskRegistry systemTaskRegistry;

private long activeWorkerLastPollMs;
private final long queueTaskMessagePostponeSecs;
public static final String DECIDER_QUEUE = "_deciderQueue";
private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName();
private final ExecutionLockService executionLockService;
Expand All @@ -119,7 +118,6 @@ public class WorkflowExecutor {

private static final Predicate<Task> NON_TERMINAL_TASK = task -> !task.getStatus().isTerminal();

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO, QueueDAO queueDAO,
MetadataMapperService metadataMapperService, WorkflowStatusListener workflowStatusListener,
ExecutionDAOFacade executionDAOFacade, ConductorProperties properties,
Expand All @@ -133,7 +131,6 @@ public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO,
this.metadataMapperService = metadataMapperService;
this.executionDAOFacade = executionDAOFacade;
this.activeWorkerLastPollMs = properties.getActiveWorkerLastPollTimeout().toMillis();
this.queueTaskMessagePostponeSecs = properties.getTaskExecutionPostponeDuration().getSeconds();
this.workflowStatusListener = workflowStatusListener;
this.executionLockService = executionLockService;
this.parametersUtils = parametersUtils;
Expand Down Expand Up @@ -1378,114 +1375,6 @@ public void addTaskToQueue(Task task) {
task.getWorkflowPriority(), taskQueueName, task.getCallbackAfterSeconds());
}

/**
 * Executes the async system task.
 * <p>
 * Loads the task for {@code taskId}, then starts it (when SCHEDULED) or executes it (when IN_PROGRESS),
 * updates the task's message in its queue (remove or postpone) and persists the task. The whole body is wrapped
 * in a try/catch: any exception is recorded through {@code Monitors.error} and logged, and is not rethrown.
 *
 * @param systemTask   the system task implementation to start/execute
 * @param taskId       the id of the task to load and run
 * @param callbackTime the delay used both as the task's callbackAfterSeconds and as the queue postpone
 *                     duration when the task is neither removed from the queue nor terminal
 */
public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, long callbackTime) {
try {
Task task = executionDAOFacade.getTaskById(taskId);
if (task == null) {
LOGGER.error("TaskId: {} could not be found while executing SystemTask", taskId);
return;
}
LOGGER.debug("Task: {} fetched from execution DAO for taskId: {}", task, taskId);
String queueName = QueueUtils.getQueueName(task);
if (task.getStatus().isTerminal()) {
//Tune the SystemTaskWorkerCoordinator's queues - if the queue size is very big this can happen!
LOGGER.info("Task {}/{} was already completed.", task.getTaskType(), task.getTaskId());
queueDAO.remove(queueName, task.getTaskId());
return;
}

String workflowId = task.getWorkflowInstanceId();
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);

// A start time of 0 is treated as "not started yet": stamp the start time and record the queue wait time once.
if (task.getStartTime() == 0) {
task.setStartTime(System.currentTimeMillis());
Monitors.recordQueueWaitTime(task.getTaskDefName(), task.getQueueWaitTime());
}

// The workflow already finished: cancel the task, persist it and drop its queue message.
if (workflow.getStatus().isTerminal()) {
LOGGER.info("Workflow {} has been completed for {}/{}", workflow.getWorkflowId(),
systemTask.getTaskType(),
task.getTaskId());
if (!task.getStatus().isTerminal()) {
task.setStatus(CANCELED);
}
executionDAOFacade.updateTask(task);
queueDAO.remove(queueName, task.getTaskId());
return;
}

// Concurrency and rate limits are only checked while the task is still SCHEDULED.
// Note: on both limit paths the method returns without persisting the start time set above.
if (task.getStatus().equals(SCHEDULED)) {
if (executionDAOFacade.exceedsInProgressLimit(task)) {
//to do add a metric to record this
LOGGER.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName());
// Postpone a message, so that it would be available for poll again.
queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSecs);
return;
}
if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade
.exceedsRateLimitPerFrequency(task, metadataDAO.getTaskDef(task.getTaskDefName()))) {
LOGGER.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(),
task.getRateLimitPerFrequency());
// Postpone a message, so that it would be available for poll again.
queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSecs);
return;
}
}

LOGGER.debug("Executing {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus());
// Count this poll and persist the task before running it; skipped for async-complete tasks that have left SCHEDULED.
if (task.getStatus() == SCHEDULED || !systemTask.isAsyncComplete(task)) {
task.incrementPollCount();
executionDAOFacade.updateTask(task);
}

// load task data (input/output) from external storage, if necessary
deciderService.populateTaskData(task);

// Tasks in any other non-terminal status are neither started nor executed here.
if (task.getStatus() == SCHEDULED) {
systemTask.start(workflow, task, this);
} else if (task.getStatus() == IN_PROGRESS) {
systemTask.execute(workflow, task, this);
}

if (task.getOutputData() != null && !task.getOutputData().isEmpty()) {
deciderService.externalizeTaskData(task);
}

// Update message in Task queue based on Task status
// Stop polling for asyncComplete system tasks that are not in SCHEDULED state
// isAsyncComplete is evaluated again here, i.e. after start/execute has run against the task.
if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
}
else if(task.getStatus().isTerminal()) {
task.setEndTime(System.currentTimeMillis());
queueDAO.remove(queueName, task.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, queueName,
task.getStatus());
} else {
task.setCallbackAfterSeconds(callbackTime);
// NOTE(review): the literal 2 is presumably the retry count for RetryUtil - confirm against RetryUtil.retryOnException.
new RetryUtil<>().retryOnException(() -> {
// postpone based on callbackTime
queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), callbackTime);
LOGGER.debug(
"Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}",
task, queueName, task.getStatus(), callbackTime);
return null;
}, null, null, 2, "Postponing Task message in queue for taskId: " + task.getTaskId(), "postponeTaskMessage");
}

// Persist the final task state through the same retry helper.
new RetryUtil<>().retryOnException(() -> {
executionDAOFacade.updateTask(task);
return null;
}, null, null, 2, "Updating Task with taskId: " + task.getTaskId(), "updateTask");

LOGGER.debug("Finished execution of {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus());
} catch (Exception e) {
Monitors.error(CLASS_NAME, "executeSystemTask");
LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
}
}

@VisibleForTesting
void setTaskDomains(List<Task> tasks, Workflow workflow) {
Map<String, String> taskToDomain = workflow.getTaskToDomain();
Expand Down
Loading

0 comments on commit a1444ca

Please sign in to comment.