From 1b92105d0750da88414962237e530a16e573f81c Mon Sep 17 00:00:00 2001 From: Jeev B Date: Tue, 31 Oct 2023 21:17:13 -0700 Subject: [PATCH] Add support for capturing Ray job logs via a sidecar (#4266) Signed-off-by: Jeev B --- .../go/tasks/plugins/k8s/ray/config.go | 3 + flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 113 ++++++-- .../go/tasks/plugins/k8s/ray/ray_test.go | 268 ++++++++++++++++-- 3 files changed, 337 insertions(+), 47 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config.go b/flyteplugins/go/tasks/plugins/k8s/ray/config.go index 10a8068344..e123c5b8ab 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/config.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config.go @@ -3,6 +3,8 @@ package ray import ( "context" + v1 "k8s.io/api/core/v1" + pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" @@ -78,6 +80,7 @@ type Config struct { // Remote Ray Cluster Config RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` + LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` } diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index 22840aa909..c1dcc2b8e2 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -26,6 +26,8 @@ import ( ) const ( + rayStateMountPath = "/tmp/ray" + defaultRayStateVolName = "system-ray-state" rayTaskType = "ray" KindRayJob = "RayJob" IncludeDashboard = "include-dashboard" @@ -61,17 +63,18 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to create pod spec: [%v]", err.Error()) } - var container v1.Container - found := false - for _, c := range podSpec.Containers { + var primaryContainer *v1.Container + var primaryContainerIdx int + for idx, c := range podSpec.Containers { if c.Name == primaryContainerName { - container = c - found = true + c := c + primaryContainer = &c + primaryContainerIdx = idx break } } - if !found { + if primaryContainer == nil { return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to get primary container from the pod: [%v]", err.Error()) } @@ -101,9 +104,15 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC } enableIngress := true + headPodSpec := podSpec.DeepCopy() rayClusterSpec := rayv1alpha1.RayClusterSpec{ HeadGroupSpec: rayv1alpha1.HeadGroupSpec{ - Template: buildHeadPodTemplate(&container, podSpec, objectMeta, taskCtx), + Template: buildHeadPodTemplate( + &headPodSpec.Containers[primaryContainerIdx], + headPodSpec, + objectMeta, + taskCtx, + ), ServiceType: v1.ServiceType(cfg.ServiceType), Replicas: &headReplicas, EnableIngress: &enableIngress, @@ -113,7 +122,13 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC } for _, spec := range rayJob.RayCluster.WorkerGroupSpec { - workerPodTemplate := buildWorkerPodTemplate(&container, podSpec, objectMeta, taskCtx) + workerPodSpec := podSpec.DeepCopy() + workerPodTemplate := buildWorkerPodTemplate( + &workerPodSpec.Containers[primaryContainerIdx], + workerPodSpec, + objectMeta, + taskCtx, + ) minReplicas := spec.Replicas maxReplicas := spec.Replicas @@ -161,7 +176,7 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC jobSpec := rayv1alpha1.RayJobSpec{ RayClusterSpec: rayClusterSpec, - Entrypoint: strings.Join(container.Args, " "), + Entrypoint: strings.Join(primaryContainer.Args, " "), ShutdownAfterJobFinishes: cfg.ShutdownAfterJobFinishes, TTLSecondsAfterFinished: &cfg.TTLSecondsAfterFinished, RuntimeEnv: rayJob.RuntimeEnv, @@ -179,10 +194,66 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC return &rayJobObject, nil } -func buildHeadPodTemplate(container *v1.Container, podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, taskCtx pluginsCore.TaskExecutionContext) v1.PodTemplateSpec { +func injectLogsSidecar(primaryContainer *v1.Container, podSpec *v1.PodSpec) { + cfg := GetConfig() + if cfg.LogsSidecar == nil { + return + } + sidecar := cfg.LogsSidecar.DeepCopy() + + // Ray logs integration + var rayStateVolMount *v1.VolumeMount + // Look for an existing volume mount on the primary container, mounted at /tmp/ray + for _, vm := range primaryContainer.VolumeMounts { + if vm.MountPath == rayStateMountPath { + vm := vm + rayStateVolMount = &vm + break + } + } + // No existing volume mount exists at /tmp/ray. We create a new volume and volume + // mount and add it to the pod and container specs respectively + if rayStateVolMount == nil { + vol := v1.Volume{ + Name: defaultRayStateVolName, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + } + podSpec.Volumes = append(podSpec.Volumes, vol) + volMount := v1.VolumeMount{ + Name: defaultRayStateVolName, + MountPath: rayStateMountPath, + } + primaryContainer.VolumeMounts = append(primaryContainer.VolumeMounts, volMount) + rayStateVolMount = &volMount + } + // We need to mirror the ray state volume mount into the sidecar as readonly, + // so that we can read the logs written by the head node. + readOnlyRayStateVolMount := *rayStateVolMount.DeepCopy() + readOnlyRayStateVolMount.ReadOnly = true + + // Update volume mounts on sidecar + // If one already exists with the desired mount path, simply replace it. Otherwise, + // add it to sidecar's volume mounts. + foundExistingSidecarVolMount := false + for idx, vm := range sidecar.VolumeMounts { + if vm.MountPath == rayStateMountPath { + foundExistingSidecarVolMount = true + sidecar.VolumeMounts[idx] = readOnlyRayStateVolMount + } + } + if !foundExistingSidecarVolMount { + sidecar.VolumeMounts = append(sidecar.VolumeMounts, readOnlyRayStateVolMount) + } + + // Add sidecar to containers + podSpec.Containers = append(podSpec.Containers, *sidecar) +} + +func buildHeadPodTemplate(primaryContainer *v1.Container, podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, taskCtx pluginsCore.TaskExecutionContext) v1.PodTemplateSpec { // Some configs are copy from https://github.com/ray-project/kuberay/blob/b72e6bdcd9b8c77a9dc6b5da8560910f3a0c3ffd/apiserver/pkg/util/cluster.go#L97 // They should always be the same, so we could hard code here. - primaryContainer := container.DeepCopy() primaryContainer.Name = "ray-head" envs := []v1.EnvVar{ @@ -217,12 +288,11 @@ func buildHeadPodTemplate(container *v1.Container, podSpec *v1.PodSpec, objectMe primaryContainer.Ports = append(primaryContainer.Ports, ports...) - headPodSpec := podSpec.DeepCopy() - - headPodSpec.Containers = []v1.Container{*primaryContainer} + // Inject a sidecar for capturing and exposing Ray job logs + injectLogsSidecar(primaryContainer, podSpec) podTemplateSpec := v1.PodTemplateSpec{ - Spec: *headPodSpec, + Spec: *podSpec, ObjectMeta: *objectMeta, } cfg := config.GetK8sPluginConfig() @@ -231,7 +301,7 @@ func buildHeadPodTemplate(container *v1.Container, podSpec *v1.PodSpec, objectMe return podTemplateSpec } -func buildWorkerPodTemplate(container *v1.Container, podSpec *v1.PodSpec, objectMetadata *metav1.ObjectMeta, taskCtx pluginsCore.TaskExecutionContext) v1.PodTemplateSpec { +func buildWorkerPodTemplate(primaryContainer *v1.Container, podSpec *v1.PodSpec, objectMetadata *metav1.ObjectMeta, taskCtx pluginsCore.TaskExecutionContext) v1.PodTemplateSpec { // Some configs are copy from https://github.com/ray-project/kuberay/blob/b72e6bdcd9b8c77a9dc6b5da8560910f3a0c3ffd/apiserver/pkg/util/cluster.go#L185 // They should always be the same, so we could hard code here. initContainers := []v1.Container{ @@ -243,10 +313,11 @@ func buildWorkerPodTemplate(container *v1.Container, podSpec *v1.PodSpec, object "-c", "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done", }, - Resources: container.Resources, + Resources: primaryContainer.Resources, }, } - primaryContainer := container.DeepCopy() + podSpec.InitContainers = append(podSpec.InitContainers, initContainers...) + primaryContainer.Name = "ray-worker" primaryContainer.Args = []string{} @@ -342,12 +413,8 @@ func buildWorkerPodTemplate(container *v1.Container, podSpec *v1.PodSpec, object } primaryContainer.Ports = append(primaryContainer.Ports, ports...) - workerPodSpec := podSpec.DeepCopy() - workerPodSpec.Containers = []v1.Container{*primaryContainer} - workerPodSpec.InitContainers = initContainers - podTemplateSpec := v1.PodTemplateSpec{ - Spec: *workerPodSpec, + Spec: *podSpec, ObjectMeta: *objectMetadata, } podTemplateSpec.SetLabels(utils.UnionMaps(podTemplateSpec.GetLabels(), utils.CopyMap(taskCtx.TaskExecutionMetadata().GetLabels()))) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index 6e3253776e..920fa85d61 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/golang/protobuf/jsonpb" structpb "github.com/golang/protobuf/ptypes/struct" rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" "github.com/stretchr/testify/assert" @@ -56,28 +55,36 @@ var ( workerGroupName = "worker-group" ) -func dummyRayCustomObj() *plugins.RayJob { - return &plugins.RayJob{ - RayCluster: &plugins.RayCluster{ - HeadGroupSpec: &plugins.HeadGroupSpec{RayStartParams: map[string]string{"num-cpus": "1"}}, - WorkerGroupSpec: []*plugins.WorkerGroupSpec{{GroupName: workerGroupName, Replicas: 3}}, - }, +func transformRayJobToCustomObj(rayJob *plugins.RayJob) *structpb.Struct { + structObj, err := utils.MarshalObjToStruct(rayJob) + if err != nil { + panic(err) } + return structObj } -func dummyRayTaskTemplate(id string, rayJobObj *plugins.RayJob) *core.TaskTemplate { - ptObjJSON, err := utils.MarshalToString(rayJobObj) +func transformPodSpecToTaskTemplateTarget(podSpec *corev1.PodSpec) *core.TaskTemplate_K8SPod { + structObj, err := utils.MarshalObjToStruct(&podSpec) if err != nil { panic(err) } + return &core.TaskTemplate_K8SPod{ + K8SPod: &core.K8SPod{ + PodSpec: structObj, + }, + } +} - structObj := structpb.Struct{} - - err = jsonpb.UnmarshalString(ptObjJSON, &structObj) - if err != nil { - panic(err) +func dummyRayCustomObj() *plugins.RayJob { + return &plugins.RayJob{ + RayCluster: &plugins.RayCluster{ + HeadGroupSpec: &plugins.HeadGroupSpec{RayStartParams: map[string]string{"num-cpus": "1"}}, + WorkerGroupSpec: []*plugins.WorkerGroupSpec{{GroupName: workerGroupName, Replicas: 3}}, + }, } +} +func dummyRayTaskTemplate(id string, rayJob *plugins.RayJob) *core.TaskTemplate { return &core.TaskTemplate{ Id: &core.Identifier{Name: id}, Type: "container", @@ -88,7 +95,7 @@ func dummyRayTaskTemplate(id string, rayJobObj *plugins.RayJob) *core.TaskTempla Env: dummyEnvVars, }, }, - Custom: &structObj, + Custom: transformRayJobToCustomObj(rayJob), } } @@ -198,7 +205,7 @@ func TestBuildResourceRayExtendedResources(t *testing.T) { GpuResourceName: flytek8s.ResourceNvidiaGPU, })) - fixtures := []struct { + params := []struct { name string resources *corev1.ResourceRequirements extendedResourcesBase *core.ExtendedResources @@ -292,11 +299,11 @@ func TestBuildResourceRayExtendedResources(t *testing.T) { }, } - for _, f := range fixtures { - t.Run(f.name, func(t *testing.T) { + for _, p := range params { + t.Run(p.name, func(t *testing.T) { taskTemplate := dummyRayTaskTemplate("ray-id", dummyRayCustomObj()) - taskTemplate.ExtendedResources = f.extendedResourcesBase - taskContext := dummyRayTaskContext(taskTemplate, f.resources, f.extendedResourcesOverride) + taskTemplate.ExtendedResources = p.extendedResourcesBase + taskContext := dummyRayTaskContext(taskTemplate, p.resources, p.extendedResourcesOverride) rayJobResourceHandler := rayJobResourceHandler{} r, err := rayJobResourceHandler.BuildResource(context.TODO(), taskContext) assert.Nil(t, err) @@ -308,12 +315,12 @@ func TestBuildResourceRayExtendedResources(t *testing.T) { headNodeSpec := rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec assert.EqualValues( t, - f.expectedNsr, + p.expectedNsr, headNodeSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, ) assert.EqualValues( t, - f.expectedTol, + p.expectedTol, headNodeSpec.Tolerations, ) @@ -321,12 +328,12 @@ func TestBuildResourceRayExtendedResources(t *testing.T) { workerNodeSpec := rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec assert.EqualValues( t, - f.expectedNsr, + p.expectedNsr, workerNodeSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, ) assert.EqualValues( t, - f.expectedTol, + p.expectedTol, workerNodeSpec.Tolerations, ) }) @@ -382,6 +389,219 @@ func TestDefaultStartParameters(t *testing.T) { assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Tolerations, toleration) } +func TestInjectLogsSidecar(t *testing.T) { + rayJobObj := transformRayJobToCustomObj(dummyRayCustomObj()) + params := []struct { + name string + taskTemplate core.TaskTemplate + // primaryContainerName string + logsSidecarCfg *corev1.Container + expectedVolumes []corev1.Volume + expectedPrimaryContainerVolumeMounts []corev1.VolumeMount + expectedLogsSidecarVolumeMounts []corev1.VolumeMount + }{ + { + "container target", + core.TaskTemplate{ + Id: &core.Identifier{Name: "ray-id"}, + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Image: testImage, + Args: testArgs, + }, + }, + Custom: rayJobObj, + }, + &corev1.Container{ + Name: "logs-sidecar", + Image: "test-image", + }, + []corev1.Volume{ + { + Name: "system-ray-state", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + []corev1.VolumeMount{ + { + Name: "system-ray-state", + MountPath: "/tmp/ray", + }, + }, + []corev1.VolumeMount{ + { + Name: "system-ray-state", + MountPath: "/tmp/ray", + ReadOnly: true, + }, + }, + }, + { + "container target with no sidecar", + core.TaskTemplate{ + Id: &core.Identifier{Name: "ray-id"}, + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Image: testImage, + Args: testArgs, + }, + }, + Custom: rayJobObj, + }, + nil, + nil, + nil, + nil, + }, + { + "pod target", + core.TaskTemplate{ + Id: &core.Identifier{Name: "ray-id"}, + Target: transformPodSpecToTaskTemplateTarget(&corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "primary-image", + }, + }, + }), + Custom: rayJobObj, + Config: map[string]string{ + flytek8s.PrimaryContainerKey: "main", + }, + }, + &corev1.Container{ + Name: "logs-sidecar", + Image: "test-image", + }, + []corev1.Volume{ + { + Name: "system-ray-state", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + []corev1.VolumeMount{ + { + Name: "system-ray-state", + MountPath: "/tmp/ray", + }, + }, + []corev1.VolumeMount{ + { + Name: "system-ray-state", + MountPath: "/tmp/ray", + ReadOnly: true, + }, + }, + }, + { + "pod target with existing ray state volume", + core.TaskTemplate{ + Id: &core.Identifier{Name: "ray-id"}, + Target: transformPodSpecToTaskTemplateTarget(&corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "primary-image", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "test-vol", + MountPath: "/tmp/ray", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "test-vol", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }), + Custom: rayJobObj, + Config: map[string]string{ + flytek8s.PrimaryContainerKey: "main", + }, + }, + &corev1.Container{ + Name: "logs-sidecar", + Image: "test-image", + }, + []corev1.Volume{ + { + Name: "test-vol", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + []corev1.VolumeMount{ + { + Name: "test-vol", + MountPath: "/tmp/ray", + }, + }, + []corev1.VolumeMount{ + { + Name: "test-vol", + MountPath: "/tmp/ray", + ReadOnly: true, + }, + }, + }, + } + + for _, p := range params { + t.Run(p.name, func(t *testing.T) { + assert.NoError(t, SetConfig(&Config{ + LogsSidecar: p.logsSidecarCfg, + })) + taskContext := dummyRayTaskContext(&p.taskTemplate, resourceRequirements, nil) + rayJobResourceHandler := rayJobResourceHandler{} + r, err := rayJobResourceHandler.BuildResource(context.TODO(), taskContext) + assert.Nil(t, err) + assert.NotNil(t, r) + rayJob, ok := r.(*rayv1alpha1.RayJob) + assert.True(t, ok) + + headPodSpec := rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec + + // Check volumes + assert.EqualValues(t, p.expectedVolumes, headPodSpec.Volumes) + + // Check containers and respective volume mounts + foundPrimaryContainer := false + foundLogsSidecar := false + for _, cnt := range headPodSpec.Containers { + if cnt.Name == "ray-head" { + foundPrimaryContainer = true + assert.EqualValues( + t, + p.expectedPrimaryContainerVolumeMounts, + cnt.VolumeMounts, + ) + } + if p.logsSidecarCfg != nil && cnt.Name == p.logsSidecarCfg.Name { + foundLogsSidecar = true + assert.EqualValues( + t, + p.expectedLogsSidecarVolumeMounts, + cnt.VolumeMounts, + ) + } + } + assert.Equal(t, true, foundPrimaryContainer) + assert.Equal(t, p.logsSidecarCfg != nil, foundLogsSidecar) + }) + } +} + func newPluginContext() k8s.PluginContext { plg := &mocks2.PluginContext{}