Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support torchrun+SLURM multi-node trainings in ReturnnTrainingJob #552

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 81 additions & 6 deletions returnn/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import sys
import os
import shutil
import socket
import subprocess as sp
import numpy as np
from typing import Dict, Sequence, Iterable, List, Optional, Union
Expand Down Expand Up @@ -242,7 +243,7 @@ def __init__(
self.rqmt["gpu"] *= self.horovod_num_processes // (self.multi_node_slots or 1)
self.rqmt["mem"] *= self.horovod_num_processes // (self.multi_node_slots or 1)

def _get_run_cmd(self):
def _get_run_cmd(self, *, rdzv_node_addr: Optional[str] = None):
run_cmd = [
self.returnn_python_exe.get_path(),
self.returnn_root.join_right("rnn.py").get_path(),
Expand All @@ -255,11 +256,19 @@ def _get_run_cmd(self):
# Instead of using the torchrun binary, directly execute the corresponding Python module
# and directly use the correct Python environment.
prefix = [self.returnn_python_exe.get_path(), "-mtorch.distributed.run"]
if (self.multi_node_slots or 1) == 1:
multi_node_slots = self.multi_node_slots or 1
if multi_node_slots == 1:
prefix += ["--standalone"]
elif multi_node_slots > 1:
rdzv_id = self.job_id().replace("/", "_")
prefix += [
f'--rdzv-id="{rdzv_id}"',
"--rdzv-backend=c10d",
f'--rdzv-endpoint={rdzv_node_addr or "$RDZV_NODE_ADDR"}',
]
prefix += [
f"--nnodes={self.multi_node_slots or 1}",
f"--nproc-per-node={self.horovod_num_processes}",
f"--nnodes={multi_node_slots}",
f"--nproc-per-node={self.horovod_num_processes // multi_node_slots}",
]
run_cmd = prefix + run_cmd[1:]
elif self.distributed_launch_cmd == "mpirun":
Expand Down Expand Up @@ -379,12 +388,78 @@ def run(self):
print(f.read())
except Exception as exc:
print("Cannot read:", exc)
sys.stdout.flush()

env = os.environ.copy()
env["OMP_NUM_THREADS"] = str(self.rqmt["cpu"])
env["MKL_NUM_THREADS"] = str(self.rqmt["cpu"])
sp.check_call(self._get_run_cmd(), env=env)

rdzv_addr = None
if (self.multi_node_slots or 1) > 1 and self.distributed_launch_cmd == "torchrun":
import hashlib
import psutil

assert "SLURM_JOB_NODELIST" in os.environ, (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the whole logic is dependent on the cluster and maybe even on the specific configuration at our site.. could the cluster dependent logic somehow be abstracted in the sisyphus engine s.t. it can be implemented for other schedulers should we decide to switch in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the information a job has about the environment it is running in is quite limited, this would be a first.

I could see an interface like:

from sisyphus import runtime, Job

class MyJob(Job):
  def run(self):
    print(runtime.n_job_nodes()) # int(4)
    print(runtime.job_nodes()) # ["c-01", "c-02", ...] or so?
    print(runtime.cur_node_id()) # "c-01", or should this just be the local host name as given by e.g. `import socket`?

I suppose array tasks would then see single-entry items in there, and be given information about how many array tasks they are and which one they are right now? And the information in there would then be filled out by the currently active engine, e.g. by querying the environment variables or using other kinds of APIs.

Tbqh I feel this API risks being reduced to the lowest common denominator between all the different schedulers there are. Also the API can be limiting. What if the nodes assigned to a job change dynamically on the fly, e.g. if broken nodes are put back into rotation? Maybe it's better after all to push this responsibility into the job (as it is now) and then adapt the job for different types of schedulers as needed.

Copy link
Contributor Author

@NeoLegends NeoLegends Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree though, it's a bummer how much this mixes up generic job responsibilities and engine-specific responsibilities currently.

Especially the part on NCCL_NET is bad. NCCL seems to default to InfiniBand (IB) communication, even if there is no IB installed on the nodes. Then it crashes and you have to force it to use ethernet via NCCL_NET=socket (could be a peculiarity on our nodes though?). This env variable needs to be set on the training job, and it does not seem like setting it via gs.check_engine_limits works. Probably the job has already been pickled by that point, which is why changing the env from there no longer has an effect. Currently I set it in my regular training pipeline via job.set_env(...), but I find this quite annoying as it ties the job very closely to the nodes that it is going to run on.

Maybe we could extend the engines to set certain env variables on a per-cluster partition basis? I'm not sure. Or we improve RETURNN to try and detect the presence of IB and fall back to eth automatically if needed. I thought this would already be the default in NCCL. cc @albertz

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you show a list of all the env vars which are set in such a multi node job? Maybe there are some others which are more generic (not so specific about Slurm).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, NCCL normally figures these things out by itself. Did you file a NCCL issue (https://github.com/NVIDIA/nccl/issues; they are usually quite responsive; code is actually also not so hard to understand)? Did you check the NCCL debug logs (NCCL_DEBUG=INFO)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Horovod, we used mpirun, because usually, mpirin in a Cluster is better connected with the scheduling system, and usually that should always work, and MPI then should not about all the available nodes. Maybe that would be another option. Either to use mpirun directly instead of torchrun, or to somehow use MPI to get this info in some other way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbqh I feel this API risks being reduced to the lowest common denominator between all the different schedulers there are. Also the API can be limiting.

I disagree.

  1. If there is an API then it must implement all the things that our Jobs need. If an engine does not support a feature needed, then independent of having an API or not, it cannot be used with that Job. So we might as well raise errors if some engine does not support it
  2. Having all of the environment handling in the Job only serves to clutter the Job code and leads to code duplication across different Jobs that implement the same parsing

What if the nodes assigned to a job change dynamically on the fly, e.g. if broken nodes are put back into rotation?

No engine and no Job currently supports this. But support can be added if need be. If it becomes relevant we could even define capabilities that a Job can request and then it can only be scheduled on engines that support these capabilities.

Maybe it's better after all to push this responsibility into the job (as it is now) and then adapt the job for different types of schedulers as needed.

I strongly disagree. The point of Jobs is to define what is executed and sisyphus+engine care about the how

And besides:

"multi-node multi GPU-training w/ torchrun currently depends on SLURM environment variables to determine the master node"

if that is the only problem, there are engine independent ways around it. E.g. all tasks atomically write their hostname into a file and then wait until there are N names in the file. The first name becomes the master.
One could even implement a node drop out and be replaced scheme when writing time stamps and any node that does not update their timestamp in a period is dropped 🤷

Copy link
Contributor Author

@NeoLegends NeoLegends Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you show a list of all the env vars which are set in such a multi node job? Maybe there are some others which are more generic (not so specific about Slurm).

_: /usr/bin/sbatch
BROWSER: [...]
COLORTERM: truecolor
DBUS_SESSION_BUS_ADDRESS: [...]
EDITOR: vim
ENVIRONMENT: BATCH
GIT_ASKPASS: [...]
HOME: /home/mgunz
HOSTNAME: c-07
HYDRA_BOOTSTRAP: slurm
HYDRA_LAUNCHER_EXTRA_ARGS: --external-launcher
I_MPI_HYDRA_BOOTSTRAP_EXEC_EXTRA_ARGS: --external-launcher
I_MPI_HYDRA_BOOTSTRAP: slurm
LANG: C.UTF-8
LESS: -R
LESSCLOSE: /usr/bin/lesspipe %s %s
LESSOPEN: | /usr/bin/lesspipe %s
LOGNAME: mgunz
LS_COLORS: [...]
LSCOLORS: [...]
MOTD_SHOWN: pam
OLDPWD: [...]
OMPI_MCA_plm_slurm_args: --external-launcher
PAGER: less
PATH: [...]
PRTE_MCA_plm_slurm_args: --external-launcher
PS1: [...]
PWD: [...]
SHELL: /bin/bash
SHLVL: 2
SLURM_CLUSTER_NAME: cluster
SLURM_CONF: /etc/slurm/slurm.conf
SLURM_CPU_BIND_LIST: 0x0000002000000020
SLURM_CPU_BIND_TYPE: mask_cpu:
SLURM_CPU_BIND_VERBOSE: quiet
SLURM_CPU_BIND: quiet,mask_cpu:0x0000002000000020
SLURM_CPUS_ON_NODE: 2
SLURM_CPUS_PER_TASK: 2
SLURM_DISTRIBUTION: cyclic
SLURM_GTIDS: 0
SLURM_JOB_ACCOUNT: science
SLURM_JOB_CPUS_PER_NODE: 2(x2)
SLURM_JOB_END_TIME: 1732115253
SLURM_JOB_GID: 1714422273
SLURM_JOB_ID: 4180414
SLURM_JOB_NAME: wrap
SLURM_JOB_NODELIST: c-[07-08]
SLURM_JOB_NUM_NODES: 2
SLURM_JOB_PARTITION: cpu
SLURM_JOB_QOS: normal
SLURM_JOB_START_TIME: 1730905653
SLURM_JOB_UID: 1714423942
SLURM_JOB_USER: mgunz
SLURM_JOBID: 4180414
SLURM_LAUNCH_NODE_IPADDR: 127.0.0.1
SLURM_LOCALID: 0
SLURM_MEM_PER_NODE: 100
SLURM_NNODES: 2
SLURM_NODEID: 0
SLURM_NODELIST: c-[07-08]
SLURM_NPROCS: 2
SLURM_NTASKS: 2
SLURM_PRIO_PROCESS: 0
SLURM_PROCID: 0
SLURM_SCRIPT_CONTEXT: prolog_task
SLURM_SRUN_COMM_HOST: 127.0.0.1
SLURM_SRUN_COMM_PORT: 34485
SLURM_STEP_ID: 0
SLURM_STEP_LAUNCHER_PORT: 34485
SLURM_STEP_NODELIST: c-[07-08]
SLURM_STEP_NUM_NODES: 2
SLURM_STEP_NUM_TASKS: 2
SLURM_STEP_TASKS_PER_NODE: 1(x2)
SLURM_STEPID: 0
SLURM_SUBMIT_DIR: [...]
SLURM_SUBMIT_HOST: gw-02
SLURM_TASK_PID: 3016977
SLURM_TASKS_PER_NODE: 1(x2)
SLURM_TOPOLOGY_ADDR_PATTERN: node
SLURM_TOPOLOGY_ADDR: c-07
SLURM_TRES_PER_TASK: cpu:2
SLURM_UMASK: 0022
SLURMD_DEBUG: 2
SLURMD_NODENAME: c-07
SRUN_DEBUG: 3
SSH_CLIENT: 10.5.0.251 61832 22
SSH_CONNECTION: 10.5.0.251 61832 10.5.120.2 22
SSL_CERT_DIR: /usr/lib/ssl/certs
SSL_CERT_FILE: /usr/lib/ssl/certs/ca-certificates.crt
STARSHIP_SESSION_KEY: 1017414017597927
STARSHIP_SHELL: zsh
TERM_PROGRAM_VERSION: 1.95.1
TERM_PROGRAM: vscode
TERM: xterm-256color
TMPDIR: /tmp
USER_ZDOTDIR: /home/mgunz
USER: mgunz
VIRTUAL_ENV_PROMPT: (pythonenv) 
VIRTUAL_ENV: /home/mgunz/src/venv/pythonenv
VSCODE_GIT_ASKPASS_EXTRA_ARGS: 
VSCODE_GIT_ASKPASS_MAIN: [...]
VSCODE_GIT_ASKPASS_NODE: [...]
VSCODE_GIT_IPC_HANDLE: [...]
VSCODE_INJECTION: 1
VSCODE_IPC_HOOK_CLI: [...]
XDG_DATA_DIRS: [...]
XDG_RUNTIME_DIR: [...]
XDG_SESSION_CLASS: user
XDG_SESSION_ID: 537835
XDG_SESSION_TYPE: tty
ZDOTDIR: /home/mgunz
ZSH: [...]

Not sure there are relevant vars that are not SLURM-specific.

I strongly disagree. The point of Jobs is to define what is executed and sisyphus+engine care about the how

Okay, would you then rather have the runtime interface or the file-based solution? The latter sounds to me like quite the hack.

Copy link
Member

@albertz albertz Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see some MPI related env vars (HYDRA, OMPI, I_MPI). Probably MPI would be able to tell you the list of nodes in some way. That would already give you the runtime interface you are asking for, in a very standardized way (MPI should always be available in such cluster setting, esp with multi node). I.e. no need to introduce anything new here, no need to reinvent the wheel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I like this idea. I'm going to explore it.

"multi-node multi GPU-training w/ torchrun currently depends on SLURM environment variables "
"to determine the master node"
)

partaking_nodes = os.environ["SLURM_JOB_NODELIST"]
nodes = sorted(
node_name.strip()
for node_name in sp.check_output(["scontrol", "show", "hostnames", partaking_nodes])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand, why is this need? You already have the nodes already (partaking_nodes)? How is nodes different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partaking_nodes is in SLURMs short abbreviated format, e.g. you'll get something like g-[11-13],g-16, and that command "decodes" the short form into the full list of proper node names.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case, I agree to @michelwi (see his comment above), we should not make this logic dependent on Slurm here in the job.

.decode("utf-8")
.splitlines()
)

cur_hostname = socket.gethostname()
assert cur_hostname in nodes

# Declare first node as rendezvous (RDVZ) node, it is arbitrary which one
# it is, as long as the network connectivity on that node is good.
#
# For trainings on a well-known set of nodes the master node can be set
# statically, but if you're in a cluster and you don't know the nodes
# your training will be running on this needs to be determined
# dynamically after the scheduler has placed your job.
rdzv_node = nodes[0]
rdzv_host = socket.getfqdn(rdzv_node)
if rdzv_host == rdzv_node:
rdzv_host = socket.gethostbyname(rdzv_node)

# 29400 is torchrun's default RDZV port. Also incorporate the job ID to
# reduce the chance for port conflicts.
rdzv_port = 29400 + (int(hashlib.sha256(self.job_id().encode("utf-8")).hexdigest(), 16) % 1024)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? Can't rdzv should pick some own random port? What happens when you just do:

Suggested change
rdzv_port = 29400 + (int(hashlib.sha256(self.job_id().encode("utf-8")).hexdigest(), 16) % 1024)
rdzv_port = 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All nodes need to pick the same port for the processes to find themselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you think that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean one node is the master node, to which the other nodes connect to. If the master node picks a random port, how are the other nodes supposed to establish a TCP connection to the torchrun process on that node? This is why you specify the RDZV-address as a tuple of host and port, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand why you need to specify that at all. With mpirun, we did not need that. It somehow did that automatically. I think SGE/Slurm did that already. Not sure how exactly it worked.

rdzv_addr = f"{rdzv_host}:{rdzv_port}"
print(f"Nodes {partaking_nodes} in job, running RDZV server on {rdzv_node} @ {rdzv_host} ({rdzv_addr}).")

# Gloo and NCCL cannot be trusted to find suitable network interfaces on their own.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part here is also quite annoying. If you don't set the env var (at least for Gloo), you get a crash on the AppTek cluster. Gloo is also supposed to handle this automatically for you, but it does not seem like it works well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this. What crash? What is the problem? This sounds more like sth the admins should fix?

When reading how NCCL does the logic, it sounds almost the same as what you do here? (https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/env.html) So what's different here? Did you file an NCCL issue?

Also such NCCL issue should be referenced here in a code comment, explaining why NCCL does not work, and then pointing to the NCCL issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To expand here: This sounds like you just have something wrongly configured on your side. (Maybe also a NCCL bug, but even if so, probably could be worked around by some NCCL settings.) Thus I don't think this belongs here, but rather you should fix this in your env. (Unless you show me good reasons that this is not possible.)

#
# torchrun docs say all nodes need to specify the exact same number of network interfaces.
# Therefore for now we specify just one.
up_ifaces = []
net_if = psutil.net_if_addrs()
for ifname, sni_c_addrs in net_if.items():
if any(ifname.startswith(prefix) for prefix in ["lo", "docker"]) or any(
addr.address == "127.0.0.1" for addr in sni_c_addrs
):
# skip loopback and docker iface
continue
if any(
addr.family in [socket.AF_INET, socket.AF_INET6] and addr.broadcast is not None
for addr in sni_c_addrs
):
up_ifaces.append(ifname)
if not up_ifaces:
raise ValueError(f"Could not find UP network interface in {net_if}.")
iface_to_use = up_ifaces[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can use multiple interfaces?

Suggested change
iface_to_use = up_ifaces[0]
iface_to_use = ",".join(up_ifaces)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here is that all processes need to specify the same number of interfaces according to the docs:

The backend will dispatch operations in a round-robin fashion across these interfaces. It is imperative that all processes specify the same number of interfaces in this variable.

If we just specify one interface (for now) we don't break on setups where some nodes have different numbers of UP network interfaces than others. I wonder if this situation arises in practice. I guess ideally we'd always want to use the maximum number of available interfaces, but I'm not sure how to best find this out in distributed training scenaria.

if len(up_ifaces) > 1:
print(f"Found more than one UP interface: {', '.join(up_ifaces)}, using first one found.")
print(f"Using {iface_to_use} as GLOO_SOCKET_NAME and NCCL_SOCKET_IFNAME.")
env["GLOO_SOCKET_IFNAME"] = iface_to_use
env["NCCL_SOCKET_IFNAME"] = iface_to_use

sys.stdout.flush()
sp.check_call(self._get_run_cmd(rdzv_node_addr=rdzv_addr), env=env)

lrf = self.returnn_config.get("learning_rate_file", "learning_rates")
self._relink(lrf, self.out_learning_rates.get_path())
Expand Down
Loading