Skip to content

Commit

Permalink
bug fix in MPI evaluator (#349)
Browse files Browse the repository at this point in the history
* Fixes a mistake in the MPI evaluator where the pool is created twice (probably some code merging issue in the past)
* cleans up the mpi example and associated slurm file.
* additional logging messages
  • Loading branch information
quaquel authored and EwoutH committed Apr 17, 2024
1 parent 4c71b68 commit 25b130f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 20 deletions.
55 changes: 43 additions & 12 deletions ema_workbench/em_framework/futures_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .util import NamedObjectMap
from .model import AbstractModel
from .experiment_runner import ExperimentRunner
from ..util import get_module_logger, get_rootlogger
from ..util import get_module_logger, get_rootlogger, method_logger

from ..util import ema_logging

Expand Down Expand Up @@ -71,7 +71,7 @@ def mpi_initializer(models, log_level, root_dir):
root_logger.info(f"worker {rank} initialized")


def logwatcher(stop_event):
def logwatcher(start_event, stop_event):
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
Expand All @@ -84,6 +84,7 @@ def logwatcher(stop_event):
service = "logwatcher"
MPI.Publish_name(service, info, port)
_logger.debug(f"published service: {service}")
start_event.set()

root = 0
_logger.debug("waiting for client connection...")
Expand All @@ -96,12 +97,18 @@ def logwatcher(stop_event):
try:
logger = logging.getLogger(record.name)
except Exception as e:
# AttributeError if record does not have a name attribute
# TypeError record.name is not a string
raise e
if record.msg is None:
_logger.debug("received sentinel")
break
else:
# AttributeError if record does not have a name attribute
# TypeError record.name is not a string
raise e
else:
logger.callHandlers(record)

_logger.info("closing logwatcher")


def run_experiment_mpi(experiment):
_logger.debug(f"starting {experiment.experiment_id}")
Expand All @@ -113,6 +120,15 @@ def run_experiment_mpi(experiment):
return experiment, outcomes


def send_sentinel():
record = logging.makeLogRecord(dict(level=logging.CRITICAL, msg=None, name=42))

for handler in get_rootlogger().handlers:
if isinstance(handler, MPIHandler):
_logger.debug("sending sentinel")
handler.communicator.send(record, 0, 0)


class MPIHandler(QueueHandler):
"""
This handler sends events from the worker process to the master process
Expand Down Expand Up @@ -154,45 +170,60 @@ def __init__(self, msis, n_processes=None, **kwargs):
self.stop_event = None
self.n_processes = n_processes

@method_logger(__name__)
def initialize(self):
# Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
from mpi4py.futures import MPIPoolExecutor

start_event = threading.Event()
self.stop_event = threading.Event()
self.logwatcher_thread = threading.Thread(
name="logwatcher", target=logwatcher, daemon=True, args=(self.stop_event,)
name="logwatcher",
target=logwatcher,
daemon=False,
args=(
start_event,
self.stop_event,
),
)
self.logwatcher_thread.start()
start_event.wait()
_logger.info("logwatcher server started")

self.root_dir = determine_rootdir(self._msis)
self._pool = MPIPoolExecutor(
max_workers=self.n_processes,
initializer=mpi_initializer,
initargs=(self._msis, _logger.level, self.root_dir),
) # Removed initializer arguments
)

self._pool = MPIPoolExecutor(max_workers=self.n_processes) # Removed initializer arguments
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
if self._pool._max_workers <= 10:
_logger.warning(
f"With only a few workers ({self._pool._max_workers}), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator"
)
return self

@method_logger(__name__)
def finalize(self):
self._pool.shutdown()
# submit sentinel
self.stop_event.set()
_logger.info("MPI pool has been shut down")
self._pool.submit(send_sentinel)
self._pool.shutdown()
self.logwatcher_thread.join(timeout=60)

if self.logwatcher_thread.is_alive():
_logger.warning(f"houston we have a problem")

if self.root_dir:
shutil.rmtree(self.root_dir)

time.sleep(0.1)
_logger.info("MPI pool has been shut down")

@method_logger(__name__)
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen)
experiments = list(experiment_generator(scenarios, self._msis, policies, combine=combine))

results = self._pool.map(run_experiment_mpi, experiments, **kwargs)
for experiment, outcomes in results:
Expand Down
8 changes: 3 additions & 5 deletions ema_workbench/examples/example_mpi_lake_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@
import math
import time

# FIXME
import sys

sys.path.insert(0, "/Users/jhkwakkel/Documents/GitHub/EMAworkbench")

import numpy as np
from scipy.optimize import brentq

Expand Down Expand Up @@ -87,10 +82,13 @@ def lake_problem(


if __name__ == "__main__":
import ema_workbench

# run with mpiexec -n 1 -usize {ntasks} python example_mpi_lake_model.py
starttime = time.perf_counter()

ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=True)
ema_logging.get_rootlogger().info(f"{ema_workbench.__version__}")

# instantiate the model
lake_model = Model("lakeproblem", function=lake_problem)
Expand Down
7 changes: 4 additions & 3 deletions ema_workbench/examples/slurm_script.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/bin/bash

#SBATCH --job-name="Python_test"
#SBATCH --time=00:02:00
#SBATCH --ntasks=10
#SBATCH --time=00:06:00
#SBATCH --ntasks=8
#SBATCH --cpus-per-task=1
#SBATCH --partition=compute
#SBATCH --mem-per-cpu=4GB
Expand All @@ -17,6 +17,7 @@ module load py-mpi4py
module load py-pip

pip install ipyparallel
pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_update#egg=ema-workbench
pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_fixes#egg=ema_workbench

mpiexec -n 1 python3 example_mpi_lake_model.py

0 comments on commit 25b130f

Please sign in to comment.