Skip to content

Commit

Permalink
evaluators: Add initial MPIEvaluator
Browse files Browse the repository at this point in the history
  • Loading branch information
EwoutH committed Aug 31, 2023
1 parent 09c3f02 commit 597de6a
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import threading
import warnings
from mpi4py.futures import MPIPoolExecutor

from ema_workbench.em_framework.samplers import AbstractSampler
from .callbacks import DefaultCallback
Expand Down Expand Up @@ -415,6 +416,42 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
add_tasks(self.n_processes, self._pool, ex_gen, callback)


class MPIEvaluator(BaseEvaluator):
"""Evaluator for experiments using MPI Pool Executor from mpi4py"""

def __init__(self, msis, **kwargs):
super().__init__(msis, **kwargs)
self._pool = None

def initialize(self):
self._pool = MPIPoolExecutor()
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
return self

def finalize(self):
self._pool.shutdown()
_logger.info("MPI pool has been shut down")

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

# Here, we're using the map function from MPIPoolExecutor. This function behaves
# like the built-in map, but the tasks are executed in parallel processes.
# Depending on how your experiments and callback are structured,
# you may need to adjust this to fit your exact needs.

results = self._pool.map(run_experiment_mpi, ex_gen)
for experiment, outcomes in results:
callback(experiment, outcomes)

def run_experiment_mpi(experiment):
# This function assumes you have a function to run your experiment similar
# to how the `ExperimentRunner` was used in the SequentialEvaluator.
runner = ExperimentRunner([experiment.model])
outcomes = runner.run_experiment(experiment)
return experiment, outcomes


class IpyparallelEvaluator(BaseEvaluator):
"""evaluator for using an ipypparallel pool"""

Expand Down

0 comments on commit 597de6a

Please sign in to comment.