From 040745e6cc2cec4646817ffb5e7a84679f34512a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 16 Nov 2023 17:06:54 -0800 Subject: [PATCH] more tests Signed-off-by: Kevin Su --- .../executors/failure_node_lookup.go | 1 + .../executors/failure_node_lookup_test.go | 57 +++++++++++++++++++ .../nodes/subworkflow/subworkflow_test.go | 30 ++++++++++ .../pkg/controller/workflow/executor.go | 4 +- .../pkg/controller/workflow/executor_test.go | 4 +- .../workflow/testdata/benchmark_wf.yaml | 50 ++++++++++++++++ 6 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 flytepropeller/pkg/controller/executors/failure_node_lookup_test.go diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index 12aa394ea8..0c61540259 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -2,6 +2,7 @@ package executors import ( "context" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go new file mode 100644 index 0000000000..27193603d3 --- /dev/null +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go @@ -0,0 +1,57 @@ +package executors + +import ( + "context" + "testing" + + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + "github.com/stretchr/testify/assert" +) + +type nl struct { + NodeLookup +} + +type en struct { + v1alpha1.ExecutableNode +} + +type ns struct { + v1alpha1.ExecutableNodeStatus +} + +func TestNewFailureNodeLookup(t *testing.T) { + nl := nl{} + en := en{} + ns := ns{} + nodeLoopUp := NewFailureNodeLookup(nl, en, ns) + assert.NotNil(t, nl) + typed := nodeLoopUp.(FailureNodeLookup) + assert.Equal(t, nl, typed.NodeLookup) + assert.Equal(t, en, typed.FailureNode) + assert.Equal(t, ns, typed.FailureNodeStatus) +} + +func TestNewTestFailureNodeLookup(t *testing.T) { + n := &mocks.ExecutableNode{} + ns := &mocks.ExecutableNodeStatus{} + failureNodeID := "fn1" + nl := NewTestNodeLookup( + map[string]v1alpha1.ExecutableNode{v1alpha1.StartNodeID: n, failureNodeID: n}, + map[string]v1alpha1.ExecutableNodeStatus{v1alpha1.StartNodeID: ns, failureNodeID: ns}, + ) + + assert.NotNil(t, nl) + + failureNodeLookup := NewFailureNodeLookup(nl, n, ns) + r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID) + assert.True(t, ok) + assert.Equal(t, n, r) + assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID)) + + r, ok = failureNodeLookup.GetNode(failureNodeID) + assert.True(t, ok) + assert.Equal(t, n, r) + assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), failureNodeID)) +} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go index f2391048af..dc00efacdc 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go @@ -41,6 +41,36 @@ func TestGetSubWorkflow(t *testing.T) { assert.Equal(t, swf, w) }) + t.Run("subworkflow with failure node", func(t *testing.T) { + + wfNode := &coreMocks.ExecutableWorkflowNode{} + x := "x" + wfNode.OnGetSubWorkflowRef().Return(&x) + + node := &coreMocks.ExecutableNode{} + node.OnGetWorkflowNode().Return(wfNode) + + ectx := &execMocks.ExecutionContext{} + + wfFailureNode := &coreMocks.ExecutableWorkflowNode{} + y := "y" + wfFailureNode.OnGetSubWorkflowRef().Return(&y) + failureNode := &coreMocks.ExecutableNode{} + failureNode.OnGetWorkflowNode().Return(wfFailureNode) + + swf := &coreMocks.ExecutableSubWorkflow{} + swf.OnGetOnFailureNode().Return(failureNode) + ectx.OnFindSubWorkflow("x").Return(swf) + + nCtx := &mocks.NodeExecutionContext{} + nCtx.OnNode().Return(node) + nCtx.OnExecutionContext().Return(ectx) + + w, err := GetSubWorkflow(ctx, nCtx) + assert.NoError(t, err) + assert.Equal(t, swf, w) + }) + t.Run("missing-subworkflow", func(t *testing.T) { wfNode := &coreMocks.ExecutableWorkflowNode{} diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index c88f655673..d3cbfb71b3 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -224,8 +224,8 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha return StatusFailing(execErr), err } - errorNode := w.GetOnFailureNode() - if errorNode != nil { + failureNode := w.GetOnFailureNode() + if failureNode != nil { return StatusFailureNode(execErr), nil } diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index bded6f4126..0bbda87c31 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -520,7 +520,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { if assert.NoError(t, json.Unmarshal(wJSON, w)) { // For benchmark workflow, we will run into the first failure on round 6 - roundsToFail := 7 + roundsToFail := 8 for i := 0; i < roundsToFail; i++ { err := executor.HandleFlyteWorkflow(ctx, w) assert.Nil(t, err, "Round [%v]", i) @@ -534,6 +534,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { if i == roundsToFail-1 { assert.Equal(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase) + } else if i == roundsToFail-2 { + assert.Equal(t, v1alpha1.WorkflowPhaseHandlingFailureNode, w.Status.Phase) } else { assert.NotEqual(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase, "For Round [%v] got phase [%v]", i, w.Status.Phase.String()) } diff --git a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml index d53059b330..1ea8b9abbc 100644 --- a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml +++ b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml @@ -205,6 +205,22 @@ spec: status: phase: 0 task: sum-non-none + onFailure: + id: en0 + inputBindings: + - binding: + promise: + nodeId: start-node + var: triggered_date + var: triggered_date + kind: task + name: delete-cluster + resources: + requests: + cpu: "2" + memory: 2Gi + status: + phase: 0 status: phase: 0 tasks: @@ -290,6 +306,40 @@ tasks: version: 1.19.0b10 timeout: 0s type: "7" + delete-cluster: + container: + args: + - --task-module=flytekit.examples.tasks + - --task-name=print_every_time + - --inputs={{$input}} + - --output-prefix={{$output}} + command: + - flyte-python-entrypoint + image: myflytecontainer:abc123 + resources: + requests: + - name: 1 + value: "2.000" + - name: 3 + value: 2048Mi + - name: 2 + value: "0.000" + id: + name: delete-cluster + interface: + inputs: + variables: + date_triggered: + type: + simple: DATETIME + outputs: + variables: { } + metadata: + runtime: + type: 1 + version: 1.19.0b10 + timeout: 0s + type: "7" sum-and-print: container: args: