diff --git a/.gitignore b/.gitignore index 5bf1ebaa1..7cea36e68 100644 --- a/.gitignore +++ b/.gitignore @@ -143,3 +143,6 @@ dmypy.json # VS Code .vscode + +# project specific files +/exports/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 677af01a1..9bf49de07 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,4 +29,4 @@ repos: pass_filenames: false entry: mypy args: ['--config-file=setup.cfg'] - additional_dependencies: ['mypy', 'types-PyYAML', 'types-requests'] + additional_dependencies: ['mypy', 'types-PyYAML', 'types-requests', 'types-python-slugify'] diff --git a/cli/tests/conftest.py b/cli/tests/conftest.py new file mode 100644 index 000000000..254115b38 --- /dev/null +++ b/cli/tests/conftest.py @@ -0,0 +1,32 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CLI testing fixtures.""" +import pathlib + +import pytest +from core.omop import OmopExtract + + +@pytest.fixture() +def omop_files(tmp_path_factory: pytest.TempPathFactory) -> OmopExtract: + """Create an OmopExtract instance using a temporary directory""" + export_dir = tmp_path_factory.mktemp("repo_base") + return OmopExtract(export_dir) + + +@pytest.fixture() +def resources() -> pathlib.Path: + """Test resources directory path.""" + return pathlib.Path(__file__).parent / "resources" diff --git a/cli/tests/resources/omop/public/CARE_SITE.parquet b/cli/tests/resources/omop/public/CARE_SITE.parquet new file mode 100644 index 000000000..18da482a3 Binary files /dev/null and b/cli/tests/resources/omop/public/CARE_SITE.parquet differ diff --git a/cli/tests/resources/omop/public/CDM_SOURCE.parquet b/cli/tests/resources/omop/public/CDM_SOURCE.parquet new file mode 100644 index 000000000..c8f6c1702 Binary files /dev/null and b/cli/tests/resources/omop/public/CDM_SOURCE.parquet differ diff --git a/cli/tests/resources/omop/public/CONDITION_OCCURRENCE.parquet b/cli/tests/resources/omop/public/CONDITION_OCCURRENCE.parquet new file mode 100644 index 000000000..c41ba6b4e Binary files /dev/null and b/cli/tests/resources/omop/public/CONDITION_OCCURRENCE.parquet differ diff --git a/cli/tests/resources/omop/public/DEVICE_EXPOSURE.parquet b/cli/tests/resources/omop/public/DEVICE_EXPOSURE.parquet new file mode 100644 index 000000000..744754fe9 Binary files /dev/null and b/cli/tests/resources/omop/public/DEVICE_EXPOSURE.parquet differ diff --git a/cli/tests/resources/omop/public/DRUG_EXPOSURE.parquet b/cli/tests/resources/omop/public/DRUG_EXPOSURE.parquet new file mode 100644 index 000000000..4ed30b556 Binary files /dev/null and b/cli/tests/resources/omop/public/DRUG_EXPOSURE.parquet differ diff --git a/cli/tests/resources/omop/public/FACT_RELATIONSHIP.parquet b/cli/tests/resources/omop/public/FACT_RELATIONSHIP.parquet new file mode 100644 index 000000000..93b22f7b7 Binary files /dev/null and b/cli/tests/resources/omop/public/FACT_RELATIONSHIP.parquet differ diff --git a/cli/tests/resources/omop/public/LOCATION.parquet b/cli/tests/resources/omop/public/LOCATION.parquet new file mode 100644 index 000000000..49f8f3064 Binary files /dev/null and b/cli/tests/resources/omop/public/LOCATION.parquet differ diff --git a/cli/tests/resources/omop/public/MEASUREMENT.parquet b/cli/tests/resources/omop/public/MEASUREMENT.parquet new file mode 100644 index 000000000..8196a80eb Binary files /dev/null and b/cli/tests/resources/omop/public/MEASUREMENT.parquet differ diff --git a/cli/tests/resources/omop/public/OBSERVATION.parquet b/cli/tests/resources/omop/public/OBSERVATION.parquet new file mode 100644 index 000000000..6ae40ad48 Binary files /dev/null and b/cli/tests/resources/omop/public/OBSERVATION.parquet differ diff --git a/cli/tests/resources/omop/public/OBSERVATION_PERIOD.parquet b/cli/tests/resources/omop/public/OBSERVATION_PERIOD.parquet new file mode 100644 index 000000000..f0eddab84 Binary files /dev/null and b/cli/tests/resources/omop/public/OBSERVATION_PERIOD.parquet differ diff --git a/cli/tests/resources/omop/public/PERSON.parquet b/cli/tests/resources/omop/public/PERSON.parquet new file mode 100644 index 000000000..ae693885e Binary files /dev/null and b/cli/tests/resources/omop/public/PERSON.parquet differ diff --git a/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet b/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet new file mode 100644 index 000000000..01451206e Binary files /dev/null and b/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet differ diff --git a/cli/tests/resources/omop/public/SPECIMEN.parquet b/cli/tests/resources/omop/public/SPECIMEN.parquet new file mode 100644 index 000000000..9a7dec739 Binary files /dev/null and b/cli/tests/resources/omop/public/SPECIMEN.parquet differ diff --git a/cli/tests/resources/omop/public/VISIT_DETAIL.parquet b/cli/tests/resources/omop/public/VISIT_DETAIL.parquet new file mode 100644 index 000000000..da20bd662 Binary files /dev/null and b/cli/tests/resources/omop/public/VISIT_DETAIL.parquet differ diff --git a/cli/tests/resources/omop/public/VISIT_OCCURRENCE.parquet b/cli/tests/resources/omop/public/VISIT_OCCURRENCE.parquet new file mode 100644 index 000000000..5b65fbf0a Binary files /dev/null and b/cli/tests/resources/omop/public/VISIT_OCCURRENCE.parquet differ diff --git a/cli/tests/test_copy_omop.py b/cli/tests/test_copy_omop.py new file mode 100644 index 000000000..bc064b04f --- /dev/null +++ b/cli/tests/test_copy_omop.py @@ -0,0 +1,95 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test copying of OMOP ES data for later export.""" +import datetime + +import pytest + + +def test_new_project_copies(omop_files, resources): + """ + Given a valid export directory and hasn't been exported before + When copy to exports is run + Then the public files should be copied and symlinked to the latest export directory + """ + # ARRANGE + input_dir = resources / "omop" + project_name = "Really great cool project" + input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00") + # ACT + omop_files.copy_to_exports(input_dir, project_name, input_date) + # ASSERT + output_base = omop_files.export_dir / "really-great-cool-project" + + # check public files copied + specific_export_dir = ( + output_base / "all_extracts" / "omop" / "2020-06-10t18-00-00" / "public" + ) + assert (specific_export_dir).exists() + expected_files = [x.stem for x in (input_dir / "public").glob("*.parquet")] + output_files = [x.stem for x in (specific_export_dir).glob("*.parquet")] + assert expected_files == output_files + # check that symlinked files exist + symlinked_dir = output_base / "latest" / "omop" / "public" + symlinked_files = list(symlinked_dir.glob("*.parquet")) + assert expected_files == [x.stem for x in symlinked_files] + assert symlinked_dir.is_symlink() + + +def test_second_export(omop_files, resources): + """ + Given one export already exists for the project + When a second export with a different timestamp is run for the same project + Then there should be two export directories in the all_extracts dir, + and the symlinked dir should point to the most recently copied dir + """ + # ARRANGE + input_dir = resources / "omop" + project_name = "Really great cool project" + first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00") + omop_files.copy_to_exports(input_dir, project_name, first_export_datetime) + second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00") + + # ACT + omop_files.copy_to_exports(input_dir, project_name, second_export_datetime) + + # ASSERT + output_base = omop_files.export_dir / "really-great-cool-project" + specific_export_dir = ( + output_base / "all_extracts" / "omop" / "2020-07-10t18-00-00" / "public" + ) + assert specific_export_dir.exists() + # check that symlinked files are the most recent export + symlinked_dir = output_base / "latest" / "omop" / "public" + assert symlinked_dir.readlink() == specific_export_dir + previous_export_dir = ( + output_base / "all_extracts" / "omop" / "2020-06-10t18-00-00" / "public" + ) + assert symlinked_dir.readlink() != previous_export_dir + assert previous_export_dir.exists() + + +def test_project_with_no_public(omop_files, resources): + """ + Given an export directory which has no "public" subdirectory + When copy to exports is run + Then an assertion error will be raised + """ + input_dir = resources + project_name = "Really great cool project" + input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00") + with pytest.raises(FileNotFoundError) as error_info: + omop_files.copy_to_exports(input_dir, project_name, input_date) + + assert error_info.match("Could not find public") diff --git a/docker-compose.yml b/docker-compose.yml index e1d27adae..dbbcfc14c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -65,6 +65,7 @@ volumes: orthanc-anon-data: orthanc-raw-data: postgres-data: + exports: networks: pixl-net: @@ -247,6 +248,8 @@ services: retries: 5 networks: - pixl-net + volumes: + - ${PWD}/exports:/run/exports pacs-api: build: diff --git a/pixl_core/README.md b/pixl_core/README.md index fced850ed..20356cbe0 100644 --- a/pixl_core/README.md +++ b/pixl_core/README.md @@ -51,3 +51,32 @@ The client of choice for RabbitMQ at this point in time is [pika](https://pika.r asynchronous way of transferring messages. The former is geared towards high data throughput whereas the latter is geared towards stability. The asynchronous mode of transferring messages is a lot more complex as it is based on the [asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html). + + +### OMOP ES files + +Public parquet exports from OMOP ES that should be transferred outside the hospital are copied to the `exports` directory at the repository base. + +Within this directory each project has a directory, with all extracts run stored in `all_extracts` and the `latest` directory +contains a symlink to the most recent extract. This symlinking means that during the export stage it is clear which export should be sent. + +``` +└── project-1 + ├── all_extracts + │ └── omop + │ ├── 2020-06-10t18-00-00 + │ │ └── public + │ └── 2020-07-10t18-00-00 + │ └── public + └── latest + └── omop + └── public -> ../../../ all_extracts / omop / 2020-07-10t18-00-00 / public +└── project-2 + ├── all_extracts + │ └── omop + │ └── 2023-12-13t16-22-40 + │ └── public + └── latest + └── omop + └── public -> ../../../ all_extracts / omop / 2023-12-13t16-22-40 / public +``` \ No newline at end of file diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml index 4fca9032c..8fdd72727 100644 --- a/pixl_core/pyproject.toml +++ b/pixl_core/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "fastapi==0.103.2", "token-bucket==0.3.0", "python-decouple==3.6", + "python-slugify==8.0.1", "pika==1.3.1", "aio_pika==8.2.4", "environs==9.5.0", diff --git a/pixl_core/src/core/omop.py b/pixl_core/src/core/omop.py new file mode 100644 index 000000000..dc09cb327 --- /dev/null +++ b/pixl_core/src/core/omop.py @@ -0,0 +1,86 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Processing of OMOP parquet files.""" +import datetime +import pathlib +import shutil + +import slugify + +root_from_install = pathlib.Path(__file__).parents[3] + + +class OmopExtract: + """Processing Omop extracts on the filesystem.""" + + def __init__(self, root_dir: pathlib.Path = root_from_install) -> None: + """Create instance of OMOPExtract helper.""" + self.export_dir = root_dir / "exports" + + @staticmethod + def _get_slugs( + project_name: str, extract_datetime: datetime.datetime + ) -> tuple[str, str]: + """Convert project name and datetime to slugs for writing to filesystem.""" + project_slug = slugify.slugify(project_name) + extract_time_slug = slugify.slugify(extract_datetime.isoformat()) + return project_slug, extract_time_slug + + def copy_to_exports( + self, + omop_dir: pathlib.Path, + project_name: str, + extract_datetime: datetime.datetime, + ) -> str: + """ + Copy public omop directory as the latest extract for the project. + + Creates directories if they don't already exist. + :param omop_dir: parent path for omop export, with a "public" subdirectory + :param project_name: name of the project + :param extract_datetime: datetime that the OMOP ES extract was run + :raises FileNotFoundError: if there is no public subdirectory in `omop_dir` + :returns str: the project slug, so this can be registered for export to the DSH + """ + public_input = omop_dir / "public" + if not public_input.exists(): + msg = f"Could not find public directory in input {omop_dir}" + raise FileNotFoundError(msg) + + # Make directory for exports if they don't exist + project_slug, extract_time_slug = self._get_slugs( + project_name, extract_datetime + ) + export_base = self.export_dir / project_slug + public_output = OmopExtract._mkdir( + export_base / "all_extracts" / "omop" / extract_time_slug / "public" + ) + + # Copy extract files, overwriting if it exists + shutil.copytree(public_input, public_output, dirs_exist_ok=True) + # Make the latest export dir if it doesn't exist + latest_parent_dir = self._mkdir(export_base / "latest" / "omop") + # Symlink this extract to the latest directory + latest_public = latest_parent_dir / "public" + if latest_public.exists(): + latest_public.unlink() + + latest_public.symlink_to(public_output, target_is_directory=True) + return project_slug + + @staticmethod + def _mkdir(directory: pathlib.Path) -> pathlib.Path: + directory.mkdir(parents=True, exist_ok=True) + return directory