diff --git a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java index c4c61b905520..c013171377ae 100644 --- a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java +++ b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java @@ -39,6 +39,10 @@ public abstract class AbstractDelayEvent implements IEvent, Delayed { @Builder.Default protected long createTimeInNano = System.nanoTime(); + // set create time as default if the inheritor didn't call super() + @Builder.Default + protected long expiredTimeInNano = System.nanoTime(); + public AbstractDelayEvent() { this(DEFAULT_DELAY_TIME); } @@ -50,6 +54,7 @@ public AbstractDelayEvent(final long delayTime) { public AbstractDelayEvent(final long delayTime, final long createTimeInNano) { this.delayTime = delayTime; this.createTimeInNano = createTimeInNano; + this.expiredTimeInNano = this.delayTime * 1_000_000 + this.createTimeInNano; } @Override @@ -60,7 +65,7 @@ public long getDelay(TimeUnit unit) { @Override public int compareTo(Delayed other) { - return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano); + return Long.compare(this.expiredTimeInNano, ((AbstractDelayEvent) other).expiredTimeInNano); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java index 76b04951b712..3f168cce6e30 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java @@ 
-191,6 +191,11 @@ public interface IWorkflowExecutionGraph { */ boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable taskExecutionRunnable); + /** + * Whether the given task has failed and is waiting to be retried. + */ + boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable taskExecutionRunnable); + /** * Whether all predecessors task is skipped. *
Once all predecessors are marked as skipped, then the task will be marked as skipped, and will trigger its successors. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java index 20a65d45a2e9..dd89debe6c64 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java @@ -140,7 +140,7 @@ public ITaskExecutionRunnable getTaskExecutionRunnableByTaskCode(final Long task @Override public boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable taskExecutionRunnable) { - return activeTaskExecutionRunnable.add(taskExecutionRunnable.getName()); + return activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName()); } @Override @@ -256,6 +256,16 @@ public boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable tas return (taskExecutionRunnable.getTaskDefinition().getFlag() == Flag.NO); } + @Override + public boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable taskExecutionRunnable) { + if (!taskExecutionRunnable.isTaskInstanceInitialized()) { + return false; + } + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + return taskInstance.getState() == TaskExecutionStatus.FAILURE && taskExecutionRunnable.isTaskInstanceCanRetry() + && isTaskExecutionRunnableActive(taskExecutionRunnable); + } + /** * Whether all predecessors are skipped. *
Only when all predecessors are skipped, will return true. If the given task doesn't have any predecessors, will return false. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java index 005a0003caab..b0b9139b8b8c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java @@ -104,6 +104,12 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR final ITaskExecutionRunnable taskExecutionRunnable, final TaskPauseLifecycleEvent taskPauseEvent) { throwExceptionIfStateIsNotMatch(taskExecutionRunnable); + // When the failed task is awaiting retry, we can mark it as 'paused' to ignore the retry event. + if (isTaskRetrying(taskExecutionRunnable)) { + super.pausedEventAction(workflowExecutionRunnable, taskExecutionRunnable, + TaskPausedLifecycleEvent.of(taskExecutionRunnable)); + return; + } logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent); } @@ -112,14 +118,11 @@ public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecution final ITaskExecutionRunnable taskExecutionRunnable, final TaskPausedLifecycleEvent taskPausedEvent) { throwExceptionIfStateIsNotMatch(taskExecutionRunnable); - final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph(); // This case happen when the task is failure but the task is in delay retry queue. // We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is // killed. 
- if (taskExecutionRunnable.isTaskInstanceCanRetry() - && workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) { - workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable); - publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + if (isTaskRetrying(taskExecutionRunnable)) { + super.pausedEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskPausedEvent); return; } logWarningIfCannotDoAction(taskExecutionRunnable, taskPausedEvent); @@ -130,6 +133,12 @@ public void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRu final ITaskExecutionRunnable taskExecutionRunnable, final TaskKillLifecycleEvent taskKillEvent) { throwExceptionIfStateIsNotMatch(taskExecutionRunnable); + // When the failed task is awaiting retry, we can mark it as 'killed' to ignore the retry event. + if (isTaskRetrying(taskExecutionRunnable)) { + super.killedEventAction(workflowExecutionRunnable, taskExecutionRunnable, + TaskKilledLifecycleEvent.of(taskExecutionRunnable)); + return; + } logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent); } @@ -138,14 +147,11 @@ public void killedEventAction(final IWorkflowExecutionRunnable workflowExecution final ITaskExecutionRunnable taskExecutionRunnable, final TaskKilledLifecycleEvent taskKilledEvent) { throwExceptionIfStateIsNotMatch(taskExecutionRunnable); - final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph(); // This case happen when the task is failure but the task is in delay retry queue. // We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is // killed. 
- if (taskExecutionRunnable.isTaskInstanceCanRetry() - && workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) { - workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable); - publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + if (isTaskRetrying(taskExecutionRunnable)) { + super.killedEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskKilledEvent); return; } logWarningIfCannotDoAction(taskExecutionRunnable, taskKilledEvent); @@ -179,4 +185,9 @@ public void failoverEventAction(final IWorkflowExecutionRunnable workflowExecuti public TaskExecutionStatus matchState() { return TaskExecutionStatus.FAILURE; } + + private boolean isTaskRetrying(final ITaskExecutionRunnable taskExecutionRunnable) { + final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph(); + return workflowExecutionGraph.isTaskExecutionRunnableRetrying(taskExecutionRunnable); + } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java index fd9eee60f496..6594e170debe 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java @@ -294,4 +294,54 @@ public void testPauseWorkflow_with_subWorkflowTask_success() { masterContainer.assertAllResourceReleased(); } + @Test + @DisplayName("Test pause a workflow with failed retrying task") + public void testPauseWorkflow_with_failedRetryingTask() { + final String yaml = "/it/pause/workflow_with_fake_task_failed_retrying.yaml"; + final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); + + assertThat(repository.queryTaskInstance(workflowInstanceId)) + .satisfiesExactly( + taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE); + }); + }); + + assertThat(workflowOperator.pauseWorkflowInstance(workflowInstanceId).isSuccess()).isTrue(); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + assertThat(repository.queryWorkflowInstance(workflowInstanceId)) + .satisfies( + workflowInstance -> { + assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.PAUSE); + }); + + assertThat(repository.queryTaskInstance(workflowInstanceId)) + .satisfiesExactly( + taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.PAUSE); + }); + }); + masterContainer.assertAllResourceReleased(); + } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java index 0b80a4b40a08..5174a2dc59fe 100644 ---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java @@ -246,4 +246,54 @@ public void testStopWorkflow_with_subWorkflowTask_success() { masterContainer.assertAllResourceReleased(); } + + @Test + @DisplayName("Test stop a workflow with failed retrying task") + public void testStopWorkflow_with_failedRetryingTask() { + final String yaml = "/it/stop/workflow_with_fake_task_failed_retrying.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); + + assertThat(repository.queryTaskInstance(workflowInstanceId)) + .satisfiesExactly( + taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE); + }); + }); + + assertThat(workflowOperator.stopWorkflowInstance(workflowInstanceId).isSuccess()).isTrue(); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + assertThat(repository.queryWorkflowInstance(workflowInstanceId)) + .satisfies( + workflowInstance -> { + assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.STOP); + }); + +
assertThat(repository.queryTaskInstance(workflowInstanceId)) + .satisfiesExactly( + taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL); + }); + }); + masterContainer.assertAllResourceReleased(); + } } diff --git a/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml b/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml new file mode 100644 index 000000000000..a86397370e53 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_one_fake_task_failed + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single task + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: FAILED-RETRY + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + failRetryTimes: 10 + failRetryInterval: 10 + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + diff --git a/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml b/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml new file mode 100644 index 000000000000..a86397370e53 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_one_fake_task_failed + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single task + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: FAILED-RETRY + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + failRetryTimes: 10 + failRetryInterval: 10 + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 +