diff --git a/manifests/base/crds/trainer.kubeflow.org_clustertrainingruntimes.yaml b/manifests/base/crds/trainer.kubeflow.org_clustertrainingruntimes.yaml
index c2bf82352b..2765763a36 100644
--- a/manifests/base/crds/trainer.kubeflow.org_clustertrainingruntimes.yaml
+++ b/manifests/base/crds/trainer.kubeflow.org_clustertrainingruntimes.yaml
@@ -50,6 +50,7 @@ spec:
                   description: Configuration for the MPI Runtime.
                   properties:
                     mpiImplementation:
+                      default: OpenMPI
                       description: |-
                         Implementation name for the MPI to create the appropriate hostfile.
                         Defaults to OpenMPI.
@@ -61,6 +62,7 @@ spec:
                       format: int32
                       type: integer
                     runLauncherAsNode:
+                      default: false
                       description: |-
                         Whether to run training process on the launcher Job.
                         Defaults to false.
@@ -585,14 +587,24 @@ spec:
                           type: integer
                         type: object
                     numProcPerNode:
+                      default: auto
                       description: |-
                         Number of processes per node.
                         This value is inserted into the `--nproc-per-node` argument of the `torchrun` CLI.
                         Supported values: `auto`, `cpu`, `gpu`, or int value.
                         Defaults to `auto`.
                       type: string
+                      x-kubernetes-validations:
+                      - message: NumProcPerNode must be equal to auto, cpu, gpu,
+                          or int value
+                        rule: self in ['auto', 'cpu', 'gpu'] || type(self) == int
                   type: object
               type: object
+              x-kubernetes-validations:
+              - message: numNodes should not be set if torch.elasticPolicy is configured
+                rule: '!(has(self.numNodes) && (has(self.torch) && has(self.torch.elasticPolicy)))'
+              - message: Only one of the policy can be configured
+                rule: '!(has(self.torch) && has(self.mpi))'
             podGroupPolicy:
               description: Configuration for the PodGroup to enable gang-scheduling
                 via supported plugins.
@@ -602,6 +614,7 @@ spec:
                     for gang-scheduling.
                   properties:
                     scheduleTimeoutSeconds:
+                      default: 60
                       description: |-
                         Time threshold to schedule PodGroup for gang-scheduling.
                         If the scheduling timeout is equal to 0, the default value is used.
diff --git a/manifests/base/crds/trainer.kubeflow.org_trainingruntimes.yaml b/manifests/base/crds/trainer.kubeflow.org_trainingruntimes.yaml
index e7759379b3..0800575e0a 100644
--- a/manifests/base/crds/trainer.kubeflow.org_trainingruntimes.yaml
+++ b/manifests/base/crds/trainer.kubeflow.org_trainingruntimes.yaml
@@ -50,6 +50,7 @@ spec:
                   description: Configuration for the MPI Runtime.
                   properties:
                     mpiImplementation:
+                      default: OpenMPI
                       description: |-
                         Implementation name for the MPI to create the appropriate hostfile.
                         Defaults to OpenMPI.
@@ -61,6 +62,7 @@ spec:
                       format: int32
                       type: integer
                     runLauncherAsNode:
+                      default: false
                       description: |-
                         Whether to run training process on the launcher Job.
                         Defaults to false.
@@ -585,14 +587,24 @@ spec:
                           type: integer
                         type: object
                     numProcPerNode:
+                      default: auto
                       description: |-
                         Number of processes per node.
                         This value is inserted into the `--nproc-per-node` argument of the `torchrun` CLI.
                         Supported values: `auto`, `cpu`, `gpu`, or int value.
                         Defaults to `auto`.
                       type: string
+                      x-kubernetes-validations:
+                      - message: NumProcPerNode must be equal to auto, cpu, gpu,
+                          or int value
+                        rule: self in ['auto', 'cpu', 'gpu'] || type(self) == int
                   type: object
               type: object
+              x-kubernetes-validations:
+              - message: numNodes should not be set if torch.elasticPolicy is configured
+                rule: '!(has(self.numNodes) && (has(self.torch) && has(self.torch.elasticPolicy)))'
+              - message: Only one of the policy can be configured
+                rule: '!(has(self.torch) && has(self.mpi))'
             podGroupPolicy:
               description: Configuration for the PodGroup to enable gang-scheduling
                 via supported plugins.
@@ -602,6 +614,7 @@ spec:
                     for gang-scheduling.
                  properties:
                    scheduleTimeoutSeconds:
+                      default: 60
                      description: |-
                        Time threshold to schedule PodGroup for gang-scheduling.
                        If the scheduling timeout is equal to 0, the default value is used.
diff --git a/pkg/apis/trainer/v1alpha1/trainingruntime_types.go b/pkg/apis/trainer/v1alpha1/trainingruntime_types.go
index 7f9ce5e951..6d55280348 100644
--- a/pkg/apis/trainer/v1alpha1/trainingruntime_types.go
+++ b/pkg/apis/trainer/v1alpha1/trainingruntime_types.go
@@ -142,10 +142,13 @@ type CoschedulingPodGroupPolicySource struct {
 	// Time threshold to schedule PodGroup for gang-scheduling.
 	// If the scheduling timeout is equal to 0, the default value is used.
 	// Defaults to 60 seconds.
+	// +kubebuilder:default=60
 	ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
 }
 
 // MLPolicy represents configuration for the model trining with ML-specific parameters.
+// +kubebuilder:validation:XValidation:rule="!(has(self.numNodes) && (has(self.torch) && has(self.torch.elasticPolicy)))", message="numNodes should not be set if torch.elasticPolicy is configured"
+// +kubebuilder:validation:XValidation:rule="!(has(self.torch) && has(self.mpi))", message="Only one of the policy can be configured"
 type MLPolicy struct {
 	// Number of training nodes.
 	// Defaults to 1.
@@ -173,6 +176,8 @@ type TorchMLPolicySource struct {
 	// Supported values: `auto`, `cpu`, `gpu`, or int value.
 	// TODO (andreyvelich): Add kubebuilder validation.
 	// Defaults to `auto`.
+	// +kubebuilder:default="auto"
+	// +kubebuilder:validation:XValidation:rule="self in ['auto', 'cpu', 'gpu'] || type(self) == int", message="NumProcPerNode must be equal to auto, cpu, gpu, or int value"
 	NumProcPerNode *string `json:"numProcPerNode,omitempty"`
 
 	// Elastic policy for the PyTorch training.
@@ -210,6 +215,7 @@ type MPIMLPolicySource struct {
 
 	// Implementation name for the MPI to create the appropriate hostfile.
 	// Defaults to OpenMPI.
+	// +kubebuilder:default=OpenMPI
 	MPIImplementation MPIImplementation `json:"mpiImplementation,omitempty"`
 
 	// Directory where SSH keys are mounted.
@@ -218,6 +224,7 @@ type MPIMLPolicySource struct {
 
 	// Whether to run training process on the launcher Job.
 	// Defaults to false.
+	// +kubebuilder:default=false
 	RunLauncherAsNode *bool `json:"runLauncherAsNode,omitempty"`
 }
 
diff --git a/pkg/runtime/core/trainingruntime_test.go b/pkg/runtime/core/trainingruntime_test.go
index 7eb75de381..4274136a53 100644
--- a/pkg/runtime/core/trainingruntime_test.go
+++ b/pkg/runtime/core/trainingruntime_test.go
@@ -19,6 +19,7 @@ package core
 import (
 	"context"
 	"fmt"
+	"k8s.io/utils/ptr"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -263,7 +264,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
 		"succeeded to build JobSet with Torch values from the TrainJob": {
 			trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec(
 				testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec).
-					TorchPolicy(100, "auto").
+					TorchPolicy(100, ptr.To("auto")).
 					ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
 					Obj(),
 			).Obj(),
@@ -273,7 +274,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
 			Trainer(
 				testingutil.MakeTrainJobTrainerWrapper().
 					NumNodes(30).
-					NumProcPerNode("3").
+					NumProcPerNode(ptr.To("3")).
 					Obj(),
 			).
 				Obj(),
@@ -317,7 +318,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
 		"succeeded to build JobSet with Torch values from the Runtime and envs.": {
 			trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec(
 				testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec).
-					TorchPolicy(100, "auto").
+					TorchPolicy(100, ptr.To("auto")).
 					ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
 					ContainerTrainerEnv(
 						[]corev1.EnvVar{
diff --git a/pkg/runtime/framework/plugins/torch/torch.go b/pkg/runtime/framework/plugins/torch/torch.go
index f8f9e0631b..02e86073c4 100644
--- a/pkg/runtime/framework/plugins/torch/torch.go
+++ b/pkg/runtime/framework/plugins/torch/torch.go
@@ -76,6 +76,7 @@ func (t *Torch) EnforceMLPolicy(info *runtime.Info, trainJob *trainer.TrainJob)
 	// TODO (andreyvelich): Add validation to check that TrainJob doesn't have "PET_" envs.
 	// TODO (andreyvelich): We should validate that envs from different plugins don't conflict with each other.
 	// Ref: https://github.com/kubeflow/trainer/pull/2308#discussion_r1823229940
+
 	infoEnvs := []corev1.EnvVar{
 		{
 			Name: constants.TorchEnvNumNodes,
diff --git a/pkg/util/testing/wrapper.go b/pkg/util/testing/wrapper.go
index 2002f66a4e..23b38545cd 100644
--- a/pkg/util/testing/wrapper.go
+++ b/pkg/util/testing/wrapper.go
@@ -392,8 +392,8 @@ func (t *TrainJobTrainerWrapper) NumNodes(numNodes int32) *TrainJobTrainerWrappe
 	return t
 }
 
-func (t *TrainJobTrainerWrapper) NumProcPerNode(numProcPerNode string) *TrainJobTrainerWrapper {
-	t.Trainer.NumProcPerNode = &numProcPerNode
+func (t *TrainJobTrainerWrapper) NumProcPerNode(numProcPerNode *string) *TrainJobTrainerWrapper {
+	t.Trainer.NumProcPerNode = numProcPerNode
 	return t
 }
 
@@ -689,12 +689,12 @@ func (s *TrainingRuntimeSpecWrapper) NumNodes(numNodes int32) *TrainingRuntimeSp
 	return s
 }
 
-func (s *TrainingRuntimeSpecWrapper) TorchPolicy(numNodes int32, numProcPerNode string) *TrainingRuntimeSpecWrapper {
+func (s *TrainingRuntimeSpecWrapper) TorchPolicy(numNodes int32, numProcPerNode *string) *TrainingRuntimeSpecWrapper {
 	s.MLPolicy = &trainer.MLPolicy{
 		NumNodes: &numNodes,
 		MLPolicySource: trainer.MLPolicySource{
 			Torch: &trainer.TorchMLPolicySource{
-				NumProcPerNode: &numProcPerNode,
+				NumProcPerNode: numProcPerNode,
 			},
 		},
 	}
diff --git a/test/integration/controller/trainjob_controller_test.go b/test/integration/controller/trainjob_controller_test.go
index 10c20c2e6c..88b1fb6688 100644
--- a/test/integration/controller/trainjob_controller_test.go
+++ b/test/integration/controller/trainjob_controller_test.go
@@ -278,7 +278,7 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 			trainingRuntime = testingutil.MakeTrainingRuntimeWrapper(ns.Name, "alpha").
 				RuntimeSpec(
 					testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "alpha").Spec).
-						TorchPolicy(100, "auto").
+						TorchPolicy(100, ptr.To("auto")).
 						ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
 						Obj()).
 				Obj()
diff --git a/test/integration/webhooks/clustertrainingruntime_test.go b/test/integration/webhooks/clustertrainingruntime_webhook_test.go
similarity index 98%
rename from test/integration/webhooks/clustertrainingruntime_test.go
rename to test/integration/webhooks/clustertrainingruntime_webhook_test.go
index 184a7119fa..047f0302c4 100644
--- a/test/integration/webhooks/clustertrainingruntime_test.go
+++ b/test/integration/webhooks/clustertrainingruntime_webhook_test.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2024 The Kubeflow Authors.
+Copyright 2025 The Kubeflow Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
diff --git a/test/integration/webhooks/trainingruntime_test.go b/test/integration/webhooks/trainingruntime_test.go
deleted file mode 100644
index c9a71bba06..0000000000
--- a/test/integration/webhooks/trainingruntime_test.go
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
-Copyright 2024 The Kubeflow Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package webhooks
-
-import (
-	"github.com/onsi/ginkgo/v2"
-	"github.com/onsi/gomega"
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	trainer "github.com/kubeflow/trainer/pkg/apis/trainer/v1alpha1"
-	testingutil "github.com/kubeflow/trainer/pkg/util/testing"
-	"github.com/kubeflow/trainer/test/integration/framework"
-)
-
-const trainingRuntimeName = "test-trainingruntime"
-
-var _ = ginkgo.Describe("TrainingRuntime Webhook", ginkgo.Ordered, func() {
-	var ns *corev1.Namespace
-
-	ginkgo.BeforeAll(func() {
-		fwk = &framework.Framework{}
-		cfg = fwk.Init()
-		ctx, k8sClient = fwk.RunManager(cfg)
-	})
-	ginkgo.AfterAll(func() {
-		fwk.Teardown()
-	})
-
-	ginkgo.BeforeEach(func() {
-		ns = &corev1.Namespace{
-			TypeMeta: metav1.TypeMeta{
-				APIVersion: corev1.SchemeGroupVersion.String(),
-				Kind:       "Namespace",
-			},
-			ObjectMeta: metav1.ObjectMeta{
-				GenerateName: "trainingruntime-webhook-",
-			},
-		}
-		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
-	})
-
-	ginkgo.AfterEach(func() {
-		gomega.Expect(k8sClient.DeleteAllOf(ctx, &trainer.TrainingRuntime{}, client.InNamespace(ns.Name))).To(gomega.Succeed())
-	})
-
-	ginkgo.When("Creating TrainingRuntime", func() {
-		ginkgo.DescribeTable("Validate TrainingRuntime on creation", func(runtime func() *trainer.TrainingRuntime) {
-			gomega.Expect(k8sClient.Create(ctx, runtime())).Should(gomega.Succeed())
-		},
-			ginkgo.Entry("Should succeed to create TrainingRuntime",
-				func() *trainer.TrainingRuntime {
-					baseRuntime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, trainingRuntimeName)
-					return baseRuntime.
-						RuntimeSpec(
-							testingutil.MakeTrainingRuntimeSpecWrapper(baseRuntime.Spec).
-								Obj()).
-						Obj()
-				}),
-		)
-	})
-})
diff --git a/test/integration/webhooks/trainingruntime_webhook_test.go b/test/integration/webhooks/trainingruntime_webhook_test.go
new file mode 100644
index 0000000000..bffb37507d
--- /dev/null
+++ b/test/integration/webhooks/trainingruntime_webhook_test.go
@@ -0,0 +1,202 @@
+/*
+Copyright 2025 The Kubeflow Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package webhooks
+
+import (
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/ptr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	trainer "github.com/kubeflow/trainer/pkg/apis/trainer/v1alpha1"
+	testingutil "github.com/kubeflow/trainer/pkg/util/testing"
+	"github.com/kubeflow/trainer/test/integration/framework"
+	"github.com/kubeflow/trainer/test/util"
+)
+
+const trainingRuntimeName = "test-trainingruntime"
+
+var _ = ginkgo.Describe("TrainingRuntime Webhook", ginkgo.Ordered, func() {
+	var ns *corev1.Namespace
+
+	ginkgo.BeforeAll(func() {
+		fwk = &framework.Framework{}
+		cfg = fwk.Init()
+		ctx, k8sClient = fwk.RunManager(cfg)
+	})
+	ginkgo.AfterAll(func() {
+		fwk.Teardown()
+	})
+
+	ginkgo.BeforeEach(func() {
+		ns = &corev1.Namespace{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: corev1.SchemeGroupVersion.String(),
+				Kind:       "Namespace",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "trainingruntime-webhook-",
+			},
+		}
+		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
+	})
+
+	ginkgo.AfterEach(func() {
+		gomega.Expect(k8sClient.DeleteAllOf(ctx, &trainer.TrainingRuntime{}, client.InNamespace(ns.Name))).To(gomega.Succeed())
+	})
+
+	ginkgo.When("Creating TrainingRuntime", func() {
+		ginkgo.DescribeTable("Validate TrainingRuntime on creation", func(runtime func() *trainer.TrainingRuntime) {
+			gomega.Expect(k8sClient.Create(ctx, runtime())).Should(gomega.Succeed())
+		},
+			ginkgo.Entry("Should succeed to create TrainingRuntime",
+				func() *trainer.TrainingRuntime {
+					baseRuntime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, trainingRuntimeName)
+					return baseRuntime.
+						RuntimeSpec(
+							testingutil.MakeTrainingRuntimeSpecWrapper(baseRuntime.Spec).
+								Obj()).
+						Obj()
+				}),
+		)
+	})
+})
+
+var _ = ginkgo.Describe("TrainingRuntime marker validations and defaulting", ginkgo.Ordered, func() {
+	var ns *corev1.Namespace
+
+	ginkgo.BeforeAll(func() {
+		fwk = &framework.Framework{}
+		cfg = fwk.Init()
+		ctx, k8sClient = fwk.RunManager(cfg)
+	})
+	ginkgo.AfterAll(func() {
+		fwk.Teardown()
+	})
+
+	ginkgo.BeforeEach(func() {
+		ns = &corev1.Namespace{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: corev1.SchemeGroupVersion.String(),
+				Kind:       "Namespace",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "training-runtime-marker-",
+			},
+		}
+		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Expect(k8sClient.DeleteAllOf(ctx, &trainer.TrainingRuntime{}, client.InNamespace(ns.Name))).Should(gomega.Succeed())
+		gomega.Expect(k8sClient.DeleteAllOf(ctx, &trainer.ClusterTrainingRuntime{})).Should(gomega.Succeed())
+	})
+
+	ginkgo.When("Creating TrainingRuntime", func() {
+		ginkgo.DescribeTable("Validate TrainingRuntime on creation", func(trainingRuntime func() *trainer.TrainingRuntime, errorMatcher gomega.OmegaMatcher) {
+			gomega.Expect(k8sClient.Create(ctx, trainingRuntime())).Should(errorMatcher)
+		},
+			ginkgo.Entry("Should succeed to create trainingRuntime",
+				func() *trainer.TrainingRuntime {
+					return testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").
+						Obj()
+				},
+				gomega.Succeed()),
+			ginkgo.Entry("Should fail to create trainingRuntime with both MPI and Torch runtimes",
+				func() *trainer.TrainingRuntime {
+					runtime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").Obj()
+					runtime.Spec.MLPolicy = &trainer.MLPolicy{
+						MLPolicySource: trainer.MLPolicySource{
+							Torch: &trainer.TorchMLPolicySource{},
+							MPI:   &trainer.MPIMLPolicySource{},
+						},
+					}
+					return runtime
+				},
+				testingutil.BeInvalidError()),
+			ginkgo.Entry("Should fail to create trainingRuntime with minNodes and torch.elasticPolicy",
+				func() *trainer.TrainingRuntime {
+					runtime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").Obj()
+					runtime.Spec.MLPolicy = &trainer.MLPolicy{
+						NumNodes: ptr.To(int32(2)),
+						MLPolicySource: trainer.MLPolicySource{
+							Torch: &trainer.TorchMLPolicySource{
+								ElasticPolicy: &trainer.TorchElasticPolicy{},
+							},
+						},
+					}
+					return runtime
+				},
+				testingutil.BeInvalidError()),
+		)
+		ginkgo.DescribeTable("Defaulting TrainingRuntime on creation", func(trainingRuntime func() *trainer.TrainingRuntime, wantTrainingRuntime func() *trainer.TrainingRuntime) {
+			created := trainingRuntime()
+			gomega.Expect(k8sClient.Create(ctx, created)).Should(gomega.Succeed())
+			gomega.Expect(created).Should(gomega.BeComparableTo(wantTrainingRuntime(), util.IgnoreObjectMetadata))
+		},
+			ginkgo.Entry("Should succeed to default torch.NumProcPerNode=auto",
+				func() *trainer.TrainingRuntime {
+					runtime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").Obj()
+					runtime.Spec.MLPolicy = &trainer.MLPolicy{
+						MLPolicySource: trainer.MLPolicySource{
+							Torch: &trainer.TorchMLPolicySource{},
+						},
+					}
+					return runtime
+				},
+				func() *trainer.TrainingRuntime {
+					runtime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").Obj()
+					runtime.Spec.MLPolicy = &trainer.MLPolicy{
+						MLPolicySource: trainer.MLPolicySource{
+							Torch: &trainer.TorchMLPolicySource{
+								NumProcPerNode: ptr.To("auto"),
+							},
+						},
+					}
+					runtime.Spec.Template.Spec = testingutil.MakeJobSetWrapper(ns.Name, "runtime").
+						Replicas(1).Obj().Spec
+					return runtime
+				}),
+
+			ginkgo.Entry("Should succeed to default mpi.mpiImplementation=OpenMPI",
+				func() *trainer.TrainingRuntime {
+					runtime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").Obj()
+					runtime.Spec.MLPolicy = &trainer.MLPolicy{
+						MLPolicySource: trainer.MLPolicySource{
+							MPI: &trainer.MPIMLPolicySource{},
+						},
+					}
+					return runtime
+				},
+				func() *trainer.TrainingRuntime {
+					runtime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "runtime").Obj()
+					runtime.Spec.MLPolicy = &trainer.MLPolicy{
+						MLPolicySource: trainer.MLPolicySource{
+							MPI: &trainer.MPIMLPolicySource{
+								MPIImplementation: trainer.MPIImplementationOpenMPI,
+								RunLauncherAsNode: ptr.To(false),
+							},
+						},
+					}
+					runtime.Spec.Template.Spec = testingutil.MakeJobSetWrapper(ns.Name, "runtime").
+						Replicas(1).Obj().Spec
+					return runtime
+				}),
+		)
+	})
+})
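Note on the MLPolicy-level `x-kubernetes-validations` added above: these CEL rules are enforced by the API server itself when the schema is evaluated, independent of the validating webhook. A minimal sketch of a TrainingRuntime fragment that the `Only one of the policy can be configured` rule would reject; the object name is illustrative and unrelated fields (for example the JobSet template) are omitted:

```yaml
apiVersion: trainer.kubeflow.org/v1alpha1
kind: TrainingRuntime
metadata:
  name: both-policies          # illustrative name
spec:
  mlPolicy:
    torch: {}                  # Torch policy set...
    mpi: {}                    # ...and MPI policy set -> rejected by the CEL rule:
                               # "Only one of the policy can be configured"
```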
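The second rule covers the elastic case: a fixed `numNodes` conflicts with `torch.elasticPolicy`, which manages the node count on its own, and the `Should fail to create trainingRuntime with minNodes and torch.elasticPolicy` test entry exercises exactly this. A sketch of a spec fragment that would be rejected, again with unrelated fields omitted:

```yaml
spec:
  mlPolicy:
    numNodes: 2                # fixed node count...
    torch:
      elasticPolicy: {}        # ...combined with elastic training -> rejected by the CEL rule:
                               # "numNodes should not be set if torch.elasticPolicy is configured"
```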
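The `+kubebuilder:default` markers move defaulting into the CRD schema, which is what the new `Defaulting TrainingRuntime on creation` table checks through the API server. A sketch of the expected persisted result for a runtime submitted with empty `torch: {}` and `coscheduling: {}` blocks, assuming only the defaults added in this change (an empty `mpi: {}` block would instead come back with `mpiImplementation: OpenMPI` and `runLauncherAsNode: false`):

```yaml
spec:
  mlPolicy:
    torch:
      numProcPerNode: auto         # from +kubebuilder:default="auto"
  podGroupPolicy:
    coscheduling:
      scheduleTimeoutSeconds: 60   # from +kubebuilder:default=60
```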