Skip to content

Commit

Permalink
Merge pull request #45 from LSSTDESC/pass_python_paths
Browse files Browse the repository at this point in the history
Pass python path to stages
  • Loading branch information
joezuntz authored Sep 16, 2020
2 parents d4da69e + 173bf2d commit 95091ca
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 10 deletions.
18 changes: 11 additions & 7 deletions ceci/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,18 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False):
# parsl execution/launcher configuration information
launcher_config = pipe_config.get("launcher", {"name": "mini"})
launcher_name = launcher_config["name"]

# Launchers may need to know if this is a dry-run
launcher_config["dry_run"] = dry_run

# Later we will add these paths to sys.path for running here,
# but we will also need to pass them to the sites below so that
# they can be added within any containers or other launchers
# that we use
paths = pipe_config.get("python_paths", [])
if isinstance(paths, str):
paths = paths.split()

# Python modules in which to search for pipeline stages
modules = pipe_config["modules"].split()

Expand All @@ -83,6 +92,8 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False):
default_site = get_default_site()

site_config = pipe_config.get("site", {"name": "local"})
# Pass the paths along to the site
site_config["python_paths"] = paths
load(launcher_config, [site_config])

# Inputs and outputs
Expand Down Expand Up @@ -122,13 +133,6 @@ def run(pipeline_config_filename, extra_config=None, dry_run=False):
else:
raise ValueError("Unknown pipeline launcher {launcher_name}")




paths = pipe_config.get("python_paths", [])
if isinstance(paths, str):
paths = paths.split()

# temporarily add the paths to sys.path,
# but remove them at the end
with extra_paths(paths):
Expand Down
24 changes: 23 additions & 1 deletion ceci/sites/cori.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def command(self, cmd, sec):
mpi1 = f"{self.mpi_command} {sec.nprocess} --cpus-per-task={sec.threads_per_process}"
mpi2 = f"--mpi" if sec.nprocess > 1 else ""
volume_flag = f"-V {sec.volume} " if sec.volume else ""
paths = self.config.get("python_paths", [])

if sec.nodes:
mpi1 += f" --nodes {sec.nodes}"
Expand All @@ -46,17 +47,38 @@ def command(self, cmd, sec):
)

if sec.image:
# If we are setting python paths then we have to modify the executable
# here. This is because we need the path to be available right from the
# start, in case the stage is defined in a module added by these paths.
# The --env flag in docker/shifter overwrites an env var, and there
# doesn't seem to be a way to just append to one, so we have to be a bit
# roundabout to make this work, and invoke bash -c instead.
paths_start = (
("bash -c 'PYTHONPATH=$PYTHONPATH:" + (":".join(paths)))
if paths
else ""
)
paths_end = "'" if paths else ""
return (
f"{mpi1} "
f"shifter "
f"--env OMP_NUM_THREADS={sec.threads_per_process} "
f"{volume_flag} "
f"--image {sec.image} "
f"{paths_start} "
f"{cmd} {mpi2} "
f"{paths_end} "
)
else:
paths_env = (
("PYTHONPATH=" + (":".join(paths)) + ":$PYTHONPATH") if paths else ""
)
return (
f"OMP_NUM_THREADS={sec.threads_per_process} " f"{mpi1} " f"{cmd} {mpi2}"
# In the non-container case this is much easier
f"OMP_NUM_THREADS={sec.threads_per_process} "
f"{paths_env} "
f"{mpi1} "
f"{cmd} {mpi2}"
)

def configure_for_mini(self):
Expand Down
22 changes: 21 additions & 1 deletion ceci/sites/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,40 @@ def command(self, cmd, sec):
mpi1 = f"{self.mpi_command} {sec.nprocess}" if sec.nprocess > 1 else ""
mpi2 = f"--mpi" if sec.nprocess > 1 else ""
volume_flag = f"-v {sec.volume} " if sec.volume else ""
paths = self.config.get("python_paths", [])

# TODO: allow other container types here, like singularity
if sec.image:
# If we are setting python paths then we have to modify the executable
# here. This is because we need the path to be available right from the
# start, in case the stage is defined in a module added by these paths.
# The --env flag in docker/shifter overwrites an env var, and there
# doesn't seem to be a way to just append to one, so we have to be a bit
# roundabout to make this work, and invoke bash -c instead.
paths_start = (
"bash -c 'PYTHONPATH=$PYTHONPATH:" + (":".join(paths)) if paths else ""
)
paths_end = "'" if paths else ""
return (
f"docker run "
f"--env OMP_NUM_THREADS={sec.threads_per_process} "
f"{volume_flag} "
f"--rm -it {sec.image} "
f"{paths_start} "
f"{mpi1} "
f"{cmd} {mpi2} "
f"{paths_end}"
)
else:
# In the non-container case this is much easier
paths_env = (
"PYTHONPATH=" + (":".join(paths)) + ":$PYTHONPATH" if paths else ""
)
return (
f"OMP_NUM_THREADS={sec.threads_per_process} " f"{mpi1} " f"{cmd} {mpi2}"
f"OMP_NUM_THREADS={sec.threads_per_process} "
f"{paths_env} "
f"{mpi1} "
f"{cmd} {mpi2}"
)

def configure_for_parsl(self):
Expand Down
30 changes: 29 additions & 1 deletion ceci/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
from contextlib import contextmanager
import sys
import os

def add_python_path(path, start):
    """Add a directory to the ``PYTHONPATH`` environment variable.

    The change is made in ``os.environ`` so that subprocesses launched
    afterwards inherit it.

    Parameters
    ----------
    path : str
        Directory to add.
    start : bool
        If true, put ``path`` before any existing entries; otherwise
        put it after them.
    """
    old = os.environ.get("PYTHONPATH", "")
    if not old:
        # Nothing to join with. Writing "path:" or ":path" here would leave
        # an empty entry in PYTHONPATH, which Python resolves to the current
        # working directory of whichever subprocess inherits the variable.
        new = path
    elif start:
        new = path + ":" + old
    else:
        new = old + ":" + path
    os.environ["PYTHONPATH"] = new

def remove_python_path(path, start):
    """Take one occurrence of ``path`` out of ``PYTHONPATH``.

    Parameters
    ----------
    path : str
        The entry to remove.
    start : bool
        If true, the first matching entry is removed (``list.remove``,
        which raises ``ValueError`` when there is no match).  Otherwise
        the work is delegated to ``remove_last``.
    """
    entries = os.environ.get("PYTHONPATH", "").split(":")
    if not start:
        # entry was appended, so strip it via the module's remove_last helper
        remove_last(entries, path)
    else:
        # entry was prepended, so the first match is the one we added
        entries.remove(path)
    # Only reached if the removal succeeded; on failure the env var is untouched
    os.environ["PYTHONPATH"] = ":".join(entries)




@contextmanager
def extra_paths(paths, start=True):
Expand All @@ -8,7 +30,9 @@ def extra_paths(paths, start=True):
if isinstance(paths, str):
paths = paths.split()

# On enter, add paths to sys.path,
# On enter, add paths to both sys.path,
# and the PYTHONPATH env var, so that subprocesses
# can see it,
# either the start or the end depending
# on the start argument
for path in paths:
Expand All @@ -17,6 +41,8 @@ def extra_paths(paths, start=True):
else:
sys.path.append(path)

add_python_path(path, start)

# Return control to caller
try:
yield
Expand All @@ -28,6 +54,8 @@ def extra_paths(paths, start=True):
sys.path.remove(path)
else:
remove_last(sys.path, path)
# also remove env var entry
remove_python_path(path, start)
# If e.g. user has already done this
# manually for some reason then just
# skip
Expand Down
64 changes: 64 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from ceci import PipelineStage, MiniPipeline, ParslPipeline, Pipeline, DryRunPipeline
from ceci_example.types import TextFile
from ceci.sites import load, reset_default_site
from ceci.utils import extra_paths
import pytest
from parsl import clear
import yaml
import os
import tempfile
import sys

# This one should work
class AAA(PipelineStage):
Expand Down Expand Up @@ -145,6 +148,67 @@ def test_dry_run():
assert status == 0


def test_python_paths():
    """Check that a stage can import a module found only via ``python_paths``.

    A temporary directory holds a stage module (``my_stage.py``) and a
    sub-directory containing an empty ``pretend_module.py``.  Both
    directories are given to the site and run configuration as
    ``python_paths``, but only the top-level directory is added to the
    path of this process, so the stage's ``import pretend_module`` can
    only succeed if the paths are passed along when the stage is run.
    """
    # make a temp dir
    with tempfile.TemporaryDirectory() as dirname:
        # create a subdir of that with a module in
        mod_dir = dirname + "/pretend"
        mod_path = mod_dir + "/pretend_module.py"
        os.mkdir(mod_dir)

        # empty module, just to check it imports
        with open(mod_path, "w"):
            pass

        assert os.path.exists(mod_path)
        print(os.listdir(mod_dir))

        # create a stage there that uses the submodule.
        # The file is closed explicitly so the contents are certainly
        # on disk before the module is imported below.
        stage_path = dirname + "/my_stage.py"
        with open(stage_path, "w") as stage_file:
            stage_file.write("""
import ceci
class MyStage(ceci.PipelineStage):
    name = "MyStage"
    inputs = []
    outputs = []
    config_options = {"x": int}
    def run(self):
        import pretend_module
        assert self.config["x"] == 17
""")

        # pipeline admin
        config_path = dirname + "/config.yml"
        with open(config_path, "w") as config_file:
            config_file.write("""
MyStage:
    x: 17
""")

        run_config = {
            "log_dir": dirname,
            "output_dir": dirname,
            "resume": False,
            "python_paths": [dirname, mod_dir],
        }

        launcher_config = {"interval": 0.5, "name": "mini"}
        site_config = {"name": "local", "python_paths": [dirname, mod_dir]}
        load(launcher_config, [site_config])

        # note that we don't add the subdir here
        with extra_paths(dirname):
            import my_stage
            print(os.environ["PYTHONPATH"])
            print(sys.path)
            print(os.listdir(dirname))
            pipeline = MiniPipeline([{"name": "MyStage"}], launcher_config)
            status = pipeline.run({}, run_config, config_path)
            # close the log before the temporary directory is cleaned up
            with open(dirname + "/MyStage.out") as log_file:
                log = log_file.read()
            print(log)
            assert status == 0


# this has to be here because we test running the pipeline
if __name__ == "__main__":
PipelineStage.main()
5 changes: 5 additions & 0 deletions tests/test_python_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from ceci.main import run
from ceci.utils import remove_last, extra_paths
import pytest
import os

def test_remove_item():
l = list('abcdea')
Expand All @@ -21,15 +22,19 @@ class MyError(Exception):
def test_extra_paths():
p = 'xxx111yyy222'
orig_path = sys.path[:]
orig_env = os.environ.get("PYTHONPATH", "")

# check path is put in
with extra_paths(p):
assert sys.path[0] == p
assert p in os.environ['PYTHONPATH']

# check everything back to normal
# after with statement
assert p not in sys.path
assert sys.path == orig_path
assert p not in os.environ['PYTHONPATH']
assert os.environ['PYTHONPATH'] == orig_env

# check that an exception does not interfere
# with this
Expand Down

0 comments on commit 95091ca

Please sign in to comment.