diff --git a/e2e2/internal/framework_extensions/client.go b/e2e2/internal/framework_extensions/client.go
index 47064fe6c..0cd6e50d7 100644
--- a/e2e2/internal/framework_extensions/client.go
+++ b/e2e2/internal/framework_extensions/client.go
@@ -3,6 +3,7 @@ package frameworkext
 import (
     "bytes"
     "context"
+    "html/template"
     "io"
     "os"
@@ -102,6 +103,17 @@ func deleteManifests(restConfig *rest.Config, manifests ...io.Reader) error {
     return nil
 }
 
+// RenderManifests renders manifests with the supplied data
+func RenderManifests(file []byte, templateData interface{}) ([]byte, error) {
+    tpl, err := template.New("Manifest").Parse(string(file))
+    if err != nil {
+        return nil, err
+    }
+    buf := bytes.Buffer{}
+    err = tpl.Execute(&buf, templateData)
+    return buf.Bytes(), err
+}
+
 func bytesSlicesToReaders(byteSlices ...[]byte) []io.Reader {
     var readers []io.Reader
     for _, b := range byteSlices {
diff --git a/e2e2/test/cases/nvidia/main_test.go b/e2e2/test/cases/nvidia/main_test.go
index f107fb703..2baa03149 100644
--- a/e2e2/test/cases/nvidia/main_test.go
+++ b/e2e2/test/cases/nvidia/main_test.go
@@ -3,6 +3,8 @@ package nvidia
 import (
     "context"
     _ "embed"
+    "flag"
+    "fmt"
     "log"
     "os"
     "slices"
@@ -13,6 +15,7 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
     "sigs.k8s.io/e2e-framework/klient/wait"
     "sigs.k8s.io/e2e-framework/klient/wait/conditions"
     "sigs.k8s.io/e2e-framework/pkg/env"
@@ -20,7 +23,13 @@ import (
 )
 
 var (
-    testenv env.Environment
+    testenv       env.Environment
+    nodeType      *string
+    efaEnabled    *bool
+    ncclTestImage *string
+    nodeCount     int
+    gpuPerNode    int
+    efaPerNode    int
 )
 
 var (
@@ -28,9 +37,14 @@ var (
     nvidiaDevicePluginManifest []byte
     //go:embed manifests/mpi-operator.yaml
     mpiOperatorManifest []byte
+    //go:embed manifests/efa-device-plugin.yaml
+    efaDevicePluginManifest []byte
 )
 
 func TestMain(m *testing.M) {
+    nodeType = flag.String("nodeType", "", "node type for the tests")
+    ncclTestImage = flag.String("ncclTestImage", "", "nccl test image for nccl tests")
+    efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
     cfg, err := envconf.NewFromFlags()
     if err != nil {
         log.Fatalf("failed to initialize test environment: %v", err)
@@ -73,12 +87,69 @@ func TestMain(m *testing.M) {
             }
             return ctx, nil
         },
+        func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+            clientset, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
+            if err != nil {
+                return ctx, err
+            }
+            nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+            if err != nil {
+                return ctx, err
+            }
+            if *efaEnabled {
+                err := fwext.ApplyManifests(cfg.Client().RESTConfig(), efaDevicePluginManifest)
+                if err != nil {
+                    return ctx, err
+                }
+                ds := appsv1.DaemonSet{
+                    ObjectMeta: metav1.ObjectMeta{Name: "efa-device-plugin-daemonset", Namespace: "kube-system"},
+                }
+                err = wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).DaemonSetReady(&ds),
+                    wait.WithTimeout(time.Minute*5))
+                if err != nil {
+                    return ctx, err
+                }
+            }
+
+            singleNodeType := true
+            for i := 1; i < len(nodes.Items)-1; i++ {
+                if nodes.Items[i].Labels["node.kubernetes.io/instance-type"] != nodes.Items[i-1].Labels["node.kubernetes.io/instance-type"] {
+                    singleNodeType = false
+                }
+            }
+            if !singleNodeType {
+                return ctx, fmt.Errorf("Node types are not the same, all node types must be the same in the cluster")
+            }
+            if *nodeType != "" {
+                for _, v := range nodes.Items {
+                    if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
+                        nodeCount++
+                        gpu := v.Status.Capacity["nvidia.com/gpu"]
+                        gpuPerNode = int(gpu.Value())
+                        efa := v.Status.Capacity["vpc.amazonaws.com/efa"]
+                        efaPerNode = int(efa.Value())
+                    }
+                }
+            } else {
+                log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
+                nodeCount = len(nodes.Items)
+                gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
+                gpuPerNode = int(gpu.Value())
+                efa := nodes.Items[0].Status.Capacity["vpc.amazonaws.com/efa"]
+                efaPerNode = int(efa.Value())
+            }
+            return ctx, nil
+        },
     )
 
     testenv.Finish(
         func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+            err := fwext.DeleteManifests(cfg.Client().RESTConfig(), efaDevicePluginManifest)
+            if err != nil {
+                return ctx, err
+            }
             slices.Reverse(manifests)
-            err := fwext.DeleteManifests(config.Client().RESTConfig(), manifests...)
+            err = fwext.DeleteManifests(config.Client().RESTConfig(), manifests...)
             if err != nil {
                 return ctx, err
             }
diff --git a/e2e2/test/cases/nvidia/manifests/mpi-job-pytorch-training-multi-node.yaml b/e2e2/test/cases/nvidia/manifests/mpi-job-nccl-test-multi-node.yaml
similarity index 74%
rename from e2e2/test/cases/nvidia/manifests/mpi-job-pytorch-training-multi-node.yaml
rename to e2e2/test/cases/nvidia/manifests/mpi-job-nccl-test-multi-node.yaml
index 722cb3da1..1af096145 100644
--- a/e2e2/test/cases/nvidia/manifests/mpi-job-pytorch-training-multi-node.yaml
+++ b/e2e2/test/cases/nvidia/manifests/mpi-job-nccl-test-multi-node.yaml
@@ -1,9 +1,9 @@
 apiVersion: kubeflow.org/v2beta1
 kind: MPIJob
 metadata:
-  name: pytorch-training-multi-node
+  name: multi-node-nccl-test
 spec:
-  slotsPerWorker: 8
+  slotsPerWorker: {{.GpuPerNode}}
   runPolicy:
     # it may take a bit for the workers to get ready (the container image is heavy)
     # and we don't want the launcher to reach it's CrashLoopBackoff limit in the meantime
@@ -13,23 +13,23 @@ spec:
     Launcher:
       replicas: 1
       template:
-          spec:
+        spec:
           restartPolicy: OnFailure
           containers:
-            - image: TODO
+            - image: {{.NcclTestImage}}
              imagePullPolicy: Always
              name: nccl-test-launcher
              env:
-                - name: XLA_FLAGS
-                  value: "--xla_gpu_cuda_data_dir=/usr/local/cuda"
-                - name: TF_XLA_FLAGS
-                  value: "--tf_xla_cpu_global_jit"
+              - name: XLA_FLAGS
+                value: "--xla_gpu_cuda_data_dir=/usr/local/cuda"
+              - name: TF_XLA_FLAGS
+                value: "--tf_xla_cpu_global_jit"
              command:
              - /opt/amazon/openmpi/bin/mpirun
              - --allow-run-as-root
              - --tag-output
              - -np
-             - "16"
+             - "{{.WorkerNodeGpuCount}}"
              - -bind-to
              - none
             - -map-by
@@ -53,7 +53,7 @@ spec:
              - -x
              - FI_LOG_LEVEL=warn
              - -x
-             - FI_EFA_USE_DEVICE_RDMA=1
+             - FI_EFA_USE_DEVICE_RDMA={{.EfaUseDeviceRdma}}
              - -x
              - OFI_NCCL_DISABLE_GDR_REQUIRED_CHECK=0
              - -x
@@ -69,16 +69,12 @@ spec:
              - 2G
              - -f
              - "2"
-             - -t
-             - "1"
-             - -g
-             - "1"
              - -c
              - "1"
              - -n
              - "100"
     Worker:
-      replicas: 2
+      replicas: {{.WorkerNodeCount}}
       template:
         spec:
           volumes:
@@ -86,20 +82,20 @@ spec:
               emptyDir:
                 medium: Memory
           containers:
-            - image: TODO
+            - image: {{.NcclTestImage}}
              imagePullPolicy: Always
-             name: nccl-worker
+             name: nccl-test-worker
              volumeMounts:
              - mountPath: /dev/shm
                name: dshm
              resources:
-              limits:
-                nvidia.com/gpu: 8
+              requests:
+                nvidia.com/gpu: {{.GpuPerNode}}
                 hugepages-2Mi: 5120Mi
-                vpc.amazonaws.com/efa: 4
+                vpc.amazonaws.com/efa: {{.EfaInterfacePerNode}}
                 memory: 8000Mi
-              requests:
-                nvidia.com/gpu: 8
+              limits:
+                nvidia.com/gpu: {{.GpuPerNode}}
                 hugepages-2Mi: 5120Mi
-                vpc.amazonaws.com/efa: 4
+                vpc.amazonaws.com/efa: {{.EfaInterfacePerNode}}
                 memory: 8000Mi
diff --git a/e2e2/test/cases/nvidia/mpi_test.go b/e2e2/test/cases/nvidia/mpi_test.go
index 423519b52..46ed24c67 100644
--- a/e2e2/test/cases/nvidia/mpi_test.go
+++ b/e2e2/test/cases/nvidia/mpi_test.go
@@ -3,7 +3,7 @@ package nvidia
 import (
     "context"
     _ "embed"
-    "slices"
+    "fmt"
     "testing"
     "time"
 
@@ -15,7 +15,6 @@ import (
     "sigs.k8s.io/e2e-framework/pkg/envconf"
     "sigs.k8s.io/e2e-framework/pkg/features"
 
-    appsv1 "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -23,12 +22,20 @@ import (
 var (
     //go:embed manifests/mpi-job-pytorch-training-single-node.yaml
     mpiJobPytorchTrainingSingleNodeManifest []byte
-    //go:embed manifests/mpi-job-pytorch-training-multi-node.yaml
-    mpiJobPytorchTrainingMultiNodeManifest []byte
-    //go:embed manifests/efa-device-plugin.yaml
-    efaDevicePluginManifest []byte
+    //go:embed manifests/mpi-job-nccl-test-multi-node.yaml
+    mpiJobNcclTestMultiNodeManifest         []byte
+    renderedMpiJobNcclTestMultiNodeManifest []byte
 )
 
+type ncclTestManifestTplVars struct {
+    WorkerNodeCount     int
+    WorkerNodeGpuCount  int
+    GpuPerNode          int
+    NcclTestImage       string
+    EfaInterfacePerNode int
+    EfaUseDeviceRdma    int
+}
+
 func TestMPIJobPytorchTraining(t *testing.T) {
     singleNode := features.New("single-node").
         WithLabel("suite", "nvidia").
@@ -65,28 +72,32 @@ func TestMPIJobPytorchTraining(t *testing.T) {
         }).
         Feature()
 
-    manifestsMultiNode := [][]byte{
-        efaDevicePluginManifest,
-        mpiJobPytorchTrainingMultiNodeManifest,
-    }
-
     multiNode := features.New("multi-node").
         WithLabel("suite", "nvidia").
         WithLabel("hardware", "gpu").
         WithLabel("hardware", "efa").
         Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
-            err := fwext.ApplyManifests(cfg.Client().RESTConfig(), manifestsMultiNode...)
+            if *ncclTestImage == "" {
+                t.Fatal(fmt.Errorf("ncclTestImage must be set to run nccl test, use https://github.com/aws/aws-k8s-tester/blob/main/e2e2/test/images/Dockerfile.aws-efa-nccl-tests to build the image and -ncclTestImage to set the image url"))
+            }
+            // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start-nccl-base.html#nccl-start-base-test
+            var EfaUseDeviceRdma int
+            if *nodeType == "p4d.24xlarge" {
+                EfaUseDeviceRdma = 1
+            }
+            renderedMpiJobNcclTestMultiNodeManifest, err := fwext.RenderManifests(mpiJobNcclTestMultiNodeManifest, ncclTestManifestTplVars{
+                // one of the nodes will be used for the master pod
+                WorkerNodeCount:     nodeCount - 1,
+                WorkerNodeGpuCount:  (nodeCount - 1) * gpuPerNode,
+                GpuPerNode:          gpuPerNode,
+                NcclTestImage:       *ncclTestImage,
+                EfaInterfacePerNode: efaPerNode,
+                EfaUseDeviceRdma:    EfaUseDeviceRdma,
+            })
             if err != nil {
                 t.Fatal(err)
             }
-            return ctx
-        }).
- Assess("EFA device plugin daemonset ready", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { - ds := appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{Name: "efa-device-plugin-daemonset", Namespace: "kube-system"}, - } - err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).DaemonSetReady(&ds), - wait.WithTimeout(time.Minute*5)) + err = fwext.ApplyManifests(cfg.Client().RESTConfig(), renderedMpiJobNcclTestMultiNodeManifest) if err != nil { t.Fatal(err) } @@ -98,7 +109,7 @@ func TestMPIJobPytorchTraining(t *testing.T) { t.Fatal(err) } j := kubeflowv2beta1.MPIJob{ - ObjectMeta: metav1.ObjectMeta{Name: "pytorch-training-multi-node", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: "multi-node-nccl-test", Namespace: "default"}, } timeout := time.Minute * 10 err := wait.For(conditions.New(rsrc).ResourceMatch(&j, mpiJobSucceeded), @@ -109,8 +120,7 @@ func TestMPIJobPytorchTraining(t *testing.T) { return ctx }). Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { - slices.Reverse(manifestsMultiNode) - err := fwext.DeleteManifests(cfg.Client().RESTConfig(), manifestsMultiNode...) + err := fwext.DeleteManifests(cfg.Client().RESTConfig(), renderedMpiJobNcclTestMultiNodeManifest) if err != nil { t.Fatal(err) }