Skip to content

Commit

Permalink
[Fix-16978][Master] Fix AbstractDelayEvent compare method is incorrect (
Browse files Browse the repository at this point in the history
  • Loading branch information
reele authored Jan 29, 2025
1 parent 25108c8 commit 4416548
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public abstract class AbstractDelayEvent implements IEvent, Delayed {
@Builder.Default
protected long createTimeInNano = System.nanoTime();

// set create time as default if the inheritor didn't call super()
@Builder.Default
protected long expiredTimeInNano = System.nanoTime();

public AbstractDelayEvent() {
this(DEFAULT_DELAY_TIME);
}
Expand All @@ -50,6 +54,7 @@ public AbstractDelayEvent(final long delayTime) {
public AbstractDelayEvent(final long delayTime, final long createTimeInNano) {
this.delayTime = delayTime;
this.createTimeInNano = createTimeInNano;
this.expiredTimeInNano = this.delayTime * 1_000_000 + this.createTimeInNano;
}

@Override
Expand All @@ -60,7 +65,7 @@ public long getDelay(TimeUnit unit) {

@Override
public int compareTo(Delayed other) {
return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
return Long.compare(this.expiredTimeInNano, ((AbstractDelayEvent) other).expiredTimeInNano);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ public interface IWorkflowExecutionGraph {
*/
boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Whether the given task's execution is failure and waiting for retry.
*/
boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Whether all predecessors task is skipped.
* <p> Once all predecessors are marked as skipped, then the task will be marked as skipped, and will trigger its successors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public ITaskExecutionRunnable getTaskExecutionRunnableByTaskCode(final Long task

@Override
public boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable taskExecutionRunnable) {
return activeTaskExecutionRunnable.add(taskExecutionRunnable.getName());
return activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
}

@Override
Expand Down Expand Up @@ -256,6 +256,16 @@ public boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable tas
return (taskExecutionRunnable.getTaskDefinition().getFlag() == Flag.NO);
}

@Override
public boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable taskExecutionRunnable) {
if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
return false;
}
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
return taskInstance.getState() == TaskExecutionStatus.FAILURE && taskExecutionRunnable.isTaskInstanceCanRetry()
&& isTaskExecutionRunnableActive(taskExecutionRunnable);
}

/**
* Whether all predecessors are skipped.
* <p> Only when all predecessors are skipped, will return true. If the given task doesn't have any predecessors, will return false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskPauseLifecycleEvent taskPauseEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
// When the failed task is awaiting retry, we can mark it as 'paused' to ignore the retry event.
if (isTaskRetrying(taskExecutionRunnable)) {
super.pausedEventAction(workflowExecutionRunnable, taskExecutionRunnable,
TaskPausedLifecycleEvent.of(taskExecutionRunnable));
return;
}
logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
}

Expand All @@ -112,14 +118,11 @@ public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecution
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskPausedLifecycleEvent taskPausedEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
// This case happen when the task is failure but the task is in delay retry queue.
// We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is
// killed.
if (taskExecutionRunnable.isTaskInstanceCanRetry()
&& workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
if (isTaskRetrying(taskExecutionRunnable)) {
super.pausedEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskPausedEvent);
return;
}
logWarningIfCannotDoAction(taskExecutionRunnable, taskPausedEvent);
Expand All @@ -130,6 +133,12 @@ public void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRu
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskKillLifecycleEvent taskKillEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
// When the failed task is awaiting retry, we can mark it as 'killed' to ignore the retry event.
if (isTaskRetrying(taskExecutionRunnable)) {
super.killedEventAction(workflowExecutionRunnable, taskExecutionRunnable,
TaskKilledLifecycleEvent.of(taskExecutionRunnable));
return;
}
logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
}

Expand All @@ -138,14 +147,11 @@ public void killedEventAction(final IWorkflowExecutionRunnable workflowExecution
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskKilledLifecycleEvent taskKilledEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
// This case happen when the task is failure but the task is in delay retry queue.
// We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is
// killed.
if (taskExecutionRunnable.isTaskInstanceCanRetry()
&& workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
if (isTaskRetrying(taskExecutionRunnable)) {
super.killedEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskKilledEvent);
return;
}
logWarningIfCannotDoAction(taskExecutionRunnable, taskKilledEvent);
Expand Down Expand Up @@ -179,4 +185,9 @@ public void failoverEventAction(final IWorkflowExecutionRunnable workflowExecuti
public TaskExecutionStatus matchState() {
return TaskExecutionStatus.FAILURE;
}

private boolean isTaskRetrying(final ITaskExecutionRunnable taskExecutionRunnable) {
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
return workflowExecutionGraph.isTaskExecutionRunnableRetrying(taskExecutionRunnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,54 @@ public void testPauseWorkflow_with_subWorkflowTask_success() {
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test pause a workflow with failed retrying task")
public void testPauseWorkflow_with_failedRetryingTask() {
final String yaml = "/it/pause/workflow_with_fake_task_failed_retrying.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);

assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(
taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
});
});

assertThat(workflowOperator.pauseWorkflowInstance(workflowInstanceId).isSuccess());

await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.satisfies(
workflowInstance -> {
assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.PAUSE);
});

assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(
taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.PAUSE);
});
});
masterContainer.assertAllResourceReleased();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,54 @@ public void testStopWorkflow_with_subWorkflowTask_success() {

masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test stop a workflow with failed retrying task")
public void testStopWorkflow_with_failedRetryingTask() {
final String yaml = "/it/stop/workflow_with_fake_task_failed_retrying.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);

assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(
taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
});
});

assertThat(workflowOperator.stopWorkflowInstance(workflowInstanceId).isSuccess());

await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.satisfies(
workflowInstance -> {
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
});

assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(
taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
});
});
masterContainer.assertAllResourceReleased();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00

workflows:
- name: workflow_with_one_fake_task_failed
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL

tasks:
- name: FAILED-RETRY
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
failRetryTimes: 10
failRetryInterval: 10

taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00

workflows:
- name: workflow_with_one_fake_task_failed
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL

tasks:
- name: FAILED-RETRY
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
failRetryTimes: 10
failRetryInterval: 10

taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

0 comments on commit 4416548

Please sign in to comment.