Switch to DB native export format
dogversioning committed Mar 3, 2025
1 parent edce1a9 · commit fbaccb2
Showing 21 changed files with 349 additions and 250 deletions.
49 changes: 23 additions & 26 deletions cumulus_library/actions/exporter.py
@@ -1,7 +1,9 @@
import csv
import pathlib

import pandas
import pyarrow
from pyarrow import csv, parquet
from rich import console
from rich.progress import track

from cumulus_library import base_utils, study_manifest
@@ -41,15 +43,16 @@ def export_study(
data_path: pathlib.Path,
archive: bool,
chunksize: int = 1000000,
) -> list:
):
"""Exports csvs/parquet extracts of tables listed in export_list
:param config: a StudyConfig object
:param manifest: a StudyManifest object
:keyword data_path: the path to the place on disk to save data
:keyword archive: If true, get all study data and zip with timestamp
:keyword chunksize: number of rows to export in a single transaction
:returns: a list of queries, (only for unit tests)
"""

skipped_tables = []
reset_counts_exports(manifest)
if manifest.get_dedicated_schema():
prefix = f"{manifest.get_dedicated_schema()}."
@@ -64,34 +67,28 @@ def export_study(
table_list.append(study_manifest.ManifestExport(name=row[0], export_type="archive"))
else:
table_list = manifest.get_export_table_list()
queries = []
path = pathlib.Path(f"{data_path}/{manifest.get_study_prefix()}/")
path.mkdir(parents=True, exist_ok=True)
for table in track(
table_list,
description=f"Exporting {manifest.get_study_prefix()} data...",
):
query = f"SELECT * FROM {table.name}" # noqa: S608
query = base_utils.update_query_if_schema_specified(query, manifest)
dataframe_chunks, db_schema = config.db.execute_as_pandas(query, chunksize=chunksize)
path.mkdir(parents=True, exist_ok=True)
arrow_schema = pyarrow.schema(config.db.col_pyarrow_types_from_sql(db_schema))
with parquet.ParquetWriter(
f"{path}/{table.name}.{table.export_type}.parquet", arrow_schema
) as p_writer:
with csv.CSVWriter(
f"{path}/{table.name}.{table.export_type}.csv",
arrow_schema,
write_options=csv.WriteOptions(
# Note that this quoting style is not exactly csv.QUOTE_MINIMAL
# https://github.com/apache/arrow/issues/42032
quoting_style="needed"
),
) as c_writer:
for chunk in dataframe_chunks:
_write_chunk(p_writer, chunk, arrow_schema) # pragma: no cover
_write_chunk(c_writer, chunk, arrow_schema) # pragma: no cover
queries.append(query)
table.name = base_utils.update_query_if_schema_specified(table.name, manifest)
parquet_path = config.db.export_table_as_parquet(table.name, table.export_type, path)
if parquet_path:
df = pandas.read_parquet(parquet_path)
df.to_csv(
str(parquet_path).replace(".parquet", ".csv"),
quoting=csv.QUOTE_NONNUMERIC,
index=False,
)
else:
skipped_tables.append(table.name)

if len(skipped_tables) > 0:
c = console.Console()
c.print("The following tables were empty and were not exported:")
for table in skipped_tables:
c.print(table)
if archive:
base_utils.zip_dir(path, data_path, manifest.get_study_prefix())
return queries
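
The heart of the new flow: ask the database backend for a native parquet export, then derive the CSV from that file with pandas. A minimal, self-contained sketch of the conversion step (the helper name is illustrative, not part of this commit):

import csv
import pathlib

import pandas


def parquet_to_csv(parquet_path: pathlib.Path) -> pathlib.Path:
    """Derive a CSV extract from a parquet export."""
    csv_path = parquet_path.with_suffix(".csv")
    df = pandas.read_parquet(parquet_path)
    # QUOTE_NONNUMERIC wraps every non-numeric field, including empty
    # strings, in double quotes - hence the fixture changes further down,
    # where bare `,,` columns become `"","",""`.
    df.to_csv(csv_path, quoting=csv.QUOTE_NONNUMERIC, index=False)
    return csv_path

One behavioral consequence, visible in the exporter above: empty tables now come back as None from the backend and are reported as skipped, instead of producing empty CSV/parquet pairs.
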
41 changes: 41 additions & 0 deletions cumulus_library/databases/athena.py
@@ -8,6 +8,7 @@
import os
import pathlib

import awswrangler
import boto3
import botocore
import numpy
@@ -168,6 +169,46 @@ def upload_file(
)
return f"s3://{bucket}/{s3_key}"

def export_table_as_parquet(
self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
) -> str | None:
s3_client = boto3.client("s3")
output_path = location / f"{table_name}.parquet"
workgroup = self.connection._client.get_work_group(WorkGroup=self.work_group)
wg_conf = workgroup["WorkGroup"]["Configuration"]["ResultConfiguration"]
s3_path = wg_conf["OutputLocation"]
bucket = "/".join(s3_path.split("/")[2:3])
output_path = location / f"{table_name}.{table_type}.parquet"
s3_path = f"s3://{bucket}/export/{table_name}.{table_type}.parquet"

# Cleanup location in case there was an error of some kind
res = s3_client.list_objects_v2(
Bucket=bucket, Prefix=f"export/{table_name}.{table_type}.parquet"
)
if "Contents" in res:
for file in res["Contents"]:
s3_client.delete_object(Bucket=bucket, Key=file["Key"])

self.connection.cursor().execute(f"""UNLOAD
(SELECT * from {table_name})
TO '{s3_path}'
WITH (format='PARQUET', compression='SNAPPY')
""") # noqa: S608
# UNLOAD is not guaranteed to create a single file. AWS Wrangler's read_parquet
# allows us to ignore that wrinkle
try:
df = awswrangler.s3.read_parquet(s3_path)
except awswrangler.exceptions.NoFilesFound:
return None
df = df.sort_values(by=list(df.columns), ascending=False, na_position="first")
df.to_parquet(output_path)
res = s3_client.list_objects_v2(
Bucket=bucket, Prefix=f"export/{table_name}.{table_type}.parquet"
)
for file in res["Contents"]:
s3_client.delete_object(Bucket=bucket, Key=file["Key"])
return output_path

def create_schema(self, schema_name) -> None:
"""Creates a new schema object inside the database"""
glue_client = boto3.client("glue")
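
As the comment in the new method notes, UNLOAD may split the result across several part files under the target prefix; awswrangler.s3.read_parquet accepts the prefix and concatenates every part into a single DataFrame. A minimal usage sketch (the bucket and key here are hypothetical):

import awswrangler

# One DataFrame back, no matter how many part files UNLOAD wrote.
df = awswrangler.s3.read_parquet("s3://example-bucket/export/demo_table.cube.parquet")
print(df.shape)
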
10 changes: 10 additions & 0 deletions cumulus_library/databases/base.py
@@ -208,6 +208,16 @@ def upload_file(
have an API for file upload (i.e. cloud databases)"""
return None

def export_table_as_parquet(
self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
) -> pathlib.Path:
"""Gets a parquet file from a specified table.
This is intended as a way to get the most database native parquet export possible,
so we don't have to infer schema information. Only do schema inferring if your
DB engine does not support parquet natively."""
return pathlib.Path("/dev/null")

@abc.abstractmethod
def create_schema(self, schema_name):
"""Creates a new schema object inside the catalog"""
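
For an engine with no native parquet writer, the "schema inferring" fallback the docstring mentions might look roughly like this sketch (the mixin name, and the assumption that the backend exposes a DB-API style self.connection, are illustrative only):

import pathlib

import pandas


class InferredParquetExportMixin:
    """Hypothetical fallback: pull rows through the cursor and let
    pandas infer the schema instead of the database providing it."""

    def export_table_as_parquet(
        self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
    ) -> pathlib.Path | None:
        cursor = self.connection.cursor()
        cursor.execute(f"SELECT * FROM {table_name}")  # noqa: S608
        rows = cursor.fetchall()
        if not rows:
            return None  # match the new convention: empty tables are skipped
        columns = [desc[0] for desc in cursor.description]
        parquet_path = location / f"{table_name}.{table_type}.parquet"
        parquet_path.parent.mkdir(exist_ok=True, parents=True)
        pandas.DataFrame(rows, columns=columns).to_parquet(parquet_path)
        return parquet_path
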
18 changes: 18 additions & 0 deletions cumulus_library/databases/duckdb.py
@@ -9,6 +9,7 @@

import collections
import datetime
import pathlib
import re

import duckdb
@@ -207,6 +208,23 @@ def operational_errors(self) -> tuple[type[Exception], ...]:
duckdb.BinderException,
)

def export_table_as_parquet(
self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
) -> str | None:
parquet_path = location / f"{table_name}.{table_type}.parquet"
parquet_path.parent.mkdir(exist_ok=True, parents=True)
table_size = self.connection.execute(f"SELECT count(*) FROM {table_name}").fetchone() # noqa: S608
if table_size[0] == 0:
return None
query = f"""COPY
(SELECT * from {table_name} ORDER BY ALL desc)
TO '{parquet_path}'
(FORMAT parquet)
""" # noqa: S608
self.connection.execute(query)

return parquet_path

def create_schema(self, schema_name):
"""Creates a new schema object inside the database"""
schemas = self.connection.sql(
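
DuckDB can do the whole export natively: COPY (SELECT ...) TO a file with FORMAT parquet, and ORDER BY ALL (DuckDB shorthand for ordering by every column, left to right) keeps the output deterministic. A standalone sketch against a throwaway in-memory table:

import duckdb

con = duckdb.connect()  # in-memory database
con.execute("CREATE TABLE demo AS SELECT * FROM (VALUES (1, 'a'), (2, 'b')) t(id, val)")
# Write the table to parquet in descending order across all columns,
# mirroring the deterministic ordering the exporter relies on.
con.execute("COPY (SELECT * FROM demo ORDER BY ALL DESC) TO 'demo.parquet' (FORMAT parquet)")
print(duckdb.read_parquet("demo.parquet").fetchall())  # [(2, 'b'), (1, 'a')]
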
1 change: 1 addition & 0 deletions pyproject.toml
@@ -2,6 +2,7 @@
name = "cumulus-library"
requires-python = ">= 3.11"
dependencies = [
"awswrangler >= 3.11, < 4",
"cumulus-fhir-support >= 1.3.1", # 1.3.1 fixes a "load all rows into memory" bug
"duckdb >= 1.1.3",
"Jinja2 > 3",
38 changes: 35 additions & 3 deletions tests/test_cli.py
@@ -283,13 +283,14 @@ def test_clean(tmp_path, args, expected, raises):
clear=True,
)
@pytest.mark.parametrize(
"build_args,export_args,expected_tables,raises",
"build_args,export_args,expected_tables,raises,expected_missing",
[
(
["build", "-t", "core"],
["export", "-t", "core"],
73,
does_not_raise(),
[],
),
(
# checking that a study is loaded from a child directory
@@ -304,6 +305,10 @@ def test_clean(tmp_path, args, expected, raises):
["export", "-t", "study_valid", "-s", "tests/test_data/"],
3,
does_not_raise(),
[
"study_valid__table",
"study_valid__table2",
],
),
(
# checking that a study is loaded from a child directory
@@ -318,6 +323,10 @@ def test_clean(tmp_path, args, expected, raises):
["export", "-t", "study_valid", "-s", "tests/test_data/"],
3,
does_not_raise(),
[
"study_valid__table",
"study_valid__table2",
],
),
(
# checking that a study is loaded from the directory of a user-defined
@@ -333,6 +342,10 @@ def test_clean(tmp_path, args, expected, raises):
["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"],
3,
does_not_raise(),
[
"study_valid__table",
"study_valid__table2",
],
),
(
[
@@ -346,6 +359,11 @@ def test_clean(tmp_path, args, expected, raises):
["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"],
3,
does_not_raise(),
[
"study_valid__table",
"study_valid__table2",
"study_valid__table",
],
),
(
[
@@ -360,6 +378,7 @@ def test_clean(tmp_path, args, expected, raises):
["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"],
2,
pytest.raises(duckdb.duckdb.CatalogException),
[],
),
(
[
@@ -374,6 +393,7 @@ def test_clean(tmp_path, args, expected, raises):
["export", "-t", "study_valid", "-s", "tests/test_data/study_valid/"],
2,
pytest.raises(errors.StudyManifestParsingError),
[],
),
(
[
@@ -392,6 +412,7 @@ def test_clean(tmp_path, args, expected, raises):
],
4,
does_not_raise(),
["study_dedicated_schema__table_raw_sql"],
),
(
[
@@ -410,6 +431,12 @@ def test_clean(tmp_path, args, expected, raises):
],
5,
does_not_raise(),
[
"study_valid_all_exports__table",
"study_valid_all_exports__table2",
"study_valid_all_exports__table3",
"study_valid_all_exports__table4",
],
),
(
[
@@ -428,6 +455,7 @@ def test_clean(tmp_path, args, expected, raises):
],
2,
pytest.raises(errors.StudyManifestParsingError),
[],
),
(
[
@@ -446,10 +474,13 @@ def test_clean(tmp_path, args, expected, raises):
],
2,
pytest.raises(errors.StudyManifestParsingError),
[],
),
],
)
def test_cli_executes_queries(tmp_path, build_args, export_args, expected_tables, raises):
def test_cli_executes_queries(
tmp_path, build_args, export_args, expected_tables, raises, expected_missing
):
with raises:
build_args = duckdb_args(build_args, tmp_path)
cli.main(cli_args=build_args)
@@ -483,7 +514,8 @@ def test_cli_executes_queries(tmp_path, build_args, export_args, expected_tables
export_config = config["export_config"]
for export_list in export_config.values():
for export_table in export_list:
assert any(export_table in x for x in csv_files)
if export_table not in expected_missing:
assert any(export_table in x for x in csv_files)


@mock.patch.dict(
@@ -1,3 +1,3 @@
"cnt","category","recordedDate_month","code_display","reaction_manifestation_display"
17,,,,
16,,"2018-08-01",,
17,"","","",""
16,"","2018-08-01","",""
@@ -1,3 +1,3 @@
"cnt","category_code","recordedDate_month","code_display"
15,,,
15,"encounter-diagnosis",,
15,"encounter-diagnosis","",""
15,"","",""
@@ -1,20 +1,20 @@
"cnt","category_display","code_display","issued_month"
13,,,
13,,"History and physical note",
13,,"Evaluation + Plan note",
13,,"CBC panel - Blood by Automated count",
13,"Laboratory",,
13,"Laboratory","CBC panel - Blood by Automated count",
13,"History and physical note",,
13,"History and physical note","History and physical note",
13,"History and physical note","Evaluation + Plan note",
13,"Evaluation + Plan note",,
13,"Evaluation + Plan note","History and physical note",
13,"Evaluation + Plan note","Evaluation + Plan note",
12,"cumulus__none",,
11,,"Generalized anxiety disorder 7 item (GAD-7)",
11,"cumulus__none","Generalized anxiety disorder 7 item (GAD-7)",
10,,"Patient Health Questionnaire 2 item (PHQ-2) [Reported]",
10,,"Alcohol Use Disorder Identification Test - Consumption [AUDIT-C]",
10,"cumulus__none","Patient Health Questionnaire 2 item (PHQ-2) [Reported]",
10,"cumulus__none","Alcohol Use Disorder Identification Test - Consumption [AUDIT-C]",
13,"Laboratory","CBC panel - Blood by Automated count",""
13,"Laboratory","",""
13,"History and physical note","History and physical note",""
13,"History and physical note","Evaluation + Plan note",""
13,"History and physical note","",""
13,"Evaluation + Plan note","History and physical note",""
13,"Evaluation + Plan note","Evaluation + Plan note",""
13,"Evaluation + Plan note","",""
13,"","History and physical note",""
13,"","Evaluation + Plan note",""
13,"","CBC panel - Blood by Automated count",""
13,"","",""
12,"cumulus__none","",""
11,"cumulus__none","Generalized anxiety disorder 7 item (GAD-7)",""
11,"","Generalized anxiety disorder 7 item (GAD-7)",""
10,"cumulus__none","Patient Health Questionnaire 2 item (PHQ-2) [Reported]",""
10,"cumulus__none","Alcohol Use Disorder Identification Test - Consumption [AUDIT-C]",""
10,"","Patient Health Questionnaire 2 item (PHQ-2) [Reported]",""
10,"","Alcohol Use Disorder Identification Test - Consumption [AUDIT-C]",""