Commit

initial commit
quaquel committed May 28, 2024
1 parent 5386bae commit d651277
Showing 2 changed files with 41 additions and 41 deletions.
28 changes: 14 additions & 14 deletions docs/source/indepth_tutorial/mpi-evaluator.ipynb
@@ -97,11 +97,11 @@
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"def some_model(x1=None, x2=None, x3=None):\n",
" return {\"y\": x1 * x2 + x3}"
]
],
"outputs": []
},
{
"cell_type": "markdown",
@@ -121,7 +121,6 @@
"outputs_hidden": true
}
},
"outputs": [],
"source": [
"from ema_workbench import Model, RealParameter, ScalarOutcome, ema_logging, perform_experiments\n",
"\n",
@@ -139,7 +138,8 @@
" ]\n",
" # specify outcomes\n",
" ema_model.outcomes = [ScalarOutcome(\"y\")]"
]
],
"outputs": []
},
{
"cell_type": "markdown",
@@ -160,13 +160,13 @@
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"from ema_workbench import SequentialEvaluator\n",
"\n",
"with SequentialEvaluator(ema_model) as evaluator:\n",
" results = perform_experiments(ema_model, 100, evaluator=evaluator)"
]
],
"outputs": []
},
{
"cell_type": "markdown",
@@ -195,7 +195,6 @@
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# ema_example_model.py\n",
"from ema_workbench import (\n",
@@ -231,7 +230,8 @@
"\n",
" # Save the results\n",
" save_results(results, \"ema_example_model.tar.gz\")"
]
],
"outputs": []
},
{
"cell_type": "markdown",
@@ -283,9 +283,9 @@
"module load py-mpi4py\n",
"module load py-pip\n",
"\n",
"pip install --user --upgrade ema_workbench\n",
"pip install --user ema_workbench\n",
"\n",
"mpiexec -n 1 python3 ema_example_model.py\n",
"mpiexec -n 1 python3 example_mpi_lake_model.py\n",
"```\n",
"\n",
"Modify the script to suit your needs:\n",
@@ -340,13 +340,13 @@
"\n",
"Create a new directory for this tutorial, for example `mkdir ema_mpi_test` and then `cd ema_mpi_test`\n",
"\n",
"Then, you want to send your Python file and SLURM script to DelftBlue. Make sure your files are all in one folder. Open a **new** command line terminal, `cd` in the folder where your files are, and then send the files with `scp`:\n",
"Then, you want to send your Python file and SLURM script to DelftBlue. Open a **new** command line terminal, and then you can do this with `scp`:\n",
"\n",
"```bash\n",
"scp -J <netid>@student-linux.tudelft.nl ema_example_model.py slurm_script.sh <netid>@login.delftblue.tudelft.nl:/scratch/<netid>/ema_mpi_test\n",
"```\n",
"\n",
"Now go back to your original cmd window, on which you logged in on DelftBlue. Before scheduling the SLURM script, we first have to make it executable:\n",
"Before scheduling the SLURM script, we first have to make it executable:\n",
"\n",
"```bash\n",
"chmod +x slurm_script.sh\n",
@@ -393,8 +393,8 @@
"execution_count": null,
"id": "7fa60315-6145-4cac-950b-e9006428a955",
"metadata": {},
"outputs": [],
"source": []
"source": [],
"outputs": []
}
],
"metadata": {
54 changes: 27 additions & 27 deletions ema_workbench/em_framework/futures_mpi.py
@@ -49,26 +49,26 @@ def mpi_initializer(models, log_level, root_dir):
experiment_runner = ExperimentRunner(msis)

# setup the logging
info = MPI.INFO_NULL
service = "logwatcher"
port = MPI.Lookup_name(service)
logcomm = MPI.COMM_WORLD.Connect(port, info, 0)
# info = MPI.INFO_NULL
# service = "logwatcher"
# port = MPI.Lookup_name(service)
# logcomm = MPI.COMM_WORLD.Connect(port, info, 0)

root_logger = get_rootlogger()
# root_logger = get_rootlogger()

handler = MPIHandler(logcomm)
handler.addFilter(RankFilter(rank))
handler.setLevel(log_level)
handler.setFormatter(logging.Formatter("[worker %(rank)s/%(levelname)s] %(message)s"))
root_logger.addHandler(handler)
# handler = MPIHandler(logcomm)
# handler.addFilter(RankFilter(rank))
# handler.setLevel(log_level)
# handler.setFormatter(logging.Formatter("[worker %(rank)s/%(levelname)s] %(message)s"))
# root_logger.addHandler(handler)

# setup the working directories
tmpdir = setup_working_directories(models, root_dir)
if tmpdir:
atexit.register(finalizer(experiment_runner), os.path.abspath(tmpdir))

# _logger.info(f"worker {rank} initialized")
root_logger.info(f"worker {rank} initialized")
# root_logger.info(f"worker {rank} initialized")


def logwatcher(start_event, stop_event):
@@ -175,20 +175,20 @@ def initialize(self):
# Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
from mpi4py.futures import MPIPoolExecutor

start_event = threading.Event()
self.stop_event = threading.Event()
self.logwatcher_thread = threading.Thread(
name="logwatcher",
target=logwatcher,
daemon=False,
args=(
start_event,
self.stop_event,
),
)
self.logwatcher_thread.start()
start_event.wait()
_logger.info("logwatcher server started")
# start_event = threading.Event()
# self.stop_event = threading.Event()
# self.logwatcher_thread = threading.Thread(
# name="logwatcher",
# target=logwatcher,
# daemon=False,
# args=(
# start_event,
# self.stop_event,
# ),
# )
# self.logwatcher_thread.start()
# start_event.wait()
# _logger.info("logwatcher server started")

self.root_dir = determine_rootdir(self._msis)
self._pool = MPIPoolExecutor(
@@ -207,8 +207,8 @@ def initialize(self):
@method_logger(__name__)
def finalize(self):
# submit sentinel
self.stop_event.set()
self._pool.submit(send_sentinel)
# self.stop_event.set()
# self._pool.submit(send_sentinel)
self._pool.shutdown()
self.logwatcher_thread.join(timeout=60)

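For context on the pool wrapped by `MPIEvaluator.initialize`, mpi4py's `MPIPoolExecutor` follows the standard `concurrent.futures` interface; a minimal, self-contained sketch (independent of the workbench) is:

```python
# pool_demo.py -- run with, e.g.: mpiexec -n 1 python3 pool_demo.py
# (the executor spawns its worker processes via MPI dynamic process management)
from mpi4py.futures import MPIPoolExecutor


def square(x):
    return x * x


if __name__ == "__main__":
    with MPIPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(square, range(8))))
```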
