From 37f987f1435dc1a4c34f2294ef74a50a0b3c7ab2 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Wed, 23 Aug 2023 13:44:43 +0100 Subject: [PATCH] Add ancestors script and tests --- ceci/tools/__init__.py | 0 ceci/tools/ancestors.py | 50 +++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 2 ++ tests/test_main.py | 14 ++++++++++++ 4 files changed, 66 insertions(+) create mode 100644 ceci/tools/__init__.py create mode 100644 ceci/tools/ancestors.py diff --git a/ceci/tools/__init__.py b/ceci/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ceci/tools/ancestors.py b/ceci/tools/ancestors.py new file mode 100644 index 0000000..29ed883 --- /dev/null +++ b/ceci/tools/ancestors.py @@ -0,0 +1,50 @@ +import ceci +import yaml +import argparse + +def get_ancestors(dag, job): + for parent in dag[job]: + yield parent + yield from get_ancestors(dag, parent) + + +def print_ancestors(pipeline_config_file, target): + with open(pipeline_config_file) as f: + pipe_config = yaml.safe_load(f) + + # need to manually switch off resume mode because it + # would stop jobs from being properly in the DAG. + pipe_config['resume'] = False + + with ceci.prepare_for_pipeline(pipe_config): + pipe = ceci.Pipeline.create(pipe_config) + + jobs = pipe.run_info[0] + dag = pipe.build_dag(jobs) + + if target in jobs: + # in this case the target is the name of a stage. + job = jobs[target] + else: + # otherwise it must be the name of an output tag + for stage in pipe.stages: + if target in stage.output_tags(): + job = jobs[stage.instance_name] + break + else: + raise ValueError(f"Could not find job or output tag {target}") + + for ancestor in get_ancestors(dag, job): + print(ancestor.name) + +parser = argparse.ArgumentParser() +parser.add_argument('pipeline_config_file') +parser.add_argument('stage_name_or_output_tag') + + +def main(): + args = parser.parse_args() + print_ancestors(args.pipeline_config_file, args.stage_name_or_output_tag) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index aad7c34..d632be2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,11 +41,13 @@ write_to = "ceci/_version.py" packages = [ "ceci", "ceci.sites", + "ceci.tools", ] [project.scripts] ceci = "ceci.main:main" +ceci-ancestors = "ceci.tools.ancestors:main" [project.optional-dependencies] diff --git a/tests/test_main.py b/tests/test_main.py index e0c258e..0084692 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,4 +1,5 @@ from ceci.main import run_pipeline +from ceci.tools.ancestors import print_ancestors from parsl import clear import tempfile import os @@ -54,6 +55,19 @@ def test_run_cwl(): def test_run_namespace(): run1(config_yaml="tests/test_namespace.yml", expect_outputs=False) == 0 +def test_ancestors_stage(capsys): + print_ancestors("tests/test.yml", "WLGCRandoms") + captured = capsys.readouterr() + assert captured.out.strip() == "SysMapMaker" + +def test_ancestors_output(capsys): + print_ancestors("tests/test.yml", "tomography_catalog") + captured = capsys.readouterr() + assert captured.out.strip() == "shearMeasurementPipe\nPZEstimationPipe" + +def test_ancestors_broken(capsys): + with pytest.raises(ValueError): + print_ancestors("tests/test.yml", "not-a-real-stage-or-output")