From b7c0106b8d713b2e6c3c2ea28b12e61f5dfc44a4 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 4 Oct 2022 15:29:18 +0100 Subject: [PATCH 1/2] remove old code --- ceci/__init__.py | 1 + ceci/main.py | 103 ++++++------------------------------- tests/test_interactive.py | 1 - tests/test_main.py | 21 +------- tests/test_python_paths.py | 1 - tests/test_site.py | 1 - tests/test_snapshot.py | 1 - 7 files changed, 18 insertions(+), 111 deletions(-) diff --git a/ceci/__init__.py b/ceci/__init__.py index 55bcd45..1084c42 100644 --- a/ceci/__init__.py +++ b/ceci/__init__.py @@ -2,6 +2,7 @@ from .stage import PipelineStage from .pipeline import Pipeline, MiniPipeline, ParslPipeline, DryRunPipeline, FlowChartPipeline +from .main import prepare_for_pipeline, run_pipeline from pkg_resources import DistributionNotFound from pkg_resources import get_distribution diff --git a/ceci/main.py b/ceci/main.py index cf381c7..4f5b36a 100644 --- a/ceci/main.py +++ b/ceci/main.py @@ -7,6 +7,7 @@ from .pipeline import Pipeline from .sites import load, set_default_site, get_default_site from .utils import extra_paths +import contextlib # Add the current dir to the path - often very useful sys.path.append(os.getcwd()) @@ -35,23 +36,6 @@ ) -def run_prescript(pre_script=None, dry_run=False, script_args=None): - """Run a script before running the pipeline - - Parameters - ---------- - pre_script : `str` - The script to run - dry_run : `bool` - If true do not run the script - script_args : `list`, (`str`) - Arguments to the script - """ - if script_args is None: # pragma: no cover - script_args = [] - if pre_script and not dry_run: - subprocess.check_call(pre_script.split() + script_args, shell=True) - def run_pipeline(pipe_config): """Run a pipeline as defined by a particular configuration @@ -66,68 +50,16 @@ def run_pipeline(pipe_config): status : `int` Usual unix convention of 0 -> success, non-zero is an error code """ - default_site = get_default_site() - try: + with prepare_for_pipeline(pipe_config): p = Pipeline.create(pipe_config) status = p.run() - finally: - # The create command above changes the default site. - # So that this function doesn't confuse later things, - # reset that site now. - set_default_site(default_site) - return status -def run_postscript(post_script=None, dry_run=False, script_args=None): - """Run a script after the pipeline finishes - - Parameters - ---------- - post_script : `str` - The script to run - dry_run : `bool` - If true do not run the script - script_args : `list`, (`str`) - Arguments to the script - - Returns - ------- - return_code : `int` - Usual unix convention of 0 -> success, non-zero is an error code +@contextlib.contextmanager +def prepare_for_pipeline(pipe_config): """ - if script_args is None: # pragma: no cover - script_args = [] - if post_script and not dry_run: - return_code = subprocess.call(post_script.split() + script_args, shell=True) - if return_code: # pragma: no cover - sys.stderr.write( - f"\nWARNING: The post-script command {post_script} " - "returned error status {return_code}\n\n" - ) - return return_code - # Otherwise everything must have gone fine. - return 0 - - -def run(pipe_config, pipeline_config_filename, extra_config=None, dry_run=False): - """Run a pipeline and associated scripts - - Parameters - ---------- - pipe_config : `dict` - The configuration dictionary - pipe_config_filename : `str` - The yaml file with the pipeline configuration - extra_config : `dict` - Extra parameters to override configuration - dry_run : `bool` - Flag to not actually run jobs - - Returns - ------- - status : `int` - Usual unix convention of 0 -> success, non-zero is an error code + Prepare the paths and launcher needed to read and run a pipeline. """ # Later we will add these paths to sys.path for running here, @@ -138,8 +70,10 @@ def run(pipe_config, pipeline_config_filename, extra_config=None, dry_run=False) if isinstance(paths, str): # pragma: no cover paths = paths.split() + # Get information (maybe the default) on the launcher we may be using launcher_config = pipe_config.setdefault("launcher", {"name": "mini"}) site_config = pipe_config.get("site", {"name": "local"}) + # Pass the paths along to the site site_config["python_paths"] = paths load(launcher_config, [site_config]) @@ -147,29 +81,22 @@ def run(pipe_config, pipeline_config_filename, extra_config=None, dry_run=False) # Python modules in which to search for pipeline stages modules = pipe_config.get("modules", "").split() - pre_script = pipe_config.get("pre_script") - post_script = pipe_config.get("post_script") - script_args = [pipeline_config_filename] - if extra_config: - script_args += extra_config + # This helps with testing + default_site = get_default_site() # temporarily add the paths to sys.path, # but remove them at the end with extra_paths(paths): + # Import modules. We have to do this because the definitions + # of the stages can be inside. for module in modules: __import__(module) - run_prescript(pre_script=pre_script, dry_run=dry_run, script_args=script_args) - - status = run_pipeline(pipe_config) - if status: # pragma: no cover - return status - - status = run_postscript( - post_script=post_script, dry_run=dry_run, script_args=script_args - ) - return status + try: + yield + finally: + set_default_site(default_site) def main(): # pragma: no cover diff --git a/tests/test_interactive.py b/tests/test_interactive.py index 32a3e05..b3db02e 100644 --- a/tests/test_interactive.py +++ b/tests/test_interactive.py @@ -1,4 +1,3 @@ -from ceci.main import run from parsl import clear import tempfile import os diff --git a/tests/test_main.py b/tests/test_main.py index a3d544e..18f742e 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,4 +1,4 @@ -from ceci.main import run +from ceci.main import run_pipeline from parsl import clear import tempfile import os @@ -16,7 +16,7 @@ def run1(*config_changes, config_yaml="tests/test.yml", dry_run=False, expect_fa config = [f"output_dir={out_dir}", f"log_dir={log_dir}"] config += config_changes pipe_config = Pipeline.build_config(config_yaml, config, dry_run) - status = run(pipe_config, config_yaml, config, dry_run) + status = run_pipeline(pipe_config) if expect_fail: assert status != 0 else: @@ -49,23 +49,6 @@ def test_run_namespace(): run1(config_yaml="tests/test_namespace.yml", expect_outputs=False) == 0 -def test_pre_script(): - # use the bash "true" command to simulate a - # pre-script suceeding - run1("pre_script='true'") - # and false to simulate a failure - with pytest.raises(subprocess.CalledProcessError): - # error should happen before we get to the asserts, so no expect_fail etc - run1("pre_script='false'") - - -def test_post_script(): - # use the bash "true" command to simulate a - # pre-script suceeding - run1("post_script='true'") - # and false to simulate a failure - should not raise an error - # but should fail. Outputs should exist. - run1("post_script='false'", expect_fail=True, expect_outputs=True) if __name__ == "__main__": diff --git a/tests/test_python_paths.py b/tests/test_python_paths.py index 2b22c1c..358609a 100644 --- a/tests/test_python_paths.py +++ b/tests/test_python_paths.py @@ -1,5 +1,4 @@ import sys -from ceci.main import run from ceci.utils import remove_last, extra_paths import pytest import os diff --git a/tests/test_site.py b/tests/test_site.py index 147ca7e..0d2face 100644 --- a/tests/test_site.py +++ b/tests/test_site.py @@ -1,6 +1,5 @@ from ceci.sites import load, get_default_site from ceci.pipeline import StageExecutionConfig -from ceci.main import run import pytest import os diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py index e89db41..21aea47 100644 --- a/tests/test_snapshot.py +++ b/tests/test_snapshot.py @@ -1,4 +1,3 @@ -from ceci.main import run import tempfile import os import pytest From 64232bab2b235022232c6f21d3171f19401e547b Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 4 Oct 2022 15:50:03 +0100 Subject: [PATCH 2/2] fix main --- ceci/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ceci/main.py b/ceci/main.py index 4f5b36a..b55b371 100644 --- a/ceci/main.py +++ b/ceci/main.py @@ -109,7 +109,7 @@ def main(): # pragma: no cover pipe_config = Pipeline.build_config( args.pipeline_config, args.extra_config, args.dry_run, args.flow_chart ) - status = run(pipe_config, args.pipeline_config, args.extra_config, args.dry_run) + status = run_pipeline(pipe_config) if status == 0: print("Pipeline successful. Joy is sparked.")