# Running Multi-node Tasks

## From foreach to @parallel

The `@parallel` decorator, and the `num_parallel=N` construct attached to it, is like the [`foreach`](https://docs.metaflow.org/metaflow/basics#foreach) construct in that both allow Metaflow users to launch multiple runtime tasks based on one `@step` function. The primary difference is that `@parallel` enables inter-process communication (IPC) between tasks, often referred to as “multi-node computation” in HPC and deep learning contexts.


This difference is best visualized by comparing `foreach` to `num_parallel` and `@parallel`. First consider the embarrassingly parallel `foreach`, where no inter-process communication is expected:

```mdx-code-block
import ReactPlayer from 'react-player';
```
And now note the difference with the `num_parallel` case:
<ReactPlayer playing controls muted loop url='/assets/parallel-job.mp4' width='100%' height='100%'/>
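
To ground the comparison, here is a minimal, hypothetical `foreach` flow (flow and step names are illustrative); each `work` task runs independently and never communicates with its siblings. A bare `@parallel` counterpart appears later on this page:

```python
from metaflow import FlowSpec, step

class ForeachDemo(FlowSpec):

    @step
    def start(self):
        self.shards = [0, 1, 2, 3]
        # one independent task per item; no inter-task communication
        self.next(self.work, foreach="shards")

    @step
    def work(self):
        print(f"processing shard {self.input}")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ForeachDemo()
```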

IPC is needed when processes running on different machines must pool their resources to complete one job, such as a multi-node process where each worker computes something and results are synced via an [all reduce](https://mpitutorial.com/tutorials/mpi-reduce-and-allreduce/) or similar aggregation. This computation pattern appears primarily in distributed model training and numerical simulations in the AI and HPC fields. To support these use cases, the `@parallel` decorator provides a Metaflow API to launch `num_parallel` tasks that can communicate directly with each other.
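
To make the all-reduce pattern concrete, here is a minimal `mpi4py` sketch, independent of Metaflow, in which each worker contributes a local value and every worker receives the global sum:

```python
# Run with, e.g.: mpirun -n 4 python allreduce_demo.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Each worker computes a local result...
local_value = rank + 1

# ...and the all-reduce sums the local values, leaving the total on every worker.
total = comm.allreduce(local_value, op=MPI.SUM)
print(f"rank {rank}: global sum = {total}")
```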


A key reason to use Metaflow and the `@parallel` decorator for this style of compute is that the implementation works with standard tools in the distributed computing ecosystem, such as MPI, Ray, and PyTorch distributed, and it is additive with the typical benefits of Metaflow: for example, packaging code and dependencies across worker nodes and moving seamlessly between compute providers.

## How it works

### Unbounded foreach
The unbounded foreach is the fundamental mechanism that underlies the `@parallel` implementation and enables gang-scheduled Metaflow tasks. It provides context to other Metaflow decorators that need to be aware of the multi-node setting.

```python
# ... (the rest of the MPI4PyFlow definition is collapsed in this view) ...

if __name__ == "__main__":
    MPI4PyFlow()
```

## Higher-level abstractions
The MPI decorator shown above is one of several existing Metaflow plugins demonstrating how to extend the `@parallel` functionality. You can look at the repositories linked in the Decorator Implementation column of the table below to see the pattern to follow when modifying or implementing your own decorator for any multi-node computing framework.

:::note
If preferred, you can also use the `@parallel` decorator itself and manually configure any other setup needed for your use case as part of the Metaflow user code.
:::
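
For instance, here is a minimal, hypothetical sketch of the bare `@parallel` approach. It assumes `current.parallel` exposes `node_index` and `num_nodes`, as in recent Metaflow releases; any framework-specific coordination (rendezvous, all-reduce, and so on) would be set up in your own step code:

```python
from metaflow import FlowSpec, step, parallel, current

class BareParallelFlow(FlowSpec):

    @step
    def start(self):
        # gang-schedule 4 tasks for the next step
        self.next(self.work, num_parallel=4)

    @parallel
    @step
    def work(self):
        # every task knows its rank and the world size
        print(f"node {current.parallel.node_index} of {current.parallel.num_nodes}")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    BareParallelFlow()
```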

| Decorator Implementation | UX | Description | PyPI Release | Example |
| :---: | --- | --- | :---: | :---: |
| [`@torchrun`](https://github.com/outerbounds/metaflow-torchrun) | Use `current.torch.run` to submit your `torch.distributed` program. No need to log into each node; call the code once in `@step`. | A [`torchrun`](https://pytorch.org/docs/stable/elastic/run.html) command that runs `@step` function code on each node. [Torch distributed](https://pytorch.org/tutorials/beginner/dist_overview.html) is used under the hood to handle communication between nodes. | [`metaflow-torchrun`](https://pypi.org/project/metaflow-torchrun/) | [MinGPT](https://github.com/outerbounds/metaflow-torchrun/blob/main/examples/min-gpt/flow.py) |
| [`@metaflow_ray`](https://github.com/outerbounds/metaflow-ray/tree/main) | Write a Ray program locally or call a script from the `@step` function; `@metaflow_ray` takes care of forming the Ray cluster. | Forms a [Ray cluster](https://docs.ray.io/en/latest/cluster/getting-started.html) dynamically. Runs the `@step` function code on the control task as Ray’s “head node”. | [`metaflow-ray`](https://pypi.org/project/metaflow-ray/) | [GPT-J](https://github.com/outerbounds/metaflow-ray/tree/main/examples/ray-fine-tuning-gpt-j) & [Distributed XGBoost](https://github.com/outerbounds/metaflow-ray/tree/main/examples/train) |
| [`@tensorflow`](https://github.com/outerbounds/metaflow-tensorflow/tree/main) | Put TensorFlow code in a distributed strategy scope and call it from the step function. | Runs the `@step` function code on each node, so the user picks the appropriate [strategy](https://www.tensorflow.org/guide/distributed_training#types_of_strategies) in their code. | [`metaflow-tensorflow`](https://pypi.org/project/metaflow-tensorflow/) | [Keras Distributed](https://github.com/outerbounds/metaflow-tensorflow/tree/main/examples/multi-node) |
| [`@mpi`](https://github.com/outerbounds/metaflow-mpi) | Exposes `current.mpi.cc`, `current.mpi.broadcast_file`, `current.mpi.run`, and `current.mpi.exec`. Cluster SSH config is handled automatically inside the decorator. Requires OpenSSH and an MPI implementation to be installed in the Metaflow task container. It was tested against OpenMPI; you can find a sample Dockerfile [here](https://github.com/outerbounds/metaflow-mpi/blob/main/examples/Dockerfile). | Forms an MPI cluster with passwordless SSH configured at task runtime. Users can submit an `mpi4py` program or compile, broadcast, and submit a C program. | [`metaflow-mpi`](https://pypi.org/project/metaflow-mpi/) | [Libgrape](https://github.com/outerbounds/metaflow-mpi/tree/main/examples/libgrape-ldbc-graph-benchmark) |
| [`@deepspeed`](https://github.com/outerbounds/metaflow-deepspeed) | Exposes `current.deepspeed.run`. <br/> Requires OpenSSH and OpenMPI installed in the Metaflow task container. | Forms an MPI cluster with passwordless SSH configured at task runtime (to reduce the risk of leaking private keys). Submits the DeepSpeed program and runs it. | [`metaflow-deepspeed`](https://pypi.org/project/metaflow-deepspeed/) | [Bert](https://github.com/outerbounds/metaflow-deepspeed/tree/main/examples/bert) & [Dolly](https://github.com/outerbounds/metaflow-deepspeed/tree/main/examples/dolly) |
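
As one illustration of the shared pattern, here is a hypothetical sketch of `@torchrun` usage. The `entrypoint` argument and the `train.py` script name are assumptions based on the plugin's examples, and the flow requires the `metaflow-torchrun` extension to be installed, so consult the [repository](https://github.com/outerbounds/metaflow-torchrun) for the exact API:

```python
from metaflow import FlowSpec, step, current, torchrun

class TorchrunDemo(FlowSpec):

    @step
    def start(self):
        self.next(self.train, num_parallel=2)

    @torchrun
    @step
    def train(self):
        # submits train.py (hypothetical script) via torch.distributed on every node
        current.torch.run(entrypoint="train.py")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    TorchrunDemo()
```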

## Preparing multi-node infrastructure

Depending on the distributed computing frameworks and job types you want to use, there are various network adapters and HPC services that you may want to install into the Metaflow deployment.

### AWS Batch

When you pick the instance types you want in your AWS Batch compute environment, consider placing them in an [EC2 cluster placement group](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html).
The reason to do this is that latency between nodes is much lower when all worker nodes are in the same AWS Availability Zone, which will not necessarily happen without a cluster placement group.

#### Intranode communication with shared memory
AWS Batch has a parameter called `shared_memory` that allows multiple processes on the same compute node to communicate through a shared portion of memory (RAM). _This feature works independently of the multi-node setting_, but it can have additive benefits and may resolve errors that appear in the multi-node setting. The value can be tuned to your application; this [AWS blog](https://aws.amazon.com/blogs/compute/using-shared-memory-for-low-latency-intra-node-communication-in-aws-batch/) suggests 4096 MB as a reasonable starting value for most cases. In Metaflow, you can set this value like any other argument to the batch decorator:

```python
from metaflow import FlowSpec, step, resources, batch

class SharedMemory(FlowSpec):
    # Minimal hypothetical flow body; the original example is collapsed in this view.

    @resources(memory=8000)
    @batch(shared_memory=4096)  # shared memory (/dev/shm) size in MB
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    SharedMemory()
```

#### Intranode communication with AWS Elastic Fabric Adapter (EFA)

[Some AWS EC2 instances](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa.html#efa-instance-types) have network devices built for high-performance computing use cases. To use them, the AWS Batch compute environment connected to your AWS Batch job queue needs a launch template that installs the necessary EFA drivers. You can see an example CloudFormation template with a launch template that installs EFA devices [here](https://github.com/outerbounds/metaflow-trainium/blob/main/cfn/trn1_batch_resources.yaml#L294-L320). Then, Metaflow users can specify `@batch(efa=8, ...)` to attach 8 network interfaces to each node in a multi-node batch job.

#### Limitations

AWS Batch multi-node parallel jobs are not integrated with AWS Step Functions, so you cannot use Metaflow's Step Functions integration when combining the `@parallel` and `@batch` decorators.

### Kubernetes
Multi-node support for Metaflow tasks run on Kubernetes clusters is a work in progress. Please reach out to us on [Slack](http://slack.outerbounds.co/) to access the latest experimental implementation and/or to co-develop it with the Outerbounds team.