diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..356a4ac8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build/ +pyproject/ +param/ +param.egg-info/ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e3c651e3..8562f5ab 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,4 +1,4 @@ -# Contributing to PARAM_Bench +# Contributing to PARAM We want to make contributing to this project as easy and transparent as possible. diff --git a/README.md b/README.md index 6ffefb10..a8ba76c5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +>>> ./README.md # PARAM PARAM Benchmarks is a repository of communication and compute micro-benchmarks as well as full workloads for evaluating training and inference platforms. @@ -33,3 +34,607 @@ PARAM benchmarks is released under the MIT license. Please see the [`LICENSE`](L ## Contributing We actively welcome your pull requests! Please see [`CONTRIBUTING.md`](CONTRIBUTING.md) and [`CODE_OF_CONDUCT.md`](CODE_OF_CONDUCT.md) for more info. + +>>> ./comm/README.md +# PARAM benchmark - Communication benchmarks + +PARAM-Comms is an effort to develop a unified benchmarking framework to +characterize training platform backends. Currently, the benchmark supports +Pytorch Distributed and PyTorch-XLA backends. + +The PARAM-Comms benchmark offers a single point solution to perform both top-down +(DLRM application) and bottoms-up (collectives) operations for any given +communication backend. + +The Collective-Comms benchmark (`comms.py`) is designed similar to nccl-tests +for evaluating collective operations, such as All-reduce and All-to-all, through PyTorch backends. +The DLRM-Comms benchmark (`dlrm.py`) is similar to the open-source DLRM benchmark except it +only implements communication primitives. +The Trace Replay benchmark (`commsTraceReplay.py`) is designed to replay the communication patterns captured +from any distributed PyTorch workloads. + +## Usage: + +### Collective-Comms benchmark (`comms.py`) +```bash +mpirun -np -N --hostfile ./comms.py \ + --master-ip 127.0.0.1 + --b \ + --e \ + --n \ + --f \ + --z \ + --collective +``` +Example: +```bash +mpirun -np 16 -N 8 --hostfile ./hfile ./comms.py --master-ip $(head -n 1 ./hfile.txt) --b 8 --e 256M --n 100 \ + --f 2 --z 1 --collective all_to_all --backend nccl --device cuda --log INFO +``` + +### DLRM-Comms benchmark (`dlrm.py`) +```bash +mpirun -np -N --hostfile ./dlrm.py \ + --master-ip + --arch-sparse-feature-size \ + --arch-embedding-size \ + --arch-mlp-bot \ + --arch-mlp-top \ + --mini-batch-size \ + --num-batches +``` +Example: +```bash +mpirun -np 16 -N 8 --hostfile ./hfile ./dlrm.py --master-ip $(head -n 1 ./hfile.txt) --mini-batch-size 32 \ + --num-batches 100 \ + --arch-mlp-bot 1024-256 \ + --arch-sparse-feature-size 64 \ + --arch-embedding-size "10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000" +``` + +### Trace Replay benchmark (`commsTraceReplay.py`) +```bash +mpirun -np -N --hostfile ./commsTraceReplay.py \ + --master-ip 127.0.0.1 --trace-path /path/to/traces --dry-run +``` +Example: +```bash +mpirun -np 16 -N 8 --hostfile ./hfile ./commsTraceReplay.py --master-ip $(head -n 1 ./hfile.txt) \ + --backend nccl --device cuda \ + --trace-path /path/to/commTraces +``` +Note that there should be one trace file (in JSON format) per rank. + +>>> ./README.md +# PARAM + +PARAM Benchmarks is a repository of communication and compute micro-benchmarks as well as full workloads for evaluating training and inference platforms. + +PARAM complements two broad categories of commonly used benchmarks: +1. C++ based stand-alone compute and communication benchmarks using cuDNN, MKL, NCCL, MPI libraries - e.g., NCCL tests (https://github.com/NVIDIA/nccl-tests), OSU MPI benchmarks (https://mvapich.cse.ohio-state.edu/benchmarks/), and DeepBench (https://github.com/baidu-research/DeepBench). +2. Application benchmarks such as Deep Learning Recommendation Model (DLRM) and the broader MLPerf benchmarks. Its worth noting that while MLPerf is the de-facto industry standard for benchmarking ML applications we hope to compliment this effort with broader workloads that are of more interest to Facebook with more in-depth analysis of each within this branch of Application benchmarks. + +Our initial release of PARAM benchmarks focuses on AI training and comprises of: +1. Communication: PyTorch based collective benchmarks across arbitrary message sizes, effectiveness of compute-communication overlap, and DLRM communication patterns in fwd/bwd pass +2. Compute: PyTorch based GEMM, embedding lookup, and linear layer +3. DLRM: tracks the `ext_dist` branch of DRLM benchmark use Facebook's DLRM benchmark (https://github.com/facebookresearch/dlrm). In short, PARAM fully relies on DLRM benchmark for end-to-end workload evaluation; with additional extensions as required for scale-out AI training platforms. +4. PyTorch Execution Trace (ET) replay based tests: The PyTorch ET capturing capabilities, which have recently been introduced, allow for the recording of runtime information of a model at the operator level. This capability enables the creation of replay-based benchmarks (https://dl.acm.org/doi/abs/10.1145/3579371.3589072) to accurately reproduce the original performance. + + +In essence, PARAM bridges the gap between stand-alone C++ benchmarks and PyTorch/Tensorflow based application benchmarks. This enables us to gain deep insights into the inner workings of the system architecture as well as identify framework-level overheads by stressing all subcomponents of a system. + +## Version + +0.1 : Initial release + +## Requirements + +- pytorch +- future +- numpy +- apex + +## License + +PARAM benchmarks is released under the MIT license. Please see the [`LICENSE`](LICENSE) file for more information. + +## Contributing + +We actively welcome your pull requests! Please see [`CONTRIBUTING.md`](CONTRIBUTING.md) and [`CODE_OF_CONDUCT.md`](CODE_OF_CONDUCT.md) for more info. + +>>> ./comm/README.md +# PARAM benchmark - Communication benchmarks + +PARAM-Comms is an effort to develop a unified benchmarking framework to +characterize training platform backends. Currently, the benchmark supports +Pytorch Distributed and PyTorch-XLA backends. + +The PARAM-Comms benchmark offers a single point solution to perform both top-down +(DLRM application) and bottoms-up (collectives) operations for any given +communication backend. + +The Collective-Comms benchmark (`comms.py`) is designed similar to nccl-tests +for evaluating collective operations, such as All-reduce and All-to-all, through PyTorch backends. +The DLRM-Comms benchmark (`dlrm.py`) is similar to the open-source DLRM benchmark except it +only implements communication primitives. +The Trace Replay benchmark (`commsTraceReplay.py`) is designed to replay the communication patterns captured +from any distributed PyTorch workloads. + +## Usage: + +### Collective-Comms benchmark (`comms.py`) +```bash +mpirun -np -N --hostfile ./comms.py \ + --master-ip 127.0.0.1 + --b \ + --e \ + --n \ + --f \ + --z \ + --collective +``` +Example: +```bash +mpirun -np 16 -N 8 --hostfile ./hfile ./comms.py --master-ip $(head -n 1 ./hfile.txt) --b 8 --e 256M --n 100 \ + --f 2 --z 1 --collective all_to_all --backend nccl --device cuda --log INFO +``` + +### DLRM-Comms benchmark (`dlrm.py`) +```bash +mpirun -np -N --hostfile ./dlrm.py \ + --master-ip + --arch-sparse-feature-size \ + --arch-embedding-size \ + --arch-mlp-bot \ + --arch-mlp-top \ + --mini-batch-size \ + --num-batches +``` +Example: +```bash +mpirun -np 16 -N 8 --hostfile ./hfile ./dlrm.py --master-ip $(head -n 1 ./hfile.txt) --mini-batch-size 32 \ + --num-batches 100 \ + --arch-mlp-bot 1024-256 \ + --arch-sparse-feature-size 64 \ + --arch-embedding-size "10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000" +``` + +### Trace Replay benchmark (`commsTraceReplay.py`) +```bash +mpirun -np -N --hostfile ./commsTraceReplay.py \ + --master-ip 127.0.0.1 --trace-path /path/to/traces --dry-run +``` +Example: +```bash +mpirun -np 16 -N 8 --hostfile ./hfile ./commsTraceReplay.py --master-ip $(head -n 1 ./hfile.txt) \ + --backend nccl --device cuda \ + --trace-path /path/to/commTraces +``` +Note that there should be one trace file (in JSON format) per rank. + +>>> ./tools/README.md +# Execution Trace Replay (et_replay) +`et_replay` is a tool designed for replaying Chakra Execution Traces (ET) from machine learning models. + +## Installation +To install `param`, use the following commands: + +```bash +$ git clone --recurse-submodules git@github.com:facebookresearch/param.git +$ conda create -n param python=3.8.0 +$ conda activate param +$ cd param +$ pip3 install -r requirements.txt +$ cd train/comms/pt/ +$ pip3 install . +$ cd - +$ cd train/compute/python/ +$ pip3 install -r requirements.txt +$ pip3 install . +$ cd - +``` + +## Running et_replay +To use et_replay, execution traces are required. +Start by collecting an execution trace using the command below. This command runs a benchmark with specific configurations and enables execution tracing. +```bash +$ python -m param.train.compute.python.pytorch.run_benchmark -c train/compute/python/examples/pytorch/configs/simple_add.json --et +``` + +After collecting the trace, replay it with the following command. Set the warm-up iteration count to at least 1 to exclude tensor transfer time to GPUs. +```bash +$ python -m param.train.compute.python.tools.et_replay --input --warmup-iter 10 --iter 50 --compute --profile-replay +``` + +> Note: When analyzing performance values from et_replay, refer to the collected Kineto traces rather than the execution time reported by et_replay. Kineto traces are only collected when --profile-replay is provided. + +>>> comp/README.md +# PARAM Training Compute Benchmark + +## Overview +The training compute benchmarks have gone under development to support new use cases. The previous [standalone](../pt) benchmarks remain unchanged. + +The general motivation and design philosophy for the new microbenchmarks are: +* Support a wide variety of PyTorch operators and workloads. +* Decoupled workload configuration from Python code. It allows building tools in a pipeline to collect operator configuration from production runs, generate microbenchmark inputs, gather metrics. +* Generates human readable and easy to parse output data format for downstream tools. +* A library interface allowing external tools to run and collect data results from the microbenchmarks. +* Support replay of workload through PyTorch execution trace. + +For design and implementation details or make a contribution to the project, please look at the [development documentation](development.md). + +## Installation +We use `setuptools` to install/uninstall the `parambench-train-compute` package: + +```shell +# Inside dir "param/train/compute/pytnon" + +# Install required dependencies +> pip install -r requirements.txt + +# Install PARAM Compute package +> pip install . + +# Uninstall package +> python -m pip uninstall parambench-train-compute +``` + +The installed packages are under **`param.train.compute.python`**. + +To use the [`FBGEMM_GPU`](https://github.com/pytorch/FBGEMM/tree/main/fbgemm_gpu) library and its operator benchmark workload ([`split_table_batched_embeddings_ops.py`](workloads/pytorch/split_table_batched_embeddings_ops.py)), please follow its set up instruction to download and install. It's not required for the compute benchmarks. During initialization, if an operator fail to import, it'll be ignored and will not affect other benchmarks. + +Please make sure to install the `parambench-train-comms` package (`train/comms/pt`). This is important because some functions in this package reference those in the comms package. + +## Usage +The bundled tool scripts such as [`run_benchmark.py`](pytorch/run_benchmark.py) are written using relative import paths as part of the `parambench-train-compute` package, so they must be ran as a module using the `python -m` option. + +A reliable way to run the benchmarks is install `parambench-train-compute` as a package following the above instructions. Afterward, it can be ran as: +```shell +# Run benchmark tool script module +> python -m param.train.compute.python.pytorch.run_benchmark -c examples/pytorch/configs/simple_add.json +``` + +Without installing the package, you can run a tool script as a module in the source directory: +```shell +# Inside dir "param/train/compute" +> python -m python.pytorch.run_benchmark -c python/examples/pytorch/configs/simple_add.json +``` +However, this method may conflict with other packages (such as `fbgemm_gpu.split_table_batched_embeddings_ops`) that have its own modules under a `python` package. + +Additional example configs can be found in [`examples/pytorch/configs/`](examples/pytorch/configs/). + +### Benchmark Library +As a library, it can be used as any regular Python package: +```python +from param.train.compute.python.lib.config import BenchmarkConfig +``` +A complete example to generate benchmark config, run the benchmark, then get the results can be found in [`run_op.py`](examples/pytorch/run_op.py) + +## PyTorch Benchmark Options +``` +=> python -m param.train.compute.python.pytorch.run_benchmark -h +usage: run_benchmark.py [-h] [-c CONFIG] [-w WARMUP] [-i ITERATION] [-b] [-d DEVICE] [-o OUTPUT_PREFIX] [-r RESUME_ID] [-s STOP_ID] [-a] [--cuda-l2-cache [{on,off}]] [--ncu] [--ncu-bin NCU_BIN] [--ncu-args-file NCU_ARGS_FILE] [--ncu-warmup NCU_WARMUP] + [--ncu-iteration NCU_ITERATION] [--nsys] [--nsys-bin NSYS_BIN] [--nsys-args-file NSYS_ARGS_FILE] [--nsys-warmup NSYS_WARMUP] [--nsys-iteration NSYS_ITERATION] [--run-batch-size RUN_BATCH_SIZE] [--batch-cuda-device BATCH_CUDA_DEVICE] + [--batch-cmd BATCH_CMD] [--exec-mode [{discrete,continuous,continuous_events}]] [-p] [-l LOG_LEVEL] [--version] + +PyTorch Microbenchmarks + +optional arguments: + -h, --help show this help message and exit + -c CONFIG, --config CONFIG + The benchmark config file. + -w WARMUP, --warmup WARMUP + Number of warm up iterations. + -i ITERATION, --iteration ITERATION + Number of benchmark iterations. + -b, --backward Include backward pass. + -d DEVICE, --device DEVICE + Target device for benchmark. + -o OUTPUT_PREFIX, --output-prefix OUTPUT_PREFIX + File name prefix to write benchmark results. + -r RESUME_ID, --resume-id RESUME_ID + Define a resume op_run_id to continue benchmark, skip all previous configs. + -s STOP_ID, --stop_id STOP_ID + Define a stop op_run_id (exclusive) to stop benchmark, skip remaining configs. + -a, --append Append to output file, rather than overwrite. + --cuda-l2-cache [{on,off}] + Set option for CUDA GPU L2 cache between iterations in discrete mode. + --ncu Run NSight Compute to collect metrics. + --ncu-bin NCU_BIN Path to the NSight Compute (ncu) binary. + --ncu-args-file NCU_ARGS_FILE + NSight Compute extra command line options (metrics etc.). + --ncu-warmup NCU_WARMUP + NSight Systems number of warmup runs. + --ncu-iteration NCU_ITERATION + NSight Systems number of measured iteration runs. + --nsys Run NSight Systems to collect metrics. + --nsys-bin NSYS_BIN Path to the NSight Systems (nsys) binary. + --nsys-args-file NSYS_ARGS_FILE + NSight Systems extra command line options (metrics etc.). + --nsys-warmup NSYS_WARMUP + NSight Systems number of warmup runs. + --nsys-iteration NSYS_ITERATION + NSight Systems number of measured iteration runs. + --run-batch-size RUN_BATCH_SIZE + Batch run input size (number of input configs to run in one launch), used by both NCU and NSYS. + --batch-cuda-device BATCH_CUDA_DEVICE + CUDA GPU device ID to run batch job. + --batch-cmd BATCH_CMD + Run batch job command. + --exec-mode [{discrete,continuous,continuous_events}] + Set execution mode of the operators (discrete, continuous, continuous_events). Default=discrete + -p, --profile Enable profiler and tracing. + -l LOG_LEVEL, --log-level LOG_LEVEL + Log output verbosity. + --version Print version. +``` + +## Benchmark Configuration File +Benchmark configurations are defined in a JSON format. It can be stored in a file on disk, or being passed between external callers and the benchmark’s library interface. There are two types of configurations: +* Build configuration (optional) + * Defines arguments used to construct and initialize the operator. + * It’s optional for operators that do not require initialization. +* Input configuration + * Defines arguments used to execute the operator. + +An operator may or may not need to have a build configuration. For example, `torch.matmul` can be called directly with input arguments, so there's no need to specify a build configuration. Other operators require creating the operator first before running it: + +```python +embedding = torch.nn.EmbeddingBag( + num_embeddings=embeddingbags, + embedding_dim=dim, + mode=mode, + include_last_offset=include_last_offset, + sparse=sparse).to(device=device) +embedding(input, offset) +``` +The library expects the benchmark configuration in the following JSON format (the notes in `<>` are comments): +```json +{ + "operator_name": { + "build_iterator": "(optional) build iterator name", + "input_iterator": "(optional) input iterator name", + "build_data_generator": "(optional) data generator name", + "input_data_generator": "(required) data generator name", + "config": [ + + { + "build": [ + <(optional) a list of build spec> + ] + "input": [ + <(required) a list of input spec> + ] + } + ] + } + +} +``` +The **`"operator_name"`** is a key mapped to a concrete workload implementation defined inside [`workloads`](workloads) directory, for example [`workloads/pytorch/native_basic_ops.py`](workloads/pytorch/native_basic_ops.py). + +## Default PyTorch Config Specification +For each **`"build"`** and **`"input"`** configuration, the __`"args"`__ and __`"kwargs"`__ JSON keys should be specified with a list of argument data specifications (see [PyTorch Data Types](#pyTorch-data-types)). This is synonymous to Python's __`*args`__ and __`**kwargs`__. As expected, **`"args"`** is positional and defined as a list. __`"kwargs"`__ is defined as a dictionary of `"kwarg_name": `. + +**Example** +```json +{ + "torch.baddbmm": { + "input_data_generator": "PyTorch:DefaultDataGenerator", + "config": [ + { + "input": [ + { + "args": [ + { + "dtype": "float", + "shape": [2, 1, 512], + "type": "tensor" + }, + { + "dtype": "float", + "shape": [2, 512, 512], + "type": "tensor" + }, + { + "dtype": "float", + "shape": [2, 512, 512], + "type": "tensor" + } + ], + "kwargs": { + "beta": { + "type": "int", + "value": 1 + }, + "alpha": { + "type": "int", + "value": 1 + } + } + } + ] + } + ] + } +} +``` + +### PyTorch Data Types Specification +Current supported data types and examples are listed here: +```json +{ + "type": "int", + "value": 237 +}, +{ + "type": "int", + "value_range": [100, 1000] +}, +{ + "type": "long", + "value": 8328 +}, +{ + "type": "long", + "value_range": [1000, 10000] +}, +{ + "type": "float", + "value": 1.2 +}, +{ + "type": "float", + "value_range": [0.0, 2.0] +}, +{ + "type": "double", + "value": 3.4 +}, +{ + "type": "double", + "value_range": [0.5, 5.5] +}, +{ + "type": "bool", + "value": false +}, +{ + "type": "device", + "value": "cpu" +}, +{ + "type": "str", + "value": "a string value" +}, +{ + "type": "genericlist", + "value": [ + { + "type": "int", + "value": 237, + }, + { + "type": "tensor", + "dtype": "float", + "shape": [16, 32], + }] +}, +{ + "type": "tuple", + "value": [ + { + "type": "tensor", + "dtype": "float", + "shape": [16, 32], + }, + { + "type": "tensor", + "dtype": "float", + "shape": [16, 32], + }] +}, +{ + "type": "tensor", + "dtype": "float", + "shape": [128, 256] +} +``` +**Notes** +* `"value_range"` specifies a random value in the `[min, max]` range to be generated for the argument. + +To help construct benchmark configs, some utility functions are available in [examples/pytorch/run_op.py](examples/pytorch/run_op.py). + +### Configuration Customization +Users are able to implement custom specs for **`"build"`** and **`"input"`** to support a wide variety of operators. Should there's a need, the benchmark specification allows implementing new [`ConfigIterator`](lib/config.py) and [`DataGenerator`](lib/data.py) for your specific use case. + +## Development Contributions +For more details, please take a look at the [development document](development.md). + +>>> docs/using_ET.md +# Using Execution Trace in PARAM Benchmark + +This section includes how to collect Chakra Execution Trace from a PyTorch training workload, as well as how to run PARAM replay on top of the collected ET. + + +## Execution Trace Collection +Execution Trace collection logic has to be added in the main training loop. This includes three steps: + +### Step 1: Set up Execution Trace Observer +The first step is to create a Execution Trace Observer object and register a. temporary file for ET store. + +``` +from torch.profiler import ExecutionTraceObserver + +et_ob = ExecutionTraceObserver() +fp = tempfile.NamedTemporaryFile("w+t", suffix=".et.json", delete=False) +fp.close() +et_ob.register_callback(fp.name) +``` + +### Step 2: Define your function to dump Execution Trace +You have to define a function to store/dump/upload your collected ET trace for further use. Here is an example: + +``` +def dump_execution_trace(tmp_et_path): + et_dir.mkdir(exist_ok=True, parents=True) + et_path = DUMP_DIR / f"rank-{global_rank}.et.json.gz" + with open(tmp_et_path) as fin: + with gzip.open(et_path, "wt") as fout: + fout.writelines(fin) + os.remove(tmp_et_path) + print(f"Finished Rank {global_rank} ET collection at {et_path}") +``` + +### Step 3: Collect Execution Trace in the training loop +This is the key step to collect ET. You have to insert the collection logic into the main training loop of your workload. +TWO parameters have to be set: +- ET_START_ITER: the iteration to start ET collection +- ET_END_ITER: the iteration to stop ET collection + +``` + +while step < TRAINING_STEPS: + ... + ... + # Collect Execution Trace Logic + + # Start ET collection + if et_ob and step == ET_START_ITER: + et_ob.start() + + # First record process group(PG) mapping + pg_config_info = ( + torch.distributed.distributed_c10d._world.pg_config_info + ) + rf_handle = torch.autograd._record_function_with_args_enter( + "## process_group:init ##", json.dumps(pg_config_info) + ) + torch.autograd._record_function_with_args_exit(rf_handle) + + # Stop ET collection + elif et_ob and state.step == ET_END_ITER: + et_ob.stop() + tmp_et_path = et_ob.get_output_file_path() + et_ob.unregister_callback() + dump_execution_trace(tmp_et_path) + + ... + ... + step += 1 + +``` + +Note that process group information collection is not automatically covered by ET observer, because process_group initialization happens before the main training loop. Therefore, you have to manually add pg information collection, as the code shown above. + + + + +## PARAM Comms Replay on Execution Trace +Execution Trace now is fully supported in PARAM benchmark. In order to replay an ET trace, just need to specify `--trace-type=et` and the benchmark will parse your ET and replay the collective communication operators. + +An example command: + +``` +/bin/mpirun -np 8 commsTraceReplay.par --trace-path --trace-type et +``` diff --git a/train/compute/python/development.md b/development.md similarity index 100% rename from train/compute/python/development.md rename to development.md diff --git a/docs/using_ET.md b/docs/using_ET.md deleted file mode 100644 index 7adaa01e..00000000 --- a/docs/using_ET.md +++ /dev/null @@ -1,86 +0,0 @@ -# Using Execution Trace in PARAM Benchmark - -This section includes how to collect Chakra Execution Trace from a PyTorch training workload, as well as how to run PARAM replay on top of the collected ET. - - -## Execution Trace Collection -Execution Trace collection logic has to be added in the main training loop. This includes three steps: - -### Step 1: Set up Execution Trace Observer -The first step is to create a Execution Trace Observer object and register a. temporary file for ET store. - -``` -from torch.profiler import ExecutionTraceObserver - -et_ob = ExecutionTraceObserver() -fp = tempfile.NamedTemporaryFile("w+t", suffix=".et.json", delete=False) -fp.close() -et_ob.register_callback(fp.name) -``` - -### Step 2: Define your function to dump Execution Trace -You have to define a function to store/dump/upload your collected ET trace for further use. Here is an example: - -``` -def dump_execution_trace(tmp_et_path): - et_dir.mkdir(exist_ok=True, parents=True) - et_path = DUMP_DIR / f"rank-{global_rank}.et.json.gz" - with open(tmp_et_path) as fin: - with gzip.open(et_path, "wt") as fout: - fout.writelines(fin) - os.remove(tmp_et_path) - print(f"Finished Rank {global_rank} ET collection at {et_path}") -``` - -### Step 3: Collect Execution Trace in the training loop -This is the key step to collect ET. You have to insert the collection logic into the main training loop of your workload. -TWO parameters have to be set: -- ET_START_ITER: the iteration to start ET collection -- ET_END_ITER: the iteration to stop ET collection - -``` - -while step < TRAINING_STEPS: - ... - ... - # Collect Execution Trace Logic - - # Start ET collection - if et_ob and step == ET_START_ITER: - et_ob.start() - - # First record process group(PG) mapping - pg_config_info = ( - torch.distributed.distributed_c10d._world.pg_config_info - ) - rf_handle = torch.autograd._record_function_with_args_enter( - "## process_group:init ##", json.dumps(pg_config_info) - ) - torch.autograd._record_function_with_args_exit(rf_handle) - - # Stop ET collection - elif et_ob and state.step == ET_END_ITER: - et_ob.stop() - tmp_et_path = et_ob.get_output_file_path() - et_ob.unregister_callback() - dump_execution_trace(tmp_et_path) - - ... - ... - step += 1 - -``` - -Note that process group information collection is not automatically covered by ET observer, because process_group initialization happens before the main training loop. Therefore, you have to manually add pg information collection, as the code shown above. - - - - -## PARAM Comms Replay on Execution Trace -Execution Trace now is fully supported in PARAM benchmark. In order to replay an ET trace, just need to specify `--trace-type=et` and the benchmark will parse your ET and replay the collective communication operators. - -An example command: - -``` -/bin/mpirun -np 8 commsTraceReplay.par --trace-path --trace-type et -``` diff --git a/train/compute/python/examples/cuda/ncu_args.txt b/example_input_files/cuda/ncu_args.txt similarity index 100% rename from train/compute/python/examples/cuda/ncu_args.txt rename to example_input_files/cuda/ncu_args.txt diff --git a/train/compute/python/test/data/1.0.3-chakra.0.0.4/resnet_1gpu_et.json.gz b/example_input_files/data/1.0.3-chakra.0.0.4/resnet_1gpu_et.json.gz similarity index 100% rename from train/compute/python/test/data/1.0.3-chakra.0.0.4/resnet_1gpu_et.json.gz rename to example_input_files/data/1.0.3-chakra.0.0.4/resnet_1gpu_et.json.gz diff --git a/train/compute/python/__init__.py b/example_input_files/data/__init__.py similarity index 100% rename from train/compute/python/__init__.py rename to example_input_files/data/__init__.py diff --git a/train/compute/python/test/data/dlrm_kineto.tar.gz b/example_input_files/data/dlrm_kineto.tar.gz similarity index 100% rename from train/compute/python/test/data/dlrm_kineto.tar.gz rename to example_input_files/data/dlrm_kineto.tar.gz diff --git a/train/compute/python/test/data/dlrm_pytorch_et.tar.gz b/example_input_files/data/dlrm_pytorch_et.tar.gz similarity index 100% rename from train/compute/python/test/data/dlrm_pytorch_et.tar.gz rename to example_input_files/data/dlrm_pytorch_et.tar.gz diff --git a/train/compute/python/test/data/linear_et.json.gz b/example_input_files/data/linear_et.json.gz similarity index 100% rename from train/compute/python/test/data/linear_et.json.gz rename to example_input_files/data/linear_et.json.gz diff --git a/train/compute/python/test/data/linear_kineto.json.gz b/example_input_files/data/linear_kineto.json.gz similarity index 100% rename from train/compute/python/test/data/linear_kineto.json.gz rename to example_input_files/data/linear_kineto.json.gz diff --git a/train/compute/python/test/data/resnet_et.json.gz b/example_input_files/data/resnet_et.json.gz similarity index 100% rename from train/compute/python/test/data/resnet_et.json.gz rename to example_input_files/data/resnet_et.json.gz diff --git a/train/compute/python/test/data/resnet_kineto.json.gz b/example_input_files/data/resnet_kineto.json.gz similarity index 100% rename from train/compute/python/test/data/resnet_kineto.json.gz rename to example_input_files/data/resnet_kineto.json.gz diff --git a/train/compute/python/examples/pytorch/configs/alex_net.json b/example_input_files/pytorch/configs/alex_net.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/alex_net.json rename to example_input_files/pytorch/configs/alex_net.json diff --git a/train/compute/python/examples/pytorch/configs/aten_ops.json b/example_input_files/pytorch/configs/aten_ops.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/aten_ops.json rename to example_input_files/pytorch/configs/aten_ops.json diff --git a/train/compute/python/examples/pytorch/configs/batch_example.json b/example_input_files/pytorch/configs/batch_example.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/batch_example.json rename to example_input_files/pytorch/configs/batch_example.json diff --git a/train/compute/python/examples/pytorch/configs/llama2.json b/example_input_files/pytorch/configs/llama2.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/llama2.json rename to example_input_files/pytorch/configs/llama2.json diff --git a/train/compute/python/examples/pytorch/configs/mm.json b/example_input_files/pytorch/configs/mm.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/mm.json rename to example_input_files/pytorch/configs/mm.json diff --git a/train/compute/python/examples/pytorch/configs/mm_range.json b/example_input_files/pytorch/configs/mm_range.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/mm_range.json rename to example_input_files/pytorch/configs/mm_range.json diff --git a/train/compute/python/examples/pytorch/configs/resnet.json b/example_input_files/pytorch/configs/resnet.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/resnet.json rename to example_input_files/pytorch/configs/resnet.json diff --git a/train/compute/python/examples/pytorch/configs/simple_add.json b/example_input_files/pytorch/configs/simple_add.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/simple_add.json rename to example_input_files/pytorch/configs/simple_add.json diff --git a/train/compute/python/examples/pytorch/configs/simple_add_range.json b/example_input_files/pytorch/configs/simple_add_range.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/simple_add_range.json rename to example_input_files/pytorch/configs/simple_add_range.json diff --git a/train/compute/python/examples/pytorch/configs/simple_mm.json b/example_input_files/pytorch/configs/simple_mm.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/simple_mm.json rename to example_input_files/pytorch/configs/simple_mm.json diff --git a/train/compute/python/examples/pytorch/configs/simple_mm_range.json b/example_input_files/pytorch/configs/simple_mm_range.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/simple_mm_range.json rename to example_input_files/pytorch/configs/simple_mm_range.json diff --git a/train/compute/python/examples/pytorch/configs/split_table_batched_embeddings_ops.json b/example_input_files/pytorch/configs/split_table_batched_embeddings_ops.json similarity index 100% rename from train/compute/python/examples/pytorch/configs/split_table_batched_embeddings_ops.json rename to example_input_files/pytorch/configs/split_table_batched_embeddings_ops.json diff --git a/train/compute/python/examples/pytorch/run_op.py b/example_input_files/pytorch/run_op.py similarity index 100% rename from train/compute/python/examples/pytorch/run_op.py rename to example_input_files/pytorch/run_op.py diff --git a/train/compute/python/examples/pytorch/run_op_split_table_batched_embeddings.py b/example_input_files/pytorch/run_op_split_table_batched_embeddings.py similarity index 100% rename from train/compute/python/examples/pytorch/run_op_split_table_batched_embeddings.py rename to example_input_files/pytorch/run_op_split_table_batched_embeddings.py diff --git a/train/comms/pt/comms.py b/param/comm/comms.py similarity index 99% rename from train/comms/pt/comms.py rename to param/comm/comms.py index ceb00d8a..29dfcfe5 100755 --- a/train/comms/pt/comms.py +++ b/param/comm/comms.py @@ -17,8 +17,8 @@ # pytorch import torch -from param_bench.train.comms.pt import comms_utils -from param_bench.train.comms.pt.comms_utils import ( +from param.comm import comms_utils +from param.comm.comms_utils import ( bootstrap_info_holder, commsParamsHolderBase, ensureTensorFlush, @@ -26,7 +26,7 @@ paramDeviceTimer, paramStreamGuard, ) -from param_bench.train.comms.pt.logger_utils import ( +from param.comm.logger_utils import ( benchType, commsCollPerfMetrics, commsPt2PtPerfMetrics, @@ -34,7 +34,7 @@ customized_perf_loggers, ) -from param_bench.train.comms.pt.pytorch_backend_utils import ( +from param.comm.pytorch_backend_utils import ( pt2ptPatterns, supportedC10dBackends, supportedCollectives, @@ -1760,13 +1760,13 @@ def initBackend( commsParams.nw_stack == "pytorch-dist" and commsParams.backend in supportedC10dBackends ): - from param_bench.train.comms.pt.pytorch_dist_backend import ( + from param.comm.pytorch_dist_backend import ( PyTorchDistBackend, ) backendObj = PyTorchDistBackend(bootstrap_info, commsParams) elif commsParams.nw_stack == "pytorch-xla-tpu": - from param_bench.train.comms.pt.pytorch_tpu_backend import PyTorchTPUBackend + from param.comm.pytorch_tpu_backend import PyTorchTPUBackend backendObj = PyTorchTPUBackend(bootstrap_info, commsParams) else: @@ -1775,7 +1775,7 @@ def initBackend( logging.warning( f"Attempt loading customized backend {commsParams.backend} if registered. Note that this is not officially supported. Use it with caution and at your own risk." ) - from param_bench.train.comms.pt.pytorch_backend_utils import ( + from param.comm.pytorch_backend_utils import ( customized_backend, ) diff --git a/train/comms/pt/commsTraceParser.py b/param/comm/commsTraceParser.py similarity index 97% rename from train/comms/pt/commsTraceParser.py rename to param/comm/commsTraceParser.py index 728fa929..abdd106a 100644 --- a/train/comms/pt/commsTraceParser.py +++ b/param/comm/commsTraceParser.py @@ -5,11 +5,11 @@ from typing import List, Tuple -from param_bench.train.comms.pt import comms_utils -from param_bench.train.comms.pt.comms_utils import commsArgs -from param_bench.train.comms.pt.pytorch_backend_utils import supportedP2pOps +from param.comm import comms_utils +from param.comm.comms_utils import commsArgs +from param.comm.pytorch_backend_utils import supportedP2pOps -from param_bench.train.compute.python.tools.execution_trace import ExecutionTrace +from param.execution_trace import ExecutionTrace tensorDtypeMap = { "Tensor(int)": "int", @@ -162,7 +162,7 @@ def _parseKinetoUnitrace(in_trace: List, target_rank: int) -> List: if ( "name" in entry - and entry["name"] == "record_param_comms" + and entry["name"] == "record_param.comm" and entry["args"]["rank"] == target_rank ): @@ -256,7 +256,7 @@ def _parseExecutionTrace( # Parse comms nodes for node in in_trace.nodes.values(): - if node.name == "record_param_comms": + if node.name == "record_param.comm": shift = ( 0 if len(node.inputs) == 8 or len(node.inputs) == 10 else 1 ) # wait/barrier ops do not have an input tensor (len=7), shift index one over diff --git a/train/comms/pt/comms_utils.py b/param/comm/comms_utils.py similarity index 99% rename from train/comms/pt/comms_utils.py rename to param/comm/comms_utils.py index f5d41eeb..e81b0b8b 100644 --- a/train/comms/pt/comms_utils.py +++ b/param/comm/comms_utils.py @@ -23,7 +23,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union try: - from param_bench.train.comms.pt.fb.internals import ( + from param.comm.fb.internals import ( fbInitProfiler, fbSampleProfiler, fbStartProfiler, @@ -38,8 +38,8 @@ import numpy as np import torch -from param_bench.train.comms.pt.param_profile import paramTimer -from param_bench.train.comms.pt.pytorch_backend_utils import ( +from param.comm.param_profile import paramTimer +from param.comm.pytorch_backend_utils import ( backendFunctions, collectiveArgsHolder, customized_backend, @@ -893,7 +893,7 @@ def __init__( class paramCommsBench(ABC): - """Abstract class for any param comms benchmark.""" + """Abstract class for any param.comm benchmark.""" def __init__(self, supportedNwstacks: List[str] = None) -> None: self.supportedNwstacks = supportedNwstacks diff --git a/train/comms/pt/logger_utils.py b/param/comm/logger_utils.py similarity index 97% rename from train/comms/pt/logger_utils.py rename to param/comm/logger_utils.py index d9c9a82b..7f01cc10 100644 --- a/train/comms/pt/logger_utils.py +++ b/param/comm/logger_utils.py @@ -8,7 +8,7 @@ from enum import Enum from typing import abstractmethod, Dict, Optional -from param_bench.train.comms.pt.pytorch_backend_utils import backendFunctions +from param.comm.pytorch_backend_utils import backendFunctions logger = logging.getLogger(__name__) diff --git a/train/comms/pt/param_profile.py b/param/comm/param_profile.py similarity index 100% rename from train/comms/pt/param_profile.py rename to param/comm/param_profile.py diff --git a/train/comms/pt/pytorch_backend_utils.py b/param/comm/pytorch_backend_utils.py similarity index 99% rename from train/comms/pt/pytorch_backend_utils.py rename to param/comm/pytorch_backend_utils.py index 8872840d..074284b0 100644 --- a/train/comms/pt/pytorch_backend_utils.py +++ b/param/comm/pytorch_backend_utils.py @@ -9,7 +9,7 @@ import torch -from param_bench.train.comms.pt.param_profile import paramTimer +from param.comm.param_profile import paramTimer from torch.distributed import ProcessGroup diff --git a/train/comms/pt/pytorch_dist_backend.py b/param/comm/pytorch_dist_backend.py similarity index 99% rename from train/comms/pt/pytorch_dist_backend.py rename to param/comm/pytorch_dist_backend.py index 9f874315..082f7d49 100644 --- a/train/comms/pt/pytorch_dist_backend.py +++ b/param/comm/pytorch_dist_backend.py @@ -13,14 +13,14 @@ import torch import torch.distributed as dist import torch.nn as nn -from param_bench.train.comms.pt.param_profile import paramProfile -from param_bench.train.comms.pt.pytorch_backend_utils import ( +from param.comm.param_profile import paramProfile +from param.comm.pytorch_backend_utils import ( backendFunctions, collectiveArgsHolder, ) try: - from param_bench.train.comms.pt.fb.internals import ( + from param.comm.fb.internals import ( all_to_all_internal, all_to_allv_internal, extend_distributed, diff --git a/train/comms/pt/pytorch_tpu_backend.py b/param/comm/pytorch_tpu_backend.py similarity index 100% rename from train/comms/pt/pytorch_tpu_backend.py rename to param/comm/pytorch_tpu_backend.py diff --git a/param/comp/__init__.py b/param/comp/__init__.py new file mode 100644 index 00000000..8c0d5d5b --- /dev/null +++ b/param/comp/__init__.py @@ -0,0 +1 @@ +__version__ = "2.0.0" diff --git a/train/compute/python/lib/config.py b/param/comp/config.py similarity index 100% rename from train/compute/python/lib/config.py rename to param/comp/config.py diff --git a/train/compute/python/lib/data.py b/param/comp/data.py similarity index 100% rename from train/compute/python/lib/data.py rename to param/comp/data.py diff --git a/train/compute/python/lib/generator.py b/param/comp/generator.py similarity index 100% rename from train/compute/python/lib/generator.py rename to param/comp/generator.py diff --git a/train/compute/python/lib/init_helper.py b/param/comp/init_helper.py similarity index 97% rename from train/compute/python/lib/init_helper.py rename to param/comp/init_helper.py index 15474d53..33b54cfc 100644 --- a/train/compute/python/lib/init_helper.py +++ b/param/comp/init_helper.py @@ -22,7 +22,7 @@ def init_logging(log_level): FORMAT = "[%(asctime)s] %(process)d %(filename)s:%(lineno)-3d [%(levelname)s]: %(message)s" else: FORMAT = "[%(asctime)s] %(process)d [%(levelname)s]: %(message)s" - _logger = logging.getLogger("param_bench") + _logger = logging.getLogger("param") _logger.setLevel(log_level) # Reset the stream handlers to avoid multiple outputs. _logger.handlers.clear() diff --git a/train/compute/python/lib/iterator.py b/param/comp/iterator.py similarity index 100% rename from train/compute/python/lib/iterator.py rename to param/comp/iterator.py diff --git a/train/compute/python/lib/operator.py b/param/comp/operator.py similarity index 100% rename from train/compute/python/lib/operator.py rename to param/comp/operator.py diff --git a/train/compute/python/examples/__init__.py b/param/comp/pytorch/__init__.py similarity index 100% rename from train/compute/python/examples/__init__.py rename to param/comp/pytorch/__init__.py diff --git a/train/compute/python/lib/pytorch/benchmark.py b/param/comp/pytorch/benchmark.py similarity index 100% rename from train/compute/python/lib/pytorch/benchmark.py rename to param/comp/pytorch/benchmark.py diff --git a/train/compute/python/lib/pytorch/benchmark_helper.py b/param/comp/pytorch/benchmark_helper.py similarity index 94% rename from train/compute/python/lib/pytorch/benchmark_helper.py rename to param/comp/pytorch/benchmark_helper.py index c820259e..202d6db7 100644 --- a/train/compute/python/lib/pytorch/benchmark_helper.py +++ b/param/comp/pytorch/benchmark_helper.py @@ -7,19 +7,19 @@ import torch -from param_bench.train.compute.python.lib import __version__, pytorch as lib_pytorch -from param_bench.train.compute.python.lib.config import BenchmarkConfig -from param_bench.train.compute.python.lib.init_helper import load_modules -from param_bench.train.compute.python.lib.pytorch.benchmark import ( +from param.comp.python.lib import __version__, pytorch as lib_pytorch +from param.comp.python.lib.config import BenchmarkConfig +from param.comp.python.lib.init_helper import load_modules +from param.comp.python.lib.pytorch.benchmark import ( make_default_benchmark, ) -from param_bench.train.compute.python.lib.pytorch.config_util import ( +from param.comp.python.lib.pytorch.config_util import ( ExecutionPass, get_benchmark_options, get_sys_info, OpExecutionMode, ) -from param_bench.train.compute.python.workloads import pytorch as workloads_pytorch +from param.comp.python.workloads import pytorch as workloads_pytorch from torch.autograd.profiler import record_function from torch.profiler import _ExperimentalConfig, ExecutionTraceObserver diff --git a/train/compute/python/lib/pytorch/build_executor.py b/param/comp/pytorch/build_executor.py similarity index 99% rename from train/compute/python/lib/pytorch/build_executor.py rename to param/comp/pytorch/build_executor.py index 9a05a5b9..008a2db2 100644 --- a/train/compute/python/lib/pytorch/build_executor.py +++ b/param/comp/pytorch/build_executor.py @@ -296,7 +296,7 @@ def _run_ncu(self, shm, config): ncu_bin = self.run_options["ncu_bin"] - param_bench_range = "param_bench:measure" + param_range = "param:measure" start_input_id = self.input_config_queue[0]["id"] out_file_prefix = self.run_options["out_file_prefix"] timestamp = int(datetime.timestamp(datetime.now())) @@ -304,7 +304,7 @@ def _run_ncu(self, shm, config): ncu_extra_args = self.run_options["ncu_args"] ncu_options = ( f"--log-file {ncu_log_file} --csv --app-replay-buffer file --nvtx " - f"--nvtx-include {param_bench_range} --target-processes all" + f"--nvtx-include {param_range} --target-processes all" ) if ncu_extra_args: ncu_options += f" {ncu_extra_args}" diff --git a/train/compute/python/lib/pytorch/config_util.py b/param/comp/pytorch/config_util.py similarity index 96% rename from train/compute/python/lib/pytorch/config_util.py rename to param/comp/pytorch/config_util.py index 19dc91f0..6656a099 100644 --- a/train/compute/python/lib/pytorch/config_util.py +++ b/param/comp/pytorch/config_util.py @@ -9,7 +9,7 @@ import torch from torch.utils.collect_env import get_nvidia_driver_version, run as run_cmd -from ...lib import __version__ +from param.comp import __version__ @enum.unique @@ -60,7 +60,7 @@ def get_benchmark_options() -> Dict[str, Any]: "nsys_iteration": 10, "run_batch_size": 50, "batch_cuda_device": 1, - "batch_cmd": "python -m param_bench.train.compute.python.pytorch.run_batch", + "batch_cmd": "python -m param.comp.python.pytorch.run_batch", "resume_op_run_id": None, "stop_op_run_id": None, } @@ -130,7 +130,7 @@ def get_sys_info(): "pid": os.getpid(), "cwd": os.getcwd(), "python_version": platform.python_version(), - "param_train_compute_version": __version__, + "param.comp_version": __version__, "cuda_available": cuda_available, **cuda_info, "pytorch_version": torch.__version__, diff --git a/train/compute/python/lib/pytorch/cuda_util.py b/param/comp/pytorch/cuda_util.py similarity index 100% rename from train/compute/python/lib/pytorch/cuda_util.py rename to param/comp/pytorch/cuda_util.py diff --git a/train/compute/python/lib/pytorch/data_impl.py b/param/comp/pytorch/data_impl.py similarity index 100% rename from train/compute/python/lib/pytorch/data_impl.py rename to param/comp/pytorch/data_impl.py diff --git a/train/compute/python/lib/pytorch/op_executor.py b/param/comp/pytorch/op_executor.py similarity index 100% rename from train/compute/python/lib/pytorch/op_executor.py rename to param/comp/pytorch/op_executor.py diff --git a/train/compute/python/lib/pytorch/operator_impl.py b/param/comp/pytorch/operator_impl.py similarity index 100% rename from train/compute/python/lib/pytorch/operator_impl.py rename to param/comp/pytorch/operator_impl.py diff --git a/train/compute/python/lib/pytorch/timer.py b/param/comp/pytorch/timer.py similarity index 100% rename from train/compute/python/lib/pytorch/timer.py rename to param/comp/pytorch/timer.py diff --git a/train/compute/python/workloads/pytorch/resnet.py b/param/comp/workloads/resnet.py similarity index 100% rename from train/compute/python/workloads/pytorch/resnet.py rename to param/comp/workloads/resnet.py diff --git a/train/compute/python/tools/et_replay_utils.py b/param/et_replay_utils.py similarity index 95% rename from train/compute/python/tools/et_replay_utils.py rename to param/et_replay_utils.py index d9f8f29c..0ac0015b 100644 --- a/train/compute/python/tools/et_replay_utils.py +++ b/param/et_replay_utils.py @@ -4,10 +4,10 @@ import torch from fbgemm_gpu.split_table_batched_embeddings_ops import PoolingMode, WeightDecayMode -from param_bench.train.compute.python.lib.pytorch.config_util import create_op_args -from param_bench.train.compute.python.tools.execution_trace import NodeType +from param.compute.python.lib.pytorch.config_util import create_op_args +from param.execution_trace import ExecutionTrace -from param_bench.train.compute.python.workloads.pytorch.split_table_batched_embeddings_ops import ( +from param.compute.python.workloads.pytorch.split_table_batched_embeddings_ops import ( SplitTableBatchedEmbeddingBagsCodegenInputDataGenerator, SplitTableBatchedEmbeddingBagsCodegenOp, ) @@ -153,7 +153,7 @@ def skip_op(op): and "thread" in op.parent.name and op.tid == 2 ) - or (op.name == "record_param_comms" and op.inputs[3] == "init") + or (op.name == "record_param.comm" and op.inputs[3] == "init") or (op.name == "aten::view" and "aten::view.dtype" in op.op_schema) ) @@ -469,11 +469,11 @@ def generate_prefix(label, skip_nodes, et_input, cuda, compute_only, tf32, rows) import os import time from datetime import datetime -from param_bench.train.comms.pt import comms_utils +from param.comm import comms_utils import torch -from param_bench.train.comms.pt import commsTraceReplay -from param_bench.train.compute.python.tools.et_replay_utils import ( +from param.comm import commsTraceReplay +from param.compute.python.tools.et_replay_utils import ( build_fbgemm_func, build_torchscript_func, generate_fbgemm_tensors, @@ -482,8 +482,8 @@ def generate_prefix(label, skip_nodes, et_input, cuda, compute_only, tf32, rows) is_qualified, ) -from param_bench.train.compute.python.tools.execution_trace import ExecutionTrace -from param_bench.train.compute.python.tools.utility import trace_handler +from param.compute.python.tools.execution_trace import ExecutionTrace +from param.compute.python.tools.utility import trace_handler print("PyTorch version: ", torch.__version__) @@ -519,7 +519,7 @@ def dfs_traverse(node): if "://" in \"{et_input}\": try: - from param_bench.train.compute.python.tools.fb.internals import ( + from param.compute.python.tools.fb.internals import ( read_remote_trace, ) except ImportError: @@ -655,7 +655,7 @@ def generate_suffix(warmup_iter, replay_iter, cuda_id, profile_replay): end_time = datetime.now() try: - from param_bench.train.compute.python.tools.fb.internals import ( + from param.compute.python.tools.fb.internals import ( generate_query_url, ) except ImportError: diff --git a/train/compute/python/tools/execution_trace.py b/param/execution_trace.py similarity index 100% rename from train/compute/python/tools/execution_trace.py rename to param/execution_trace.py diff --git a/train/compute/python/tools/utility.py b/param/utility.py similarity index 94% rename from train/compute/python/tools/utility.py rename to param/utility.py index 6d91440a..d4f515c7 100644 --- a/train/compute/python/tools/utility.py +++ b/param/utility.py @@ -6,7 +6,7 @@ import uuid from typing import Any, Dict -from param_bench.train.compute.python.tools.execution_trace import ExecutionTrace +from param.execution_trace import ExecutionTrace def get_tmp_trace_filename() -> str: diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..e441f274 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["setuptools", "setuptools-grpc"] +build-backend = "setuptools.build_meta" + +[project] +name = "param" +version = "2.0" + +[tool.setuptools] +packages.find = { where = ["."] } + +[project.scripts] +comms_replay = "tools.commsTraceReplay:main" +et_replay = "tools.et_replay:main" +run_batch = "tools.run_batch:main" +run_benchmark = "tools.run_benchmark:main" + +[tool.ruff] +target-version = "py39" +line-length = 120 + +[tool.ruff.lint] +select = ["I", "B", "E", "F", "SIM", "W", "C90", "EXE"] + +[tool.ruff.format] +indent-style = "space" diff --git a/requirements.txt b/requirements.txt index 79f10f90..4efdbbfb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,8 @@ -torch +fbgemm_gpu future +gitpython +networkx numpy +pydot +scipy +torch>=2.0.0 diff --git a/train/compute/python/examples/pytorch/__init__.py b/tests/__init__.py similarity index 100% rename from train/compute/python/examples/pytorch/__init__.py rename to tests/__init__.py diff --git a/train/comms/pt/tests/commsTraceReplay_tests.py b/tests/commsTraceReplay_tests.py similarity index 98% rename from train/comms/pt/tests/commsTraceReplay_tests.py rename to tests/commsTraceReplay_tests.py index d5b18a60..a9828c7a 100644 --- a/train/comms/pt/tests/commsTraceReplay_tests.py +++ b/tests/commsTraceReplay_tests.py @@ -5,9 +5,9 @@ from comms_utils import commsArgs -from param_bench.train.comms.pt.commsTraceReplay import commsTraceReplayBench -from param_bench.train.comms.pt.tests.mocks.backend_mock import MockBackendFunction -from param_bench.train.comms.pt.tests.test_utils import ( +from param.comm.commsTraceReplay import commsTraceReplayBench +from param.comm.tests.mocks.backend_mock import MockBackendFunction +from param.comm.tests.test_utils import ( commsParamsTest, createCommsArgs, testArgs, diff --git a/train/comms/pt/tests/comms_utils_tests.py b/tests/comms_utils_tests.py similarity index 97% rename from train/comms/pt/tests/comms_utils_tests.py rename to tests/comms_utils_tests.py index a86f5d43..0397cc1f 100644 --- a/train/comms/pt/tests/comms_utils_tests.py +++ b/tests/comms_utils_tests.py @@ -3,9 +3,9 @@ import torch -from param_bench.train.comms.pt import comms_utils -from param_bench.train.comms.pt.tests.mocks.backend_mock import MockBackendFunction -from param_bench.train.comms.pt.tests.test_utils import ( +from param.comm import comms_utils +from param.comm.tests.mocks.backend_mock import MockBackendFunction +from param.comm.tests.test_utils import ( bootstrap_info_test, commsParamsTest, ) diff --git a/train/comms/pt/tests/mocks/backend_mock.py b/tests/mocks/backend_mock.py similarity index 100% rename from train/comms/pt/tests/mocks/backend_mock.py rename to tests/mocks/backend_mock.py diff --git a/train/compute/python/lib/pytorch/__init__.py b/tests/pytorch/__init__.py similarity index 100% rename from train/compute/python/lib/pytorch/__init__.py rename to tests/pytorch/__init__.py diff --git a/train/compute/python/test/pytorch/configs/test_native_basic_ops.json b/tests/pytorch/configs/test_native_basic_ops.json similarity index 100% rename from train/compute/python/test/pytorch/configs/test_native_basic_ops.json rename to tests/pytorch/configs/test_native_basic_ops.json diff --git a/train/compute/python/test/test_benchmark_load.py b/tests/test_benchmark_load.py similarity index 72% rename from train/compute/python/test/test_benchmark_load.py rename to tests/test_benchmark_load.py index eeb0e53e..ee7ab13a 100644 --- a/train/compute/python/test/test_benchmark_load.py +++ b/tests/test_benchmark_load.py @@ -2,15 +2,15 @@ import os import unittest -from param_bench.train.compute.python.lib import pytorch as lib_pytorch +from param.comp.python.lib import pytorch as lib_pytorch -from param_bench.train.compute.python.lib.config import BenchmarkConfig -from param_bench.train.compute.python.lib.init_helper import load_modules -from param_bench.train.compute.python.lib.pytorch.benchmark import ( +from param.comp.python.lib.config import BenchmarkConfig +from param.comp.python.lib.init_helper import load_modules +from param.comp.python.lib.pytorch.benchmark import ( Benchmark, make_default_benchmark, ) -from param_bench.train.compute.python.lib.pytorch.config_util import ( +from param.comp.python.lib.pytorch.config_util import ( get_benchmark_options, ) diff --git a/train/compute/python/test/test_execution_trace.py b/tests/test_execution_trace.py similarity index 86% rename from train/compute/python/test/test_execution_trace.py rename to tests/test_execution_trace.py index e96ae595..bfd0e66a 100644 --- a/train/compute/python/test/test_execution_trace.py +++ b/tests/test_execution_trace.py @@ -3,8 +3,8 @@ import os import unittest -from param_bench.train.compute.python.tools.execution_trace import ExecutionTrace -from param_bench.train.compute.python.tools.validate_trace import TraceValidator +from param.execution_trace import ExecutionTrace +from param.comp.python.tools.validate_trace import TraceValidator CURR_DIR = os.path.dirname(os.path.realpath(__file__)) diff --git a/train/compute/python/test/test_generator.py b/tests/test_generator.py similarity index 98% rename from train/compute/python/test/test_generator.py rename to tests/test_generator.py index 1799be09..33e23d4b 100644 --- a/train/compute/python/test/test_generator.py +++ b/tests/test_generator.py @@ -1,7 +1,7 @@ import copy import unittest -from param_bench.train.compute.python.lib.generator import ( +from param.comp.python.lib.generator import ( full_range, IterableList, ListProduct, diff --git a/train/compute/python/test/test_register.py b/tests/test_register.py similarity index 91% rename from train/compute/python/test/test_register.py rename to tests/test_register.py index 841206f6..7066b767 100644 --- a/train/compute/python/test/test_register.py +++ b/tests/test_register.py @@ -1,16 +1,16 @@ import unittest -from param_bench.train.compute.python.lib.data import ( +from param.comp.python.lib.data import ( data_generator_map, DataGenerator, register_data_generator, ) -from param_bench.train.compute.python.lib.iterator import ( +from param.comp.python.lib.iterator import ( config_iterator_map, ConfigIterator, register_config_iterator, ) -from param_bench.train.compute.python.lib.operator import ( +from param.comp.python.lib.operator import ( op_map, OperatorInterface, register_operator, diff --git a/train/compute/python/test/test_split_table_batched_embeddings_ops.py b/tests/test_split_table_batched_embeddings_ops.py similarity index 88% rename from train/compute/python/test/test_split_table_batched_embeddings_ops.py rename to tests/test_split_table_batched_embeddings_ops.py index 57250720..c013b52f 100644 --- a/train/compute/python/test/test_split_table_batched_embeddings_ops.py +++ b/tests/test_split_table_batched_embeddings_ops.py @@ -1,8 +1,8 @@ import unittest -from param_bench.train.compute.python.lib.config import make_op_config -from param_bench.train.compute.python.lib.pytorch.config_util import create_op_info -from param_bench.train.compute.python.workloads.pytorch import ( # noqa +from param.comp.python.lib.config import make_op_config +from param.comp.python.lib.pytorch.config_util import create_op_info +from param.comp.python.workloads.pytorch import ( # noqa split_table_batched_embeddings_ops, ) diff --git a/train/comms/pt/tests/test_utils.py b/tests/test_utils.py similarity index 100% rename from train/comms/pt/tests/test_utils.py rename to tests/test_utils.py diff --git a/train/comms/pt/commsTraceReplay.py b/tools/commsTraceReplay.py similarity index 98% rename from train/comms/pt/commsTraceReplay.py rename to tools/commsTraceReplay.py index e4b9c66c..611e0198 100644 --- a/train/comms/pt/commsTraceReplay.py +++ b/tools/commsTraceReplay.py @@ -17,8 +17,8 @@ import numpy as np import torch -from param_bench.train.comms.pt import comms_utils -from param_bench.train.comms.pt.comms_utils import ( +from param.comm import comms_utils +from param.comm.comms_utils import ( bootstrap_info_holder, commsArgs, commsParamsHolderBase, @@ -26,8 +26,8 @@ paramStreamGuard, paramToCommName, ) -from param_bench.train.comms.pt.param_profile import paramProfile, paramTimer -from param_bench.train.comms.pt.pytorch_backend_utils import supportedP2pOps +from param.comm.param_profile import paramProfile, paramTimer +from param.comm.pytorch_backend_utils import supportedP2pOps try: from trainer_iteration_wrapper import setTrainingIteration # @manual @@ -64,7 +64,7 @@ def writeCommDetails(commsTracePerf: List, rank: int, folder: str = "./") -> Non if "://" in comms_file: # assume that "://" in directory path means remote store saveToLocal = False try: - from param_bench.train.comms.pt.fb.internals import ( + from param.comm.fb.internals import ( writeRemoteTrace as writeFbRemoteTrace, ) @@ -1383,13 +1383,13 @@ def initBackend( """ # init backend and corresponding function pointers if commsParams.nw_stack == "pytorch-dist": - from param_bench.train.comms.pt.pytorch_dist_backend import ( + from param.comm.pytorch_dist_backend import ( PyTorchDistBackend, ) self.backendFuncs = PyTorchDistBackend(bootstrap_info, commsParams) elif commsParams.nw_stack == "pytorch-xla-tpu": - from param_bench.train.comms.pt.pytorch_tpu_backend import PyTorchTPUBackend + from param.comm.pytorch_tpu_backend import PyTorchTPUBackend self.backendFuncs = PyTorchTPUBackend(bootstrap_info, commsParams) else: @@ -1521,7 +1521,7 @@ def readRawTrace(self, remotePath: str, rank: int) -> None: raw_comms_trace = comms_utils.commonUrlRead(remotePath=remotePath) else: try: - from param_bench.train.comms.pt.fb.internals import ( + from param.comm.fb.internals import ( readRemoteTrace as readFbRemoteTrace, ) @@ -1584,7 +1584,7 @@ def readTrace(self, remotePath: str, rank: int) -> None: # Convert trace to comms trace. try: - from param_bench.train.comms.pt import commsTraceParser + from param.comm import commsTraceParser except ImportError: logger.info("FB internals not present, using base parser.") self.comms_trace = extractCommsInfo(self.comms_trace) @@ -1654,7 +1654,7 @@ def main() -> None: traceBench = commsTraceReplayBench() parser = argparse.ArgumentParser( - description="PARAM-Comms Trace Replay Mode", + description="param.comm Trace Replay Mode", formatter_class=argparse.ArgumentDefaultsHelpFormatter, allow_abbrev=False, ) diff --git a/train/compute/python/tools/et_replay.py b/tools/et_replay.py similarity index 97% rename from train/compute/python/tools/et_replay.py rename to tools/et_replay.py index 48731d00..c1f75ed9 100644 --- a/train/compute/python/tools/et_replay.py +++ b/tools/et_replay.py @@ -12,11 +12,12 @@ import numpy as np import torch -from param_bench.train.comms.pt import comms_utils, commsTraceReplay +from param.comm import commsTraceReplay +from param.comm.comms_utils import read_comms_env_vars, bootstrap_info_holder, commsParamsHolderBase -from param_bench.train.compute.python.lib import pytorch as lib_pytorch -from param_bench.train.compute.python.lib.init_helper import load_modules -from param_bench.train.compute.python.tools.et_replay_utils import ( +from param.compute.python.lib import pytorch as lib_pytorch +from param.compute.python.lib.init_helper import load_modules +from param.compute.python.tools.et_replay_utils import ( build_fbgemm_func, build_torchscript_func, build_triton_func, @@ -40,13 +41,13 @@ TORCH_DTYPES_RNG_str, ) -from param_bench.train.compute.python.tools.execution_trace import ( +from param.execution_trace import ( ExecutionTrace, NodeType, ) -from param_bench.train.compute.python.tools.utility import trace_handler -from param_bench.train.compute.python.workloads import pytorch as workloads_pytorch +from param.compute.python.tools.utility import trace_handler +from param.compute.python.workloads import pytorch as workloads_pytorch from torch._inductor.codecache import AsyncCompile, TritonFuture # grid and split_scan_grid are dynamically loaded @@ -72,7 +73,7 @@ def __init__(self): self.dump_path = "" self.args = None # Comms env. - self.comms_env_params = comms_utils.read_comms_env_vars() + self.comms_env_params = read_comms_env_vars() self.commsBench = None self.comms_world_info = None self.commsParams = None @@ -129,7 +130,7 @@ def __init__(self): self.label = "" try: - from param_bench.train.compute.python.tools.fb.internals import ( + from param.compute.python.tools.fb.internals import ( add_internal_label, add_internal_parallel_nodes_parents, add_internal_skip_nodes, @@ -212,7 +213,7 @@ def initBench(self): # Input et trace should be explicitly specified after --input. if "://" in self.args.input: try: - from param_bench.train.compute.python.tools.fb.internals import ( + from param.compute.python.tools.fb.internals import ( read_remote_trace, ) except ImportError: @@ -239,7 +240,7 @@ def initBench(self): # Different processes should read different traces based on global_rank_id. if "://" in self.args.trace_path: try: - from param_bench.train.compute.python.tools.fb.internals import ( + from param.compute.python.tools.fb.internals import ( read_remote_trace, ) except ImportError: @@ -334,7 +335,7 @@ def anlayze_node(node): for _, t_id, _ in get_input_tensors(node): if self.tensor_with_device: t_id = tuple(list(t_id)[:5]) - if node.name == "record_param_comms" and ( + if node.name == "record_param.comm" and ( self.compute_only or self.args.separate ): continue @@ -492,7 +493,7 @@ def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1): self.special_tensors.add(self.tensors_mapping[(node_id, t_id, input)]) for node in self.sorted_nodes: - if node.name == "record_param_comms" and ( + if node.name == "record_param.comm" and ( self.compute_only or self.args.separate ): continue @@ -523,7 +524,7 @@ def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1): # Simulate the execution progress and record the output tensors we have seen so far. output_set = set() for node in self.sorted_nodes: - if node.name == "record_param_comms" and ( + if node.name == "record_param.comm" and ( self.compute_only or self.args.separate ): continue @@ -544,7 +545,7 @@ def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1): def allocate_tensors(self): for node in self.sorted_nodes: - if node.name == "record_param_comms" and ( + if node.name == "record_param.comm" and ( self.compute_only or self.args.separate ): continue @@ -664,7 +665,7 @@ def _generate_tensor_allocation_str(): unallocated_tensors = set() tensor_allocate_template = """{tensor} = {rng}({shape}).to({dtype}){cuda}""" for node in self.sorted_nodes: - if node.name == "record_param_comms" and ( + if node.name == "record_param.comm" and ( self.compute_only or self.args.separate ): continue @@ -899,7 +900,7 @@ def _generate_run_ops_str(override): exec_template = """ {outputs} = {func}[0]({inputs})""" start_special = False for node in self.sorted_nodes: - if node.name == "record_param_comms" and not self.compute_only: + if node.name == "record_param.comm" and not self.compute_only: if node.id in self.parallel_nodes_ids: if not start_special: code_str += " with torch.cuda.stream(s1):\n" @@ -1098,7 +1099,7 @@ def get_inputs(self, node): print(f"Inputs error: {e} at node: {node.id}") def run_op(self, node, iter): - if node.name == "record_param_comms" and not self.compute_only: + if node.name == "record_param.comm" and not self.compute_only: opTensor = self.commsBench.replaySingle( self.commsParams, node.id, self.regenerate_tensors ) @@ -1258,7 +1259,7 @@ def run_op(self, node, iter): self.exec_time.append(after_execution - before_execution) def init_comms(self): - comms_env_params = comms_utils.read_comms_env_vars() + comms_env_params = read_comms_env_vars() print(comms_env_params, self.cuda) self.commsBench = commsTraceReplay.commsTraceReplayBench() @@ -1272,13 +1273,13 @@ def init_comms(self): self.commsBench.checkArgs(comms_args) time.sleep(1) - self.bootstrap_info = comms_utils.bootstrap_info_holder( + self.bootstrap_info = bootstrap_info_holder( comms_args.master_ip, comms_args.master_port, comms_args.num_tpu_cores, comms_env_params, ) - self.commsParams = comms_utils.commsParamsHolderBase(comms_args) + self.commsParams = commsParamsHolderBase(comms_args) self.commsBench.trace_type = "et" @@ -1301,7 +1302,7 @@ def analyze_ops(self): aten_cnt += 1 elif "fbgemm::" in op: custom_cnt += 1 - elif "record_param_comms" in op: + elif "record_param.comm" in op: comms_cnt += 1 else: print(op) @@ -1507,7 +1508,7 @@ def benchTime(self): end_time = datetime.now() try: - from param_bench.train.compute.python.tools.fb.internals import ( + from param.compute.python.tools.fb.internals import ( generate_query_url, ) except ImportError: diff --git a/train/compute/python/pytorch/run_batch.py b/tools/run_batch.py similarity index 100% rename from train/compute/python/pytorch/run_batch.py rename to tools/run_batch.py diff --git a/train/compute/python/pytorch/run_benchmark.py b/tools/run_benchmark.py similarity index 97% rename from train/compute/python/pytorch/run_benchmark.py rename to tools/run_benchmark.py index a85a9d7b..236592c1 100644 --- a/train/compute/python/pytorch/run_benchmark.py +++ b/tools/run_benchmark.py @@ -8,17 +8,17 @@ from torch.autograd.profiler import record_function from torch.profiler import _ExperimentalConfig, ExecutionTraceObserver -from ..lib import __version__, pytorch as lib_pytorch -from ..lib.config import BenchmarkConfig -from ..lib.init_helper import init_logging, load_modules -from ..lib.pytorch.benchmark import make_default_benchmark -from ..lib.pytorch.config_util import ( +from param.comp import __version__, pytorch as lib_pytorch +from param.comp.config import BenchmarkConfig +from param.comp.init_helper import init_logging, load_modules +from param.comp.pytorch.benchmark import make_default_benchmark +from param.comp.pytorch.config_util import ( ExecutionPass, get_benchmark_options, get_sys_info, OpExecutionMode, ) -from ..workloads import pytorch as workloads_pytorch +from param.comp.workloads import pytorch as workloads_pytorch def main(): diff --git a/train/comms/pt/README.md b/train/comms/pt/README.md deleted file mode 100644 index 5a16a67c..00000000 --- a/train/comms/pt/README.md +++ /dev/null @@ -1,68 +0,0 @@ -# PARAM benchmark - Communication benchmarks - -PARAM-Comms is an effort to develop a unified benchmarking framework to -characterize training platform backends. Currently, the benchmark supports -Pytorch Distributed and PyTorch-XLA backends. - -The PARAM-Comms benchmark offers a single point solution to perform both top-down -(DLRM application) and bottoms-up (collectives) operations for any given -communication backend. - -The Collective-Comms benchmark (`comms.py`) is designed similar to nccl-tests -for evaluating collective operations, such as All-reduce and All-to-all, through PyTorch backends. -The DLRM-Comms benchmark (`dlrm.py`) is similar to the open-source DLRM benchmark except it -only implements communication primitives. -The Trace Replay benchmark (`commsTraceReplay.py`) is designed to replay the communication patterns captured -from any distributed PyTorch workloads. - -## Usage: - -### Collective-Comms benchmark (`comms.py`) -```bash -mpirun -np -N --hostfile ./comms.py \ - --master-ip 127.0.0.1 - --b \ - --e \ - --n \ - --f \ - --z \ - --collective -``` -Example: -```bash -mpirun -np 16 -N 8 --hostfile ./hfile ./comms.py --master-ip $(head -n 1 ./hfile.txt) --b 8 --e 256M --n 100 \ - --f 2 --z 1 --collective all_to_all --backend nccl --device cuda --log INFO -``` - -### DLRM-Comms benchmark (`dlrm.py`) -```bash -mpirun -np -N --hostfile ./dlrm.py \ - --master-ip - --arch-sparse-feature-size \ - --arch-embedding-size \ - --arch-mlp-bot \ - --arch-mlp-top \ - --mini-batch-size \ - --num-batches -``` -Example: -```bash -mpirun -np 16 -N 8 --hostfile ./hfile ./dlrm.py --master-ip $(head -n 1 ./hfile.txt) --mini-batch-size 32 \ - --num-batches 100 \ - --arch-mlp-bot 1024-256 \ - --arch-sparse-feature-size 64 \ - --arch-embedding-size "10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000-10000" -``` - -### Trace Replay benchmark (`commsTraceReplay.py`) -```bash -mpirun -np -N --hostfile ./commsTraceReplay.py \ - --master-ip 127.0.0.1 --trace-path /path/to/traces --dry-run -``` -Example: -```bash -mpirun -np 16 -N 8 --hostfile ./hfile ./commsTraceReplay.py --master-ip $(head -n 1 ./hfile.txt) \ - --backend nccl --device cuda \ - --trace-path /path/to/commTraces -``` -Note that there should be one trace file (in JSON format) per rank. diff --git a/train/comms/pt/setup.py b/train/comms/pt/setup.py deleted file mode 100644 index b448603a..00000000 --- a/train/comms/pt/setup.py +++ /dev/null @@ -1,27 +0,0 @@ -from setuptools import setup - - -def main(): - package_base = "param_bench.train.comms.pt" - - # List the packages and their dir mapping: - # "install_destination_package_path": "source_dir_path" - package_dir_map = { - f"{package_base}": ".", - } - - packages = list(package_dir_map) - - setup( - name="parambench-train-comms", - python_requires=">=3.8", - author="Louis Feng", - author_email="lofe@fb.com", - url="https://github.com/facebookresearch/param", - packages=packages, - package_dir=package_dir_map, - ) - - -if __name__ == "__main__": - main() diff --git a/train/compute/python/.gitignore b/train/compute/python/.gitignore deleted file mode 100644 index b92aba17..00000000 --- a/train/compute/python/.gitignore +++ /dev/null @@ -1,7 +0,0 @@ -/build -/dist -*.so -*.egg-info -/*.json -/*.log -__pycache__ diff --git a/train/compute/python/README.md b/train/compute/python/README.md deleted file mode 100644 index 0fc49a15..00000000 --- a/train/compute/python/README.md +++ /dev/null @@ -1,301 +0,0 @@ -# PARAM Training Compute Benchmark - -## Overview -The training compute benchmarks have gone under development to support new use cases. The previous [standalone](../pt) benchmarks remain unchanged. - -The general motivation and design philosophy for the new microbenchmarks are: -* Support a wide variety of PyTorch operators and workloads. -* Decoupled workload configuration from Python code. It allows building tools in a pipeline to collect operator configuration from production runs, generate microbenchmark inputs, gather metrics. -* Generates human readable and easy to parse output data format for downstream tools. -* A library interface allowing external tools to run and collect data results from the microbenchmarks. -* Support replay of workload through PyTorch execution trace. - -For design and implementation details or make a contribution to the project, please look at the [development documentation](development.md). - -## Installation -We use `setuptools` to install/uninstall the `parambench-train-compute` package: - -```shell -# Inside dir "param/train/compute/pytnon" - -# Install required dependencies -> pip install -r requirements.txt - -# Install PARAM Compute package -> pip install . - -# Uninstall package -> python -m pip uninstall parambench-train-compute -``` - -The installed packages are under **`param_bench.train.compute.python`**. - -To use the [`FBGEMM_GPU`](https://github.com/pytorch/FBGEMM/tree/main/fbgemm_gpu) library and its operator benchmark workload ([`split_table_batched_embeddings_ops.py`](workloads/pytorch/split_table_batched_embeddings_ops.py)), please follow its set up instruction to download and install. It's not required for the compute benchmarks. During initialization, if an operator fail to import, it'll be ignored and will not affect other benchmarks. - -Please make sure to install the `parambench-train-comms` package (`train/comms/pt`). This is important because some functions in this package reference those in the comms package. - -## Usage -The bundled tool scripts such as [`run_benchmark.py`](pytorch/run_benchmark.py) are written using relative import paths as part of the `parambench-train-compute` package, so they must be ran as a module using the `python -m` option. - -A reliable way to run the benchmarks is install `parambench-train-compute` as a package following the above instructions. Afterward, it can be ran as: -```shell -# Run benchmark tool script module -> python -m param_bench.train.compute.python.pytorch.run_benchmark -c examples/pytorch/configs/simple_add.json -``` - -Without installing the package, you can run a tool script as a module in the source directory: -```shell -# Inside dir "param/train/compute" -> python -m python.pytorch.run_benchmark -c python/examples/pytorch/configs/simple_add.json -``` -However, this method may conflict with other packages (such as `fbgemm_gpu.split_table_batched_embeddings_ops`) that have its own modules under a `python` package. - -Additional example configs can be found in [`examples/pytorch/configs/`](examples/pytorch/configs/). - -### Benchmark Library -As a library, it can be used as any regular Python package: -```python -from param_bench.train.compute.python.lib.config import BenchmarkConfig -``` -A complete example to generate benchmark config, run the benchmark, then get the results can be found in [`run_op.py`](examples/pytorch/run_op.py) - -## PyTorch Benchmark Options -``` -=> python -m param_bench.train.compute.python.pytorch.run_benchmark -h -usage: run_benchmark.py [-h] [-c CONFIG] [-w WARMUP] [-i ITERATION] [-b] [-d DEVICE] [-o OUTPUT_PREFIX] [-r RESUME_ID] [-s STOP_ID] [-a] [--cuda-l2-cache [{on,off}]] [--ncu] [--ncu-bin NCU_BIN] [--ncu-args-file NCU_ARGS_FILE] [--ncu-warmup NCU_WARMUP] - [--ncu-iteration NCU_ITERATION] [--nsys] [--nsys-bin NSYS_BIN] [--nsys-args-file NSYS_ARGS_FILE] [--nsys-warmup NSYS_WARMUP] [--nsys-iteration NSYS_ITERATION] [--run-batch-size RUN_BATCH_SIZE] [--batch-cuda-device BATCH_CUDA_DEVICE] - [--batch-cmd BATCH_CMD] [--exec-mode [{discrete,continuous,continuous_events}]] [-p] [-l LOG_LEVEL] [--version] - -PyTorch Microbenchmarks - -optional arguments: - -h, --help show this help message and exit - -c CONFIG, --config CONFIG - The benchmark config file. - -w WARMUP, --warmup WARMUP - Number of warm up iterations. - -i ITERATION, --iteration ITERATION - Number of benchmark iterations. - -b, --backward Include backward pass. - -d DEVICE, --device DEVICE - Target device for benchmark. - -o OUTPUT_PREFIX, --output-prefix OUTPUT_PREFIX - File name prefix to write benchmark results. - -r RESUME_ID, --resume-id RESUME_ID - Define a resume op_run_id to continue benchmark, skip all previous configs. - -s STOP_ID, --stop_id STOP_ID - Define a stop op_run_id (exclusive) to stop benchmark, skip remaining configs. - -a, --append Append to output file, rather than overwrite. - --cuda-l2-cache [{on,off}] - Set option for CUDA GPU L2 cache between iterations in discrete mode. - --ncu Run NSight Compute to collect metrics. - --ncu-bin NCU_BIN Path to the NSight Compute (ncu) binary. - --ncu-args-file NCU_ARGS_FILE - NSight Compute extra command line options (metrics etc.). - --ncu-warmup NCU_WARMUP - NSight Systems number of warmup runs. - --ncu-iteration NCU_ITERATION - NSight Systems number of measured iteration runs. - --nsys Run NSight Systems to collect metrics. - --nsys-bin NSYS_BIN Path to the NSight Systems (nsys) binary. - --nsys-args-file NSYS_ARGS_FILE - NSight Systems extra command line options (metrics etc.). - --nsys-warmup NSYS_WARMUP - NSight Systems number of warmup runs. - --nsys-iteration NSYS_ITERATION - NSight Systems number of measured iteration runs. - --run-batch-size RUN_BATCH_SIZE - Batch run input size (number of input configs to run in one launch), used by both NCU and NSYS. - --batch-cuda-device BATCH_CUDA_DEVICE - CUDA GPU device ID to run batch job. - --batch-cmd BATCH_CMD - Run batch job command. - --exec-mode [{discrete,continuous,continuous_events}] - Set execution mode of the operators (discrete, continuous, continuous_events). Default=discrete - -p, --profile Enable profiler and tracing. - -l LOG_LEVEL, --log-level LOG_LEVEL - Log output verbosity. - --version Print version. -``` - -## Benchmark Configuration File -Benchmark configurations are defined in a JSON format. It can be stored in a file on disk, or being passed between external callers and the benchmark’s library interface. There are two types of configurations: -* Build configuration (optional) - * Defines arguments used to construct and initialize the operator. - * It’s optional for operators that do not require initialization. -* Input configuration - * Defines arguments used to execute the operator. - -An operator may or may not need to have a build configuration. For example, `torch.matmul` can be called directly with input arguments, so there's no need to specify a build configuration. Other operators require creating the operator first before running it: - -```python -embedding = torch.nn.EmbeddingBag( - num_embeddings=embeddingbags, - embedding_dim=dim, - mode=mode, - include_last_offset=include_last_offset, - sparse=sparse).to(device=device) -embedding(input, offset) -``` -The library expects the benchmark configuration in the following JSON format (the notes in `<>` are comments): -```json -{ - "operator_name": { - "build_iterator": "(optional) build iterator name", - "input_iterator": "(optional) input iterator name", - "build_data_generator": "(optional) data generator name", - "input_data_generator": "(required) data generator name", - "config": [ - - { - "build": [ - <(optional) a list of build spec> - ] - "input": [ - <(required) a list of input spec> - ] - } - ] - } - -} -``` -The **`"operator_name"`** is a key mapped to a concrete workload implementation defined inside [`workloads`](workloads) directory, for example [`workloads/pytorch/native_basic_ops.py`](workloads/pytorch/native_basic_ops.py). - -## Default PyTorch Config Specification -For each **`"build"`** and **`"input"`** configuration, the __`"args"`__ and __`"kwargs"`__ JSON keys should be specified with a list of argument data specifications (see [PyTorch Data Types](#pyTorch-data-types)). This is synonymous to Python's __`*args`__ and __`**kwargs`__. As expected, **`"args"`** is positional and defined as a list. __`"kwargs"`__ is defined as a dictionary of `"kwarg_name": `. - -**Example** -```json -{ - "torch.baddbmm": { - "input_data_generator": "PyTorch:DefaultDataGenerator", - "config": [ - { - "input": [ - { - "args": [ - { - "dtype": "float", - "shape": [2, 1, 512], - "type": "tensor" - }, - { - "dtype": "float", - "shape": [2, 512, 512], - "type": "tensor" - }, - { - "dtype": "float", - "shape": [2, 512, 512], - "type": "tensor" - } - ], - "kwargs": { - "beta": { - "type": "int", - "value": 1 - }, - "alpha": { - "type": "int", - "value": 1 - } - } - } - ] - } - ] - } -} -``` - -### PyTorch Data Types Specification -Current supported data types and examples are listed here: -```json -{ - "type": "int", - "value": 237 -}, -{ - "type": "int", - "value_range": [100, 1000] -}, -{ - "type": "long", - "value": 8328 -}, -{ - "type": "long", - "value_range": [1000, 10000] -}, -{ - "type": "float", - "value": 1.2 -}, -{ - "type": "float", - "value_range": [0.0, 2.0] -}, -{ - "type": "double", - "value": 3.4 -}, -{ - "type": "double", - "value_range": [0.5, 5.5] -}, -{ - "type": "bool", - "value": false -}, -{ - "type": "device", - "value": "cpu" -}, -{ - "type": "str", - "value": "a string value" -}, -{ - "type": "genericlist", - "value": [ - { - "type": "int", - "value": 237, - }, - { - "type": "tensor", - "dtype": "float", - "shape": [16, 32], - }] -}, -{ - "type": "tuple", - "value": [ - { - "type": "tensor", - "dtype": "float", - "shape": [16, 32], - }, - { - "type": "tensor", - "dtype": "float", - "shape": [16, 32], - }] -}, -{ - "type": "tensor", - "dtype": "float", - "shape": [128, 256] -} -``` -**Notes** -* `"value_range"` specifies a random value in the `[min, max]` range to be generated for the argument. - -To help construct benchmark configs, some utility functions are available in [examples/pytorch/run_op.py](examples/pytorch/run_op.py). - -### Configuration Customization -Users are able to implement custom specs for **`"build"`** and **`"input"`** to support a wide variety of operators. Should there's a need, the benchmark specification allows implementing new [`ConfigIterator`](lib/config.py) and [`DataGenerator`](lib/data.py) for your specific use case. - -## Development Contributions -For more details, please take a look at the [development document](development.md). diff --git a/train/compute/python/lib/__init__.py b/train/compute/python/lib/__init__.py deleted file mode 100644 index b9b5e2c5..00000000 --- a/train/compute/python/lib/__init__.py +++ /dev/null @@ -1,53 +0,0 @@ -import time - -__base_version__ = "1.0.0" - - -def __generate_git_param_train_compute_version(): - # git hash - commit_version = "+git" - try: - import git - - repo = git.Repo(search_parent_directories=True) - commit_version = f"{commit_version}.{repo.head.object.hexsha}" - except Exception: - pass - - timestamp = int(time.time()) - commit_version = f"{commit_version}.{timestamp}" - return f"{__base_version__}{commit_version}" - - -def __generate_fbcode_param_train_compute_version(): - # Meta build hash - commit_version = "+fbcode" - try: - from __manifest__ import fbmake - - if fbmake["revision"]: - commit_version = f"{commit_version}.{fbmake['revision']}" - if fbmake["time"]: - commit_version = f"{commit_version}.{fbmake['epochtime']}" - else: - timestamp = int(time.time()) - commit_version = f"{commit_version}.{timestamp}" - except Exception: - commit_version = "+local" - - return f"{__base_version__}{commit_version}" - - -def __get_version(): - # First try to get the version from setup.py generated _version.py file. - try: - from ._version import __param_train_compute_version - - return __param_train_compute_version - except Exception: - pass - # If failed try to get fbcode build version. - return __generate_fbcode_param_train_compute_version() - - -__version__ = __get_version() diff --git a/train/compute/python/pytorch/__init__.py b/train/compute/python/pytorch/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/requirements.txt b/train/compute/python/requirements.txt deleted file mode 100644 index d776a048..00000000 --- a/train/compute/python/requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -fbgemm_gpu -gitpython -networkx -numpy -pydot -scipy -torch>=2.0.0 diff --git a/train/compute/python/setup.py b/train/compute/python/setup.py deleted file mode 100644 index 1f89d9c6..00000000 --- a/train/compute/python/setup.py +++ /dev/null @@ -1,45 +0,0 @@ -from lib import __generate_git_param_train_compute_version -from setuptools import setup - - -def main(): - package_base = "param_bench.train.compute.python" - - # List the packages and their dir mapping: - # "install_destination_package_path": "source_dir_path" - package_dir_map = { - f"{package_base}": ".", - f"{package_base}.examples": "examples", - f"{package_base}.examples.pytorch": "examples/pytorch", - f"{package_base}.lib": "lib", - f"{package_base}.lib.pytorch": "lib/pytorch", - f"{package_base}.pytorch": "pytorch", - f"{package_base}.test": "test", - f"{package_base}.test.pytorch": "test/pytorch", - f"{package_base}.tools": "tools", - f"{package_base}.workloads": "workloads", - f"{package_base}.workloads.pytorch": "workloads/pytorch", - } - - packages = list(package_dir_map) - - param_train_compute_version = __generate_git_param_train_compute_version() - with open("./lib/_version.py", "w") as version_out: - version_out.write( - f"__param_train_compute_version='{param_train_compute_version}'" - ) - - setup( - name="parambench-train-compute", - version=param_train_compute_version, - python_requires=">=3.8", - author="Louis Feng", - author_email="lofe@fb.com", - url="https://github.com/facebookresearch/param", - packages=packages, - package_dir=package_dir_map, - ) - - -if __name__ == "__main__": - main() diff --git a/train/compute/python/test/__init__.py b/train/compute/python/test/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/test/data/__init__.py b/train/compute/python/test/data/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/test/pytorch/__init__.py b/train/compute/python/test/pytorch/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/tools/README.md b/train/compute/python/tools/README.md deleted file mode 100644 index 50848b66..00000000 --- a/train/compute/python/tools/README.md +++ /dev/null @@ -1,34 +0,0 @@ -# Execution Trace Replay (et_replay) -`et_replay` is a tool designed for replaying Chakra Execution Traces (ET) from machine learning models. - -## Installation -To install `param`, use the following commands: - -```bash -$ git clone --recurse-submodules git@github.com:facebookresearch/param.git -$ conda create -n param python=3.8.0 -$ conda activate param -$ cd param -$ pip3 install -r requirements.txt -$ cd train/comms/pt/ -$ pip3 install . -$ cd - -$ cd train/compute/python/ -$ pip3 install -r requirements.txt -$ pip3 install . -$ cd - -``` - -## Running et_replay -To use et_replay, execution traces are required. -Start by collecting an execution trace using the command below. This command runs a benchmark with specific configurations and enables execution tracing. -```bash -$ python -m param_bench.train.compute.python.pytorch.run_benchmark -c train/compute/python/examples/pytorch/configs/simple_add.json --et -``` - -After collecting the trace, replay it with the following command. Set the warm-up iteration count to at least 1 to exclude tensor transfer time to GPUs. -```bash -$ python -m param_bench.train.compute.python.tools.et_replay --input --warmup-iter 10 --iter 50 --compute --profile-replay -``` - -> Note: When analyzing performance values from et_replay, refer to the collected Kineto traces rather than the execution time reported by et_replay. Kineto traces are only collected when --profile-replay is provided. diff --git a/train/compute/python/tools/__init__.py b/train/compute/python/tools/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/workloads/__init__.py b/train/compute/python/workloads/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/workloads/pytorch/__init__.py b/train/compute/python/workloads/pytorch/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/train/compute/python/workloads/pytorch/alex_net.py b/train/compute/python/workloads/pytorch/alex_net.py deleted file mode 100644 index bd2db3ce..00000000 --- a/train/compute/python/workloads/pytorch/alex_net.py +++ /dev/null @@ -1,49 +0,0 @@ -import torch -import torch.nn as nn - -from ...lib.operator import register_operator -from ...lib.pytorch.operator_impl import BuildableOp - - -class AlexNet(nn.Module): - """ - Ref: https://pytorch.org/vision/master/_modules/torchvision/models/alexnet.html - """ - - def __init__(self, num_classes: int = 1000, dropout: float = 0.5) -> None: - super().__init__() - self.features = nn.Sequential( - nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2), - nn.ReLU(inplace=True), - nn.MaxPool2d(kernel_size=3, stride=2), - nn.Conv2d(64, 192, kernel_size=5, padding=2), - nn.ReLU(inplace=True), - nn.MaxPool2d(kernel_size=3, stride=2), - nn.Conv2d(192, 384, kernel_size=3, padding=1), - nn.ReLU(inplace=True), - nn.Conv2d(384, 256, kernel_size=3, padding=1), - nn.ReLU(inplace=True), - nn.Conv2d(256, 256, kernel_size=3, padding=1), - nn.ReLU(inplace=True), - nn.MaxPool2d(kernel_size=3, stride=2), - ) - self.avgpool = nn.AdaptiveAvgPool2d((6, 6)) - self.classifier = nn.Sequential( - nn.Dropout(p=dropout), - nn.Linear(256 * 6 * 6, 4096), - nn.ReLU(inplace=True), - nn.Dropout(p=dropout), - nn.Linear(4096, 4096), - nn.ReLU(inplace=True), - nn.Linear(4096, num_classes), - ) - - def forward(self, x: torch.Tensor) -> torch.Tensor: - x = self.features(x) - x = self.avgpool(x) - x = torch.flatten(x, 1) - x = self.classifier(x) - return x - - -register_operator("pytorch.model.alex_net", BuildableOp(AlexNet)) diff --git a/train/compute/python/workloads/pytorch/native_basic_ops.py b/train/compute/python/workloads/pytorch/native_basic_ops.py deleted file mode 100644 index 2ca9fd31..00000000 --- a/train/compute/python/workloads/pytorch/native_basic_ops.py +++ /dev/null @@ -1,39 +0,0 @@ -from typing import Dict - -import torch - -from ...lib.operator import OperatorInterface, register_operators -from ...lib.pytorch.operator_impl import BuildableOp, CallableOp, UnaryOp - - -# Unary -unary_ops: Dict[str, OperatorInterface] = { - "torch.add_": UnaryOp("add_"), - "torch.clamp_": UnaryOp("clamp_"), -} -register_operators(unary_ops) - -callable_ops: Dict[str, OperatorInterface] = { - "torch.add": CallableOp(torch.add), - "torch.baddbmm": CallableOp(torch.baddbmm), - "torch.bmm": CallableOp(torch.bmm), - "torch.cat": CallableOp(torch.cat), - "torch.matmul": CallableOp(torch.matmul), - "torch.mean": CallableOp(torch.mean), - "torch.mm": CallableOp(torch.mm), - "torch.mul": CallableOp(torch.mul), - "torch.nn.functional.relu": CallableOp(torch.nn.functional.relu), - "torch.reshape": CallableOp(torch.reshape), -} -register_operators(callable_ops) - - -buildable_ops: Dict[str, OperatorInterface] = { - "torch.nn.AdaptiveAvgPool2d": BuildableOp(torch.nn.AdaptiveAvgPool2d), - "torch.nn.Conv2d": BuildableOp(torch.nn.Conv2d), - "torch.nn.Dropout": BuildableOp(torch.nn.Dropout), - "torch.nn.MaxPool2d": BuildableOp(torch.nn.MaxPool2d), - "torch.nn.ReLU": BuildableOp(torch.nn.ReLU), - "torch.nn.Linear": BuildableOp(torch.nn.Linear), -} -register_operators(buildable_ops) diff --git a/train/compute/python/workloads/pytorch/split_table_batched_embeddings_ops.py b/train/compute/python/workloads/pytorch/split_table_batched_embeddings_ops.py deleted file mode 100644 index ab25ba15..00000000 --- a/train/compute/python/workloads/pytorch/split_table_batched_embeddings_ops.py +++ /dev/null @@ -1,331 +0,0 @@ -import copy -import gc -import os -from typing import Any, Dict, List, Optional, Tuple, Union - -import numpy as np -import torch -from fbgemm_gpu.split_table_batched_embeddings_ops import ( - CacheAlgorithm, - ComputeDevice, - EmbeddingLocation, - OptimType, - PoolingMode, - SparseType, - SplitTableBatchedEmbeddingBagsCodegen, - WeightDecayMode, -) - -from ...lib.data import register_data_generator -from ...lib.generator import full_range, IterableList, ListProduct, TableProduct -from ...lib.init_helper import get_logger -from ...lib.iterator import ( - ConfigIterator, - genericList_to_list, - register_config_iterator, - remove_meta_attr, -) -from ...lib.operator import OperatorInterface, register_operator - -logger = get_logger() - - -class SplitTableBatchedEmbeddingBagsCodegenInputIterator(ConfigIterator): - def __init__( - self, - configs: Dict[str, Any], - key: str, - device: str, - ): - super(SplitTableBatchedEmbeddingBagsCodegenInputIterator, self).__init__( - configs, key, device - ) - logger.debug(f"build_input_config: {configs}") - build_config = configs["build"] - logger.debug(f"build_config: {build_config}") - self.num_tables = build_config["args"][0] - self.rows = build_config["args"][1] - self.dim = build_config["args"][2] - self.weighted = build_config["args"][4] - self.weights_precision = build_config["args"][5] - self.generator = self._generator() - - def _generator(self): - inputs = self.configs[self.key] - var_id = 0 - for input in inputs: - input_config = copy.deepcopy(input) - args = [] - for arg in input_config["args"]: - if "__range__" in arg: - arg["value"] = full_range(*arg["value"]) - if "__list__" in arg: - arg["value"] = IterableList(arg["value"]) - args.append(TableProduct(arg)) - - config_id = 0 - for arg_config in ListProduct(args): - batch_size = arg_config[0] - pooling_factor = arg_config[1] - result = { - "args": [ - self.num_tables, - self.rows, - self.dim, - batch_size, - pooling_factor, - self.weighted, - self.weights_precision, - ], - "kwargs": {}, - } - yield (f"{var_id}_{config_id}", remove_meta_attr(result)) - config_id += 1 - - def __next__(self): - return next(self.generator) - - -register_config_iterator( - "SplitTableBatchedEmbeddingBagsCodegenInputIterator", - SplitTableBatchedEmbeddingBagsCodegenInputIterator, -) - - -def generate_requests( - B: int, # batch size - L: int, # pooling factor - E: int, # emb size - offset_start: int, # indices offset from previous generator - # alpha <= 1.0: use uniform distribution - # alpha > 1.0: use zjpf distribution - alpha: float = 1.0, - weighted: bool = False, -) -> List[Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]]: - indices_size = B * L - # indices - if alpha == 0: - # linear sequence by pooling factor - indices = torch.arange(0, indices_size).long() % L - elif alpha <= 0.5: - # linear sequence by embedding size - indices = torch.arange(0, indices_size).long() % E - elif alpha <= 1.0: - indices = torch.randint( - low=0, - high=E, - size=(indices_size,), - dtype=torch.int64, - ) - else: - indices = torch.as_tensor(np.random.zipf(a=alpha, size=indices_size)).long() % E - - # offsets - lengths = np.ones(B, dtype=np.int64) * L - # here we want to add the start of previous offset to all the offsets - # if offset_start = 0, we insert it in the beginning - if offset_start == 0: - offsets = torch.tensor(np.cumsum([0] + lengths.tolist())) - else: - offsets = torch.tensor(offset_start + np.cumsum(lengths)) - - # weights - weights_tensor = ( - torch.randn(indices_size, dtype=torch.float32) if weighted else None - ) - - return (indices, offsets, weights_tensor) - - -class SplitTableBatchedEmbeddingBagsCodegenInputDataGenerator: - def get_data(self, config, device, alpha=1): - logger.debug(f"data generator config: {config}") - # batch size * pooling_factor - num_tables = config["args"][0]["value"] - - if num_tables > 1: - rows = config["args"][1]["value"] - pooling_factors = config["args"][4]["value"] - else: - rows = [config["args"][1]["value"]] - pooling_factors = [config["args"][4]["value"]] - batch_size = config["args"][3]["value"] - weighted = config["args"][5]["value"] - - indices_list = [] - offsets_list = [] - per_sample_weights_list = [] - offset_start = 0 - distribution = os.getenv("split_embedding_distribution") - if distribution is None: - distribution = 1 - distribution = alpha - logger.debug(f"distribution = {distribution}") - - target_device = torch.device(device) - - indices_file = None - offsets_file = None - weights_file = None - if ("indices_tensor" in config["args"][4]) and ( - "offsets_tensor" in config["args"][4] - ): - indices_file = config["args"][4]["indices_tensor"] - offsets_file = config["args"][4]["offsets_tensor"] - if weighted and "weights_tensor" in config["args"][4]: - weights_file = config["args"][4]["weights_tensor"] - else: - indices_file = os.getenv("split_embedding_indices") - offsets_file = os.getenv("split_embedding_offsets") - if weighted: - weights_file = os.getenv("split_embedding_weights") - - logger.debug(f"indices_file: {indices_file}, offsets_file: {offsets_file}") - if indices_file is not None and offsets_file is not None: - indices_tensor = torch.load(indices_file, map_location=target_device) - offsets_tensor = torch.load(offsets_file, map_location=target_device) - per_sample_weights_tensor = None - if weights_file: - per_sample_weights_tensor = torch.load( - weights_file, map_location=target_device - ) - else: - for i in range(num_tables): - indices, offsets, per_sample_weights = generate_requests( - batch_size, - pooling_factors[i], - rows[i], - offset_start, - float(distribution), - weighted, - ) - indices_list.append(indices) - offsets_list.append(offsets) - # update to the offset_start to the last element of current offset - offset_start = offsets[-1].item() - if weighted: - per_sample_weights_list.append(per_sample_weights) - - indices_tensor = torch.cat(indices_list) - offsets_tensor = torch.cat(offsets_list) - - # check for per sample weights - per_sample_weights_tensor = ( - torch.cat(per_sample_weights_list) if weighted else None - ) - - logger.debug(f"indices: {indices_tensor.shape}") - logger.debug(f"offsets: {offsets_tensor.shape}") - if per_sample_weights_tensor is not None: - logger.debug( - f"per_sample_weights: {per_sample_weights_tensor.shape}, {per_sample_weights_tensor}" - ) - - return ( - [ - indices_tensor.to(target_device), - offsets_tensor.to(target_device), - per_sample_weights_tensor.to(target_device) if weighted else None, - ], - {}, - ) - - -register_data_generator( - "SplitTableBatchedEmbeddingBagsCodegenInputDataGenerator", - SplitTableBatchedEmbeddingBagsCodegenInputDataGenerator, -) - - -# Callable ops are ops can be called in the form of op(*args, **kwargs) -class SplitTableBatchedEmbeddingBagsCodegenOp(OperatorInterface): - def __init__( - self, - ): - super(SplitTableBatchedEmbeddingBagsCodegenOp, self).__init__() - self.op = None - self.fwd_out: torch.tensor = None - self.grad_in: torch.tensor = None - - def build( - self, - num_tables: int, - rows: Union[int, list], - dims: Union[int, list], - pooling: int, - weighted: bool, - weights_precision: str, - optimizer: str, - lr: float = 0.01, - eps: float = 1.0e-8, - weight_decay: float = 0.0, - weight_decay_mode: WeightDecayMode = WeightDecayMode.NONE, - ): - logger.debug( - f"build: [{num_tables}, {rows}, {dims}, {pooling}, {weighted}, {weights_precision}, \ - {optimizer}, {lr}, {eps}, {weight_decay}, {WeightDecayMode}]" - ) - rows_list = rows if isinstance(rows, list) else [rows] - dims_list = dims if isinstance(dims, list) else [dims] - if self.device.startswith("cpu"): - compute_device = ComputeDevice.CPU - location = EmbeddingLocation.HOST - elif self.device.startswith("cuda"): - compute_device = ComputeDevice.CUDA - location = EmbeddingLocation.DEVICE - else: - raise ValueError(f"Unknown compute device {self.device}") - - # split_table op options from actual runs of - # caffe2/torch/fb/module_factory/proxy_module/grouped_sharded_embedding_bag.py - self.op = SplitTableBatchedEmbeddingBagsCodegen( - [ - ( - rows_list[i], - dims_list[i], - location, - compute_device, - ) - for i in range(num_tables) - ], - optimizer=OptimType(optimizer), - pooling_mode=PoolingMode(pooling), - weights_precision=SparseType(weights_precision), - stochastic_rounding=True, - cache_algorithm=CacheAlgorithm.LFU, - cache_load_factor=0.0, - cache_reserved_memory=12.0, - device=torch.device(self.device), - learning_rate=lr, - eps=eps, - weight_decay=weight_decay, - weight_decay_mode=weight_decay_mode, - ) - - logger.debug(f"op embedding_specs: {self.op.embedding_specs}") - - def cleanup(self): - logger.debug("op cleanup") - self.op = None - self.grad_in = None - self.fwd_out = None - - def forward(self, *args, **kwargs): - self.fwd_out = self.op.forward(args[0], args[1], args[2]) - return self.fwd_out - - def create_grad(self): - self.grad_in = torch.ones_like(self.fwd_out) - - def backward(self, grad=None): - if grad is not None: - self.fwd_out.backward(grad) - else: - if self.grad_in is None: - self.create_grad() - self.fwd_out.backward(self.grad_in) - - -register_operator( - "SplitTableBatchedEmbeddingBagsCodegen", SplitTableBatchedEmbeddingBagsCodegenOp() -)