Skip to content

Commit

Permalink
run black to format source code
Browse files Browse the repository at this point in the history
  • Loading branch information
joezuntz committed Feb 24, 2023
1 parent 0ff34bc commit 85b07c0
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
8 changes: 7 additions & 1 deletion ceci/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
"""Ceci n'est pas une pipeline"""

from .stage import PipelineStage
from .pipeline import Pipeline, MiniPipeline, ParslPipeline, DryRunPipeline, FlowChartPipeline
from .pipeline import (
Pipeline,
MiniPipeline,
ParslPipeline,
DryRunPipeline,
FlowChartPipeline,
)
from .main import prepare_for_pipeline, run_pipeline
from pkg_resources import DistributionNotFound
from pkg_resources import get_distribution
Expand Down
10 changes: 8 additions & 2 deletions ceci/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def cast_to_streamable(value):
class StageParameter:
"""A small class to manage a single parameter with basic type checking"""

def __init__(self, dtype=None, default=None, fmt="%s", required=None, msg="A parameter"):
def __init__(
self, dtype=None, default=None, fmt="%s", required=None, msg="A parameter"
):
"""Build from keywords
Parameters
Expand Down Expand Up @@ -109,7 +111,11 @@ def required(self):
def copy(self):
"""Return a copy of self"""
return StageParameter(
dtype=self._dtype, default=self._default, fmt=self._format, required=self._required, msg=self._help
dtype=self._dtype,
default=self._default,
fmt=self._format,
required=self._required,
msg=self._help,
)

def set(self, value):
Expand Down
1 change: 0 additions & 1 deletion ceci/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
)



def run_pipeline(pipe_config):
"""Run a pipeline as defined by a particular configuration
Expand Down
41 changes: 26 additions & 15 deletions ceci/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ def build_stage_object(self, args):
The newly constructed object
"""
if self.stage_class is None: # pragma: no cover
self.stage_class = PipelineStage.get_stage(self.class_name, self.module_name)
self.stage_class = PipelineStage.get_stage(
self.class_name, self.module_name
)
self.stage_obj = self.stage_class(args)
return self.stage_obj

Expand Down Expand Up @@ -391,7 +393,9 @@ def interactive():
return MiniPipeline([], launcher_config)

@staticmethod
def build_config(pipeline_config_filename, extra_config=None, dry_run=False, flow_chart=None):
def build_config(
pipeline_config_filename, extra_config=None, dry_run=False, flow_chart=None
):
"""Build a configuration dictionary from a yaml file and extra optional parameters
Parameters
Expand Down Expand Up @@ -890,7 +894,10 @@ def save(self, pipefile, stagefile=None, reduce_config=False):
if site is None:
site = val.site.config
pipe_stage_info = dict(
name=val.name, classname=val.class_name, nprocess=val.nprocess, module_name=val.module_name,
name=val.name,
classname=val.class_name,
nprocess=val.nprocess,
module_name=val.module_name,
)
if val.threads_per_process != 1:
pipe_stage_info["threads_per_process"] = val.threads_per_process
Expand Down Expand Up @@ -939,33 +946,34 @@ def make_flow_chart(self, filename):

# Add overall pipeline inputs
for inp in self.overall_inputs.keys():
graph.add_node(inp, shape='box', color='gold', style='filled')
graph.add_node(inp, shape="box", color="gold", style="filled")
seen.add(inp)

for stage in self.stages:
# add the stage itself
graph.add_node(stage.instance_name, shape='ellipse', color='orangered', style='filled')
graph.add_node(
stage.instance_name, shape="ellipse", color="orangered", style="filled"
)
# connect that stage to its inputs
for inp, _ in stage.inputs:
inp = stage.get_aliased_tag(inp)
if inp not in seen: # pragma: no cover
graph.add_node(inp, shape='box', color='skyblue', style='filled')
if inp not in seen: # pragma: no cover
graph.add_node(inp, shape="box", color="skyblue", style="filled")
seen.add(inp)
graph.add_edge(inp, stage.instance_name, color='black')
graph.add_edge(inp, stage.instance_name, color="black")
# and to its outputs
for out, _ in stage.outputs:
out = stage.get_aliased_tag(out)
if out not in seen:
graph.add_node(out, shape='box', color='skyblue', style='filled')
graph.add_node(out, shape="box", color="skyblue", style="filled")
seen.add(out)
graph.add_edge(stage.instance_name, out, color='black')
graph.add_edge(stage.instance_name, out, color="black")

# finally, output the stage to file
if filename.endswith('.dot'):
if filename.endswith(".dot"):
graph.write(filename)
else:
graph.draw(filename, prog='dot')

graph.draw(filename, prog="dot")


class DryRunPipeline(Pipeline):
Expand Down Expand Up @@ -1007,6 +1015,7 @@ def run_jobs(self):
def find_all_outputs(self):
return {}


class FlowChartPipeline(DryRunPipeline):
def run_jobs(self):
filename = self.run_config["flow_chart"]
Expand Down Expand Up @@ -1242,7 +1251,10 @@ def build_dag(self, jobs):
# if that stage is supplied by another pipeline stage
if potential_parent.instance_name not in jobs: # pragma: no cover
continue
potential_parent_tags = [potential_parent.get_aliased_tag(tag_) for tag_ in potential_parent.output_tags()]
potential_parent_tags = [
potential_parent.get_aliased_tag(tag_)
for tag_ in potential_parent.output_tags()
]
if aliased_tag in potential_parent_tags:
depend[job].append(jobs[potential_parent.instance_name])
return depend
Expand Down Expand Up @@ -1492,7 +1504,6 @@ def run_jobs(self):
with open(f"{cwl_dir}/pipeline.cwl", "w") as f:
yaml.dump(workflow.save(), f)


# If 'launcher' is defined, use that
launcher = self.launcher_config.get(
"launch",
Expand Down
18 changes: 9 additions & 9 deletions ceci/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ def main(cls):
if stage_name in ["--help", "-h"] and len(sys.argv) == 2: # pragma: no cover
cls.usage()
return 1
if stage_name.find('.') >= 0:
tokens = stage_name.split('.')
module_name = '.'.join(tokens[:-1])
if stage_name.find(".") >= 0:
tokens = stage_name.split(".")
module_name = ".".join(tokens[:-1])
stage_name = tokens[-1]
else:
module_name = None
Expand Down Expand Up @@ -879,8 +879,6 @@ def map_tasks_by_rank(self, function, inputs, allgather=False):

return results



def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True):
"""Split a number of rows by process.
Expand Down Expand Up @@ -1176,8 +1174,8 @@ def find_outputs(self, outdir):
ret_dict = {}
for tag, ftype in self.outputs_():
aliased_tag = self.get_aliased_tag(tag)
if not aliased_tag in self._outputs.keys(): # pragma: no cover
self._outputs[aliased_tag]=ftype.make_name(aliased_tag)
if not aliased_tag in self._outputs.keys(): # pragma: no cover
self._outputs[aliased_tag] = ftype.make_name(aliased_tag)
ret_dict[aliased_tag] = f"{outdir}/{self._outputs[aliased_tag]}"
return ret_dict

Expand Down Expand Up @@ -1322,7 +1320,7 @@ def generate_command(
# Namescape module, use 'ceci' to the get main
# and specify the full path
flags = [f"{cls.get_module()}.{cls.name}"]
module = 'ceci'
module = "ceci"

aliases = aliases or {}

Expand Down Expand Up @@ -1369,7 +1367,9 @@ def generate_cwl(cls, log_dir=None):
module = module.split(".")[0]

# Basic definition of the tool
cwl_tool = cwlgen.CommandLineTool([], [],
cwl_tool = cwlgen.CommandLineTool(
[],
[],
id=cls.name,
label=cls.name,
baseCommand="python3",
Expand Down

0 comments on commit 85b07c0

Please sign in to comment.