Skip to content

Commit

Permalink
handle task validation error wip
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Nov 10, 2023
1 parent 630724f commit f795902
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 87 deletions.
11 changes: 11 additions & 0 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

Expand Down Expand Up @@ -140,3 +141,13 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
}
return statusErr
}

// ExecutionRuntimeError is a special error that can be returned by plugins denoting that
// execution failed during runtime and should still be saved to database
type ExecutionRuntimeError struct {
ExecutionError *core.ExecutionError
}

func (r ExecutionRuntimeError) Error() string {
return r.ExecutionError.Message
}
40 changes: 23 additions & 17 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package impl

import (
"context"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -20,7 +21,7 @@ import (
notificationInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
flyteErrs "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/executions"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/shared"
Expand Down Expand Up @@ -130,7 +131,7 @@ func validateMapSize(maxEntries int, candidate map[string]string, candidateName
return nil
}
if len(candidate) > maxEntries {
return errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s has too many entries [%v > %v]",
return flyteErrs.NewFlyteAdminErrorf(codes.InvalidArgument, "%s has too many entries [%v > %v]",
candidateName, len(candidate), maxEntries)
}
return nil
Expand Down Expand Up @@ -165,7 +166,7 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *
ResourceType: admin.MatchableResource_PLUGIN_OVERRIDE,
})
if err != nil {
ec, ok := err.(errors.FlyteAdminError)
ec, ok := err.(flyteErrs.FlyteAdminError)
if !ok || ec.Code() != codes.NotFound {
return nil, err
}
Expand Down Expand Up @@ -428,7 +429,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad
ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
if flyteAdminError, ok := err.(flyteErrs.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err)
return nil, err
}
Expand Down Expand Up @@ -862,10 +863,14 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
})

if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, err)
return nil, nil, err
if errors.Is(err, flyteErrs.ExecutionRuntimeError{}) {
logger.Infof(ctx, "received task validation error %v, saving execution [%+v] as failed", err, workflowExecutionID)
} else {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, err)
return nil, nil, err
}
}
executionCreatedAt := time.Now()
acceptanceDelay := executionCreatedAt.Sub(requestedAt)
Expand Down Expand Up @@ -902,6 +907,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
Error: err,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -983,7 +989,7 @@ func (m *ExecutionManager) RelaunchExecution(
var spec admin.ExecutionSpec
err = proto.Unmarshal(existingExecutionModel.Spec, &spec)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
}
inputs = spec.Inputs
}
Expand Down Expand Up @@ -1215,7 +1221,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
if wfExecPhase == request.Event.Phase {
logger.Debugf(ctx, "This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists,
return nil, flyteErrs.NewFlyteAdminErrorf(codes.AlreadyExists,
"This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
} else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil {
Expand All @@ -1228,14 +1234,14 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
// Cannot go backwards in time from a terminal state to anything else
curPhase := wfExecPhase.String()
errorMsg := fmt.Sprintf("Invalid phase change from %s to %s for workflow execution %v", curPhase, request.Event.Phase.String(), request.Event.ExecutionId)
return nil, errors.NewAlreadyInTerminalStateError(ctx, errorMsg, curPhase)
return nil, flyteErrs.NewAlreadyInTerminalStateError(ctx, errorMsg, curPhase)
} else if wfExecPhase == core.WorkflowExecution_RUNNING && request.Event.Phase == core.WorkflowExecution_QUEUED {
// Cannot go back in time from RUNNING -> QUEUED
return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition,
return nil, flyteErrs.NewFlyteAdminErrorf(codes.FailedPrecondition,
"Cannot go from %s to %s for workflow execution %v",
wfExecPhase.String(), request.Event.Phase.String(), request.Event.ExecutionId)
} else if wfExecPhase == core.WorkflowExecution_ABORTING && !common.IsExecutionTerminal(request.Event.Phase) {
return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition,
return nil, flyteErrs.NewFlyteAdminErrorf(codes.FailedPrecondition,
"Invalid phase change from aborting to %s for workflow execution %v", request.Event.Phase.String(), request.Event.ExecutionId)
}

Expand Down Expand Up @@ -1434,7 +1440,7 @@ func (m *ExecutionManager) ListExecutions(

offset, err := validation.ValidateToken(request.Token)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid pagination token %s for ListExecutions",
return nil, flyteErrs.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid pagination token %s for ListExecutions",
request.Token)
}
joinTableEntities := make(map[common.Entity]bool)
Expand Down Expand Up @@ -1494,7 +1500,7 @@ func (m *ExecutionManager) publishNotifications(ctx context.Context, request adm
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to transform execution [%+v] with err: %v", request.Event.ExecutionId, err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to transform execution [%+v] with err: %v", request.Event.ExecutionId, err)
}
var notificationsList = adminExecution.Closure.Notifications
logger.Debugf(ctx, "publishing notifications for execution [%+v] in state [%+v] for notifications [%+v]",
Expand Down Expand Up @@ -1526,7 +1532,7 @@ func (m *ExecutionManager) publishNotifications(ctx context.Context, request adm
logger.Debugf(ctx, "failed to publish notification, encountered unrecognized type: %v", notification.Type)
m.systemMetrics.UnexpectedDataError.Inc()
// Unsupported notification types should have been caught when the launch plan was being created.
return errors.NewFlyteAdminErrorf(codes.Internal, "Unsupported notification type [%v] for execution [%+v]",
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Unsupported notification type [%v] for execution [%+v]",
notification.Type, request.Event.ExecutionId)
}

Expand Down Expand Up @@ -1564,7 +1570,7 @@ func (m *ExecutionManager) TerminateExecution(
}

if common.IsExecutionTerminal(core.WorkflowExecution_Phase(core.WorkflowExecution_Phase_value[executionModel.Phase])) {
return nil, errors.NewAlreadyInTerminalStateError(ctx, "Cannot abort an already terminate workflow execution", executionModel.Phase)
return nil, flyteErrs.NewAlreadyInTerminalStateError(ctx, "Cannot abort an already terminate workflow execution", executionModel.Phase)
}

err = transformers.SetExecutionAborting(&executionModel, request.Cause, getUser(ctx))
Expand Down
47 changes: 31 additions & 16 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformers

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -13,7 +14,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
flyteErrs "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -45,6 +46,7 @@ type CreateExecutionModelInput struct {
SecurityContext *core.SecurityContext
LaunchEntity core.ResourceType
Namespace string
Error error
}

type ExecutionTransformerOptions struct {
Expand All @@ -70,11 +72,11 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
requestSpec.SecurityContext = input.SecurityContext
spec, err := proto.Marshal(requestSpec)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
}
createdAt, err := ptypes.TimestampProto(input.CreatedAt)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to serialize execution created at time")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to serialize execution created at time")
}
closure := admin.ExecutionClosure{
Phase: input.Phase,
Expand All @@ -91,11 +93,24 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
if input.Phase == core.WorkflowExecution_RUNNING {
closure.StartedAt = createdAt
}
if input.Error != nil {
closure.Phase = core.WorkflowExecution_FAILED
execError := &core.ExecutionError{
Code: "Unknown",
Message: input.Error.Error(),
Kind: core.ExecutionError_UNKNOWN,
}
var err flyteErrs.ExecutionRuntimeError
if errors.As(input.Error, &err) {
execError = err.ExecutionError
}
closure.OutputResult = &admin.ExecutionClosure_Error{Error: execError}
}

closureBytes, err := proto.Marshal(&closure)

if err != nil {
return nil, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan status")
return nil, flyteErrs.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan status")
}

activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
Expand Down Expand Up @@ -147,7 +162,7 @@ func reassignCluster(ctx context.Context, cluster string, executionID *core.Work
var executionSpec admin.ExecutionSpec
err := proto.Unmarshal(execution.Spec, &executionSpec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
}
if executionSpec.Metadata == nil {
executionSpec.Metadata = &admin.ExecutionMetadata{}
Expand All @@ -158,7 +173,7 @@ func reassignCluster(ctx context.Context, cluster string, executionID *core.Work
executionSpec.Metadata.SystemMetadata.ExecutionCluster = cluster
marshaledSpec, err := proto.Marshal(&executionSpec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
}
execution.Spec = marshaledSpec
return nil
Expand All @@ -172,15 +187,15 @@ func UpdateExecutionModelState(
var executionClosure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &executionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
executionClosure.Phase = request.Event.Phase
executionClosure.UpdatedAt = request.Event.OccurredAt
execution.Phase = request.Event.Phase.String()

occurredAtTimestamp, err := ptypes.Timestamp(request.Event.OccurredAt)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)
}
execution.ExecutionUpdatedAt = &occurredAtTimestamp

Expand Down Expand Up @@ -210,7 +225,7 @@ func UpdateExecutionModelState(
errorMsg := fmt.Sprintf("Cannot accept events for running/terminated execution [%v] from cluster [%s],"+
"expected events to originate from [%s]",
request.Event.ExecutionId, request.Event.ProducerId, execution.Cluster)
return errors.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
return flyteErrs.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
}
}

Expand Down Expand Up @@ -253,7 +268,7 @@ func UpdateExecutionModelState(
}
marshaledClosure, err := proto.Marshal(&executionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
execution.Closure = marshaledClosure
return nil
Expand All @@ -267,7 +282,7 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st
var closure admin.ExecutionClosure
err := proto.Unmarshal(executionModel.Closure, &closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
// Update the indexed columns
stateInt := int32(stateUpdatedTo)
Expand All @@ -286,7 +301,7 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st
}
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
executionModel.Closure = marshaledClosure
return nil
Expand All @@ -298,7 +313,7 @@ func SetExecutionAborting(execution *models.Execution, cause, principal string)
var closure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
closure.OutputResult = &admin.ExecutionClosure_AbortMetadata{
AbortMetadata: &admin.AbortMetadata{
Expand All @@ -309,7 +324,7 @@ func SetExecutionAborting(execution *models.Execution, cause, principal string)
closure.Phase = core.WorkflowExecution_ABORTING
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
execution.Closure = marshaledClosure
execution.AbortCause = cause
Expand All @@ -329,7 +344,7 @@ func FromExecutionModel(ctx context.Context, executionModel models.Execution, op
var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
}
if len(opts.DefaultNamespace) > 0 {
if spec.Metadata == nil {
Expand All @@ -346,7 +361,7 @@ func FromExecutionModel(ctx context.Context, executionModel models.Execution, op

var closure admin.ExecutionClosure
if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
Expand Down
Loading

0 comments on commit f795902

Please sign in to comment.