downward api does not support getting arbitrary resources; changing … #431

Open · wants to merge 7 commits into base: master
80 changes: 67 additions & 13 deletions execution/adapter/eks_adapter.go
@@ -9,13 +9,14 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"strconv"
"strings"
"time"
)

type EKSAdapter interface {
AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod *corev1.Pod) (state.Run, error)
AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool) (batchv1.Job, error)
AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand string) (batchv1.Job, error)
}
type eksAdapter struct{}

@@ -55,22 +56,28 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod
updated.Status = state.StatusStopped
if pod != nil {
if pod.Status.ContainerStatuses != nil && len(pod.Status.ContainerStatuses) > 0 {
containerStatus := pod.Status.ContainerStatuses[len(pod.Status.ContainerStatuses)-1]
if containerStatus.State.Terminated != nil {
updated.ExitReason = &containerStatus.State.Terminated.Reason
exitCode = int64(containerStatus.State.Terminated.ExitCode)
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name != "sidecar" {
if containerStatus.State.Terminated != nil {
updated.ExitReason = &containerStatus.State.Terminated.Reason
exitCode = int64(containerStatus.State.Terminated.ExitCode)
}
}
}
}
}
updated.ExitCode = &exitCode
}

if pod != nil && len(pod.Spec.Containers) > 0 {
container := pod.Spec.Containers[0]
//First three lines are injected by Flotilla, strip those out.
if len(container.Command) > 3 {
cmd := strings.Join(container.Command[3:], "\n")
updated.Command = &cmd
for _, container := range pod.Spec.Containers {
if container.Name != "sidecar" {
//First three lines are injected by Flotilla, strip those out.
if len(container.Command) > 3 {
cmd := strings.Join(container.Command[3:], "\n")
updated.Command = &cmd
}
}
}
}
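
For illustration, a minimal runnable sketch of the stripping above; the three wrapper entries are placeholders, since the actual prefix Flotilla injects is not shown in this diff:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Placeholder command: the first three entries stand in for the wrapper that
	// Flotilla injects (the real prefix is not shown in this diff); the remaining
	// entries are the user's script, one line per entry.
	command := []string{"bash", "-l", "-cex", "echo step one", "echo step two"}

	if len(command) > 3 {
		cmd := strings.Join(command[3:], "\n")
		fmt.Println(cmd)
		// Prints:
		// echo step one
		// echo step two
	}
}
```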

@@ -98,7 +105,7 @@ func (a *eksAdapter) AdaptJobToFlotillaRun(job *batchv1.Job, run state.Run, pod
// 5. Node lifecycle.
// 6. Node affinity and anti-affinity
//
func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool) (batchv1.Job, error) {
func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executable, run state.Run, sa string, schedulerName string, manager state.Manager, araEnabled bool, sidecarCommand string) (batchv1.Job, error) {
cmd := ""

if run.Command != nil && len(*run.Command) > 0 {
@@ -134,6 +141,19 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa
"owner": a.sanitizeLabel(run.User),
}

sidecarContainer := corev1.Container{
Name: "sidecar",
Image: run.Image,
Command: a.constructCmdSlice(sidecarCommand),
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("25M"),
corev1.ResourceCPU: resource.MustParse("15m"),
},
},
Env: a.envOverrides(executable, run),
}
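
`constructCmdSlice` is not part of this diff, so the following is only a sketch of the kind of wrapping it presumably does; the `bash -c` flags and the sample `eks_sidecar_command` value (the config key read in eks_engine.go below) are assumptions:

```go
package main

import "fmt"

// constructCmdSliceSketch is illustrative only: it wraps a single command string
// in a bash -c invocation. The adapter's real constructCmdSlice may use different
// shell flags.
func constructCmdSliceSketch(command string) []string {
	return []string{"bash", "-c", command}
}

func main() {
	// Hypothetical value for the eks_sidecar_command config key.
	sidecarCommand := "exec /usr/local/bin/metrics-agent"
	fmt.Println(constructCmdSliceSketch(sidecarCommand))
	// Prints: [bash -c exec /usr/local/bin/metrics-agent]
}
```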

//if run.Description != nil {
// info := strings.Split(*run.Description, "/")
//
@@ -154,7 +174,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa
},
Spec: corev1.PodSpec{
SchedulerName: schedulerName,
Containers: []corev1.Container{container},
Containers: []corev1.Container{container, sidecarContainer},
RestartPolicy: corev1.RestartPolicyNever,
ServiceAccountName: sa,
Affinity: affinity,
@@ -177,7 +197,7 @@ func (a *eksAdapter) AdaptFlotillaDefinitionAndRunToJob(executable state.Executa
}
func (a *eksAdapter) constructEviction(run state.Run, manager state.Manager) string {
if run.Gpu != nil && *run.Gpu > 0 {
return "false"
return "true"
}

if run.NodeLifecycle != nil && *run.NodeLifecycle == state.OndemandLifecycle {
@@ -439,6 +459,40 @@ func (a *eksAdapter) envOverrides(executable state.Executable, run state.Run) []
})
}
}

res = append(res, corev1.EnvVar{
Name: "CPU",
ValueFrom: &corev1.EnvVarSource{
ResourceFieldRef: &corev1.ResourceFieldSelector{
ContainerName: run.RunID,
Resource: "requests.cpu",
Divisor: resource.MustParse("1m"),
},
},
},
corev1.EnvVar{
Name: "MEMORY",
ValueFrom: &corev1.EnvVarSource{
ResourceFieldRef: &corev1.ResourceFieldSelector{
ContainerName: run.RunID,
Resource: "requests.memory",
Divisor: resource.MustParse("1M"),
},
},
},
)
if run.Gpu != nil && *run.Gpu > 0 {
res = append(res, corev1.EnvVar{
Name: "GPU",
Value: strconv.FormatInt(*run.Gpu, 10),
})
} else {
res = append(res, corev1.EnvVar{
Name: "GPU",
Value: "0",
})

}
return res
}
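
This is where the PR title's constraint shows up: the Kubernetes downward API's `resourceFieldRef` only exposes a container's cpu, memory, and ephemeral-storage requests/limits, so an extended resource such as a GPU count cannot be fetched that way and is injected as a literal value instead. Below is a minimal sketch of how a sidecar process could consume these variables; the env var names CPU, MEMORY, and GPU come from the diff, everything else is illustrative:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	// CPU arrives in millicores (Divisor "1m") and MEMORY in megabytes
	// (Divisor "1M"), matching the Divisor values set in envOverrides above.
	cpuMillicores, _ := strconv.ParseInt(os.Getenv("CPU"), 10, 64)
	memoryMB, _ := strconv.ParseInt(os.Getenv("MEMORY"), 10, 64)
	gpuCount, _ := strconv.ParseInt(os.Getenv("GPU"), 10, 64)

	fmt.Printf("requested: %d millicores, %d MB, %d GPUs\n",
		cpuMillicores, memoryMB, gpuCount)
}
```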

29 changes: 16 additions & 13 deletions execution/engine/eks_engine.go
@@ -47,6 +47,7 @@ type EKSExecutionEngine struct {
s3Bucket string
s3BucketRootDir string
statusQueue string
sidecarCommand string
}

//
@@ -87,7 +88,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error {
ee.jobTtl = conf.GetInt("eks_job_ttl")
ee.jobSA = conf.GetString("eks_service_account")
ee.jobARAEnabled = true

ee.sidecarCommand = conf.GetString("eks_sidecar_command")
adapt, err := adapter.NewEKSAdapter()

if err != nil {
@@ -115,7 +116,7 @@ func (ee *EKSExecutionEngine) Initialize(conf config.Config) error {
}

func (ee *EKSExecutionEngine) Execute(executable state.Executable, run state.Run, manager state.Manager) (state.Run, bool, error) {
job, err := ee.adapter.AdaptFlotillaDefinitionAndRunToJob(executable, run, ee.jobSA, ee.schedulerName, manager, ee.jobARAEnabled)
job, err := ee.adapter.AdaptFlotillaDefinitionAndRunToJob(executable, run, ee.jobSA, ee.schedulerName, manager, ee.jobARAEnabled, ee.sidecarCommand)

kClient, err := ee.getKClient(run)
if err != nil {
@@ -175,7 +176,6 @@ func (ee *EKSExecutionEngine) Execute(executable state.Executable, run state.Run

func (ee *EKSExecutionEngine) getPodName(run state.Run) (state.Run, error) {
podList, err := ee.getPodList(run)

if err != nil {
return run, err
}
@@ -185,16 +185,19 @@ func (ee *EKSExecutionEngine) getPodName(run state.Run) (state.Run, error) {
run.PodName = &pod.Name
run.Namespace = &pod.Namespace
if pod.Spec.Containers != nil && len(pod.Spec.Containers) > 0 {
container := pod.Spec.Containers[len(pod.Spec.Containers)-1]
cpu := container.Resources.Requests.Cpu().ScaledValue(resource.Milli)
cpuLimit := container.Resources.Limits.Cpu().ScaledValue(resource.Milli)
run.Cpu = &cpu
run.CpuLimit = &cpuLimit
run = ee.getInstanceDetails(pod, run)
mem := container.Resources.Requests.Memory().ScaledValue(resource.Mega)
run.Memory = &mem
memLimit := container.Resources.Limits.Memory().ScaledValue(resource.Mega)
run.MemoryLimit = &memLimit
for _, container := range pod.Spec.Containers {
if container.Name != "sidecar" {
cpu := container.Resources.Requests.Cpu().ScaledValue(resource.Milli)
cpuLimit := container.Resources.Limits.Cpu().ScaledValue(resource.Milli)
run.Cpu = &cpu
run.CpuLimit = &cpuLimit
run = ee.getInstanceDetails(pod, run)
mem := container.Resources.Requests.Memory().ScaledValue(resource.Mega)
run.Memory = &mem
memLimit := container.Resources.Limits.Memory().ScaledValue(resource.Mega)
run.MemoryLimit = &memLimit
}
}
}
}
return run, nil
51 changes: 51 additions & 0 deletions execution/engine/emr_engine.go
@@ -47,6 +47,8 @@ type EMRExecutionEngine struct {
s3ManifestBucket string
s3ManifestBasePath string
serializer *k8sJson.Serializer
sidecarCommand string
sidecarImage string
}

//
Expand All @@ -66,6 +68,8 @@ func (emr *EMRExecutionEngine) Initialize(conf config.Config) error {
emr.s3ManifestBasePath = conf.GetString("emr_manifest_base_path")
emr.emrJobSA = conf.GetString("eks_service_account")
emr.schedulerName = conf.GetString("eks_scheduler_name")
emr.sidecarImage = conf.GetString("emr_sidecar_image")
emr.sidecarCommand = conf.GetString("emr_sidecar_command")

awsConfig := &aws.Config{Region: aws.String(emr.awsRegion)}
sess := session.Must(session.NewSessionWithOptions(session.Options{Config: *awsConfig}))
@@ -247,6 +251,18 @@ func (emr *EMRExecutionEngine) driverPodTemplate(executable state.Executable, ru
},
WorkingDir: workingDir,
},
{
Name: "sidecar",
Image: emr.sidecarImage,
Command: emr.constructCmdSlice(&emr.sidecarCommand),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("25M"),
v1.ResourceCPU: resource.MustParse("15m"),
},
},
Env: append(emr.envOverrides(executable, run)),
},
},
InitContainers: []v1.Container{{
Name: fmt.Sprintf("init-driver-%s", run.RunID),
@@ -318,6 +334,18 @@ func (emr *EMRExecutionEngine) executorPodTemplate(executable state.Executable,
},
WorkingDir: workingDir,
},
{
Name: "sidecar",
Image: emr.sidecarImage,
Command: emr.constructCmdSlice(&emr.sidecarCommand),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("25M"),
v1.ResourceCPU: resource.MustParse("15m"),
},
},
Env: append(emr.envOverrides(executable, run)),
},
},
InitContainers: []v1.Container{{
Name: fmt.Sprintf("init-executor-%s", run.RunID),
@@ -700,6 +728,29 @@ func (emr *EMRExecutionEngine) envOverrides(executable state.Executable, run sta
},
},
})

res = append(res, v1.EnvVar{
Name: "CPU",
ValueFrom: &v1.EnvVarSource{
ResourceFieldRef: &v1.ResourceFieldSelector{
ContainerName: "spark-kubernetes-executor",
Resource: "requests.cpu",
Divisor: resource.MustParse("1m"),
},
},
},
v1.EnvVar{
Name: "MEMORY",
ValueFrom: &v1.EnvVarSource{
ResourceFieldRef: &v1.ResourceFieldSelector{
ContainerName: "spark-kubernetes-executor",
Resource: "requests.memory",
Divisor: resource.MustParse("1M"),
},
},
},
)

return res
}
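
As a worked example of the divisor conversion these ResourceFieldSelectors request, here is a small sketch using the same resource.Quantity scaling helpers already used in this PR (an approximation of, not the kubelet's exact, conversion path):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// With Divisor "1m", a CPU request of 2 cores is exposed as 2000 (millicores).
	cpu := resource.MustParse("2")
	fmt.Println(cpu.ScaledValue(resource.Milli)) // 2000

	// With Divisor "1M", a memory request of 512Mi is exposed as 537
	// (536870912 bytes / 1e6, rounded up).
	mem := resource.MustParse("512Mi")
	fmt.Println(mem.ScaledValue(resource.Mega)) // 537
}
```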

2 changes: 1 addition & 1 deletion state/models.go
@@ -28,7 +28,7 @@ var DefaultTaskType = "task"

var MinCPU = int64(256)

var MaxCPU = int64(32000)
var MaxCPU = int64(64000)

var MinMem = int64(512)

10 changes: 0 additions & 10 deletions worker/status_worker.go
@@ -272,16 +272,6 @@ func (sw *statusWorker) extractExceptions(runID string) {
}
}

func (sw *statusWorker) processEKSRunMetrics(run state.Run) {
updatedRun, err := sw.ee.FetchPodMetrics(run)
if err == nil {
if updatedRun.MaxMemoryUsed != run.MaxMemoryUsed ||
updatedRun.MaxCpuUsed != run.MaxCpuUsed {
_, err = sw.sm.UpdateRun(updatedRun.RunID, updatedRun)
}
}
}

func (sw *statusWorker) logStatusUpdate(update state.Run) {
var err error
var startedAt, finishedAt time.Time