From 323049743bb321e1ca544f56b6f9845664ee0543 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Thu, 5 Aug 2021 12:39:47 +0100 Subject: [PATCH 01/11] enable dask in stages --- ceci/stage.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index 56cc9a0..7540679 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -9,6 +9,7 @@ SERIAL = "serial" MPI_PARALLEL = "mpi" +DASK_PARALLEL = "dask" IN_PROGRESS_PREFIX = "inprogress_" @@ -29,6 +30,7 @@ class PipelineStage: """ parallel = True + dask_parallel = False config_options = {} doc = "" @@ -141,6 +143,12 @@ def __init__(self, args, comm=None): self._size = 1 self._rank = 0 + # If we are running under MPI but this subclass has enabled dask + # then we note that here. It stops various MPI-specific things happening + # later + if (self._parallel == MPI_PARALLEL) and self.dask_parallel: + self._parallel = DASK_PARALLEL + pipeline_stages = {} incomplete_pipeline_stages = {} @@ -386,11 +394,18 @@ def execute(cls, args): """ import pdb + # Create the stage instance. Running under dask this only + # actually needs to happen for one process, but it's not a major + # overhead and lets us do a whole bunch of other setup above stage = cls(args) + # This happens before dask is initialized if stage.rank == 0: print(f"Executing stage: {cls.name}") + if stage.is_dask(): + stage.start_dask() + if args.cprofile: profile = cProfile.Profile() profile.enable() @@ -431,7 +446,7 @@ def execute(cls, args): profile.dump_stats(args.cprofile) profile.print_stats("cumtime") - if stage.rank == 0: + if stage.rank == 0 or stage.is_dask(): print(f"Stage complete: {cls.name}") def finalize(self): @@ -441,8 +456,9 @@ def finalize(self): self.comm.Barrier() # Move files to their final path - # only the master process moves things - if self.rank == 0: + # Only the root process moves things, except under dask only the root + # process should get to this point + if (self.rank == 0) or self.is_dask(): for tag in self.output_tags(): # find the old and new names temp_name = self.get_output(tag) @@ -463,7 +479,6 @@ def finalize(self): ############################################# # Parallelism-related methods and properties. ############################################# - @property def rank(self): """The rank of this process under MPI (0 if not running under MPI)""" @@ -494,6 +509,47 @@ def is_mpi(self): """ return self._parallel == MPI_PARALLEL + def is_dask(self): + """ + Returns True if the stage is being run in parallel with Dask. + """ + return self._parallel == DASK_PARALLEL + + def start_dask(self): + """ + Prepare dask to run under MPI. After calling this method + only a single process, MPI rank 1 will continue to exeute code + """ + + # using the programmatic dask configuration system + # does not seem to work. Presumably the loggers have already + # been created by the time we modify the config. Doing it with + # env vars seems to work. If the user has already set this then + # we use that value. Otherwise we only want error logs + import os + key = 'DASK_LOGGING__DISTRIBUTED' + os.environ[key] = os.environ.get(key, 'error') + + import dask + import dask_mpi + import dask.distributed + # Cannot specify non-COMM_WORLD communicator here. It wouldn't work anyway. 
+ # If we want to be able to run things under dask in a library mode + # while keeping the same MPI comm then we would need to modify the + # dask_mpi library, which currently also calls sys.exit on all but + # one of the processes once it's complete, and awaits exit on the + # other one. + # After this point only a single process, MPI rank 1, + # will continue to exeute code. The others enter an event + # loop and run sys.exit at the end of it. + dask_mpi.initialize() + + # Connect this local process to remote workers + self.dask_client = dask.distributed.Client() + print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}") + + + def split_tasks_by_rank(self, tasks): """Iterate through a list of items, yielding ones this process is responsible for/ From e7adf7d680d653b3f4a55a32c532c364e7f99488 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 08:44:27 +0100 Subject: [PATCH 02/11] tidy comments --- ceci/stage.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index 7540679..dd0d08e 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -446,6 +446,10 @@ def execute(cls, args): profile.dump_stats(args.cprofile) profile.print_stats("cumtime") + # Under dask the + # the root process has gone off to become the scheduler, + # and process 1 becomes the client which runs this code + # and gets to this point if stage.rank == 0 or stage.is_dask(): print(f"Stage complete: {cls.name}") @@ -456,8 +460,9 @@ def finalize(self): self.comm.Barrier() # Move files to their final path - # Only the root process moves things, except under dask only the root - # process should get to this point + # Only the root process moves things, except under dask it is + # process 1, which is the only process that reaches this point + # (as noted above) if (self.rank == 0) or self.is_dask(): for tag in self.output_tags(): # find the old and new names @@ -529,10 +534,15 @@ def start_dask(self): import os key = 'DASK_LOGGING__DISTRIBUTED' os.environ[key] = os.environ.get(key, 'error') + try: + import dask + import dask_mpi + import dask.distributed + except ImportError: + print("ERROR: Using --mpi option on stages that use dask requires " + "dask[distributed] and dask_mpi to be installed.") + raise - import dask - import dask_mpi - import dask.distributed # Cannot specify non-COMM_WORLD communicator here. It wouldn't work anyway. # If we want to be able to run things under dask in a library mode # while keeping the same MPI comm then we would need to modify the @@ -544,7 +554,8 @@ def start_dask(self): # loop and run sys.exit at the end of it. dask_mpi.initialize() - # Connect this local process to remote workers + # Connect this local process to remote workers. + # I don't yet know how to see this dashboard link at nersc self.dask_client = dask.distributed.Client() print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}") From 0aa79a82ab92d6f58408d4c69c5e577cffe80785 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 08:44:59 +0100 Subject: [PATCH 03/11] partial black run --- ceci/stage.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index dd0d08e..2a3d635 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -532,15 +532,18 @@ def start_dask(self): # env vars seems to work. If the user has already set this then # we use that value. 
Otherwise we only want error logs import os - key = 'DASK_LOGGING__DISTRIBUTED' - os.environ[key] = os.environ.get(key, 'error') + + key = "DASK_LOGGING__DISTRIBUTED" + os.environ[key] = os.environ.get(key, "error") try: import dask import dask_mpi import dask.distributed except ImportError: - print("ERROR: Using --mpi option on stages that use dask requires " - "dask[distributed] and dask_mpi to be installed.") + print( + "ERROR: Using --mpi option on stages that use dask requires " + "dask[distributed] and dask_mpi to be installed." + ) raise # Cannot specify non-COMM_WORLD communicator here. It wouldn't work anyway. @@ -559,8 +562,6 @@ def start_dask(self): self.dask_client = dask.distributed.Client() print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}") - - def split_tasks_by_rank(self, tasks): """Iterate through a list of items, yielding ones this process is responsible for/ From db856ae9c0e8d315b6706fece87a5a9687a13a24 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 08:48:13 +0100 Subject: [PATCH 04/11] add error if mpi used with less than three processes. --- ceci/stage.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ceci/stage.py b/ceci/stage.py index 2a3d635..a8c9f33 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -546,6 +546,13 @@ def start_dask(self): ) raise + if self.size < 3: + raise ValueError( + "Dask requires at least three processes. One becomes a scheduler " + "process, one is a client that runs the code, and more are required " + "as worker processes." + ) + # Cannot specify non-COMM_WORLD communicator here. It wouldn't work anyway. # If we want to be able to run things under dask in a library mode # while keeping the same MPI comm then we would need to modify the From 9381327df23f0439d6dd91b878a72d303269766f Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 11:34:04 +0100 Subject: [PATCH 05/11] update to use my dask version --- .github/workflows/ci.yml | 1 + ceci/stage.py | 32 ++++++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4bad725..be18f4c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,7 @@ jobs: - name: Install run: | pip install --upgrade pytest pytest-mock codecov pytest-cov h5py + pip install git+https://github.com/joezuntz/dask-mpi pip install .[test,cwl,parsl] - name: Tests diff --git a/ceci/stage.py b/ceci/stage.py index a8c9f33..a93ddec 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -380,7 +380,7 @@ def _parse_command_line(cls, cmd=None): return args @classmethod - def execute(cls, args): + def execute(cls, args, comm=None): """ Create an instance of this stage and run it with the specified inputs and outputs. @@ -397,14 +397,18 @@ def execute(cls, args): # Create the stage instance. 
Running under dask this only # actually needs to happen for one process, but it's not a major # overhead and lets us do a whole bunch of other setup above - stage = cls(args) + stage = cls(args, comm=comm) # This happens before dask is initialized if stage.rank == 0: print(f"Executing stage: {cls.name}") if stage.is_dask(): - stage.start_dask() + is_client = stage.start_dask() + # worker and scheduler stages do not execute the + # run method under dask + if not is_client: + return if args.cprofile: profile = cProfile.Profile() @@ -427,6 +431,8 @@ def execute(cls, args): finally: if args.memmon: monitor.stop() + if stage.is_dask(): + stage.stop_dask() # The default finalization renames any output files to their # final location, but subclasses can override to do other things too @@ -562,12 +568,22 @@ def start_dask(self): # After this point only a single process, MPI rank 1, # will continue to exeute code. The others enter an event # loop and run sys.exit at the end of it. - dask_mpi.initialize() + is_client = dask_mpi.initialize(comm=self.comm, exit=False) - # Connect this local process to remote workers. - # I don't yet know how to see this dashboard link at nersc - self.dask_client = dask.distributed.Client() - print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}") + if is_client: + # Connect this local process to remote workers. + # I don't yet know how to see this dashboard link at nersc + self.dask_client = dask.distributed.Client() + print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}") + + return is_client + + def stop_dask(self): + """ + End the dask event loop + """ + from dask_mpi import send_close_signal + send_close_signal() def split_tasks_by_rank(self, tasks): """Iterate through a list of items, yielding ones this process is responsible for/ From 6dc9c4a5320d550061f8c0e35c4cbc84008d7a48 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 11:42:47 +0100 Subject: [PATCH 06/11] add test --- tests/test_dask.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 tests/test_dask.py diff --git a/tests/test_dask.py b/tests/test_dask.py new file mode 100644 index 0000000..433a170 --- /dev/null +++ b/tests/test_dask.py @@ -0,0 +1,35 @@ +from ceci.stage import PipelineStage +import mockmpi + +def core_dask(comm): + class DaskTestStage(PipelineStage): + name = "dasktest" + dask_parallel = True + inputs = [] + outputs = [] + config_options = {} + + def run(self): + import dask.array as da + arr = da.arange(100) + x = arr.sum() + x = x.compute() + assert x == 4950 + + + args = DaskTestStage._parse_command_line(["dasktest", "--config", "tests/config.yml"]) + DaskTestStage.execute(args, comm=comm) + + # check that all procs get here + if comm is not None: + comm.Barrier() + + +def test_dask(): + core_dask(None) + mockmpi.mock_mpiexec(3, core_dask) + mockmpi.mock_mpiexec(5, core_dask) + + +if __name__ == '__main__': + test_dask() From 0aeb29ca92f46008d16fcd26d2967a635b827ba9 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 11:45:08 +0100 Subject: [PATCH 07/11] install requires yaml --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be18f4c..5ee6bdd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: - name: Install run: | - pip install --upgrade pytest pytest-mock codecov pytest-cov h5py + pip install --upgrade pytest pytest-mock codecov pytest-cov 
h5py pyyaml pip install git+https://github.com/joezuntz/dask-mpi pip install .[test,cwl,parsl] From 0cb7f7e940a7cae5a58c5f56f125cc3d2f887d81 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 11:46:34 +0100 Subject: [PATCH 08/11] require mockmpi for tests --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ee6bdd..7dfc7f5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: - name: Install run: | - pip install --upgrade pytest pytest-mock codecov pytest-cov h5py pyyaml + pip install --upgrade pytest pytest-mock codecov pytest-cov h5py pyyaml mockmpi pip install git+https://github.com/joezuntz/dask-mpi pip install .[test,cwl,parsl] From 902c9733df8b0cdb95926f6637cf75c29d4d45e3 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 11:49:38 +0100 Subject: [PATCH 09/11] require dask --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7dfc7f5..bdb3e2b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,7 @@ jobs: - name: Install run: | pip install --upgrade pytest pytest-mock codecov pytest-cov h5py pyyaml mockmpi + pip install dask[distributed] pip install git+https://github.com/joezuntz/dask-mpi pip install .[test,cwl,parsl] From 19100a91227b38e5b585527df0dbb3de67425137 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 11:58:47 +0100 Subject: [PATCH 10/11] update comment --- ceci/stage.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index a93ddec..e970ae4 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -559,21 +559,19 @@ def start_dask(self): "as worker processes." ) - # Cannot specify non-COMM_WORLD communicator here. It wouldn't work anyway. - # If we want to be able to run things under dask in a library mode - # while keeping the same MPI comm then we would need to modify the - # dask_mpi library, which currently also calls sys.exit on all but - # one of the processes once it's complete, and awaits exit on the - # other one. + # This requires my fork until/unless they merge the PR, to allow + # us to pass in these two arguents. In vanilla dask-mpi sys.exit + # is called at the end of the event loop without returning to us. # After this point only a single process, MPI rank 1, - # will continue to exeute code. The others enter an event - # loop and run sys.exit at the end of it. + # should continue to exeute code. The others enter an event + # loop and return with is_client=False, which we return here + # to tell the caller that they should not run everything. is_client = dask_mpi.initialize(comm=self.comm, exit=False) if is_client: # Connect this local process to remote workers. - # I don't yet know how to see this dashboard link at nersc self.dask_client = dask.distributed.Client() + # I don't yet know how to see this dashboard link at nersc print(f"Started dask. 
Diagnostics at {self.dask_client.dashboard_link}") return is_client From 60ab3ed7555cb0114658418c71bd56d3633b11c7 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 6 Aug 2021 12:04:13 +0100 Subject: [PATCH 11/11] typo --- ceci/stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ceci/stage.py b/ceci/stage.py index e970ae4..53a84ee 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -560,7 +560,7 @@ def start_dask(self): ) # This requires my fork until/unless they merge the PR, to allow - # us to pass in these two arguents. In vanilla dask-mpi sys.exit + # us to pass in these two arguments. In vanilla dask-mpi sys.exit # is called at the end of the event loop without returning to us. # After this point only a single process, MPI rank 1, # should continue to exeute code. The others enter an event
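
Usage note (not part of the patches above): for a downstream stage, the pattern these commits establish is to set dask_parallel = True on the stage class and write run() in terms of dask collections. When the stage is launched with the --mpi flag, rank 0 becomes the dask scheduler, rank 1 becomes the client and is the only process that executes run(), and the remaining ranks become workers, which is why patch 04 requires at least three processes. The sketch below is modelled on the DaskTestStage added in tests/test_dask.py; the class name, stage name, and the particular computation are hypothetical illustrations, not code from the patches.

    from ceci.stage import PipelineStage


    class MyDaskStage(PipelineStage):
        """Hypothetical downstream stage, modelled on the DaskTestStage in the test above."""

        name = "my_dask_stage"
        dask_parallel = True  # opt in: with --mpi, start_dask() replaces the usual MPI setup
        inputs = []
        outputs = []
        config_options = {}

        def run(self):
            # Under dask, only MPI rank 1 (the client) runs this method; rank 0 is the
            # scheduler and the remaining ranks are workers inside dask_mpi's event loop.
            # Without MPI the stage runs serially and dask uses its local scheduler.
            import dask.array as da

            arr = da.arange(1_000_000, chunks=100_000)
            total = arr.sum().compute()  # computed across the worker processes under MPI
            print(f"total = {total}")

            # When running under dask, self.dask_client (a dask.distributed.Client)
            # is also available here, e.g. for submitting tasks or finding the dashboard.

Running this under MPI needs dask[distributed] and the patched dask-mpi fork installed in the CI workflow above (git+https://github.com/joezuntz/dask-mpi), plus at least three MPI processes and the --mpi option; with fewer processes start_dask() raises the ValueError added in patch 04, and with no MPI at all the stage simply runs serially, as the first case in test_dask() exercises.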