FE-350 update manifest.py to remove managed access project ids from final partition set #341

Merged
2 changes: 1 addition & 1 deletion .github/workflows/generate-requirements-file.yaml
@@ -24,7 +24,7 @@ jobs:
- name: Install Poetry
uses: snok/install-poetry@v1
with:
-version: 1.1.9
+version: 1.8.0
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true
2 changes: 1 addition & 1 deletion .github/workflows/validate_python.yaml
@@ -30,7 +30,7 @@ jobs:
- name: Install Poetry
uses: snok/install-poetry@v1
with:
-version: 1.1.9
+version: 1.8.0
- name: Cache dependencies
uses: actions/cache@v2
env:
2 changes: 2 additions & 0 deletions Dockerfile
@@ -10,6 +10,7 @@ ENV LANG='en_US.UTF-8' \
SBT_VERSION=1.7.1

# Install some helpful tools not included in the base image, as well as set up for JDK install
+# python-is-python3 makes python3 the default, to avoid issues with poetry
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive \
&& apt-get install -yqq --no-install-recommends \
@@ -20,6 +21,7 @@ RUN apt-get update \
git \
gnupg \
locales \
+python-is-python3 \
sudo \
tzdata \
unzip \
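The `python-is-python3` package makes the bare `python` command resolve to Python 3, which (per the new comment above) keeps Poetry from tripping over a missing or Python 2 `python`. A quick sanity check one could run inside the built image to confirm the symlink behaves as intended (not part of the PR, just a sketch):

```python
# Sketch: verify that "python" on PATH is a Python 3 interpreter, which is
# the guarantee python-is-python3 is installed to provide.
import shutil
import subprocess

assert shutil.which("python") is not None, "no `python` executable on PATH"
result = subprocess.run(["python", "--version"], capture_output=True, text=True, check=True)
version = result.stdout or result.stderr  # older interpreters print the version to stderr
print(version)
assert version.startswith("Python 3"), f"unexpected interpreter: {version}"
```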
25 changes: 17 additions & 8 deletions orchestration/hca_manage/manifest.py
@@ -54,7 +54,6 @@
"dev": {
"EBI": "gs://broad-dsp-monster-hca-dev-ebi-staging/dev",
"UCSC": "gs://broad-dsp-monster-hca-dev-ebi-staging/dev",
"TEST": "gs://broad-dsp-monster-hca-prod-ebi-storage/broad_test_dataset"
}
}
ENV_PIPELINE_ENDINGS = {
@@ -92,21 +91,22 @@ def _sanitize_gs_path(path: str) -> str:


def _parse_csv(csv_path: str, env: str, project_id_only: bool = False,
-include_release_tag: bool = False, release_tag: str = "") -> list[list[str]]:
+include_release_tag: bool = False, release_tag: str = "") -> tuple[list[list[str]], list[list[str]]]:
keys = set()
+public_projects = set()
with open(csv_path, "r") as f:
reader = csv.reader(f)
for row in reader:
if not row:
logging.debug("Empty path detected, skipping")
continue

-assert len(row) == 2
+assert len(row) == 3, "CSV must have 3 columns: institution, project_id, and Yes or No for Public"
row = [x.strip() for x in row]
institution = row[0].upper()
project_id = find_project_id_in_str(row[1])
+public = row[2].lower()

key = None
if project_id_only:
project_id = row[1]
key = project_id
@@ -126,18 +126,26 @@ def _parse_csv(csv_path: str, env: str, project_id_only: bool = False,
key = key + f",{release_tag}"
keys.add(key)

+# make a separate set of public projects
+# The question is "Does this project contain managed access data?"
+# so no = public, yes = managed access (ma)
+if public == "no":
+    public_projects.add(key)

chunked_paths = chunked(keys, MAX_STAGING_AREAS_PER_PARTITION_SET)
-return [chunk for chunk in chunked_paths]
+chunked_paths_no_ma = chunked(public_projects, MAX_STAGING_AREAS_PER_PARTITION_SET)
+return [chunk for chunk in chunked_paths], [chunk for chunk in chunked_paths_no_ma]


def parse_and_load_manifest(env: str, csv_path: str, release_tag: str,
pipeline_name: str, project_id_only: bool = False,
-include_release_tag: bool = False) -> None:
+include_release_tag: bool = False, no_ma: bool = False) -> None:
chunked_paths = _parse_csv(csv_path, env, project_id_only, include_release_tag, release_tag)
+paths_to_use = chunked_paths[1] if no_ma else chunked_paths[0]
storage_client = Client()
bucket: Bucket = storage_client.bucket(bucket_name=ETL_PARTITION_BUCKETS[env])

-for pos, chunk in enumerate(chunked_paths):
+for pos, chunk in enumerate(paths_to_use):
assert len(chunk), "At least one import path is required"
qualifier = chr(pos + 97) # dcp11_a, dcp11_b, etc.
blob_name = f"{pipeline_name}/{release_tag}_{qualifier}_manifest.csv"
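Taken together, `_parse_csv` now returns a pair of chunked partition sets: every staging area, plus a second set containing only projects whose CSV row answered "no" to the managed-access question. A minimal sketch of the split with hypothetical CSV rows, assuming `chunked` comes from `more_itertools` as the call signature suggests:

```python
# Sketch of the two-set split: all project keys vs. public-only keys.
import csv
import io
from more_itertools import chunked

# Hypothetical 3-column rows: institution, project_id, managed-access answer.
sample = io.StringIO(
    "EBI,project-aaa,No\n"
    "UCSC,project-bbb,Yes\n"
    "EBI,project-ccc,no\n"
)

keys: set[str] = set()
public_projects: set[str] = set()
for row in csv.reader(sample):
    assert len(row) == 3, "CSV must have 3 columns"
    institution, project_id, public = (x.strip() for x in row)
    keys.add(project_id)
    # "no" answers "does this project contain managed access data?",
    # so "no" means the project is public.
    if public.lower() == "no":
        public_projects.add(project_id)

all_chunks = list(chunked(keys, 2))                # 2 stands in for MAX_STAGING_AREAS_PER_PARTITION_SET
public_chunks = list(chunked(public_projects, 2))  # only project-aaa and project-ccc
print(all_chunks, public_chunks)
```

Note the real code lower-cases `row[2]` up front, so "No", "no", and "NO" all count as public.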
@@ -196,7 +204,8 @@ def load(args: argparse.Namespace) -> None:
args.release_tag,
f"make_snapshot_public_job_{ENV_PIPELINE_ENDINGS[args.env]}",
project_id_only=True,
-include_release_tag=True
+include_release_tag=True,
+no_ma=True
)
_reload_repository(_get_dagster_client())

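With `no_ma=True`, the make-snapshot-public job only ever receives manifests for public projects, so managed-access project ids never reach that partition set, which is the point of this PR. A self-contained sketch of how the flag selects chunks and how each chunk's blob name gets its `a`/`b` qualifier, using hypothetical project ids and pipeline/tag names, and skipping the actual GCS upload:

```python
# Sketch: no_ma picks the public-only half of the _parse_csv result, and
# chr(pos + 97) turns chunk positions into the a/b/c manifest qualifiers.
chunked_paths = (
    [["proj-a", "proj-b"], ["proj-c"]],  # all projects
    [["proj-a"], ["proj-c"]],            # public (non managed-access) only
)

def manifest_blob_names(no_ma: bool, pipeline_name: str, release_tag: str) -> list[str]:
    paths_to_use = chunked_paths[1] if no_ma else chunked_paths[0]
    names = []
    for pos, chunk in enumerate(paths_to_use):
        assert len(chunk), "At least one import path is required"
        qualifier = chr(pos + 97)  # 0 -> "a", 1 -> "b", ...
        names.append(f"{pipeline_name}/{release_tag}_{qualifier}_manifest.csv")
    return names

print(manifest_blob_names(True, "make_snapshot_public_job_dev", "dcp11"))
# ['make_snapshot_public_job_dev/dcp11_a_manifest.csv',
#  'make_snapshot_public_job_dev/dcp11_b_manifest.csv']
```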