Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store failed execution in flyteadmin #4390

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,26 +845,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
executionParameters.RecoveryExecution = request.Spec.Metadata.ReferenceExecution
}

workflowExecutor := plugins.Get[workflowengineInterfaces.WorkflowExecutor](m.pluginRegistry, plugins.PluginIDWorkflowExecutor)
execInfo, err := workflowExecutor.Execute(ctx, workflowengineInterfaces.ExecutionData{
Namespace: namespace,
ExecutionID: &workflowExecutionID,
ReferenceWorkflowName: workflow.Id.Name,
ReferenceLaunchPlanName: launchPlan.Id.Name,
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
})

if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, err)
return nil, nil, err
}
executionCreatedAt := time.Now()
acceptanceDelay := executionCreatedAt.Sub(requestedAt)
m.systemMetrics.AcceptanceDelay.Observe(acceptanceDelay.Seconds())

// Request notification settings takes precedence over the launch plan settings.
// If there is no notification in the request and DisableAll is not true, use the settings from the launch plan.
Expand All @@ -879,7 +861,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
notificationsSettings = make([]*admin.Notification, 0)
}

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
createExecModelInput := transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: requestSpec,
LaunchPlanID: launchPlanModel.ID,
Expand All @@ -891,13 +873,34 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
SourceExecutionID: sourceExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
}

workflowExecutor := plugins.Get[workflowengineInterfaces.WorkflowExecutor](m.pluginRegistry, plugins.PluginIDWorkflowExecutor)
execInfo, execErr := workflowExecutor.Execute(ctx, workflowengineInterfaces.ExecutionData{
Namespace: namespace,
ExecutionID: &workflowExecutionID,
ReferenceWorkflowName: workflow.Id.Name,
ReferenceLaunchPlanName: launchPlan.Id.Name,
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
})
if execErr != nil {
createExecModelInput.Error = execErr
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, execErr)
} else {
m.systemMetrics.AcceptanceDelay.Observe(acceptanceDelay.Seconds())
createExecModelInput.Cluster = execInfo.Cluster
}

executionModel, err := transformers.CreateExecutionModel(createExecModelInput)
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
Expand Down
50 changes: 42 additions & 8 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

// Fixture values shared by the execution manager tests.
const (
	// principal is the identity name the tests pass to auth.NewIdentityContext.
	principal = "principal"
	// rawOutput is the value the tests use as RawOutputDataConfig.OutputLocationPrefix.
	rawOutput = "raw_output"
)

var spec = testutils.GetExecutionRequest().Spec
var specBytes, _ = proto.Marshal(spec)
var phase = core.WorkflowExecution_RUNNING.String()
Expand Down Expand Up @@ -296,8 +301,6 @@ func TestCreateExecution(t *testing.T) {
Labels: &labels}), nil
}

principal := "principal"
rawOutput := "raw_output"
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
Expand Down Expand Up @@ -421,7 +424,6 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
},
)

principal := "feeny"
getExecutionCalled := false
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
Expand Down Expand Up @@ -618,6 +620,7 @@ func TestCreateExecutionInCompatibleInputs(t *testing.T) {
}

// TestCreateExecutionPropellerFailure drives ExecutionManager.CreateExecution with a
// mock workflow executor registered as the default executor plugin and asserts on the
// result of the call.
//
// NOTE(review): this listing is a rendered diff with the +/- markers stripped, so
// pre-change and post-change lines are interleaved: execManager is constructed twice
// and CreateExecution is invoked twice, first asserting that expectedErr is returned
// with a nil response and later asserting success with an ExecutionCreateResponse.
// Presumably the EqualError/Nil assertions are the removed lines and the
// NoError/Equal assertions are the added ones — confirm against the real file.
func TestCreateExecutionPropellerFailure(t *testing.T) {
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
// Error the test expects to be associated with the executor failure. How it is wired
// into mockExecutor is in the collapsed lines that follow — TODO confirm.
expectedErr := flyteAdminErrors.NewFlyteAdminErrorf(codes.Internal, "ABC")
Expand All @@ -626,13 +629,45 @@ func TestCreateExecutionPropellerFailure(t *testing.T) {
mockExecutor.OnID().Return("customMockExecutor")
// Register the mock as the default workflow executor plugin in a fresh registry.
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

// Quality-of-service mock: per-tier queueing budgets plus a default tier for "domain".
qosProvider := &runtimeIFaceMocks.QualityOfServiceConfiguration{}
qosProvider.OnGetTierExecutionValues().Return(map[core.QualityOfService_Tier]core.QualityOfServiceSpec{
core.QualityOfService_HIGH: {
QueueingBudget: ptypes.DurationProto(10 * time.Minute),
},
core.QualityOfService_MEDIUM: {
QueueingBudget: ptypes.DurationProto(20 * time.Minute),
},
core.QualityOfService_LOW: {
QueueingBudget: ptypes.DurationProto(30 * time.Minute),
},
})

qosProvider.OnGetDefaultTiers().Return(map[string]core.QualityOfService_Tier{
"domain": core.QualityOfService_HIGH,
})

// NOTE(review): mockConfig receives the QoS provider here, but the NewExecutionManager
// calls in this test pass the result of a separate getMockExecutionsConfigProvider()
// call rather than mockConfig — confirm whether the QoS setup is meant to reach the
// manager under test.
mockConfig := getMockExecutionsConfigProvider()
mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider)

// Build the request: placeholder principal metadata, a raw output prefix and a
// cluster assignment.
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Principal: "unused - populated from authenticated context",
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}
request.Spec.ClusterAssignment = &clusterAssignment

response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.EqualError(t, err, expectedErr.Error())
assert.Nil(t, response)
// Attach an identity built from principal to the context used for the call below.
identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
assert.NoError(t, err)
ctx := identity.WithContext(context.Background())
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

expectedResponse := &admin.ExecutionCreateResponse{Id: &executionIdentifier}

response, err := execManager.CreateExecution(ctx, request, requestedAt)

// The call is expected to succeed and return the execution identifier.
assert.NoError(t, err)
assert.Equal(t, expectedResponse, response)
}

func TestCreateExecutionDatabaseFailure(t *testing.T) {
Expand Down Expand Up @@ -3379,7 +3414,6 @@ func TestTerminateExecution(t *testing.T) {
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)

abortCause := "abort cause"
principal := "principal"
updateExecutionFunc := func(
context context.Context, execution models.Execution) error {
assert.Equal(t, "project", execution.Project)
Expand Down
55 changes: 36 additions & 19 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -13,7 +14,7 @@
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
flyteErrs "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -45,6 +46,7 @@
SecurityContext *core.SecurityContext
LaunchEntity core.ResourceType
Namespace string
Error error
}

type ExecutionTransformerOptions struct {
Expand All @@ -70,12 +72,9 @@
requestSpec.SecurityContext = input.SecurityContext
spec, err := proto.Marshal(requestSpec)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
}
createdAt, err := ptypes.TimestampProto(input.CreatedAt)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to serialize execution created at time")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)

Check warning on line 75 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L75

Added line #L75 was not covered by tests
}
createdAt := timestamppb.New(input.CreatedAt)
closure := admin.ExecutionClosure{
Phase: input.Phase,
CreatedAt: createdAt,
Expand All @@ -91,11 +90,29 @@
if input.Phase == core.WorkflowExecution_RUNNING {
closure.StartedAt = createdAt
}
if input.Error != nil {
closure.Phase = core.WorkflowExecution_FAILED
execErr := &core.ExecutionError{
Code: "Unknown",
Message: input.Error.Error(),
Kind: core.ExecutionError_SYSTEM,
}

var adminErr flyteErrs.FlyteAdminError
if errors.As(input.Error, &adminErr) {
execErr.Code = adminErr.Code().String()
execErr.Message = adminErr.Error()
if adminErr.Code() == codes.InvalidArgument {
execErr.Kind = core.ExecutionError_USER
}
}
closure.OutputResult = &admin.ExecutionClosure_Error{Error: execErr}
}

closureBytes, err := proto.Marshal(&closure)

if err != nil {
return nil, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan status")
return nil, flyteErrs.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan status")

Check warning on line 115 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L115

Added line #L115 was not covered by tests
}

activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
Expand Down Expand Up @@ -147,7 +164,7 @@
var executionSpec admin.ExecutionSpec
err := proto.Unmarshal(execution.Spec, &executionSpec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
}
if executionSpec.Metadata == nil {
executionSpec.Metadata = &admin.ExecutionMetadata{}
Expand All @@ -158,7 +175,7 @@
executionSpec.Metadata.SystemMetadata.ExecutionCluster = cluster
marshaledSpec, err := proto.Marshal(&executionSpec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)

Check warning on line 178 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L178

Added line #L178 was not covered by tests
}
execution.Spec = marshaledSpec
return nil
Expand All @@ -172,15 +189,15 @@
var executionClosure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &executionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)

Check warning on line 192 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L192

Added line #L192 was not covered by tests
}
executionClosure.Phase = request.Event.Phase
executionClosure.UpdatedAt = request.Event.OccurredAt
execution.Phase = request.Event.Phase.String()

occurredAtTimestamp, err := ptypes.Timestamp(request.Event.OccurredAt)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)

Check warning on line 200 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L200

Added line #L200 was not covered by tests
}
execution.ExecutionUpdatedAt = &occurredAtTimestamp

Expand Down Expand Up @@ -210,7 +227,7 @@
errorMsg := fmt.Sprintf("Cannot accept events for running/terminated execution [%v] from cluster [%s],"+
"expected events to originate from [%s]",
request.Event.ExecutionId, request.Event.ProducerId, execution.Cluster)
return errors.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
return flyteErrs.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
}
}

Expand Down Expand Up @@ -253,7 +270,7 @@
}
marshaledClosure, err := proto.Marshal(&executionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)

Check warning on line 273 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L273

Added line #L273 was not covered by tests
}
execution.Closure = marshaledClosure
return nil
Expand All @@ -267,7 +284,7 @@
var closure admin.ExecutionClosure
err := proto.Unmarshal(executionModel.Closure, &closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
// Update the indexed columns
stateInt := int32(stateUpdatedTo)
Expand All @@ -286,7 +303,7 @@
}
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)

Check warning on line 306 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L306

Added line #L306 was not covered by tests
}
executionModel.Closure = marshaledClosure
return nil
Expand All @@ -298,7 +315,7 @@
var closure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)

Check warning on line 318 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L318

Added line #L318 was not covered by tests
}
closure.OutputResult = &admin.ExecutionClosure_AbortMetadata{
AbortMetadata: &admin.AbortMetadata{
Expand All @@ -309,7 +326,7 @@
closure.Phase = core.WorkflowExecution_ABORTING
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)

Check warning on line 329 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L329

Added line #L329 was not covered by tests
}
execution.Closure = marshaledClosure
execution.AbortCause = cause
Expand All @@ -329,7 +346,7 @@
var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")

Check warning on line 349 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L349

Added line #L349 was not covered by tests
}
if len(opts.DefaultNamespace) > 0 {
if spec.Metadata == nil {
Expand All @@ -346,7 +363,7 @@

var closure admin.ExecutionClosure
if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")

Check warning on line 366 in flyteadmin/pkg/repositories/transformers/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/execution.go#L366

Added line #L366 was not covered by tests
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
Expand Down
Loading
Loading