Integrate multi-node nccl testing into the tester package (#447)
weicongw authored Jun 14, 2024
1 parent cb6ad01 commit 56cbd21
Showing 4 changed files with 137 additions and 48 deletions.
12 changes: 12 additions & 0 deletions e2e2/internal/framework_extensions/client.go
@@ -3,6 +3,7 @@ package frameworkext
import (
"bytes"
"context"
"html/template"
"io"
"os"

@@ -102,6 +103,17 @@ func deleteManifests(restConfig *rest.Config, manifests ...io.Reader) error {
return nil
}

// RenderManifests renders manifests with the supplied data
func RenderManifests(file []byte, templateData interface{}) ([]byte, error) {
tpl, err := template.New("Manifest").Parse(string(file))
if err != nil {
return nil, err
}
buf := bytes.Buffer{}
err = tpl.Execute(&buf, templateData)
return buf.Bytes(), err
}

func bytesSlicesToReaders(byteSlices ...[]byte) []io.Reader {
var readers []io.Reader
for _, b := range byteSlices {
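For context, a minimal sketch of how the new RenderManifests helper can be called. The module import path, the placeholder names, and the image tag below are illustrative assumptions rather than part of this change, and since the package lives under internal/ this only compiles from within the same module:

    package main

    import (
        "fmt"
        "log"

        // assumed import path for the frameworkext package shown above
        fwext "github.com/aws/aws-k8s-tester/e2e2/internal/framework_extensions"
    )

    func main() {
        // a tiny manifest fragment with Go template placeholders, for illustration only
        manifest := []byte("replicas: {{.WorkerNodeCount}}\nimage: {{.NcclTestImage}}\n")

        rendered, err := fwext.RenderManifests(manifest, struct {
            WorkerNodeCount int
            NcclTestImage   string
        }{WorkerNodeCount: 2, NcclTestImage: "nccl-tests:latest"})
        if err != nil {
            log.Fatalf("failed to render manifest: %v", err)
        }

        // prints: replicas: 2
        //         image: nccl-tests:latest
        fmt.Print(string(rendered))
    }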
75 changes: 73 additions & 2 deletions e2e2/test/cases/nvidia/main_test.go
@@ -3,6 +3,8 @@ package nvidia
import (
"context"
_ "embed"
"flag"
"fmt"
"log"
"os"
"slices"
@@ -13,24 +15,36 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/e2e-framework/klient/wait"
"sigs.k8s.io/e2e-framework/klient/wait/conditions"
"sigs.k8s.io/e2e-framework/pkg/env"
"sigs.k8s.io/e2e-framework/pkg/envconf"
)

var (
testenv env.Environment
testenv env.Environment
nodeType *string
efaEnabled *bool
ncclTestImage *string
nodeCount int
gpuPerNode int
efaPerNode int
)

var (
//go:embed manifests/nvidia-device-plugin.yaml
nvidiaDevicePluginManifest []byte
//go:embed manifests/mpi-operator.yaml
mpiOperatorManifest []byte
//go:embed manifests/efa-device-plugin.yaml
efaDevicePluginManifest []byte
)

func TestMain(m *testing.M) {
nodeType = flag.String("nodeType", "", "node type for the tests")
ncclTestImage = flag.String("ncclTestImage", "", "nccl test image for nccl tests")
efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
@@ -73,12 +87,69 @@ func TestMain(m *testing.M) {
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
clientset, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
if err != nil {
return ctx, err
}
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return ctx, err
}
if *efaEnabled {
err := fwext.ApplyManifests(cfg.Client().RESTConfig(), efaDevicePluginManifest)
if err != nil {
return ctx, err
}
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "efa-device-plugin-daemonset", Namespace: "kube-system"},
}
err = wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).DaemonSetReady(&ds),
wait.WithTimeout(time.Minute*5))
if err != nil {
return ctx, err
}
}

singleNodeType := true
for i := 1; i < len(nodes.Items); i++ {
if nodes.Items[i].Labels["node.kubernetes.io/instance-type"] != nodes.Items[i-1].Labels["node.kubernetes.io/instance-type"] {
singleNodeType = false
}
}
if !singleNodeType {
return ctx, fmt.Errorf("node types are not the same, all node types must be the same in the cluster")
}
if *nodeType != "" {
for _, v := range nodes.Items {
if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
nodeCount++
gpu := v.Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
efa := v.Status.Capacity["vpc.amazonaws.com/efa"]
efaPerNode = int(efa.Value())
}
}
} else {
log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
nodeCount = len(nodes.Items)
gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
efa := nodes.Items[0].Status.Capacity["vpc.amazonaws.com/efa"]
efaPerNode = int(efa.Value())
}
return ctx, nil
},
)

testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.DeleteManifests(cfg.Client().RESTConfig(), efaDevicePluginManifest)
if err != nil {
return ctx, err
}
slices.Reverse(manifests)
err := fwext.DeleteManifests(config.Client().RESTConfig(), manifests...)
err = fwext.DeleteManifests(config.Client().RESTConfig(), manifests...)
if err != nil {
return ctx, err
}
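For orientation, a hypothetical example of what this setup discovers: on a cluster of two p4d.24xlarge nodes, each exposing 8 nvidia.com/gpu and 4 vpc.amazonaws.com/efa in its node capacity, the setup records nodeCount=2, gpuPerNode=8, and efaPerNode=4; with -efaEnabled it also deploys the EFA device plugin daemonset and waits up to five minutes for it to become ready before any test runs.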
42 changes: 19 additions & 23 deletions e2e2/test/cases/nvidia/manifests/mpi-job-nccl-test-multi-node.yaml (renamed from mpi-job-pytorch-training-multi-node.yaml)
@@ -1,9 +1,9 @@
apiVersion: kubeflow.org/v2beta1
kind: MPIJob
metadata:
name: pytorch-training-multi-node
name: multi-node-nccl-test
spec:
slotsPerWorker: 8
slotsPerWorker: {{.GpuPerNode}}
runPolicy:
# it may take a bit for the workers to get ready (the container image is heavy)
# and we don't want the launcher to reach it's CrashLoopBackoff limit in the meantime
@@ -13,23 +13,23 @@ spec:
Launcher:
replicas: 1
template:
spec:
spec:
restartPolicy: OnFailure
containers:
- image: TODO
- image: {{.NcclTestImage}}
imagePullPolicy: Always
name: nccl-test-launcher
env:
- name: XLA_FLAGS
value: "--xla_gpu_cuda_data_dir=/usr/local/cuda"
- name: TF_XLA_FLAGS
value: "--tf_xla_cpu_global_jit"
- name: XLA_FLAGS
value: "--xla_gpu_cuda_data_dir=/usr/local/cuda"
- name: TF_XLA_FLAGS
value: "--tf_xla_cpu_global_jit"
command:
- /opt/amazon/openmpi/bin/mpirun
- --allow-run-as-root
- --tag-output
- -np
- "16"
- "{{.WorkerNodeGpuCount}}"
- -bind-to
- none
- -map-by
@@ -53,7 +53,7 @@ spec:
- -x
- FI_LOG_LEVEL=warn
- -x
- FI_EFA_USE_DEVICE_RDMA=1
- FI_EFA_USE_DEVICE_RDMA={{.EfaUseDeviceRdma}}
- -x
- OFI_NCCL_DISABLE_GDR_REQUIRED_CHECK=0
- -x
@@ -69,37 +69,33 @@ spec:
- 2G
- -f
- "2"
- -t
- "1"
- -g
- "1"
- -c
- "1"
- -n
- "100"
Worker:
replicas: 2
replicas: {{.WorkerNodeCount}}
template:
spec:
volumes:
- name: dshm
emptyDir:
medium: Memory
containers:
- image: TODO
- image: {{.NcclTestImage}}
imagePullPolicy: Always
name: nccl-worker
name: nccl-test-worker
volumeMounts:
- mountPath: /dev/shm
name: dshm
resources:
limits:
nvidia.com/gpu: 8
requests:
nvidia.com/gpu: {{.GpuPerNode}}
hugepages-2Mi: 5120Mi
vpc.amazonaws.com/efa: 4
vpc.amazonaws.com/efa: {{.EfaInterfacePerNode}}
memory: 8000Mi
requests:
nvidia.com/gpu: 8
limits:
nvidia.com/gpu: {{.GpuPerNode}}
hugepages-2Mi: 5120Mi
vpc.amazonaws.com/efa: 4
vpc.amazonaws.com/efa: {{.EfaInterfacePerNode}}
memory: 8000Mi
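Continuing the hypothetical two-node p4d.24xlarge example, the rendered MPIJob would set slotsPerWorker to 8, run one Worker replica (the remaining node hosts the launcher pod), pass -np "8" to mpirun, request 8 GPUs and 4 EFA interfaces per worker, and export FI_EFA_USE_DEVICE_RDMA=1, since device RDMA is only enabled for p4d.24xlarge in the test setup.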
56 changes: 33 additions & 23 deletions e2e2/test/cases/nvidia/mpi_test.go
@@ -3,7 +3,7 @@ package nvidia
import (
"context"
_ "embed"
"slices"
"fmt"
"testing"
"time"

@@ -15,20 +15,27 @@ import (
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
//go:embed manifests/mpi-job-pytorch-training-single-node.yaml
mpiJobPytorchTrainingSingleNodeManifest []byte
//go:embed manifests/mpi-job-pytorch-training-multi-node.yaml
mpiJobPytorchTrainingMultiNodeManifest []byte
//go:embed manifests/efa-device-plugin.yaml
efaDevicePluginManifest []byte
//go:embed manifests/mpi-job-nccl-test-multi-node.yaml
mpiJobNcclTestMultiNodeManifest []byte
renderedMpiJobNcclTestMultiNodeManifest []byte
)

type ncclTestManifestTplVars struct {
WorkerNodeCount int
WorkerNodeGpuCount int
GpuPerNode int
NcclTestImage string
EfaInterfacePerNode int
EfaUseDeviceRdma int
}

func TestMPIJobPytorchTraining(t *testing.T) {
singleNode := features.New("single-node").
WithLabel("suite", "nvidia").
@@ -65,28 +72,32 @@ func TestMPIJobPytorchTraining(t *testing.T) {
}).
Feature()

manifestsMultiNode := [][]byte{
efaDevicePluginManifest,
mpiJobPytorchTrainingMultiNodeManifest,
}

multiNode := features.New("multi-node").
WithLabel("suite", "nvidia").
WithLabel("hardware", "gpu").
WithLabel("hardware", "efa").
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
err := fwext.ApplyManifests(cfg.Client().RESTConfig(), manifestsMultiNode...)
if *ncclTestImage == "" {
t.Fatal(fmt.Errorf("ncclTestImage must be set to run the nccl test; use https://github.com/aws/aws-k8s-tester/blob/main/e2e2/test/images/Dockerfile.aws-efa-nccl-tests to build the image and -ncclTestImage to set the image url"))
}
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start-nccl-base.html#nccl-start-base-test
var EfaUseDeviceRdma int
if *nodeType == "p4d.24xlarge" {
EfaUseDeviceRdma = 1
}
var err error
renderedMpiJobNcclTestMultiNodeManifest, err = fwext.RenderManifests(mpiJobNcclTestMultiNodeManifest, ncclTestManifestTplVars{
// one of the nodes will be used for the launcher pod
WorkerNodeCount: nodeCount - 1,
WorkerNodeGpuCount: (nodeCount - 1) * gpuPerNode,
GpuPerNode: gpuPerNode,
NcclTestImage: *ncclTestImage,
EfaInterfacePerNode: efaPerNode,
EfaUseDeviceRdma: EfaUseDeviceRdma,
})
if err != nil {
t.Fatal(err)
}
return ctx
}).
Assess("EFA device plugin daemonset ready", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "efa-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).DaemonSetReady(&ds),
wait.WithTimeout(time.Minute*5))
err = fwext.ApplyManifests(cfg.Client().RESTConfig(), renderedMpiJobNcclTestMultiNodeManifest)
if err != nil {
t.Fatal(err)
}
@@ -98,7 +109,7 @@ func TestMPIJobPytorchTraining(t *testing.T) {
t.Fatal(err)
}
j := kubeflowv2beta1.MPIJob{
ObjectMeta: metav1.ObjectMeta{Name: "pytorch-training-multi-node", Namespace: "default"},
ObjectMeta: metav1.ObjectMeta{Name: "multi-node-nccl-test", Namespace: "default"},
}
timeout := time.Minute * 10
err := wait.For(conditions.New(rsrc).ResourceMatch(&j, mpiJobSucceeded),
@@ -109,8 +120,7 @@ func TestMPIJobPytorchTraining(t *testing.T) {
return ctx
}).
Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
slices.Reverse(manifestsMultiNode)
err := fwext.DeleteManifests(cfg.Client().RESTConfig(), manifestsMultiNode...)
err := fwext.DeleteManifests(cfg.Client().RESTConfig(), renderedMpiJobNcclTestMultiNodeManifest)
if err != nil {
t.Fatal(err)
}
