From 0c748256c3e9289628b6bfe72b02624dec923717 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 10 Aug 2022 08:09:46 -0700 Subject: [PATCH 1/3] Tooling for using code from namespace packages --- ceci/pipeline.py | 12 ++++++++---- ceci/stage.py | 20 ++++++++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/ceci/pipeline.py b/ceci/pipeline.py index 0506c45..2ee2d85 100644 --- a/ceci/pipeline.py +++ b/ceci/pipeline.py @@ -55,6 +55,8 @@ class StageExecutionConfig: The name of the stage class_name: str The name of the class of the stage + module_name: str + The name of the module for the stage site: Site object (default the global default) The site this stage is run on nprocess: int @@ -88,6 +90,7 @@ def __init__(self, info): # Core attributes - mandatory self.name = info["name"] self.class_name = info.get("classname", self.name) + self.module_name = info["module_name"] self.site = info.get("site", get_default_site()) # Parallelism attributes - optional @@ -129,6 +132,7 @@ def create(cls, stage, **kwargs): info = kwargs.copy() info["name"] = stage.instance_name info["classname"] = stage.name + info["module_name"] = stage.get_module() sec = cls(info) sec.set_stage_obj(stage) return sec @@ -146,7 +150,7 @@ def set_stage_obj(self, stage_obj): TypeError : if stage_obj is not and instance of self.stage_class as determined by the self.class_name attribute """ - self.stage_class = PipelineStage.get_stage(self.class_name) + self.stage_class = PipelineStage.get_stage(self.class_name, self.module_name) if not isinstance(stage_obj, self.stage_class): # pragma: no cover raise TypeError(f"{str(stage_obj)} is not a {str(self.stage_class)}") self.stage_obj = stage_obj @@ -155,7 +159,7 @@ def build_stage_class(self): """Set the stage_class attribute by finding self.class_name in the dictionary of classes from `Pipeline_stage` """ - self.stage_class = PipelineStage.get_stage(self.class_name) + self.stage_class = PipelineStage.get_stage(self.class_name, self.module_name) return self.stage_class def build_stage_object(self, args): @@ -173,7 +177,7 @@ def build_stage_object(self, args): The newly constructed object """ if self.stage_class is None: # pragma: no cover - self.stage_class = PipelineStage.get_stage(self.class_name) + self.stage_class = PipelineStage.get_stage(self.class_name, self.module_name) self.stage_obj = self.stage_class(args) return self.stage_obj @@ -881,7 +885,7 @@ def save(self, pipefile, stagefile=None, reduce_config=False): if site is None: site = val.site.config pipe_stage_info = dict( - name=val.name, classname=val.class_name, nprocess=val.nprocess + name=val.name, classname=val.class_name, nprocess=val.nprocess, module_name=val.module_name, ) if val.threads_per_process != 1: pipe_stage_info["threads_per_process"] = val.threads_per_process diff --git a/ceci/stage.py b/ceci/stage.py index 3869ef4..9c3a767 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -344,7 +344,7 @@ def __init_subclass__(cls, **kwargs): ############################################# @classmethod - def get_stage(cls, name): + def get_stage(cls, name, module_name=None): """ Return the PipelineStage subclass with the given name. @@ -358,7 +358,12 @@ def get_stage(cls, name): The corresponding subclass """ stage = cls.pipeline_stages.get(name) - + if stage is None: + if module_name: + print("importing ", module_name) + __import__(module_name) + stage = cls.pipeline_stages.get(name) + # If not found, then check for incomplete stages if stage is None: if name in cls.incomplete_pipeline_stages: @@ -450,7 +455,14 @@ def main(cls): if stage_name in ["--help", "-h"] and len(sys.argv) == 2: # pragma: no cover cls.usage() return 1 - stage = cls.get_stage(stage_name) + if stage_name.find('.') >= 0: + tokens = stage_name.split('.') + module_name = '.'.join(tokens[:-1]) + stage_name = tokens[-1] + else: + module_name = None + + stage = cls.get_stage(stage_name, module_name) args = stage.parse_command_line() stage.execute(args) return 0 @@ -1284,7 +1296,7 @@ def generate_command( module = cls.get_module() module = module.split(".")[0] - flags = [cls.name] + flags = [f"{cls.get_module()}.{cls.name}"] aliases = aliases or {} for tag, _ in cls.inputs_(): From ad746a78221a9f6756e254345bfc2a9cd03f34a1 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 10 Aug 2022 10:00:59 -0700 Subject: [PATCH 2/3] whitespace cleanup and protecting against module_name not being provided --- ceci/pipeline.py | 2 +- ceci/stage.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ceci/pipeline.py b/ceci/pipeline.py index 2ee2d85..ff5fc48 100644 --- a/ceci/pipeline.py +++ b/ceci/pipeline.py @@ -90,7 +90,7 @@ def __init__(self, info): # Core attributes - mandatory self.name = info["name"] self.class_name = info.get("classname", self.name) - self.module_name = info["module_name"] + self.module_name = info.get("module_name") self.site = info.get("site", get_default_site()) # Parallelism attributes - optional diff --git a/ceci/stage.py b/ceci/stage.py index 9c3a767..9eb26fd 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -352,6 +352,9 @@ def get_stage(cls, name, module_name=None): for each new stage - instead we can just use a single one which can query which class it should be using based on the name. + If module_name is provided, this will import that module + in order to load the required class. + Returns ------- cls: class @@ -363,7 +366,7 @@ def get_stage(cls, name, module_name=None): print("importing ", module_name) __import__(module_name) stage = cls.pipeline_stages.get(name) - + # If not found, then check for incomplete stages if stage is None: if name in cls.incomplete_pipeline_stages: @@ -456,12 +459,12 @@ def main(cls): cls.usage() return 1 if stage_name.find('.') >= 0: - tokens = stage_name.split('.') + tokens = stage_name.split('.') module_name = '.'.join(tokens[:-1]) stage_name = tokens[-1] else: module_name = None - + stage = cls.get_stage(stage_name, module_name) args = stage.parse_command_line() stage.execute(args) From 6303f565ede34d2b397ee420be92606b6945b403 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Thu, 11 Aug 2022 10:13:54 -0700 Subject: [PATCH 3/3] Added testing for namespace module support --- .pylintrc | 54 +----- ceci/__main__.py | 5 + ceci/stage.py | 11 +- ceci_example_namespace/example_stages.py | 215 +++++++++++++++++++++++ do_cover.sh | 2 +- setup.py | 2 +- tests/config_nm.yml | 9 + tests/test_main.py | 10 +- tests/test_namespace.yml | 98 +++++++++++ 9 files changed, 346 insertions(+), 60 deletions(-) create mode 100644 ceci/__main__.py create mode 100644 ceci_example_namespace/example_stages.py create mode 100644 tests/config_nm.yml create mode 100644 tests/test_namespace.yml diff --git a/.pylintrc b/.pylintrc index e8ef0f6..c7f289c 100644 --- a/.pylintrc +++ b/.pylintrc @@ -30,14 +30,6 @@ unsafe-load-any-extension=no # run arbitrary code extension-pkg-whitelist= -# Allow optimization of some AST trees. This will activate a peephole AST -# optimizer, which will apply various small optimizations. For instance, it can -# be used to obtain the result of joining multiple strings with the addition -# operator. Joining a lot of strings can lead to a maximum recursion error in -# Pylint and this flag can prevent that. It has one side effect, the resulting -# AST will be different than the one from reality. -optimize-ast=no - [MESSAGES CONTROL] @@ -60,7 +52,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,abstract-method,broad-except,invalid-name,line-too-long,wrong-import-order,wrong-import-position,too-many-statements,super-with-arguments,import-outside-toplevel,too-many-arguments,too-many-instance-attributes,unspecified-encoding,no-else-return +disable=suppressed-message,abstract-method,broad-except,invalid-name,line-too-long,wrong-import-order,wrong-import-position,too-many-statements,super-with-arguments,import-outside-toplevel,too-many-arguments,too-many-instance-attributes,unspecified-encoding,no-else-return [REPORTS] @@ -70,11 +62,6 @@ disable=import-star-module-level,old-octal-literal,oct-method,print-statement,un # mypackage.mymodule.MyReporterClass. output-format=text -# Put messages in a separate file for each module / package specified on the -# command line instead of printing them on stdout. Reports (if any) will be -# written in a file name "pylint_global.[txt|html]". -files-output=no - # Tells whether to display a full report or only the messages reports=yes @@ -92,9 +79,6 @@ evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / stateme [BASIC] -# List of builtins function names that should not be used, separated by a comma -bad-functions=map,filter,input - # Good variable names which should always be accepted, separated by a comma good-names=i,j,k,ex,Run,_ @@ -111,63 +95,33 @@ include-naming-hint=no # Regular expression matching correct function names function-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for function names -function-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct variable names variable-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for variable names -variable-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct constant names const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$ -# Naming hint for constant names -const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__))$ - # Regular expression matching correct attribute names attr-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for attribute names -attr-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct argument names argument-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for argument names -argument-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct class attribute names class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ -# Naming hint for class attribute names -class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ - # Regular expression matching correct inline iteration names inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ -# Naming hint for inline iteration names -inlinevar-name-hint=[A-Za-z_][A-Za-z0-9_]*$ - # Regular expression matching correct class names class-rgx=[A-Z_][a-zA-Z0-9]+$ -# Naming hint for class names -class-name-hint=[A-Z_][a-zA-Z0-9]+$ - # Regular expression matching correct module names module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ -# Naming hint for module names -module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ - # Regular expression matching correct method names method-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for method names -method-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression which should only match function or class names that do # not require a docstring. no-docstring-rgx=^_ @@ -282,12 +236,6 @@ ignore-long-lines=^\s*(# )??$ # else. single-line-if-stmt=no -# List of optional constructs for which whitespace checking is disabled. `dict- -# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. -# `trailing-comma` allows a space between comma and closing bracket: (a, ). -# `empty-line` allows space-only lines. -no-space-check=trailing-comma,dict-separator - # Maximum number of lines in a module max-module-lines=2000 diff --git a/ceci/__main__.py b/ceci/__main__.py new file mode 100644 index 0000000..2330e18 --- /dev/null +++ b/ceci/__main__.py @@ -0,0 +1,5 @@ +# This file must exist with these contents +from .stage import PipelineStage + +if __name__ == "__main__": + PipelineStage.main() diff --git a/ceci/stage.py b/ceci/stage.py index 9eb26fd..cf2b3ca 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -363,7 +363,6 @@ def get_stage(cls, name, module_name=None): stage = cls.pipeline_stages.get(name) if stage is None: if module_name: - print("importing ", module_name) __import__(module_name) stage = cls.pipeline_stages.get(name) @@ -1299,7 +1298,15 @@ def generate_command( module = cls.get_module() module = module.split(".")[0] - flags = [f"{cls.get_module()}.{cls.name}"] + if sys.modules[module].__file__: + # Regular module, stage will be imported with module + flags = [f"{cls.name}"] + else: + # Namescape module, use 'ceci' to the get main + # and specify the full path + flags = [f"{cls.get_module()}.{cls.name}"] + module = 'ceci' + aliases = aliases or {} for tag, _ in cls.inputs_(): diff --git a/ceci_example_namespace/example_stages.py b/ceci_example_namespace/example_stages.py new file mode 100644 index 0000000..4e76dd7 --- /dev/null +++ b/ceci_example_namespace/example_stages.py @@ -0,0 +1,215 @@ +from ceci import PipelineStage +from ceci_example.types import TextFile, YamlFile + + +class NMshearMeasurementPipe(PipelineStage): + """ + This pipeline element is a template for a shape measurement tool + """ + + name = "NMshearMeasurementPipe" + inputs = [("DM", TextFile)] + outputs = [("shear_catalog", TextFile)] + config_options = {"metacalibration": bool, "apply_flag": bool} + + def run(self): + # Retrieve configuration: + my_config = self.config + print("Here is my configuration :", my_config) + + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" shearMeasurementPipe reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" shearMeasurementPipe writing to {filename}") + open(filename, "w").write("shearMeasurementPipe was here \n") + + +class NMPZEstimationPipe(PipelineStage): + name = "NMPZEstimationPipe" + inputs = [("DM", TextFile), ("fiducial_cosmology", TextFile)] + outputs = [("photoz_pdfs", TextFile)] + + def run(self): + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" PZEstimationPipe reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" PZEstimationPipe writing to {filename}") + open(filename, "w").write("PZEstimationPipe was here \n") + + +class NMWLGCRandoms(PipelineStage): + name = "NMWLGCRandoms" + inputs = [("diagnostic_maps", TextFile)] + outputs = [("random_catalog", TextFile)] + + def run(self): + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" WLGCRandoms reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" WLGCRandoms writing to {filename}") + open(filename, "w").write("WLGCRandoms was here \n") + + +class NMWLGCSelector(PipelineStage): + name = "NMWLGCSelector" + inputs = [("shear_catalog", TextFile), ("photoz_pdfs", TextFile)] + outputs = [("tomography_catalog", TextFile)] + config_options = {"zbin_edges": [float], "ra_range": [-5.0, 5.0]} + + def run(self): + print(self.config) + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" WLGCSelector reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" WLGCSelector writing to {filename}") + open(filename, "w").write("WLGCSelector was here \n") + + +class NMSourceSummarizer(PipelineStage): + name = "NMSourceSummarizer" + inputs = [ + ("tomography_catalog", TextFile), + ("photoz_pdfs", TextFile), + ("diagnostic_maps", TextFile), + ] + outputs = [("source_summary_data", TextFile)] + + def run(self): + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" SourceSummarizer reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" SourceSummarizer writing to {filename}") + open(filename, "w").write("SourceSummarizer was here \n") + + +class NMSysMapMaker(PipelineStage): + name = "NMSysMapMaker" + inputs = [("DM", TextFile)] + outputs = [("diagnostic_maps", TextFile)] + + def run(self): + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" SysMapMaker reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" SysMapMaker writing to {filename}") + open(filename, "w").write("SysMapMaker was here \n") + + +class NMWLGCTwoPoint(PipelineStage): + name = "NMWLGCTwoPoint" + inputs = [ + ("tomography_catalog", TextFile), + ("shear_catalog", TextFile), + ("diagnostic_maps", TextFile), + ("random_catalog", TextFile), + ] + outputs = [("twopoint_data", TextFile)] + + def run(self): + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" WLGCTwoPoint reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" WLGCTwoPoint writing to {filename}") + open(filename, "w").write("WLGCTwoPoint was here \n") + + +class NMWLGCCov(PipelineStage): + name = "NMWLGCCov" + inputs = [ + ("fiducial_cosmology", TextFile), + ("tomography_catalog", TextFile), + ("shear_catalog", TextFile), + ("source_summary_data", TextFile), + ("diagnostic_maps", TextFile), + ] + outputs = [("covariance", TextFile)] + + def rank_filename(self, rank, size): + filename = self.get_output("covariance") + if size == 1: + fname = filename + else: + fname = f"{filename}.{rank}" + return fname + + def run(self): + + # MPI Information + rank = self.rank + size = self.size + comm = self.comm + + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" WLGCCov rank {rank}/{size} reading from {filename}") + open(filename) + + filename = self.get_output("covariance") + my_filename = self.rank_filename(rank, size) + print(f" WLGCCov rank {rank}/{size} writing to {my_filename}") + open(my_filename, "w").write(f"WLGCCov rank {rank} was here \n") + + # + if comm: + comm.Barrier() + + # If needed, concatenate all files + if rank == 0 and size > 1: + f = open(filename, "w") + print(f"Master process concatenating files:") + for i in range(size): + fname = self.rank_filename(i, size) + print(f" {fname}") + content = open(fname).read() + f.write(content) + f.close() + + +class NMWLGCSummaryStatistic(PipelineStage): + name = "NMWLGCSummaryStatistic" + inputs = [ + ("twopoint_data", TextFile), + ("covariance", TextFile), + ("source_summary_data", TextFile), + ] + outputs = [("wlgc_summary_data", TextFile)] + parallel = False + + def run(self): + for inp, _ in self.inputs: + filename = self.get_input(inp) + print(f" WLGCSummaryStatistic reading from {filename}") + open(filename) + + for out, _ in self.outputs: + filename = self.get_output(out) + print(f" WLGCSummaryStatistic writing to {filename}") + open(filename, "w").write("WLGCSummaryStatistic was here \n") diff --git a/do_cover.sh b/do_cover.sh index acf85fa..30c9fda 100755 --- a/do_cover.sh +++ b/do_cover.sh @@ -1 +1 @@ -python -m pytest --cov=./ceci --cov-report=html tests +python -m pytest --cov=./ceci --cov-branch --cov-report=html tests diff --git a/setup.py b/setup.py index ad40e43..4e6b0a7 100644 --- a/setup.py +++ b/setup.py @@ -38,6 +38,6 @@ extras_require={ 'parsl': ['flask', 'parsl>=1.0.0'], 'cwl': ['cwlgen>=0.4', 'cwltool>=2.0.20200126090152'], - 'test': ['pytest', 'codecov', 'pytest-cov'], + 'test': ['pytest', 'codecov', 'pytest-cov', 'pytest-mock', 'mockmpi'], } ) diff --git a/tests/config_nm.yml b/tests/config_nm.yml new file mode 100644 index 0000000..9cc92d6 --- /dev/null +++ b/tests/config_nm.yml @@ -0,0 +1,9 @@ +global: + metacalibration: True + +NMshearMeasurementPipe: + apply_flag: False + +NMWLGCSelector: + zbin_edges: [0.2, 0.3, 0.5] + ra_range: [-5, 5] diff --git a/tests/test_main.py b/tests/test_main.py index b08cd91..a3d544e 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -8,15 +8,15 @@ from ceci.pipeline import Pipeline -def run1(*config_changes, dry_run=False, expect_fail=False, expect_outputs=True): +def run1(*config_changes, config_yaml="tests/test.yml", dry_run=False, expect_fail=False, expect_outputs=True): try: with tempfile.TemporaryDirectory() as dirname: out_dir = os.path.join(dirname, "output") log_dir = os.path.join(dirname, "logs") config = [f"output_dir={out_dir}", f"log_dir={log_dir}"] config += config_changes - pipe_config = Pipeline.build_config("tests/test.yml", config, dry_run) - status = run(pipe_config, "tests/test.yml", config, dry_run) + pipe_config = Pipeline.build_config(config_yaml, config, dry_run) + status = run(pipe_config, config_yaml, config, dry_run) if expect_fail: assert status != 0 else: @@ -45,6 +45,10 @@ def test_run_cwl(): run1("launcher.name=cwl", "launcher.dir=tests/cwl") == 0 +def test_run_namespace(): + run1(config_yaml="tests/test_namespace.yml", expect_outputs=False) == 0 + + def test_pre_script(): # use the bash "true" command to simulate a # pre-script suceeding diff --git a/tests/test_namespace.yml b/tests/test_namespace.yml new file mode 100644 index 0000000..478ac73 --- /dev/null +++ b/tests/test_namespace.yml @@ -0,0 +1,98 @@ +# Python modules that are imported to find +# stage classes. Any stages imported in these +# modules are automatically detected and their names can +# be used below +modules: ceci_example_namespace + +# The launcher to use +# These are defined in ceci/sites +launcher: + name: mini + interval: 0.5 + +# launcher: +# name: parsl +# # max_threads only referenced for local sites +# #log: parsl_log.txt + +# launcher: +# name: cwl +# launcher: cwltool +# dir: ./test/cwl + +site: + name: local + max_threads: 4 + # max_threads: 4 + # container: joezuntz/txpipe + # volume: $PWD:/opt/txpipe + + + +#site: +# name: cori-interactive +# # Put the log for the overall pipeline infrastructure in this file: +# pipeline_log: log.txt + +# site: +# name: cori-batch +# cpu_type: haswell +# queue: debug +# max_jobs: 2 +# account: m1727 +# walltime: "00:30:00" +# setup: /global/projecta/projectdirs/lsst/groups/WL/users/zuntz/setup-cori + + + +# The list of stages to run and the number of processors +# to use for each. +stages: + - name: NMWLGCSummaryStatistic + module_name: ceci_example_namespace.example_stages + nprocess: 1 + threads_per_process: 2 + - name: NMSysMapMaker + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMshearMeasurementPipe + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMPZEstimationPipe + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMWLGCRandoms + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMWLGCSelector + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMSourceSummarizer + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMWLGCTwoPoint + module_name: ceci_example_namespace.example_stages + nprocess: 1 + - name: NMWLGCCov + module_name: ceci_example_namespace.example_stages + nprocess: 1 + +# Definitions of where to find inputs for the overall pipeline. +# Any input required by a pipeline stage that is not generated by +# a previous stage must be defined here. They are listed by tag. +inputs: + DM: ./tests/inputs/dm.txt + fiducial_cosmology: ./tests/inputs/fiducial_cosmology.txt + +# Overall configuration file +config: ./tests/config_nm.yml + +# If all the outputs for a stage already exist then do not re-run that stage +resume: False + +# Put all the output files in this directory: +output_dir: ./tests/outputs + +# Put the logs from the individual stages in this directory: +log_dir: ./tests/logs +