Skip to content

Commit

Permalink
using Abort phase and cleaning up best-effort (#4600)
Browse files Browse the repository at this point in the history
* using Abort phase and cleaning up best-effort

Signed-off-by: Daniel Rammer <[email protected]>

* removed phase updates on Finalize

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Dec 14, 2023
1 parent ccd31ce commit 08056f3
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 29 deletions.
11 changes: 8 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
PhasePermanentFailure
// Indicates the task is waiting for the cache to be populated so it can reuse results
PhaseWaitingForCache
// Indicate the task has been aborted
PhaseAborted
)

var Phases = []Phase{
Expand All @@ -49,11 +51,12 @@ var Phases = []Phase{
PhaseRetryableFailure,
PhasePermanentFailure,
PhaseWaitingForCache,
PhaseAborted,
}

// Returns true if the given phase is failure, retryable failure or success
func (p Phase) IsTerminal() bool {
return p.IsFailure() || p.IsSuccess()
return p.IsFailure() || p.IsSuccess() || p.IsAborted()
}

func (p Phase) IsFailure() bool {
Expand All @@ -64,6 +67,10 @@ func (p Phase) IsSuccess() bool {
return p == PhaseSuccess
}

func (p Phase) IsAborted() bool {
return p == PhaseAborted
}

func (p Phase) IsWaitingForResources() bool {
return p == PhaseWaitingForResources
}
Expand All @@ -82,8 +89,6 @@ type ExternalResource struct {
RetryAttempt uint32
// Phase (if exists) associated with the external resource
Phase Phase
// Indicates if external resource is a subtask getting aborted
IsAbortedSubtask bool
}

type ReasonInfo struct {
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl

case PhasePermanentFailure:
if state.GetExecutionErr() != nil {
phaseInfo = core.PhaseInfoFailureWithCleanup(core.PhasePermanentFailure.String(), state.GetExecutionErr().Message, nowTaskInfo)
phaseInfo = core.PhaseInfoFailed(core.PhasePermanentFailure, state.GetExecutionErr(), nowTaskInfo)
} else {
phaseInfo = core.PhaseInfoSystemFailureWithCleanup(ErrorK8sArrayGeneric, state.GetReason(), nowTaskInfo)
phaseInfo = core.PhaseInfoSystemFailure(ErrorK8sArrayGeneric, state.GetReason(), nowTaskInfo)
}
default:
return phaseInfo, fmt.Errorf("failed to map custom state phase to core phase. State Phase [%v]", p)
Expand Down
3 changes: 2 additions & 1 deletion flyteplugins/go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext)
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state")
}

return TerminateSubTasksOnAbort(ctx, tCtx, e.kubeClient, GetConfig(), finalizeSubtask, pluginState)
_, _, err := TerminateSubTasks(ctx, tCtx, e.kubeClient, GetConfig(), finalizeSubtask, pluginState)
return err
}

func (e Executor) Start(ctx context.Context) error {
Expand Down
42 changes: 22 additions & 20 deletions flyteplugins/go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,13 @@ func TerminateSubTasksOnAbort(ctx context.Context, tCtx core.TaskExecutionContex
taskInfo := &core.TaskInfo{
ExternalResources: externalResources,
}
phaseInfo := core.PhaseInfoFailureWithCleanup(core.PhasePermanentFailure.String(), "Array subtasks were aborted", taskInfo)
err = tCtx.EventsRecorder().RecordRaw(ctx, phaseInfo)
executionErr := &idlCore.ExecutionError{
Code: "ArraySubtasksAborted",
Message: "Array subtasks were aborted",
}
phaseInfo := core.PhaseInfoFailed(core.PhaseAborted, executionErr, taskInfo)

return err
return tCtx.EventsRecorder().RecordRaw(ctx, phaseInfo)
}

// TerminateSubTasks performs operations to gracefully terminate all subtasks. This may include
Expand All @@ -362,30 +365,29 @@ func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kube
// we can use RetryAttempts if it has been initialized, otherwise stay with default 0
retryAttempt = currentState.RetryAttempts.GetItem(childIdx)
}

// return immediately if subtask has completed or not yet started
if existingPhase.IsTerminal() || existingPhase == core.PhaseUndefined {
continue
}

originalIdx := arrayCore.CalculateOriginalIndex(childIdx, currentState.GetIndexesToCache())
stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, childIdx, originalIdx, retryAttempt, 0)
if err != nil {
return currentState, externalResources, err
}

isAbortedSubtask := false
if !existingPhase.IsTerminal() && existingPhase != core.PhaseUndefined {
// only terminate subtask if it has completed or has not yet started
err = terminateFunction(ctx, stCtx, config, kubeClient)
if err != nil {
messageCollector.Collect(childIdx, err.Error())
} else {
isAbortedSubtask = true
}
err = terminateFunction(ctx, stCtx, config, kubeClient)
if err != nil {
messageCollector.Collect(childIdx, err.Error())
} else {
externalResources = append(externalResources, &core.ExternalResource{
ExternalID: stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(),
Index: uint32(originalIdx),
RetryAttempt: uint32(retryAttempt),
Phase: core.PhaseAborted,
})
}

externalResources = append(externalResources, &core.ExternalResource{
ExternalID: stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(),
Index: uint32(originalIdx),
RetryAttempt: uint32(retryAttempt),
Phase: existingPhase,
IsAbortedSubtask: isAbortedSubtask,
})
}

if messageCollector.Length() > 0 {
Expand Down
5 changes: 2 additions & 3 deletions flytepropeller/pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func ToTaskEventPhase(p pluginCore.Phase) core.TaskExecution_Phase {
return core.TaskExecution_FAILED
case pluginCore.PhaseRetryableFailure:
return core.TaskExecution_FAILED
case pluginCore.PhaseAborted:
return core.TaskExecution_ABORTED

Check warning on line 48 in flytepropeller/pkg/controller/nodes/task/transformer.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/transformer.go#L47-L48

Added lines #L47 - L48 were not covered by tests
case pluginCore.PhaseNotReady:
fallthrough
case pluginCore.PhaseUndefined:
Expand Down Expand Up @@ -118,9 +120,6 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio
metadata.ExternalResources = make([]*event.ExternalResourceInfo, len(externalResources))
for idx, e := range input.Info.Info().ExternalResources {
phase := ToTaskEventPhase(e.Phase)

Check warning on line 122 in flytepropeller/pkg/controller/nodes/task/transformer.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/transformer.go#L122

Added line #L122 was not covered by tests
if e.IsAbortedSubtask {
phase = core.TaskExecution_ABORTED
}
metadata.ExternalResources[idx] = &event.ExternalResourceInfo{
ExternalId: e.ExternalID,
CacheStatus: e.CacheStatus,
Expand Down

0 comments on commit 08056f3

Please sign in to comment.