From d3fd5d5e8802e10f53a5da55f8669e28aa81da80 Mon Sep 17 00:00:00 2001 From: Ujjwal Sarin <51675746+ujjwalsfix@users.noreply.github.com> Date: Mon, 31 Oct 2022 17:49:10 -0700 Subject: [PATCH 1/6] Adding sidecar containers and additional resource env vars (#428) --- execution/adapter/eks_adapter.go | 81 +++++++++++++++++++++++++++----- execution/engine/eks_engine.go | 29 +++++++----- execution/engine/emr_engine.go | 50 ++++++++++++++++++++ worker/status_worker.go | 10 ---- 4 files changed, 135 insertions(+), 35 deletions(-) diff --git a/execution/adapter/eks_adapter.go b/execution/adapter/eks_adapter.go index 865562ed..092eb441 100644 --- a/execution/adapter/eks_adapter.go +++ b/execution/adapter/eks_adapter.go @@ -15,7 +15,7 @@ import ( type EKSAdapter interface { AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod *corev1.Pod) (state.Run, error) - AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool) (batchv1.Job, error) + AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand []string) (batchv1.Job, error) } type eksAdapter struct{} @@ -55,10 +55,13 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod updated.Status = state.StatusStopped if pod != nil { if pod.Status.ContainerStatuses != nil && len(pod.Status.ContainerStatuses) > 0 { - containerStatus := pod.Status.ContainerStatuses[len(pod.Status.ContainerStatuses)-1] - if containerStatus.State.Terminated != nil { - updated.ExitReason = &containerStatus.State.Terminated.Reason - exitCode = int64(containerStatus.State.Terminated.ExitCode) + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name != "sidecar" { + if containerStatus.State.Terminated != nil { + updated.ExitReason = &containerStatus.State.Terminated.Reason + exitCode = 
int64(containerStatus.State.Terminated.ExitCode) + } + } } } } @@ -66,11 +69,14 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod } if pod != nil && len(pod.Spec.Containers) > 0 { - container := pod.Spec.Containers[0] - //First three lines are injected by Flotilla, strip those out. - if len(container.Command) > 3 { - cmd := strings.Join(container.Command[3:], "\n") - updated.Command = &cmd + for _, container := range pod.Spec.Containers { + if container.Name != "sidecar" { + //First three lines are injected by Flotilla, strip those out. + if len(container.Command) > 3 { + cmd := strings.Join(container.Command[3:], "\n") + updated.Command = &cmd + } + } } } @@ -98,7 +104,7 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod // 5. Node lifecycle. // 6. Node affinity and anti-affinity // -func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool) (batchv1.Job, error) { +func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand []string) (batchv1.Job, error) { cmd := "" if run.Command != nil && len(*run.Command) > 0 { @@ -134,6 +140,19 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa "owner": a.sanitizeLabel(run.User), } + sidecarContainer := corev1.Container{ + Name: "sidecar", + Image: run.Image, + Command: sidecarCommand, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("25m"), + corev1.ResourceCPU: resource.MustParse("15M"), + }, + }, + Env: a.envOverrides(executable, run), + } + //if run.Description != nil { // info := strings.Split(*run.Description, "/") // @@ -154,7 +173,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa 
}, Spec: corev1.PodSpec{ SchedulerName: schedulerName, - Containers: []corev1.Container{container}, + Containers: []corev1.Container{container, sidecarContainer}, RestartPolicy: corev1.RestartPolicyNever, ServiceAccountName: sa, Affinity: affinity, @@ -439,6 +458,44 @@ func (a *eksAdapter) envOverrides(executable state.Executable, run state.Run) [] }) } } + + res = append(res, corev1.EnvVar{ + Name: "CPU", + ValueFrom: &corev1.EnvVarSource{ + ResourceFieldRef: &corev1.ResourceFieldSelector{ + ContainerName: run.RunID, + Resource: "requests.cpu", + Divisor: resource.MustParse("1m"), + }, + }, + }, + corev1.EnvVar{ + Name: "MEMORY", + ValueFrom: &corev1.EnvVarSource{ + ResourceFieldRef: &corev1.ResourceFieldSelector{ + ContainerName: run.RunID, + Resource: "requests.memory", + }, + }, + }, + ) + if run.Gpu != nil && *run.Gpu > 0 { + res = append(res, corev1.EnvVar{ + Name: "GPU", + ValueFrom: &corev1.EnvVarSource{ + ResourceFieldRef: &corev1.ResourceFieldSelector{ + ContainerName: run.RunID, + Resource: "requests.nvidia.com/gpu", + }, + }, + }) + } else { + res = append(res, corev1.EnvVar{ + Name: "GPU", + Value: "0", + }) + + } return res } diff --git a/execution/engine/eks_engine.go b/execution/engine/eks_engine.go index ca77676f..14abc746 100644 --- a/execution/engine/eks_engine.go +++ b/execution/engine/eks_engine.go @@ -47,6 +47,7 @@ type EKSExecutionEngine struct { s3Bucket string s3BucketRootDir string statusQueue string + sidecarCommand []string } // @@ -87,7 +88,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error { ee.jobTtl = conf.GetInt("eks_job_ttl") ee.jobSA = conf.GetString("eks_service_account") ee.jobARAEnabled = true - + ee.sidecarCommand = conf.GetStringSlice("eks_sidecar_command") adapt, err := adapter.NewEKSAdapter() if err != nil { @@ -115,7 +116,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error { } func (ee *EKSExecutionEngine) Execute(executable state.Executable, run state.Run, manager state.Manager) 
(state.Run, bool, error) { - job, err := ee.adapter.AdaptFlotillaDefinitionAndRunToJob(executable, run, ee.jobSA, ee.schedulerName, manager, ee.jobARAEnabled) + job, err := ee.adapter.AdaptFlotillaDefinitionAndRunToJob(executable, run, ee.jobSA, ee.schedulerName, manager, ee.jobARAEnabled, ee.sidecarCommand) kClient, err := ee.getKClient(run) if err != nil { @@ -175,7 +176,6 @@ func (ee *EKSExecutionEngine) Execute(executable state.Executable, run state.Run func (ee *EKSExecutionEngine) getPodName(run state.Run) (state.Run, error) { podList, err := ee.getPodList(run) - if err != nil { return run, err } @@ -185,16 +185,19 @@ func (ee *EKSExecutionEngine) getPodName(run state.Run) (state.Run, error) { run.PodName = &pod.Name run.Namespace = &pod.Namespace if pod.Spec.Containers != nil && len(pod.Spec.Containers) > 0 { - container := pod.Spec.Containers[len(pod.Spec.Containers)-1] - cpu := container.Resources.Requests.Cpu().ScaledValue(resource.Milli) - cpuLimit := container.Resources.Limits.Cpu().ScaledValue(resource.Milli) - run.Cpu = &cpu - run.CpuLimit = &cpuLimit - run = ee.getInstanceDetails(pod, run) - mem := container.Resources.Requests.Memory().ScaledValue(resource.Mega) - run.Memory = &mem - memLimit := container.Resources.Limits.Memory().ScaledValue(resource.Mega) - run.MemoryLimit = &memLimit + for _, container := range pod.Spec.Containers { + if container.Name != "sidecar" { + cpu := container.Resources.Requests.Cpu().ScaledValue(resource.Milli) + cpuLimit := container.Resources.Limits.Cpu().ScaledValue(resource.Milli) + run.Cpu = &cpu + run.CpuLimit = &cpuLimit + run = ee.getInstanceDetails(pod, run) + mem := container.Resources.Requests.Memory().ScaledValue(resource.Mega) + run.Memory = &mem + memLimit := container.Resources.Limits.Memory().ScaledValue(resource.Mega) + run.MemoryLimit = &memLimit + } + } } } return run, nil diff --git a/execution/engine/emr_engine.go b/execution/engine/emr_engine.go index f16cb277..28d1837a 100644 --- 
a/execution/engine/emr_engine.go +++ b/execution/engine/emr_engine.go @@ -47,6 +47,8 @@ type EMRExecutionEngine struct { s3ManifestBucket string s3ManifestBasePath string serializer *k8sJson.Serializer + sidecarCommand []string + sidecarImage string } // @@ -66,6 +68,8 @@ func (emr *EMRExecutionEngine) Initialize(conf config.Config) error { emr.s3ManifestBasePath = conf.GetString("emr_manifest_base_path") emr.emrJobSA = conf.GetString("eks_service_account") emr.schedulerName = conf.GetString("eks_scheduler_name") + emr.sidecarImage = conf.GetString("emr_sidecar_image") + emr.sidecarCommand = conf.GetStringSlice("emr_sidecar_command") awsConfig := &aws.Config{Region: aws.String(emr.awsRegion)} sess := session.Must(session.NewSessionWithOptions(session.Options{Config: *awsConfig})) @@ -247,6 +251,18 @@ func (emr *EMRExecutionEngine) driverPodTemplate(executable state.Executable, ru }, WorkingDir: workingDir, }, + { + Name: "sidecar", + Image: emr.sidecarImage, + Command: emr.sidecarCommand, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("25m"), + v1.ResourceCPU: resource.MustParse("15M"), + }, + }, + Env: append(emr.envOverrides(executable, run)), + }, }, InitContainers: []v1.Container{{ Name: fmt.Sprintf("init-driver-%s", run.RunID), @@ -318,6 +334,18 @@ func (emr *EMRExecutionEngine) executorPodTemplate(executable state.Executable, }, WorkingDir: workingDir, }, + { + Name: "sidecar", + Image: emr.sidecarImage, + Command: emr.sidecarCommand, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("25m"), + v1.ResourceCPU: resource.MustParse("15M"), + }, + }, + Env: append(emr.envOverrides(executable, run)), + }, }, InitContainers: []v1.Container{{ Name: fmt.Sprintf("init-executor-%s", run.RunID), @@ -700,6 +728,28 @@ func (emr *EMRExecutionEngine) envOverrides(executable state.Executable, run sta }, }, }) + + res = append(res, v1.EnvVar{ + Name: "CPU", + 
ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "spark-kubernetes-executor", + Resource: "requests.cpu", + Divisor: resource.MustParse("1m"), + }, + }, + }, + v1.EnvVar{ + Name: "MEMORY", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "spark-kubernetes-executor", + Resource: "requests.memory", + }, + }, + }, + ) + return res } diff --git a/worker/status_worker.go b/worker/status_worker.go index 8cce9c59..7f3241cb 100644 --- a/worker/status_worker.go +++ b/worker/status_worker.go @@ -272,16 +272,6 @@ func (sw *statusWorker) extractExceptions(runID string) { } } -func (sw *statusWorker) processEKSRunMetrics(run state.Run) { - updatedRun, err := sw.ee.FetchPodMetrics(run) - if err == nil { - if updatedRun.MaxMemoryUsed != run.MaxMemoryUsed || - updatedRun.MaxCpuUsed != run.MaxCpuUsed { - _, err = sw.sm.UpdateRun(updatedRun.RunID, updatedRun) - } - } -} - func (sw *statusWorker) logStatusUpdate(update state.Run) { var err error var startedAt, finishedAt time.Time From 8395aba03d323928c089e163b5b81fde4d32db90 Mon Sep 17 00:00:00 2001 From: Ujjwal Sarin Date: Tue, 1 Nov 2022 01:12:44 +0000 Subject: [PATCH 2/6] fix command slice handling --- execution/adapter/eks_adapter.go | 6 +++--- execution/engine/eks_engine.go | 4 ++-- execution/engine/emr_engine.go | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/execution/adapter/eks_adapter.go b/execution/adapter/eks_adapter.go index 092eb441..52d6902e 100644 --- a/execution/adapter/eks_adapter.go +++ b/execution/adapter/eks_adapter.go @@ -15,7 +15,7 @@ import ( type EKSAdapter interface { AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod *corev1.Pod) (state.Run, error) - AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand []string) (batchv1.Job, error) + 
AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand string) (batchv1.Job, error) } type eksAdapter struct{} @@ -104,7 +104,7 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod // 5. Node lifecycle. // 6. Node affinity and anti-affinity // -func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand []string) (batchv1.Job, error) { +func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand string) (batchv1.Job, error) { cmd := "" if run.Command != nil && len(*run.Command) > 0 { @@ -143,7 +143,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa sidecarContainer := corev1.Container{ Name: "sidecar", Image: run.Image, - Command: sidecarCommand, + Command: a.constructCmdSlice(sidecarCommand), Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceMemory: resource.MustParse("25m"), diff --git a/execution/engine/eks_engine.go b/execution/engine/eks_engine.go index 14abc746..5e684d17 100644 --- a/execution/engine/eks_engine.go +++ b/execution/engine/eks_engine.go @@ -47,7 +47,7 @@ type EKSExecutionEngine struct { s3Bucket string s3BucketRootDir string statusQueue string - sidecarCommand []string + sidecarCommand string } // @@ -88,7 +88,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error { ee.jobTtl = conf.GetInt("eks_job_ttl") ee.jobSA = conf.GetString("eks_service_account") ee.jobARAEnabled = true - ee.sidecarCommand = conf.GetStringSlice("eks_sidecar_command") + ee.sidecarCommand = conf.GetString("eks_sidecar_command") adapt, err := adapter.NewEKSAdapter() if err != nil { diff --git 
a/execution/engine/emr_engine.go b/execution/engine/emr_engine.go index 28d1837a..6b1eab5d 100644 --- a/execution/engine/emr_engine.go +++ b/execution/engine/emr_engine.go @@ -47,7 +47,7 @@ type EMRExecutionEngine struct { s3ManifestBucket string s3ManifestBasePath string serializer *k8sJson.Serializer - sidecarCommand []string + sidecarCommand string sidecarImage string } @@ -69,7 +69,7 @@ func (emr *EMRExecutionEngine) Initialize(conf config.Config) error { emr.emrJobSA = conf.GetString("eks_service_account") emr.schedulerName = conf.GetString("eks_scheduler_name") emr.sidecarImage = conf.GetString("emr_sidecar_image") - emr.sidecarCommand = conf.GetStringSlice("emr_sidecar_command") + emr.sidecarCommand = conf.GetString("emr_sidecar_command") awsConfig := &aws.Config{Region: aws.String(emr.awsRegion)} sess := session.Must(session.NewSessionWithOptions(session.Options{Config: *awsConfig})) @@ -254,7 +254,7 @@ func (emr *EMRExecutionEngine) driverPodTemplate(executable state.Executable, ru { Name: "sidecar", Image: emr.sidecarImage, - Command: emr.sidecarCommand, + Command: emr.constructCmdSlice(&emr.sidecarCommand), Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceMemory: resource.MustParse("25m"), @@ -337,7 +337,7 @@ func (emr *EMRExecutionEngine) executorPodTemplate(executable state.Executable, { Name: "sidecar", Image: emr.sidecarImage, - Command: emr.sidecarCommand, + Command: emr.constructCmdSlice(&emr.sidecarCommand), Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceMemory: resource.MustParse("25m"), From 849b46e6d367f26a45c274e91d7950b098a89b16 Mon Sep 17 00:00:00 2001 From: Ujjwal Sarin Date: Tue, 1 Nov 2022 01:26:42 +0000 Subject: [PATCH 3/6] fix typo --- execution/adapter/eks_adapter.go | 5 +++-- execution/engine/emr_engine.go | 9 +++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/execution/adapter/eks_adapter.go b/execution/adapter/eks_adapter.go index 52d6902e..9b599c1b 100644 ---
a/execution/adapter/eks_adapter.go +++ b/execution/adapter/eks_adapter.go @@ -146,8 +146,8 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa Command: a.constructCmdSlice(sidecarCommand), Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("25m"), - corev1.ResourceCPU: resource.MustParse("15M"), + corev1.ResourceMemory: resource.MustParse("25M"), + corev1.ResourceCPU: resource.MustParse("15m"), }, }, Env: a.envOverrides(executable, run), @@ -475,6 +475,7 @@ func (a *eksAdapter) envOverrides(executable state.Executable, run state.Run) [] ResourceFieldRef: &corev1.ResourceFieldSelector{ ContainerName: run.RunID, Resource: "requests.memory", + Divisor: resource.MustParse("1M"), }, }, }, diff --git a/execution/engine/emr_engine.go b/execution/engine/emr_engine.go index 6b1eab5d..78fb8bfd 100644 --- a/execution/engine/emr_engine.go +++ b/execution/engine/emr_engine.go @@ -257,8 +257,8 @@ func (emr *EMRExecutionEngine) driverPodTemplate(executable state.Executable, ru Command: emr.constructCmdSlice(&emr.sidecarCommand), Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ - v1.ResourceMemory: resource.MustParse("25m"), - v1.ResourceCPU: resource.MustParse("15M"), + v1.ResourceMemory: resource.MustParse("25M"), + v1.ResourceCPU: resource.MustParse("15m"), }, }, Env: append(emr.envOverrides(executable, run)), @@ -340,8 +340,8 @@ func (emr *EMRExecutionEngine) executorPodTemplate(executable state.Executable, Command: emr.constructCmdSlice(&emr.sidecarCommand), Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ - v1.ResourceMemory: resource.MustParse("25m"), - v1.ResourceCPU: resource.MustParse("15M"), + v1.ResourceMemory: resource.MustParse("25M"), + v1.ResourceCPU: resource.MustParse("15m"), }, }, Env: append(emr.envOverrides(executable, run)), @@ -745,6 +745,7 @@ func (emr *EMRExecutionEngine) envOverrides(executable state.Executable, run sta 
ResourceFieldRef: &v1.ResourceFieldSelector{ ContainerName: "spark-kubernetes-executor", Resource: "requests.memory", + Divisor: resource.MustParse("1M"), }, }, }, From 0068468397f17beefd23437882932730920e723a Mon Sep 17 00:00:00 2001 From: Ujjwal Sarin Date: Tue, 1 Nov 2022 04:00:12 +0000 Subject: [PATCH 4/6] Revert "remove eviction for gpu jobs" This reverts commit 6d731c43891c1c9728b507e53774f8873a7b0226. --- execution/adapter/eks_adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execution/adapter/eks_adapter.go b/execution/adapter/eks_adapter.go index 9b599c1b..df27faad 100644 --- a/execution/adapter/eks_adapter.go +++ b/execution/adapter/eks_adapter.go @@ -196,7 +196,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa } func (a *eksAdapter) constructEviction(run state.Run, manager state.Manager) string { if run.Gpu != nil && *run.Gpu > 0 { - return "false" + return "true" } if run.NodeLifecycle != nil && *run.NodeLifecycle == state.OndemandLifecycle { From 7a4406710a9f28839d5eff40811b446d028d395d Mon Sep 17 00:00:00 2001 From: Oz Raza Date: Tue, 8 Nov 2022 13:54:29 -0500 Subject: [PATCH 5/6] Update models.go Increasing CPU max to support 64 cores. 
--- state/models.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/models.go b/state/models.go index e4e62bb7..971a7d09 100644 --- a/state/models.go +++ b/state/models.go @@ -28,7 +28,7 @@ var DefaultTaskType = "task" var MinCPU = int64(256) -var MaxCPU = int64(32000) +var MaxCPU = int64(64000) var MinMem = int64(512) From 920bc725e620b2a1fd0336ec0a2b6b646f3274a2 Mon Sep 17 00:00:00 2001 From: Oz Raza Date: Thu, 10 Nov 2022 14:49:39 -0800 Subject: [PATCH 6/6] downward api does not support getting arbitrary resources; changing to value in run definition --- execution/adapter/eks_adapter.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/execution/adapter/eks_adapter.go b/execution/adapter/eks_adapter.go index df27faad..74636a8e 100644 --- a/execution/adapter/eks_adapter.go +++ b/execution/adapter/eks_adapter.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "regexp" + "strconv" "strings" "time" ) @@ -482,13 +483,8 @@ func (a *eksAdapter) envOverrides(executable state.Executable, run state.Run) [] ) if run.Gpu != nil && *run.Gpu > 0 { res = append(res, corev1.EnvVar{ - Name: "GPU", - ValueFrom: &corev1.EnvVarSource{ - ResourceFieldRef: &corev1.ResourceFieldSelector{ - ContainerName: run.RunID, - Resource: "requests.nvidia.com/gpu", - }, - }, + Name: "GPU", + Value: strconv.FormatInt(*run.Gpu, 10), }) } else { res = append(res, corev1.EnvVar{