Skip to content

Commit

Permalink
handle task validation error
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Nov 10, 2023
1 parent 630724f commit 0c2ade1
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 87 deletions.
11 changes: 11 additions & 0 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

Expand Down Expand Up @@ -140,3 +141,13 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
}
return statusErr
}

// ExecutionRuntimeError is a special error that can be returned by plugins denoting that
// execution failed during runtime and should still be saved to database
type ExecutionRuntimeError struct {
ExecutionError *core.ExecutionError
}

func (r ExecutionRuntimeError) Error() string {
return r.ExecutionError.Message
}
41 changes: 24 additions & 17 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package impl

import (
"context"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -20,7 +21,7 @@ import (
notificationInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
flyteErrs "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/executions"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/shared"
Expand Down Expand Up @@ -130,7 +131,7 @@ func validateMapSize(maxEntries int, candidate map[string]string, candidateName
return nil
}
if len(candidate) > maxEntries {
return errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s has too many entries [%v > %v]",
return flyteErrs.NewFlyteAdminErrorf(codes.InvalidArgument, "%s has too many entries [%v > %v]",
candidateName, len(candidate), maxEntries)
}
return nil
Expand Down Expand Up @@ -165,7 +166,7 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *
ResourceType: admin.MatchableResource_PLUGIN_OVERRIDE,
})
if err != nil {
ec, ok := err.(errors.FlyteAdminError)
ec, ok := err.(flyteErrs.FlyteAdminError)
if !ok || ec.Code() != codes.NotFound {
return nil, err
}
Expand Down Expand Up @@ -428,7 +429,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad
ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
if flyteAdminError, ok := err.(flyteErrs.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err)
return nil, err
}
Expand Down Expand Up @@ -862,10 +863,15 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
})

if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, err)
return nil, nil, err
var runtimeErr flyteErrs.ExecutionRuntimeError
if errors.As(err, &runtimeErr) {
logger.Infof(ctx, "received execution runtime error %v, saving execution [%+v] as failed", runtimeErr, workflowExecutionID)
} else {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, err)
return nil, nil, err
}
}
executionCreatedAt := time.Now()
acceptanceDelay := executionCreatedAt.Sub(requestedAt)
Expand Down Expand Up @@ -902,6 +908,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
Error: err,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -983,7 +990,7 @@ func (m *ExecutionManager) RelaunchExecution(
var spec admin.ExecutionSpec
err = proto.Unmarshal(existingExecutionModel.Spec, &spec)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
}
inputs = spec.Inputs
}
Expand Down Expand Up @@ -1215,7 +1222,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
if wfExecPhase == request.Event.Phase {
logger.Debugf(ctx, "This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists,
return nil, flyteErrs.NewFlyteAdminErrorf(codes.AlreadyExists,
"This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
} else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil {
Expand All @@ -1228,14 +1235,14 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
// Cannot go backwards in time from a terminal state to anything else
curPhase := wfExecPhase.String()
errorMsg := fmt.Sprintf("Invalid phase change from %s to %s for workflow execution %v", curPhase, request.Event.Phase.String(), request.Event.ExecutionId)
return nil, errors.NewAlreadyInTerminalStateError(ctx, errorMsg, curPhase)
return nil, flyteErrs.NewAlreadyInTerminalStateError(ctx, errorMsg, curPhase)
} else if wfExecPhase == core.WorkflowExecution_RUNNING && request.Event.Phase == core.WorkflowExecution_QUEUED {
// Cannot go back in time from RUNNING -> QUEUED
return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition,
return nil, flyteErrs.NewFlyteAdminErrorf(codes.FailedPrecondition,
"Cannot go from %s to %s for workflow execution %v",
wfExecPhase.String(), request.Event.Phase.String(), request.Event.ExecutionId)
} else if wfExecPhase == core.WorkflowExecution_ABORTING && !common.IsExecutionTerminal(request.Event.Phase) {
return nil, errors.NewFlyteAdminErrorf(codes.FailedPrecondition,
return nil, flyteErrs.NewFlyteAdminErrorf(codes.FailedPrecondition,
"Invalid phase change from aborting to %s for workflow execution %v", request.Event.Phase.String(), request.Event.ExecutionId)
}

Expand Down Expand Up @@ -1434,7 +1441,7 @@ func (m *ExecutionManager) ListExecutions(

offset, err := validation.ValidateToken(request.Token)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid pagination token %s for ListExecutions",
return nil, flyteErrs.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid pagination token %s for ListExecutions",
request.Token)
}
joinTableEntities := make(map[common.Entity]bool)
Expand Down Expand Up @@ -1494,7 +1501,7 @@ func (m *ExecutionManager) publishNotifications(ctx context.Context, request adm
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to transform execution [%+v] with err: %v", request.Event.ExecutionId, err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to transform execution [%+v] with err: %v", request.Event.ExecutionId, err)
}
var notificationsList = adminExecution.Closure.Notifications
logger.Debugf(ctx, "publishing notifications for execution [%+v] in state [%+v] for notifications [%+v]",
Expand Down Expand Up @@ -1526,7 +1533,7 @@ func (m *ExecutionManager) publishNotifications(ctx context.Context, request adm
logger.Debugf(ctx, "failed to publish notification, encountered unrecognized type: %v", notification.Type)
m.systemMetrics.UnexpectedDataError.Inc()
// Unsupported notification types should have been caught when the launch plan was being created.
return errors.NewFlyteAdminErrorf(codes.Internal, "Unsupported notification type [%v] for execution [%+v]",
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Unsupported notification type [%v] for execution [%+v]",
notification.Type, request.Event.ExecutionId)
}

Expand Down Expand Up @@ -1564,7 +1571,7 @@ func (m *ExecutionManager) TerminateExecution(
}

if common.IsExecutionTerminal(core.WorkflowExecution_Phase(core.WorkflowExecution_Phase_value[executionModel.Phase])) {
return nil, errors.NewAlreadyInTerminalStateError(ctx, "Cannot abort an already terminate workflow execution", executionModel.Phase)
return nil, flyteErrs.NewAlreadyInTerminalStateError(ctx, "Cannot abort an already terminate workflow execution", executionModel.Phase)
}

err = transformers.SetExecutionAborting(&executionModel, request.Cause, getUser(ctx))
Expand Down
115 changes: 115 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,121 @@ func TestCreateExecution(t *testing.T) {
// TODO: Check for offloaded inputs
}

func TestCreateExecution_ExecutionRuntimeError(t *testing.T) {
repository := getMockRepositoryForExecTest()
labels := admin.Labels{
Values: map[string]string{
"label3": "3",
"label2": "1", // common label, will be dropped
}}
repository.ProjectRepo().(*repositoryMocks.MockProjectRepo).GetFunction = func(
ctx context.Context, projectID string) (models.Project, error) {
return transformers.CreateProjectModel(&admin.Project{
Labels: &labels}), nil
}

principal := "principal"
rawOutput := "raw_output"
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, principal, spec.Metadata.Principal)
assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix)
assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment))
assert.Equal(t, "launch_plan", input.LaunchEntity)
assert.Equal(t, spec.GetMetadata().GetSystemMetadata().Namespace, "project-domain")
return nil
})
setDefaultLpCallbackForExecTest(repository)
mockExecutor := workflowengineMocks.WorkflowExecutor{}
resources := &core.Resources{
Requests: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "200m",
},
{
Name: core.Resources_MEMORY,
Value: "200Gi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "300m",
},
{
Name: core.Resources_MEMORY,
Value: "500Gi",
},
},
}
runtimeErr := flyteAdminErrors.ExecutionRuntimeError{ExecutionError: &core.ExecutionError{
Code: "RuntimeError",
Message: "validation failed",
Kind: core.ExecutionError_USER,
}}
mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.ExecutionData) bool {
tasks := data.WorkflowClosure.GetTasks()
for _, task := range tasks {
assert.EqualValues(t, resources.Requests,
task.Template.GetContainer().Resources.Requests)
assert.EqualValues(t, resources.Requests,
task.Template.GetContainer().Resources.Limits)
}

return true
})).Return(workflowengineInterfaces.ExecutionResponse{Cluster: testCluster}, runtimeErr)
mockExecutor.OnID().Return("customMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)

qosProvider := &runtimeIFaceMocks.QualityOfServiceConfiguration{}
qosProvider.OnGetTierExecutionValues().Return(map[core.QualityOfService_Tier]core.QualityOfServiceSpec{
core.QualityOfService_HIGH: {
QueueingBudget: ptypes.DurationProto(10 * time.Minute),
},
core.QualityOfService_MEDIUM: {
QueueingBudget: ptypes.DurationProto(20 * time.Minute),
},
core.QualityOfService_LOW: {
QueueingBudget: ptypes.DurationProto(30 * time.Minute),
},
})

qosProvider.OnGetDefaultTiers().Return(map[string]core.QualityOfService_Tier{
"domain": core.QualityOfService_HIGH,
})

mockConfig := getMockExecutionsConfigProvider()
mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider)

execManager := NewExecutionManager(repository, r, mockConfig, getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Principal: "unused - populated from authenticated context",
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}
request.Spec.ClusterAssignment = &clusterAssignment

identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
assert.NoError(t, err)
ctx := identity.WithContext(context.Background())
response, err := execManager.CreateExecution(ctx, request, requestedAt)
assert.Nil(t, err)

expectedResponse := &admin.ExecutionCreateResponse{
Id: &executionIdentifier,
}
assert.Nil(t, err)
assert.Equal(t, expectedResponse, response)

// TODO: Check for offloaded inputs
}

func TestCreateExecutionFromWorkflowNode(t *testing.T) {
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
Expand Down
Loading

0 comments on commit 0c2ade1

Please sign in to comment.