Use data registry to access paths and save pipeline runs #93

Merged Nov 17, 2023 (20 commits). Changes shown from all commits.
77 changes: 69 additions & 8 deletions .github/workflows/ci.yml
@@ -5,39 +5,100 @@ name: Continuous Integration

on:
  push:
-    branches: [ master ]
+    branches: [ master, data-registry ]
  pull_request:
    branches: [ master ]

jobs:
  build:

    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.8, 3.9, "3.10", "3.11", "3.12"]

    steps:
    - name: Checkout repository
      uses: actions/checkout@v2
      with:
        submodules: true

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v2
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install
      run: |
        sudo apt-get update && sudo apt-get -y install libopenmpi-dev openmpi-bin graphviz graphviz-dev
        pip install .[all]

    - name: Tests
      run: |
        pytest --cov=ceci

    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v1

  test_data_registry:

    runs-on: ubuntu-latest

    env:
      DATAREG_CONFIG: "${{ github.workspace }}/config.txt"

    # Service containers to run with `runner-job`
    services:
      # Label used to access the service container
      postgres:
        # Docker Hub image
        image: postgres
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: desc_data_registry
        # Set health checks to wait until postgres has started
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    strategy:
      matrix:
        python-version: [3.9]

    steps:
    - name: Checkout repository
      uses: actions/checkout@v2
      with:
        submodules: true

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v2
      with:
        python-version: ${{ matrix.python-version }}

    - name: Checkout dataregistry repository
      uses: actions/checkout@v2
      with:
        repository: LSSTDESC/dataregistry
        path: './dataregistry'

    - name: Install
      run: |
        sudo apt-get update && sudo apt-get -y install libopenmpi-dev openmpi-bin graphviz graphviz-dev
        pip install .[all]

    - name: Set up dataregistry
      run: |
        echo "sqlalchemy.url : postgresql://postgres:postgres@localhost:5432/desc_data_registry" > $DATAREG_CONFIG
        cd dataregistry
        python3 -m pip install .
        python3 scripts/create_registry_db.py --config $DATAREG_CONFIG
        cd ../tests
        python3 create_registry_entries.py
        cd ..
        ceci tests/test_dataregistry.yml
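
The job's last step runs ceci on tests/test_dataregistry.yml, a pipeline file that is not shown in this diff. The sketch below is a guess at its shape only, assuming the registry and inputs keys that build_config and process_overall_inputs read later in this PR; all values here are illustrative:

# Hypothetical sketch of a pipeline config carrying a registry block.
# Keys mirror what setup_data_registry() reads; the values are made up.
import yaml

pipe_config = yaml.safe_load("""
registry:
    config: null              # null -> dataregistry falls back to ~/.config_reg_access
    owner_type: user
    owner: null
    root_dir: DataRegistry_data
inputs:
    dm:
        name: dm.txt          # resolved through the registry, not a literal path
""")

registry_config = pipe_config.get("registry", None)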
4 changes: 4 additions & 0 deletions .gitignore
@@ -18,3 +18,7 @@ nb/Untitled.ipynb
out/
test/
dask-worker-space/
ceci.code-workspace
dask-scratch-space
inprogress*
ceci/_version.py
114 changes: 111 additions & 3 deletions ceci/pipeline.py
@@ -6,6 +6,7 @@
import yaml
import shutil
from abc import abstractmethod
import warnings

from .stage import PipelineStage
from . import minirunner
@@ -318,8 +319,9 @@
        Space separated path of modules loaded for this pipeline
        """
        self.launcher_config = launcher_config
        self.data_registry = None

-       self.overall_inputs = kwargs.get("overall_inputs", {}).copy()
+       self.overall_inputs = {}
        self.modules = kwargs.get("modules", "")

        # These are populated as we add stages below
@@ -364,6 +366,7 @@
"log_dir": pipe_config.get("log_dir", "."),
"resume": pipe_config.get("resume", False),
"flow_chart": pipe_config.get("flow_chart", ""),
"registry": pipe_config.get("registry", None),
}

launcher_dict = dict(cwl=CWLPipeline, parsl=ParslPipeline, mini=MiniPipeline)
@@ -451,6 +454,107 @@
stream.write(f"{stage.instance_name:20}: {str(stage)}")
stream.write("\n")

def setup_data_registry(self, registry_config): #pragma: no cover
"""
Set up the data registry.

# TODO: interactive version

Parameters
----------
registry_config : dict
A dictionary with information about the data registry to use
"""
from dataregistry import DataRegistry

# Establish a connection to the data registry. If the config_file is
# None the dataregistry will assume the users config file is in the
# default location (~/.config_reg_access).
registry = DataRegistry(config_file=registry_config.get("config", None),
owner_type=registry_config.get("owner_type", "user"),
owner=registry_config.get("owner", None),
root_dir=registry_config.get("root_dir", None))

#if not os.environ.get("NERSC_HOST"):
# warnings.warn("The Data Registry is only available on NERSC: not setting it up now.")
# return None

# Save the things that may be useful.
return {
"registry": registry,
"config": registry_config,
}


    def data_registry_lookup(self, info):  # pragma: no cover
        """
        Look up a dataset in the data registry

        Parameters
        ----------
        info : dict
            A dictionary with information about the dataset to look up. Must contain
            either an id, an alias, or a name
        """
        if self.data_registry is None:
            raise ValueError("No data registry configured")

        registry = self.data_registry["registry"]

        # We have various ways of looking up a dataset
        # 1. By the `dataset_id`
        # 2. By the dataset `name`
        # 3. By a dataset alias `name`
        if "id" in info:
            return registry.Query.get_dataset_absolute_path(info["id"])
        elif "name" in info:
            filter = registry.Query.gen_filter("dataset.name", "==", info["name"])
        elif "alias" in info:
            raise NotImplementedError("Alias lookup not yet implemented")
        else:
            raise ValueError("Must specify either id or name in registry lookup")

        # Main finder method
        results = registry.Query.find_datasets(["dataset.dataset_id"], [filter])

        # Check that we find exactly one dataset matching the query
        results = list(results)
        if len(results) == 0:
            raise ValueError(f"Could not find any dataset matching {info} in registry")
        elif len(results) > 1:
            raise ValueError(f"Found multiple datasets matching {info} in registry")

        # Get the absolute path
        return registry.Query.get_dataset_absolute_path(results[0].dataset_id)


    def process_overall_inputs(self, inputs):
        """
        Find the correct paths for the overall inputs to the pipeline.

        Paths may be explicit strings, or may be looked up in the data registry.

        Parameters
        ----------
        inputs : dict
            A dictionary of inputs to the pipeline
        """
        paths = {}
        for tag, value in inputs.items():
            # Case 1, explicit lookup (the original version)
            if isinstance(value, str):
                paths[tag] = value
            # Case 2, dictionary with lookup method
            elif isinstance(value, dict):  # pragma: no cover
                # This means that we will look up a path
                # using the data registry
                paths[tag] = self.data_registry_lookup(value)
            elif value is None:
                paths[tag] = None
            else:
                raise ValueError(f"Unknown input type {type(value)}")

        return paths

    @classmethod
    def read(cls, pipeline_config_filename, extra_config=None, dry_run=False):
        """Create a pipeline for a configuration dictionary from a yaml file and extra optional parameters
@@ -779,9 +883,13 @@
        self.run_config : copy of configuration parameters on how to run the pipeline
        """

-       # Make a copy, since we may be modifying these
-       self.overall_inputs = overall_inputs.copy()
+       # Set up paths to our overall input files
+       if run_config.get("registry") is not None:
+           self.data_registry = self.setup_data_registry(run_config["registry"])

+       self.overall_inputs = self.process_overall_inputs(overall_inputs)
        self.pipeline_files.insert_paths(self.overall_inputs)

        # Make a copy, since we may be modifying these
        self.run_config = run_config.copy()

        self.stages_config = stages_config
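
Taken together, process_overall_inputs accepts three forms for each overall input. A minimal sketch of the three cases, with hypothetical tags and dataset names, assuming a registry has been configured:

# Hypothetical sketch: the three accepted forms of an overall input.
overall_inputs = {
    "dm": "/explicit/path/to/dm.txt",                           # case 1: plain string, passed through unchanged
    "fiducial_cosmology": {"name": "fiducial_cosmology.txt"},   # case 2: dict, sent to data_registry_lookup
    "optional_extra": None,                                     # case 3: None stays None
}

# Inside Pipeline.initialize this becomes
#     self.overall_inputs = self.process_overall_inputs(overall_inputs)
# and case 2 resolves through the registry's Query interface:
#     f = registry.Query.gen_filter("dataset.name", "==", "fiducial_cosmology.txt")
#     results = registry.Query.find_datasets(["dataset.dataset_id"], [f])
#     path = registry.Query.get_dataset_absolute_path(results[0].dataset_id)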
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -79,6 +79,12 @@ test = [
    "dask[distributed]",
]

test_dataregistry = [
    "pyyaml > 3",
    "psutil",
    "dataregistry @ git+https://github.com/LSSTDESC/dataregistry",
]

all = [
    "parsl >= 1.0.0",
    "flask",
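
Keeping these requirements in a separate test_dataregistry extra avoids adding a git dependency to the ordinary test extra, since dataregistry is pulled straight from GitHub rather than PyPI. Locally it would be installed with something like pip install ".[test_dataregistry]", while the CI job above installs dataregistry from a checkout instead.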
28 changes: 28 additions & 0 deletions tests/create_registry_entries.py
@@ -0,0 +1,28 @@
import os
import sys

from dataregistry import DataRegistry

_TEST_ROOT_DIR = "DataRegistry_data"

# Make root dir
if not os.path.isdir(_TEST_ROOT_DIR):
    os.makedirs(_TEST_ROOT_DIR)

# Establish connection to database
datareg = DataRegistry(root_dir=_TEST_ROOT_DIR)

# Add new entry.
datareg.Registrar.register_dataset(
    "dm.txt",
    "0.0.1",
    verbose=True,
    old_location="inputs/dm.txt",
)

datareg.Registrar.register_dataset(
    "fiducial_cosmology.txt",
    "0.0.1",
    verbose=True,
    old_location="inputs/fiducial_cosmology.txt",
)