Init publish outputs script
bendnorman committed Mar 6, 2024
1 parent f092d71 commit 38fd923
Showing 6 changed files with 161 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/run-full-build.yml
@@ -56,6 +56,11 @@ jobs:
        run: |
          docker-compose down
      - name: Publish outputs
        if: (github.event_name == 'push' && startsWith(github.ref, 'refs/tags/')) || (github.ref_name == 'version-outputs')
        run: |
          docker compose run --rm app python dbcp/publish.py --build-ref ${{ github.ref_name }}
      # The google-github-actions/auth step is run as runner:docker,
      # so we need to give the workspace back to runner:docker
      - name: Give ownership of the workspace back to root
1 change: 1 addition & 0 deletions requirements.txt
@@ -14,3 +14,4 @@ beautifulsoup4~=4.11
plotly~=5.15.0
gridstatus~=0.20.0
s3fs>=2022.11.0
click
4 changes: 4 additions & 0 deletions src/dbcp/constants.py
@@ -1,4 +1,6 @@
"""DBCP constants."""
from pathlib import Path

from pudl.metadata.enums import POLITICAL_SUBDIVISIONS

FIPS_CODE_VINTAGE = 2020
@@ -14,3 +16,5 @@
    ].subdivision_name
)
US_STATES_TERRITORIES = US_STATES.union(US_TERRITORIES)

OUTPUT_DIR = Path("/app/data/output")
7 changes: 6 additions & 1 deletion src/dbcp/data_mart/__init__.py
@@ -7,6 +7,7 @@
import pandas as pd

import dbcp
from dbcp.constants import OUTPUT_DIR
from dbcp.helpers import enforce_dtypes, psql_insert_copy
from dbcp.metadata.data_mart import metadata
from dbcp.validation.tests import validate_data_mart
@@ -57,7 +58,9 @@ def create_data_marts(args): # noqa: max-complexity=11
    metadata.drop_all(engine)
    metadata.create_all(engine)

    # Load table into postgres
    parquet_dir = OUTPUT_DIR / "data_mart"

    # Load table into postgres and parquet
    with engine.connect() as con:
        for table in metadata.sorted_tables:
            logger.info(f"Load {table.name} to postgres.")
@@ -71,6 +74,8 @@ def create_data_marts(args): # noqa: max-complexity=11
                method=psql_insert_copy,
            )

            df.to_parquet(parquet_dir / f"{table.name}.parquet", index=False)

    validate_data_mart(engine=engine)

    if args.upload_to_bigquery:
6 changes: 5 additions & 1 deletion src/dbcp/etl.py
@@ -6,6 +6,7 @@
import pandas as pd

import dbcp
from dbcp.constants import OUTPUT_DIR
from dbcp.extract.ncsl_state_permitting import NCSLScraper
from dbcp.helpers import enforce_dtypes, psql_insert_copy
from dbcp.metadata.data_warehouse import metadata
@@ -216,7 +217,9 @@ def etl(args):
    metadata.drop_all(engine)
    metadata.create_all(engine)

    # Load table into postgres
    parquet_dir = OUTPUT_DIR / "data_warehouse"

    # Load table into postgres and parquet
    with engine.connect() as con:
        for table in metadata.sorted_tables:
            logger.info(f"Load {table.name} to postgres.")
@@ -232,6 +235,7 @@ def etl(args):
                chunksize=1000,
                method=psql_insert_copy,
            )
            df.to_parquet(parquet_dir / f"{table.name}.parquet", index=False)

    validate_warehouse(engine=engine)
140 changes: 140 additions & 0 deletions src/dbcp/publish.py
@@ -0,0 +1,140 @@
"""Upload the parquet files to GCS and load them into BigQuery."""
from pathlib import Path

import click
from google.cloud import bigquery, storage

from dbcp.constants import OUTPUT_DIR


def upload_parquet_directory_to_gcs(
    directory_path: Path, bucket_name: str, destination_blob_prefix: str, version: str
):
    """
    Upload a directory of Parquet files to Google Cloud Storage.

    Args:
        directory_path: Path to the directory containing Parquet files.
        bucket_name: Name of the GCS bucket to upload files to.
        destination_blob_prefix: Prefix to prepend to destination blob names.
        version: Build ref used as the top-level prefix for the uploaded files.

    Returns:
        None
    """
    # Create a storage client
    client = storage.Client()

    # Get the GCS bucket
    bucket = client.get_bucket(bucket_name)

    # List all Parquet files in the directory
    parquet_files = list(Path(directory_path).glob("*.parquet"))

    # Upload each Parquet file to GCS
    for file in parquet_files:
        # Construct the destination blob name
        destination_blob_name = f"{version}/{destination_blob_prefix}/{file.name}"

        # Create a blob object in the bucket
        blob = bucket.blob(destination_blob_name)

        # Upload the file to GCS
        blob.upload_from_filename(str(file))

        print(f"Uploaded {file} to gs://{bucket_name}/{destination_blob_name}")


def load_parquet_files_to_bigquery(
    bucket_name: str, destination_blob_prefix: str, version: str
):
    """
    Load Parquet files from GCS into BigQuery.

    Args:
        bucket_name: Name of the GCS bucket containing the Parquet files.
        destination_blob_prefix: Blob prefix the files were uploaded under.
        version: Build ref the files were uploaded under.

    Returns:
        None
    """
    # Create a BigQuery client
    client = bigquery.Client()

    # Get the BigQuery dataset
    dataset_id = f"test_{destination_blob_prefix}{'_version-outputs' if version == 'version-outputs' else ''}"
    dataset_ref = client.dataset(dataset_id)

    # Create the dataset if it doesn't exist
    dataset = bigquery.Dataset(dataset_ref)
    dataset = client.create_dataset(dataset, exists_ok=True)

    # Get the GCS bucket
    bucket = storage.Client().get_bucket(bucket_name)

    # Get all Parquet files in the bucket/{version} directory
    blobs = bucket.list_blobs(prefix=f"{version}/{destination_blob_prefix}")

    # Load each Parquet file to BigQuery
    for blob in blobs:
        if blob.name.endswith(".parquet"):
            # Get the blob filename without the extension
            table_name = blob.name.split("/")[-1].split(".")[0]

            # Construct the destination table
            table_ref = dataset_ref.table(table_name)

            # Delete the table if it exists
            client.delete_table(table_ref, not_found_ok=True)

            # Load the Parquet file to BigQuery
            job_config = bigquery.LoadJobConfig(
                source_format=bigquery.SourceFormat.PARQUET,
                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            )
            load_job = client.load_table_from_uri(
                f"gs://{bucket_name}/{blob.name}", table_ref, job_config=job_config
            )

            print(f"Loading {blob.name} to {dataset_id}.{table_name}")
            load_job.result()

            # Add a version label to the table ("." is not allowed in labels)
            labels = {"version": version.replace(".", "-")}
            table = client.get_table(table_ref)
            table.labels = labels
            client.update_table(table, ["labels"])

            print(f"Loaded {blob.name} to {dataset_id}.{table_name}")


@click.command()
@click.option("--build-ref")
def publish_outputs(build_ref: str):
    """Publish outputs to Google Cloud Storage and BigQuery."""
    directories = ("data_warehouse", "data_mart")
    bucket_name = "dgm-outputs"

    if build_ref == "version-outputs":
        for directory in directories:
            upload_parquet_directory_to_gcs(
                OUTPUT_DIR / directory, bucket_name, directory, build_ref
            )

        for directory in directories:
            load_parquet_files_to_bigquery(bucket_name, directory, build_ref)
    elif build_ref.startswith("v"):
        for directory in directories:
            upload_parquet_directory_to_gcs(
                OUTPUT_DIR / directory, bucket_name, directory, build_ref
            )
            upload_parquet_directory_to_gcs(
                OUTPUT_DIR / directory, bucket_name, directory, "prod"
            )

        for directory in directories:
            load_parquet_files_to_bigquery(bucket_name, directory, build_ref)
    else:
        raise ValueError("build-ref must be 'version-outputs' or start with 'v'")


if __name__ == "__main__":
    publish_outputs()
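
For reference, a minimal sketch of invoking the new script by hand, mirroring the workflow step above. The "app" compose service and the "version-outputs" ref come from this commit; running this locally additionally assumes Google Cloud credentials are available inside the container.

    # Publish the current parquet outputs under the "version-outputs" prefix (assumed local setup)
    docker compose run --rm app python dbcp/publish.py --build-ref version-outputs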
