diff --git a/execution/adapter/eks_adapter.go b/execution/adapter/eks_adapter.go index 865562ed..74636a8e 100644 --- a/execution/adapter/eks_adapter.go +++ b/execution/adapter/eks_adapter.go @@ -9,13 +9,14 @@ import ( "k8s.io/apimachinery/pkg/api/resource" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "regexp" + "strconv" "strings" "time" ) type EKSAdapter interface { AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod *corev1.Pod) (state.Run, error) - AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool) (batchv1.Job, error) + AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand string) (batchv1.Job, error) } type eksAdapter struct{} @@ -55,10 +56,13 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod updated.Status = state.StatusStopped if pod != nil { if pod.Status.ContainerStatuses != nil && len(pod.Status.ContainerStatuses) > 0 { - containerStatus := pod.Status.ContainerStatuses[len(pod.Status.ContainerStatuses)-1] - if containerStatus.State.Terminated != nil { - updated.ExitReason = &containerStatus.State.Terminated.Reason - exitCode = int64(containerStatus.State.Terminated.ExitCode) + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name != "sidecar" { + if containerStatus.State.Terminated != nil { + updated.ExitReason = &containerStatus.State.Terminated.Reason + exitCode = int64(containerStatus.State.Terminated.ExitCode) + } + } } } } @@ -66,11 +70,14 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod } if pod != nil && len(pod.Spec.Containers) > 0 { - container := pod.Spec.Containers[0] - //First three lines are injected by Flotilla, strip those out. - if len(container.Command) > 3 { - cmd := strings.Join(container.Command[3:], "\n") - updated.Command = &cmd + for _, container := range pod.Spec.Containers { + if container.Name != "sidecar" { + //First three lines are injected by Flotilla, strip those out. + if len(container.Command) > 3 { + cmd := strings.Join(container.Command[3:], "\n") + updated.Command = &cmd + } + } } } @@ -98,7 +105,7 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod // 5. Node lifecycle. // 6. Node affinity and anti-affinity // -func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool) (batchv1.Job, error) { +func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand string) (batchv1.Job, error) { cmd := "" if run.Command != nil && len(*run.Command) > 0 { @@ -134,6 +141,19 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa "owner": a.sanitizeLabel(run.User), } + sidecarContainer := corev1.Container{ + Name: "sidecar", + Image: run.Image, + Command: a.constructCmdSlice(sidecarCommand), + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("25M"), + corev1.ResourceCPU: resource.MustParse("15m"), + }, + }, + Env: a.envOverrides(executable, run), + } + //if run.Description != nil { // info := strings.Split(*run.Description, "/") // @@ -154,7 +174,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa }, Spec: corev1.PodSpec{ SchedulerName: schedulerName, - Containers: []corev1.Container{container}, + Containers: []corev1.Container{container, sidecarContainer}, RestartPolicy: corev1.RestartPolicyNever, ServiceAccountName: sa, Affinity: affinity, @@ -177,7 +197,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa } func (a *eksAdapter) constructEviction(run state.Run, manager state.Manager) string { if run.Gpu != nil && *run.Gpu > 0 { - return "false" + return "true" } if run.NodeLifecycle != nil && *run.NodeLifecycle == state.OndemandLifecycle { @@ -439,6 +459,40 @@ func (a *eksAdapter) envOverrides(executable state.Executable, run state.Run) [] }) } } + + res = append(res, corev1.EnvVar{ + Name: "CPU", + ValueFrom: &corev1.EnvVarSource{ + ResourceFieldRef: &corev1.ResourceFieldSelector{ + ContainerName: run.RunID, + Resource: "requests.cpu", + Divisor: resource.MustParse("1m"), + }, + }, + }, + corev1.EnvVar{ + Name: "MEMORY", + ValueFrom: &corev1.EnvVarSource{ + ResourceFieldRef: &corev1.ResourceFieldSelector{ + ContainerName: run.RunID, + Resource: "requests.memory", + Divisor: resource.MustParse("1M"), + }, + }, + }, + ) + if run.Gpu != nil && *run.Gpu > 0 { + res = append(res, corev1.EnvVar{ + Name: "GPU", + Value: strconv.FormatInt(*run.Gpu, 10), + }) + } else { + res = append(res, corev1.EnvVar{ + Name: "GPU", + Value: "0", + }) + + } return res } diff --git a/execution/engine/eks_engine.go b/execution/engine/eks_engine.go index ca77676f..5e684d17 100644 --- a/execution/engine/eks_engine.go +++ b/execution/engine/eks_engine.go @@ -47,6 +47,7 @@ type EKSExecutionEngine struct { s3Bucket string s3BucketRootDir string statusQueue string + sidecarCommand string } // @@ -87,7 +88,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error { ee.jobTtl = conf.GetInt("eks_job_ttl") ee.jobSA = conf.GetString("eks_service_account") ee.jobARAEnabled = true - + ee.sidecarCommand = conf.GetString("eks_sidecar_command") adapt, err := adapter.NewEKSAdapter() if err != nil { @@ -115,7 +116,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error { } func (ee *EKSExecutionEngine) Execute(executable state.Executable, run state.Run, manager state.Manager) (state.Run, bool, error) { - job, err := ee.adapter.AdaptFlotillaDefinitionAndRunToJob(executable, run, ee.jobSA, ee.schedulerName, manager, ee.jobARAEnabled) + job, err := ee.adapter.AdaptFlotillaDefinitionAndRunToJob(executable, run, ee.jobSA, ee.schedulerName, manager, ee.jobARAEnabled, ee.sidecarCommand) kClient, err := ee.getKClient(run) if err != nil { @@ -175,7 +176,6 @@ func (ee *EKSExecutionEngine) Execute(executable state.Executable, run state.Run func (ee *EKSExecutionEngine) getPodName(run state.Run) (state.Run, error) { podList, err := ee.getPodList(run) - if err != nil { return run, err } @@ -185,16 +185,19 @@ func (ee *EKSExecutionEngine) getPodName(run state.Run) (state.Run, error) { run.PodName = &pod.Name run.Namespace = &pod.Namespace if pod.Spec.Containers != nil && len(pod.Spec.Containers) > 0 { - container := pod.Spec.Containers[len(pod.Spec.Containers)-1] - cpu := container.Resources.Requests.Cpu().ScaledValue(resource.Milli) - cpuLimit := container.Resources.Limits.Cpu().ScaledValue(resource.Milli) - run.Cpu = &cpu - run.CpuLimit = &cpuLimit - run = ee.getInstanceDetails(pod, run) - mem := container.Resources.Requests.Memory().ScaledValue(resource.Mega) - run.Memory = &mem - memLimit := container.Resources.Limits.Memory().ScaledValue(resource.Mega) - run.MemoryLimit = &memLimit + for _, container := range pod.Spec.Containers { + if container.Name != "sidecar" { + cpu := container.Resources.Requests.Cpu().ScaledValue(resource.Milli) + cpuLimit := container.Resources.Limits.Cpu().ScaledValue(resource.Milli) + run.Cpu = &cpu + run.CpuLimit = &cpuLimit + run = ee.getInstanceDetails(pod, run) + mem := container.Resources.Requests.Memory().ScaledValue(resource.Mega) + run.Memory = &mem + memLimit := container.Resources.Limits.Memory().ScaledValue(resource.Mega) + run.MemoryLimit = &memLimit + } + } } } return run, nil diff --git a/execution/engine/emr_engine.go b/execution/engine/emr_engine.go index f16cb277..78fb8bfd 100644 --- a/execution/engine/emr_engine.go +++ b/execution/engine/emr_engine.go @@ -47,6 +47,8 @@ type EMRExecutionEngine struct { s3ManifestBucket string s3ManifestBasePath string serializer *k8sJson.Serializer + sidecarCommand string + sidecarImage string } // @@ -66,6 +68,8 @@ func (emr *EMRExecutionEngine) Initialize(conf config.Config) error { emr.s3ManifestBasePath = conf.GetString("emr_manifest_base_path") emr.emrJobSA = conf.GetString("eks_service_account") emr.schedulerName = conf.GetString("eks_scheduler_name") + emr.sidecarImage = conf.GetString("emr_sidecar_image") + emr.sidecarCommand = conf.GetString("emr_sidecar_command") awsConfig := &aws.Config{Region: aws.String(emr.awsRegion)} sess := session.Must(session.NewSessionWithOptions(session.Options{Config: *awsConfig})) @@ -247,6 +251,18 @@ func (emr *EMRExecutionEngine) driverPodTemplate(executable state.Executable, ru }, WorkingDir: workingDir, }, + { + Name: "sidecar", + Image: emr.sidecarImage, + Command: emr.constructCmdSlice(&emr.sidecarCommand), + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("25M"), + v1.ResourceCPU: resource.MustParse("15m"), + }, + }, + Env: append(emr.envOverrides(executable, run)), + }, }, InitContainers: []v1.Container{{ Name: fmt.Sprintf("init-driver-%s", run.RunID), @@ -318,6 +334,18 @@ func (emr *EMRExecutionEngine) executorPodTemplate(executable state.Executable, }, WorkingDir: workingDir, }, + { + Name: "sidecar", + Image: emr.sidecarImage, + Command: emr.constructCmdSlice(&emr.sidecarCommand), + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("25M"), + v1.ResourceCPU: resource.MustParse("15m"), + }, + }, + Env: append(emr.envOverrides(executable, run)), + }, }, InitContainers: []v1.Container{{ Name: fmt.Sprintf("init-executor-%s", run.RunID), @@ -700,6 +728,29 @@ func (emr *EMRExecutionEngine) envOverrides(executable state.Executable, run sta }, }, }) + + res = append(res, v1.EnvVar{ + Name: "CPU", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "spark-kubernetes-executor", + Resource: "requests.cpu", + Divisor: resource.MustParse("1m"), + }, + }, + }, + v1.EnvVar{ + Name: "MEMORY", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "spark-kubernetes-executor", + Resource: "requests.memory", + Divisor: resource.MustParse("1M"), + }, + }, + }, + ) + return res } diff --git a/state/models.go b/state/models.go index e4e62bb7..971a7d09 100644 --- a/state/models.go +++ b/state/models.go @@ -28,7 +28,7 @@ var DefaultTaskType = "task" var MinCPU = int64(256) -var MaxCPU = int64(32000) +var MaxCPU = int64(64000) var MinMem = int64(512) diff --git a/worker/status_worker.go b/worker/status_worker.go index 8cce9c59..7f3241cb 100644 --- a/worker/status_worker.go +++ b/worker/status_worker.go @@ -272,16 +272,6 @@ func (sw *statusWorker) extractExceptions(runID string) { } } -func (sw *statusWorker) processEKSRunMetrics(run state.Run) { - updatedRun, err := sw.ee.FetchPodMetrics(run) - if err == nil { - if updatedRun.MaxMemoryUsed != run.MaxMemoryUsed || - updatedRun.MaxCpuUsed != run.MaxCpuUsed { - _, err = sw.sm.UpdateRun(updatedRun.RunID, updatedRun) - } - } -} - func (sw *statusWorker) logStatusUpdate(update state.Run) { var err error var startedAt, finishedAt time.Time