From aad945039d899eb455a6f95dc268a40e7db31ccf Mon Sep 17 00:00:00 2001 From: Jeev B Date: Thu, 9 Nov 2023 14:16:05 -0800 Subject: [PATCH] Add support for displaying the Ray dashboard when a RayJob is active Signed-off-by: Jeev B --- .../go/tasks/plugins/k8s/ray/config.go | 12 ++++---- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 30 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config.go b/flyteplugins/go/tasks/plugins/k8s/ray/config.go index e123c5b8ab5..41a2aa4d948 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/config.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config.go @@ -8,6 +8,7 @@ import ( pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" "github.com/flyteorg/flyte/flytestdlib/config" ) @@ -78,11 +79,12 @@ type Config struct { DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"` // Remote Ray Cluster Config - RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` - Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` - LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` - Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` - EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` + RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` + Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` + LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` + DashboardUrlTemplate *tasklog.TemplateLogPlugin `json:"dashboardUrlTemplate" pflag:",Template for URL of Ray dashboard running on a head node."` + Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` + EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` } type DefaultConfig struct { diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index cc8d1983343..55dc9a46843 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -441,22 +441,34 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon return nil, nil } - // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs - // RayJob CRD does not include the name of the worker or head pod for now - - taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() - logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{ + var taskLogs []*core.TaskLog + input := tasklog.Input{ Namespace: rayJob.Namespace, TaskExecutionID: taskExecID, - }) + } + // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs + // RayJob CRD does not include the name of the worker or head pod for now + taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() + logOutput, err := logPlugin.GetTaskLogs(input) if err != nil { return nil, fmt.Errorf("failed to generate task logs. Error: %w", err) } + taskLogs = append(taskLogs, logOutput.TaskLogs...) - return &pluginsCore.TaskInfo{ - Logs: logOutput.TaskLogs, - }, nil + // Handling for Ray Dashboard + dashboardUrlTemplate := GetConfig().DashboardUrlTemplate + if dashboardUrlTemplate != nil && + rayJob.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning && + rayJob.Status.DashboardURL != "" { + dashboardUrlOutput, err := dashboardUrlTemplate.GetTaskLogs(input) + if err != nil { + return nil, fmt.Errorf("failed to generate Ray dashboard link. Error: %w", err) + } + taskLogs = append(taskLogs, dashboardUrlOutput.TaskLogs...) + } + + return &pluginsCore.TaskInfo{Logs: logOutput.TaskLogs}, nil } func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {