Skip to content

Commit

Permalink
Merge pull request #91 from LSSTDESC/move-aliases-to-pipeline-file
Browse files Browse the repository at this point in the history
Move aliases to pipeline file
  • Loading branch information
joezuntz authored Jul 10, 2024
2 parents ae1dd52 + 9973ada commit 516603f
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 161 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
strategy:
matrix:
python-version: [3.9]
>>>>>>> master

steps:
- name: Checkout repository
Expand Down
249 changes: 119 additions & 130 deletions ceci/pipeline.py

Large diffs are not rendered by default.

26 changes: 19 additions & 7 deletions ceci/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PipelineStage:
doc = ""
allow_reload = False

def __init__(self, args, comm=None):
def __init__(self, args, comm=None, aliases=None):
"""Construct a pipeline stage, specifying the inputs, outputs, and configuration for it.
The constructor needs a dict or namespace. It should include:
Expand Down Expand Up @@ -82,6 +82,8 @@ def __init__(self, args, comm=None):
Specification of input and output paths and any missing config options
comm: MPI communicator
(default is None) An MPI comm object to use in preference to COMM_WORLD
aliases: dict
Mapping of tags to new tags
"""
self._configs = StageConfig(**self.config_options)
self._inputs = None
Expand All @@ -92,11 +94,15 @@ def __init__(self, args, comm=None):
self._rank = 0
self._io_checked = False
self.dask_client = None
if aliases is None:
aliases = {}
self._aliases = aliases

self.load_configs(args)
if comm is not None:
self.setup_mpi(comm)


@classmethod
def make_stage(cls, **kwargs):
"""Make a stage of a particular type"""
Expand All @@ -111,13 +117,19 @@ def make_stage(cls, **kwargs):
for output_ in cls.outputs: # pylint: disable=no-member
outtag = output_[0]
aliases[outtag] = f"{outtag}_{name}"
kwcopy["aliases"] = aliases
return cls(kwcopy, comm=comm)
# EAC. Ideally we would just pass the aliases into the construction call,
# but that would require changing the signature of every sub-class, so we do this
# instead. At some point we might want to migrate to doing it the better way.
stage = cls(kwcopy, comm=comm)
stage._aliases.update(**aliases)
stage._io_checked = False
stage.check_io()
return stage

def get_aliases(self):
"""Returns the dictionary of aliases used to remap inputs and outputs
in the case that we want to have multiple instances of this class in the pipeline"""
return self.config.get("aliases", None)
return self._aliases

def get_aliased_tag(self, tag):
"""Returns the possibly remapped value for an input or output tag
Expand All @@ -133,8 +145,6 @@ def get_aliased_tag(self, tag):
The aliased version of the tag
"""
aliases = self.get_aliases()
if aliases is None:
return tag
return aliases.get(tag, tag)

@abstractmethod
Expand Down Expand Up @@ -1154,7 +1164,9 @@ def read_config(self, args):

# This is all the config information in the file, including
# things for other stages
if config_file is not None:
if isinstance(config_file, dict):
overall_config = config_file
elif config_file is not None:
with open(config_file) as _config_file:
overall_config = yaml.safe_load(_config_file)
else:
Expand Down
114 changes: 114 additions & 0 deletions ceci/update_pipelines_for_ceci_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import ruamel.yaml
import os
import sys
import collections


def is_pipeline_file(info):
    """Report whether parsed YAML content looks like a ceci pipeline file.

    Parameters
    ----------
    info : object
        The value obtained from parsing a YAML document.

    Returns
    -------
    bool
        True if ``info`` is a mapping that contains every one of the keys
        "config", "stages", "launcher", "site" and "inputs"; False otherwise.
    """
    # An empty YAML document parses to None, and a top-level list or scalar
    # is never a pipeline description.  Reject those up front instead of
    # letting the membership test raise (None) or match on list elements.
    if not isinstance(info, dict):
        return False
    required = ("config", "stages", "launcher", "site", "inputs")
    return all(key in info for key in required)


def update_pipeline_file(pipeline_info, config_info):
    """Copy each stage's alias mapping from the config data into the pipeline data.

    For every entry of ``pipeline_info["stages"]`` the section of
    ``config_info`` with the same name is looked up.  If that section has an
    ``aliases`` entry, a copy of it (with surrounding whitespace stripped from
    each value) is stored under ``"aliases"`` on the stage entry.  Stages with
    no config section, or no aliases, are left untouched.

    Parameters
    ----------
    pipeline_info : dict
        Parsed pipeline file; modified in place.
    config_info : dict
        Parsed config file, keyed by stage name; not modified.
    """
    for stage in pipeline_info["stages"]:
        section = config_info.get(stage["name"], None)
        if section is None:
            continue

        old_aliases = section.get("aliases", None)
        if old_aliases is None:
            continue

        stage["aliases"] = dict(
            (tag, target.strip()) for tag, target in old_aliases.items()
        )

def update_config_file(config_info):
    """Remove the ``aliases`` entry from every stage section of the config data.

    Parameters
    ----------
    config_info : dict
        Parsed config file, keyed by section name; modified in place.
    """
    for stage_info in config_info.values():
        # A section with no options parses to None, and a scalar or list
        # value cannot carry aliases either.  Skip those rather than failing
        # on a missing ``pop``, in line with the None check that
        # update_pipeline_file applies to the same sections.
        if not isinstance(stage_info, dict):
            continue
        stage_info.pop("aliases", None)


def update_pipeline_file_group(pipeline_files):
    """Move stage aliases from a shared config file into its pipeline files.

    Every file in ``pipeline_files`` is parsed, the config file named by its
    "config" entry is parsed once, alias mappings are copied onto the matching
    stage entries of each pipeline, the ``aliases`` entries are removed from
    the config data, and all of the files are then rewritten in place.

    Parameters
    ----------
    pipeline_files : iterable of str
        Paths of pipeline YAML files that all name the same config file.
        An empty group is a no-op.

    Raises
    ------
    ValueError
        If the pipeline files do not all use the same config file.
    """
    # Materialise the input: it is iterated twice (load, then write back),
    # which would silently skip the write-back for a one-shot iterable.
    pipeline_files = list(pipeline_files)

    # With nothing to process there is no config file to look up; return
    # instead of failing later on an unbound config file name.
    if not pipeline_files:
        return

    # configure yaml - these are the approximate
    # settings we have mostly used in TXPipe
    yaml = ruamel.yaml.YAML()
    yaml.indent(sequence=4, offset=2, mapping=4)
    yaml.width = 4096
    yaml.allow_duplicate_keys = True

    # Read all the pipeline files
    pipeline_infos = []
    for pipeline_file in pipeline_files:
        with open(pipeline_file) as f:
            yaml_str = f.read()

        pipeline_info = yaml.load(yaml_str)
        config_file = pipeline_info["config"]
        pipeline_infos.append(pipeline_info)

    # Check that all the pipeline files use the same config file
    # (config_file is the one named by the last file read above)
    for pipeline_info in pipeline_infos:
        if pipeline_info["config"] != config_file:
            raise ValueError("All pipeline files supplied to this script should use the same config file. Run the script multiple times on different files otherwise.")

    # Read the config file
    with open(config_file) as f:
        yaml_str = f.read()
    config_info = yaml.load(yaml_str)

    # Update all the pipeline files.
    for pipeline_info in pipeline_infos:
        update_pipeline_file(pipeline_info, config_info)

    # Only now can we delete the alias information, since every pipeline
    # in the group needed to read it first
    update_config_file(config_info)

    # Update all the files in-place
    for pipeline_file, pipeline_info in zip(pipeline_files, pipeline_infos):
        with open(pipeline_file, "w") as f:
            yaml.dump(pipeline_info, f)

    with open(config_file, "w") as f:
        yaml.dump(config_info, f)


def scan_directory_and_update(base_dir):
    """Find pipeline files under a directory tree and migrate their aliases.

    Walks ``base_dir`` recursively, parses every ``.yaml`` / ``.yml`` file and
    groups those that look like pipeline files by the value of their "config"
    entry.  Each group is handed to ``update_pipeline_file_group`` as long as
    its config file can be opened and contains the text "alias".

    Parameters
    ----------
    base_dir : str
        Root directory of the tree to scan.
    """
    parser = ruamel.yaml.YAML()
    by_config = collections.defaultdict(list)

    for root, _subdirs, names in os.walk(base_dir):
        for name in names:
            # just process yaml files
            if not name.endswith((".yaml", ".yml")):
                continue

            path = os.path.join(root, name)
            with open(path) as handle:
                text = handle.read()
            parsed = parser.load(text)

            if is_pipeline_file(parsed):
                by_config[parsed["config"]].append(path)

    for config_path, members in by_config.items():
        print("Updating group:", members)
        try:
            with open(config_path) as handle:
                config_text = handle.read()
        except FileNotFoundError:
            print('# missing', config_path)
            continue

        # Cheap textual check: leave the group alone when its config file
        # never mentions aliases
        if "alias" in config_text:
            update_pipeline_file_group(members)


def main():
    """Command-line entry point: update the tree named by the sole argument.

    Raises
    ------
    ValueError
        If the command line does not consist of exactly one argument.
    """
    arguments = sys.argv[1:]
    if len(arguments) != 1:
        raise ValueError("Please supply a base directory to work on")
    (base_dir,) = arguments
    scan_directory_and_update(base_dir)

# Run the migration when this module is executed directly as a script.
if __name__ == "__main__":
    main()
8 changes: 4 additions & 4 deletions ceci_example/example_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ class WLGCCov(PipelineStage):
("source_summary_data", TextFile),
("diagnostic_maps", TextFile),
]
outputs = [("covariance", TextFile)]
outputs = [("covariance_1", TextFile)]

def rank_filename(self, rank, size):
filename = self.get_output("covariance")
filename = self.get_output("covariance_1")
if size == 1:
fname = filename
else:
Expand All @@ -173,7 +173,7 @@ def run(self):
print(f" WLGCCov rank {rank}/{size} reading from {filename}")
open(filename)

filename = self.get_output("covariance")
filename = self.get_output("covariance_1")
my_filename = self.rank_filename(rank, size)
print(f" WLGCCov rank {rank}/{size} writing to {my_filename}")
open(my_filename, "w").write(f"WLGCCov rank {rank} was here \n")
Expand All @@ -198,7 +198,7 @@ class WLGCSummaryStatistic(PipelineStage):
name = "WLGCSummaryStatistic"
inputs = [
("twopoint_data", TextFile),
("covariance", TextFile),
("covariance_2", TextFile),
("source_summary_data", TextFile),
]
outputs = [("wlgc_summary_data", TextFile)]
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ classifiers = [
dynamic = ["version"]

dependencies =[
"pyyaml > 3",
"pyyaml >= 5.1",
"psutil",
'graphlib_backport ; python_version < "3.9"',
]


Expand All @@ -47,6 +48,7 @@ packages = [

[project.scripts]
ceci = "ceci.main:main"
ceci-update-for-version-2 = "ceci.update_pipelines_for_ceci_2:main"
ceci-ancestors = "ceci.tools.ancestors:main"

[project.optional-dependencies]
Expand Down
9 changes: 0 additions & 9 deletions tests/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,3 @@ shearMeasurementPipe:
WLGCSelector:
zbin_edges: [0.2, 0.3, 0.5]
ra_range: [-5, 5]

IndiaCopy:
aliases:
my_input: "my_alias"


JulietCopy:
aliases:
my_input: "my_alias"
4 changes: 4 additions & 0 deletions tests/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ stages:
- name: WLGCSummaryStatistic
nprocess: 1
threads_per_process: 2
aliases:
covariance_2: covariance_shared
- name: SysMapMaker
nprocess: 1
- name: shearMeasurementPipe
Expand All @@ -67,6 +69,8 @@ stages:
nprocess: 1
- name: WLGCCov
nprocess: 1
aliases:
covariance_1: covariance_shared

# Definitions of where to find inputs for the overall pipeline.
# Any input required by a pipeline stage that is not generated by
Expand Down
6 changes: 3 additions & 3 deletions tests/test_interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def test_interactive_pipeline():
pipe2.build_stage(WLGCSelector, zbin_edges=[0.2, 0.3, 0.5], ra_range=[-5, 5])
pipe2.build_stage(SysMapMaker)
pipe2.build_stage(SourceSummarizer)
pipe2.build_stage(WLGCCov, aliases=dict(covariance='covariance_copy'))
pipe2.build_stage(WLGCCov, aliases=dict(covariance_1='covariance_copy'))
pipe2.build_stage(WLGCRandoms)
pipe2.build_stage(WLGCTwoPoint, name="WLGC2Point")
pipe2.build_stage(WLGCSummaryStatistic, aliases=dict(covariance='covariance_copy'))
pipe2.build_stage(WLGCSummaryStatistic, aliases=dict(covariance_2='covariance_copy'))

assert len(pipe2.WLGCCov.outputs) == 1

Expand All @@ -107,7 +107,7 @@ def test_interactive_pipeline():

path = pipe2.pipeline_files.get_path('covariance_copy')
assert pipe2.pipeline_files.get_tag(path) == 'covariance_copy'
assert pipe2.pipeline_files.get_type('covariance_copy') == pipe2.WLGCCov.get_output_type('covariance')
assert pipe2.pipeline_files.get_type('covariance_copy') == pipe2.WLGCCov.get_output_type('covariance_1')

pipe2.run()

Expand Down
30 changes: 27 additions & 3 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ceci.main import run_pipeline
from ceci.main import run_pipeline, prepare_for_pipeline
from ceci.tools.ancestors import print_ancestors
from parsl import clear
import tempfile
Expand All @@ -9,6 +9,32 @@
from ceci.pipeline import Pipeline


def test_save_load():
    """Run the test pipeline, save it to a YAML file, then read it back and run it again."""
    config_yaml="tests/test.yml"
    # All outputs, logs and the saved pipeline file live in a temporary
    # directory that is removed when the test finishes
    with tempfile.TemporaryDirectory() as dirname:
        out_dir = os.path.join(dirname, "output")
        log_dir = os.path.join(dirname, "logs")
        yml_path = os.path.join(dirname, "saved_pipeline.yml")
        # Overrides applied on top of the options in the pipeline file
        config = [f"output_dir={out_dir}", f"log_dir={log_dir}", "resume=False"]
        pipe_config = Pipeline.build_config(config_yaml, config)

        # Run the first time
        with prepare_for_pipeline(pipe_config):
            p = Pipeline.create(pipe_config)
            p.run()

        p.save(yml_path)

        # Echo the saved file so it shows up in the test output
        with open(yml_path) as f:
            print(f.read())

        # load from the saved path and run again
        with prepare_for_pipeline(pipe_config):
            q = Pipeline.read(yml_path, config)
            q.run()



def run1(*config_changes, config_yaml="tests/test.yml", dry_run=False, expect_fail=False, expect_outputs=True, flow_chart=None):
try:
with tempfile.TemporaryDirectory() as dirname:
Expand Down Expand Up @@ -75,5 +101,3 @@ def test_ancestors_broken(capsys):
test_run_dry_run()
test_run_parsl()
test_run_mini()
test_run_cwl()
test_pre_script()
Loading

0 comments on commit 516603f

Please sign in to comment.