diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/1-pretrain-trn1-raycluster.yaml b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/1-pretrain-trn1-raycluster.yaml new file mode 100644 index 000000000..b84e1fe6e --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/1-pretrain-trn1-raycluster.yaml @@ -0,0 +1,149 @@ +# This RayCluster configuration deploys a distributed training environment for Llama3 +# using AWS Neuron SDK and RayTrain on Amazon EKS. + +# ---------------------------------------------------------------------- +# NOTE: For detailed deployment instructions, refer to the DoEKS website (https://awslabs.github.io/data-on-eks/docs/category/training-on-eks). +# ---------------------------------------------------------------------- + +# ---------------------------------------------------------------------- +# NOTE: We are using the default namespace for this deployment because the fsx-claim PVC is created under the default namespace by the Terraform blueprint. +# If you want to deploy the cluster in a dedicated namespace, ensure that the FSX for Lustre file system is also created in the same namespace since PVCs are namespace-bound. +# ---------------------------------------------------------------------- + +# Docs for Volcano with KubeRay: https://docs.ray.io/en/master/cluster/kubernetes/k8s-ecosystem/volcano.html +--- +apiVersion: scheduling.volcano.sh/v1beta1 +kind: Queue +metadata: + name: llama3-training-queue + namespace: default +spec: + weight: 1 + capability: + cpu: '4000' + memory: 12000Gi + +--- +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: kuberay-trn1 + namespace: default + labels: + ray.io/scheduler-name: volcano + volcano.sh/queue-name: llama3-training-queue +spec: + rayVersion: 2.22.0 + headGroupSpec: + # Head Node Configuration + # This section defines the specification for the Ray head pod. + # The head node manages the cluster and provides services like the dashboard and GCS. + template: + spec: + containers: + - name: ray-head + image: .dkr.ecr.us-east-2.amazonaws.com/rayptl_pretrain_llama3.1:latest # Replace AWS Account ID + imagePullPolicy: Always # Pull the latest image each time + lifecycle: + preStop: + exec: + command: ["/bin/sh", "-c", "ray stop"] # Graceful shutdown of Ray processes + ports: + - containerPort: 8265 + name: dashboard # Expose Ray dashboard + - containerPort: 6379 + name: redis # Expose Redis port + - containerPort: 10001 + name: object-manager # Expose object manager port + resources: + requests: + cpu: 6 + memory: 30Gi + volumeMounts: + - mountPath: /tmp/ray + name: log-volume # Mount for Ray logs + - name: persistent-storage # Mount shared filesystem (FSx for Lustre) + mountPath: /shared + # Node Selector for Karpenter + # Karpenter will provision this head pod on a node with the specified labels. + nodeSelector: + instanceType: mixed-x86 + provisionerType: Karpenter + volumes: + - name: log-volume + emptyDir: {} + - name: persistent-storage + persistentVolumeClaim: + claimName: fsx-claim # Reference the PVC for shared storage + rayStartParams: + dashboard-host: 0.0.0.0 # Make dashboard accessible + + workerGroupSpecs: + # Worker Node Configuration + # This section defines the specification for the Ray worker pods. + # Worker nodes execute tasks and participate in distributed training. 
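+    # Capacity note: each trn1.32xlarge worker exposes 16 Neuron devices (32 NeuronCores) and 8 EFA devices,
+    # matching the resource requests below, so the 16 fixed replicas provide 16 x 32 = 512 NeuronCores in total.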
+ - groupName: workergroup + replicas: 16 # Number of worker replicas + minReplicas: 16 # Minimum number of worker replicas + maxReplicas: 16 # Maximum number of worker replicas (no scaling in this case) + rayStartParams: {} + template: + spec: + containers: + - name: ray-worker + image: .dkr.ecr.us-east-2.amazonaws.com/rayptl_pretrain_llama3.1:latest # Replace AWS Account ID + imagePullPolicy: Always # Pull the latest image each time + lifecycle: + preStop: + exec: + command: ["/bin/sh", "-c", "ray stop"] + ports: + - containerPort: 8265 + name: dashboard + - containerPort: 6379 + name: redis + - containerPort: 10001 + name: object-manager + resources: + limits: + aws.amazon.com/neuron: '16' # Request AWS Neuron cores + vpc.amazonaws.com/efa: '8' # Request AWS EFA devices + memory: 440Gi + requests: + aws.amazon.com/neuron: '16' + vpc.amazonaws.com/efa: '8' + cpu: '120' + memory: 440Gi + volumeMounts: + - name: persistent-storage + mountPath: /shared # Mount shared filesystem (FSx for Lustre) + - name: dshm + mountPath: /dev/shm # Mount for shared memory + - mountPath: /tmp/ray + name: log-volume # Mount for Ray logs + # Node Selector for Managed Node Group (with Cluster Autoscaler) + # These workers will run on Trn1 instances provisioned by the cluster autoscaler. + # This is necessary as Karpenter doesn't currently support EFA (required for Neuron distributed training). + nodeSelector: + instance-type: trn1-32xl + provisioner: cluster-autoscaler + + # Tolerations for Trn1 and Dedicated Nodes + tolerations: + - key: "aws.amazon.com/neuron" + operator: "Exists" + effect: "NoSchedule" + - key: "hub.jupyter.org/dedicated" + operator: "Equal" + value: "user" + effect: "NoSchedule" + volumes: + # Persistent Volume Claim (PVC) to access the FSx for Lustre filesystem + - name: persistent-storage + persistentVolumeClaim: + claimName: fsx-claim + - name: dshm + emptyDir: + medium: Memory + - name: log-volume + emptyDir: {} diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/2-download_fineweb_dataset.yaml b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/2-download_fineweb_dataset.yaml new file mode 100644 index 000000000..67d1377e9 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/2-download_fineweb_dataset.yaml @@ -0,0 +1,34 @@ +# ---------------------------------------------------------------------------- +# RayJob: llama3-generate-pretraining-test-data +# +# Description: +# This RayJob is responsible for generating pre-training test data required for +# the Llama3 model training. It sources data from the specified dataset, processes +# it, and prepares it for use in subsequent training stages. The job runs a Python +# script (`get_dataset.py`) that performs these data preparation steps. + +# Usage: +# Apply this configuration to your Kubernetes cluster using `kubectl apply -f download_fineweb_dataset.yaml`. +# Ensure that the Ray cluster (`kuberay-trn1`) is running and accessible in the specified namespace. 
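+#
+# For example, assuming the default names used in this blueprint, you can check the cluster and follow the
+# data-preparation job with commands like the following (the submitter Job is typically named after the RayJob):
+#   kubectl get raycluster kuberay-trn1 -n default
+#   kubectl get rayjob llama3-generate-pretraining-test-data -n default
+#   kubectl logs -n default -l job-name=llama3-generate-pretraining-test-data -f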
+# ----------------------------------------------------------------------------
+
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+  name: llama3-generate-pretraining-test-data
+  namespace: default
+spec:
+  submissionMode: K8sJobMode
+  entrypoint: "mkdir -p /shared/llama3.1_config/ /shared/fineweb_llama3.1_tokenized /shared/checkpoint /shared/tblogs && cp config.json /shared/llama3.1_config/ && python3 get_dataset.py"
+  runtimeEnvYAML: |
+    working_dir: /llama3.1_pretraining
+    env_vars:
+      PYTHONUNBUFFERED: '0'
+    resources:
+      requests:
+        cpu: "6"
+        memory: "30Gi"
+  clusterSelector:
+    ray.io/cluster: kuberay-trn1
+  rayClusterNamespace: default # Replace with the namespace where your RayCluster is deployed
+  ttlSecondsAfterFinished: 60 # Time to live for the pod after completion (in seconds)
diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/3-parallel-compile-trn1-rayjob.yaml b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/3-parallel-compile-trn1-rayjob.yaml
new file mode 100644
index 000000000..4a3dadae8
--- /dev/null
+++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/3-parallel-compile-trn1-rayjob.yaml
@@ -0,0 +1,28 @@
+# ----------------------------------------------------------------------------
+# RayJob: llama3.1-parallel-compile-job
+#
+# Description:
+# This RayJob runs the Neuron parallel-compile step for the Llama3.1 model.
+# This step is a prerequisite for training the language model with the prepared dataset.
+
+# Usage:
+# Apply this configuration to your Kubernetes cluster using `kubectl apply -f 3-parallel-compile-trn1-rayjob.yaml`.
+# Ensure that the Ray cluster (`kuberay-trn1`) is running and accessible in the specified namespace.
+# Adjust `shutdownAfterJobFinishes` and `ttlSecondsAfterFinished` below to control how the job is cleaned up after it finishes, or set a maximum runtime if needed.
+# ----------------------------------------------------------------------------
+
+---
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+  name: llama3.1-parallel-compile-job
+spec:
+  submissionMode: K8sJobMode
+  entrypoint: "NEURON_NUM_DEVICES=32 bash run_llama3.1_8b.sh -r 2 -n 16 -l 4e-4 -s 8192 -p 1"
+  runtimeEnvYAML: |
+    working_dir: /llama3.1_pretraining
+  clusterSelector:
+    ray.io/cluster: kuberay-trn1
+  rayClusterNamespace: default # Replace with the namespace where your RayCluster is deployed
+  shutdownAfterJobFinishes: true
+  ttlSecondsAfterFinished: 60 # Time to live for the pod after completion (in seconds)
diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/4-train-trn1-rayjob.yaml b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/4-train-trn1-rayjob.yaml
new file mode 100644
index 000000000..dc684b896
--- /dev/null
+++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/4-train-trn1-rayjob.yaml
@@ -0,0 +1,29 @@
+# ----------------------------------------------------------------------------
+# RayJob: llama3.1-training-job
+#
+# Description:
+# This RayJob runs the main pretraining step of the Llama3.1 model. It launches a
+# Python script (`ray_train_llama3.py`) to perform the pretraining on AWS Neuron devices.
+# This step trains the language model on the prepared dataset.
+
+# Usage:
+# Apply this configuration to your Kubernetes cluster using `kubectl apply -f 4-train-trn1-rayjob.yaml`.
+# Ensure that the Ray cluster (`kuberay-trn1`) is running and accessible in the specified namespace.
+# Adjust `shutdownAfterJobFinishes` and `ttlSecondsAfterFinished` below to control how the job is cleaned up after it finishes, or set a maximum runtime if needed.
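+#
+# While the job is running you can watch progress from the Ray dashboard, e.g. (the head service name below
+# assumes the default `<cluster-name>-head-svc` convention used by KubeRay):
+#   kubectl port-forward svc/kuberay-trn1-head-svc 8265:8265 -n default
+# and then open http://localhost:8265 in your browser.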
+# ----------------------------------------------------------------------------
+
+---
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+  name: llama3.1-training-job
+spec:
+  submissionMode: K8sJobMode
+  entrypoint: "NEURON_NUM_DEVICES=32 bash run_llama3.1_8b.sh -w 500 -n 16 -l 4e-4 -s 8192"
+  runtimeEnvYAML: |
+    working_dir: /llama3.1_pretraining
+  clusterSelector:
+    ray.io/cluster: kuberay-trn1
+  rayClusterNamespace: default # Replace with the namespace where your RayCluster is deployed
+  shutdownAfterJobFinishes: true
+  ttlSecondsAfterFinished: 60 # Time to live for the pod after completion (in seconds)
diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/Dockerfile b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/Dockerfile
new file mode 100644
index 000000000..2d03189aa
--- /dev/null
+++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/Dockerfile
@@ -0,0 +1,34 @@
+ARG REGION
+
+# Base image: PyTorch training image for NeuronX
+FROM public.ecr.aws/neuron/pytorch-training-neuronx:2.1.2-neuronx-py310-sdk2.20.0-ubuntu20.04
+
+# Install aiohttp (required by Ray's dashboard components)
+RUN pip3 install aiohttp \
+    && rm -rf /root/.cache
+
+# Install additional Python dependencies
+RUN pip3 install wget awscli regex boto3 pyarrow \
+    && rm -rf /root/.cache/
+
+# Copy the Llama3.1 pretraining code into the container
+# (Separate layer to rebuild only if the code changes)
+COPY ./llama3.1_pretraining /llama3.1_pretraining
+
+# Make shell scripts executable
+RUN chmod +x /llama3.1_pretraining/run_llama3.1_8b.sh
+
+# Set the working directory
+WORKDIR /llama3.1_pretraining
+
+# Install the Python requirements from the Neuron pip repository
+RUN pip install -r requirements.txt --extra-index-url https://pip.repos.neuron.amazonaws.com
+
+# Optional: pin transformers explicitly
+# RUN pip install transformers==4.32.1 --no-warn-conflicts
+
+# Install/upgrade neuronx-cc 2.x
+RUN pip install neuronx-cc==2.* --extra-index-url https://pip.repos.neuron.amazonaws.com -U
+
+# Replace libneuronxla with torch_neuronx for neuron parallel compile in the Ray TorchXLA config file
+COPY ./config.py /usr/local/lib/python3.10/site-packages/ray/train/torch/xla/config.py
diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/build_docker.sh b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/build_docker.sh
new file mode 100755
index 000000000..8975028f9
--- /dev/null
+++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/build_docker.sh
@@ -0,0 +1,83 @@
+#!/bin/bash
+
+# This script automates the process of building the training Docker image
+# and pushing it to an Amazon ECR (Elastic Container Registry) repository.
+# The script performs the following steps:
+# 1. Optionally checks that the script is run on an x86_64-based instance (this check is currently commented out).
+# 2. Checks if Docker is installed and running.
+# 3. Verifies that the AWS CLI is installed and configured.
+# 4. Prompts the user for the desired AWS region.
+# 5. Checks if the specified ECR repository exists, and creates it if it does not.
+# 6. Logs in to Amazon ECR.
+# 7. Creates a Docker Buildx builder instance.
+# 8. Builds and pushes a linux/amd64 Docker image to the specified ECR repository.
+
+# Note: It is preferable to use an AWS Cloud9 IDE with at least 100GB of storage for creating this image
+# to avoid storage issues during the build process. You can use your local Mac or Windows machine,
+# but ensure that you have enough memory and storage allocated for Docker to build this image.
+# If the build fails due to lack of space, increase the storage available to Docker and retry.
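+#
+# Example usage (assuming the AWS CLI is already configured with permissions to create and push to ECR):
+#   ./build_docker.sh
+#   # enter the ECR region (e.g. us-east-2) when prompted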
+
+# Replace with your desired repository name
+ECR_REPO_NAME="rayptl_pretrain_llama3.1"
+
+# # Check that we are running on an x86_64 instance to avoid issues with docker build
+# arch=$(uname -m)
+# if [[ ! "$arch" = "x86_64" ]]; then
+#     echo "Error: please run this script on an x86_64-based instance"
+#     exit 1
+# fi
+
+# Check if docker is installed
+junk=$(which docker 2>&1 > /dev/null)
+if [[ "$?" -ne 0 ]]; then
+    echo "Error: please install docker and try again. ex: for AL2023 you can run:"
+    echo "    sudo yum install docker -y"
+    echo "    sudo systemctl start docker"
+    echo "    sudo usermod -aG docker ec2-user"
+    echo "    newgrp docker"
+    exit 1
+fi
+
+# Check that AWS CLI is installed and configured
+junk=$(aws sts get-caller-identity)
+if [[ "$?" -ne 0 ]]; then
+    echo "Error: please make sure that the AWS CLI is installed and configured using 'aws configure'."
+    exit 1
+fi
+
+# Prompt user for desired region
+read -p "Enter the ECR region (ex: us-east-2): " region
+echo $region > .eks_region
+
+# Check if the ECR repository exists
+if aws ecr describe-repositories --repository-names "$ECR_REPO_NAME" --region "$region" >/dev/null 2>&1; then
+    echo "ECR repository '$ECR_REPO_NAME' already exists."
+
+    # Get the ECR_REPO_URI for the existing repository
+    ECR_REPO_URI=$(aws ecr describe-repositories --repository-names "$ECR_REPO_NAME" --query 'repositories[0].repositoryUri' --region "$region" --output text)
+    echo "Repository URL: $ECR_REPO_URI"
+else
+    # Create the ECR repository
+    aws ecr create-repository --repository-name "$ECR_REPO_NAME" --region "$region"
+
+    # Get the ECR_REPO_URI for the newly created repository
+    ECR_REPO_URI=$(aws ecr describe-repositories --repository-names "$ECR_REPO_NAME" --query 'repositories[0].repositoryUri' --region "$region" --output text)
+    echo "ECR repository '$ECR_REPO_NAME' created successfully."
+    echo "Repository URL: $ECR_REPO_URI"
+fi
+
+# Store ECR REPO URI for later use
+echo $ECR_REPO_URI > .ecr_repo_uri
+
+# Login to ECR
+echo -e "\nLogging in to ECR"
+aws ecr get-login-password --region "$region" | docker login --username AWS --password-stdin $ECR_REPO_URI
+aws ecr get-login-password --region "$region" | docker login --username AWS --password-stdin 763104351884.dkr.ecr.${region}.amazonaws.com/pytorch-training-neuronx
+
+# Create and use a new builder instance for multi-arch builds
+docker buildx create --use --name mybuilder --driver docker-container
+docker buildx inspect mybuilder --bootstrap
+
+echo -e "\nBuilding $ECR_REPO_NAME docker image" \
+    && docker buildx build --platform linux/amd64 -t $ECR_REPO_URI:latest --build-arg REGION=$region . --push \
+    && echo -e "\nImage successfully pushed to ECR"
diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/config.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/config.py
new file mode 100644
index 000000000..80e94ea68
--- /dev/null
+++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/config.py
@@ -0,0 +1,171 @@
+import logging
+import os
+import re
+import shutil
+import uuid
+from dataclasses import dataclass
+
+import ray
+from ray.train._internal.utils import get_address_and_port
+from ray.train._internal.worker_group import WorkerGroup
+from ray.train.backend import Backend
+from ray.train.torch import TorchConfig
+from ray.util import PublicAPI
+
+logger = logging.getLogger(__name__)
+
+
+@PublicAPI(stability="alpha")
+@dataclass
+class TorchXLAConfig(TorchConfig):
+    """
+    Configuration for torch XLA setup.
+    See https://pytorch.org/xla/release/1.13/index.html for more info.
+ Currently, only "neuron_cores" accelerator (AwsNeuronXLABackend) + is supported with xrt runtime. + """ + + neuron_parallel_compile: bool = False + + @property + def backend_cls(self): + return _TorchAwsNeuronXLABackend + + +def _kill_xrt_server(): + import subprocess + + subprocess.call(["pkill", "-f", "xrt_run_server"]) + + +def _set_xla_env_vars(): + # https://pytorch.org/docs/1.13/elastic/run.html#environment-variables + context = ray.train.get_context() + + os.environ["LOCAL_RANK"] = str(context.get_local_rank()) + os.environ["RANK"] = str(context.get_world_rank()) + os.environ["LOCAL_WORLD_SIZE"] = str(context.get_local_world_size()) + os.environ["WORLD_SIZE"] = str(context.get_world_size()) + os.environ["GROUP_RANK"] = str(context.get_node_rank()) + os.environ["GROUP_WORLD_SIZE"] = str( + context.get_world_size() / context.get_local_world_size() + ) + os.environ["ROLE_RANK"] = str(context.get_world_rank()) + os.environ["ROLE_WORLD_RANK"] = str(context.get_world_rank()) + os.environ["ROLE_WORLD_SIZE"] = str(context.get_world_size()) + + # EFA and XLA setup + # https://github.com/aws/libfabric/blob/master/prov/efa/src/rxr/rxr_init.c + # https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/training/dp_bert_hf_pretrain/run_dp_bert_large_hf_pretrain_bf16_s128.sh # noqa + os.environ["FI_PROVIDER"] = "efa" + os.environ["FI_EFA_USE_DEVICE_RDMA"] = "1" + os.environ["FI_EFA_FORK_SAFE"] = "1" + os.environ["XLA_TRANSFER_SEED_ASYNC"] = "1" + os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1" + + +def _setup_xla_torch_process_group(): + try: + import torch.distributed as dist + import torch_xla.core.xla_model as xm # noqa F401 + import torch_xla.distributed.xla_backend # noqa F401 + + dist.init_process_group("xla") + except ImportError: + raise ImportError("torch_xla must be installed to use torch_xla backend.") + + +# The following env vars enable Neuron graph extraction for parallel compilation +# Note: model outputs are invalid and should be ignored while these env vars are set +def _set_neuron_parallel_compile_env_vars(): + os.environ["NEURON_PARALLEL_COMPILE"] = "1" + os.environ["NEURON_EXTRACT_GRAPHS_ONLY"] = "1" + os.environ["NEURON_FALL_BACK_TO_NULL_NEFF"] = "1" + + +# Compile previously extracted Neuron graphs +def _neuron_compile_extracted_graphs(): + try: + #from libneuronxla.neuron_cc_cache import CacheUrl + #from libneuronxla.neuron_parallel_compile import parallel_compile + from torch_neuronx.parallel_compile.neuron_parallel_compile import CacheUrl + from torch_neuronx.parallel_compile.neuron_parallel_compile import parallel_compile + except ImportError: + raise ImportError( + "libneuronxla must be installed to use Neuron parallel compilation." + ) + + # Only 1 worker per node should run parallel_compile() + if os.environ.get("LOCAL_RANK") == "0": + logger.info("Compiling extracted graphs on local rank0 worker") + + parallel_compile_workdir = ( + f"/tmp/{os.environ.get('USER','no-user')}/parallel_compile_workdir/" + ) + if os.path.exists(parallel_compile_workdir): + shutil.rmtree(parallel_compile_workdir) + os.makedirs(parallel_compile_workdir, exist_ok=True) + + # Users can set the cache directory using --cache_dir in NEURON_CC_FLAGS or by + # using NEURON_COMPILE_CACHE_URL. --cache_dir takes precedence. 
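+        # For example, the compile/training scripts in this blueprint export
+        # NEURON_CC_FLAGS="... --cache_dir=/shared/neuron_compile_cache/", so the compiled
+        # graphs are cached on the shared FSx for Lustre volume mounted at /shared.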
+ explicit_cache_dir = None + if neuron_cc_flags := os.environ.get("NEURON_CC_FLAGS"): + if s := re.search(r"--cache_dir[= ](\S+)", neuron_cc_flags): + explicit_cache_dir = s.group(1) + + parallel_compile( + parallel_compile_workdir, + CacheUrl.get_cache_url(explicit_cache_dir), + ) + + +class _TorchAwsNeuronXLABackend(Backend): + unique_run_id: str = str(uuid.uuid4()) + + def on_start(self, worker_group: WorkerGroup, backend_config: TorchXLAConfig): + """Logic ran right before training is started.""" + + # On previous worker failure, we don't run graceful shutdown on workers. + # This would leak any running xrt server. + worker_group.execute(_kill_xrt_server) + + # Get master address and port from the first worker. + master_addr, master_port = worker_group.execute_single(0, get_address_and_port) + + def set_env_vars(addr, port): + os.environ["MASTER_ADDR"] = addr + os.environ["MASTER_PORT"] = str(port) + # To trigger the xrt server + os.environ["TORCHELASTIC_RUN_ID"] = self.unique_run_id + + # Set the env vars on all workers. + worker_group.execute(set_env_vars, addr=master_addr, port=master_port) + + # Set up env vars for neuron parallel compilation graph extraction + if backend_config.neuron_parallel_compile: + logger.info("Extracting graphs for Neuron parallel compilation") + worker_group.execute(_set_neuron_parallel_compile_env_vars) + + def on_training_start( + self, worker_group: WorkerGroup, backend_config: TorchXLAConfig + ): + """ + Configure the environment variables for the worker group. + And initialize the xla distributed process group. + TODO: Current setup only supports homogenous cluster with + neuron_cores accelerator and xrt runtime. + """ + worker_group.execute(_set_xla_env_vars) + worker_group.execute(_setup_xla_torch_process_group) + + def on_shutdown(self, worker_group: WorkerGroup, backend_config: TorchXLAConfig): + """ + Logic ran right after training is finished. + This is a sanity cleanup to kill xrt server, and to optionally + run neuron parallel graph compilation + """ + worker_group.execute(_kill_xrt_server) + + # Compile the extracted graphs. This must run at end of training. 
+ if backend_config.neuron_parallel_compile: + worker_group.execute(_neuron_compile_extracted_graphs) diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/compile_llama3.1_8b.sh b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/compile_llama3.1_8b.sh new file mode 100755 index 000000000..31ef28e2d --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/compile_llama3.1_8b.sh @@ -0,0 +1,162 @@ +#!/bin/bash + +############################################# +# User defined parameters and env vars + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +export NEURON_CC_FLAGS="--model-type transformer --distribution-strategy=llm-training --cache_dir=/shared/neuron_compile_cache/" +export NEURON_FUSE_SOFTMAX=1 + +# Async Runtime +export NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS=3 + +# HOST OOM +export MALLOC_ARENA_MAX=64 + +##############################DEFAULT VALUES########################################### +# TP degree +TP_DEGREE=32 + +KV_REPLI=`expr $TP_DEGREE / 8` +# 0: bf16; 1: mixed precision +USE_MIX_PRECISION=1 +# 0: use pure DP; 1: use ZeRO-1 +USE_ZERO_1=1 +# global batch size +: ${GBS:=1024} +# micro batch size +MBS=1 +# number of steps to run +MAX_STEPS=10000 +# warmup steps +WARMUP_STEPS=100 +# learning rate +#LR=3.0e-4 +LR=1.5e-4 +# model path +#MODEL_PATH=$SCRIPT_DIR +MODEL_PATH="/shared/llama3.1_config/" +# data path +DATA_PATH="/shared/fineweb_llama3.1_tokenized" +#Checkpoint dir +CKPT_DIR="/shared/checkpoint" +# sequence length +SEQ_LEN=8192 +# Number of epochs +EPOCHS=1 +#Number of Nodes +NUM_NODES=2 +#Tensorboard logs dir +TB_DIR="/shared/tblogs" +#Steps this run +STEPS_THIS_RUN=-1 +#output log +ts=`date +%y_%m_%d_%H:%M:%S` +OUTPUT_LOG="/shared/llama3.1-8b-pretrain-$ts.log" +##############################DEFAULT VALUES########################################### + +#Args +Help() +{ + # Display Help + echo "Add description of the script functions here." + echo + echo "Syntax: $0 [-t|w|l|m|d|c|s|n]" + echo "options:" + echo "t total number of training steps" + echo "w warmup steps for training" + echo "l learning rate" + echo "m abs path to llama config.json" + echo "d abs path to tokenized dataset" + echo "c abs path to checkpoint directory" + echo "s Sequence length" + echo "n Number of instances to run training" + echo "b tensor board logs location" + echo "r defining steps this run" + echo +} +while getopts t:w:l:m:d:c:s:n:b:r:h flag +do + case "${flag}" in + t) MAX_STEPS=${OPTARG};; + w) WARMUP_STEPS=${OPTARG};; + l) LR=${OPTARG};; + m) MODEL_PATH=${OPTARG};; + d) DATA_PATH=${OPTARG};; + c) CKPT_DIR=${OPTARG};; + s) SEQ_LEN=${OPTARG};; + n) NUM_NODES=${OPTARG};; + b) TB_DIR=${OPTARG};; + r) STEPS_THIS_RUN=${OPTARG};; + h) Help + exit;; + #\?) 
# Invalid option + # echo "Error: Invalid option" + # exit;; + esac +done +############################################# + +export NUM_NEURONCORES=32 +#DISTRIBUTED_ARGS="--nproc_per_node $NUM_NEURONCORES" + + +#sudo sysctl -w net.ipv4.ip_local_reserved_ports=44000,48620 + +export NEURON_RT_NUM_CORES=32 +export NUM_NEURONCORES=$NEURON_RT_NUM_CORES +export TPU_NUM_DEVICES=$NEURON_RT_NUM_CORES +export TPU_CHIPS_PER_HOST_BOUNDS=$NEURON_RT_NUM_CORES + +############################################# + + +#DP=$(($NEURON_RT_NUM_CORES * $WORLD_SIZE / $TP_DEGREE)) +DP=$(($NEURON_RT_NUM_CORES * $NUM_NODES / $TP_DEGREE)) +ACC_STEPS=$(($GBS / $MBS / $DP)) +#ACC_STEPS=$(($GBS / $MBS / $DP)) + + +echo TP_DEGREE=$TP_DEGREE +echo GBS=$GBS +echo MBS=$MBS +echo MAX_STEPS=$MAX_STEPS +echo WARMUP_STEPS=$WARMUP_STEPS +echo LR=$LR +echo MODEL_PATH=$MODEL_PATH +echo DATA_PATH=$DATA_PATH +echo SEQ_LEN=$SEQ_LEN +echo CKPT_DIR=$CKPT_DIR +echo DP=$DP +echo ACC_STEPS=$ACC_STEPS +echo STEPS_THIS_RUN=$STEPS_THIS_RUN +echo NUM_NODES=$NUM_NODES +echo TB_DIR=$TB_DIR +echo OUTPUT_LOG=$OUTPUT_LOG + +neuron_parallel_compile python $DISTRIBUTED_ARGS \ + ray_train_llama3.py \ + --model_path $MODEL_PATH \ + --num_nodes $NUM_NODES \ + --tb_dir $TB_DIR \ + --data_dir $DATA_PATH \ + --tensor_parallel_size $TP_DEGREE \ + --train_batch_size $MBS \ + --steps_this_run $STEPS_THIS_RUN \ + --max_steps $MAX_STEPS \ + --warmup_steps $WARMUP_STEPS \ + --lr $LR \ + --grad_accum_usteps $ACC_STEPS \ + --seq_len $SEQ_LEN \ + --use_sequence_parallel 1 \ + --use_selective_checkpoint 1 \ + --use_fp32_optimizer $USE_MIX_PRECISION \ + --use_zero1_optimizer $USE_ZERO_1 \ + --scheduler_type 'linear' \ + --qkv_linear 1 \ + --kv_replicator $KV_REPLI \ + --num_train_epochs $EPOCHS \ + --save_checkpoint \ + --use_flash_attention 1 |& tee $OUTPUT_LOG +exit ${PIPESTATUS[0]} diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/config.json b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/config.json new file mode 100644 index 000000000..535f38352 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/config.json @@ -0,0 +1,32 @@ +{ + "architectures": [ + "LlamaForCausalLM" + ], + "attention_bias": false, + "attention_dropout": 0.0, + "bos_token_id": 128000, + "eos_token_id": 128001, + "hidden_act": "silu", + "hidden_size": 4096, + "initializer_range": 0.02, + "intermediate_size": 14336, + "max_position_embeddings": 131072, + "mlp_bias": false, + "model_type": "llama", + "num_attention_heads": 32, + "num_hidden_layers": 32, + "num_key_value_heads": 8, + "pad_token_id": 0, + "pretraining_tp": 1, + "rms_norm_eps": 1e-05, + "rope_scaling": null, + "rope_theta": 500000.0, + "tie_word_embeddings": false, + "torch_dtype": "bfloat16", + "transformers_version": "4.31.0", + "use_cache": true, + "vocab_size": 128256, + "sequence_parallel_enabled": false, + "selective_checkpoint_enabled": false, + "move_model_to_device":false +} diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/data_module.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/data_module.py new file mode 100644 index 000000000..f70b9c4b8 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/data_module.py @@ -0,0 +1,44 @@ +from typing import Callable, Dict, Tuple + +from pytorch_lightning import LightningDataModule + + +class NeuronLightningDataModule(LightningDataModule): + def __init__( + self, + dataloader_fn: Callable, + data_dir: str, + 
batch_size: int, + data_args: Tuple = (), + data_kwargs: Dict = {}, + ): + super().__init__() + self.dataloader_fn = dataloader_fn + self.data_dir = data_dir + self.batch_size = batch_size + self.data_args = (data_args,) + self.data_kwargs = data_kwargs + + def setup(self, stage: str): + pass + + def train_dataloader(self): + return self.dataloader_fn( + self.data_dir, + self.batch_size, + self.trainer.strategy.data_parallel_size, + self.trainer.strategy.data_parallel_rank, + *self.data_args, + **self.data_kwargs + )[0] + + def test_dataloader(self): + return self.dataloader_fn( + self.data_dir, + self.batch_size, + self.trainer.strategy.data_parallel_size, + self.trainer.strategy.data_parallel_rank, + *self.data_args, + **self.data_kwargs + )[1] + diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/get_dataset.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/get_dataset.py new file mode 100644 index 000000000..d6cc3973f --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/get_dataset.py @@ -0,0 +1,71 @@ +from datasets import load_dataset +import os +import argparse +from itertools import chain +from transformers import AutoTokenizer + +parser = argparse.ArgumentParser() +parser.add_argument('--llama-version', type=int, default=3.1, help='LLaMA version (default: 3.1)') + +args = parser.parse_args() +llama_version = args.llama_version + +dataset_name = "HuggingFaceFW/fineweb" +#local_download_dir = "~/fineweb/" +sample = "sample-10BT" + +block_size = 8192 +save_path = "/shared/fineweb_llama3.1_tokenized" +target_dataset_path = save_path + "/" + dataset_name + +model_id = "NousResearch/Meta-Llama-3.1-8B" + +save_path = os.path.expanduser(save_path) +if not os.path.exists(save_path): + os.makedirs(save_path) + +raw_datasets = load_dataset(dataset_name, cache_dir=target_dataset_path, name=sample, split="train") +tokenizer = AutoTokenizer.from_pretrained(model_id) + +column_names = raw_datasets.column_names +text_column_name = "text" if "text" in column_names else column_names[0] + +def tokenize_function(examples): + return tokenizer(examples[text_column_name]) + +tokenized_datasets = raw_datasets.map( + tokenize_function, + batched=True, + remove_columns=column_names, +) + +if block_size > tokenizer.model_max_length: + print("block_size > tokenizer.model_max_length") +block_size = min(block_size, tokenizer.model_max_length) + + +# Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size. +def group_texts(examples): + # Concatenate all texts. + concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} + total_length = len(concatenated_examples[list(examples.keys())[0]]) + # We drop the small remainder, and if the total_length < block_size we exclude this batch and return an empty dict. + # We could add padding if the model supported it instead of this drop, you can customize this part to your needs. + total_length = (total_length // block_size) * block_size + # Split by chunks of max_len. 
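+    # For example, with block_size=8192 a batch whose concatenated length is 20,000 tokens
+    # yields two 8192-token chunks; the trailing 3,616 tokens are dropped.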
+ result = { + k: [t[i : i + block_size] for i in range(0, total_length, block_size)] for k, t in concatenated_examples.items() + } + result["labels"] = result["input_ids"].copy() + return result + + +train_dataset = tokenized_datasets.map( + group_texts, + batched=True, +) + +print(len(train_dataset)) + +train_dataset.save_to_disk(save_path) + diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/modeling_llama_nxd.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/modeling_llama_nxd.py new file mode 100644 index 000000000..f1ec2f694 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/modeling_llama_nxd.py @@ -0,0 +1,774 @@ +# coding=utf-8 +# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved. +# +# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX +# and OPT implementations in this library. It has been modified from its +# original forms to accommodate minor architectural differences compared +# to GPT-NeoX and OPT used by the Meta AI team that trained the model. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" PyTorch LLaMA model.""" +import math +from functools import partial +from typing import List, Optional, Tuple, Union + +import torch +import torch.nn.functional as F +import torch.utils.checkpoint +import torch_xla.core.xla_model as xm +from packaging import version +from torch import nn +from transformers.activations import ACT2FN +from transformers.modeling_outputs import ( + BaseModelOutputWithPast, + CausalLMOutputWithPast, +) +from transformers.models.llama.configuration_llama import LlamaConfig +from transformers.models.llama.modeling_llama import ( + LLAMA_INPUTS_DOCSTRING, + LLAMA_START_DOCSTRING, +) +from transformers.models.llama.modeling_llama import LlamaAttention as LlamaAttentionHF +from transformers.models.llama.modeling_llama import ( + LlamaDecoderLayer as LlamaDecoderLayerHF, +) +from transformers.models.llama.modeling_llama import ( + LlamaForCausalLM as LlamaForCausalLMHF, +) +from transformers.models.llama.modeling_llama import ( + LlamaForSequenceClassification, + LlamaLinearScalingRotaryEmbedding, +) +from transformers.models.llama.modeling_llama import LlamaMLP as LlamaMLPHF +from transformers.models.llama.modeling_llama import LlamaModel as LlamaModelHF +from transformers.models.llama.modeling_llama import LlamaPreTrainedModel +from transformers.models.llama.modeling_llama import LlamaRMSNorm as LlamaRMSNormHF +from transformers.models.llama.modeling_llama import ( + LlamaRotaryEmbedding, + apply_rotary_pos_emb, + repeat_kv, + rotate_half, +) +from transformers.utils import ( + add_start_docstrings, + add_start_docstrings_to_model_forward, + logging, + replace_return_docstrings, +) + +import neuronx_distributed.parallel_layers.utils as neuronx_dist_utils +from neuronx_distributed.kernels.flash_attn import nki_flash_attn_func +from neuronx_distributed.modules.qkv_linear import GQAQKVColumnParallelLinear +from 
neuronx_distributed.parallel_layers import mappings +from neuronx_distributed.parallel_layers.layers import ( + ColumnParallelLinear, + ParallelEmbedding, + RowParallelLinear, +) +from neuronx_distributed.parallel_layers.loss_functions import parallel_cross_entropy +from neuronx_distributed.parallel_layers.parallel_state import ( + get_tensor_model_parallel_size, +) +from neuronx_distributed.utils.model_utils import move_model_to_device + + +def _init_normal(std, w): + return nn.init.normal_(w, mean=0.0, std=std) + + +if version.parse(torch.__version__) >= version.parse("2.1"): + from torch_xla.utils.checkpoint import checkpoint + + checkpoint_method = checkpoint +else: + checkpoint_method = torch.utils.checkpoint.checkpoint + +logger = logging.get_logger(__name__) + +_CONFIG_FOR_DOC = "LlamaConfig" + + +# Copied from transformers.models.bart.modeling_bart._make_causal_mask +def _make_causal_mask( + input_ids_shape: torch.Size, dtype: torch.dtype, device: torch.device, past_key_values_length: int = 0 +): + """ + Make causal mask used for bi-directional self-attention. + """ + bsz, tgt_len = input_ids_shape + mask = torch.full((tgt_len, tgt_len), torch.finfo(dtype).min, device=device) + mask_cond = torch.arange(mask.size(-1), device=device) + mask.masked_fill_(mask_cond < (mask_cond + 1).view(mask.size(-1), 1), 0) + mask = mask.to(dtype) + + if past_key_values_length > 0: + mask = torch.cat([torch.zeros(tgt_len, past_key_values_length, dtype=dtype, device=device), mask], dim=-1) + return mask[None, None, :, :].expand(bsz, 1, tgt_len, tgt_len + past_key_values_length) + + +# Copied from transformers.models.bart.modeling_bart._expand_mask +def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None): + """ + Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`. 
+ """ + bsz, src_len = mask.size() + tgt_len = tgt_len if tgt_len is not None else src_len + + expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype) + + inverted_mask = 1.0 - expanded_mask + + return inverted_mask.masked_fill(inverted_mask.to(torch.bool), torch.finfo(dtype).min) + + +class LlamaRMSNorm(LlamaRMSNormHF): + def __init__(self, hidden_size, eps=1e-6, sequence_parallel_enabled=False): + """ + LlamaRMSNorm is equivalent to T5LayerNorm + """ + super().__init__(hidden_size, eps=eps) + setattr(self.weight, "sequence_parallel_enabled", sequence_parallel_enabled) + + def forward(self, hidden_states): + input_dtype = hidden_states.dtype + + hidden_states = hidden_states.to(torch.double) + + variance = hidden_states.pow(2).mean(-1, keepdim=True) + hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) + return self.weight * hidden_states.to(input_dtype) + + +class LlamaMLP(LlamaMLPHF): + def __init__(self, config): + nn.Module.__init__(self) + self.config = config + self.pretraining_tp = config.pretraining_tp + self.hidden_size = config.hidden_size + self.intermediate_size = config.intermediate_size + self.act_fn = ACT2FN[config.hidden_act] + + init_method = partial(_init_normal, config.initializer_range) + self.gate_up_proj = ColumnParallelLinear( + self.hidden_size, + 2 * self.intermediate_size, + stride=2, + bias=False, + gather_output=False, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.down_proj = RowParallelLinear( + self.intermediate_size, + self.hidden_size, + bias=False, + input_is_parallel=True, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.split_size = self.intermediate_size // get_tensor_model_parallel_size() + if config.move_model_to_device: + move_model_to_device(self, xm.xla_device()) + + def forward(self, x): + if self.pretraining_tp > 1: + slice = self.intermediate_size // self.pretraining_tp + gate_proj_slices = self.gate_proj.weight.split(slice, dim=0) + up_proj_slices = self.up_proj.weight.split(slice, dim=0) + down_proj_slices = self.down_proj.weight.split(slice, dim=1) + + gate_proj = torch.cat([F.linear(x, gate_proj_slices[i]) for i in range(self.pretraining_tp)], dim=-1) + up_proj = torch.cat([F.linear(x, up_proj_slices[i]) for i in range(self.pretraining_tp)], dim=-1) + + intermediate_states = (self.act_fn(gate_proj) * up_proj).split(slice, dim=2) + down_proj = [F.linear(intermediate_states[i], down_proj_slices[i]) for i in range(self.pretraining_tp)] + down_proj = sum(down_proj) + else: + gate_proj, up_proj = self.gate_up_proj(x).split(self.split_size, dim=2) + + def activation_mlp(gate_proj, up_proj): + activation_output = self.act_fn(gate_proj) + return activation_output * up_proj + + # We checkpoint the MLP compute too, since we see extra data movement which is more + # expensive than the recompute in this case. 
+ if self.config.selective_checkpoint_enabled: + intermediate_states = checkpoint_method(activation_mlp, gate_proj, up_proj) + else: + intermediate_states = self.act_fn(gate_proj) * up_proj + down_proj = self.down_proj(intermediate_states) + + return down_proj + + +class CoreAttention(nn.Module): + def __init__(self): + super().__init__() + + def forward(self, query_states, key_states, value_states): + bsz, num_heads, q_len, head_dim = query_states.shape + kv_seq_len = key_states.shape[-2] + attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(head_dim) + + if attn_weights.size() != (bsz, num_heads, q_len, kv_seq_len): + raise ValueError( + f"Attention weights should be of size {(bsz, num_heads, q_len, kv_seq_len)}, but is" + f" {attn_weights.size()}" + ) + + causal_mask = torch.triu(torch.ones((1, 1, q_len, kv_seq_len), device="xla"), diagonal=1).bool() + attn_weights = attn_weights.masked_fill_(causal_mask, -10000.0) + + attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.double).to(query_states.dtype) + + attn_output = torch.matmul(attn_weights, value_states) + return attn_output + + +class LlamaAttention(LlamaAttentionHF): + """Multi-headed attention from 'Attention Is All You Need' paper""" + + def __init__(self, config: LlamaConfig): + nn.Module.__init__(self) + self.config = config + self.hidden_size = config.hidden_size + self.num_heads = config.num_attention_heads + self.head_dim = self.hidden_size // self.num_heads + self.num_key_value_heads = config.num_key_value_heads + self.num_key_value_groups = self.num_heads // self.num_key_value_heads + self.pretraining_tp = config.pretraining_tp + self.max_position_embeddings = config.max_position_embeddings + self.rope_theta = config.rope_theta + + if not hasattr(config, "kv_shared_group_size"): + config.kv_shared_group_size = 1 + + if not hasattr(config, "qkv_linear"): + config.qkv_linear = False + + if not hasattr(config, "separate_qkv"): + config.separate_qkv = False + + if not hasattr(config, "use_flash_attention"): + self.use_flash_attention = False + else: + self.use_flash_attention = config.use_flash_attention + + if (self.head_dim * self.num_heads) != self.hidden_size: + raise ValueError( + f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}" + f" and `num_heads`: {self.num_heads})." 
+ ) + self._init_rope() + + init_method = partial(_init_normal, config.initializer_range) + if not self.config.separate_qkv and self.num_heads == self.num_key_value_heads: + self.qkv_proj = ColumnParallelLinear( + self.hidden_size, + 3 * self.num_heads * self.head_dim, + stride=3, + bias=False, + gather_output=False, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.split_size = self.num_heads * self.head_dim // get_tensor_model_parallel_size() + elif self.config.qkv_linear: + self.qkv_proj = GQAQKVColumnParallelLinear( + self.hidden_size, + [self.num_heads * self.head_dim, self.num_key_value_heads * self.head_dim], + bias=False, + gather_output=False, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + kv_size_multiplier=self.config.kv_shared_group_size, + ) + else: + self.q_proj = ColumnParallelLinear( + self.hidden_size, + self.num_heads * self.head_dim, + bias=False, + gather_output=False, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.k_proj = ColumnParallelLinear( + self.hidden_size, + self.num_key_value_heads * self.head_dim, + bias=False, + gather_output=False, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.v_proj = ColumnParallelLinear( + self.hidden_size, + self.num_key_value_heads * self.head_dim, + bias=False, + gather_output=False, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.o_proj = RowParallelLinear( + self.num_heads * self.head_dim, + self.hidden_size, + bias=False, + input_is_parallel=True, + init_method=init_method, + sequence_parallel_enabled=self.config.sequence_parallel_enabled, + ) + self.num_heads = neuronx_dist_utils.divide(config.num_attention_heads, get_tensor_model_parallel_size()) + self.num_key_value_heads = neuronx_dist_utils.divide( + config.num_key_value_heads * self.config.kv_shared_group_size, get_tensor_model_parallel_size() + ) + self.num_key_value_groups = self.num_heads // self.num_key_value_heads + + self.core_attn = CoreAttention() + + if config.move_model_to_device: + move_model_to_device(self, xm.xla_device()) + + def forward( + self, + hidden_states: torch.Tensor, + attention_mask: Optional[torch.Tensor] = None, + position_ids: Optional[torch.LongTensor] = None, + past_key_value: Optional[Tuple[torch.Tensor]] = None, + output_attentions: bool = False, + use_cache: bool = False, + ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: + assert use_cache is False, "KV-Cache flow is not fully supported" + bsz, q_len, _ = hidden_states.size() + + if self.config.sequence_parallel_enabled: + q_len, bsz, _ = hidden_states.size() + q_len = q_len * get_tensor_model_parallel_size() + + if self.pretraining_tp > 1: + key_value_slicing = (self.num_key_value_heads * self.head_dim) // self.pretraining_tp + query_slices = self.q_proj.weight.split((self.num_heads * self.head_dim) // self.pretraining_tp, dim=0) + key_slices = self.k_proj.weight.split(key_value_slicing, dim=0) + value_slices = self.v_proj.weight.split(key_value_slicing, dim=0) + + query_states = [F.linear(hidden_states, query_slices[i]) for i in range(self.pretraining_tp)] + query_states = torch.cat(query_states, dim=-1) + + key_states = [F.linear(hidden_states, key_slices[i]) for i in range(self.pretraining_tp)] + key_states = torch.cat(key_states, dim=-1) + + value_states = [F.linear(hidden_states, 
value_slices[i]) for i in range(self.pretraining_tp)] + value_states = torch.cat(value_states, dim=-1) + + else: + if ( + not self.config.separate_qkv + and self.num_heads == self.num_key_value_heads + and self.config.kv_shared_group_size == 1 + ): + qkv_states = self.qkv_proj(hidden_states) + query_states, key_states, value_states = qkv_states.split(self.split_size, dim=2) + elif self.config.qkv_linear: + query_states, key_states, value_states = self.qkv_proj(hidden_states) + else: + query_states = self.q_proj(hidden_states) + key_states = self.k_proj(hidden_states) + value_states = self.v_proj(hidden_states) + + if self.config.sequence_parallel_enabled: + query_states = query_states.view(q_len, bsz, self.num_heads, self.head_dim).permute(1, 2, 0, 3) + key_states = key_states.view(q_len, bsz, self.num_key_value_heads, self.head_dim).permute(1, 2, 0, 3) + value_states = value_states.view(q_len, bsz, self.num_key_value_heads, self.head_dim).permute(1, 2, 0, 3) + else: + query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2) + key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) + value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) + + kv_seq_len = key_states.shape[-2] + if past_key_value is not None: + kv_seq_len += past_key_value[0].shape[-2] + cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len) + query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids) + + if past_key_value is not None: + # reuse k, v, self_attention + key_states = torch.cat([past_key_value[0], key_states], dim=2) + value_states = torch.cat([past_key_value[1], value_states], dim=2) + + past_key_value = (key_states, value_states) if use_cache else None + + # repeat k/v heads if n_kv_heads < n_heads + key_states = repeat_kv(key_states, self.num_key_value_groups) + value_states = repeat_kv(value_states, self.num_key_value_groups) + + attn_output = ( + nki_flash_attn_func(query_states, key_states, value_states) + if self.use_flash_attention + else self.core_attn(query_states, key_states, value_states) + ) + + if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim): + raise ValueError( + f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is" + f" {attn_output.size()}" + ) + + if self.config.sequence_parallel_enabled: + attn_output = attn_output.permute(2, 0, 1, 3) + attn_output = attn_output.reshape(q_len, bsz, self.hidden_size // get_tensor_model_parallel_size()) + else: + attn_output = attn_output.transpose(1, 2).contiguous() + attn_output = attn_output.reshape(bsz, q_len, self.hidden_size // get_tensor_model_parallel_size()) + + if self.pretraining_tp > 1: + attn_output = attn_output.split(self.hidden_size // self.pretraining_tp, dim=2) + o_proj_slices = self.o_proj.weight.split(self.hidden_size // self.pretraining_tp, dim=1) + attn_output = sum([F.linear(attn_output[i], o_proj_slices[i]) for i in range(self.pretraining_tp)]) + else: + attn_output = self.o_proj(attn_output) + + if not output_attentions: + attn_weights = None + + return attn_output, attn_weights, past_key_value + + +class LlamaDecoderLayer(LlamaDecoderLayerHF): + def __init__(self, config: LlamaConfig): + nn.Module.__init__(self) + self.hidden_size = config.hidden_size + self.self_attn = LlamaAttention(config=config) + self.mlp = LlamaMLP(config) + self.input_layernorm = LlamaRMSNorm( + config.hidden_size, eps=config.rms_norm_eps, 
sequence_parallel_enabled=config.sequence_parallel_enabled + ) + self.post_attention_layernorm = LlamaRMSNorm( + config.hidden_size, eps=config.rms_norm_eps, sequence_parallel_enabled=config.sequence_parallel_enabled + ) + + +@add_start_docstrings( + "The bare LLaMA Model outputting raw hidden-states without any specific head on top.", + LLAMA_START_DOCSTRING, +) +class LlamaModel(LlamaModelHF): + """ + Transformer decoder consisting of *config.num_hidden_layers* layers. Each layer is a [`LlamaDecoderLayer`] + + Args: + config: LlamaConfig + """ + + def __init__(self, config: LlamaConfig): + LlamaPreTrainedModel.__init__(self, config) + self.padding_idx = config.pad_token_id + self.vocab_size = config.vocab_size + + init_method = partial(_init_normal, config.initializer_range) + self.embed_tokens = ParallelEmbedding( + config.vocab_size, config.hidden_size, self.padding_idx, init_method=init_method + ) + self.layers = nn.ModuleList([LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)]) + self.norm = LlamaRMSNorm( + config.hidden_size, eps=config.rms_norm_eps, sequence_parallel_enabled=config.sequence_parallel_enabled + ) + + self.gradient_checkpointing = False + # Initialize weights and apply final processing + self.post_init() + + # Copied from transformers.models.bart.modeling_bart.BartDecoder._prepare_decoder_attention_mask + def _prepare_decoder_attention_mask(self, attention_mask, input_shape, inputs_embeds, past_key_values_length): + # create causal mask + # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len] + combined_attention_mask = None + if input_shape[-1] > 1: + combined_attention_mask = _make_causal_mask( + input_shape, + inputs_embeds.dtype, + device=inputs_embeds.device, + past_key_values_length=past_key_values_length, + ) + + if attention_mask is not None: + pass + + return combined_attention_mask + + @add_start_docstrings_to_model_forward(LLAMA_INPUTS_DOCSTRING) + def forward( + self, + input_ids: torch.LongTensor = None, + attention_mask: Optional[torch.Tensor] = None, + position_ids: Optional[torch.LongTensor] = None, + past_key_values: Optional[List[torch.FloatTensor]] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + use_cache: Optional[bool] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple, BaseModelOutputWithPast]: + output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions + output_hidden_states = ( + output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states + ) + use_cache = use_cache if use_cache is not None else self.config.use_cache + + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + # retrieve input_ids and inputs_embeds + if input_ids is not None and inputs_embeds is not None: + raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time") + elif input_ids is not None: + batch_size, seq_length = input_ids.shape + elif inputs_embeds is not None: + batch_size, seq_length, _ = inputs_embeds.shape + else: + raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds") + + seq_length_with_past = seq_length + past_key_values_length = 0 + + if past_key_values is not None: + past_key_values_length = past_key_values[0][0].shape[2] + seq_length_with_past = seq_length_with_past + past_key_values_length + + if position_ids is None: + device = 
input_ids.device if input_ids is not None else inputs_embeds.device + position_ids = torch.arange( + past_key_values_length, seq_length + past_key_values_length, dtype=torch.long, device=device + ) + position_ids = position_ids.unsqueeze(0).view(-1, seq_length) + else: + position_ids = position_ids.view(-1, seq_length).long() + + if inputs_embeds is None: + inputs_embeds = self.embed_tokens(input_ids) + # embed positions + # if attention_mask is None: + # attention_mask = torch.ones( + # (batch_size, seq_length_with_past), dtype=torch.bool, device=inputs_embeds.device + # ) + # attention_mask = self._prepare_decoder_attention_mask( + # attention_mask, (batch_size, seq_length), inputs_embeds, past_key_values_length + # ) + + hidden_states = inputs_embeds + + if self.gradient_checkpointing and self.training: + if use_cache: + logger.warning_once( + "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." + ) + use_cache = False + + # decoder layers + all_hidden_states = () if output_hidden_states else None + all_self_attns = () if output_attentions else None + next_decoder_cache = () if use_cache else None + + if self.config.sequence_parallel_enabled: + hidden_states = hidden_states.transpose(0, 1).contiguous() + hidden_states = mappings.scatter_to_sequence_parallel_region(hidden_states) + + for idx, decoder_layer in enumerate(self.layers): + if output_hidden_states: + all_hidden_states += (hidden_states,) + + past_key_value = past_key_values[idx] if past_key_values is not None else None + + if self.gradient_checkpointing and self.training: + + def create_custom_forward(module): + def custom_forward(*inputs): + # None for past_key_value + return module(*inputs, output_attentions, None) + + return custom_forward + + layer_outputs = checkpoint_method( + create_custom_forward(decoder_layer), + hidden_states, + attention_mask, + position_ids, + None, + ) + else: + layer_outputs = decoder_layer( + hidden_states, + attention_mask=attention_mask, + position_ids=position_ids, + past_key_value=past_key_value, + output_attentions=output_attentions, + use_cache=use_cache, + ) + + hidden_states = layer_outputs[0] + + if use_cache: + next_decoder_cache += (layer_outputs[2 if output_attentions else 1],) + + if output_attentions: + all_self_attns += (layer_outputs[1],) + + hidden_states = self.norm(hidden_states) + + if self.config.sequence_parallel_enabled: + hidden_states = mappings.gather_from_sequence_parallel_region(hidden_states, to_model_parallel=False) + hidden_states = hidden_states.transpose(0, 1).contiguous() + + # add hidden states from the last decoder layer + if output_hidden_states: + all_hidden_states += (hidden_states,) + + next_cache = next_decoder_cache if use_cache else None + if not return_dict: + return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None) + return BaseModelOutputWithPast( + last_hidden_state=hidden_states, + past_key_values=next_cache, + hidden_states=all_hidden_states, + attentions=all_self_attns, + ) + + +class LlamaForCausalLM(LlamaForCausalLMHF): + _tied_weights_keys = ["lm_head.weight"] + + def __init__(self, config): + LlamaPreTrainedModel.__init__(self, config) + self.model = LlamaModel(config) + self.pretraining_tp = config.pretraining_tp + self.vocab_size = config.vocab_size + + init_method = partial(_init_normal, config.initializer_range) + self.lm_head = ColumnParallelLinear( + config.hidden_size, + config.vocab_size, + bias=False, + gather_output=False, + 
init_method=init_method, + ) + # Initialize weights and apply final processing + self.post_init() + + @add_start_docstrings_to_model_forward(LLAMA_INPUTS_DOCSTRING) + @replace_return_docstrings(output_type=CausalLMOutputWithPast, config_class=_CONFIG_FOR_DOC) + def forward( + self, + input_ids: torch.LongTensor = None, + attention_mask: Optional[torch.Tensor] = None, + position_ids: Optional[torch.LongTensor] = None, + past_key_values: Optional[List[torch.FloatTensor]] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + labels: Optional[torch.LongTensor] = None, + use_cache: Optional[bool] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple, CausalLMOutputWithPast]: + r""" + Args: + labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): + Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., + config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored + (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. + + Returns: + + Example: + + ```python + >>> from transformers import AutoTokenizer, LlamaForCausalLM + + >>> model = LlamaForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS) + >>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER) + + >>> prompt = "Hey, are you conscious? Can you talk to me?" + >>> inputs = tokenizer(prompt, return_tensors="pt") + + >>> # Generate + >>> generate_ids = model.generate(inputs.input_ids, max_length=30) + >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] + "Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you." 
+ ```""" + + output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions + output_hidden_states = ( + output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states + ) + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn) + outputs = self.model( + input_ids=input_ids, + attention_mask=attention_mask, + position_ids=position_ids, + past_key_values=past_key_values, + inputs_embeds=inputs_embeds, + use_cache=use_cache, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + + hidden_states = outputs[0] + if self.pretraining_tp > 1: + lm_head_slices = self.lm_head.weight.split(self.vocab_size // self.pretraining_tp, dim=0) + logits = [F.linear(hidden_states, lm_head_slices[i]) for i in range(self.pretraining_tp)] + logits = torch.cat(logits, dim=-1) + else: + logits = self.lm_head(hidden_states) + + logits = logits.double() + + loss = None + if labels is not None: + # Shift so that tokens < n predict n + shift_logits = logits[..., :-1, :].contiguous() + shift_labels = labels[..., 1:].contiguous() + # Flatten the tokens + loss_fct = parallel_cross_entropy + shift_logits = shift_logits.view(-1, shift_logits.size(-1)) + + shift_labels = shift_labels.view(-1) + # Enable model parallelism + shift_labels = shift_labels.to(shift_logits.device) + loss = loss_fct(shift_logits, shift_labels) + + loss = torch.mean(loss) + + if not return_dict: + output = (logits,) + outputs[1:] + return (loss,) + output if loss is not None else output + + return CausalLMOutputWithPast( + loss=loss, + logits=logits, + past_key_values=outputs.past_key_values, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) + + +def init_weights(module): + """ + Re-init weights after partition + Referred from HF transformers https://github.com/huggingface/transformers/blob/main/src/transformers/models/llama/modeling_llama.py#L690 + """ + if isinstance(module, LlamaRMSNorm): + module.weight.data.fill_(1.0) + elif isinstance(module, (ParallelEmbedding, RowParallelLinear, ColumnParallelLinear)): + module.init_weight_cpu() + if hasattr(module, "bias") and module.bias is not None: + module.bias.data.zero_() + elif isinstance(module, GQAQKVColumnParallelLinear): + module.initialize_weight_biases() diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/module_llama.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/module_llama.py new file mode 100644 index 000000000..6725a5676 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/module_llama.py @@ -0,0 +1,211 @@ +from typing import Any, Optional + +import numpy as np +import torch +import torch.distributed as dist +import torch_xla.core.xla_model as xm +from modeling_llama_nxd import LlamaForCausalLM +from training_utils import Throughput, get_sin_cos_matrix +from transformers import GenerationConfig + +import neuronx_distributed as nxd +from neuronx_distributed.lightning import NeuronLTModule +from neuronx_distributed.parallel_layers import parallel_state +from neuronx_distributed.trainer import ( + initialize_parallel_model, + initialize_parallel_optimizer, +) + + +class NeuronLlamaLTModule(NeuronLTModule): + def __init__(self, tokenizer=None, use_deferred_init=False, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + 
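+        # Heavy model construction is deferred to setup(), where
+        # initialize_parallel_model builds LlamaForCausalLM under the
+        # NxD parallel configuration passed in via nxd_config.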
self.tokenizer = tokenizer # tokenizer is added here + self.validation_step_outputs = [] + self.use_deferred_init = use_deferred_init + + def setup(self, stage=None): + def get_model(model_config): + # For delayed parameter inititalization + # Check https://pytorch.org/torchdistx/latest/deferred_init.html + try: + from torchdistx import deferred_init + except ImportError: + deferred_init = None + if self.use_deferred_init > 0 and deferred_init is not None: + model = deferred_init.deferred_init(LlamaForCausalLM, model_config) + else: + model = LlamaForCausalLM(model_config) + # Here we make sure we use the same sine and cosine matrices for all layers. + # Making use of same tensors would make the CSE algorithm eliminate the lookup call + # from layers, keeping only lookup from first layer. + with torch.no_grad(): + cos, sin = get_sin_cos_matrix(model_config) + for layer in model.model.layers: + layer.self_attn.rotary_emb.cos_cached = cos + layer.self_attn.rotary_emb.sin_cached = sin + num_params = sum([np.prod(p.size()) for p in model.parameters()]) + if dist.get_rank() == 0: + print(f"# total parameters: {num_params}") + print(f"model config {model_config}") + return model + + self.model = initialize_parallel_model( + self.nxd_config, + get_model, + *self.model_args, + **self.model_kwargs, + ) + self.averaged_loss = torch.zeros(1, dtype=torch.double).to(xm.xla_device()) + self.print_pp_rank = 0 if self.log_rank0 else self.trainer.strategy.pipeline_parallel_size - 1 + # Make the model Neuron-compatible for generation + try: + from optimum.neuron.utils.training_utils import ( + patch_generation_mixin_to_general_neuron_generation_mixin, + ) + + patch_generation_mixin_to_general_neuron_generation_mixin(self.model.module) + except ImportError: + print("Failed to import optimum-neuron dependency, generation will not work on Neuron.") + # Load Pretrained checkpoint + if hasattr(self.model_args[0], "pretrained_ckpt") and self.model_args[0].pretrained_ckpt: + user_content = nxd.load_checkpoint( + self.model_args[0].pretrained_ckpt, + tag="pretrained_weight", + model=self.model, + optimizer=None, + scheduler=None, + strict=False, + ) + + def training_step(self, batch, batch_idx): + xm.mark_step() + + for logger in self.trainer.loggers: + logger.print_step = -1 + self.should_print = False + if self.trainer.strategy.pipeline_parallel_size > 1: + loss = self.model.run_train( + input_ids=batch["input_ids"], + attention_mask=batch["attention_mask"], + labels=batch["labels"], + ) + + loss_detached = ( + loss.detach() if self.trainer.strategy.pipeline_parallel_rank == self.print_pp_rank else None + ) + else: + # print(f"self model is {self.model}") + outputs = self.model( + input_ids=batch["input_ids"], + attention_mask=batch["attention_mask"], + labels=batch["labels"], + ) + # print(f"outputs is {outputs}") + loss = outputs.loss / self.grad_accum_steps + loss.backward() + self.averaged_loss += loss.detach() + xm.mark_step() + if not self.automatic_optimization and (batch_idx + 1) % self.grad_accum_steps == 0: + self.should_print = True + if ( + self.trainer.strategy.pipeline_parallel_size == 1 + ): # Todo: At this moment we only average loss among dp ranks in tp cases + loss_div = self.averaged_loss / self.trainer.strategy.data_parallel_size + loss_reduced = xm.all_reduce( + xm.REDUCE_SUM, + loss_div, + groups=parallel_state.get_data_parallel_group(as_list=True), + ) + loss_detached = loss_reduced.detach() + self.averaged_loss.zero_() + + optimizer = self.optimizers() + scheduler = self.lr_schedulers() + 
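+            # Manual optimization: once grad_accum_steps micro-batches have
+            # accumulated gradients, apply them, capture the gradient norm
+            # for logging, clear the grads, and advance the LR schedule.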
optimizer.step() + self.global_norm = optimizer.grad_norm + optimizer.zero_grad() + scheduler.step() + xm.mark_step() + + # Setup items for logging + self.loss = loss_detached + self.lr = self.lr_schedulers().get_lr()[0] + self.input_ids = batch["input_ids"] + self.tps = self.throughput.get_throughput() + + return loss + + def generate( + self, input_ids: Optional[torch.Tensor] = None, generation_config: Optional[GenerationConfig] = None, **kwargs + ): + return self.model.module.generate( + input_ids=input_ids, + generation_config=generation_config, + **kwargs, + ) + + def configure_optimizers(self): + param_groups = self.get_param_groups_by_weight_decay() + optimizer = initialize_parallel_optimizer(self.nxd_config, self.opt_cls, param_groups, **self.opt_kwargs) + optimizer.zero_grad() + + scheduler = self.scheduler_cls(optimizer, *self.scheduler_args, **self.scheduler_kwargs) + self.throughput = Throughput( + self.train_batch_size, + parallel_state.get_data_parallel_size(), + self.grad_accum_steps, + 10, + self.logging_interval, + ) + return ( + [optimizer], + [ + { + "scheduler": scheduler, + } + ], + ) + + def on_train_batch_end(self, *args, **kwargs): + if ( + self.trainer.strategy.data_parallel_rank == 0 + and self.trainer.strategy.tensor_parallel_rank == 0 + and self.trainer.strategy.pipeline_parallel_rank == self.print_pp_rank + ): + if self.should_print: + print( + f"step {self.global_step} loss is {self.loss.detach().cpu().item()}, lr is {self.lr}, throughput {self.tps} seq/s, input_ids {torch.sum(self.input_ids.detach().cpu()).item()}, norm {self.global_norm}, global rank {xm.get_ordinal()}" + ) + + # # Logging, need to revisit when automatic_optimization enabled + if not self.automatic_optimization: + if self.should_print: + self.log( + "loss", + self.loss.detach().cpu().item() + if self.loss is not None + else torch.zeros(1, device="cpu", requires_grad=False), + prog_bar=True, + ) + self.log( + "lr", + self.lr, + prog_bar=True, + ) + self.log( + "input_ids", + torch.sum(self.input_ids.detach().cpu()).item(), + prog_bar=True, + ) + self.log("throughput", self.tps, prog_bar=True) + self.log( + "global_step", + self.global_step, + prog_bar=True, + on_step=True, + on_epoch=True, + ) + for logger in self.trainer.loggers: + logger.print_step = self.global_step + diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_lt_neuron_xla_strategy.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_lt_neuron_xla_strategy.py new file mode 100644 index 000000000..3f69e451b --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_lt_neuron_xla_strategy.py @@ -0,0 +1,64 @@ +import os +from typing import Dict +from neuronx_distributed.lightning.strategy import NeuronXLAStrategy +from ray.train.lightning import RayLightningEnvironment +import torch +from lightning_fabric.plugins.environments import ( + TorchElasticEnvironment, + XLAEnvironment, +) +from lightning_fabric.utilities.types import _PATH, ReduceOp +from pytorch_lightning.strategies import XLAStrategy +from torch import Tensor + +from neuronx_distributed.parallel_layers.parallel_state import ( + get_data_parallel_rank, + get_data_parallel_size, + get_pipeline_model_parallel_rank, + get_tensor_model_parallel_rank, + initialize_model_parallel, + model_parallel_is_initialized, +) + +from neuronx_distributed.lightning.accelerator import NeuronXLAAccelerator +from neuronx_distributed.lightning.checkpoint_io import NeuronCheckpointIO +from 
neuronx_distributed.lightning.launcher import _NeuronXLALauncher + +class RayLightningNeuronXlaStrategy(NeuronXLAStrategy): + def __init__( + self, + nxd_config: Dict = None, + tensor_parallel_size: int = 1, + pipeline_parallel_size: int = 1, + debug: bool = False, + sync_module_states: bool = False, + checkpoint_io: bool = None, + save_load_xser: bool = True, + ): + super().__init__( + nxd_config=nxd_config, + tensor_parallel_size=tensor_parallel_size, + pipeline_parallel_size=pipeline_parallel_size, + debug=debug, + sync_module_states=sync_module_states, + checkpoint_io=checkpoint_io, + save_load_xser=save_load_xser + ) + + def setup_distributed(self) -> None: + print (f"RayNeuronXLAStrategy TRACE: Got call for setup_distributed, value of {self.parallel_devices=}!") + + super(NeuronXLAStrategy, self).setup_distributed() + # init model parallel if needed + if not model_parallel_is_initialized(): + initialize_model_parallel( + tensor_model_parallel_size=self.tensor_parallel_size, + pipeline_model_parallel_size=self.pipeline_parallel_size, + ) + + self.data_parallel_rank = get_data_parallel_rank() + self.data_parallel_size = get_data_parallel_size() + self.tensor_parallel_rank = get_tensor_model_parallel_rank() + self.pipeline_parallel_rank = get_pipeline_model_parallel_rank() + + diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_neuron_xla_config_20.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_neuron_xla_config_20.py new file mode 100644 index 000000000..f0c3e03d5 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_neuron_xla_config_20.py @@ -0,0 +1,105 @@ +from ray.train.torch.xla import TorchXLAConfig +from ray.train.torch.xla.config import _TorchAwsNeuronXLABackend # Accessing an internal class +from ray.train._internal.worker_group import WorkerGroup +import os +import ray +import torch + +def _set_xla_env_vars(): + # https://pytorch.org/docs/1.13/elastic/run.html#environment-variables + context = ray.train.get_context() + + local_world_size = context.get_local_world_size() + + env_variables = { + "LOCAL_RANK": str(context.get_local_rank()), + "RANK": str(context.get_world_rank()), + "LOCAL_WORLD_SIZE": str(local_world_size), + "WORLD_SIZE": str(context.get_world_size()), + "GROUP_RANK": str(context.get_world_size()), + "GROUP_WORLD_SIZE": str(context.get_world_size() / local_world_size), + "ROLE_RANK": str(context.get_world_rank()), + "ROLE_WORLD_RANK": str(context.get_world_rank()), + "ROLE_WORLD_SIZE": str(context.get_world_size()), + } + + for name, value in env_variables.items(): + # print (f"ray_neuron_xla_config_20: _set_xla_env_vars: Setting the variable {name} from {os.environ.get(name)} to {value}") + os.environ[name] = value + + # EFA and XLA setup + # https://github.com/aws/libfabric/blob/master/prov/efa/src/rxr/rxr_init.c + # https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/training/dp_bert_hf_pretrain/run_dp_bert_large_hf_pretrain_bf16_s128.sh # noqa + os.environ["FI_PROVIDER"] = "efa" + os.environ["FI_EFA_USE_DEVICE_RDMA"] = "1" + os.environ["FI_EFA_FORK_SAFE"] = "1" + os.environ["XLA_TRANSFER_SEED_ASYNC"] = "1" + os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1" + +def _set_pjrt_env_variables(): + ''' + # PK_Open_Q: Copied from src/KaenaXlaPyTorch/src/torch_neuronx/xla.py + In torchrun scenearios, are the "LOCAL_*" env variables set even before torch_xla module is initialized (which calls the above torch_neuronx method)? 
+ Is that the core problem with enabling Ray which uses "python" directly to launch the training? + Even in that case, why does this workaround of setting the PJRT environment variables right before we call dist.init_process_group work still causing pjrt client initialization issues? + ''' + assert torch.__version__.startswith("2.") + assert "WORLD_SIZE" in os.environ, "WORLD_SIZE environment variable should have been set by now!" + assert "LOCAL_WORLD_SIZE" in os.environ, "LOCAL_WORLD_SIZE environment variable should have been set by now!" + assert "LOCAL_RANK" in os.environ, "LOCAL_RANK environment variable should have been set by now!" + + env_variables = { + "NEURON_PJRT_PROCESS_INDEX": os.environ["RANK"], + "PJRT_LOCAL_PROCESS_RANK": os.environ["LOCAL_RANK"], + "NEURON_RT_VISIBLE_CORES": os.environ["LOCAL_RANK"], + } + for name, value in env_variables.items(): + print (f"ray_neuron_xla_config_20: _set_pjrt_env_variables: Setting the variable {name} from {os.environ.get(name)} to {value}") + os.environ[name] = value + + +def _setup_xla_torch_process_group(): + try: + import torch.distributed as dist + import torch_xla.core.xla_model as xm # noqa F401 + + if torch.__version__.startswith("2."): + _set_pjrt_env_variables() + + # print (f"ray_neuron_xla_config_20: _setup_xla_torch_process_group: {os.environ.get('RANK')=}, {os.environ.get('PJRT_DEVICE')=}, {os.environ.get('LOCAL_WORLD_SIZE')=}, {os.environ.get('LOCAL_RANK')=}, {os.environ.get('NEURON_PJRT_PROCESSES_NUM_DEVICES')=}, {os.environ.get('PJRT_LOCAL_PROCESS_COUNT')=}, {os.environ.get('NEURON_RT_NUM_CORES')=}, {torch.__version__=}") + # print (f"ray_neuron_xla_config_20: _setup_xla_torch_process_group: {os.environ.get('RANK')=}, {os.environ.get('LOCAL_RANK')=}, {os.environ.get('LOCAL_WORLD_SIZE')=}, {os.environ.get('NEURON_PJRT_PROCESSES_NUM_DEVICES')=}, {os.environ.get('PJRT_LOCAL_PROCESS_COUNT')=}, {os.environ.get('NEURON_RT_NUM_CORES')=}, {os.environ.get('NEURON_RT_VISIBLE_CORES')=}, {os.environ.get('NEURON_PJRT_WORLD_SIZE')=}, {os.environ.get('NEURON_PJRT_PROCESS_INDEX')=}") + if torch.__version__.startswith("2.0"): + import torch_xla.experimental.pjrt_backend # noqa + global_rank = int(os.environ.get("RANK")) # Is this needed at all? + dist.init_process_group("xla", init_method="pjrt://") # Should we pass rank=global_rank ?? Why? + else: + import torch_xla.distributed.xla_backend # noqa F401 + dist.init_process_group("xla") + except ImportError: + raise ImportError("torch_xla must be installed to use torch_xla backend.") + + +class NewTorchXLAConfig(TorchXLAConfig): + @property + def backend_cls(self): + return _NewTorchAwsNeuronXLABackend + +class _NewTorchAwsNeuronXLABackend(_TorchAwsNeuronXLABackend): + def on_start(self, worker_group: WorkerGroup, backend_config: TorchXLAConfig): + print (f"ray_neuron_xla_config_20: _NewTorchAwsNeuronXLABackend: on_start: Trace: Started.") + return super().on_start(worker_group, backend_config) + + def on_training_start( + self, worker_group: WorkerGroup, backend_config: TorchXLAConfig + ): + """ + Configure the environment variables for the worker group. + And initialize the xla distributed process group. + TODO: Current setup only supports homogenous cluster with + neuron_cores accelerator and xrt runtime. 
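+        This override executes the local _set_xla_env_vars and
+        _setup_xla_torch_process_group helpers on every worker, so the PJRT
+        environment variables are populated before the XLA process group is
+        initialized on PyTorch 2.x.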
+ """ + print (f"ray_neuron_xla_config_20: _NewTorchAwsNeuronXLABackend: on_training_start: Trace: Started.") + worker_group.execute(_set_xla_env_vars) + worker_group.execute(_setup_xla_torch_process_group) + + diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_train_llama3.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_train_llama3.py new file mode 100644 index 000000000..98c4c9f1d --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/ray_train_llama3.py @@ -0,0 +1,87 @@ +from argparse import ArgumentParser + +import ray +from ray.train import ScalingConfig +from ray.train.torch import TorchTrainer +# from ray.train.torch.xla import TorchXLAConfig +from ray_neuron_xla_config_20 import NewTorchXLAConfig as TorchXLAConfig + +# Use training function from NxD llama2 example +# see: https://awsdocs-neuron.readthedocs-hosted.com/en/latest/libraries/neuronx-distributed/tutorials/training_llama2_7b.html#llama2-7b-tp-zero1-tutorial +from run_llama_nxd_ptl import train_llama, build_args as build_model_args +import os +import os + +# Collect command-line args +def add_args(parser: ArgumentParser) -> None: + # Additional args added for this Ray Train example + parser.add_argument( + "--neuron_parallel_compile", + action="store_true", + default=False, + help="Enable Neuron parallel compilation to pre-populate the Neuron cache", + ) + parser.add_argument( + "--num_nodes", + type=int, + default=2, + help="Number of trn1 nodes to use for training", + ) + +def get_args(): + parser = build_model_args() + add_args(parser) + return parser.parse_args() + +if __name__ == "__main__": + args = get_args() + args.use_ray = True + + num_cores_per_node = 32 + num_workers = args.num_nodes * num_cores_per_node + num_cores_per_node_env_value = str(num_cores_per_node) + num_workers_env_value = str(num_workers) + + # Set up Neuron-specific env. variables to customize this training job + env = { + "NEURON_CC_FLAGS": "--model-type transformer --distribution-strategy=llm-training", + "NEURON_FUSE_SOFTMAX": "1", + "NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS": "16", + "NEURON_RT_STOCHASTIC_ROUNDING_EN": "1", + "MALLOC_ARENA_MAX": "64", + "CCOM_SOCKET_IFNAME": "eth0", + "NEURON_COMPILE_CACHE_URL": "/shared/neuron_compile_cache/", + # These were usually set inside a slurm script, I don't yet know the significance or relevance/need of these variables + "NUM_NEURONCORES":num_cores_per_node_env_value, + "NEURON_RT_NUM_CORES":num_cores_per_node_env_value, + "TPU_NUM_DEVICES":num_cores_per_node_env_value, + "TPU_CHIPS_PER_HOST_BOUNDS":num_cores_per_node_env_value, + # These were needed for Ray Train to work on pytorch 2+. 
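+        # NEURON_PJRT_WORLD_SIZE is the total number of Neuron worker processes
+        # across all nodes (num_nodes * 32); PJRT_LOCAL_PROCESS_COUNT is the
+        # per-node process count.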
+ "NEURON_PJRT_WORLD_SIZE":num_workers_env_value, + "PJRT_LOCAL_PROCESS_COUNT": num_cores_per_node_env_value + } + + if args.use_fp32_optimizer: + env["XLA_DOWNCAST_BF16"] = "1" + else: + env["XLA_USE_BF16"] = "1" + + # Configure runtime env to use Neuron env vars + ray.init(runtime_env={"env_vars": env}) + + # Limit number of steps during neuron parallel compile runs + if args.neuron_parallel_compile: + args.steps_this_run = 2 + + trainer = TorchTrainer( + train_loop_per_worker=train_llama, + train_loop_config=args, + torch_config=TorchXLAConfig( + neuron_parallel_compile=args.neuron_parallel_compile + ), + scaling_config=ScalingConfig( + num_workers=num_workers, resources_per_worker={"neuron_cores": 1} + ), + ) + + trainer.fit() diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/requirements.txt b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/requirements.txt new file mode 100644 index 000000000..fadb52c77 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/requirements.txt @@ -0,0 +1,9 @@ +pytorch-lightning +transformers==4.31.0 +regex +tensorboard +datasets +sentencepiece +nltk +neuronx_distributed +ray[data,train,tune,serve] diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/run_llama3.1_8b.sh b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/run_llama3.1_8b.sh new file mode 100755 index 000000000..9560d91a1 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/run_llama3.1_8b.sh @@ -0,0 +1,168 @@ +#!/bin/bash + +############################################# +# User defined parameters and env vars + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +export NEURON_CC_FLAGS="--model-type transformer --distribution-strategy=llm-training --cache_dir=/shared/neuron_compile_cache/" +export NEURON_FUSE_SOFTMAX=1 + +# Async Runtime +export NEURON_RT_ASYNC_EXEC_MAX_INFLIGHT_REQUESTS=3 + +# HOST OOM +export MALLOC_ARENA_MAX=64 + +##############################DEFAULT VALUES########################################### +# TP degree +TP_DEGREE=32 + +KV_REPLI=`expr $TP_DEGREE / 8` +# 0: bf16; 1: mixed precision +USE_MIX_PRECISION=0 +# 0: use pure DP; 1: use ZeRO-1 +USE_ZERO_1=1 +# global batch size +: ${GBS:=1024} +# micro batch size +MBS=1 +# number of steps to run +MAX_STEPS=10000 +# warmup steps +WARMUP_STEPS=100 +# learning rate +#LR=3.0e-4 +LR=1.5e-4 +# model path +#MODEL_PATH=$SCRIPT_DIR +MODEL_PATH="/shared/llama3.1_config/" +# data path +DATA_PATH="/shared/fineweb_llama3.1_tokenized" +#Checkpoint dir +CKPT_DIR="/shared/checkpoint" +# sequence length +SEQ_LEN=2048 +# Number of epochs +EPOCHS=1 +#Number of Nodes +NUM_NODES=2 +#Tensorboard logs dir +TB_DIR="/shared/tblogs" +#Steps this run +STEPS_THIS_RUN=-1 +#output log +ts=`date +%y_%m_%d_%H:%M:%S` +OUTPUT_LOG="/shared/llama3.1-8b-pretrain-$ts.log" +#Parallel Compile +PAR_COMP=0 +##############################DEFAULT VALUES########################################### + +#Args +Help() +{ + # Display Help + echo "Add description of the script functions here." 
+ echo + echo "Syntax: $0 [-t|w|l|m|d|c|s|n]" + echo "options:" + echo "t total number of training steps" + echo "w warmup steps for training" + echo "l learning rate" + echo "m abs path to llama config.json" + echo "d abs path to tokenized dataset" + echo "c abs path to checkpoint directory" + echo "s Sequence length" + echo "n Number of instances to run training" + echo "b tensor board logs location" + echo "r defining steps this run" + echo "g global batch size" + echo "z mini batch size" + echo "p neuron parallel compile 0 or 1" + echo +} +while getopts t:w:l:m:d:c:s:n:b:r:g:z:p:h flag +do + case "${flag}" in + t) MAX_STEPS=${OPTARG};; + w) WARMUP_STEPS=${OPTARG};; + l) LR=${OPTARG};; + m) MODEL_PATH=${OPTARG};; + d) DATA_PATH=${OPTARG};; + c) CKPT_DIR=${OPTARG};; + s) SEQ_LEN=${OPTARG};; + n) NUM_NODES=${OPTARG};; + b) TB_DIR=${OPTARG};; + r) STEPS_THIS_RUN=${OPTARG};; + g) GBS=${OPTARG};; + z) MBS=${OPTARG};; + p) PAR_COMP=${OPTARG};; + h) Help + exit;; + #\?) # Invalid option + # echo "Error: Invalid option" + # exit;; + esac +done +############################################# + +export NUM_NEURONCORES=32 +export NEURON_RT_NUM_CORES=32 +export NUM_NEURONCORES=$NEURON_RT_NUM_CORES +export TPU_NUM_DEVICES=$NEURON_RT_NUM_CORES +export TPU_CHIPS_PER_HOST_BOUNDS=$NEURON_RT_NUM_CORES + +############################################# + +EXTRA_ARGS=" " +if [ $PAR_COMP -gt 0 ]; then + EXTRA_ARGS+=" --neuron_parallel_compile" +fi + +DP=$(($NEURON_RT_NUM_CORES * $NUM_NODES / $TP_DEGREE)) +ACC_STEPS=$(($GBS / $MBS / $DP)) +echo NEURON_PARALLEL_COMPILE=$PAR_COMP +echo TP_DEGREE=$TP_DEGREE +echo GBS=$GBS +echo MBS=$MBS +echo MAX_STEPS=$MAX_STEPS +echo WARMUP_STEPS=$WARMUP_STEPS +echo LR=$LR +echo MAX_TOKENS=$MAX_TOKENS +echo MODEL_PATH=$MODEL_PATH +echo DATA_PATH=$DATA_PATH +echo SEQ_LEN=$SEQ_LEN +echo CKPT_DIR=$CKPT_DIR +echo DP=$DP +echo ACC_STEPS=$ACC_STEPS +echo STEPS_THIS_RUN=$STEPS_THIS_RUN +echo NUM_NODES=$NUM_NODES +echo TB_DIR=$TB_DIR +echo OUTPUT_LOG=$OUTPUT_LOG + +python \ + ray_train_llama3.py \ + --model_path $MODEL_PATH \ + --num_nodes $NUM_NODES \ + --tb_dir $TB_DIR \ + --data_dir $DATA_PATH \ + --tensor_parallel_size $TP_DEGREE \ + --train_batch_size $MBS \ + --steps_this_run $STEPS_THIS_RUN \ + --max_steps $MAX_STEPS \ + --warmup_steps $WARMUP_STEPS \ + --lr $LR \ + --grad_accum_usteps $ACC_STEPS \ + --seq_len $SEQ_LEN \ + --use_sequence_parallel 1 \ + --use_selective_checkpoint 1 \ + --use_fp32_optimizer $USE_MIX_PRECISION \ + --use_zero1_optimizer $USE_ZERO_1 \ + --scheduler_type 'linear' \ + --qkv_linear 1 \ + --kv_replicator $KV_REPLI \ + --num_train_epochs $EPOCHS \ + --save_checkpoint \ + --use_flash_attention 1 \ + $EXTRA_ARGS |& tee $OUTPUT_LOG +exit ${PIPESTATUS[0]} diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/run_llama_nxd_ptl.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/run_llama_nxd_ptl.py new file mode 100644 index 000000000..1711f1abe --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/run_llama_nxd_ptl.py @@ -0,0 +1,458 @@ +# coding=utf-8 +# Copyright (c) 2019 NVIDIA CORPORATION. All rights reserved. +# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. team. +# Modifications Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import sys + +current = os.path.dirname(os.path.realpath(__file__)) +parent = os.path.dirname(current) +sys.path.append(parent) + +import torch +import torch.distributed as dist +import torch_xla.core.xla_model as xm +from data_module import NeuronLightningDataModule +from modeling_llama_nxd import ( + CoreAttention, + LlamaDecoderLayer, + LlamaForCausalLM, + LlamaRMSNorm, + init_weights, +) +from module_llama import NeuronLlamaLTModule +from pytorch_lightning import Trainer +from pytorch_lightning.callbacks import ModelCheckpoint +from training_utils import ( + create_llama_pretraining_dataset, + get_learning_rate_scheduler, + get_mixed_precision_config, + get_param_groups_by_weight_decay, + get_sin_cos_matrix, +) +from transformers import LlamaConfig, set_seed +from transformers.optimization import get_linear_schedule_with_warmup +from ray_lt_neuron_xla_strategy import RayLightningNeuronXlaStrategy + +import neuronx_distributed as nxd +from neuronx_distributed.lightning import ( + NeuronTensorBoardLogger, + NeuronTQDMProgressBar, + NeuronXLAPrecisionPlugin, + NeuronXLAStrategy, +) +from neuronx_distributed.parallel_layers import mappings +from neuronx_distributed.utils.adamw_fp32_optim_params import AdamW_FP32OptimParams + +# For PT autocast. +torch.cuda.is_bf16_supported = lambda: True + +# Workaround for NaNs seen with transformers version >= 4.21.0 +# https://github.com/aws-neuron/aws-neuron-sdk/issues/593 +import transformers.modeling_utils as modeling_utils + +if os.environ.get("XLA_USE_BF16") or os.environ.get("XLA_DOWNCAST_BF16"): + modeling_utils.get_parameter_dtype = lambda x: torch.bfloat16 + +# adding ray modules +import ray +from ray.train import ScalingConfig +from ray.train.torch import TorchTrainer +from ray.train.torch.xla import TorchXLAConfig + +def train_llama(args): + print(f"Namespace: {args}") + set_seed(args.seed) + + if args.steps_this_run < 0: + args.steps_this_run = args.max_steps + + pipeline_config = None + + model_config = LlamaConfig.from_pretrained(args.model_path) + model_config.use_cache = False + if args.pipeline_parallel_size > 1: + model_config.return_dict = False + model_config.sequence_parallel_enabled = args.use_sequence_parallel > 0 + model_config.kv_shared_group_size = args.kv_replicator + model_config.qkv_linear = args.qkv_linear + model_config.selective_checkpoint_enabled = args.use_selective_checkpoint > 0 + model_config.max_position_embeddings = max(model_config.max_position_embeddings, args.seq_len) + model_config.use_flash_attention = args.use_flash_attention > 0 + if args.pretrained_weight is not None: + model_config.pretrained_ckpt = args.pretrained_weight + if args.num_layers > 0: + model_config.num_hidden_layers = args.num_layers + if args.hidden_size != -1: + model_config.hidden_size = args.hidden_size + xm.master_print(model_config) + + pipeline_config = None + if args.pipeline_parallel_size > 1: + pipeline_config = { + "transformer_layer_cls": LlamaDecoderLayer, + "num_microbatches": args.num_microbatches, + "output_loss_value_spec": (True, False), + "input_names": ["input_ids", "attention_mask", 
"labels"], + "auto_partition": True, + "deallocate_pipeline_outputs": args.deallocate_pipeline_outputs > 0, + "trace_file_path": args.trace_file_path, + "param_init_fn": None, + "leaf_module_cls": [LlamaRMSNorm.__name__], + "autowrap_modules": [mappings], + "use_zero1_optimizer": args.use_zero1_optimizer > 0, + "use_optimizer_wrapper": True, + "broadcast_and_average_loss": args.log_rank0 > 0, + "fuse_microbatches": args.fuse_microbatches > 0, + } + + # Create model with different options + # Either deferred_init or meta device initialization will be required to avoid host OOM for 70B model + if args.use_meta_device_init > 0: + model_init_config = { + "meta_device_init": True, + "param_init_fn": init_weights, + } + else: + model_init_config = None + + mixed_precision_config = get_mixed_precision_config(args.use_gpu_compatible_precision > 0) + + nxd_config = nxd.neuronx_distributed_config( + tensor_parallel_size=args.tensor_parallel_size, + pipeline_parallel_size=args.pipeline_parallel_size, + pipeline_config=pipeline_config, + optimizer_config={ + "zero_one_enabled": args.use_zero1_optimizer > 0, + "grad_clipping": True, + "max_grad_norm": 1.0, + }, + sequence_parallel=args.use_sequence_parallel > 0, + activation_checkpoint_config=CoreAttention if args.use_selective_checkpoint > 0 else "full", + model_init_config=model_init_config, + mixed_precision_config=mixed_precision_config, + ) + + opt_cls = AdamW_FP32OptimParams if args.use_fp32_optimizer > 0 else torch.optim.AdamW + + def configure_scheduler(optimizer, warmup_steps, max_steps): # PTLTODO: check loading scheduler state dict here + return get_linear_schedule_with_warmup( + optimizer, + num_warmup_steps=warmup_steps, + num_training_steps=max_steps, + last_epoch=-1, + ) + + scheduler_cls = None + scheduler_args = () + + if args.scheduler_type == "linear": + scheduler_cls = configure_scheduler + scheduler_args = (args.warmup_steps, args.max_steps) + elif args.scheduler_type == "cosine": + scheduler_cls = get_learning_rate_scheduler + scheduler_args = (args,) + else: + raise ValueError(f"Currently We only support scheduler type 'linear' and 'cosine', got {args.scheduler_type}") + + model = NeuronLlamaLTModule( + nxd_config=nxd_config, + model_args=(model_config,), + opt_cls=opt_cls, + scheduler_cls=scheduler_cls, + opt_kwargs={ + "lr": args.lr, + "betas": (args.beta1, args.beta2), + "weight_decay": args.weight_decay, + # "capturable": True, + }, + scheduler_args=scheduler_args, + train_batch_size=args.train_batch_size, + grad_accum_steps=args.grad_accum_usteps, + logging_interval=args.logging_interval, + log_rank0=args.log_rank0 > 0, + manual_opt=True, + use_deferred_init=args.use_deferred_init, + ) + + dm = NeuronLightningDataModule( + create_llama_pretraining_dataset, + args.data_dir, + args.train_batch_size, + data_args=(args.seed,), + ) + + strategy_cls = RayLightningNeuronXlaStrategy + strategy = strategy_cls( + nxd_config=nxd_config, + save_load_xser=args.save_load_xser, + ) + + plugins = [] + + plugins.append(NeuronXLAPrecisionPlugin()) + + callbacks = [] + callbacks.append(NeuronTQDMProgressBar()) + if args.save_checkpoint: + callbacks.append( + ModelCheckpoint( + save_top_k=args.num_kept_checkpoint, + monitor="global_step", + mode="max", + every_n_epochs=args.checkpoint_freq, + dirpath=args.checkpoint_dir, + ) + ) + if args.steps_this_run == 2: + trainer = Trainer( + strategy=strategy, + max_steps=args.steps_this_run, + plugins=plugins, + enable_checkpointing=args.save_checkpoint, + 
logger=NeuronTensorBoardLogger(save_dir=args.tb_dir, log_rank0=args.log_rank0 > 0), + log_every_n_steps=1, + callbacks=callbacks, + ) + else: + trainer = Trainer( + strategy=strategy, + max_epochs=args.num_train_epochs, + plugins=plugins, + enable_checkpointing=args.save_checkpoint, + logger=NeuronTensorBoardLogger(save_dir=args.tb_dir, log_rank0=args.log_rank0 > 0), + log_every_n_steps=1, + callbacks=callbacks, + ) + + if args.resume_ckpt: + ckpt_path = os.path.join( + args.checkpoint_dir, + f"epoch={args.load_epoch}-step={args.load_step}.ckpt", + ) + print(f"resume path is {ckpt_path}") + trainer.fit(model=model, datamodule=dm, ckpt_path=ckpt_path) + else: + trainer.fit(model=model, datamodule=dm) + + print("Training finished!") + + +def _mp_fn(index, args): + # PK:Ray Changes start + print (f"_mp_fn.Trace: {dist.is_torchelastic_launched()=}") + print (f"_mp_fn.Trace: {os.environ.get('WORLD_SIZE')=}") + + # setup_env_vars() + + if not dist.is_torchelastic_launched(): + scaling_config = ScalingConfig(num_workers=32, resources_per_worker={"neuron_cores": 1}) + args.use_ray = True + trainer = TorchTrainer( + train_loop_per_worker=lambda: train_llama(args), + torch_config=TorchXLAConfig(), + scaling_config=scaling_config + ) + result = trainer.fit() + print (f"Training finished with {result=}") + else: + args.use_ray = False + train_llama(args) + #train_llama(args) + +def build_args() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + parser.add_argument( + "--model_path", + type=str, + help="Model weight and config path.", + ) + parser.add_argument( + "--data_dir", + type=str, + help="Pre-tokenized dataset directory.", + ) + parser.add_argument("--train_batch_size", type=int, default=8, help="Worker batch size.") + parser.add_argument( + "--max_steps", + type=int, + default=-1, + help="Maximum total accumulation-steps to run.", + ) + parser.add_argument( + "--steps_this_run", + type=int, + default=-1, + help="Exit early at steps and not go to max_steps. -1 to mean no early exit.", + ) + parser.add_argument( + "--seed", + type=int, + default=12349, + help="Random seed. Worker seed is this value + worker rank.", + ) + parser.add_argument("--lr", type=float, default=4e-4, help="Learning rate.") + parser.add_argument( + "--warmup_steps", + type=int, + default=2000, + help="Number of warmup accumulation-steps for learning rate .", + ) + parser.add_argument( + "--constant_steps", + type=int, + default=None, + help="Number of constant_steps for learning rate .", + ) + parser.add_argument( + "--min_lr", + type=float, + default=None, + help="Minumum value for learning rate. 
The scheduler" "clip values below this threshold.", + ) + parser.add_argument( + "--scheduler_type", + type=str, + default=None, + help="Type of lr scheduler", + ) + parser.add_argument( + "--grad_accum_usteps", + type=int, + default=1, + help="Gradient accumulation micro-steps (an accumulation-step has micro-steps.", + ) + + parser.add_argument("--weight_decay", default=0.01, type=float, help="weight decay") + parser.add_argument("--beta1", default=0.9, type=float, help="beta1 parameter for Adam optimizer") + parser.add_argument("--beta2", default=0.999, type=float, help="beta2 parameter for Adam optimizer") + + parser.add_argument("--load_step", type=int, default=0, help="step to load checkpoint from") + parser.add_argument("--load_epoch", type=int, default=0, help="epoch to load checkpoint from") + parser.add_argument("--tb_dir", type=str, default="/shared/tblogs", help="Directory for log files") + #parser.add_argument("--log_dir", type=str, default=os.getcwd() + "/llama8B-logs", help="Directory for log files") + parser.add_argument("--save_checkpoint", action="store_true", help="Save checkpoints") + parser.add_argument( + "--num_kept_checkpoint", + type=int, + default=10000, + help="number of checkpoints kept, old checkpoint will get deleted", + ) + parser.add_argument("--pretrained_weight", type=str, default=None, help="Load dir of pretrained weight") + parser.add_argument("--checkpoint_freq", type=int, default=10000, help="save checkpoint freq") + parser.add_argument("--checkpoint_dir", type=str, default=None) + parser.add_argument("--resume_ckpt", action="store_true", help="Resume from checkpoint at resume_step.") + parser.add_argument("--save_load_xser", action="store_true", help="save/load with xla serialization") + + parser.add_argument("--tensor_parallel_size", default=2, type=int, help="Tensor parallel size") + parser.add_argument("--pipeline_parallel_size", default=1, type=int, help="Pipeline parallel size") + parser.add_argument("--num_microbatches", type=int, default=8, help="num_microbatches") + parser.add_argument("--seq_len", default=4096, type=int, help="Sequence length") + parser.add_argument("--trace_file_path", type=str, default=None) + parser.add_argument("--use_fp32_optimizer", type=int, default=0, help="Use fp32 optimizer.") + parser.add_argument("--use_zero1_optimizer", type=int, default=0, help="Use ZeRO-1.") + parser.add_argument("--use_deferred_init", default=0, type=int, help="use torchdistx deferred initialization") + parser.add_argument("--use_meta_device_init", default=0, type=int, help="use meta device initialization") + parser.add_argument( + "--deallocate_pipeline_outputs", + type=int, + default=1, + help="deallocate pipeline output tensors whenever possible", + ) + + parser.add_argument("--logging_interval", type=int, default=1, help="number of warmup_steps") + parser.add_argument("--log_rank0", type=int, default=0, help="logging in rank 0, note that the issue ") + + parser.add_argument( + "--num_layers", + type=int, + default=-1, + help="Override number of layers for this LLaMA model", + ) + parser.add_argument( + "--hidden_size", + type=int, + default=-1, + help="override model model hidden size", + ) + parser.add_argument( + "--use_sequence_parallel", + default=1, + type=int, + help="Enable sequence parallel", + ) + parser.add_argument( + "--use_selective_checkpoint", + default=0, + type=int, + help="Enable selective checkpoint", + ) + parser.add_argument( + "--qkv_linear", + default=0, + type=int, + help="Use QKV Linear module", + ) + 
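+    # --kv_replicator is used together with --qkv_linear: with grouped-query
+    # attention the shared KV heads are replicated so the fused QKV projection
+    # can still be split across tensor-parallel ranks. run_llama3.1_8b.sh
+    # passes TP_DEGREE / 8 for Llama-3.1-8B, which has 8 KV heads.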
parser.add_argument( + "--kv_replicator", + default=1, + type=int, + help="KV replication number", + ) + parser.add_argument( + "--use_flash_attention", + default=0, + type=int, + help="Use neuron kernel", + ) + parser.add_argument( + "--use_gpu_compatible_precision", + default=1, + type=int, + help="Use gpu compatible precision", + ) + parser.add_argument( + "--fuse_microbatches", + type=int, + default=0, + help="Fuse microbatches into a single graph" + ) + parser.add_argument( + "--num_train_epochs", + type=int, + default=1, + help="Maximum numer of epochs to run.", + ) + + return parser + +if __name__ == "__main__": + parser = build_args() + args = parser.parse_args(sys.argv[1:]) + + if args.steps_this_run < 0: + args.steps_this_run = args.max_steps + + os.environ["NEURON_RT_STOCHASTIC_ROUNDING_EN"] = "0" if args.use_gpu_compatible_precision > 0 else "1" + if args.use_fp32_optimizer: + os.environ["XLA_DOWNCAST_BF16"] = "1" + else: + os.environ["XLA_USE_BF16"] = "1" + + # WORLD_SIZE is set by torchrun + + _mp_fn(0, args) diff --git a/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/training_utils.py b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/training_utils.py new file mode 100644 index 000000000..46a527ee9 --- /dev/null +++ b/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1/llama3.1_pretraining/training_utils.py @@ -0,0 +1,370 @@ +import json +import math +import os +import queue +import time +from datetime import datetime, timezone +from functools import partial +from itertools import chain +from typing import Any, Dict, List + +import datasets +import torch +from torch.utils.data import DistributedSampler +from torch.utils.data.dataloader import DataLoader +from transformers import default_data_collator, set_seed +from neuronx_distributed.parallel_layers.parallel_state import ( + get_tensor_model_parallel_size, +) + +try: + from lr import CosineAnnealing +except ImportError: + CosineAnnealing = None + +from collections import namedtuple + +Metric = namedtuple("Metric", ["name", "value", "units", "additional_data"]) +remainder = {"input_ids": [], "attention_mask": [], "token_type_ids": []} + + +# empty list to save remainder from batches to use in next batch +def pack_dataset(dataset, chunk_length=2048): + print(f"Chunking dataset into chunks of {chunk_length} tokens.") + + def chunk(sample, chunk_length=chunk_length): + # define global remainder variable to save remainder from batches to use in next batch + global remainder + # Concatenate all texts and add remainder from previous batch + concatenated_examples = {k: list(chain(*sample[k])) for k in sample.keys()} + concatenated_examples = {k: remainder[k] + concatenated_examples[k] for k in concatenated_examples.keys()} + # get total number of tokens for batch + batch_total_length = len(concatenated_examples[list(sample.keys())[0]]) + + # get max number of chunks for batch + if batch_total_length >= chunk_length: + batch_chunk_length = (batch_total_length // chunk_length) * chunk_length + + # Split by chunks of max_len. 
+ result = { + k: [t[i : i + chunk_length] for i in range(0, batch_chunk_length, chunk_length)] + for k, t in concatenated_examples.items() + } + # add remainder to global variable for next batch + remainder = {k: concatenated_examples[k][batch_chunk_length:] for k in concatenated_examples.keys()} + + # prepare labels + result["labels"] = result["input_ids"].copy() + + return result + + # tokenize and chunk dataset + lm_dataset = dataset.map( + partial(chunk, chunk_length=chunk_length), + batched=True, + ) + print(f"Total number of samples: {len(lm_dataset)}") + return lm_dataset + +def get_learning_rate_scheduler(optimizer, args, last_epoch=-1): + lr_scheduler = CosineAnnealing( + optimizer, + max_steps=args.max_steps, + min_lr=args.min_lr, + warmup_steps=args.warmup_steps, + constant_steps=args.constant_steps, + last_epoch=last_epoch, + ) + return lr_scheduler + + +def get_param_groups_by_weight_decay(model): + """Get param groups.""" + if hasattr(model, "local_named_parameters"): + # Zero1 use the first param in opt to decide the device + param_optimizer = list(model.local_named_parameters()) + else: + param_optimizer = list(model.named_parameters()) + no_decay = ["bias", "LayerNorm"] # gamma/beta are in LayerNorm.weight + + optimizer_grouped_parameters = [ + { + "params": [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], + "weight_decay": 0.01, + }, + { + "params": [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, + ] + return optimizer_grouped_parameters + + +def create_llama_pretraining_dataset(data_dir, mini_batch_size, dp_size, dp_rank, seed): + # Workaround because python functions are not picklable + class WorkerInitObj(object): + def __init__(self, seed): + self.seed = seed + + def __call__(self, id): + set_seed(self.seed) + + worker_init = WorkerInitObj(seed) + train_data = datasets.load_from_disk(data_dir) + train_sampler = DistributedSampler( + train_data, + num_replicas=dp_size, + rank=dp_rank, + shuffle=False, + drop_last=True, + ) + train_dataloader = DataLoader( + train_data, + collate_fn=default_data_collator, + sampler=train_sampler, + batch_size=mini_batch_size, + num_workers=0, + worker_init_fn=worker_init, + drop_last=True, + pin_memory=True, + ) + return train_dataloader, None + + +def create_instruction_based_dataset(data_dir, mini_batch_size, dp_size, dp_rank, seed, tokenizer=None, task=None): + raw_datasets = datasets.load_dataset(data_dir, split="train") + if task: + raw_datasets = raw_datasets.filter(lambda example: example["category"] == task) + train_and_test_dataset = raw_datasets.train_test_split(test_size=2) + train_dataset = train_and_test_dataset["train"] + test_dataset = train_and_test_dataset["test"] + + def preprocess_train_dataset(sample): + instruction = f"### Instruction\n{sample['instruction']}" + context = f"### Context\n{sample['context']}" if len(sample["context"]) > 0 else None + response = f"### Answer\n{sample['response']}" + # join all the parts together + prompt = "\n".join([i for i in [instruction, context, response] if i is not None]) + model_input = tokenizer(f"{prompt}{tokenizer.eos_token}") + return model_input + + train_data = train_dataset.shuffle().map(preprocess_train_dataset, remove_columns=train_dataset.column_names) + train_data = pack_dataset(train_data, chunk_length=2048) + + class WorkerInitObj(object): + def __init__(self, seed): + self.seed = seed + + def __call__(self, id): + set_seed(self.seed) + + worker_init = WorkerInitObj(seed) + + train_sampler = 
DistributedSampler( + train_data, + num_replicas=dp_size, + rank=dp_rank, + shuffle=True, + drop_last=True, + ) + train_dataloader = DataLoader( + train_data, + collate_fn=default_data_collator, + sampler=train_sampler, + batch_size=mini_batch_size, + num_workers=0, + worker_init_fn=worker_init, + drop_last=True, + pin_memory=True, + ) + + def preprocess_test_dataset(sample): + instruction = f"### Instruction\n{sample['instruction']}" + context = f"### Context\n{sample['context']}" if len(sample["context"]) > 0 else None + response = f"### Answer\n" + # join all the parts together + prompt = "\n".join([i for i in [instruction, context, response] if i is not None]) + model_input = tokenizer(prompt, add_special_tokens=False) + #Find the nearest input length divisible by TP_SIZE + max_length = len(model_input['input_ids']) - len(model_input['input_ids']) % get_tensor_model_parallel_size() + get_tensor_model_parallel_size() + #max_length = 128 + tokenizer.pad_token = tokenizer.eos_token + #add padding to match the length of the nearest input length divisible by TP_SIZE + model_input = tokenizer(prompt, max_length=max_length, padding='max_length', truncation=True) + labels = tokenizer(sample["response"]) + model_input["labels"] = labels["input_ids"] + return model_input + + test_data = test_dataset.map(preprocess_test_dataset, remove_columns=test_dataset.column_names) + + test_sampler = DistributedSampler( + test_data, + num_replicas=dp_size, + rank=dp_rank, + shuffle=False, + drop_last=False, + ) + test_dataloader = DataLoader( + test_data, + collate_fn=default_data_collator, + sampler=test_sampler, + batch_size=mini_batch_size, + num_workers=0, + drop_last=False, + pin_memory=True, + ) + + return train_dataloader, test_dataloader + + +def create_partition(num_hidden_layers, pipeline_parallel_size): + """ + Evenly split the transformer layers between the PP ranks + """ + assert num_hidden_layers % pipeline_parallel_size == 0 + num_layer_per_partition = num_hidden_layers // pipeline_parallel_size + pipeline_cuts = [] + current_cut = num_layer_per_partition - 1 + for i in range(pipeline_parallel_size - 1): + pipeline_cuts.append(f"model.layers.{current_cut}") + current_cut += num_layer_per_partition + return pipeline_cuts + + +def get_sin_cos_matrix(config): + head_dim = config.hidden_size // config.num_attention_heads + base = config.rope_theta + inv_freq = 1.0 / (base ** (torch.arange(0, head_dim, 2).float() / head_dim)) + t = torch.arange(config.max_position_embeddings, dtype=inv_freq.dtype) + freqs = torch.einsum("i,j->ij", t, inv_freq) + # Different from paper, but it uses a different permutation in order to obtain the same calculation + emb = torch.cat((freqs, freqs), dim=-1) + return emb.cos()[None, None, :, :].to(torch.float32), emb.sin()[None, None, :, :].to(torch.float32) + + +def get_dtype(model) -> str: + """ + Reference: https://pytorch.org/xla/release/1.12/index.html#xla-tensors-and-bfloat16 + """ + if "XLA_USE_BF16" in os.environ: + return "torch.bfloat16" + if "XLA_DOWNCAST_BF16" in os.environ: + if "torch.float" in str(model.dtype): + return "torch.bfloat16" + if "torch.double" in str(model.dtype): + return "torch.float32" + return str(model.dtype) + + +def print_logs(loss, global_norm, args, throughput, logger, total_steps, current_lr, input_ids, start): + total_norm_cpu = global_norm.cpu().item() + logger.log(total_steps, loss, total_norm_cpu, current_lr, input_ids, throughput, start) + + +class TrainingMetrics: + """ + This class is used for logging metrics to a json file. 
One can provide a + dictionary of metrics that needs to be stored, and it wpuld get + written to the file. + Arguments: + json_file: File used for logging. If no file exists, new file would be created. + """ + + def __init__(self, json_file): + self.json_file = json_file + + def read_modify_write_file(self, data, key: str = "metrics") -> None: + """ + data (dict of training parameters or list of metrics): Data to update in the file. + key (str): the dictionary key under which data is to be recorded + """ + result_dict = {} + print(f"Writing data to the provided results file: {self.json_file}") + if os.path.exists(self.json_file): + with open(self.json_file, "r") as json_file: + content = json_file.read() + if not content.strip(): # Check if content is empty or contains only whitespace + print("File is empty or contains only whitespace.") + else: + result_dict = json.loads(content) or result_dict + print(f"Updating with {key} data: {data}") + if result_dict: + try: + # handle internal named entity if present + results = result_dict[next(iter(result_dict))] + except Exception: + results = result_dict + current = results.get(key) + if not current: + results[key] = data + else: + if isinstance(current, list): + current.extend(data) + elif isinstance(current, dict): + current.update(data) + else: + result_dict["results"] = {key: data} + with open(self.json_file, "w") as json_file: + json.dump(result_dict, json_file) + + def store_metrics(self, metrics: List[Metric]) -> None: + """ + Writes collected metrics to the file. + """ + data = [ + { + "MetricName": metric.name, + "MeasuredValue": metric.value, + "Units": metric.units, + "Timestamp": datetime.now(timezone.utc).isoformat(), + "AdditionalData": metric.additional_data, + } + for metric in metrics + ] + self.update(data=data, key="metrics") + + def store_parameters(self, parameters: Dict[str, Any]) -> None: + """ + Writes specified model and configuration parameters to the file. + """ + self.update(data=parameters, key="parameters") + + def update(self, **kwargs: Any) -> None: + """ + Write specified data to the output file. + """ + self.read_modify_write_file(**kwargs) + + +class Throughput: + def __init__(self, batch_size, world_size, grad_accum_usteps, moving_avg_window_size=10, logging_interval=1): + """ + Used to calculate the throughput over a moving window. It records the step time + between two calls and uses that time to calculate the throughput. 
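+        Each reported value is approximately
+            window_size * seqs_per_iteration / window_time,
+        where seqs_per_iteration = batch_size * world_size * grad_accum_usteps * logging_interval.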
+ """ + self.seqs_per_iteration = batch_size * world_size * grad_accum_usteps * logging_interval + self.moving_avg_window_size = math.ceil(moving_avg_window_size / logging_interval) + self.moving_avg_window = queue.Queue() + self.window_time = 0 + self.start_time = time.time() + + def get_throughput(self): + step_time = time.time() - self.start_time + self.start_time += step_time + self.window_time += step_time + self.moving_avg_window.put(step_time) + window_size = self.moving_avg_window.qsize() + if window_size > self.moving_avg_window_size: + self.window_time -= self.moving_avg_window.get() + window_size -= 1 + throughput = window_size * self.seqs_per_iteration / self.window_time + return throughput + + +def get_mixed_precision_config(use_gpu_compatible_precision): + return { + "use_master_weights": bool(use_gpu_compatible_precision), + "use_fp32_grad_acc": bool(use_gpu_compatible_precision), + "use_master_weights_in_ckpt": False, + } + diff --git a/website/docs/gen-ai/training/Neuron/BERT-Large.md b/website/docs/gen-ai/training/Neuron/BERT-Large.md index 315e4b9d3..35b973c02 100644 --- a/website/docs/gen-ai/training/Neuron/BERT-Large.md +++ b/website/docs/gen-ai/training/Neuron/BERT-Large.md @@ -1,6 +1,6 @@ --- title: BERT-Large on Trainium -sidebar_position: 3 +sidebar_position: 4 --- :::info diff --git a/website/docs/gen-ai/training/Neuron/Llama2.md b/website/docs/gen-ai/training/Neuron/Llama2.md index 11793bd0e..6510fd18f 100644 --- a/website/docs/gen-ai/training/Neuron/Llama2.md +++ b/website/docs/gen-ai/training/Neuron/Llama2.md @@ -1,6 +1,6 @@ --- title: Llama-2 with Nemo-Megatron on Trn1 -sidebar_position: 2 +sidebar_position: 3 description: Training a Llama-2 Model using Trainium, Neuronx-Nemo-Megatron and MPI operator --- import CollapsibleContent from '../../../../src/components/CollapsibleContent'; diff --git a/website/docs/gen-ai/training/Neuron/RayPTLNeuron-Llama3.1.md b/website/docs/gen-ai/training/Neuron/RayPTLNeuron-Llama3.1.md new file mode 100644 index 000000000..903fa8051 --- /dev/null +++ b/website/docs/gen-ai/training/Neuron/RayPTLNeuron-Llama3.1.md @@ -0,0 +1,362 @@ +--- +sidebar_position: 1 +sidebar_label: Llama-3 with Ray PTL on Trn1 +--- +import CollapsibleContent from '../../../../src/components/CollapsibleContent'; + +:::warning +Deployment of ML models on EKS requires access to GPUs or Neuron instances. If your deployment isn't working, it’s often due to missing access to these resources. Also, some deployment patterns rely on Karpenter autoscaling and static node groups; if nodes aren't initializing, check the logs for Karpenter or Node groups to resolve the issue. +::: + +:::danger + +Note: Use of this Llama-3.1 model is governed by the Meta license. +In order to download the model weights and tokenizer, please visit the [website](https://ai.meta.com/) and accept the license before requesting access. + +::: + +:::info + +We are actively enhancing this blueprint to incorporate improvements in observability, logging, and scalability aspects. + +::: + +## Pre-Training LLama3.1 on AWS Trainium using Ray and PyTorch Lightning + +## Overview + +This tutorial shows how to launch a distributed PyTorch Lightning neuronx-distributed training job on a Ray cluster with multiple Trn1 nodes within an Amazon Elastic Kubernetes Service (EKS) cluster. 
In this example, the [Llama3.1 8B](https://huggingface.co/NousResearch/Meta-Llama-3.1-8B) model will undergo distributed pre-training with Tensor parallelism, Data parallelism and Sequence parallelism using the ```sample-10BT``` FineWeb opensource dataset available on Hugging Face: [HuggingFaceFW/fineweb](https://huggingface.co/datasets/HuggingFaceFW/fineweb). + +In this tutorial, Ray will be used to launch the pre-training job on 16 trn1.32xlarge (or trn1n.32xlarge) instances, with 32 workers per instance. + +### What are Ray, PTL and Neuron? + +[PyTorch Lightning](https://lightning.ai/docs/pytorch/stable/starter/introduction.html) (PTL) developed by Lightning AI organization, is a library that provides a high-level interface for PyTorch, and helps you organize your code and reduce boilerplate. By abstracting away engineering code, it makes deep learning experiments easier to reproduce and improves developer productivity. + +[Ray](https://docs.ray.io/en/latest/ray-core/examples/overview.html) enhances ML workflows by seamlessly scaling fine-tuning and inference across distributed clusters, transforming single-node code into high-performance, multi-node operations with minimal effort. + +[AWS Neuron](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/) is an SDK with a compiler, runtime, and profiling tools that unlocks high-performance and cost-effective deep learning (DL) acceleration. It supports high-performance training on AWS Trainium instances. For model deployment, it supports high-performance and low-latency inference on AWS Inferentia. + +### Combining Ray + PTL + Neuron: +The integration of Ray, PyTorch Lightning (PTL), and AWS Neuron combines PTL's intuitive model development API, Ray Train's robust distributed computing capabilities for seamless scaling across multiple nodes, and AWS Neuron's hardware optimization for Trainium, significantly simplifying the setup and management of distributed training environments for large-scale AI projects, particularly those involving computationally intensive tasks like large language models. + +# Architecture for Pre-training Llama-3.1 on Trn1 using Ray, PTL and EKS + +![Architecture Diagram](../img/rayptlneuron-architecture.png) + +The tutorial covers all steps required to prepare the EKS environment and launch the training job: + + 1. [Sandbox setup](#prepjumphost) + 2. [Cluster and Tools](#clusterandtools) + 3. [Setup Ray cluster](#createcluster) + 4. [Training Job preparation & Launch](#trainingjobprep) + 5. [Monitoring Training ](#monitortraining) + 6. [Deleting the environment](#cleanup) + 7. [Contributors](#contributors) + + +## 1. Sandbox Setup + +This tutorial assumes that you will use an x86-based Linux jump host to launch and manage the EKS cluster, Ray and PyTorch Lightning training jobs. + +### 1.1 Launch a Linux jump host + +Begin by choosing an AWS region that supports both EKS and Trainium (ex: us-east-2). 
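+
+If you want to confirm up front that Trn1 capacity is offered in your chosen region, one optional check (run from any machine where the AWS CLI is already configured) is to list the Availability Zones that offer `trn1.32xlarge`, for example:
+
+```bash
+# Optional: list Availability Zones in the chosen region that offer trn1.32xlarge
+aws ec2 describe-instance-type-offerings \
+  --location-type availability-zone \
+  --filters Name=instance-type,Values=trn1.32xlarge \
+  --region us-east-2 \
+  --query 'InstanceTypeOfferings[].Location' \
+  --output text
+```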
+
+In your chosen region, use the AWS Console or AWS CLI to launch an instance with the following configuration:
+
+* **Instance Type:** t3.large
+* **AMI:** Amazon Linux 2 AMI (HVM)
+* **Key pair name:** (choose a key pair that you have access to)
+* **Auto-assign public IP:** Enabled
+* **Storage:** 100 GiB root volume
+
+### 1.2 Configure AWS credentials on the jump host
+
+#### Log into your jump host instance using one of the following techniques:
+
+* Connect to your instance via the AWS Console using [EC2 Instance Connect](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Connect-using-EC2-Instance-Connect.html)
+* SSH to your instance's public IP using the key pair you specified above.
+  * Ex: `ssh -i KEYPAIR.pem ec2-user@INSTANCE_PUBLIC_IP_ADDRESS`
+
+#### Configure the AWS CLI with your IAM user's credentials:
+
+Run `aws configure`, entering your IAM user's ACCESS_KEY_ID and SECRET_ACCESS_KEY. For _Default region name_, be sure to specify the same region used to launch your jump host, ex: `us-east-2`.
+
+```bash
+bash> aws configure
+AWS Access Key ID [None]: ACCESS_KEY_ID
+AWS Secret Access Key [None]: SECRET_ACCESS_KEY
+Default region name [None]: us-east-2
+Default output format [None]: json
+```
+
+## 2. Cluster and Tools
+
+Before we begin, ensure that the following tools are installed on your jump host to make the deployment process smooth and hassle-free:
+
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)
+* [kubectl](https://kubernetes.io/docs/tasks/tools/)
+* [terraform](https://learn.hashicorp.com/tutorials/terraform/install-cli)
+
+To install all of the prerequisites above on the jump host, you can run this [script](https://github.com/awslabs/data-on-eks/blob/main/ai-ml/trainium-inferentia/examples/llama2/install-pre-requsites-for-ec2.sh), which is compatible with Amazon Linux 2023.
+
+### 2.1 Clone the Data on EKS repository
+```
+cd ~
+git clone https://github.com/awslabs/data-on-eks.git
+```
+
+### 2.2 Navigate to the trainium-inferentia directory
+```
+cd data-on-eks/ai-ml/trainium-inferentia
+```
+
+Run the export commands below to set the environment variables used by the Terraform blueprint.
+
+```
+# Enable FSx for Lustre, which will mount pre-training data to all pods across multiple nodes
+export TF_VAR_enable_fsx_for_lustre=true
+
+# Set the region according to your requirements. Check Trn1 instance availability in the specified region.
+export TF_VAR_region=us-east-2
+
+# Enable Volcano custom scheduler with KubeRay Operator
+export TF_VAR_enable_volcano=true
+
+# Note: This configuration will create 16 new Trn1 32xl instances. Ensure you validate the associated costs before proceeding. You can change the number of instances here.
+export TF_VAR_trn1_32xl_min_size=16
+export TF_VAR_trn1_32xl_desired_size=16
+```
+
+Run the installation script to provision an EKS cluster with all the add-ons needed for the solution.
+
+```
+./install.sh
+```
+
+### 2.3 Verify the resources
+
+Verify the Amazon EKS cluster:
+```
+aws eks --region us-east-2 describe-cluster --name trainium-inferentia
+```
+
+```
+# Creates the kubeconfig entry used to authenticate with EKS
+aws eks --region us-east-2 update-kubeconfig --name trainium-inferentia
+
+kubectl get nodes # Output shows the EKS Managed Node group nodes
+```
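+
+Optionally, before moving on to the Ray cluster, you can also sanity-check the add-ons this blueprint depends on. The namespace, label, and namespace-scoping choices below reflect common defaults and this blueprint's Terraform settings, so treat the commands as a sketch and adjust them to your environment:
+
+```bash
+# Trn1 worker nodes registered by the managed node group (standard instance-type label)
+kubectl get nodes -l node.kubernetes.io/instance-type=trn1.32xlarge
+
+# Volcano scheduler components (Volcano installs into volcano-system by default)
+kubectl get pods -n volcano-system
+
+# FSx for Lustre PVC created by the blueprint; it should report a Bound status
+kubectl get pvc -n default
+```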
+## 3. Setup Ray Cluster
+
+### 3.1 Navigate to the directory
+```
+cd data-on-eks/gen-ai/training/ray-ptl-llama3.1-pretrain-trn1
+```
+
+### 3.2 Build and push the Docker image to ECR
+```
+sh build_docker.sh
+```
+When prompted, enter the AWS region to use for ECR. In this example, us-east-2 is used:
+
+`Enter the ECR region (ex: us-east-2): us-east-2`
+
+Ensure that the Docker image is successfully built and pushed to ECR.
+
+### 3.3 Create the Ray cluster
+
+Once the Docker image is pushed to ECR, edit the file ```1-pretrain-trn1-raycluster.yaml``` and replace the AWS Account ID placeholder in the container image URI with the ID of the AWS account you are using.
+Ensure that the resulting Docker image URL is accurate.
+
+Create the Ray cluster using the commands below:
+
+```
+kubectl apply -f 1-pretrain-trn1-raycluster.yaml
+kubectl get pods # Ensure all head and worker pods are in Running state
+```
+
+## 4. Training Job preparation
+
+### 4.1 Download pretraining dataset to the Lustre-hosted persistent volume
+
+Run the Ray job to download the FineWeb dataset containing 10B tokens. Downloading this dataset can take around 6 hours.
+
+```
+kubectl apply -f 2-download_fineweb_dataset.yaml
+```
+Monitor the status of the dataset download job using the following command, and confirm that it reaches the "Completed" status before proceeding further.
+
+```
+kubectl get pods
+```
+You can also monitor this job using the Ray Dashboard. Refer to the [Monitoring Training](#monitortraining) section for details on setting up the Ray Dashboard.
+
+![Download Fineweb Dataset](../img/rayptlneuron-downloaddataset.png)
+
+### 4.2 Precompile the Llama-3.1 8B graphs using neuron_parallel_compile
+
+PyTorch Neuronx comes with a tool called [neuron_parallel_compile](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/frameworks/torch/torch-neuronx/api-reference-guide/training/pytorch-neuron-parallel-compile.html) which reduces graph compilation time by extracting model graphs and then compiling the graphs in parallel. The compiled graphs are stored on the shared storage volume, where they can be accessed by the worker nodes during model training.
+
+To precompile the Llama-3.1 8B graphs, run the following command:
+
+```
+kubectl apply -f 3-parallel-compile-trn1-rayjob.yaml
+```
+
+*Note:* The `entrypoint: "NEURON_NUM_DEVICES=32 bash run_llama3.1_8b.sh -r 2 -n 16 -l 4e-4 -s 8192 -p 1"` in `3-parallel-compile-trn1-rayjob.yaml` passes several arguments to the `run_llama3.1_8b.sh` script, as shown below. Default values are assigned for all of these arguments within the script; update the arguments and values **only** if needed.
+
+```
+Syntax: ./run_llama3.1_8b.sh [-t|w|l|m|d|c|s|n]
+options:
+t total number of training steps
+w warmup steps for training
+l learning rate
+m abs path to llama config.json
+d abs path to tokenized dataset
+c abs path to checkpoint directory
+s Sequence length
+n Number of instances to run training
+b tensor board logs location
+r defining steps this run
+o max tokens
+g global batch size
+z mini batch size
+p neuron parallel compile 0 or 1
+```
+
+Run `kubectl get pods` and confirm that the `llama3.1-parallel-compile-job` pod is in the "Running" state. If the status shows "ContainerCreating", wait a few seconds until the status changes to "Running".
+
+Next, run the following command (substituting the name of the parallel-compile pod) to monitor the output of the precompilation job:
+
+```
+kubectl logs -f YOUR_POD_NAME
+```
+
+The precompilation job will run for ~15 minutes.
+Once complete, you will see the following in the output:
+
+```
+[36m(RayTrainWorker pid=xxxx, ip=aa.bb.cc.dd)[0m YYYY-MM-DD HH:MM:SS.000XXX: YYYYY INFO ||NEURON_PARALLEL_COMPILE||: Total graphs: X
+[36m(RayTrainWorker pid=xxxx, ip=aa.bb.cc.dd)[0m YYYY-MM-DD HH:MM:SS.000XXX: YYYYY INFO ||NEURON_PARALLEL_COMPILE||: Total successful compilations: X
+[36m(RayTrainWorker pid=xxxx, ip=aa.bb.cc.dd)[0m YYYY-MM-DD HH:MM:SS.000XXX: YYYYY INFO ||NEURON_PARALLEL_COMPILE||: Total failed compilations: 0
+```
+
+### 4.3 Launch Llama-3.1 8B pre-training job using 16 trn1.32xlarge instances
+
+To run the pre-training job for Llama-3.1 8B, run the following command:
+
+```
+kubectl apply -f 4-train-trn1-rayjob.yaml
+```
+
+*Note:* The `entrypoint: "NEURON_NUM_DEVICES=32 bash run_llama3.1_8b.sh -w 500 -n 16 -l 4e-4 -s 8192"` in `4-train-trn1-rayjob.yaml` passes several arguments to the `run_llama3.1_8b.sh` script, as shown below. Default values are assigned for all of these arguments within the script; update the arguments and values **only** if needed.
+
+```
+Syntax: ./run_llama3.1_8b.sh [-t|w|l|m|d|c|s|n]
+options:
+t total number of training steps
+w warmup steps for training
+l learning rate
+m abs path to llama config.json
+d abs path to tokenized dataset
+c abs path to checkpoint directory
+s Sequence length
+n Number of instances to run training
+b tensor board logs location
+r defining steps this run
+o max tokens
+g global batch size
+z mini batch size
+p neuron parallel compile 0 or 1
+```
+
+Run `kubectl get pods` and confirm that the `llama3.1-training-job` pod is in the "Running" state. If the status shows "ContainerCreating", wait a few seconds and re-run the command until the status changes to "Running".
+
+To continuously view the training script output (similar to the `tail -f` command in Linux), you can use the following command. The command can be terminated using CTRL-C.
+
+```
+kubectl logs -f YOUR_POD_NAME
+```
+
+## 5. Monitor Training
+
+### Monitoring using the Ray Dashboard (Optional)
+Run the following on your local machine to tunnel the Ray dashboard from the EKS cluster to your localhost. As a prerequisite, ensure that kubectl on your local machine is set up with the same AWS credentials and kubeconfig as the jump host, and that `kubectl get pods` works from your local machine.
+
+#### Accessing the Ray Dashboard
+The Ray dashboard provides valuable insights into your cluster's status and job progress. To access it:
+
+#### Port Forwarding
+
+This forwards the Ray dashboard port (8265) from the head pod's service in the cluster to your local machine.
+```
+kubectl port-forward service/kuberay-trn1-head-svc 8265:8265
+```
+Open a browser and navigate to http://localhost:8265 to view the dashboard.
+
+Once the dashboard has opened successfully, click ```Jobs``` to view details of the pre-training job, such as its duration. You can check the job's logs for detailed messages.
+![Ray Job Dashboard](../img/rayptlneuron-dashboard.png)
+
+The job shown above took approximately 13 hours to complete using 16 trn1.32xlarge nodes.
+
+### Monitor Neuron device utilization using neuron-top
+
+The Neuron SDK provides [Neuron tools](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/tools/index.html#neuron-tools) for monitoring Neuron devices on Inf2 and Trn1 instances. During a training job, it is often useful to monitor Neuron device utilization using `neuron-top`, which provides a text-based view of device and memory utilization.
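+
+As an aside, the Neuron tools package also includes `neuron-ls`, which is handy for a quick one-shot check that all 16 Trainium devices on a trn1.32xlarge worker are visible from inside the container. This sketch assumes the tools are present in the training image (they typically ship alongside `neuron-top`); `YOUR_POD_NAME` is a placeholder for one of the running training pods identified in the next step:
+
+```bash
+# One-shot listing of the Neuron devices visible inside a worker pod
+kubectl exec -it YOUR_POD_NAME -- neuron-ls
+```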
+
+To view `neuron-top` statistics for one of your nodes, begin by choosing one of your running Llama training pods:
+
+```
+kubectl get pods | grep Running | grep llama
+```
+
+Substitute the name of one of your running pods into the following command to launch a bash prompt within the running pod:
+
+```
+kubectl exec -it YOUR_POD_NAME -- /bin/bash
+```
+
+At the bash prompt, run `neuron-top`:
+
+```
+neuron-top
+```
+
+It should look something like this:
+
+![Neuron-top](../img/rayptlneuron-neurontop.png)
+
+When you are finished exploring `neuron-top`, press `q` to quit. At the pod's bash prompt, press `CTRL-D` to return to your jump host.
+
+## 6. Analysis of Results
+
+You can find the results below for the pre-training job, which was run for 1 epoch over the FineWeb dataset (10B tokens) in the us-east-2 region.
+You can use TensorBoard to view the training visualizations.
+
+#### Results from the 16-node run
+* Training duration: 13 hours, 36 minutes, and 10 seconds
+* Throughput: ~24.75 sequences / second (1 sequence = 1024 tokens)
+
+![16-node-throughput](../img/rayptlneuron-16nodethroughput.png)
+
+#### Results from the 8-node run
+* Training duration: 26 hours, 46 minutes, and 48 seconds
+* Throughput: ~12.56 sequences / second (1 sequence = 1024 tokens)
+
+![8-node-throughput](../img/rayptlneuron-8nodethroughput.png)
+
+## 7. Clean-up
+
+When you are finished with the tutorial, run the following commands on the jump host to remove the EKS cluster and associated resources:
+
+```
+# Delete all the pods and the Ray cluster
+kubectl delete -f 4-train-trn1-rayjob.yaml
+kubectl delete -f 3-parallel-compile-trn1-rayjob.yaml
+kubectl delete -f 1-pretrain-trn1-raycluster.yaml
+
+# Clean up the infrastructure
+cd ~/data-on-eks/ai-ml/trainium-inferentia
+./cleanup.sh
+
+# Lastly, terminate your jump host instance via the AWS Console.
+``` \ No newline at end of file diff --git a/website/docs/gen-ai/training/Neuron/RayTrain-Llama2.md b/website/docs/gen-ai/training/Neuron/RayTrain-Llama2.md index 16d318258..3e2a278a3 100644 --- a/website/docs/gen-ai/training/Neuron/RayTrain-Llama2.md +++ b/website/docs/gen-ai/training/Neuron/RayTrain-Llama2.md @@ -1,5 +1,5 @@ --- -sidebar_position: 1 +sidebar_position: 2 sidebar_label: Llama-2 with RayTrain on Trn1 --- import CollapsibleContent from '../../../../src/components/CollapsibleContent'; diff --git a/website/docs/gen-ai/training/img/rayptlneuron-16nodethroughput.png b/website/docs/gen-ai/training/img/rayptlneuron-16nodethroughput.png new file mode 100644 index 000000000..566c3664d Binary files /dev/null and b/website/docs/gen-ai/training/img/rayptlneuron-16nodethroughput.png differ diff --git a/website/docs/gen-ai/training/img/rayptlneuron-8nodethroughput.png b/website/docs/gen-ai/training/img/rayptlneuron-8nodethroughput.png new file mode 100644 index 000000000..36aa8a2f7 Binary files /dev/null and b/website/docs/gen-ai/training/img/rayptlneuron-8nodethroughput.png differ diff --git a/website/docs/gen-ai/training/img/rayptlneuron-architecture.png b/website/docs/gen-ai/training/img/rayptlneuron-architecture.png new file mode 100644 index 000000000..23692fc9c Binary files /dev/null and b/website/docs/gen-ai/training/img/rayptlneuron-architecture.png differ diff --git a/website/docs/gen-ai/training/img/rayptlneuron-dashboard.png b/website/docs/gen-ai/training/img/rayptlneuron-dashboard.png new file mode 100644 index 000000000..172086e0f Binary files /dev/null and b/website/docs/gen-ai/training/img/rayptlneuron-dashboard.png differ diff --git a/website/docs/gen-ai/training/img/rayptlneuron-downloaddataset.png b/website/docs/gen-ai/training/img/rayptlneuron-downloaddataset.png new file mode 100644 index 000000000..b6c0af503 Binary files /dev/null and b/website/docs/gen-ai/training/img/rayptlneuron-downloaddataset.png differ diff --git a/website/docs/gen-ai/training/img/rayptlneuron-neurontop.png b/website/docs/gen-ai/training/img/rayptlneuron-neurontop.png new file mode 100644 index 000000000..8c6de7493 Binary files /dev/null and b/website/docs/gen-ai/training/img/rayptlneuron-neurontop.png differ