From 85b07c08e7b0d4279611c0540547802b59ab5351 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Fri, 24 Feb 2023 12:21:55 +0000 Subject: [PATCH] run black to format source code --- ceci/__init__.py | 8 +++++++- ceci/config.py | 10 ++++++++-- ceci/main.py | 1 - ceci/pipeline.py | 41 ++++++++++++++++++++++++++--------------- ceci/stage.py | 18 +++++++++--------- 5 files changed, 50 insertions(+), 28 deletions(-) diff --git a/ceci/__init__.py b/ceci/__init__.py index 1084c42..ab9c6bf 100644 --- a/ceci/__init__.py +++ b/ceci/__init__.py @@ -1,7 +1,13 @@ """Ceci n'est pas une pipeline""" from .stage import PipelineStage -from .pipeline import Pipeline, MiniPipeline, ParslPipeline, DryRunPipeline, FlowChartPipeline +from .pipeline import ( + Pipeline, + MiniPipeline, + ParslPipeline, + DryRunPipeline, + FlowChartPipeline, +) from .main import prepare_for_pipeline, run_pipeline from pkg_resources import DistributionNotFound from pkg_resources import get_distribution diff --git a/ceci/config.py b/ceci/config.py index 1f14865..00385c9 100644 --- a/ceci/config.py +++ b/ceci/config.py @@ -60,7 +60,9 @@ def cast_to_streamable(value): class StageParameter: """A small class to manage a single parameter with basic type checking""" - def __init__(self, dtype=None, default=None, fmt="%s", required=None, msg="A parameter"): + def __init__( + self, dtype=None, default=None, fmt="%s", required=None, msg="A parameter" + ): """Build from keywords Parameters @@ -109,7 +111,11 @@ def required(self): def copy(self): """Return a copy of self""" return StageParameter( - dtype=self._dtype, default=self._default, fmt=self._format, required=self._required, msg=self._help + dtype=self._dtype, + default=self._default, + fmt=self._format, + required=self._required, + msg=self._help, ) def set(self, value): diff --git a/ceci/main.py b/ceci/main.py index b55b371..b299b00 100644 --- a/ceci/main.py +++ b/ceci/main.py @@ -36,7 +36,6 @@ ) - def run_pipeline(pipe_config): """Run a pipeline as defined by a particular configuration diff --git a/ceci/pipeline.py b/ceci/pipeline.py index 10d54b2..1551034 100644 --- a/ceci/pipeline.py +++ b/ceci/pipeline.py @@ -177,7 +177,9 @@ def build_stage_object(self, args): The newly constructed object """ if self.stage_class is None: # pragma: no cover - self.stage_class = PipelineStage.get_stage(self.class_name, self.module_name) + self.stage_class = PipelineStage.get_stage( + self.class_name, self.module_name + ) self.stage_obj = self.stage_class(args) return self.stage_obj @@ -391,7 +393,9 @@ def interactive(): return MiniPipeline([], launcher_config) @staticmethod - def build_config(pipeline_config_filename, extra_config=None, dry_run=False, flow_chart=None): + def build_config( + pipeline_config_filename, extra_config=None, dry_run=False, flow_chart=None + ): """Build a configuration dictionary from a yaml file and extra optional parameters Parameters @@ -890,7 +894,10 @@ def save(self, pipefile, stagefile=None, reduce_config=False): if site is None: site = val.site.config pipe_stage_info = dict( - name=val.name, classname=val.class_name, nprocess=val.nprocess, module_name=val.module_name, + name=val.name, + classname=val.class_name, + nprocess=val.nprocess, + module_name=val.module_name, ) if val.threads_per_process != 1: pipe_stage_info["threads_per_process"] = val.threads_per_process @@ -939,33 +946,34 @@ def make_flow_chart(self, filename): # Add overall pipeline inputs for inp in self.overall_inputs.keys(): - graph.add_node(inp, shape='box', color='gold', style='filled') + graph.add_node(inp, shape="box", color="gold", style="filled") seen.add(inp) for stage in self.stages: # add the stage itself - graph.add_node(stage.instance_name, shape='ellipse', color='orangered', style='filled') + graph.add_node( + stage.instance_name, shape="ellipse", color="orangered", style="filled" + ) # connect that stage to its inputs for inp, _ in stage.inputs: inp = stage.get_aliased_tag(inp) - if inp not in seen: # pragma: no cover - graph.add_node(inp, shape='box', color='skyblue', style='filled') + if inp not in seen: # pragma: no cover + graph.add_node(inp, shape="box", color="skyblue", style="filled") seen.add(inp) - graph.add_edge(inp, stage.instance_name, color='black') + graph.add_edge(inp, stage.instance_name, color="black") # and to its outputs for out, _ in stage.outputs: out = stage.get_aliased_tag(out) if out not in seen: - graph.add_node(out, shape='box', color='skyblue', style='filled') + graph.add_node(out, shape="box", color="skyblue", style="filled") seen.add(out) - graph.add_edge(stage.instance_name, out, color='black') + graph.add_edge(stage.instance_name, out, color="black") # finally, output the stage to file - if filename.endswith('.dot'): + if filename.endswith(".dot"): graph.write(filename) else: - graph.draw(filename, prog='dot') - + graph.draw(filename, prog="dot") class DryRunPipeline(Pipeline): @@ -1007,6 +1015,7 @@ def run_jobs(self): def find_all_outputs(self): return {} + class FlowChartPipeline(DryRunPipeline): def run_jobs(self): filename = self.run_config["flow_chart"] @@ -1242,7 +1251,10 @@ def build_dag(self, jobs): # if that stage is supplied by another pipeline stage if potential_parent.instance_name not in jobs: # pragma: no cover continue - potential_parent_tags = [potential_parent.get_aliased_tag(tag_) for tag_ in potential_parent.output_tags()] + potential_parent_tags = [ + potential_parent.get_aliased_tag(tag_) + for tag_ in potential_parent.output_tags() + ] if aliased_tag in potential_parent_tags: depend[job].append(jobs[potential_parent.instance_name]) return depend @@ -1492,7 +1504,6 @@ def run_jobs(self): with open(f"{cwl_dir}/pipeline.cwl", "w") as f: yaml.dump(workflow.save(), f) - # If 'launcher' is defined, use that launcher = self.launcher_config.get( "launch", diff --git a/ceci/stage.py b/ceci/stage.py index 1e5a264..61b3a31 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -457,9 +457,9 @@ def main(cls): if stage_name in ["--help", "-h"] and len(sys.argv) == 2: # pragma: no cover cls.usage() return 1 - if stage_name.find('.') >= 0: - tokens = stage_name.split('.') - module_name = '.'.join(tokens[:-1]) + if stage_name.find(".") >= 0: + tokens = stage_name.split(".") + module_name = ".".join(tokens[:-1]) stage_name = tokens[-1] else: module_name = None @@ -879,8 +879,6 @@ def map_tasks_by_rank(self, function, inputs, allgather=False): return results - - def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True): """Split a number of rows by process. @@ -1176,8 +1174,8 @@ def find_outputs(self, outdir): ret_dict = {} for tag, ftype in self.outputs_(): aliased_tag = self.get_aliased_tag(tag) - if not aliased_tag in self._outputs.keys(): # pragma: no cover - self._outputs[aliased_tag]=ftype.make_name(aliased_tag) + if not aliased_tag in self._outputs.keys(): # pragma: no cover + self._outputs[aliased_tag] = ftype.make_name(aliased_tag) ret_dict[aliased_tag] = f"{outdir}/{self._outputs[aliased_tag]}" return ret_dict @@ -1322,7 +1320,7 @@ def generate_command( # Namescape module, use 'ceci' to the get main # and specify the full path flags = [f"{cls.get_module()}.{cls.name}"] - module = 'ceci' + module = "ceci" aliases = aliases or {} @@ -1369,7 +1367,9 @@ def generate_cwl(cls, log_dir=None): module = module.split(".")[0] # Basic definition of the tool - cwl_tool = cwlgen.CommandLineTool([], [], + cwl_tool = cwlgen.CommandLineTool( + [], + [], id=cls.name, label=cls.name, baseCommand="python3",