diff --git a/.github/workflows/run-full-build.yml b/.github/workflows/run-full-build.yml
index d43aba16..79608200 100644
--- a/.github/workflows/run-full-build.yml
+++ b/.github/workflows/run-full-build.yml
@@ -56,6 +56,11 @@ jobs:
         run: |
           docker-compose down
 
+      - name: Publish outputs
+        if: (github.event_name == 'push' && startsWith(github.ref, 'refs/tags/')) || (github.ref_name == 'version-outputs')
+        run: |
+          docker compose run --rm app python dbcp/publish.py --build-ref ${{ github.ref_name }}
+
       # The google-github-actions/auth step is run as runner:docker,
       # so we need to give the workspace back to runner:docker
       - name: Give ownership of the workspace back to root
diff --git a/requirements.txt b/requirements.txt
index 02c5eeea..bc607752 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,3 +14,4 @@ beautifulsoup4~=4.11
 plotly~=5.15.0
 gridstatus~=0.20.0
 s3fs>=2022.11.0
+click
diff --git a/src/dbcp/constants.py b/src/dbcp/constants.py
index d7e81e5a..e81978d8 100644
--- a/src/dbcp/constants.py
+++ b/src/dbcp/constants.py
@@ -1,4 +1,6 @@
 """DBCP constants."""
+from pathlib import Path
+
 from pudl.metadata.enums import POLITICAL_SUBDIVISIONS
 
 FIPS_CODE_VINTAGE = 2020
@@ -14,3 +16,5 @@
     ].subdivision_name
 )
 US_STATES_TERRITORIES = US_STATES.union(US_TERRITORIES)
+
+OUTPUT_DIR = Path("/app/data/output")
diff --git a/src/dbcp/data_mart/__init__.py b/src/dbcp/data_mart/__init__.py
index dc6e4c57..1bebced7 100644
--- a/src/dbcp/data_mart/__init__.py
+++ b/src/dbcp/data_mart/__init__.py
@@ -7,6 +7,7 @@
 import pandas as pd
 
 import dbcp
+from dbcp.constants import OUTPUT_DIR
 from dbcp.helpers import enforce_dtypes, psql_insert_copy
 from dbcp.metadata.data_mart import metadata
 from dbcp.validation.tests import validate_data_mart
@@ -57,7 +58,9 @@ def create_data_marts(args):  # noqa: max-complexity=11
     metadata.drop_all(engine)
     metadata.create_all(engine)
 
-    # Load table into postgres
+    parquet_dir = OUTPUT_DIR / "data_mart"
+
+    # Load table into postgres and parquet
     with engine.connect() as con:
         for table in metadata.sorted_tables:
             logger.info(f"Load {table.name} to postgres.")
@@ -71,6 +74,8 @@ def create_data_marts(args):  # noqa: max-complexity=11
                 method=psql_insert_copy,
             )
 
+            df.to_parquet(parquet_dir / f"{table.name}.parquet", index=False)
+
     validate_data_mart(engine=engine)
 
     if args.upload_to_bigquery:
diff --git a/src/dbcp/etl.py b/src/dbcp/etl.py
index fa377f7d..0711bef6 100644
--- a/src/dbcp/etl.py
+++ b/src/dbcp/etl.py
@@ -6,6 +6,7 @@
 import pandas as pd
 
 import dbcp
+from dbcp.constants import OUTPUT_DIR
 from dbcp.extract.ncsl_state_permitting import NCSLScraper
 from dbcp.helpers import enforce_dtypes, psql_insert_copy
 from dbcp.metadata.data_warehouse import metadata
@@ -216,7 +217,9 @@ def etl(args):
     metadata.drop_all(engine)
     metadata.create_all(engine)
 
-    # Load table into postgres
+    parquet_dir = OUTPUT_DIR / "data_warehouse"
+
+    # Load table into postgres and parquet
     with engine.connect() as con:
         for table in metadata.sorted_tables:
             logger.info(f"Load {table.name} to postgres.")
@@ -232,6 +235,7 @@ def etl(args):
                 chunksize=1000,
                 method=psql_insert_copy,
             )
+            df.to_parquet(parquet_dir / f"{table.name}.parquet", index=False)
 
     validate_warehouse(engine=engine)
 
diff --git a/src/dbcp/publish.py b/src/dbcp/publish.py
new file mode 100644
index 00000000..263a7213
--- /dev/null
+++ b/src/dbcp/publish.py
@@ -0,0 +1,143 @@
+"""Upload the parquet files to GCS and load them into BigQuery."""
+from pathlib import Path
+
+import click
+from google.cloud import bigquery, storage
+
+from dbcp.constants import OUTPUT_DIR
+
+
+def upload_parquet_directory_to_gcs(
+    directory_path, bucket_name, destination_blob_prefix, version
+):
+    """
+    Upload a directory of Parquet files to Google Cloud Storage.
+
+    Args:
+        directory_path (str): Path to the directory containing Parquet files.
+        bucket_name (str): Name of the GCS bucket to upload files to.
+        destination_blob_prefix (str): Prefix to prepend to destination blob names.
+        version (str): Version prefix to prepend to destination blob names.
+
+    Returns:
+        None
+    """
+    # Create a storage client
+    client = storage.Client()
+
+    # Get the GCS bucket
+    bucket = client.get_bucket(bucket_name)
+
+    # List all Parquet files in the directory
+    parquet_files = list(Path(directory_path).glob("*.parquet"))
+
+    # Upload each Parquet file to GCS
+    for file in parquet_files:
+        # Construct the destination blob name
+        destination_blob_name = f"{version}/{destination_blob_prefix}/{file.name}"
+
+        # Create a blob object in the bucket
+        blob = bucket.blob(destination_blob_name)
+
+        # Upload the file to GCS
+        blob.upload_from_filename(str(file))
+
+        print(f"Uploaded {file} to gs://{bucket_name}/{destination_blob_name}")
+
+
+def load_parquet_files_to_bigquery(
+    bucket_name: str, destination_blob_prefix: str, version: str
+):
+    """
+    Load Parquet files from GCS to BigQuery.
+
+    Args:
+        bucket_name (str): Name of the GCS bucket containing the Parquet files.
+        destination_blob_prefix (str): Prefix of the blobs to load; also used to name the BigQuery dataset.
+        version (str): Version prefix of the blobs to load.
+
+    Returns:
+        None
+    """
+    # Create a BigQuery client
+    client = bigquery.Client()
+
+    # Get the BigQuery dataset
+    dataset_id = f"test_{destination_blob_prefix}{'_version-outputs' if version == 'version-outputs' else ''}"
+    dataset_ref = client.dataset(dataset_id)
+
+    # Create the dataset if it doesn't exist
+    dataset = bigquery.Dataset(dataset_ref)
+    dataset = client.create_dataset(dataset, exists_ok=True)
+
+    # Get the GCS bucket
+    bucket = storage.Client().get_bucket(bucket_name)
+
+    # Get all Parquet files in the bucket/{version} directory
+    blobs = bucket.list_blobs(prefix=f"{version}/{destination_blob_prefix}")
+
+    # Load each Parquet file to BigQuery
+    for blob in blobs:
+        if blob.name.endswith(".parquet"):
+            # Get the blob filename without the extension
+            table_name = blob.name.split("/")[-1].split(".")[0]
+
+            # Construct the destination table
+            table_ref = dataset_ref.table(table_name)
+
+            # Delete the table if it exists
+            client.delete_table(table_ref, not_found_ok=True)
+
+            # Load the Parquet file to BigQuery
+            job_config = bigquery.LoadJobConfig(
+                source_format=bigquery.SourceFormat.PARQUET,
+                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
+            )
+            load_job = client.load_table_from_uri(
+                f"gs://{bucket_name}/{blob.name}", table_ref, job_config=job_config
+            )
+
+            print(f"Loading {blob.name} to {dataset_id}.{table_name}")
+            load_job.result()
+
+            # Add a version label to the table ("." is not allowed in labels)
+            labels = {"version": version.replace(".", "-")}
+            table = client.get_table(table_ref)
+            table.labels = labels
+            client.update_table(table, ["labels"])
+
+            print(f"Loaded {blob.name} to {dataset_id}.{table_name}")
+
+
+@click.command()
+@click.option("--build-ref")
+def publish_outputs(build_ref: str):
+    """Publish outputs to Google Cloud Storage and BigQuery."""
+    directories = ("data_warehouse", "data_mart")
+    bucket_name = "dgm-outputs"
+
+    if build_ref == "version-outputs":
+        for directory in directories:
+            upload_parquet_directory_to_gcs(
+                OUTPUT_DIR / directory, bucket_name, directory, build_ref
+            )
+
+        for directory in directories:
+            load_parquet_files_to_bigquery(bucket_name, directory, build_ref)
+    elif build_ref.startswith("v"):
+        for directory in directories:
+            upload_parquet_directory_to_gcs(
+                OUTPUT_DIR / directory, bucket_name, directory, build_ref
+            )
+            upload_parquet_directory_to_gcs(
+                OUTPUT_DIR / directory, bucket_name, directory, "prod"
+            )
+
+        for directory in directories:
+            load_parquet_files_to_bigquery(bucket_name, directory, build_ref)
+    else:
+        raise ValueError("build-ref must be 'version-outputs' or start with 'v'")
+
+
+if __name__ == "__main__":
+    publish_outputs()