diff --git a/flyteadmin/pkg/manager/impl/executions/quality_of_service.go b/flyteadmin/pkg/manager/impl/executions/quality_of_service.go index a96d99d3d6..e2a32debdd 100644 --- a/flyteadmin/pkg/manager/impl/executions/quality_of_service.go +++ b/flyteadmin/pkg/manager/impl/executions/quality_of_service.go @@ -110,7 +110,7 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Invalid custom quality of service set in launch plan [%v], failed to parse duration [%v] with: %v", input.LaunchPlan.Id, - input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + input.LaunchPlan.Spec.QualityOfService.GetSpec().QueueingBudget, err) } return QualityOfServiceSpec{ QueuingBudget: duration, @@ -129,7 +129,7 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Invalid custom quality of service set in workflow [%v], failed to parse duration [%v] with: %v", workflowIdentifier, - input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetSpec().QueueingBudget, err) } return QualityOfServiceSpec{ QueuingBudget: duration, @@ -154,7 +154,7 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Invalid custom quality of service set in overridable matching attributes for [%v],"+ "failed to parse duration [%v] with: %v", workflowIdentifier, - input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + qualityOfService.GetSpec().QueueingBudget, err) } return QualityOfServiceSpec{ QueuingBudget: duration, @@ -163,7 +163,7 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu logger.Debugf(ctx, "Determining quality of service tier from database override for [%s/%s/%s]", input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, input.ExecutionCreateRequest.Name) - qualityOfServiceTier = input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetTier() + qualityOfServiceTier = qualityOfService.GetTier() } } diff --git a/flyteadmin/pkg/manager/impl/executions/quality_of_service_test.go b/flyteadmin/pkg/manager/impl/executions/quality_of_service_test.go index 41a04ec2bc..335b5d390c 100644 --- a/flyteadmin/pkg/manager/impl/executions/quality_of_service_test.go +++ b/flyteadmin/pkg/manager/impl/executions/quality_of_service_test.go @@ -2,6 +2,7 @@ package executions import ( "context" + "errors" "testing" "time" @@ -35,6 +36,16 @@ func getQualityOfServiceWithDuration(duration time.Duration) *core.QualityOfServ } } +func getQualityOfServiceWithNilDuration() *core.QualityOfService { + return &core.QualityOfService{ + Designation: &core.QualityOfService_Spec{ + Spec: &core.QualityOfServiceSpec{ + QueueingBudget: nil, + }, + }, + } +} + func getMockConfig() runtimeInterfaces.Configuration { mockConfig := mocks.NewMockConfigurationProvider(nil, nil, nil, nil, nil, nil) provider := &runtimeIFaceMocks.QualityOfServiceConfiguration{} @@ -78,6 +89,35 @@ func addGetResourceFunc(t *testing.T, resourceManager interfaces.ResourceInterfa } } +func addGetErrorResourceFunc(t *testing.T, resourceManager interfaces.ResourceInterface, containSpec bool) { + resourceManager.(*managerMocks.MockResourceManager).GetResourceFunc = func(ctx context.Context, + request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) { + assert.EqualValues(t, request, interfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Workflow: workflowIdentifier.Name, + ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, + }) + noSourceError := errors.New("no resource") + + noServiceResponse := &interfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_QualityOfService{ + QualityOfService: getQualityOfServiceWithNilDuration(), + }, + }, + } + + result, error := &interfaces.ResourceResponse{}, noSourceError + + if containSpec { + result, error = noServiceResponse, nil + } + + return result, error + } +} + func getWorkflowWithQosSpec(qualityOfService *core.QualityOfService) *admin.Workflow { return &admin.Workflow{ Id: workflowIdentifier, @@ -116,6 +156,23 @@ func TestGetQualityOfService_ExecutionCreateRequest(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, spec.QueuingBudget, 3*time.Minute) + + _, failError := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(getQualityOfServiceWithDuration(4 * time.Minute)), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + QualityOfService: getQualityOfServiceWithDuration(2 * time.Minute), + }, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{ + QualityOfService: getQualityOfServiceWithNilDuration(), + }, + }, + }) + assert.Error(t, failError) + } func TestGetQualityOfService_LaunchPlan(t *testing.T) { @@ -137,6 +194,20 @@ func TestGetQualityOfService_LaunchPlan(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, spec.QueuingBudget, 2*time.Minute) + + _, failError := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(getQualityOfServiceWithDuration(4 * time.Minute)), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + QualityOfService: getQualityOfServiceWithNilDuration(), + }, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Error(t, failError) } func TestGetQualityOfService_Workflow(t *testing.T) { @@ -156,6 +227,19 @@ func TestGetQualityOfService_Workflow(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, spec.QueuingBudget, 4*time.Minute) + + _, failError := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(getQualityOfServiceWithNilDuration()), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Error(t, failError) + } func TestGetQualityOfService_MatchableResource(t *testing.T) { @@ -175,6 +259,33 @@ func TestGetQualityOfService_MatchableResource(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, spec.QueuingBudget, 5*time.Minute) + + addGetErrorResourceFunc(t, &resourceManager, false) + _, failError := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(nil), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Error(t, failError) + + addGetErrorResourceFunc(t, &resourceManager, true) + _, noSpecError := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(nil), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Error(t, noSpecError) + } func TestGetQualityOfService_ConfigValues(t *testing.T) {