Use queues to not pollute cwd #193

Merged · 9 commits · May 7, 2024
2 changes: 1 addition & 1 deletion examples/pytorch_bert.py
@@ -1,7 +1,7 @@
from optimum_benchmark import Benchmark, BenchmarkConfig, InferenceConfig, ProcessConfig, PyTorchConfig
from optimum_benchmark.logging_utils import setup_logging

setup_logging(level="INFO", handlers=["console"])
setup_logging(level="INFO", format_prefix="MAIN-PROCESS", handlers=["console"])

if __name__ == "__main__":
BENCHMARK_NAME = "pytorch_bert"
14 changes: 3 additions & 11 deletions examples/pytorch_llama_awq.py
@@ -1,28 +1,20 @@
from optimum_benchmark import Benchmark, BenchmarkConfig, InferenceConfig, ProcessConfig, PyTorchConfig
from optimum_benchmark.logging_utils import setup_logging

setup_logging(level="INFO", handlers=["console"])
setup_logging(level="INFO", format_prefix="MAIN-PROCESS", handlers=["console"])

if __name__ == "__main__":
BENCHMARK_NAME = "pytorch_llama_awq"
REPO_ID = f"IlyasMoutawwakil/{BENCHMARK_NAME}"

launcher_config = ProcessConfig(
device_isolation=True,
device_isolation_action="warn",
)
scenario_config = InferenceConfig(
memory=True,
latency=True,
input_shapes={"batch_size": 1, "sequence_length": 128},
generate_kwargs={"max_new_tokens": 100, "min_new_tokens": 100},
)
backend_config = PyTorchConfig(
device="cuda",
device_ids="0",
no_weights=True,
model="TheBloke/Llama-2-70B-AWQ",
)
launcher_config = ProcessConfig(device_isolation=True, device_isolation_action="warn")
backend_config = PyTorchConfig(device="cuda", device_ids="0", no_weights=True, model="TheBloke/Llama-2-70B-AWQ")

benchmark_config = BenchmarkConfig(
name=BENCHMARK_NAME, launcher=launcher_config, scenario=scenario_config, backend=backend_config
20 changes: 9 additions & 11 deletions optimum_benchmark/launchers/process/launcher.py
@@ -26,14 +26,8 @@ def launch(self, worker: Callable[..., BenchmarkReport], *worker_args: Any) -> B
ctx = mp.get_context(self.config.start_method)
log_level = ctx.get_logger().getEffectiveLevel()
queue = ctx.Queue()
lock = ctx.Lock()

isolated_process = mp.Process(
target=target,
args=(worker, *worker_args),
kwargs={"log_level": log_level, "queue": queue, "lock": lock},
daemon=False,
)

isolated_process = mp.Process(target=target, args=(log_level, queue, worker, *worker_args), daemon=False)
isolated_process.start()

with device_isolation_context(
@@ -49,11 +43,17 @@ def launch(self, worker: Callable[..., BenchmarkReport], *worker_args: Any) -> B
else:
raise RuntimeError("Could not retrieve report from isolated process.")

LOGGER.info("Logging final report.")
report.log()

return report


def target(
worker: Callable[..., BenchmarkReport], *worker_args, log_level: Union[int, str], queue: mp.Queue, lock: mp.Lock
log_level: Union[int, str],
queue: mp.Queue,
worker: Callable[..., BenchmarkReport],
*worker_args: Any,
) -> None:
isolated_process_pid = os.getpid()
os.environ["ISOLATED_PROCESS_PID"] = str(isolated_process_pid)
@@ -63,10 +63,8 @@ def target(

report = worker(*worker_args)

lock.acquire()
LOGGER.info("Putting report in queue.")
queue.put(report.to_dict())
lock.release()

LOGGER.info("Exiting isolated process.")
exit(0)
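
For context, a minimal standalone sketch of the pattern this launcher now uses (with a hypothetical `run_workload`, not the optimum_benchmark API): the isolated child process puts a serializable report on a `multiprocessing.Queue`, and the parent reads it back instead of the child writing files into the current working directory.

```python
# Minimal sketch of the queue-based handoff, assuming a hypothetical workload
# function; the child returns its result through a Queue instead of files in cwd.
import multiprocessing as mp


def run_workload() -> dict:
    # Stand-in for worker(*worker_args); returns a picklable payload.
    return {"latency_ms": 12.3}


def child_target(queue: mp.Queue) -> None:
    report = run_workload()
    queue.put(report)  # hand the result back to the parent, nothing written to disk


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    child = ctx.Process(target=child_target, args=(queue,), daemon=False)
    child.start()
    child.join()

    if not queue.empty():
        print(queue.get())
    elif child.exitcode != 0:
        raise RuntimeError(f"Process exited with non-zero code {child.exitcode}.")
    else:
        raise RuntimeError("Could not retrieve report from isolated process.")
```
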
60 changes: 38 additions & 22 deletions optimum_benchmark/launchers/torchrun/launcher.py
@@ -1,6 +1,7 @@
import multiprocessing as mp
import os
import signal
import time
from logging import getLogger
from typing import Any, Callable, Union

@@ -22,7 +23,7 @@ class ForcedZeroExit(SystemExit):

def forced_zero_exit_signal_handler(signum, frame):
for p in mp.active_children():
LOGGER.info(f"Sending a forced zero exit signal to process [{p.pid}].")
LOGGER.info(f"Sending a forced zero exit signal to child process [{p.pid}].")
os.kill(p.pid, signal.SIGUSR2)

raise ForcedZeroExit
@@ -60,12 +61,10 @@ def __init__(self, config: TorchrunConfig):
def launch(self, worker: Callable[..., BenchmarkReport], *worker_args: Any) -> BenchmarkReport:
ctx = mp.get_context(self.config.start_method)
log_level = ctx.get_logger().getEffectiveLevel()
queue = ctx.Queue()

isolated_process = mp.Process(
target=target,
args=(log_level, worker, *worker_args),
kwargs={"launch_config": self.launch_config},
daemon=False,
target=target, args=(self.launch_config, log_level, queue, worker, *worker_args), daemon=False
)
isolated_process.start()

@@ -74,26 +73,33 @@ def launch(self, worker: Callable[..., BenchmarkReport], *worker_args: Any) -> B
):
isolated_process.join()

if all(os.path.isfile(f"benchmark_report_rank_{rank}.json") for rank in range(self.config.nproc_per_node)):
LOGGER.info("\t+ Gatehring reports from all ranks.")
reports = [
BenchmarkReport.from_json(f"benchmark_report_rank_{rank}.json")
for rank in range(self.config.nproc_per_node)
]
LOGGER.info("\t+ Aggregating reports from all ranks.")
if not queue.empty() and queue.qsize() == self.config.nproc_per_node:
LOGGER.info("\t+ Retrieving reports from queue.")
reports = [BenchmarkReport.from_dict(queue.get()) for _ in range(queue.qsize())]
LOGGER.info("\t+ Aggregating reports.")
report = BenchmarkReport.aggregate(reports)
LOGGER.info("\t+ Logging aggregated report.")
report.log()
elif isolated_process.exitcode != 0:
raise RuntimeError(f"Process exited with non-zero code {isolated_process.exitcode}.")
else:
raise RuntimeError("Could not retrieve report from isolated process.")
if queue.empty():
raise RuntimeError("Queue is empty.")
else:
raise RuntimeError(
f"Queue size ({queue.qsize()}) does not match number of ranks ({self.config.nproc_per_node})."
)

LOGGER.info("\t+ Logging final report.")
report.log()

return report


def target(
log_level: Union[str, int], worker: Callable[..., BenchmarkReport], *worker_args, launch_config: LaunchConfig
launch_config: LaunchConfig,
log_level: Union[str, int],
queue: mp.Queue,
worker: Callable[..., BenchmarkReport],
*worker_args: Any,
):
isolated_process_pid = os.getpid()
os.environ["ISOLATED_PROCESS_PID"] = str(isolated_process_pid)
@@ -103,15 +109,20 @@ def target(

elastic_agent_launcher = elastic_launch(config=launch_config, entrypoint=entrypoint)
try:
elastic_agent_launcher(log_level, worker, *worker_args)
elastic_agent_launcher(log_level, queue, worker, *worker_args)
except ForcedZeroExit:
pass

LOGGER.info("Exiting isolated process.")
exit(0)


def entrypoint(log_level: Union[str, int], worker: Callable[..., BenchmarkReport], *worker_args):
def entrypoint(
log_level: Union[str, int],
queue: mp.Queue,
worker: Callable[..., BenchmarkReport],
*worker_args: Any,
):
rank = int(os.environ.get("RANK", "0"))
isolated_process_pid = int(os.environ["ISOLATED_PROCESS_PID"])

@@ -129,11 +140,16 @@ def entrypoint(log_level: Union[str, int], worker: Callable[..., BenchmarkReport

report = worker(*worker_args)

LOGGER.info(f"Saving report from rank {rank}.")
report.save_json(f"benchmark_report_rank_{rank}.json")
LOGGER.info("Putting report into queue.")
queue.put(report.to_dict())

LOGGER.info("Waiting for other ranks to put their reports into queue. ")
while queue.qsize() < int(os.environ["WORLD_SIZE"]):
LOGGER.info(f"Queue size: {queue.qsize()} / World size: {os.environ['WORLD_SIZE']}.")
time.sleep(1)

LOGGER.info("Waiting for all ranks to finish.")
torch.distributed.barrier()
LOGGER.info("All ranks have put their reports into queue.")
LOGGER.info(f"Queue size: {queue.qsize()} / World size: {os.environ['WORLD_SIZE']}.")

LOGGER.info("Destroying torch.distributed process group.")
torch.distributed.destroy_process_group()
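
A toy illustration of the multi-rank variant above, assuming plain `multiprocessing` in place of torchrun/`torch.distributed` and a hypothetical report dict: every rank puts its report on a shared queue and waits until the queue holds one report per rank, and the parent only drains the queue once the count matches the world size.

```python
# Toy sketch (no torch.distributed, hypothetical report dicts) of the
# rank-to-queue handoff used by the torchrun launcher above.
import multiprocessing as mp
import time

WORLD_SIZE = 2


def rank_entrypoint(rank: int, queue: mp.Queue) -> None:
    report = {"rank": rank, "latency_ms": 10.0 + rank}  # stand-in for worker(*worker_args)
    queue.put(report)

    # mirrors the qsize() polling in entrypoint(); a barrier plays this role once
    # torch.distributed is initialized (note: qsize() is unavailable on macOS)
    while queue.qsize() < WORLD_SIZE:
        time.sleep(0.1)


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    ranks = [ctx.Process(target=rank_entrypoint, args=(rank, queue)) for rank in range(WORLD_SIZE)]

    for p in ranks:
        p.start()
    for p in ranks:
        p.join()

    if not queue.empty() and queue.qsize() == WORLD_SIZE:
        reports = [queue.get() for _ in range(queue.qsize())]
        print(reports)  # in the launcher these dicts are fed to BenchmarkReport.aggregate()
    else:
        raise RuntimeError("Did not receive a report from every rank.")
```
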
2 changes: 1 addition & 1 deletion optimum_benchmark/logging_utils.py
@@ -17,7 +17,7 @@ def setup_logging(
"version": 1,
"handlers": {
"console": {"formatter": "simple", "stream": "ext://sys.stdout", "class": "logging.StreamHandler"},
"file": {"formatter": "simple", "filename": "cli.log", "class": "logging.FileHandler"},
"file": {"formatter": "simple", "filename": "benchmark.log", "class": "logging.FileHandler"},
},
"root": {"level": level, "handlers": handlers},
"disable_existing_loggers": disable_existing_loggers,
8 changes: 7 additions & 1 deletion tests/configs/_ddp_.yaml
@@ -3,8 +3,14 @@ defaults:

launcher:
nproc_per_node: 2
socket_ifname: lo

backend:
device_ids: 0,1
model: openai-community/gpt2

hydra:
launcher:
n_jobs: 1
job:
env_set:
LOG_ALL_RANKS: 1
8 changes: 7 additions & 1 deletion tests/configs/_deepspeed_inference_.yaml
@@ -3,7 +3,6 @@ defaults:

launcher:
nproc_per_node: 2
socket_ifname: lo

backend:
device_ids: 0,1
@@ -16,3 +15,10 @@ backend:
scenario:
input_shapes:
batch_size: 2

hydra:
launcher:
n_jobs: 1
job:
env_set:
LOG_ALL_RANKS: 1
1 change: 0 additions & 1 deletion tests/test_cli.py
@@ -26,7 +26,6 @@ def test_cli_configs(config_name):
config_name,
# to run the tests faster (comment for debugging)
"hydra/launcher=joblib",
"hydra.launcher.n_jobs=-1",
"hydra.launcher.batch_size=1",
"hydra.launcher.prefer=threads",
]