diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 8c235d3d71..41ce00d103 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -6,24 +6,9 @@ concurrency: on: pull_request: - paths: - - 'datacatalog/**' - - 'flyteadmin/**' - - 'flytecopilot/**' - - 'flyteplugins/**' - - 'flytepropeller/**' - - 'flytestdlib/**' push: branches: - master - paths: - - 'datacatalog/**' - - 'flyteadmin/**' - - 'flytecopilot/**' - - 'flyteidl/**' - - 'flyteplugins/**' - - 'flytepropeller/**' - - 'flytestdlib/**' env: GO_VERSION: "1.19" PRIORITIES: "P0" diff --git a/.github/workflows/flyteidl-checks.yml b/.github/workflows/flyteidl-checks.yml index cbf84b97f1..5cdf9733b2 100644 --- a/.github/workflows/flyteidl-checks.yml +++ b/.github/workflows/flyteidl-checks.yml @@ -6,13 +6,9 @@ concurrency: on: pull_request: - paths: - - 'flyteidl/**' push: branches: - master - paths: - - 'flyteidl/**' env: GO_VERSION: "1.19" jobs: diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index 42ef63841b..e269715a91 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -140,3 +140,8 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi } return statusErr } + +func IsDoesNotExistError(err error) bool { + adminError, ok := err.(FlyteAdminError) + return ok && adminError.Code() == codes.NotFound +} diff --git a/flyteadmin/pkg/errors/errors_test.go b/flyteadmin/pkg/errors/errors_test.go index 4b6b250166..cb6a2a0ae0 100644 --- a/flyteadmin/pkg/errors/errors_test.go +++ b/flyteadmin/pkg/errors/errors_test.go @@ -2,6 +2,7 @@ package errors import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" @@ -90,3 +91,15 @@ func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) { _, ok = details.GetReason().(*admin.CreateWorkflowFailureReason_ExistsIdenticalStructure) assert.True(t, ok) } + +func TestIsDoesNotExistError(t *testing.T) { + assert.True(t, IsDoesNotExistError(NewFlyteAdminError(codes.NotFound, "foo"))) +} + +func TestIsNotDoesNotExistError(t *testing.T) { + assert.False(t, IsDoesNotExistError(NewFlyteAdminError(codes.Canceled, "foo"))) +} + +func TestIsNotDoesNotExistErrorBecauseOfNoneAdminError(t *testing.T) { + assert.False(t, IsDoesNotExistError(errors.New("foo"))) +} diff --git a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go index 712470a1ee..74dad42ce0 100644 --- a/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go +++ b/flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go @@ -6,8 +6,6 @@ import ( "hash/fnv" "math/rand" - "google.golang.org/grpc/codes" - "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster" "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" @@ -102,10 +100,8 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu LaunchPlan: spec.LaunchPlan, ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL, }) - if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + return nil, err } var weightedRandomList random.WeightedRandomList diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index e31f501686..06516657d2 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -164,11 +164,8 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID * LaunchPlan: launchPlanName, ResourceType: admin.MatchableResource_PLUGIN_OVERRIDE, }) - if err != nil { - ec, ok := err.(errors.FlyteAdminError) - if !ok || ec.Code() != codes.NotFound { - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + return nil, err } if override != nil && override.Attributes != nil && override.Attributes.GetPluginOverrides() != nil { return override.Attributes.GetPluginOverrides().Overrides, nil @@ -427,11 +424,9 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad Domain: request.Domain, ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT, }) - if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err) - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err) + return nil, err } if resource != nil && resource.Attributes.GetClusterAssignment() != nil { return resource.Attributes.GetClusterAssignment(), nil diff --git a/flyteadmin/pkg/manager/impl/executions/quality_of_service.go b/flyteadmin/pkg/manager/impl/executions/quality_of_service.go index 345b0e926d..a96d99d3d6 100644 --- a/flyteadmin/pkg/manager/impl/executions/quality_of_service.go +++ b/flyteadmin/pkg/manager/impl/executions/quality_of_service.go @@ -43,7 +43,7 @@ func (q qualityOfServiceAllocator) getQualityOfServiceFromDb(ctx context.Context ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, }) if err != nil { - if _, ok := err.(errors.FlyteAdminError); !ok || err.(errors.FlyteAdminError).Code() != codes.NotFound { + if !errors.IsDoesNotExistError(err) { logger.Warningf(ctx, "Failed to fetch override values when assigning quality of service values for [%+v] with err: %v", workflowIdentifier, err) diff --git a/flyteadmin/pkg/manager/impl/executions/queues.go b/flyteadmin/pkg/manager/impl/executions/queues.go index 9ec4ff865a..5e4706700c 100644 --- a/flyteadmin/pkg/manager/impl/executions/queues.go +++ b/flyteadmin/pkg/manager/impl/executions/queues.go @@ -4,6 +4,7 @@ import ( "context" "math/rand" + "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces" repoInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces" @@ -64,7 +65,7 @@ func (q *queueAllocatorImpl) GetQueue(ctx context.Context, identifier core.Ident ResourceType: admin.MatchableResource_EXECUTION_QUEUE, }) - if err != nil { + if err != nil && !errors.IsDoesNotExistError(err) { logger.Warningf(ctx, "Failed to fetch override values when assigning execution queue for [%+v] with err: %v", identifier, err) } diff --git a/flyteadmin/pkg/manager/impl/util/resources.go b/flyteadmin/pkg/manager/impl/util/resources.go index 0180484005..8001b907dd 100644 --- a/flyteadmin/pkg/manager/impl/util/resources.go +++ b/flyteadmin/pkg/manager/impl/util/resources.go @@ -6,6 +6,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" + "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" @@ -100,7 +101,7 @@ func GetTaskResources(ctx context.Context, id *core.Identifier, resourceManager } resource, err := resourceManager.GetResource(ctx, request) - if err != nil { + if err != nil && !errors.IsDoesNotExistError(err) { logger.Infof(ctx, "Failed to fetch override values when assigning task resource default values for [%+v]: %v", id, err) } diff --git a/flyteadmin/pkg/manager/impl/util/shared.go b/flyteadmin/pkg/manager/impl/util/shared.go index cf7febd619..24c97f416a 100644 --- a/flyteadmin/pkg/manager/impl/util/shared.go +++ b/flyteadmin/pkg/manager/impl/util/shared.go @@ -275,12 +275,10 @@ func GetMatchableResource(ctx context.Context, resourceManager interfaces.Resour Workflow: workflowName, ResourceType: resourceType, }) - if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType, - project, domain, workflowName, err) - return nil, err - } + if err != nil && !errors.IsDoesNotExistError(err) { + logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType, + project, domain, workflowName, err) + return nil, err } return matchableResource, nil } diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index 69ef17ed89..ca5a6012a8 100644 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -1,45 +1,34 @@ package logs import ( - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" ) //go:generate pflags LogConfig --default-var=DefaultConfig -// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. -type TemplateURI = string - // LogConfig encapsulates plugins' log configs type LogConfig struct { IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"` // Deprecated: Please use CloudwatchTemplateURI CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."` // Deprecated: Please use CloudwatchTemplateURI - CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."` - CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"` + CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."` + CloudwatchTemplateURI tasklog.TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"` IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"` // Deprecated: Please use KubernetesTemplateURI - KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"` - KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"` + KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"` + KubernetesTemplateURI tasklog.TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"` IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"` // Deprecated: Please use StackDriverTemplateURI GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"` // Deprecated: Please use StackDriverTemplateURI - StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` - StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` - - Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"` -} + StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` + StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` -type TemplateLogPluginConfig struct { - DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` - TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` - MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` - Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."` + Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"` } var ( diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 6af1889e9f..4978109458 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -13,11 +13,6 @@ import ( "github.com/flyteorg/flyte/flytestdlib/logger" ) -type logPlugin struct { - Name string - Plugin tasklog.Plugin -} - // Internal func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) { if logPlugin == nil { @@ -66,67 +61,53 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas return logs.TaskLogs, nil } -type taskLogPluginWrapper struct { - logPlugins []logPlugin +type templateLogPluginCollection struct { + plugins []tasklog.TemplateLogPlugin } -func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) { - logs := make([]*core.TaskLog, 0, len(t.logPlugins)) - suffix := input.LogName +func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) { + var taskLogs []*core.TaskLog - for _, plugin := range t.logPlugins { - input.LogName = plugin.Name + suffix - o, err := plugin.Plugin.GetTaskLogs(input) + for _, plugin := range t.plugins { + o, err := plugin.GetTaskLogs(input) if err != nil { return tasklog.Output{}, err } - - logs = append(logs, o.TaskLogs...) + taskLogs = append(taskLogs, o.TaskLogs...) } - return tasklog.Output{ - TaskLogs: logs, - }, nil + return tasklog.Output{TaskLogs: taskLogs}, nil } // InitializeLogPlugins initializes log plugin based on config. func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { // Use a list to maintain order. - logPlugins := make([]logPlugin, 0, 2) + var plugins []tasklog.TemplateLogPlugin if cfg.IsKubernetesEnabled { if len(cfg.KubernetesTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsCloudwatchEnabled { if len(cfg.CloudwatchTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsStackDriverEnabled { if len(cfg.StackDriverTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON}) } } - if len(cfg.Templates) > 0 { - for _, cfg := range cfg.Templates { - logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.Scheme, cfg.TemplateURIs, cfg.MessageFormat)}) - } - } - - if len(logPlugins) == 0 { - return nil, nil - } - - return taskLogPluginWrapper{logPlugins: logPlugins}, nil + plugins = append(plugins, cfg.Templates...) + return templateLogPluginCollection{plugins: plugins}, nil } diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 066fdd96c8..91048eff16 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -320,7 +320,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c func TestGetLogsForContainerInPod_Templates(t *testing.T) { assertTestSucceeded(t, &LogConfig{ - Templates: []TemplateLogPluginConfig{ + Templates: []tasklog.TemplateLogPlugin{ { DisplayName: "StackDriver", TemplateURIs: []string{ diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index b812221f6d..c43da02e58 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -16,6 +16,9 @@ const ( TemplateSchemeTaskExecution ) +// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. +type TemplateURI = string + type TemplateVar struct { Regex *regexp.Regexp Value string @@ -57,3 +60,10 @@ type Plugin interface { // Generates a TaskLog object given necessary computation information GetTaskLogs(i Input) (logs Output, err error) } + +type TemplateLogPlugin struct { + DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` + TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` + MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` + Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."` +} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 77c49d2695..750b1972df 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -34,6 +34,7 @@ type templateRegexes struct { ExecutionName *regexp.Regexp ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp + GeneratedName *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -58,6 +59,7 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionName"), MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), + MustCreateRegex("generatedName"), } } @@ -121,6 +123,10 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { defaultRegexes.NodeID, input.TaskExecutionID.GetUniqueNodeID(), }, + TemplateVar{ + defaultRegexes.GeneratedName, + input.TaskExecutionID.GetGeneratedName(), + }, TemplateVar{ defaultRegexes.TaskRetryAttempt, strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10), @@ -172,55 +178,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { return vars } -// A simple log plugin that supports templates in urls to build the final log link. -// See `defaultRegexes` for supported templates. -type TemplateLogPlugin struct { - scheme TemplateScheme - templateUris []string - messageFormat core.TaskLog_MessageFormat -} - -func (s TemplateLogPlugin) GetTaskLog(podName, podUID, namespace, containerName, containerID, logName string, podRFC3339StartTime string, podRFC3339FinishTime string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) { - o, err := s.GetTaskLogs(Input{ - LogName: logName, - Namespace: namespace, - PodName: podName, - PodUID: podUID, - ContainerName: containerName, - ContainerID: containerID, - PodRFC3339StartTime: podRFC3339StartTime, - PodRFC3339FinishTime: podRFC3339FinishTime, - PodUnixStartTime: podUnixStartTime, - PodUnixFinishTime: podUnixFinishTime, - }) - - if err != nil || len(o.TaskLogs) == 0 { - return core.TaskLog{}, err - } - - return *o.TaskLogs[0], nil -} - -func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { - templateVars := input.templateVarsForScheme(s.scheme) - taskLogs := make([]*core.TaskLog, 0, len(s.templateUris)) - for _, templateURI := range s.templateUris { +func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { + templateVars := input.templateVarsForScheme(p.Scheme) + taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) + for _, templateURI := range p.TemplateURIs { taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), - Name: input.LogName, - MessageFormat: s.messageFormat, + Name: p.DisplayName + input.LogName, + MessageFormat: p.MessageFormat, }) } return Output{TaskLogs: taskLogs}, nil } - -// NewTemplateLogPlugin creates a template-based log plugin with the provided template Uri and message format. -// See `defaultRegexes` for supported templates. -func NewTemplateLogPlugin(scheme TemplateScheme, templateUris []string, messageFormat core.TaskLog_MessageFormat) TemplateLogPlugin { - return TemplateLogPlugin{ - scheme: scheme, - templateUris: templateUris, - messageFormat: messageFormat, - } -} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index 320ece05a4..f279707a3b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -5,7 +5,6 @@ import ( "regexp" "testing" - "github.com/go-test/deep" "github.com/stretchr/testify/assert" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -13,26 +12,6 @@ import ( coreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks" ) -func TestTemplateLog(t *testing.T) { - p := NewTemplateLogPlugin(TemplateSchemePod, []string{"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.podUID}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log"}, core.TaskLog_JSON) - tl, err := p.GetTaskLog( - "f-uuid-driver", - "pod-uid", - "flyteexamples-production", - "spark-kubernetes-driver", - "cri-o://abc", - "main_logs", - "2015-03-14T17:08:14+01:00", - "2021-06-15T20:47:57+02:00", - 1426349294, - 1623782877, - ) - assert.NoError(t, err) - assert.Equal(t, tl.GetName(), "main_logs") - assert.Equal(t, tl.GetMessageFormat(), core.TaskLog_JSON) - assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_pod-uid_flyteexamples-production_spark-kubernetes-driver-abc.log", tl.Uri) -} - // Latest Run: Benchmark_mustInitTemplateRegexes-16 45960 26914 ns/op func Benchmark_initDefaultRegexes(b *testing.B) { for i := 0; i < b.N; i++ { @@ -172,6 +151,7 @@ func Test_Input_templateVarsForScheme(t *testing.T) { TemplateVars{ {defaultRegexes.LogName, "main_logs"}, {defaultRegexes.NodeID, "n0-0-n0"}, + {defaultRegexes.GeneratedName, "generated-name"}, {defaultRegexes.TaskRetryAttempt, "1"}, {defaultRegexes.TaskID, "my-task-name"}, {defaultRegexes.TaskVersion, "1"}, @@ -245,147 +225,99 @@ func Test_Input_templateVarsForScheme(t *testing.T) { } } -func Test_templateLogPlugin_Regression(t *testing.T) { - type fields struct { - templateURI string - messageFormat core.TaskLog_MessageFormat - } +func TestTemplateLogPlugin(t *testing.T) { type args struct { - podName string - podUID string - namespace string - containerName string - containerID string - logName string - podRFC3339StartTime string - podRFC3339FinishTime string - podUnixStartTime int64 - podUnixFinishTime int64 + input Input } tests := []struct { - name string - fields fields - args args - want core.TaskLog - wantErr bool + name string + plugin TemplateLogPlugin + args args + want Output }{ { "cloudwatch", - fields{ - templateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "f-uuid-driver", - podUID: "pod-uid", - namespace: "flyteexamples-production", - containerName: "spark-kubernetes-driver", - containerID: "cri-o://abc", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "f-uuid-driver", + PodUID: "pod-uid", + Namespace: "flyteexamples-production", + ContainerName: "spark-kubernetes-driver", + ContainerID: "cri-o://abc", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_flyteexamples-production_spark-kubernetes-driver-abc.log", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, { "stackdriver", - fields{ - templateURI: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "podName", - podUID: "pod-uid", - namespace: "flyteexamples-production", - containerName: "spark-kubernetes-driver", - containerID: "cri-o://abc", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "podName", + PodUID: "pod-uid", + Namespace: "flyteexamples-production", + ContainerName: "spark-kubernetes-driver", + ContainerID: "cri-o://abc", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3DpodName", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, { "kubernetes", - fields{ - templateURI: "https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "flyteexamples-development-task-name", - podUID: "pod-uid", - namespace: "flyteexamples-development", - containerName: "ignore", - containerID: "ignore", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "flyteexamples-development-task-name", + PodUID: "pod-uid", + Namespace: "flyteexamples-development", + ContainerName: "ignore", + ContainerID: "ignore", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://dashboard.k8s.net/#!/log/flyteexamples-development/flyteexamples-development-task-name/pod?namespace=flyteexamples-development", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := TemplateLogPlugin{ - templateUris: []string{tt.fields.templateURI}, - messageFormat: tt.fields.messageFormat, - } - - got, err := s.GetTaskLog(tt.args.podName, tt.args.podUID, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName, tt.args.podRFC3339FinishTime, tt.args.podRFC3339FinishTime, tt.args.podUnixStartTime, tt.args.podUnixFinishTime) - if (err != nil) != tt.wantErr { - t.Errorf("GetTaskLog() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if diff := deep.Equal(got, tt.want); diff != nil { - t.Errorf("GetTaskLog() got = %v, want %v, diff: %v", got, tt.want, diff) - } - }) - } -} - -func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { - type fields struct { - scheme TemplateScheme - templateURI string - messageFormat core.TaskLog_MessageFormat - } - type args struct { - input Input - } - tests := []struct { - name string - fields fields - args args - want Output - wantErr bool - }{ { "splunk", - fields{ - templateURI: "https://prd-p-ighar.splunkcloud.com/en-US/app/search/search?q=search%20container_name%3D%22{{ .containerName }}%22", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://prd-p-ighar.splunkcloud.com/en-US/app/search/search?q=search%20container_name%3D%22{{ .containerName }}%22"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -410,13 +342,12 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "ddog", - fields{ - templateURI: "https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -441,13 +372,12 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "stackdriver-with-rfc3339-timestamp", - fields{ - templateURI: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}%20%22{{.podRFC3339StartTime}}%22", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}%20%22{{.podRFC3339StartTime}}%22"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -472,14 +402,13 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "task-with-task-execution-identifier", - fields{ - scheme: TemplateSchemeTaskExecution, - templateURI: "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + Scheme: TemplateSchemeTaskExecution, + TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -505,14 +434,13 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "mapped-task-with-task-execution-identifier", - fields{ - scheme: TemplateSchemeTaskExecution, - templateURI: "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + Scheme: TemplateSchemeTaskExecution, + TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -545,23 +473,14 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := TemplateLogPlugin{ - scheme: tt.fields.scheme, - templateUris: []string{tt.fields.templateURI}, - messageFormat: tt.fields.messageFormat, - } - got, err := s.GetTaskLogs(tt.args.input) - if (err != nil) != tt.wantErr { - t.Errorf("NewTaskLog() error = %v, wantErr %v", err, tt.wantErr) - return - } + got, err := tt.plugin.GetTaskLogs(tt.args.input) + assert.NoError(t, err) if !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewTaskLog() got = %v, want %v", got, tt.want) + t.Errorf("GetTaskLogs() got = %v, want %v", got, tt.want) } }) } diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config.go b/flyteplugins/go/tasks/plugins/k8s/ray/config.go index e123c5b8ab..8601264edf 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/config.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config.go @@ -8,6 +8,7 @@ import ( pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" "github.com/flyteorg/flyte/flytestdlib/config" ) @@ -78,11 +79,12 @@ type Config struct { DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"` // Remote Ray Cluster Config - RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` - Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` - LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` - Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` - EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` + RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` + Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` + LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` + DashboardURLTemplate *tasklog.TemplateLogPlugin `json:"dashboardURLTemplate" pflag:",Template for URL of Ray dashboard running on a head node."` + Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` + EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` } type DefaultConfig struct { diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index cc8d198334..0bc4f1183b 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -13,6 +13,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins" flyteerr "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" @@ -437,26 +438,35 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon return nil, fmt.Errorf("failed to initialize log plugins. Error: %w", err) } - if logPlugin == nil { - return nil, nil - } - - // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs - // RayJob CRD does not include the name of the worker or head pod for now + var taskLogs []*core.TaskLog taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() - logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{ + input := tasklog.Input{ Namespace: rayJob.Namespace, TaskExecutionID: taskExecID, - }) + } + // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs + // RayJob CRD does not include the name of the worker or head pod for now + logOutput, err := logPlugin.GetTaskLogs(input) if err != nil { return nil, fmt.Errorf("failed to generate task logs. Error: %w", err) } + taskLogs = append(taskLogs, logOutput.TaskLogs...) - return &pluginsCore.TaskInfo{ - Logs: logOutput.TaskLogs, - }, nil + // Handling for Ray Dashboard + dashboardURLTemplate := GetConfig().DashboardURLTemplate + if dashboardURLTemplate != nil && + rayJob.Status.DashboardURL != "" && + rayJob.Status.JobStatus == rayv1alpha1.JobStatusRunning { + dashboardURLOutput, err := dashboardURLTemplate.GetTaskLogs(input) + if err != nil { + return nil, fmt.Errorf("failed to generate Ray dashboard link. Error: %w", err) + } + taskLogs = append(taskLogs, dashboardURLOutput.TaskLogs...) + } + + return &pluginsCore.TaskInfo{Logs: taskLogs}, nil } func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) { @@ -470,7 +480,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil } - // Kuberay creates a Ray cluster first, and then submits a Ray job to the cluster + // KubeRay creates a Ray cluster first, and then submits a Ray job to the cluster switch rayJob.Status.JobDeploymentStatus { case rayv1alpha1.JobDeploymentStatusInitializing: return pluginsCore.PhaseInfoInitializing(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "cluster is creating", info), nil @@ -480,7 +490,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont case rayv1alpha1.JobDeploymentStatusFailedJobDeploy: reason := fmt.Sprintf("Failed to submit Ray job %s with error: %s", rayJob.Name, rayJob.Status.Message) return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil - case rayv1alpha1.JobDeploymentStatusWaitForDashboard: + case rayv1alpha1.JobDeploymentStatusWaitForDashboard, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil case rayv1alpha1.JobDeploymentStatusRunning, rayv1alpha1.JobDeploymentStatusComplete: switch rayJob.Status.JobStatus { @@ -489,12 +499,27 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil case rayv1alpha1.JobStatusSucceeded: return pluginsCore.PhaseInfoSuccess(info), nil - case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning: + case rayv1alpha1.JobStatusPending: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil + case rayv1alpha1.JobStatusRunning: + phaseInfo := pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info) + if len(info.Logs) > 0 { + phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1) + } + return phaseInfo, nil + case rayv1alpha1.JobStatusStopped: + // There is no current usage of this job status in KubeRay. It's unclear what it represents + fallthrough + default: + // We already handle all known job status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job status: %s", rayJob.Status.JobStatus) } + default: + // We already handle all known deployment status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job deployment status: %s", rayJob.Status.JobDeploymentStatus) } - - return pluginsCore.PhaseInfoUndefined, nil } func init() { diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index 920fa85d61..ccb518fa03 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -24,6 +24,7 @@ import ( pluginIOMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" mocks2 "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils" ) @@ -615,6 +616,8 @@ func newPluginContext() k8s.PluginContext { }, }, }) + taskExecID.OnGetUniqueNodeID().Return("unique-node") + taskExecID.OnGetGeneratedName().Return("generated-name") tskCtx := &mocks.TaskExecutionMetadata{} tskCtx.OnGetTaskExecutionID().Return(taskExecID) @@ -642,17 +645,19 @@ func TestGetTaskPhase(t *testing.T) { rayJobPhase rayv1alpha1.JobStatus rayClusterPhase rayv1alpha1.JobDeploymentStatus expectedCorePhase pluginsCore.Phase + expectedError bool }{ - {"", rayv1alpha1.JobDeploymentStatusInitializing, pluginsCore.PhaseInitializing}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, pluginsCore.PhasePermanentFailure}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusWaitForDashboard, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, pluginsCore.PhasePermanentFailure}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, pluginsCore.PhaseUndefined}, - {rayv1alpha1.JobStatusRunning, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusFailed, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhasePermanentFailure}, - {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseSuccess}, - {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusComplete, pluginsCore.PhaseSuccess}, + {"", rayv1alpha1.JobDeploymentStatusInitializing, pluginsCore.PhaseInitializing, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, pluginsCore.PhasePermanentFailure, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusWaitForDashboard, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, pluginsCore.PhasePermanentFailure, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusRunning, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusFailed, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhasePermanentFailure, false}, + {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseSuccess, false}, + {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusComplete, pluginsCore.PhaseSuccess, false}, + {rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusComplete, pluginsCore.PhaseUndefined, true}, } for _, tc := range testCases { @@ -663,12 +668,69 @@ func TestGetTaskPhase(t *testing.T) { startTime := metav1.NewTime(time.Now()) rayObject.Status.StartTime = &startTime phaseInfo, err := rayJobResourceHandler.GetTaskPhase(ctx, pluginCtx, rayObject) - assert.Nil(t, err) + if tc.expectedError { + assert.Error(t, err) + } else { + assert.Nil(t, err) + } assert.Equal(t, tc.expectedCorePhase.String(), phaseInfo.Phase().String()) }) } } +func TestGetEventInfo_DashboardURL(t *testing.T) { + pluginCtx := newPluginContext() + testCases := []struct { + name string + rayJob rayv1alpha1.RayJob + dashboardURLTemplate tasklog.TemplateLogPlugin + expectedTaskLogs []*core.TaskLog + }{ + { + name: "dashboard URL displayed", + rayJob: rayv1alpha1.RayJob{ + Status: rayv1alpha1.RayJobStatus{ + DashboardURL: "exists", + JobStatus: rayv1alpha1.JobStatusRunning, + }, + }, + dashboardURLTemplate: tasklog.TemplateLogPlugin{ + DisplayName: "Ray Dashboard", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{.generatedName}}"}, + Scheme: tasklog.TemplateSchemeTaskExecution, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "Ray Dashboard", + Uri: "http://test/generated-name", + }, + }, + }, + { + name: "dashboard URL is not displayed", + rayJob: rayv1alpha1.RayJob{ + Status: rayv1alpha1.RayJobStatus{ + JobStatus: rayv1alpha1.JobStatusPending, + }, + }, + dashboardURLTemplate: tasklog.TemplateLogPlugin{ + DisplayName: "dummy", + TemplateURIs: []tasklog.TemplateURI{"http://dummy"}, + }, + expectedTaskLogs: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.NoError(t, SetConfig(&Config{DashboardURLTemplate: &tc.dashboardURLTemplate})) + ti, err := getEventInfoForRayJob(logs.LogConfig{}, pluginCtx, &tc.rayJob) + assert.NoError(t, err) + assert.Equal(t, tc.expectedTaskLogs, ti.Logs) + }) + } +} + func TestGetPropertiesRay(t *testing.T) { rayJobResourceHandler := rayJobResourceHandler{} expected := k8s.PluginProperties{} diff --git a/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go b/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go index a1f16163fe..826a12e45f 100644 --- a/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go @@ -3,6 +3,7 @@ package athena import ( "context" "fmt" + "strings" "time" awsSdk "github.com/aws/aws-sdk-go-v2/aws" @@ -177,12 +178,19 @@ func (p Plugin) Status(ctx context.Context, tCtx webapi.StatusContext) (phase co func createTaskInfo(queryID string, cfg awsSdk.Config) *core.TaskInfo { timeNow := time.Now() + var consoleURL string + if strings.Contains(cfg.Region, "gov") { + consoleURL = "console.amazonaws-us-gov.com" + } else { + consoleURL = "console.aws.amazon.com" + } return &core.TaskInfo{ OccurredAt: &timeNow, Logs: []*idlCore.TaskLog{ { - Uri: fmt.Sprintf("https://%v.console.aws.amazon.com/athena/home?force®ion=%v#query/history/%v", + Uri: fmt.Sprintf("https://%v.%v/athena/home?force®ion=%v#query/history/%v", cfg.Region, + consoleURL, cfg.Region, queryID), Name: "Athena Query Console", diff --git a/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go b/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go index 6e3238d813..e19829447e 100644 --- a/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go @@ -22,3 +22,18 @@ func TestCreateTaskInfo(t *testing.T) { assert.Len(t, taskInfo.ExternalResources, 1) assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "query_id") } + + +func TestCreateTaskInfoGovAWS(t *testing.T) { + taskInfo := createTaskInfo("query_id", awsSdk.Config{ + Region: "us-gov-east-1", + }) + assert.EqualValues(t, []*idlCore.TaskLog{ + { + Uri: "https://us-gov-east-1.console.amazonaws-us-gov.com/athena/home?force®ion=us-gov-east-1#query/history/query_id", + Name: "Athena Query Console", + }, + }, taskInfo.Logs) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "query_id") +} diff --git a/rsts/deployment/deployment/sandbox.rst b/rsts/deployment/deployment/sandbox.rst index 5c40eea5eb..46809ae4a3 100644 --- a/rsts/deployment/deployment/sandbox.rst +++ b/rsts/deployment/deployment/sandbox.rst @@ -78,5 +78,35 @@ who wish to dig deeper into the storage layer. 🐋 Flyte sandbox ships with a Docker registry. Tag and push custom workflow images to localhost:30000 📂 The Minio API is hosted on localhost:30002. Use http://localhost:30080/minio/login for Minio console +Configuration +______________ + +The ``config-sandbox.yaml`` file contains configuration for **FlyteAdmin**, +which is the Flyte cluster backend component that processes all client requests +such as workflow executions. The default values are enough to let you connect and use Flyte: + + +.. code-block:: yaml + + admin: + # For GRPC endpoints you might want to use dns:///flyte.myexample.com + endpoint: localhost:30080 + authType: Pkce + insecure: true + console: + endpoint: http://localhost:30080 + logger: + show-source: true + level: 0 + +.. note:: + + You can also create your own config file with `flytectl config init`, which + will create a config file at `~/.flyte/config.yaml`. + + Learn more about the configuration settings in the + {ref}`Deployment Guide ` + + Now that you have the sandbox cluster running, you can now go to the :ref:`User Guide ` or :ref:`Tutorials ` to run tasks and workflows written in ``flytekit``, the Python SDK for Flyte. \ No newline at end of file