From ac1dd50bf94be3ad4379963de4dc8a23b82e342d Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 11 Aug 2020 11:49:20 +0100 Subject: [PATCH 1/6] pass python_paths to containers and elsewhere --- ceci/main.py | 18 ++++++++++-------- ceci/sites/__init__.py | 3 +++ ceci/sites/cori.py | 24 +++++++++++++++++++++++- ceci/sites/local.py | 22 +++++++++++++++++++++- 4 files changed, 57 insertions(+), 10 deletions(-) diff --git a/ceci/main.py b/ceci/main.py index fa8504e..2d25d13 100644 --- a/ceci/main.py +++ b/ceci/main.py @@ -67,9 +67,19 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False): # parsl execution/launcher configuration information launcher_config = pipe_config.get("launcher", {"name": "mini"}) launcher_name = launcher_config["name"] + # Launchers may need to know if this is a dry-run launcher_config["dry_run"] = dry_run + # Later we will add these paths to sys.path for running here, + # but we will also need to pass them to the launcher so that + # they can be added within any containers or other launchers + # that we use + paths = pipe_config.get("python_paths", []) + if isinstance(paths, str): + paths = paths.split() + launcher_config["python_paths"] = paths + # Python modules in which to search for pipeline stages modules = pipe_config["modules"].split() @@ -109,7 +119,6 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False): "resume": pipe_config["resume"], } - # Choice of actual pipeline type to run if dry_run: pipeline_class = pipeline.DryRunPipeline @@ -122,13 +131,6 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False): else: raise ValueError("Unknown pipeline launcher {launcher_name}") - - - - paths = pipe_config.get("python_paths", []) - if isinstance(paths, str): - paths = paths.split() - # temporarily add the paths to sys.path, # but remove them at the end with extra_paths(paths): diff --git a/ceci/sites/__init__.py b/ceci/sites/__init__.py index 1cc4e2c..d150ff3 100644 --- a/ceci/sites/__init__.py 
+++ b/ceci/sites/__init__.py @@ -58,6 +58,7 @@ def load(launcher_config, site_configs): launcher_name = launcher_config["name"] dry_run = launcher_config.get("dry_run", False) + python_paths = launcher_config.get("python_paths", []) # Create an object for each site. for site_config in site_configs: @@ -68,6 +69,8 @@ def load(launcher_config, site_configs): # that test if we are not actually running the command, # just printing it. site_config["dry_run"] = dry_run + # and about any extra paths to add + site_config["python_paths"] = python_paths try: cls = site_classes[site_name] diff --git a/ceci/sites/cori.py b/ceci/sites/cori.py index 25de6b6..6671920 100644 --- a/ceci/sites/cori.py +++ b/ceci/sites/cori.py @@ -31,6 +31,7 @@ def command(self, cmd, sec): mpi1 = f"{self.mpi_command} {sec.nprocess} --cpus-per-task={sec.threads_per_process}" mpi2 = f"--mpi" if sec.nprocess > 1 else "" volume_flag = f"-V {sec.volume} " if sec.volume else "" + paths = self.config["python_paths"] if sec.nodes: mpi1 += f" --nodes {sec.nodes}" @@ -46,17 +47,38 @@ def command(self, cmd, sec): ) if sec.image: + # If we are setting python paths then we have to modify the executable + # here. This is because we need the path to be available right from the + # start, in case the stage is defined in a module added by these paths. + # The --env flags in docker/shifter overwrites an env var, and there + # doesn't seem to be a way to just append to one, so we have to be a bit + # roundabout to make this work, and invoke bash -c instead. 
+ paths_start = ( + ("bash -c 'PYTHONPATH=$PYTHONPATH:" + (":".join(paths))) + if paths + else "" + ) + paths_end = "'" if paths else "" return ( f"{mpi1} " f"shifter " f"--env OMP_NUM_THREADS={sec.threads_per_process} " f"{volume_flag} " f"--image {sec.image} " + f"{paths_start} " f"{cmd} {mpi2} " + f"{paths_end} " ) else: + paths_env = ( + ("PYTHONPATH=" + (":".join(paths)) + ":$PYTHONPATH") if paths else "" + ) return ( - f"OMP_NUM_THREADS={sec.threads_per_process} " f"{mpi1} " f"{cmd} {mpi2}" + # In the non-container case this is much easier + f"OMP_NUM_THREADS={sec.threads_per_process} " + f"{paths_env} " + f"{mpi1} " + f"{cmd} {mpi2}" ) def configure_for_mini(self): diff --git a/ceci/sites/local.py b/ceci/sites/local.py index aa25870..e165818 100644 --- a/ceci/sites/local.py +++ b/ceci/sites/local.py @@ -31,20 +31,40 @@ def command(self, cmd, sec): mpi1 = f"{self.mpi_command} {sec.nprocess}" if sec.nprocess > 1 else "" mpi2 = f"--mpi" if sec.nprocess > 1 else "" volume_flag = f"-v {sec.volume} " if sec.volume else "" + paths = self.config["python_paths"] # TODO: allow other container types here, like singularity if sec.image: + # If we are setting python paths then we have to modify the executable + # here. This is because we need the path to be available right from the + # start, in case the stage is defined in a module added by these paths. + # The --env flags in docker/shifter overwrites an env var, and there + # doesn't seem to be a way to just append to one, so we have to be a bit + # roundabout to make this work, and invoke bash -c instead. 
+ paths_start = ( + "bash -c 'PYTHONPATH=$PYTHONPATH:" + (":".join(paths)) if paths else "" + ) + paths_end = "'" if paths else "" return ( f"docker run " f"--env OMP_NUM_THREADS={sec.threads_per_process} " f"{volume_flag} " f"--rm -it {sec.image} " + f"{paths_start} " f"{mpi1} " f"{cmd} {mpi2} " + f"{paths_end}" ) else: + # In the non-container case this is much easier + paths_env = ( + "PYTHONPATH=" + (":".join(paths)) + ":$PYTHONPATH" if paths else "" + ) return ( - f"OMP_NUM_THREADS={sec.threads_per_process} " f"{mpi1} " f"{cmd} {mpi2}" + f"OMP_NUM_THREADS={sec.threads_per_process} " + f"{paths_env} " + f"{mpi1} " + f"{cmd} {mpi2}" ) def configure_for_parsl(self): From 7d06ac363e4dc0dbefa243cce619523ebf2661cb Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 11 Aug 2020 11:51:35 +0100 Subject: [PATCH 2/6] fix when python_paths not set --- ceci/sites/cori.py | 2 +- ceci/sites/local.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ceci/sites/cori.py b/ceci/sites/cori.py index 6671920..8b7cb73 100644 --- a/ceci/sites/cori.py +++ b/ceci/sites/cori.py @@ -31,7 +31,7 @@ def command(self, cmd, sec): mpi1 = f"{self.mpi_command} {sec.nprocess} --cpus-per-task={sec.threads_per_process}" mpi2 = f"--mpi" if sec.nprocess > 1 else "" volume_flag = f"-V {sec.volume} " if sec.volume else "" - paths = self.config["python_paths"] + paths = self.config.get("python_paths", []) if sec.nodes: mpi1 += f" --nodes {sec.nodes}" diff --git a/ceci/sites/local.py b/ceci/sites/local.py index e165818..b63923f 100644 --- a/ceci/sites/local.py +++ b/ceci/sites/local.py @@ -31,7 +31,7 @@ def command(self, cmd, sec): mpi1 = f"{self.mpi_command} {sec.nprocess}" if sec.nprocess > 1 else "" mpi2 = f"--mpi" if sec.nprocess > 1 else "" volume_flag = f"-v {sec.volume} " if sec.volume else "" - paths = self.config["python_paths"] + paths = self.config.get("python_paths", []) # TODO: allow other container types here, like singularity if sec.image: From 
b1f40d0d2bf0e4274d7d89283a54cff7b6c9640a Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 11 Aug 2020 12:27:06 +0100 Subject: [PATCH 3/6] make the extra_paths util also add to the environment variable --- ceci/utils.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/ceci/utils.py b/ceci/utils.py index 21a74ea..80a1626 100644 --- a/ceci/utils.py +++ b/ceci/utils.py @@ -1,5 +1,27 @@ from contextlib import contextmanager import sys +import os + +def add_python_path(path, start): + # add a path to the env var PYTHONPATH + old = os.environ.get("PYTHONPATH", "") + if start: + new = path + ":" + old + else: + new = old + ":" + path + os.environ['PYTHONPATH'] = new + +def remove_python_path(path, start): + # remove a path from PYTHONPATH + p = os.environ.get("PYTHONPATH", "").split(":") + if start: + p.remove(path) + else: + remove_last(p, path) + os.environ["PYTHONPATH"] = ":".join(p) + + + @contextmanager def extra_paths(paths, start=True): @@ -8,7 +30,9 @@ def extra_paths(paths, start=True): if isinstance(paths, str): paths = paths.split() - # On enter, add paths to sys.path, + # On enter, add paths to both sys.path, + # and the PYTHONPATH env var, so that subprocesses + # can see it, # either the start or the end depending # on the start argument for path in paths: @@ -17,6 +41,8 @@ def extra_paths(paths, start=True): else: sys.path.append(path) + add_python_path(path, start) + # Return control to caller try: yield @@ -28,6 +54,8 @@ def extra_paths(paths, start=True): sys.path.remove(path) else: remove_last(sys.path, path) + # also remove env var entry + remove_python_path(path, start) # If e.g. 
user has already done this # manually for some reason then just # skip From c469dab4b5d7b1c8e6062a6ff58570e5da9c4de0 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 11 Aug 2020 12:27:18 +0100 Subject: [PATCH 4/6] tests for new features --- tests/test_pipeline.py | 63 ++++++++++++++++++++++++++++++++++++++ tests/test_python_paths.py | 4 +++ 2 files changed, 67 insertions(+) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8dd4405..3814822 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,10 +1,13 @@ from ceci import PipelineStage, MiniPipeline, ParslPipeline, Pipeline, DryRunPipeline from ceci_example.types import TextFile from ceci.sites import load, reset_default_site +from ceci.utils import extra_paths import pytest from parsl import clear import yaml import os +import tempfile +import sys # This one should work class AAA(PipelineStage): @@ -145,6 +148,66 @@ def test_dry_run(): assert status == 0 +def test_python_paths(): + # make a temp dir + with tempfile.TemporaryDirectory() as dirname: + os.mkdir(dirname + "/pretend") + + # create a subdir of that with a module in + mod_dir = dirname + "/pretend" + mod_path = mod_dir + "/pretend_module.py" + + # empty module, just to check it imports + open(mod_path, 'w').close() + + assert os.path.exists(mod_path) + print(os.listdir(mod_dir)) + + # create a stage there that uses the submodule + stage_path = dirname + "/my_stage.py" + open(stage_path, 'w').write(""" +import ceci +class MyStage(ceci.PipelineStage): + name = "MyStage" + inputs = [] + outputs = [] + config_options = {"x": int} + def run(self): + import pretend_module + assert self.config["x"] == 17 +""") + + # pipeline admin + config_path = dirname + "/config.yml" + open(config_path, 'w').write(""" +MyStage: + x: 17 + """) + + run_config = { + "log_dir": dirname, + "output_dir": dirname, + "resume": False, + "python_paths": [dirname, mod_dir], + } + + # note that we don't add the subdir here + with extra_paths(dirname): 
+ import my_stage + print(os.environ["PYTHONPATH"]) + print(sys.path) + print(os.listdir(dirname)) + launcher_config = {"interval": 0.5, "name": "mini", "python_paths":[dirname, mod_dir]} + pipeline = MiniPipeline([{"name": "MyStage"}], launcher_config) + status = pipeline.run({}, run_config, config_path) + log = open(dirname + "/MyStage.out").read() + print(log) + assert status == 0 + + + + + # this has to be here because we test running the pipeline if __name__ == "__main__": PipelineStage.main() diff --git a/tests/test_python_paths.py b/tests/test_python_paths.py index d8f4a93..e2770de 100644 --- a/tests/test_python_paths.py +++ b/tests/test_python_paths.py @@ -21,15 +21,19 @@ class MyError(Exception): def test_extra_paths(): p = 'xxx111yyy222' orig_path = sys.path[:] + orig_env = os.environ.get("PYTHONPATH", "") # check path is put in with extra_paths(p): assert sys.path[0] == p + assert p in os.environ['PYTHONPATH'] # check everything back to normal # after with statement assert p not in sys.path assert sys.path == orig_path + assert p not in os.environ['PYTHONPATH'] + assert os.environ['PYTHONPATH'] == orig_env # check that an exception does not interfere # with this From bef66966ed7be6fc78c883dbc3ba0c72f424c539 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 11 Aug 2020 12:36:39 +0100 Subject: [PATCH 5/6] fix test --- tests/test_python_paths.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_python_paths.py b/tests/test_python_paths.py index e2770de..7597e88 100644 --- a/tests/test_python_paths.py +++ b/tests/test_python_paths.py @@ -2,6 +2,7 @@ from ceci.main import run from ceci.utils import remove_last, extra_paths import pytest +import os def test_remove_item(): l = list('abcdea') From 173bf2d800497fedb1ae65fad8721335db418121 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 11 Aug 2020 13:08:20 +0100 Subject: [PATCH 6/6] make python_paths per-site not per-launcher --- ceci/main.py | 6 ++++-- ceci/sites/__init__.py | 3 --- 
tests/test_pipeline.py | 9 +++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ceci/main.py b/ceci/main.py index 2d25d13..b0cc6de 100644 --- a/ceci/main.py +++ b/ceci/main.py @@ -72,13 +72,12 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False): launcher_config["dry_run"] = dry_run # Later we will add these paths to sys.path for running here, - # but we will also need to pass them to the launcher so that + # but we will also need to pass them to the sites below so that # they can be added within any containers or other launchers # that we use paths = pipe_config.get("python_paths", []) if isinstance(paths, str): paths = paths.split() - launcher_config["python_paths"] = paths # Python modules in which to search for pipeline stages modules = pipe_config["modules"].split() @@ -93,6 +92,8 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False): default_site = get_default_site() site_config = pipe_config.get("site", {"name": "local"}) + # Pass the paths along to the site + site_config["python_paths"] = paths load(launcher_config, [site_config]) # Inputs and outputs @@ -119,6 +120,7 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False): "resume": pipe_config["resume"], } + # Choice of actual pipeline type to run if dry_run: pipeline_class = pipeline.DryRunPipeline diff --git a/ceci/sites/__init__.py b/ceci/sites/__init__.py index d150ff3..1cc4e2c 100644 --- a/ceci/sites/__init__.py +++ b/ceci/sites/__init__.py @@ -58,7 +58,6 @@ def load(launcher_config, site_configs): launcher_name = launcher_config["name"] dry_run = launcher_config.get("dry_run", False) - python_paths = launcher_config.get("python_paths", []) # Create an object for each site. for site_config in site_configs: @@ -69,8 +68,6 @@ def load(launcher_config, site_configs): # that test if we are not actually running the command, # just printing it. 
site_config["dry_run"] = dry_run - # and about any extra paths to add - site_config["python_paths"] = python_paths try: cls = site_classes[site_name] diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 3814822..0b67362 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -191,13 +191,17 @@ def run(self): "python_paths": [dirname, mod_dir], } + + launcher_config = {"interval": 0.5, "name": "mini"} + site_config = {"name":"local", "python_paths":[dirname, mod_dir]} + load(launcher_config, [site_config]) + # note that we don't add the subdir here with extra_paths(dirname): import my_stage print(os.environ["PYTHONPATH"]) print(sys.path) print(os.listdir(dirname)) - launcher_config = {"interval": 0.5, "name": "mini", "python_paths":[dirname, mod_dir]} pipeline = MiniPipeline([{"name": "MyStage"}], launcher_config) status = pipeline.run({}, run_config, config_path) log = open(dirname + "/MyStage.out").read() @@ -205,9 +209,6 @@ def run(self): assert status == 0 - - - # this has to be here because we test running the pipeline if __name__ == "__main__": PipelineStage.main()