From 5b1258ff4e4d85c58545a85eb9cc0cc09300e888 Mon Sep 17 00:00:00 2001
From: Karel Suta
Date: Tue, 4 Feb 2025 12:23:35 +0100
Subject: [PATCH] Specify Worker replicas in PyTorchJob only for existing
 workers

---
 tests/kfto/kfto_training_test.go | 239 ++++++++++++++++---------------
 1 file changed, 121 insertions(+), 118 deletions(-)

diff --git a/tests/kfto/kfto_training_test.go b/tests/kfto/kfto_training_test.go
index 5ea0e565..15226083 100644
--- a/tests/kfto/kfto_training_test.go
+++ b/tests/kfto/kfto_training_test.go
@@ -275,148 +275,151 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 						},
 					},
 				},
-				kftov1.PyTorchJobReplicaTypeWorker: {
-					Replicas:      Ptr(int32(numberOfWorkerNodes)),
-					RestartPolicy: "OnFailure",
-					Template: corev1.PodTemplateSpec{
-						ObjectMeta: metav1.ObjectMeta{
-							Labels: map[string]string{
-								"app": "kfto-llm",
-							},
-						},
-						Spec: corev1.PodSpec{
-							Affinity: &corev1.Affinity{
-								PodAntiAffinity: &corev1.PodAntiAffinity{
-									RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
-										{
-											LabelSelector: &metav1.LabelSelector{
-												MatchLabels: map[string]string{
-													"app": "kfto-llm",
-												},
-											},
-											TopologyKey: "kubernetes.io/hostname",
-										},
-									},
-								},
-							},
-							Tolerations: []corev1.Toleration{
-								{
-									Key:      gpu.ResourceLabel,
-									Operator: corev1.TolerationOpExists,
-								},
-							},
-							InitContainers: []corev1.Container{
-								{
-									Name:            "copy-model",
-									Image:           GetBloomModelImage(),
-									ImagePullPolicy: corev1.PullIfNotPresent,
-									VolumeMounts: []corev1.VolumeMount{
-										{
-											Name:      "tmp-volume",
-											MountPath: "/tmp",
-										},
-									},
-									Command: []string{"/bin/sh", "-c"},
-									Args:    []string{"mkdir /tmp/model; cp -r /models/bloom-560m /tmp/model"},
-								},
-								{
-									Name:            "copy-dataset",
-									Image:           GetAlpacaDatasetImage(),
-									ImagePullPolicy: corev1.PullIfNotPresent,
-									VolumeMounts: []corev1.VolumeMount{
-										{
-											Name:      "tmp-volume",
-											MountPath: "/tmp",
-										},
-									},
-									Command: []string{"/bin/sh", "-c"},
-									Args:    []string{"mkdir /tmp/all_datasets; cp -r /dataset/* /tmp/all_datasets;ls /tmp/all_datasets"},
-								},
-							},
-							Containers: []corev1.Container{
-								{
-									Name:            "pytorch",
-									Image:           baseImage,
-									ImagePullPolicy: corev1.PullIfNotPresent,
-									Command: []string{
-										"/bin/bash", "-c",
-										`torchrun /etc/config/hf_llm_training.py \
-											--model_uri /tmp/model/bloom-560m \
-											--model_dir /tmp/model/bloom-560m \
-											--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
-											--transformer_type AutoModelForCausalLM \
-											--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
-											--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
-									},
-									Env: []corev1.EnvVar{
-										{
-											Name:  "HF_HOME",
-											Value: "/tmp/.cache",
-										},
-										{
-											Name:  "TRITON_CACHE_DIR",
-											Value: "/tmp/.triton",
-										},
-										{
-											Name:  "TOKENIZERS_PARALLELISM",
-											Value: "false",
-										},
-										{
-											Name:  "NCCL_DEBUG",
-											Value: "INFO",
-										},
-									},
-									VolumeMounts: []corev1.VolumeMount{
-										{
-											Name:      "config-volume",
-											MountPath: "/etc/config",
-										},
-										{
-											Name:      "tmp-volume",
-											MountPath: "/tmp",
-										},
-									},
-									Resources: corev1.ResourceRequirements{
-										Requests: corev1.ResourceList{
-											corev1.ResourceCPU:                     resource.MustParse("2"),
-											corev1.ResourceMemory:                  resource.MustParse("8Gi"),
-											corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
-										},
-										Limits: corev1.ResourceList{
-											corev1.ResourceCPU:                     resource.MustParse("2"),
-											corev1.ResourceMemory:                  resource.MustParse("8Gi"),
-											corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
-										},
-									},
-									SecurityContext: &corev1.SecurityContext{
-										RunAsNonRoot:           Ptr(true),
-										ReadOnlyRootFilesystem: Ptr(true),
-									},
-								},
-							},
-							Volumes: []corev1.Volume{
-								{
-									Name: "config-volume",
-									VolumeSource: corev1.VolumeSource{
-										ConfigMap: &corev1.ConfigMapVolumeSource{
-											LocalObjectReference: corev1.LocalObjectReference{
-												Name: config.Name,
-											},
-										},
-									},
-								},
-								{
-									Name: "tmp-volume",
-									VolumeSource: corev1.VolumeSource{
-										EmptyDir: &corev1.EmptyDirVolumeSource{},
-									},
-								},
-							},
-						},
-					},
-				},
 			},
 		},
 	}
+	// Declare the Worker replica spec only when workers are requested; a Worker spec declared with 0 replicas causes the operator to keep creating and deleting worker pods
+	if numberOfWorkerNodes > 0 {
+		tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker] = &kftov1.ReplicaSpec{
+			Replicas:      Ptr(int32(numberOfWorkerNodes)),
+			RestartPolicy: "OnFailure",
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						"app": "kfto-llm",
+					},
+				},
+				Spec: corev1.PodSpec{
+					Affinity: &corev1.Affinity{
+						PodAntiAffinity: &corev1.PodAntiAffinity{
+							RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
+								{
+									LabelSelector: &metav1.LabelSelector{
+										MatchLabels: map[string]string{
+											"app": "kfto-llm",
+										},
+									},
+									TopologyKey: "kubernetes.io/hostname",
+								},
+							},
+						},
+					},
+					Tolerations: []corev1.Toleration{
+						{
+							Key:      gpu.ResourceLabel,
+							Operator: corev1.TolerationOpExists,
+						},
+					},
+					InitContainers: []corev1.Container{
+						{
+							Name:            "copy-model",
+							Image:           GetBloomModelImage(),
+							ImagePullPolicy: corev1.PullIfNotPresent,
+							VolumeMounts: []corev1.VolumeMount{
+								{
+									Name:      "tmp-volume",
+									MountPath: "/tmp",
+								},
+							},
+							Command: []string{"/bin/sh", "-c"},
+							Args:    []string{"mkdir /tmp/model; cp -r /models/bloom-560m /tmp/model"},
+						},
+						{
+							Name:            "copy-dataset",
+							Image:           GetAlpacaDatasetImage(),
+							ImagePullPolicy: corev1.PullIfNotPresent,
+							VolumeMounts: []corev1.VolumeMount{
+								{
+									Name:      "tmp-volume",
+									MountPath: "/tmp",
+								},
+							},
+							Command: []string{"/bin/sh", "-c"},
+							Args:    []string{"mkdir /tmp/all_datasets; cp -r /dataset/* /tmp/all_datasets;ls /tmp/all_datasets"},
+						},
+					},
+					Containers: []corev1.Container{
+						{
+							Name:            "pytorch",
+							Image:           baseImage,
+							ImagePullPolicy: corev1.PullIfNotPresent,
+							Command: []string{
+								"/bin/bash", "-c",
+								`torchrun /etc/config/hf_llm_training.py \
+									--model_uri /tmp/model/bloom-560m \
+									--model_dir /tmp/model/bloom-560m \
+									--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
+									--transformer_type AutoModelForCausalLM \
+									--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
+									--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
+							},
+							Env: []corev1.EnvVar{
+								{
+									Name:  "HF_HOME",
+									Value: "/tmp/.cache",
+								},
+								{
+									Name:  "TRITON_CACHE_DIR",
+									Value: "/tmp/.triton",
+								},
+								{
+									Name:  "TOKENIZERS_PARALLELISM",
+									Value: "false",
+								},
+								{
+									Name:  "NCCL_DEBUG",
+									Value: "INFO",
+								},
+							},
+							VolumeMounts: []corev1.VolumeMount{
+								{
+									Name:      "config-volume",
+									MountPath: "/etc/config",
+								},
+								{
+									Name:      "tmp-volume",
+									MountPath: "/tmp",
+								},
+							},
+							Resources: corev1.ResourceRequirements{
+								Requests: corev1.ResourceList{
+									corev1.ResourceCPU:                     resource.MustParse("2"),
+									corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+									corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
+								},
+								Limits: corev1.ResourceList{
+									corev1.ResourceCPU:    resource.MustParse("2"),
+									corev1.ResourceMemory: resource.MustParse("8Gi"),
+									corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
+								},
+							},
+							SecurityContext: &corev1.SecurityContext{
+								RunAsNonRoot:           Ptr(true),
+								ReadOnlyRootFilesystem: Ptr(true),
+							},
+						},
+					},
+					Volumes: []corev1.Volume{
+						{
+							Name: "config-volume",
+							VolumeSource: corev1.VolumeSource{
+								ConfigMap: &corev1.ConfigMapVolumeSource{
+									LocalObjectReference: corev1.LocalObjectReference{
+										Name: config.Name,
+									},
+								},
+							},
+						},
+						{
+							Name: "tmp-volume",
+							VolumeSource: corev1.VolumeSource{
+								EmptyDir: &corev1.EmptyDirVolumeSource{},
+							},
+						},
+					},
+				},
+			},
+		}
+	}
 
 	tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
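The whole change reduces to one guard: attach a Worker replica spec only when at least one worker is requested, instead of always declaring the spec and setting Replicas to 0. A minimal sketch of that pattern, assuming the kubeflow.org/v1 Go types used in the test above (the package name, the addWorkerSpec helper, and the standalone Ptr are illustrative, not part of the patch):

	package kfto

	import (
		kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	)

	// Ptr returns a pointer to v, mirroring the test utility of the same name.
	func Ptr[T any](v T) *T { return &v }

	// addWorkerSpec attaches a Worker replica spec only when workers are
	// requested. A Worker spec declared with Replicas set to 0 must be
	// omitted entirely, otherwise the training operator keeps creating
	// and deleting worker pods.
	func addWorkerSpec(job *kftov1.PyTorchJob, numberOfWorkerNodes int, spec *kftov1.ReplicaSpec) {
		if numberOfWorkerNodes <= 0 {
			return // zero workers: leave PyTorchReplicaSpecs without a Worker entry
		}
		if job.Spec.PyTorchReplicaSpecs == nil {
			job.Spec.PyTorchReplicaSpecs = map[kftov1.ReplicaType]*kftov1.ReplicaSpec{}
		}
		spec.Replicas = Ptr(int32(numberOfWorkerNodes))
		job.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker] = spec
	}

Guarding at the call site keeps the PyTorchJob free of a Worker entry altogether for single-node runs, which is what stops the operator's create/delete churn.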