Switch to DB native export format
dogversioning committed Mar 3, 2025
1 parent edce1a9 commit 338d165
Showing 23 changed files with 349 additions and 448 deletions.
49 changes: 23 additions & 26 deletions cumulus_library/actions/exporter.py
@@ -1,7 +1,9 @@
import csv
import pathlib

import pandas
import pyarrow
from pyarrow import csv, parquet
from rich import console
from rich.progress import track

from cumulus_library import base_utils, study_manifest
@@ -41,15 +43,16 @@ def export_study(
data_path: pathlib.Path,
archive: bool,
chunksize: int = 1000000,
) -> list:
):
"""Exports csvs/parquet extracts of tables listed in export_list
:param config: a StudyConfig object
:param manifest: a StudyManifest object
:keyword data_path: the path to the place on disk to save data
:keyword archive: If true, get all study data and zip with timestamp
:keyword chunksize: number of rows to export in a single transaction
:returns: a list of queries, (only for unit tests)
"""

skipped_tables = []
reset_counts_exports(manifest)
if manifest.get_dedicated_schema():
prefix = f"{manifest.get_dedicated_schema()}."
@@ -64,34 +67,28 @@ def export_study(
table_list.append(study_manifest.ManifestExport(name=row[0], export_type="archive"))
else:
table_list = manifest.get_export_table_list()
queries = []
path = pathlib.Path(f"{data_path}/{manifest.get_study_prefix()}/")
path.mkdir(parents=True, exist_ok=True)
for table in track(
table_list,
description=f"Exporting {manifest.get_study_prefix()} data...",
):
query = f"SELECT * FROM {table.name}" # noqa: S608
query = base_utils.update_query_if_schema_specified(query, manifest)
dataframe_chunks, db_schema = config.db.execute_as_pandas(query, chunksize=chunksize)
path.mkdir(parents=True, exist_ok=True)
arrow_schema = pyarrow.schema(config.db.col_pyarrow_types_from_sql(db_schema))
with parquet.ParquetWriter(
f"{path}/{table.name}.{table.export_type}.parquet", arrow_schema
) as p_writer:
with csv.CSVWriter(
f"{path}/{table.name}.{table.export_type}.csv",
arrow_schema,
write_options=csv.WriteOptions(
# Note that this quoting style is not exactly csv.QUOTE_MINIMAL
# https://github.com/apache/arrow/issues/42032
quoting_style="needed"
),
) as c_writer:
for chunk in dataframe_chunks:
_write_chunk(p_writer, chunk, arrow_schema) # pragma: no cover
_write_chunk(c_writer, chunk, arrow_schema) # pragma: no cover
queries.append(query)
table.name = base_utils.update_query_if_schema_specified(table.name, manifest)
parquet_path = config.db.export_table_as_parquet(table.name, table.export_type, path)
if parquet_path:
df = pandas.read_parquet(parquet_path)
df.to_csv(
str(parquet_path).replace(".parquet", ".csv"),
quoting=csv.QUOTE_NONNUMERIC,
index=False,
)
else:
skipped_tables.append(table.name)

if len(skipped_tables) > 0:
c = console.Console()
c.print("The following tables were empty and were not exported:")
for table in skipped_tables:
c.print(table)
if archive:
base_utils.zip_dir(path, data_path, manifest.get_study_prefix())
return queries
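
The new flow above delegates parquet creation to the database backend and only derives the CSV afterwards with pandas. As a rough, self-contained sketch of that second step (parquet_to_csv is a hypothetical helper name, not part of this commit):

    import csv
    import pathlib

    import pandas


    def parquet_to_csv(parquet_path: pathlib.Path) -> pathlib.Path:
        """Writes a CSV next to a parquet export, using the exporter's quoting rules."""
        df = pandas.read_parquet(parquet_path)
        csv_path = parquet_path.with_suffix(".csv")
        # QUOTE_NONNUMERIC quotes every non-numeric field, replacing the old
        # pyarrow "needed" quoting style used by the previous writer.
        df.to_csv(csv_path, quoting=csv.QUOTE_NONNUMERIC, index=False)
        return csv_path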
69 changes: 41 additions & 28 deletions cumulus_library/databases/athena.py
@@ -8,11 +8,11 @@
import os
import pathlib

import awswrangler
import boto3
import botocore
import numpy
import pandas
import pyarrow
import pyathena
from pyathena.common import BaseCursor as AthenaCursor
from pyathena.pandas.cursor import PandasCursor as AthenaPandasCursor
@@ -96,33 +96,6 @@ def col_parquet_types_from_pandas(self, field_types: list) -> list:
)
return output

def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
output = []
for column in columns:
match column[1]:
case "varchar":
output.append((column[0], pyarrow.string()))
case "bigint":
output.append((column[0], pyarrow.int64()))
case "integer":
output.append((column[0], pyarrow.int64()))
case "double":
output.append((column[0], pyarrow.float64()))
# This is future proofing - we don't see this type currently.
case "decimal":
output.append( # pragma: no cover
(column[0], pyarrow.decimal128(column[4], column[5]))
)
case "boolean":
output.append((column[0], pyarrow.bool_()))
case "date":
output.append((column[0], pyarrow.date64()))
case "timestamp":
output.append((column[0], pyarrow.timestamp("s")))
case _:
raise errors.CumulusLibraryError(f"Unsupported SQL type '{column[1]}' found.")
return output

def upload_file(
self,
*,
@@ -168,6 +141,46 @@ def upload_file(
)
return f"s3://{bucket}/{s3_key}"

def export_table_as_parquet(
self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
) -> str | None:
s3_client = boto3.client("s3")
output_path = location / f"{table_name}.parquet"
workgroup = self.connection._client.get_work_group(WorkGroup=self.work_group)
wg_conf = workgroup["WorkGroup"]["Configuration"]["ResultConfiguration"]
s3_path = wg_conf["OutputLocation"]
bucket = "/".join(s3_path.split("/")[2:3])
output_path = location / f"{table_name}.{table_type}.parquet"
s3_path = f"s3://{bucket}/export/{table_name}.{table_type}.parquet"

# Cleanup location in case there was an error of some kind
res = s3_client.list_objects_v2(
Bucket=bucket, Prefix=f"export/{table_name}.{table_type}.parquet"
)
if "Contents" in res:
for file in res["Contents"]:
s3_client.delete_object(Bucket=bucket, Key=file["Key"])

self.connection.cursor().execute(f"""UNLOAD
(SELECT * from {table_name})
TO '{s3_path}'
WITH (format='PARQUET', compression='SNAPPY')
""") # noqa: S608
# UNLOAD is not guaranteed to create a single file. AWS Wrangler's read_parquet
# allows us to ignore that wrinkle
try:
df = awswrangler.s3.read_parquet(s3_path)
except awswrangler.exceptions.NoFilesFound:
return None
df = df.sort_values(by=list(df.columns), ascending=False, na_position="first")
df.to_parquet(output_path)
res = s3_client.list_objects_v2(
Bucket=bucket, Prefix=f"export/{table_name}.{table_type}.parquet"
)
for file in res["Contents"]:
s3_client.delete_object(Bucket=bucket, Key=file["Key"])
return output_path

def create_schema(self, schema_name) -> None:
"""Creates a new schema object inside the database"""
glue_client = boto3.client("glue")
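Since UNLOAD may write several part files under the target prefix, the Athena backend reads them back with awswrangler and then deletes the intermediate S3 objects. A condensed sketch of that gather-and-clean-up pattern, assuming default AWS credentials (read_unload_output is a hypothetical name):

    import awswrangler
    import boto3


    def read_unload_output(bucket: str, key_prefix: str):
        """Reads every parquet part under an S3 prefix into one DataFrame, then deletes the parts."""
        try:
            # awswrangler globs all part files written by UNLOAD under this prefix
            df = awswrangler.s3.read_parquet(f"s3://{bucket}/{key_prefix}")
        except awswrangler.exceptions.NoFilesFound:
            return None
        s3_client = boto3.client("s3")
        listing = s3_client.list_objects_v2(Bucket=bucket, Prefix=key_prefix)
        for obj in listing.get("Contents", []):
            s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
        return df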
10 changes: 10 additions & 0 deletions cumulus_library/databases/base.py
@@ -208,6 +208,16 @@ def upload_file(
have an API for file upload (i.e. cloud databases)"""
return None

def export_table_as_parquet(
self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
) -> pathlib.Path:
"""Gets a parquet file from a specified table.
This is intended as a way to get the most database native parquet export possible,
so we don't have to infer schema information. Only do schema inferring if your
DB engine does not support parquet natively."""
return pathlib.Path("/dev/null")

@abc.abstractmethod
def create_schema(self, schema_name):
"""Creates a new schema object inside the catalog"""
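The base class only supplies a placeholder; the intent of the docstring is that engines with a native parquet writer override this, while engines without one fall back to schema inference. One hypothetical shape for such a fallback (export_table_as_parquet_fallback and its DBAPI connection argument are illustrative, not library API):

    import pathlib

    import pandas


    def export_table_as_parquet_fallback(
        connection, table_name: str, table_type: str, location: pathlib.Path
    ) -> pathlib.Path | None:
        """Fallback for engines with no native parquet writer: let pandas/pyarrow infer the schema."""
        df = pandas.read_sql(f"SELECT * FROM {table_name}", connection)  # noqa: S608
        if df.empty:
            return None
        parquet_path = location / f"{table_name}.{table_type}.parquet"
        parquet_path.parent.mkdir(exist_ok=True, parents=True)
        # Schema inference happens here, since the engine cannot emit parquet itself
        df.to_parquet(parquet_path, index=False)
        return parquet_path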
44 changes: 18 additions & 26 deletions cumulus_library/databases/duckdb.py
@@ -9,14 +9,13 @@

import collections
import datetime
import pathlib
import re

import duckdb
import pandas
import pyarrow
import pyarrow.dataset

from cumulus_library import errors
from cumulus_library.databases import base


@@ -174,30 +173,6 @@ def execute_as_pandas(
return iter([result.df().convert_dtypes()]), result.description
return result.df().convert_dtypes(), result.description

def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
output = []
for column in columns:
match column[1]:
case "STRING":
output.append((column[0], pyarrow.string()))
case "INTEGER":
output.append((column[0], pyarrow.int64()))
case "NUMBER":
output.append((column[0], pyarrow.float64()))
case "DOUBLE":
output.append((column[0], pyarrow.float64()))
case "boolean" | "bool":
output.append((column[0], pyarrow.bool_()))
case "Date":
output.append((column[0], pyarrow.date64()))
case "TIMESTAMP" | "DATETIME":
output.append((column[0], pyarrow.timestamp("s")))
case _:
raise errors.CumulusLibraryError(
f"{column[0], column[1]} does not have a conversion type"
)
return output

def parser(self) -> base.DatabaseParser:
return DuckDbParser()

@@ -207,6 +182,23 @@ def operational_errors(self) -> tuple[type[Exception], ...]:
duckdb.BinderException,
)

def export_table_as_parquet(
self, table_name: str, table_type: str, location: pathlib.Path, *args, **kwargs
) -> str | None:
parquet_path = location / f"{table_name}.{table_type}.parquet"
parquet_path.parent.mkdir(exist_ok=True, parents=True)
table_size = self.connection.execute(f"SELECT count(*) FROM {table_name}").fetchone() # noqa: S608
if table_size[0] == 0:
return None
query = f"""COPY
(SELECT * from {table_name} ORDER BY ALL desc)
TO '{parquet_path}'
(FORMAT parquet)
""" # noqa: S608
self.connection.execute(query)

return parquet_path

def create_schema(self, schema_name):
"""Creates a new schema object inside the database"""
schemas = self.connection.sql(
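In isolation, the DuckDB path is a COPY ... TO statement guarded by an empty-table check. A self-contained sketch of the same pattern against an in-memory database (duckdb_copy_to_parquet is a hypothetical name):

    import pathlib

    import duckdb


    def duckdb_copy_to_parquet(
        conn: duckdb.DuckDBPyConnection, table_name: str, out_dir: pathlib.Path
    ) -> pathlib.Path | None:
        """Skips empty tables, otherwise lets DuckDB write the parquet file natively."""
        out_dir.mkdir(exist_ok=True, parents=True)
        count = conn.execute(f"SELECT count(*) FROM {table_name}").fetchone()  # noqa: S608
        if count[0] == 0:
            return None
        parquet_path = out_dir / f"{table_name}.parquet"
        # No schema inference needed: DuckDB serializes its own column types
        conn.execute(
            f"COPY (SELECT * FROM {table_name} ORDER BY ALL DESC) "  # noqa: S608
            f"TO '{parquet_path}' (FORMAT parquet)"
        )
        return parquet_path


    conn = duckdb.connect()  # in-memory database, for demonstration only
    conn.execute("CREATE TABLE demo AS SELECT 1 AS id, 'a' AS val")
    print(duckdb_copy_to_parquet(conn, "demo", pathlib.Path("./export")))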
1 change: 1 addition & 0 deletions pyproject.toml
@@ -2,6 +2,7 @@
name = "cumulus-library"
requires-python = ">= 3.11"
dependencies = [
"awswrangler >= 3.11, < 4",
"cumulus-fhir-support >= 1.3.1", # 1.3.1 fixes a "load all rows into memory" bug
"duckdb >= 1.1.3",
"Jinja2 > 3",