From a1444ca90761794616a3abfb6b121459e63a769c Mon Sep 17 00:00:00 2001 From: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com> Date: Tue, 13 Jul 2021 23:36:41 -0700 Subject: [PATCH] Extracted WorkflowExecutor.executeSystemTask into its own class AsyncSystemTaskExecutor --- core/build.gradle | 11 + .../execution/AsyncSystemTaskExecutor.java | 177 ++++++++ .../core/execution/WorkflowExecutor.java | 111 ----- ...askExecutor.java => SystemTaskWorker.java} | 47 +-- .../tasks/SystemTaskWorkerCoordinator.java | 24 +- .../AsyncSystemTaskExecutorTest.groovy | 396 ++++++++++++++++++ .../core/execution/TestWorkflowExecutor.java | 57 --- ...xecutor.java => TestSystemTaskWorker.java} | 71 ++-- .../TestSystemTaskWorkerCoordinator.java | 12 +- .../test/base/AbstractSpecification.groovy | 26 +- .../integration/DynamicForkJoinSpec.groovy | 22 +- .../ExternalPayloadStorageSpec.groovy | 6 +- .../test/integration/ForkJoinSpec.groovy | 10 +- ...hicalForkJoinSubworkflowRestartSpec.groovy | 10 +- ...rchicalForkJoinSubworkflowRetrySpec.groovy | 10 +- .../LambdaAndTerminateTaskSpec.groovy | 2 +- .../NestedForkJoinSubWorkflowSpec.groovy | 6 +- .../integration/SubWorkflowRestartSpec.groovy | 10 +- .../integration/SubWorkflowRetrySpec.groovy | 10 +- .../test/integration/SubWorkflowSpec.groovy | 6 +- .../test/integration/SystemTaskSpec.groovy | 2 +- .../integration/TaskLimitsWorkflowSpec.groovy | 4 +- .../resiliency/QueueResiliencySpec.groovy | 22 +- 23 files changed, 731 insertions(+), 321 deletions(-) create mode 100644 core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java rename core/src/main/java/com/netflix/conductor/core/execution/tasks/{SystemTaskExecutor.java => SystemTaskWorker.java} (77%) create mode 100644 core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy rename core/src/test/java/com/netflix/conductor/core/execution/tasks/{TestSystemTaskExecutor.java => TestSystemTaskWorker.java} (71%) diff --git a/core/build.gradle b/core/build.gradle index ed384f57de..dca5188f85 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ +apply plugin: 'groovy' dependencies { implementation project(':conductor-common') @@ -46,4 +47,14 @@ dependencies { testImplementation 'org.springframework.boot:spring-boot-starter-validation' testImplementation project(':conductor-common').sourceSets.test.output + + testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}" + testImplementation "org.spockframework:spock-core:${revSpock}" + testImplementation "org.spockframework:spock-spring:${revSpock}" +} + +test { + testLogging { + exceptionFormat = 'full' + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java new file mode 100644 index 0000000000..2118165e43 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -0,0 +1,177 @@ +/* + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.netflix.conductor.core.execution; + +import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS; +import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED; + +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; +import com.netflix.conductor.core.orchestration.ExecutionDAOFacade; +import com.netflix.conductor.core.utils.QueueUtils; +import com.netflix.conductor.dao.MetadataDAO; +import com.netflix.conductor.dao.QueueDAO; +import com.netflix.conductor.metrics.Monitors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class AsyncSystemTaskExecutor { + + private final ExecutionDAOFacade executionDAOFacade; + private final QueueDAO queueDAO; + private final MetadataDAO metadataDAO; + private final long queueTaskMessagePostponeSecs; + private final long systemTaskCallbackTime; + private final WorkflowExecutor workflowExecutor; + private final DeciderService deciderService; + + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSystemTaskExecutor.class); + + public AsyncSystemTaskExecutor(ExecutionDAOFacade executionDAOFacade, QueueDAO queueDAO, MetadataDAO metadataDAO, ConductorProperties conductorProperties, WorkflowExecutor workflowExecutor, DeciderService deciderService) { + this.executionDAOFacade = executionDAOFacade; + this.queueDAO = queueDAO; + this.metadataDAO = metadataDAO; + this.workflowExecutor = workflowExecutor; + this.deciderService = deciderService; + this.systemTaskCallbackTime = conductorProperties.getSystemTaskWorkerCallbackDuration().getSeconds(); + this.queueTaskMessagePostponeSecs = conductorProperties.getTaskExecutionPostponeDuration().getSeconds(); + } + + /** + * Executes and persists the results of an async {@link WorkflowSystemTask}. + * + * @param systemTask The {@link WorkflowSystemTask} to be executed. + * @param taskId The id of the {@link Task} object. + */ + public void execute(WorkflowSystemTask systemTask, String taskId) { + Task task = loadTaskQuietly(taskId); + if (task == null) { + LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask); + return; + } + + LOGGER.debug("Task: {} fetched from execution DAO for taskId: {}", task, taskId); + String queueName = QueueUtils.getQueueName(task); + if (task.getStatus().isTerminal()) { + //Tune the SystemTaskWorkerCoordinator's queues - if the queue size is very big this can happen! + LOGGER.info("Task {}/{} was already completed.", task.getTaskType(), task.getTaskId()); + queueDAO.remove(queueName, task.getTaskId()); + return; + } + + if (task.getStatus().equals(SCHEDULED)) { + if (executionDAOFacade.exceedsInProgressLimit(task)) { + //TODO: add a metric to record this + LOGGER.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName()); + postponeQuietly(queueName, task); + return; + } + if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, metadataDAO.getTaskDef(task.getTaskDefName()))) { + LOGGER.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), + task.getRateLimitPerFrequency()); + postponeQuietly(queueName, task); + return; + } + } + + boolean hasTaskExecutionCompleted = false; + String workflowId = task.getWorkflowInstanceId(); + // if we are here the Task object is updated and needs to be persisted regardless of an exception + try { + Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true); + + if (workflow.getStatus().isTerminal()) { + LOGGER.info("Workflow {} has been completed for {}/{}", workflow.toShortString(), + systemTask, + task.getTaskId()); + if (!task.getStatus().isTerminal()) { + task.setStatus(CANCELED); + task.setReasonForIncompletion(String.format("Workflow is in %s state", workflow.getStatus().toString())); + } + queueDAO.remove(queueName, task.getTaskId()); + return; + } + + LOGGER.debug("Executing {}/{} in {} state", task.getTaskType(), task.getTaskId(), task.getStatus()); + + // load task data (input/output) from external storage, if necessary + deciderService.populateTaskData(task); + + boolean isTaskAsyncComplete = systemTask.isAsyncComplete(task); + if (task.getStatus() == SCHEDULED || !isTaskAsyncComplete) { + task.incrementPollCount(); + } + + if (task.getStatus() == SCHEDULED) { + task.setStartTime(System.currentTimeMillis()); + Monitors.recordQueueWaitTime(task.getTaskDefName(), task.getQueueWaitTime()); + systemTask.start(workflow, task, workflowExecutor); + } else if (task.getStatus() == IN_PROGRESS) { + systemTask.execute(workflow, task, workflowExecutor); + } + + if (task.getOutputData() != null && !task.getOutputData().isEmpty()) { + deciderService.externalizeTaskData(task); + } + + // Update message in Task queue based on Task status + // Remove asyncComplete system tasks from the queue that are not in SCHEDULED state + if (isTaskAsyncComplete && task.getStatus() != SCHEDULED) { + queueDAO.remove(queueName, task.getTaskId()); + hasTaskExecutionCompleted = true; + } else if (task.getStatus().isTerminal()) { + task.setEndTime(System.currentTimeMillis()); + queueDAO.remove(queueName, task.getTaskId()); + hasTaskExecutionCompleted = true; + LOGGER.debug("{} removed from queue: {}", task, queueName); + } else { + task.setCallbackAfterSeconds(systemTaskCallbackTime); + queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), systemTaskCallbackTime); + LOGGER.debug("{} postponed in queue: {}", task, queueName); + } + + LOGGER.debug("Finished execution of {}/{}-{}", systemTask, task.getTaskId(), task.getStatus()); + } catch (Exception e) { + Monitors.error(AsyncSystemTaskExecutor.class.getSimpleName(), "executeSystemTask"); + LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e); + } finally { + executionDAOFacade.updateTask(task); + // if the current task execution has completed, then the workflow needs to be evaluated + if(hasTaskExecutionCompleted) { + workflowExecutor.decide(workflowId); + } + } + } + + private void postponeQuietly(String queueName, Task task) { + try { + queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), queueTaskMessagePostponeSecs); + } catch (Exception e) { + LOGGER.error("Error postponing task: {} in queue: {}", task.getTaskId(), queueName); + } + } + + private Task loadTaskQuietly(String taskId) { + try { + return executionDAOFacade.getTaskById(taskId); + } catch (Exception e) { + return null; + } + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 472a9fede8..9dc094f93e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -103,7 +103,6 @@ public class WorkflowExecutor { private final SystemTaskRegistry systemTaskRegistry; private long activeWorkerLastPollMs; - private final long queueTaskMessagePostponeSecs; public static final String DECIDER_QUEUE = "_deciderQueue"; private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName(); private final ExecutionLockService executionLockService; @@ -119,7 +118,6 @@ public class WorkflowExecutor { private static final Predicate NON_TERMINAL_TASK = task -> !task.getStatus().isTerminal(); - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO, QueueDAO queueDAO, MetadataMapperService metadataMapperService, WorkflowStatusListener workflowStatusListener, ExecutionDAOFacade executionDAOFacade, ConductorProperties properties, @@ -133,7 +131,6 @@ public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO, this.metadataMapperService = metadataMapperService; this.executionDAOFacade = executionDAOFacade; this.activeWorkerLastPollMs = properties.getActiveWorkerLastPollTimeout().toMillis(); - this.queueTaskMessagePostponeSecs = properties.getTaskExecutionPostponeDuration().getSeconds(); this.workflowStatusListener = workflowStatusListener; this.executionLockService = executionLockService; this.parametersUtils = parametersUtils; @@ -1378,114 +1375,6 @@ public void addTaskToQueue(Task task) { task.getWorkflowPriority(), taskQueueName, task.getCallbackAfterSeconds()); } - //Executes the async system task - public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, long callbackTime) { - try { - Task task = executionDAOFacade.getTaskById(taskId); - if (task == null) { - LOGGER.error("TaskId: {} could not be found while executing SystemTask", taskId); - return; - } - LOGGER.debug("Task: {} fetched from execution DAO for taskId: {}", task, taskId); - String queueName = QueueUtils.getQueueName(task); - if (task.getStatus().isTerminal()) { - //Tune the SystemTaskWorkerCoordinator's queues - if the queue size is very big this can happen! - LOGGER.info("Task {}/{} was already completed.", task.getTaskType(), task.getTaskId()); - queueDAO.remove(queueName, task.getTaskId()); - return; - } - - String workflowId = task.getWorkflowInstanceId(); - Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true); - - if (task.getStartTime() == 0) { - task.setStartTime(System.currentTimeMillis()); - Monitors.recordQueueWaitTime(task.getTaskDefName(), task.getQueueWaitTime()); - } - - if (workflow.getStatus().isTerminal()) { - LOGGER.info("Workflow {} has been completed for {}/{}", workflow.getWorkflowId(), - systemTask.getTaskType(), - task.getTaskId()); - if (!task.getStatus().isTerminal()) { - task.setStatus(CANCELED); - } - executionDAOFacade.updateTask(task); - queueDAO.remove(queueName, task.getTaskId()); - return; - } - - if (task.getStatus().equals(SCHEDULED)) { - if (executionDAOFacade.exceedsInProgressLimit(task)) { - //to do add a metric to record this - LOGGER.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName()); - // Postpone a message, so that it would be available for poll again. - queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSecs); - return; - } - if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade - .exceedsRateLimitPerFrequency(task, metadataDAO.getTaskDef(task.getTaskDefName()))) { - LOGGER.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), - task.getRateLimitPerFrequency()); - // Postpone a message, so that it would be available for poll again. - queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSecs); - return; - } - } - - LOGGER.debug("Executing {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus()); - if (task.getStatus() == SCHEDULED || !systemTask.isAsyncComplete(task)) { - task.incrementPollCount(); - executionDAOFacade.updateTask(task); - } - - // load task data (input/output) from external storage, if necessary - deciderService.populateTaskData(task); - - if (task.getStatus() == SCHEDULED) { - systemTask.start(workflow, task, this); - } else if (task.getStatus() == IN_PROGRESS) { - systemTask.execute(workflow, task, this); - } - - if (task.getOutputData() != null && !task.getOutputData().isEmpty()) { - deciderService.externalizeTaskData(task); - } - - // Update message in Task queue based on Task status - // Stop polling for asyncComplete system tasks that are not in SCHEDULED state - if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) { - queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); - } - else if(task.getStatus().isTerminal()) { - task.setEndTime(System.currentTimeMillis()); - queueDAO.remove(queueName, task.getTaskId()); - LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, queueName, - task.getStatus()); - } else { - task.setCallbackAfterSeconds(callbackTime); - new RetryUtil<>().retryOnException(() -> { - // postpone based on callbackTime - queueDAO.postpone(queueName, task.getTaskId(), task.getWorkflowPriority(), callbackTime); - LOGGER.debug( - "Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", - task, queueName, task.getStatus(), callbackTime); - return null; - }, null, null, 2, "Postponing Task message in queue for taskId: " + task.getTaskId(), "postponeTaskMessage"); - } - - new RetryUtil<>().retryOnException(() -> { - executionDAOFacade.updateTask(task); - return null; - }, null, null, 2, "Updating Task with taskId: " + task.getTaskId(), "updateTask"); - - LOGGER.debug("Finished execution of {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus()); - } catch (Exception e) { - Monitors.error(CLASS_NAME, "executeSystemTask"); - LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e); - } - } - @VisibleForTesting void setTaskDomains(List tasks, Workflow workflow) { Map taskToDomain = workflow.getTaskToDomain(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java similarity index 77% rename from core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskExecutor.java rename to core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java index d144e1a94c..2088208812 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java @@ -1,20 +1,20 @@ /* - * Copyright 2020 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.core.execution.tasks; import com.google.common.annotations.VisibleForTesting; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.execution.AsyncSystemTaskExecutor; import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.core.utils.SemaphoreUtil; import com.netflix.conductor.dao.QueueDAO; @@ -30,38 +30,33 @@ import java.util.concurrent.ExecutorService; /** - * Manages the threadpool used by system task workers for execution. + * The worker that polls and executes an async system task. */ -class SystemTaskExecutor { +class SystemTaskWorker { - private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskExecutor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorker.class); - private final long callbackTime; private final QueueDAO queueDAO; ExecutionConfig defaultExecutionConfig; - private final WorkflowExecutor workflowExecutor; + private final AsyncSystemTaskExecutor asyncSystemTaskExecutor; private final ConductorProperties properties; private final int maxPollCount; private final ExecutionService executionService; ConcurrentHashMap queueExecutionConfigMap = new ConcurrentHashMap<>(); - SystemTaskExecutor(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, ConductorProperties properties, - ExecutionService executionService) { + SystemTaskWorker(QueueDAO queueDAO, AsyncSystemTaskExecutor asyncSystemTaskExecutor, ConductorProperties properties, + ExecutionService executionService) { this.properties = properties; int threadCount = properties.getSystemTaskWorkerThreadCount(); - this.callbackTime = properties.getSystemTaskWorkerCallbackDuration().getSeconds(); - - String threadNameFormat = "system-task-worker-%d"; - this.defaultExecutionConfig = new ExecutionConfig(threadCount, threadNameFormat); - this.workflowExecutor = workflowExecutor; + this.defaultExecutionConfig = new ExecutionConfig(threadCount, "system-task-worker-%d"); + this.asyncSystemTaskExecutor = asyncSystemTaskExecutor; this.queueDAO = queueDAO; this.maxPollCount = properties.getSystemTaskMaxPollCount(); this.executionService = executionService; - LOGGER.info("Initialized the SystemTaskExecutor with {} threads and callback time: {} seconds", threadCount, - callbackTime); + LOGGER.info("SystemTaskWorker initialized with {} threads", threadCount); } void pollAndExecute(String queueName) { @@ -110,7 +105,7 @@ void pollAndExecute(String queueName) { executionService.ackTaskReceived(taskId); CompletableFuture taskCompletableFuture = CompletableFuture.runAsync(() -> - workflowExecutor.executeSystemTask(systemTask, taskId, callbackTime), executorService); + asyncSystemTaskExecutor.execute(systemTask, taskId), executorService); // release permit after processing is complete taskCompletableFuture.whenComplete((r, e) -> semaphoreUtil.completeProcessing(1)); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index dd5349e57a..92799953f0 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -15,7 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.execution.AsyncSystemTaskExecutor; import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; @@ -29,7 +29,6 @@ import org.springframework.stereotype.Component; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -39,14 +38,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component @ConditionalOnProperty(name = "conductor.system-task-workers.enabled", havingValue = "true", matchIfMissing = true) public class SystemTaskWorkerCoordinator extends LifecycleAwareComponent { private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class); - private SystemTaskExecutor systemTaskExecutor; + private SystemTaskWorker systemTaskWorker; private final ConductorProperties properties; private final long pollInterval; @@ -58,21 +56,21 @@ public class SystemTaskWorkerCoordinator extends LifecycleAwareComponent { private static final String CLASS_NAME = SystemTaskWorkerCoordinator.class.getName(); - private final List workflowSystemTasks; + private final Set workflowSystemTasks; private final QueueDAO queueDAO; - private final WorkflowExecutor workflowExecutor; + private final AsyncSystemTaskExecutor asyncSystemTaskExecutor; private final ExecutionService executionService; - public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, - ConductorProperties properties, - ExecutionService executionService, - List workflowSystemTasks) { + public SystemTaskWorkerCoordinator(QueueDAO queueDAO, AsyncSystemTaskExecutor asyncSystemTaskExecutor, + ConductorProperties properties, + ExecutionService executionService, + Set workflowSystemTasks) { this.properties = properties; this.workflowSystemTasks = workflowSystemTasks; this.executionNameSpace = properties.getSystemTaskWorkerExecutionNamespace(); this.pollInterval = properties.getSystemTaskWorkerPollInterval().toMillis(); this.queueDAO = queueDAO; - this.workflowExecutor = workflowExecutor; + this.asyncSystemTaskExecutor = asyncSystemTaskExecutor; this.executionService = executionService; } @@ -84,7 +82,7 @@ public void initSystemTaskExecutor() { + "task workers, set conductor.system-task-workers.enabled=false."); } this.workflowSystemTasks.forEach(this::add); - this.systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + this.systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); new Thread(this::listen).start(); LOGGER.info("System Task Worker Coordinator initialized with poll interval: {}", pollInterval); } @@ -122,7 +120,7 @@ private void pollAndExecute(String queueName) { LOGGER.debug("Component stopped. Not polling for system task in queue : {}", queueName); return; } - systemTaskExecutor.pollAndExecute(queueName); + systemTaskWorker.pollAndExecute(queueName); } @VisibleForTesting diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy new file mode 100644 index 0000000000..930d90abb5 --- /dev/null +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/AsyncSystemTaskExecutorTest.groovy @@ -0,0 +1,396 @@ +/* + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.netflix.conductor.core.execution + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.conductor.common.metadata.tasks.Task +import com.netflix.conductor.common.metadata.tasks.TaskDef +import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.config.ConductorProperties +import com.netflix.conductor.core.execution.tasks.SubWorkflow +import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask +import com.netflix.conductor.core.orchestration.ExecutionDAOFacade +import com.netflix.conductor.core.utils.IDGenerator +import com.netflix.conductor.core.utils.QueueUtils +import com.netflix.conductor.dao.MetadataDAO +import com.netflix.conductor.dao.QueueDAO +import spock.lang.Specification +import spock.lang.Subject + +import java.time.Duration + +import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW +import static com.netflix.conductor.common.run.Workflow.WorkflowStatus.COMPLETED +import static com.netflix.conductor.common.run.Workflow.WorkflowStatus.RUNNING + +class AsyncSystemTaskExecutorTest extends Specification { + + ExecutionDAOFacade executionDAOFacade + QueueDAO queueDAO + MetadataDAO metadataDAO + WorkflowExecutor workflowExecutor + DeciderService deciderService + + @Subject + AsyncSystemTaskExecutor executor + + WorkflowSystemTask workflowSystemTask + ConductorProperties properties = new ConductorProperties() + + def setup() { + executionDAOFacade = Mock(ExecutionDAOFacade.class) + queueDAO = Mock(QueueDAO.class) + metadataDAO = Mock(MetadataDAO.class) + workflowExecutor = Mock(WorkflowExecutor.class) + deciderService = Mock(DeciderService.class) + + workflowSystemTask = Mock(WorkflowSystemTask.class) + + properties.taskExecutionPostponeDuration = Duration.ofSeconds(1) + properties.systemTaskWorkerCallbackDuration = Duration.ofSeconds(1) + + executor = new AsyncSystemTaskExecutor(executionDAOFacade, queueDAO, metadataDAO, properties, workflowExecutor, deciderService) + } + + // this is not strictly a unit test, but its essential to test AsyncSystemTaskExecutor with SubWorkflow + def "Execute SubWorkflow task"() { + given: + String workflowId = "workflowId" + String subWorkflowId = "subWorkflowId" + SubWorkflow subWorkflowTask = new SubWorkflow(new ObjectMapper()) + + String task1Id = IDGenerator.generate() + Task task1 = new Task() + task1.setTaskType(SUB_WORKFLOW.name()) + task1.setReferenceTaskName("waitTask") + task1.setWorkflowInstanceId(workflowId) + task1.setScheduledTime(System.currentTimeMillis()) + task1.setTaskId(task1Id) + task1.getInputData().put("asyncComplete", true) + task1.getInputData().put("subWorkflowName", "junit1") + task1.getInputData().put("subWorkflowVersion", 1) + task1.setStatus(Task.Status.SCHEDULED) + + String queueName = QueueUtils.getQueueName(task1) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + Workflow subWorkflow = new Workflow(workflowId: subWorkflowId, status: RUNNING) + + when: + executor.execute(subWorkflowTask, task1Id) + + then: + 1 * executionDAOFacade.getTaskById(task1Id) >> task1 + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * workflowExecutor.startWorkflow(*_) >> subWorkflowId + 1 * workflowExecutor.getWorkflow(subWorkflowId, false) >> subWorkflow + + // SUB_WORKFLOW is asyncComplete so its removed from the queue + 1 * queueDAO.remove(queueName, task1Id) + + task1.status == Task.Status.IN_PROGRESS + task1.subWorkflowId == subWorkflowId + task1.startTime != 0 + } + + def "Execute with a non-existing task id"() { + given: + String taskId = "taskId" + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> null + 0 * workflowSystemTask.start(*_) + 0 * executionDAOFacade.updateTask(_) + } + + def "Execute with a task id that fails to load"() { + given: + String taskId = "taskId" + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> { throw new RuntimeException("datastore unavailable") } + 0 * workflowSystemTask.start(*_) + 0 * executionDAOFacade.updateTask(_) + } + + def "Execute with a task id that is in terminal state"() { + given: + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.COMPLETED, taskId: taskId) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * queueDAO.remove(task.taskType, taskId) + 0 * workflowSystemTask.start(*_) + 0 * executionDAOFacade.updateTask(_) + } + + def "Execute with a task id that is part of a workflow in terminal state"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId) + Workflow workflow = new Workflow(workflowId: workflowId, status: COMPLETED) + String queueName = QueueUtils.getQueueName(task) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * queueDAO.remove(queueName, taskId) + + task.status == Task.Status.CANCELED + task.startTime == 0 + } + + def "Execute with a task id that exceeds in-progress limit"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + workflowPriority: 10) + String queueName = QueueUtils.getQueueName(task) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.exceedsInProgressLimit(task) >> true + 1 * queueDAO.postpone(queueName, taskId, task.workflowPriority, properties.taskExecutionPostponeDuration.seconds) + + task.status == Task.Status.SCHEDULED + task.startTime == 0 + } + + def "Execute with a task id that is rate limited"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + rateLimitPerFrequency: 1, taskDefName: "taskDefName", workflowPriority: 10) + String queueName = QueueUtils.getQueueName(task) + TaskDef taskDef = new TaskDef() + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * metadataDAO.getTaskDef(task.taskDefName) >> taskDef + 1 * executionDAOFacade.exceedsRateLimitPerFrequency(task, taskDef) >> taskDef + 1 * queueDAO.postpone(queueName, taskId, task.workflowPriority, properties.taskExecutionPostponeDuration.seconds) + + task.status == Task.Status.SCHEDULED + task.startTime == 0 + } + + def "Execute with a task id that is rate limited but postpone fails"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + rateLimitPerFrequency: 1, taskDefName: "taskDefName", workflowPriority: 10) + String queueName = QueueUtils.getQueueName(task) + TaskDef taskDef = new TaskDef() + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * metadataDAO.getTaskDef(task.taskDefName) >> taskDef + 1 * executionDAOFacade.exceedsRateLimitPerFrequency(task, taskDef) >> taskDef + 1 * queueDAO.postpone(queueName, taskId, task.workflowPriority, properties.taskExecutionPostponeDuration.seconds) >> { throw new RuntimeException("queue unavailable") } + + task.status == Task.Status.SCHEDULED + task.startTime == 0 + } + + def "Execute with a task id that is in SCHEDULED state"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + taskDefName: "taskDefName", workflowPriority: 10) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + String queueName = QueueUtils.getQueueName(task) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * executionDAOFacade.updateTask(task) + 1 * queueDAO.postpone(queueName, taskId, task.workflowPriority, properties.systemTaskWorkerCallbackDuration.seconds) + 1 * workflowSystemTask.start(workflow, task, workflowExecutor) >> { task.status = Task.Status.IN_PROGRESS } + + 0 * workflowExecutor.decide(workflowId) // verify that workflow is NOT decided + + task.status == Task.Status.IN_PROGRESS + task.startTime != 0 // verify that startTime is set + task.endTime == 0 // verify that endTime is not set + task.pollCount == 1 // verify that poll count is incremented + task.callbackAfterSeconds == properties.systemTaskWorkerCallbackDuration.seconds + } + + def "Execute with a task id that is in SCHEDULED state and WorkflowSystemTask.start sets the task in a terminal state"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + taskDefName: "taskDefName", workflowPriority: 10) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + String queueName = QueueUtils.getQueueName(task) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * executionDAOFacade.updateTask(task) + + 1 * workflowSystemTask.start(workflow, task, workflowExecutor) >> { task.status = Task.Status.COMPLETED } + 1 * queueDAO.remove(queueName, taskId) + 1 * workflowExecutor.decide(workflowId) // verify that workflow is decided + + task.status == Task.Status.COMPLETED + task.startTime != 0 // verify that startTime is set + task.endTime != 0 // verify that endTime is set + task.pollCount == 1 // verify that poll count is incremented + } + + def "Execute with a task id that is in SCHEDULED state but WorkflowSystemTask.start fails"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + taskDefName: "taskDefName", workflowPriority: 10) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * executionDAOFacade.updateTask(task) + + // simulating a "start" failure that happens after the Task object is modified + // the modification will be persisted + 1 * workflowSystemTask.start(workflow, task, workflowExecutor) >> { + task.status = Task.Status.IN_PROGRESS + throw new RuntimeException("unknown system task failure") + } + + 0 * workflowExecutor.decide(workflowId) // verify that workflow is NOT decided + + task.status == Task.Status.IN_PROGRESS + task.startTime != 0 // verify that startTime is set + task.endTime == 0 // verify that endTime is not set + task.pollCount == 1 // verify that poll count is incremented + } + + def "Execute with a task id that is in SCHEDULED state and is set to asyncComplete"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.SCHEDULED, taskId: taskId, workflowInstanceId: workflowId, + taskDefName: "taskDefName", workflowPriority: 10) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + String queueName = QueueUtils.getQueueName(task) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * executionDAOFacade.updateTask(task) // 1st call for pollCount, 2nd call for status update + + 1 * workflowSystemTask.isAsyncComplete(task) >> true + 1 * workflowSystemTask.start(workflow, task, workflowExecutor) >> { task.status = Task.Status.IN_PROGRESS } + 1 * queueDAO.remove(queueName, taskId) + + 1 * workflowExecutor.decide(workflowId) // verify that workflow is decided + + task.status == Task.Status.IN_PROGRESS + task.startTime != 0 // verify that startTime is set + task.endTime == 0 // verify that endTime is not set + task.pollCount == 1 // verify that poll count is incremented + } + + def "Execute with a task id that is in IN_PROGRESS state"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.IN_PROGRESS, taskId: taskId, workflowInstanceId: workflowId, + rateLimitPerFrequency: 1, taskDefName: "taskDefName", workflowPriority: 10, pollCount: 1) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * executionDAOFacade.updateTask(task) // 1st call for pollCount, 2nd call for status update + + 0 * workflowSystemTask.start(workflow, task, workflowExecutor) + 1 * workflowSystemTask.execute(workflow, task, workflowExecutor) + + task.status == Task.Status.IN_PROGRESS + task.endTime == 0 // verify that endTime is not set + task.pollCount == 2 // verify that poll count is incremented + } + + def "Execute with a task id that is in IN_PROGRESS state and is set to asyncComplete"() { + given: + String workflowId = "workflowId" + String taskId = "taskId" + Task task = new Task(taskType: "type1", status: Task.Status.IN_PROGRESS, taskId: taskId, workflowInstanceId: workflowId, + rateLimitPerFrequency: 1, taskDefName: "taskDefName", workflowPriority: 10, pollCount: 1) + Workflow workflow = new Workflow(workflowId: workflowId, status: RUNNING) + + when: + executor.execute(workflowSystemTask, taskId) + + then: + 1 * executionDAOFacade.getTaskById(taskId) >> task + 1 * executionDAOFacade.getWorkflowById(workflowId, true) >> workflow + 1 * executionDAOFacade.updateTask(task) // only one call since pollCount is not incremented + + 1 * workflowSystemTask.isAsyncComplete(task) >> true + 0 * workflowSystemTask.start(workflow, task, workflowExecutor) + 1 * workflowSystemTask.execute(workflow, task, workflowExecutor) + + task.status == Task.Status.IN_PROGRESS + task.endTime == 0 // verify that endTime is not set + task.pollCount == 1 // verify that poll count is NOT incremented + } + +} diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 0583abe379..6a5e6dba38 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -44,7 +44,6 @@ import com.netflix.conductor.core.execution.tasks.Lambda; import com.netflix.conductor.core.execution.tasks.SubWorkflow; import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; -import com.netflix.conductor.core.execution.tasks.Terminate; import com.netflix.conductor.core.execution.tasks.Wait; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; import com.netflix.conductor.core.listener.WorkflowStatusListener; @@ -1405,62 +1404,6 @@ public void testTerminateCompletedWorkflow() { workflowExecutor.terminateWorkflow(workflow.getWorkflowId(), "test terminating terminal workflow"); } - @Test - public void testExecuteSystemTask() { - String workflowId = "workflow-id"; - - Wait wait = new Wait(); - - String task1Id = IDGenerator.generate(); - Task task1 = new Task(); - task1.setTaskType(TaskType.WAIT.name()); - task1.setReferenceTaskName("waitTask"); - task1.setWorkflowInstanceId(workflowId); - task1.setScheduledTime(System.currentTimeMillis()); - task1.setTaskId(task1Id); - task1.setStatus(Status.SCHEDULED); - - Workflow workflow = new Workflow(); - workflow.setWorkflowId(workflowId); - workflow.setStatus(Workflow.WorkflowStatus.RUNNING); - - when(executionDAOFacade.getTaskById(anyString())).thenReturn(task1); - when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())).thenReturn(workflow); - - workflowExecutor.executeSystemTask(wait, task1Id, 30); - - assertEquals(Status.IN_PROGRESS, task1.getStatus()); - } - - @Test - public void testExecuteSystemTaskWithAsyncComplete() { - String workflowId = "workflow-id"; - - Terminate terminate = new Terminate(); - - String task1Id = IDGenerator.generate(); - Task task1 = new Task(); - task1.setTaskType(TaskType.WAIT.name()); - task1.setReferenceTaskName("waitTask"); - task1.setWorkflowInstanceId(workflowId); - task1.setScheduledTime(System.currentTimeMillis()); - task1.setTaskId(task1Id); - task1.getInputData().put("asyncComplete", true); - task1.setStatus(Status.IN_PROGRESS); - - Workflow workflow = new Workflow(); - workflow.setWorkflowId(workflowId); - workflow.setStatus(Workflow.WorkflowStatus.RUNNING); - - when(executionDAOFacade.getTaskById(anyString())).thenReturn(task1); - when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())).thenReturn(workflow); - - workflowExecutor.executeSystemTask(terminate, task1Id, 30); - - // An asyncComplete task shouldn't be executed through this logic, and the Terminate task should remain IN_PROGRESS. - assertEquals(Status.IN_PROGRESS, task1.getStatus()); - } - @Test public void testResetCallbacksForWorkflowTasks() { String workflowId = "test-workflow-id"; diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorker.java similarity index 71% rename from core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java rename to core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorker.java index 28be6c8f82..df894309c7 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorker.java @@ -14,7 +14,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.execution.AsyncSystemTaskExecutor; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.service.ExecutionService; import org.junit.After; @@ -33,7 +33,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -41,22 +40,22 @@ import static org.mockito.Mockito.when; @SuppressWarnings("UnstableApiUsage") -public class TestSystemTaskExecutor { +public class TestSystemTaskWorker { private static final String TEST_TASK = "system_task"; private static final String ISOLATED_TASK = "system_task-isolated"; - private WorkflowExecutor workflowExecutor; + private AsyncSystemTaskExecutor asyncSystemTaskExecutor; private ExecutionService executionService; private QueueDAO queueDAO; private ScheduledExecutorService scheduledExecutorService; private ConductorProperties properties; - private SystemTaskExecutor systemTaskExecutor; + private SystemTaskWorker systemTaskWorker; @Before public void setUp() { - workflowExecutor = mock(WorkflowExecutor.class); + asyncSystemTaskExecutor = mock(AsyncSystemTaskExecutor.class); executionService = mock(ExecutionService.class); queueDAO = mock(QueueDAO.class); scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -71,43 +70,43 @@ public void setUp() { @After public void tearDown() { shutdownExecutorService(scheduledExecutorService); - shutdownExecutorService(systemTaskExecutor.defaultExecutionConfig.getExecutorService()); - systemTaskExecutor.queueExecutionConfigMap.values() + shutdownExecutorService(systemTaskWorker.defaultExecutionConfig.getExecutorService()); + systemTaskWorker.queueExecutionConfigMap.values() .forEach(e -> shutdownExecutorService(e.getExecutorService())); } @Test public void testGetExecutionConfigForSystemTask() { when(properties.getSystemTaskWorkerThreadCount()).thenReturn(5); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); - assertEquals(systemTaskExecutor.getExecutionConfig("").getSemaphoreUtil().availableSlots(), 5); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); + assertEquals(systemTaskWorker.getExecutionConfig("").getSemaphoreUtil().availableSlots(), 5); } @Test public void testGetExecutionConfigForIsolatedSystemTask() { when(properties.getIsolatedSystemTaskWorkerThreadCount()).thenReturn(7); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); - assertEquals(systemTaskExecutor.getExecutionConfig("test-iso").getSemaphoreUtil().availableSlots(), 7); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); + assertEquals(systemTaskWorker.getExecutionConfig("test-iso").getSemaphoreUtil().availableSlots(), 7); } @Test public void testPollAndExecuteSystemTask() { when(properties.getSystemTaskWorkerThreadCount()).thenReturn(1); when(queueDAO.pop(anyString(), anyInt(), anyInt())).thenReturn(Collections.singletonList("taskId")); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocation -> { latch.countDown(); return null; } - ).when(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + ).when(asyncSystemTaskExecutor).execute(any(), anyString()); scheduledExecutorService.scheduleAtFixedRate( - () -> systemTaskExecutor.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); + () -> systemTaskWorker.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); Uninterruptibles.awaitUninterruptibly(latch); - verify(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + verify(asyncSystemTaskExecutor).execute(any(), anyString()); } @Test @@ -116,40 +115,40 @@ public void testBatchPollAndExecuteSystemTask() { when(properties.getSystemTaskMaxPollCount()).thenReturn(2); when(queueDAO.pop(anyString(), anyInt(), anyInt())).thenReturn(Collections.nCopies(2, "taskId")); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); CountDownLatch latch = new CountDownLatch(10); doAnswer(invocation -> { latch.countDown(); return null; } - ).when(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + ).when(asyncSystemTaskExecutor).execute(any(), anyString()); scheduledExecutorService.scheduleAtFixedRate( - () -> systemTaskExecutor.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); + () -> systemTaskWorker.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); Uninterruptibles.awaitUninterruptibly(latch); - verify(workflowExecutor, Mockito.times(10)).executeSystemTask(any(), anyString(), anyLong()); + verify(asyncSystemTaskExecutor, Mockito.times(10)).execute(any(), anyString()); } @Test public void testPollAndExecuteIsolatedSystemTask() { when(properties.getSystemTaskWorkerThreadCount()).thenReturn(1); when(queueDAO.pop(anyString(), anyInt(), anyInt())).thenReturn(Collections.singletonList("isolated_taskId")); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocation -> { latch.countDown(); return null; } - ).when(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + ).when(asyncSystemTaskExecutor).execute(any(), anyString()); scheduledExecutorService.scheduleAtFixedRate( - () -> systemTaskExecutor.pollAndExecute(ISOLATED_TASK), 0, 1, TimeUnit.SECONDS); + () -> systemTaskWorker.pollAndExecute(ISOLATED_TASK), 0, 1, TimeUnit.SECONDS); Uninterruptibles.awaitUninterruptibly(latch); - verify(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + verify(asyncSystemTaskExecutor).execute(any(), anyString()); } @Test @@ -158,20 +157,20 @@ public void testPollException() { when(queueDAO.pop(anyString(), anyInt(), anyInt())) .thenThrow(RuntimeException.class) .thenReturn(Collections.singletonList("taskId")); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocation -> { latch.countDown(); return null; } - ).when(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + ).when(asyncSystemTaskExecutor).execute(any(), anyString()); scheduledExecutorService.scheduleAtFixedRate( - () -> systemTaskExecutor.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); + () -> systemTaskWorker.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); Uninterruptibles.awaitUninterruptibly(latch); - verify(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + verify(asyncSystemTaskExecutor).execute(any(), anyString()); } @Test @@ -181,20 +180,20 @@ public void testBatchPollException() { when(queueDAO.pop(anyString(), anyInt(), anyInt())) .thenThrow(RuntimeException.class) .thenReturn(Collections.nCopies(2, "taskId")); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); CountDownLatch latch = new CountDownLatch(2); doAnswer(invocation -> { latch.countDown(); return null; } - ).when(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + ).when(asyncSystemTaskExecutor).execute(any(), anyString()); scheduledExecutorService.scheduleAtFixedRate( - () -> systemTaskExecutor.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); + () -> systemTaskWorker.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); Uninterruptibles.awaitUninterruptibly(latch); - verify(workflowExecutor, Mockito.times(2)).executeSystemTask(any(), anyString(), anyLong()); + verify(asyncSystemTaskExecutor, Mockito.times(2)).execute(any(), anyString()); } @Test @@ -205,7 +204,7 @@ public void testMultipleQueuesExecution() { String isolatedTask = "isolatedTaskId"; when(queueDAO.pop(TEST_TASK, 1, 200)).thenReturn(Collections.singletonList(sysTask)); when(queueDAO.pop(ISOLATED_TASK, 1, 200)).thenReturn(Collections.singletonList(isolatedTask)); - systemTaskExecutor = new SystemTaskExecutor(queueDAO, workflowExecutor, properties, executionService); + systemTaskWorker = new SystemTaskWorker(queueDAO, asyncSystemTaskExecutor, properties, executionService); CountDownLatch sysTaskLatch = new CountDownLatch(1); CountDownLatch isolatedTaskLatch = new CountDownLatch(1); @@ -220,14 +219,14 @@ public void testMultipleQueuesExecution() { } return null; } - ).when(workflowExecutor).executeSystemTask(any(), anyString(), anyLong()); + ).when(asyncSystemTaskExecutor).execute(any(), anyString()); scheduledExecutorService - .scheduleAtFixedRate(() -> systemTaskExecutor.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); + .scheduleAtFixedRate(() -> systemTaskWorker.pollAndExecute(TEST_TASK), 0, 1, TimeUnit.SECONDS); ScheduledExecutorService isoTaskService = Executors.newSingleThreadScheduledExecutor(); isoTaskService - .scheduleAtFixedRate(() -> systemTaskExecutor.pollAndExecute(ISOLATED_TASK), 0, 1, TimeUnit.SECONDS); + .scheduleAtFixedRate(() -> systemTaskWorker.pollAndExecute(ISOLATED_TASK), 0, 1, TimeUnit.SECONDS); Uninterruptibles.awaitUninterruptibly(sysTaskLatch); Uninterruptibles.awaitUninterruptibly(isolatedTaskLatch); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java index 4a78f1a66b..26764b28ca 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSystemTaskWorkerCoordinator.java @@ -13,7 +13,7 @@ package com.netflix.conductor.core.execution.tasks; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.execution.AsyncSystemTaskExecutor; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.service.ExecutionService; import org.junit.Before; @@ -35,14 +35,14 @@ public class TestSystemTaskWorkerCoordinator { private static final String ISOLATION_CONSTANT = "-iso"; private QueueDAO queueDAO; - private WorkflowExecutor workflowExecutor; + private AsyncSystemTaskExecutor asyncSystemTaskExecutor; private ExecutionService executionService; private ConductorProperties properties; @Before public void setUp() { queueDAO = mock(QueueDAO.class); - workflowExecutor = mock(WorkflowExecutor.class); + asyncSystemTaskExecutor = mock(AsyncSystemTaskExecutor.class); executionService = mock(ExecutionService.class); properties = mock(ConductorProperties.class); when(properties.getSystemTaskWorkerPollInterval()).thenReturn(Duration.ofMillis(50)); @@ -53,7 +53,7 @@ public void setUp() { public void isSystemTask() { createTaskMapping(); SystemTaskWorkerCoordinator systemTaskWorkerCoordinator = new SystemTaskWorkerCoordinator(queueDAO, - workflowExecutor, properties, executionService, Collections.emptyList()); + asyncSystemTaskExecutor, properties, executionService, Collections.emptySet()); assertTrue(systemTaskWorkerCoordinator.isAsyncSystemTask(TEST_QUEUE + ISOLATION_CONSTANT)); } @@ -61,7 +61,7 @@ public void isSystemTask() { public void isSystemTaskNotPresent() { createTaskMapping(); SystemTaskWorkerCoordinator systemTaskWorkerCoordinator = new SystemTaskWorkerCoordinator(queueDAO, - workflowExecutor, properties, executionService, Collections.emptyList()); + asyncSystemTaskExecutor, properties, executionService, Collections.emptySet()); assertFalse(systemTaskWorkerCoordinator.isAsyncSystemTask(null)); } @@ -69,7 +69,7 @@ public void isSystemTaskNotPresent() { public void testIsFromCoordinatorExecutionNameSpace() { doReturn("exeNS").when(properties).getSystemTaskWorkerExecutionNamespace(); SystemTaskWorkerCoordinator systemTaskWorkerCoordinator = new SystemTaskWorkerCoordinator(queueDAO, - workflowExecutor, properties, executionService, Collections.emptyList()); + asyncSystemTaskExecutor, properties, executionService, Collections.emptySet()); assertTrue( systemTaskWorkerCoordinator.isFromCoordinatorExecutionNameSpace(TEST_QUEUE + EXECUTION_NAMESPACE_CONSTANT)); } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/base/AbstractSpecification.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/base/AbstractSpecification.groovy index 281729b0d0..fa7d447f24 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/base/AbstractSpecification.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/base/AbstractSpecification.groovy @@ -1,19 +1,18 @@ /* - * - * * Copyright 2021 Netflix, Inc. - * *

- * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * *

- * * http://www.apache.org/licenses/LICENSE-2.0 - * *

- * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations under the License. - * + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.test.base +import com.netflix.conductor.core.execution.AsyncSystemTaskExecutor import com.netflix.conductor.core.execution.WorkflowExecutor import com.netflix.conductor.core.reconciliation.WorkflowSweeper import com.netflix.conductor.service.ExecutionService @@ -43,6 +42,9 @@ abstract class AbstractSpecification extends Specification { @Autowired WorkflowSweeper workflowSweeper + @Autowired + AsyncSystemTaskExecutor asyncSystemTaskExecutor + def cleanup() { workflowTestUtil.clearWorkflows() } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy index a9f1d1f69d..11a40c43f6 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy @@ -1,14 +1,14 @@ /* - * Copyright 2021 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.test.integration @@ -419,7 +419,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) String subworkflowTaskId = polledTaskIds.get(0) - workflowExecutor.executeSystemTask(subWorkflowTask, subworkflowTaskId, 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, subworkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy index 3747f97eb4..b73d6cd607 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy @@ -178,8 +178,8 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { when: "the system task 'USER_TASK' is started by issuing a system task call" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) - def taskId = workflow.getTaskByRefName('user_task').getTaskId() - workflowExecutor.executeSystemTask(userTask, taskId, 1) + def taskId = workflow.getTaskByRefName('user_task').taskId + asyncSystemTaskExecutor.execute(userTask, taskId) then: "verify that the user task is in a COMPLETED state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -423,7 +423,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = workflow.getTaskByRefName('swt').taskId - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 86500ca41b..1660271fdd 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -602,7 +602,7 @@ class ForkJoinSpec extends AbstractSpecification { and: "Get the sub workflow id associated with the SubWorkflow Task sw1 and start the system task" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = workflow.getTaskByRefName("sw1").getTaskId() - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) def updatedWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowInstanceId = updatedWorkflow.getTaskByRefName('sw1').subWorkflowId @@ -758,9 +758,9 @@ class ForkJoinSpec extends AbstractSpecification { when: "both the sub workflows are started by issuing a system task call" def workflowWithScheduledSubWorkflows = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId1 = workflowWithScheduledSubWorkflows.getTaskByRefName('st1').taskId - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId1, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId1) def subWorkflowTaskId2 = workflowWithScheduledSubWorkflows.getTaskByRefName('st2').taskId - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId2, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId2) then: "verify that the sub workflow tasks are in a IN PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -862,7 +862,7 @@ class ForkJoinSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -927,7 +927,7 @@ class ForkJoinSpec extends AbstractSpecification { when: "the sub workflow is started by issuing a system task call" parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy index 2358ad8a03..043a83064b 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy @@ -96,7 +96,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) @@ -125,7 +125,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true) then: "verify that the leaf workflow is RUNNING, and first task is in SCHEDULED state" @@ -225,7 +225,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" @@ -248,7 +248,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { and: "poll and execute the sub workflow task" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" @@ -334,7 +334,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy index 54f1307591..15e858ae23 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy @@ -96,7 +96,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) @@ -125,7 +125,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true) then: "verify that the leaf workflow is RUNNING, and first task is in SCHEDULED state" @@ -226,7 +226,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" @@ -249,7 +249,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { and: "poll and execute the sub workflow task" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" @@ -336,7 +336,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy index cde9f3a426..3694e7557a 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/LambdaAndTerminateTaskSpec.groovy @@ -148,7 +148,7 @@ class LambdaAndTerminateTaskSpec extends AbstractSpecification { when: "subworkflow is retrieved" def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = workflow.getTaskByRefName("test_terminate_subworkflow").getTaskId() - workflowExecutor.executeSystemTask(subWorkflowTask, subWorkflowTaskId, 1) + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowId = workflow.getTaskByRefName("test_terminate_subworkflow").subWorkflowId diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy index ce1513f232..8a90516067 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy @@ -98,7 +98,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) @@ -315,7 +315,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) @@ -431,7 +431,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) then: "verify that SUB_WORKFLOW task in in progress" def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy index aec90e79f3..8fbdf2fb19 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRestartSpec.groovy @@ -88,7 +88,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) @@ -115,7 +115,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true) then: "verify that the mid-level workflow is RUNNING, and first task is in SCHEDULED state" @@ -204,7 +204,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" @@ -221,7 +221,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { and: "poll and execute the sub workflow task" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" @@ -309,7 +309,7 @@ class SubWorkflowRestartSpec extends AbstractSpecification { and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy index 91816483fe..43ee2e10bf 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowRetrySpec.groovy @@ -88,7 +88,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) @@ -115,7 +115,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true) then: "verify that the mid-level workflow is RUNNING, and first task is in SCHEDULED state" @@ -203,7 +203,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new mid level workflow is created and is in RUNNING state" @@ -220,7 +220,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { and: "poll and execute the sub workflow task" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" @@ -421,7 +421,7 @@ class SubWorkflowRetrySpec extends AbstractSpecification { when: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds[0], 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy index 306b5bb454..1762e87a0c 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SubWorkflowSpec.groovy @@ -107,7 +107,7 @@ class SubWorkflowSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) String subworkflowTaskId = polledTaskIds.get(0) - workflowExecutor.executeSystemTask(subWorkflowTask, subworkflowTaskId, 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, subworkflowTaskId) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -254,7 +254,7 @@ class SubWorkflowSpec extends AbstractSpecification { when: "Polled for and executed subworkflow task" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowId = workflow.tasks[1].subWorkflowId @@ -354,7 +354,7 @@ class SubWorkflowSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) - workflowExecutor.executeSystemTask(subWorkflowTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SystemTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SystemTaskSpec.groovy index afafc6daa5..0396c20f6a 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SystemTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SystemTaskSpec.groovy @@ -80,7 +80,7 @@ class SystemTaskSpec extends AbstractSpecification { when: "the system task is started by issuing a system task call" List polledTaskIds = queueDAO.pop("USER_TASK", 1, 200) - workflowExecutor.executeSystemTask(userTask, polledTaskIds.get(0), 30) + asyncSystemTaskExecutor.execute(userTask, polledTaskIds[0]) then: "verify that the system task is in IN_PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/TaskLimitsWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/TaskLimitsWorkflowSpec.groovy index 7d6734b2a5..04be3e2ca6 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/TaskLimitsWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/TaskLimitsWorkflowSpec.groovy @@ -56,7 +56,7 @@ class TaskLimitsWorkflowSpec extends AbstractSpecification { when: "Execute the user task" def scheduledTask1 = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[0] - workflowExecutor.executeSystemTask(userTask, scheduledTask1.taskId, 30) + asyncSystemTaskExecutor.execute(userTask, scheduledTask1.taskId) then: "Verify the state of the workflow is completed" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -80,7 +80,7 @@ class TaskLimitsWorkflowSpec extends AbstractSpecification { when: "Execute the user task on the second workflow" def scheduledTask2 = workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true).tasks[0] - workflowExecutor.executeSystemTask(userTask, scheduledTask2.taskId, 30) + asyncSystemTaskExecutor.execute(userTask, scheduledTask2.taskId) then: "Verify the state of the workflow is still in running state" with(workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy index dc42a11421..cdd70dba81 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy @@ -1,14 +1,14 @@ /* - * Copyright 2020 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.test.resiliency @@ -251,7 +251,7 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { when: "We get a workflow when QueueDAO is unavailable" workflowResource.delete(workflowInstanceId, false) - then: "Verify queueDAO is not involved" + then: "Verify queueDAO is called to remove from _deciderQueue" 1 * queueDAO._ when: "We try to get deleted workflow"