Automatic calc of node/gpu req #366

Open · wants to merge 1 commit into base: main
144 changes: 127 additions & 17 deletions nemo_skills/pipeline/openrlhf/ppo.py
@@ -55,16 +55,6 @@ def format_reward_critic_args(self):
f" --reward_pretrain {self.reward_model} "
# TODO: add proper defaults when we figure out how these should be used
# for now we require users to be explicit
# f" --ref_num_nodes {self.num_nodes} "
# f" --ref_num_gpus_per_node {self.num_gpus} "
# f" --reward_num_nodes {self.num_nodes} "
# f" --reward_num_gpus_per_node {self.num_gpus} "
# f" --critic_num_nodes {self.num_nodes} "
# f" --critic_num_gpus_per_node {self.num_gpus} "
# f" --vllm_num_engines {self.num_gpus} "
# f" --vllm_tensor_parallel_size 1 "
# f" --colocate_critic_reward "
# f" --colocate_actor_ref "
)
return cmd

@@ -99,8 +89,7 @@ def format_data_args(self):
        # Note: validation data isn't used as of now.
        # If using a chat message dict as data, add `--apply_chat_template`
        # and `--input_key 'context_messages'`.
-       cmd = f" --prompt_data {self.prompt_data} --input_key '{self.input_key}' "
+       cmd = f" --prompt_data {self.prompt_data} --input_key 'input' "
        return cmd

    def get_common_arg_overrides(self):
@@ -229,6 +218,67 @@ def get_training_cmd(
    return task.get_cmd()


def pack_nodes(gpus_per_node, num_nodes):
    # Placement order used by OpenRLHF's cli/train_ppo_ray.py; we follow the same order.
    order = ['actor', 'ref', 'critic', 'reward', 'vllm']
    items = []
    for model in order:
        g = gpus_per_node[model]
        n = num_nodes[model]
        if g * n == 0:
            continue
        items.extend([g] * n)

    # First-fit bin packing: each bin is one node with a capacity of 8 GPUs,
    # and each element holds the number of GPUs already allocated on that node.
    bins = []
    for item in items:
        placed = False
        # Try to place the item on an existing node.
        for i in range(len(bins)):
            if bins[i] + item <= 8:
                bins[i] += item
                placed = True
                break
        # If it doesn't fit on any existing node, allocate a new node.
        if not placed:
            bins.append(item)

    num_nodes_needed = len(bins)
    return num_nodes_needed

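A quick sanity check of the first-fit packing (values invented for illustration; not part of the diff):

    # Hypothetical request: 2 actor processes at 4 GPUs each, 1 ref process at
    # 4 GPUs, and 2 vLLM engines at tensor parallel size 2; no critic or reward.
    gpus = {'actor': 4, 'ref': 4, 'critic': 0, 'reward': 0, 'vllm': 2}
    procs = {'actor': 2, 'ref': 1, 'critic': 0, 'reward': 0, 'vllm': 2}
    # Items in placement order: [4, 4, 4, 2, 2]; first-fit into 8-GPU bins
    # gives [4+4, 4+2+2], i.e. 2 nodes.
    assert pack_nodes(gpus, procs) == 2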

def get_num_nodes_and_gpus(
    cluster_config,
    rm_model,
    num_processes,
    gpus_per_node,
    colocate_critic_reward,
    colocate_actor_ref,
    advantage_estimator,
):
    if advantage_estimator not in ['gae']:
        # The critic model is only needed for GAE, so colocating with it makes
        # no sense for other estimators. Zeroing it here also feeds into the
        # reward-allocation condition below.
        gpus_per_node['critic'] = 0
        num_processes['critic'] = 0
        colocate_critic_reward = False

    has_reward_model = isinstance(rm_model, str) and rm_model != ''
    if has_reward_model:
        # Fail fast if the reward model path is not mounted on the cluster.
        check_if_mounted(cluster_config, rm_model)
    if not has_reward_model or colocate_critic_reward:
        # Either there is no reward model, or it shares the critic's GPUs,
        # so it needs no dedicated allocation.
        gpus_per_node['reward'] = 0
        num_processes['reward'] = 0

    if colocate_actor_ref:
        # The reference model shares the actor's GPUs.
        gpus_per_node['ref'] = 0
        num_processes['ref'] = 0

    num_nodes = pack_nodes(gpus_per_node, num_processes)
    num_gpus = 8  # pack_nodes assumes standard 8-GPU nodes

    return num_nodes, num_gpus

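For illustration, a hypothetical request (all values invented) with a colocated reference model and no reward model packs as follows:

    gpus = {'actor': 8, 'ref': 8, 'critic': 8, 'reward': 8, 'vllm': 4}
    procs = {'actor': 1, 'ref': 1, 'critic': 1, 'reward': 1, 'vllm': 2}
    nodes, gpus_per = get_num_nodes_and_gpus(
        cluster_config={},  # placeholder; only consulted for the reward mount check
        rm_model=None,      # no reward model, so its allocation is zeroed
        num_processes=procs,
        gpus_per_node=gpus,
        colocate_critic_reward=False,
        colocate_actor_ref=True,  # the ref model rides on the actor's GPUs
        advantage_estimator='gae',
    )
    # Items packed: actor 8, critic 8, vllm 4, vllm 4 -> 3 nodes of 8 GPUs.
    assert (nodes, gpus_per) == (3, 8)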

@openrlhf_app.command(name='ppo', context_settings={"allow_extra_args": True, "ignore_unknown_options": True})
@typer_unpacker
def ppo_openrlhf(
@@ -243,9 +293,19 @@ def ppo_openrlhf(
    hf_model: str = typer.Option(..., help="Path to the HF model"),
    rm_model: str = typer.Option(..., help="Path to the HF reward model"),
    prompt_data: str = typer.Option(None, help="Path to the prompt data"),
-   input_key: str = typer.Option("input", help="Input key for the prompt data"),
-   num_nodes: int = typer.Option(1, help="Number of nodes"),
-   num_gpus: int = typer.Option(..., help="Number of GPUs"),
+   ref_num_nodes: int = typer.Option(1, help="Number of nodes for the reference model"),
+   ref_num_gpus_per_node: int = typer.Option(..., help="Number of GPUs per node for the reference model"),
+   actor_num_nodes: int = typer.Option(..., help="Number of nodes for the actor model"),
+   actor_num_gpus_per_node: int = typer.Option(..., help="Number of GPUs per node for the actor model"),
+   critic_num_nodes: int = typer.Option(..., help="Number of nodes for the critic model"),
+   critic_num_gpus_per_node: int = typer.Option(..., help="Number of GPUs per node for the critic model"),
+   reward_num_nodes: int = typer.Option(..., help="Number of nodes for the reward model"),
+   reward_num_gpus_per_node: int = typer.Option(..., help="Number of GPUs per node for the reward model"),
+   vllm_num_engines: int = typer.Option(..., help="Number of vLLM engines"),
+   vllm_tensor_parallel_size: int = typer.Option(..., help="vLLM tensor parallel size"),
+   colocate_critic_reward: bool = typer.Option(False, help="Colocate the critic and reward models"),
+   colocate_actor_ref: bool = typer.Option(False, help="Colocate the actor and reference models"),
+   advantage_estimator: str = typer.Option('gae', help="Advantage estimator to use [gae, reinforce, rloo]"),
    num_training_jobs: int = typer.Option(1, help="Number of training jobs"),
    wandb_project: str = typer.Option("nemo-skills", help="Weights & Biases project name"),
    disable_wandb: bool = typer.Option(False, help="Disable wandb logging"),
@@ -286,14 +346,63 @@ def ppo_openrlhf(
LOG.info("Starting training job")
LOG.info("Extra arguments that will be passed to the underlying script: %s", extra_arguments)

    assert len(rm_model.split(',')) == 1, (
        f"The RM model must be a single model; our team has not tested OpenRLHF's "
        f"multi-model RM feature. Got RM models: {rm_model}"
    )

    cluster_config = get_cluster_config(cluster, config_dir)

    gpus_per_node = {
        'actor': actor_num_gpus_per_node,
        'critic': critic_num_gpus_per_node,
        'reward': reward_num_gpus_per_node,
        'ref': ref_num_gpus_per_node,
        'vllm': vllm_tensor_parallel_size,
    }

    num_processes = {
        'actor': actor_num_nodes,
        'critic': critic_num_nodes,
        'reward': reward_num_nodes,
        'ref': ref_num_nodes,
        'vllm': vllm_num_engines,
    }

    # Forward the user-specified sizes to OpenRLHF verbatim. Note that this runs
    # before get_num_nodes_and_gpus(), which zeroes some entries of these dicts
    # purely for node counting.
    for model in gpus_per_node:
        if model == 'vllm':
            continue
        extra_arguments += f" --{model}_num_nodes {num_processes[model]}"
        extra_arguments += f" --{model}_num_gpus_per_node {gpus_per_node[model]}"
    extra_arguments += f" --vllm_num_engines {vllm_num_engines}"
    extra_arguments += f" --vllm_tensor_parallel_size {vllm_tensor_parallel_size}"

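With invented example values (actor/critic/reward/ref at one node of 8 GPUs each, two vLLM engines at tensor parallel size 4), the block above appends flags of the form:

    --actor_num_nodes 1 --actor_num_gpus_per_node 8
    --critic_num_nodes 1 --critic_num_gpus_per_node 8
    --reward_num_nodes 1 --reward_num_gpus_per_node 8
    --ref_num_nodes 1 --ref_num_gpus_per_node 8
    --vllm_num_engines 2 --vllm_tensor_parallel_size 4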
    num_nodes, num_gpus = get_num_nodes_and_gpus(
        cluster_config,
        rm_model,
        num_processes,
        gpus_per_node,
        colocate_critic_reward,
        colocate_actor_ref,
        advantage_estimator,
    )
    LOG.info(f"Total number of nodes: {num_nodes}")
    LOG.info(f"Number of GPUs per node: {num_gpus}")
    extra_arguments += f" --advantage_estimator {advantage_estimator}"
    if colocate_actor_ref:
        LOG.info("Colocating actor and reference models on the same GPUs.")
        extra_arguments += " --colocate_actor_ref"
    if colocate_critic_reward:
        LOG.info("Colocating critic and reward models on the same GPUs.")
        extra_arguments += " --colocate_critic_reward"

    check_if_mounted(cluster_config, output_dir)
    check_if_mounted(cluster_config, hf_model)
    if log_dir:
        check_if_mounted(cluster_config, log_dir)
    else:
        log_dir = output_dir

    if (rm_model is not None) and rm_model.startswith('/'):
        check_if_mounted(cluster_config, rm_model)

    if num_training_jobs > 0:
        if prompt_data is None:
            raise ValueError("prompt_data is required when num_training_jobs > 0")
@@ -305,7 +414,7 @@

    # If a custom PPOOpenRLHFTask is provided via ctx.obj['ppo_task'], use it.
    if hasattr(ctx, 'obj') and ctx.obj is not None and isinstance(ctx.obj, dict) and 'ppo_task' in ctx.obj:
-       ppo_task = ctx.obj['ppo_task']  # type: type(PPOOpenRLHFTask)
+       ppo_task = ctx.obj['ppo_task']  # type: PPOOpenRLHFTask
        assert isinstance(ppo_task, PPOOpenRLHFTask), "`ppo_task` must be a subclass of PPOOpenRLHFTask"
    else:
        ppo_task = None
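For reference, a minimal sketch of supplying a custom task through ctx.obj (class name and override invented; the chat-template flags come from the format_data_args comment earlier in this file):

    class ChatPPOOpenRLHFTask(PPOOpenRLHFTask):
        # Hypothetical override: feed chat-style data instead of plain text.
        def format_data_args(self):
            return (
                f" --prompt_data {self.prompt_data} "
                f" --input_key 'context_messages' --apply_chat_template "
            )

    # A wrapper command would then stash an instance in the Typer context,
    # e.g. ctx.obj = {'ppo_task': ChatPPOOpenRLHFTask(...)}, before invoking
    # ppo_openrlhf.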
@@ -318,7 +427,6 @@ def ppo_openrlhf(
        rm_model=rm_model,
        output_dir=output_dir,
        prompt_data=prompt_data,
-       input_key=input_key,
        num_gpus=num_gpus,
        num_nodes=num_nodes,
        expname=expname,
@@ -327,6 +435,8 @@ def ppo_openrlhf(
        extra_arguments=extra_arguments,
    )

    LOG.info(train_cmd)

    with run.Experiment(expname) as exp:
        prev_task = None
        for job_id in range(num_training_jobs):