From 66625f61b37c9799340977e08a5136eae60aff7c Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 31 Oct 2023 14:19:53 -0500 Subject: [PATCH 1/3] completing retries even if minSuccesses are achieved Signed-off-by: Daniel Rammer --- flyteplugins/go/tasks/plugins/array/core/state.go | 2 +- .../go/tasks/plugins/array/core/state_test.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/array/core/state.go b/flyteplugins/go/tasks/plugins/array/core/state.go index 665eec309f..d714c3404d 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state.go +++ b/flyteplugins/go/tasks/plugins/array/core/state.go @@ -273,7 +273,7 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources) return PhaseWaitingForResources } - if totalSuccesses >= minSuccesses && totalRunning == 0 { + if totalSuccesses >= minSuccesses && totalRunning == 0 && totalRetryableFailures == 0 { logger.Infof(ctx, "Array succeeded because totalSuccesses[%v] >= minSuccesses[%v]", totalSuccesses, minSuccesses) return PhaseWriteToDiscovery } diff --git a/flyteplugins/go/tasks/plugins/array/core/state_test.go b/flyteplugins/go/tasks/plugins/array/core/state_test.go index 26a80531b5..c81252bbf8 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state_test.go +++ b/flyteplugins/go/tasks/plugins/array/core/state_test.go @@ -333,7 +333,7 @@ func TestSummaryToPhase(t *testing.T) { core.PhaseSuccess: 10, }, }, - { + { "FailedToRetry", PhaseWriteToDiscoveryThenFail, map[core.Phase]int64{ @@ -349,6 +349,15 @@ func TestSummaryToPhase(t *testing.T) { core.PhaseRetryableFailure: 5, }, }, + { + // complete retry even though minSuccesses is achieved + "RetryMinSuccessRatio", + PhaseCheckingSubTaskExecutions, + map[core.Phase]int64{ + core.PhaseSuccess: 10, + core.PhaseRetryableFailure: 1, + }, + }, } for _, tt := range tests { From d628c196e1b4fbeb21e15ecdc7fac4d7108cb503 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 6 Nov 2023 08:55:54 -0600 Subject: [PATCH 2/3] ensuring all tasks in a terminal state Signed-off-by: Daniel Rammer --- flyteplugins/go/tasks/plugins/array/core/state.go | 3 ++- .../go/tasks/plugins/array/core/state_test.go | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/array/core/state.go b/flyteplugins/go/tasks/plugins/array/core/state.go index d714c3404d..2c8252cbdf 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state.go +++ b/flyteplugins/go/tasks/plugins/array/core/state.go @@ -273,7 +273,8 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources) return PhaseWaitingForResources } - if totalSuccesses >= minSuccesses && totalRunning == 0 && totalRetryableFailures == 0 { + //if totalSuccesses >= minSuccesses && totalRunning == 0 && totalRetryableFailures == 0 { + if totalSuccesses >= minSuccesses && totalSuccesses+totalPermanentFailures == totalCount { logger.Infof(ctx, "Array succeeded because totalSuccesses[%v] >= minSuccesses[%v]", totalSuccesses, minSuccesses) return PhaseWriteToDiscovery } diff --git a/flyteplugins/go/tasks/plugins/array/core/state_test.go b/flyteplugins/go/tasks/plugins/array/core/state_test.go index c81252bbf8..01b5b41528 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state_test.go +++ b/flyteplugins/go/tasks/plugins/array/core/state_test.go @@ -333,7 +333,7 @@ func TestSummaryToPhase(t *testing.T) { core.PhaseSuccess: 10, }, }, - { + { "FailedToRetry", PhaseWriteToDiscoveryThenFail, map[core.Phase]int64{ @@ -358,6 +358,15 @@ func TestSummaryToPhase(t *testing.T) { core.PhaseRetryableFailure: 1, }, }, + { + // ensure all tasks are executed even if minSuccesses is achieved + "ExecuteAllMinSuccessRatio", + PhaseCheckingSubTaskExecutions, + map[core.Phase]int64{ + core.PhaseSuccess: 10, + core.PhaseUndefined: 1, + }, + }, } for _, tt := range tests { From 4817d72448d09ea52c1418c47fa3a5a282ffb174 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 6 Nov 2023 09:33:38 -0600 Subject: [PATCH 3/3] updated dead code to comment Signed-off-by: Daniel Rammer --- flyteplugins/go/tasks/plugins/array/core/state.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/array/core/state.go b/flyteplugins/go/tasks/plugins/array/core/state.go index 2c8252cbdf..f601ec375b 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state.go +++ b/flyteplugins/go/tasks/plugins/array/core/state.go @@ -273,7 +273,9 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources) return PhaseWaitingForResources } - //if totalSuccesses >= minSuccesses && totalRunning == 0 && totalRetryableFailures == 0 { + + // if we have enough successes, ensure all tasks are in a terminal phase (success or failure) + // before transitioning to the next phase. if totalSuccesses >= minSuccesses && totalSuccesses+totalPermanentFailures == totalCount { logger.Infof(ctx, "Array succeeded because totalSuccesses[%v] >= minSuccesses[%v]", totalSuccesses, minSuccesses) return PhaseWriteToDiscovery