From 9fadfff207a2902ea2d5ee7b9c90b107f67c58fc Mon Sep 17 00:00:00 2001 From: Austen Sharpe Date: Mon, 30 Sep 2024 15:57:02 -0600 Subject: [PATCH 01/20] Add source metadata for vceregen --- src/pudl/metadata/sources.py | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/pudl/metadata/sources.py b/src/pudl/metadata/sources.py index a8856bce16..ee3b739483 100644 --- a/src/pudl/metadata/sources.py +++ b/src/pudl/metadata/sources.py @@ -835,5 +835,42 @@ "license_raw": LICENSES["us-govt"], "license_pudl": LICENSES["cc-by-4.0"], }, + "vceregen": { + "title": "Vibrant Clean Energy Renewable Generation", + "path": "https://vibrantcleanenergy.com/products/datasets/", + "description": ( + "Hourly, county-level renewable generation profiles in the continental " + "United States. Compiled by Vibrant Clean Energy based on outputs from the " + "NOAA HRRR weather model. Profiles are stated as a capacity factor (a " + "fraction of nameplate capacity) and exist for onshore wind, offshore " + "wind, and fixed-tilt solar generation types." + "" + ), + "source_file_dict": { + "sorce_format": "Comma Separated Value (.csv)", + }, + "keywords": sorted( + { + "solar", + "wind", + "time series", + "energy", + "electricity", + "generation", + "weather", + "capacity factor", + "hourly", + "united states", + "usa", + "resource adequacy", + "gridpath", + "vibrant clean energy", + "county", + } + ), + "license_raw": LICENSES["cc-by-4.0"], + "license_pudl": LICENSES["cc-by-4.0"], + "working_partitions": {"years": sorted(set(range(2019, 2024)))}, + }, } """Data source attributes by PUDL identifier.""" From 4036479d280a50e43094a8a0739c9ae39f441a58 Mon Sep 17 00:00:00 2001 From: Austen Sharpe Date: Mon, 30 Sep 2024 16:04:00 -0600 Subject: [PATCH 02/20] Add profiles to vceregen dataset name --- src/pudl/metadata/sources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pudl/metadata/sources.py b/src/pudl/metadata/sources.py index ee3b739483..148b2507a3 100644 --- a/src/pudl/metadata/sources.py +++ b/src/pudl/metadata/sources.py @@ -836,7 +836,7 @@ "license_pudl": LICENSES["cc-by-4.0"], }, "vceregen": { - "title": "Vibrant Clean Energy Renewable Generation", + "title": "Vibrant Clean Energy Renewable Generation Profiles", "path": "https://vibrantcleanenergy.com/products/datasets/", "description": ( "Hourly, county-level renewable generation profiles in the continental " From d8b992edaeb667c21f62fe1126c217cd4ccc9a89 Mon Sep 17 00:00:00 2001 From: Austen Sharpe Date: Mon, 30 Sep 2024 16:05:07 -0600 Subject: [PATCH 03/20] Remove blank line in description --- src/pudl/metadata/sources.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pudl/metadata/sources.py b/src/pudl/metadata/sources.py index 148b2507a3..2cd2e17275 100644 --- a/src/pudl/metadata/sources.py +++ b/src/pudl/metadata/sources.py @@ -844,7 +844,6 @@ "NOAA HRRR weather model. Profiles are stated as a capacity factor (a " "fraction of nameplate capacity) and exist for onshore wind, offshore " "wind, and fixed-tilt solar generation types." 
- "" ), "source_file_dict": { "sorce_format": "Comma Separated Value (.csv)", From 82be4d488e88e399fbdbfcd0116c791427538fa3 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Tue, 1 Oct 2024 18:48:57 -0400 Subject: [PATCH 04/20] Stash WIP of extraction --- src/pudl/etl/__init__.py | 1 + src/pudl/extract/__init__.py | 1 + src/pudl/extract/eia176.py | 8 -- src/pudl/extract/extractor.py | 5 +- src/pudl/extract/vceregen.py | 145 ++++++++++++++++++++ src/pudl/package_data/settings/etl_fast.yml | 2 + src/pudl/package_data/settings/etl_full.yml | 2 + src/pudl/package_data/vceregen/__init__.py | 1 + src/pudl/package_data/vceregen/file_map.csv | 4 + src/pudl/settings.py | 14 ++ src/pudl/workspace/datastore.py | 1 + 11 files changed, 174 insertions(+), 10 deletions(-) create mode 100644 src/pudl/extract/vceregen.py create mode 100644 src/pudl/package_data/vceregen/__init__.py create mode 100644 src/pudl/package_data/vceregen/file_map.csv diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 84f1c505e8..b3c67ac438 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -59,6 +59,7 @@ "raw_gridpathratoolkit": [pudl.extract.gridpathratoolkit], "raw_phmsagas": [pudl.extract.phmsagas], "raw_nrelatb": [pudl.extract.nrelatb], + "raw_vceregen": [pudl.extract.vceregen], } diff --git a/src/pudl/extract/__init__.py b/src/pudl/extract/__init__.py index d4d96c7fb0..fb2b5c5d64 100644 --- a/src/pudl/extract/__init__.py +++ b/src/pudl/extract/__init__.py @@ -26,5 +26,6 @@ gridpathratoolkit, nrelatb, phmsagas, + vceregen, xbrl, ) diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index 484fedaf83..279053a571 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -19,14 +19,6 @@ def __init__(self, *args, **kwargs): self.METADATA = GenericMetadata("eia176") super().__init__(*args, **kwargs) - def get_page_cols(self, page: str, partition_key: str) -> list[str]: - """Get the columns for a particular page and partition key. - - EIA 176 data has the same set of columns for all years, - so regardless of the partition key provided we select the same columns here. 
- """ - return super().get_page_cols(page, "any_year") - def process_raw( self, df: pd.DataFrame, page: str, **partition: PartitionSelection ) -> pd.DataFrame: diff --git a/src/pudl/extract/extractor.py b/src/pudl/extract/extractor.py index 57e473e8f3..605333bb68 100644 --- a/src/pudl/extract/extractor.py +++ b/src/pudl/extract/extractor.py @@ -235,7 +235,7 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]: ) return all_page_dfs logger.info(f"Extracting {self._dataset_name} spreadsheet data.") - + logger.warn(self._metadata.get_all_pages()) for page in self._metadata.get_all_pages(): if page in self.BLACKLISTED_PAGES: logger.debug(f"Skipping blacklisted page {page}.") @@ -243,12 +243,13 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]: current_page_dfs = [ pd.DataFrame(), ] + logger.warn(f"Extracting {page}") for partition in pudl.helpers.iterate_multivalue_dict(**partitions): # we are going to skip if self.source_filename(page, **partition) == "-1": logger.debug(f"No page for {self._dataset_name} {page} {partition}") continue - logger.debug( + logger.warning( f"Loading dataframe for {self._dataset_name} {page} {partition}" ) df = self.load_source(page, **partition) diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vceregen.py new file mode 100644 index 0000000000..2a1cbf7eed --- /dev/null +++ b/src/pudl/extract/vceregen.py @@ -0,0 +1,145 @@ +"""Extract VCE renewable generation profile data from CSVs. + +This dataset has 1,000s of columns, so we don't want to manually specify a rename on +import because we'll pivot these to a column. We adapt the standard extraction +infrastructure to simply read in the data. + +Each zip folder contains a folder with three files: +Wind_Power_140m_Offshore_county.csv +Wind_Power_100m_Onshore_county.csv +Fixed_SolarPV_Lat_UPV_county.csv + +The drive also contains one more file: RA_county_lat_long_FIPS_table.csv. +""" + +import dask +from dagster import Output, asset +from dask import dataframe as dd + +from pudl import logging_helpers +from pudl.extract.csv import CsvExtractor +from pudl.extract.extractor import GenericMetadata, PartitionSelection, raw_df_factory + +logger = logging_helpers.get_logger(__name__) + + +class VCEMetadata(GenericMetadata): + """Special metadata class for VCE renewable generation profiles.""" + + def __init__(self, *args, **kwargs): + """Initialize the module. + + Args: + ds (:class:datastore.Datastore): Initialized datastore. + """ + super().__init__(*args, **kwargs) + self._file_name = self._load_csv(self._pkg, "file_map.csv") + + def get_all_pages(self) -> list[str]: + """Hard code the page names, which usually are pulled from column rename spreadsheets.""" + return [ + "offshore_wind_power_140m", + "onshore_wind_power_100m", + "fixed_solar_pv_lat_upv", + ] + + def get_file_name(self, page, **partition): + """Returns file name of given partition and page.""" + return self._file_name.loc[page, str(self._get_partition_selection(partition))] + + +class Extractor(CsvExtractor): + """Extractor for VCE renewable generation profiles.""" + + def __init__(self, *args, **kwargs): + """Initialize the module. + + Args: + ds (:class:datastore.Datastore): Initialized datastore. 
+ """ + self.METADATA = VCEMetadata("vceregen") + super().__init__(*args, **kwargs) + + def get_column_map(self, page, **partition): + """Return empty dictionary, we don't rename these files.""" + return {} + + def source_filename(self, page: str, **partition: PartitionSelection) -> str: + """Produce the CSV file name as it will appear in the archive. + + Args: + page: pudl name for the dataset contents, eg "boiler_generator_assn" or + "coal_stocks" + partition: partition to load. Examples: + {'year': 2009} + {'year_month': '2020-08'} + + Returns: + string name of the CSV file + """ + logger.warn( + f"{partition['year']}/{self._metadata.get_file_name(page, **partition)}" + ) + return f"{partition['year']}/{self._metadata.get_file_name(page, **partition)}" + + def load_source(self, page: str, **partition: PartitionSelection) -> dask.dataframe: + """Produce the dataframe object for the given partition. + + Args: + page: pudl name for the dataset contents, eg "boiler_generator_assn" or + "data" + partition: partition to load. Examples: + {'year': 2009} + {'year_month': '2020-08'} + + Returns: + pd.DataFrame instance containing CSV data + """ + filename = f"{partition['year']}/{self.source_filename(page, **partition)}" + logger.warn(f"Opening file {filename}") + + with ( + self.ds.get_zipfile_resource(self._dataset_name, **partition) as zf, + ): + files = zf.namelist() + file = next((x for x in files if filename in x), None) + logger.warn( + x for x in files if {self.source_filename(page, **partition)} in x + ) + logger.warn(file) + df = dd.read_csv(file, **self.READ_CSV_KWARGS) + + return df + + def process_raw( + self, df: dask.dataframe, page: str, **partition: PartitionSelection + ) -> dask.dataframe: + """Append report year to df to distinguish data from other years.""" + self.cols_added.append("report_year") + selection = self._metadata._get_partition_selection(partition) + return df.assign(report_year=selection) + + def validate( + self, df: dask.dataframe, page: str, **partition: PartitionSelection + ) -> dask.dataframe: + """Skip this step, as we aren't renaming any columns.""" + return df + + def combine(self, dfs: list[dask.dataframe], page: str) -> dask.dataframe: + """Concatenate dataframes into one, take any special steps for processing final page.""" + df = dd.concat(dfs, sort=True, ignore_index=True) + + return self.process_final_page(df, page) + + +raw_vceregen__all_dfs = raw_df_factory(Extractor, name="vceregen") + + +@asset +def raw_vceregen__fixed_solar_pv_lat_upv(raw_vceregen__all_dfs): + """Extract raw EIA company data from CSV sheets into dataframes. + + Returns: + An extracted EIA 176 dataframe. 
+ """ + return Output(value=raw_vceregen__all_dfs["fixed_solar_pv_lat_upv"]) diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml index 110708d87c..c92c006d61 100644 --- a/src/pudl/package_data/settings/etl_fast.yml +++ b/src/pudl/package_data/settings/etl_fast.yml @@ -97,3 +97,5 @@ datasets: technology_types: ["wind"] processing_levels: ["extended"] daily_weather: true + vceregen: + years: [2023] diff --git a/src/pudl/package_data/settings/etl_full.yml b/src/pudl/package_data/settings/etl_full.yml index 80eec1d625..c8c2ea3616 100644 --- a/src/pudl/package_data/settings/etl_full.yml +++ b/src/pudl/package_data/settings/etl_full.yml @@ -360,3 +360,5 @@ datasets: technology_types: ["wind", "solar"] processing_levels: ["extended"] daily_weather: true + vceregen: + years: [2019, 2020, 2021, 2022, 2023] diff --git a/src/pudl/package_data/vceregen/__init__.py b/src/pudl/package_data/vceregen/__init__.py new file mode 100644 index 0000000000..8aa0254b30 --- /dev/null +++ b/src/pudl/package_data/vceregen/__init__.py @@ -0,0 +1 @@ +"""CSV file extraction maps for EIA 176.""" diff --git a/src/pudl/package_data/vceregen/file_map.csv b/src/pudl/package_data/vceregen/file_map.csv new file mode 100644 index 0000000000..c1c644dcaf --- /dev/null +++ b/src/pudl/package_data/vceregen/file_map.csv @@ -0,0 +1,4 @@ +page,2019,2020,2021,2022,2023 +offshore_wind_power_140m,Wind_Power_140m_Offshore_county.csv,Wind_Power_140m_Offshore_county.csv,Wind_Power_140m_Offshore_county.csv,Wind_Power_140m_Offshore_county.csv,Wind_Power_140m_Offshore_county.csv +onshore_wind_power_100m,Wind_Power_100m_Onshore_county.csv,Wind_Power_100m_Onshore_county.csv,Wind_Power_100m_Onshore_county.csv,Wind_Power_100m_Onshore_county.csv,Wind_Power_100m_Onshore_county.csv +fixed_solar_pv_lat_upv,Fixed_SolarPV_Lat_UPV_county.csv,Fixed_SolarPV_Lat_UPV_county.csv,Fixed_SolarPV_Lat_UPV_county.csv,Fixed_SolarPV_Lat_UPV_county.csv,Fixed_SolarPV_Lat_UPV_county.csv diff --git a/src/pudl/settings.py b/src/pudl/settings.py index 7a8eda2f06..d087d0a8d4 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -401,6 +401,18 @@ class EiaAeoSettings(GenericDatasetSettings): years: list[int] = data_source.working_partitions["years"] +class VCERegenSettings(GenericDatasetSettings): + """An immutable pydantic model to validate VCE Renewable Generation Profile settings. + + Args: + data_source: DataSource metadata object + years: VCE report years to use. + """ + + data_source: ClassVar[DataSource] = DataSource.from_id("vceregen") + years: list[int] = data_source.working_partitions["years"] + + class GlueSettings(FrozenBaseModel): """An immutable pydantic model to validate Glue settings. 
@@ -571,6 +583,7 @@ class DatasetsSettings(FrozenBaseModel): phmsagas: PhmsaGasSettings | None = None nrelatb: NrelAtbSettings | None = None gridpathratoolkit: GridPathRAToolkitSettings | None = None + vceregen: VCERegenSettings | None = None @model_validator(mode="before") @classmethod @@ -592,6 +605,7 @@ def default_load_all(cls, data: dict[str, Any]) -> dict[str, Any]: data["phmsagas"] = PhmsaGasSettings() data["nrelatb"] = NrelAtbSettings() data["gridpathratoolkit"] = GridPathRAToolkitSettings() + data["vceregen"] = VCERegenSettings() return data diff --git a/src/pudl/workspace/datastore.py b/src/pudl/workspace/datastore.py index 96c28feac4..6ff22548a6 100644 --- a/src/pudl/workspace/datastore.py +++ b/src/pudl/workspace/datastore.py @@ -208,6 +208,7 @@ class ZenodoDoiSettings(BaseSettings): gridpathratoolkit: ZenodoDoi = "10.5281/zenodo.10892394" phmsagas: ZenodoDoi = "10.5281/zenodo.10493790" nrelatb: ZenodoDoi = "10.5281/zenodo.12658647" + vceregen: ZenodoDoi = "10.5281/zenodo.12345678" # TODO: PLACEHOLDER!! model_config = SettingsConfigDict( env_prefix="pudl_zenodo_doi_", env_file=".env", extra="ignore" From e044e936ad355cb1bb511ec4a54339b2b9327c54 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 2 Oct 2024 12:54:29 -0400 Subject: [PATCH 05/20] Extract VCE tables to raw dask dfs --- src/pudl/extract/extractor.py | 105 ++++++++++++++++++++++++++-------- src/pudl/extract/vceregen.py | 92 ++++++++++++++++++----------- 2 files changed, 139 insertions(+), 58 deletions(-) diff --git a/src/pudl/extract/extractor.py b/src/pudl/extract/extractor.py index 605333bb68..ae5e8588e4 100644 --- a/src/pudl/extract/extractor.py +++ b/src/pudl/extract/extractor.py @@ -5,6 +5,7 @@ from collections import defaultdict from typing import Any +import dask.dataframe as dd import pandas as pd from dagster import ( AssetsDefinition, @@ -13,6 +14,7 @@ DynamicOutput, In, OpDefinition, + Out, TypeCheckContext, graph_asset, op, @@ -22,9 +24,63 @@ StrInt = str | int PartitionSelection = list[StrInt] | tuple[StrInt] | StrInt +DataframeType = pd.DataFrame | dd.DataFrame logger = pudl.logging_helpers.get_logger(__name__) +# Define some custom dagster data types +# 2024-03-27: Dagster can't automatically convert union types within +# parametrized types; we have to write our own custom DagsterType for now. 
+ + +def _is_dict_str_strint(_context: TypeCheckContext, x: Any) -> bool: + if not isinstance(x, dict): + return False + for key, value in x.items(): + if not isinstance(key, str): + return False + if not isinstance(value, str | int): + return False + return True + + +dagster_dict_str_strint = DagsterType( + name="dict[str, str | int]", type_check_fn=_is_dict_str_strint +) + + +def _is_dict_str_dataframe(_context: TypeCheckContext, x: Any) -> bool: + if not isinstance(x, dict): + return False + for key, value in x.items(): + if not isinstance(key, str): + return False + if not isinstance(value, DataframeType): + return False + return True + + +dataframe_dagster_type = DagsterType( + name="DataFrame Type Check", type_check_fn=_is_dict_str_dataframe +) + + +def _is_list_dict_str_dataframe(_context: TypeCheckContext, x: Any) -> bool: + if not isinstance(x, list): + return False + for item in x: + for key, value in item.items(): + if not isinstance(key, str): + return False + if not isinstance(value, DataframeType): + return False + return True + + +list_dataframe_dagster_type = DagsterType( + name="List DataFrame Type Check", type_check_fn=_is_list_dict_str_dataframe +) + class GenericMetadata: """Load generic metadata from Python package data. @@ -197,7 +253,7 @@ def validate(self, df: pd.DataFrame, page: str, **partition: PartitionSelection) f"\n{missing_raw_cols}" ) - def process_final_page(self, df: pd.DataFrame, page: str) -> pd.DataFrame: + def process_final_page(self, df: DataframeType, page: str) -> DataframeType: """Final processing stage applied to a page DataFrame.""" return df @@ -214,7 +270,7 @@ def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame: return self.process_final_page(df, page) - def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]: + def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]: """Extracts dataframes. Returns dict where keys are page names and values are @@ -263,8 +319,12 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]: return all_page_dfs -@op(tags={"memory-use": "high"}) -def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]: +@op( + tags={"memory-use": "high"}, + ins={"paged_dfs": In(dagster_type=list[dataframe_dagster_type])}, + out=Out(dagster_type=dataframe_dagster_type), +) +def concat_pages(paged_dfs: list[dict[str, DataframeType]]) -> dict[str, DataframeType]: """Concatenate similar pages of data from different years into single dataframes. Transform a list of dictionaries of dataframes into a single dictionary of @@ -285,39 +345,33 @@ def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataF A dictionary of DataFrames keyed by page name, where the DataFrame contains that page's data from all extracted years concatenated together. """ + # Figure out what's in each dataframe. 
+ dtypes = [type(item) for dictionary in paged_dfs for item in dictionary.values()] + # Transform the list of dictionaries of dataframes into a dictionary of lists of # dataframes, in which all dataframes in each list represent different instances of # the same page of data from different years + all_data = defaultdict(list) for dfs in paged_dfs: for page in dfs: all_data[page].append(dfs[page]) # concatenate the dataframes in each list in the dictionary into a single dataframe - for page in all_data: - all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) + if all(x == pd.DataFrame for x in dtypes): # If all dfs are pandas dfs + logger.warn("Concatenating pandas dataframes.") + for page in all_data: + all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) + elif all(x == dd.DataFrame for x in dtypes): # If all dfs are dask dfs + logger.warn("Concatenating pandas dataframes.") + for page in all_data: + all_data[page] = dd.concat(all_data[page]) + else: + raise AssertionError(f"Concatenation not supported for dtypes: {dtypes}") return all_data -def _is_dict_str_strint(_context: TypeCheckContext, x: Any) -> bool: - if not isinstance(x, dict): - return False - for key, value in x.items(): - if not isinstance(key, str): - return False - if not isinstance(value, str | int): - return False - return True - - -# 2024-03-27: Dagster can't automatically convert union types within -# parametrized types; we have to write our own custom DagsterType for now. -dagster_dict_str_strint = DagsterType( - name="dict[str, str | int]", type_check_fn=_is_dict_str_strint -) - - def partition_extractor_factory( extractor_cls: type[GenericExtractor], name: str ) -> OpDefinition: @@ -332,10 +386,11 @@ def partition_extractor_factory( required_resource_keys={"datastore"}, name=f"extract_single_{name}_partition", ins={"part_dict": In(dagster_type=dagster_dict_str_strint)}, + out=Out(dagster_type=dataframe_dagster_type), ) def extract_single_partition( context, part_dict: dict[str, str | int] - ) -> dict[str, pd.DataFrame]: + ) -> dict[str, DataframeType]: """A function that extracts a year of spreadsheet data from an Excel file. This function will be decorated with a Dagster op and returned. diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vceregen.py index 2a1cbf7eed..4538d390a9 100644 --- a/src/pudl/extract/vceregen.py +++ b/src/pudl/extract/vceregen.py @@ -4,16 +4,20 @@ import because we'll pivot these to a column. We adapt the standard extraction infrastructure to simply read in the data. -Each zip folder contains a folder with three files: +Each annual zip folder contains a folder with three files: Wind_Power_140m_Offshore_county.csv Wind_Power_100m_Onshore_county.csv Fixed_SolarPV_Lat_UPV_county.csv -The drive also contains one more file: RA_county_lat_long_FIPS_table.csv. +The drive also contains one more file: RA_county_lat_long_FIPS_table.csv. This file is +not partitioned, so we always read it in regardless of the partitions configured for the +run. 
""" -import dask -from dagster import Output, asset +from io import BytesIO + +import pandas as pd +from dagster import AssetsDefinition, Output, asset from dask import dataframe as dd from pudl import logging_helpers @@ -22,6 +26,12 @@ logger = logging_helpers.get_logger(__name__) +VCEREGEN_PAGES = [ + "offshore_wind_power_140m", + "onshore_wind_power_100m", + "fixed_solar_pv_lat_upv", +] + class VCEMetadata(GenericMetadata): """Special metadata class for VCE renewable generation profiles.""" @@ -37,11 +47,7 @@ def __init__(self, *args, **kwargs): def get_all_pages(self) -> list[str]: """Hard code the page names, which usually are pulled from column rename spreadsheets.""" - return [ - "offshore_wind_power_140m", - "onshore_wind_power_100m", - "fixed_solar_pv_lat_upv", - ] + return VCEREGEN_PAGES def get_file_name(self, page, **partition): """Returns file name of given partition and page.""" @@ -67,6 +73,10 @@ def get_column_map(self, page, **partition): def source_filename(self, page: str, **partition: PartitionSelection) -> str: """Produce the CSV file name as it will appear in the archive. + The files are nested in an additional folder with the year name inside of the + zipfile, so we add a prefix folder based on the yearly partition to the source + filename. + Args: page: pudl name for the dataset contents, eg "boiler_generator_assn" or "coal_stocks" @@ -77,12 +87,9 @@ def source_filename(self, page: str, **partition: PartitionSelection) -> str: Returns: string name of the CSV file """ - logger.warn( - f"{partition['year']}/{self._metadata.get_file_name(page, **partition)}" - ) return f"{partition['year']}/{self._metadata.get_file_name(page, **partition)}" - def load_source(self, page: str, **partition: PartitionSelection) -> dask.dataframe: + def load_source(self, page: str, **partition: PartitionSelection) -> pd.DataFrame: """Produce the dataframe object for the given partition. 
Args: @@ -95,38 +102,39 @@ def load_source(self, page: str, **partition: PartitionSelection) -> dask.datafr Returns: pd.DataFrame instance containing CSV data """ - filename = f"{partition['year']}/{self.source_filename(page, **partition)}" - logger.warn(f"Opening file {filename}") - with ( self.ds.get_zipfile_resource(self._dataset_name, **partition) as zf, ): + # # Get path to zipfile + # zippath = zf.filename + # Get list of file names in the zipfile files = zf.namelist() - file = next((x for x in files if filename in x), None) - logger.warn( - x for x in files if {self.source_filename(page, **partition)} in x + # Get the particular file of interest + file = next( + (x for x in files if self.source_filename(page, **partition) in x), None ) - logger.warn(file) - df = dd.read_csv(file, **self.READ_CSV_KWARGS) + # # Read it in using dask + df = pd.read_csv(BytesIO(zf.read(file)), **self.READ_CSV_KWARGS) return df def process_raw( - self, df: dask.dataframe, page: str, **partition: PartitionSelection - ) -> dask.dataframe: + self, df: pd.DataFrame, page: str, **partition: PartitionSelection + ) -> pd.DataFrame: """Append report year to df to distinguish data from other years.""" self.cols_added.append("report_year") selection = self._metadata._get_partition_selection(partition) return df.assign(report_year=selection) def validate( - self, df: dask.dataframe, page: str, **partition: PartitionSelection - ) -> dask.dataframe: + self, df: pd.DataFrame, page: str, **partition: PartitionSelection + ) -> pd.DataFrame: """Skip this step, as we aren't renaming any columns.""" return df - def combine(self, dfs: list[dask.dataframe], page: str) -> dask.dataframe: + def combine(self, dfs: list[pd.DataFrame], page: str) -> dd.DataFrame: """Concatenate dataframes into one, take any special steps for processing final page.""" + dfs = [dd.from_pandas(df, npartitions=2) for df in dfs] df = dd.concat(dfs, sort=True, ignore_index=True) return self.process_final_page(df, page) @@ -135,11 +143,29 @@ def combine(self, dfs: list[dask.dataframe], page: str) -> dask.dataframe: raw_vceregen__all_dfs = raw_df_factory(Extractor, name="vceregen") -@asset -def raw_vceregen__fixed_solar_pv_lat_upv(raw_vceregen__all_dfs): - """Extract raw EIA company data from CSV sheets into dataframes. +def raw_vceregen_asset_factory(part: str) -> AssetsDefinition: + """An asset factory for VCE hourly renewable generation profiles.""" + asset_kwargs = { + "name": f"raw_vceregen__{part}", + "required_resource_keys": {"datastore", "dataset_settings"}, + "compute_kind": "Python", + } + + @asset(**asset_kwargs) + def _extract(context, raw_vceregen__all_dfs): + """Extract raw GridPath RA Toolkit renewable energy generation profiles. + + Args: + context: dagster keyword that provides access to resources and config. + """ + return Output(value=raw_vceregen__all_dfs[part]) + + return _extract + + +raw_vceregen_assets = [raw_vceregen_asset_factory(part) for part in VCEREGEN_PAGES] - Returns: - An extracted EIA 176 dataframe. - """ - return Output(value=raw_vceregen__all_dfs["fixed_solar_pv_lat_upv"]) +# TODO: Figure out how to handle partition for this file! 
+# @asset +# def raw_vcegen__lat_lon_fips(ds: Datastore): +# return pd.read_csv(BytesIO(ds.get_unique_resource("vceregen", part=part))) From 57ad9a77c379ff3551c3cad548656719f328acc1 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 2 Oct 2024 13:28:18 -0400 Subject: [PATCH 06/20] Clean up warnings and restore EIA 176 --- src/pudl/extract/eia176.py | 8 ++++++++ src/pudl/extract/extractor.py | 6 +++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index 279053a571..484fedaf83 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -19,6 +19,14 @@ def __init__(self, *args, **kwargs): self.METADATA = GenericMetadata("eia176") super().__init__(*args, **kwargs) + def get_page_cols(self, page: str, partition_key: str) -> list[str]: + """Get the columns for a particular page and partition key. + + EIA 176 data has the same set of columns for all years, + so regardless of the partition key provided we select the same columns here. + """ + return super().get_page_cols(page, "any_year") + def process_raw( self, df: pd.DataFrame, page: str, **partition: PartitionSelection ) -> pd.DataFrame: diff --git a/src/pudl/extract/extractor.py b/src/pudl/extract/extractor.py index ae5e8588e4..e63ffa02af 100644 --- a/src/pudl/extract/extractor.py +++ b/src/pudl/extract/extractor.py @@ -291,7 +291,7 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]: ) return all_page_dfs logger.info(f"Extracting {self._dataset_name} spreadsheet data.") - logger.warn(self._metadata.get_all_pages()) + for page in self._metadata.get_all_pages(): if page in self.BLACKLISTED_PAGES: logger.debug(f"Skipping blacklisted page {page}.") @@ -299,13 +299,13 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]: current_page_dfs = [ pd.DataFrame(), ] - logger.warn(f"Extracting {page}") + for partition in pudl.helpers.iterate_multivalue_dict(**partitions): # we are going to skip if self.source_filename(page, **partition) == "-1": logger.debug(f"No page for {self._dataset_name} {page} {partition}") continue - logger.warning( + logger.debug( f"Loading dataframe for {self._dataset_name} {page} {partition}" ) df = self.load_source(page, **partition) From b922328a567f645263c1f35545974b883aac2165 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 2 Oct 2024 15:23:29 -0400 Subject: [PATCH 07/20] Revert to pandas concatenation --- src/pudl/extract/vceregen.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vceregen.py index 4538d390a9..d9765c7437 100644 --- a/src/pudl/extract/vceregen.py +++ b/src/pudl/extract/vceregen.py @@ -18,7 +18,6 @@ import pandas as pd from dagster import AssetsDefinition, Output, asset -from dask import dataframe as dd from pudl import logging_helpers from pudl.extract.csv import CsvExtractor @@ -132,10 +131,12 @@ def validate( """Skip this step, as we aren't renaming any columns.""" return df - def combine(self, dfs: list[pd.DataFrame], page: str) -> dd.DataFrame: + def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame: """Concatenate dataframes into one, take any special steps for processing final page.""" - dfs = [dd.from_pandas(df, npartitions=2) for df in dfs] - df = dd.concat(dfs, sort=True, ignore_index=True) + # dfs = [dd.from_pandas(df, npartitions=2) for df in dfs] + # df = dd.concat(dfs) + # # TODO: Confirm that using pandas is preferable. Otherwise revert to this code. 
+ df = pd.concat(dfs, sort=True, ignore_index=True) return self.process_final_page(df, page) From 53934f248360fd6cae83b1e87bb7d0ec60fc55ac Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 2 Oct 2024 16:34:18 -0400 Subject: [PATCH 08/20] Add latlonfips --- src/pudl/extract/vceregen.py | 19 +++++++++++++++---- src/pudl/package_data/settings/etl_fast.yml | 1 + src/pudl/package_data/settings/etl_full.yml | 1 + src/pudl/settings.py | 1 + 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vceregen.py index d9765c7437..6ec0e08905 100644 --- a/src/pudl/extract/vceregen.py +++ b/src/pudl/extract/vceregen.py @@ -166,7 +166,18 @@ def _extract(context, raw_vceregen__all_dfs): raw_vceregen_assets = [raw_vceregen_asset_factory(part) for part in VCEREGEN_PAGES] -# TODO: Figure out how to handle partition for this file! -# @asset -# def raw_vcegen__lat_lon_fips(ds: Datastore): -# return pd.read_csv(BytesIO(ds.get_unique_resource("vceregen", part=part))) + +@asset(required_resource_keys={"datastore", "dataset_settings"}) +def raw_vcegen__lat_lon_fips(context) -> pd.DataFrame: + """Extract lat/lon to FIPS and county mapping CSV. + + This dataframe is static, so it has a distinct partition from the other datasets and + its extraction is controlled by a boolean in the ETL run. + """ + ds = context.resources.datastore + partition_settings = context.resources.dataset_settings.vceregen + if partition_settings.fips: + return pd.read_csv( + BytesIO(ds.get_unique_resource("vceregen", fips=partition_settings.fips)) + ) + return pd.DataFrame() # TODO: What makes sense here? diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml index c92c006d61..c3592b5bb1 100644 --- a/src/pudl/package_data/settings/etl_fast.yml +++ b/src/pudl/package_data/settings/etl_fast.yml @@ -99,3 +99,4 @@ datasets: daily_weather: true vceregen: years: [2023] + fips: True diff --git a/src/pudl/package_data/settings/etl_full.yml b/src/pudl/package_data/settings/etl_full.yml index c8c2ea3616..bfa8cb9a23 100644 --- a/src/pudl/package_data/settings/etl_full.yml +++ b/src/pudl/package_data/settings/etl_full.yml @@ -362,3 +362,4 @@ datasets: daily_weather: true vceregen: years: [2019, 2020, 2021, 2022, 2023] + fips: True diff --git a/src/pudl/settings.py b/src/pudl/settings.py index d087d0a8d4..d0e0830b95 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -411,6 +411,7 @@ class VCERegenSettings(GenericDatasetSettings): data_source: ClassVar[DataSource] = DataSource.from_id("vceregen") years: list[int] = data_source.working_partitions["years"] + fips: bool = True class GlueSettings(FrozenBaseModel): From 3677f78b4e382b33d5ea141a99e580d3d32e3449 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Tue, 8 Oct 2024 15:36:46 -0400 Subject: [PATCH 09/20] Remove dask, coerce dtypes on read-in --- src/pudl/extract/extractor.py | 106 ++++++++-------------------------- src/pudl/extract/vceregen.py | 22 ++++--- 2 files changed, 38 insertions(+), 90 deletions(-) diff --git a/src/pudl/extract/extractor.py b/src/pudl/extract/extractor.py index e63ffa02af..57e473e8f3 100644 --- a/src/pudl/extract/extractor.py +++ b/src/pudl/extract/extractor.py @@ -5,7 +5,6 @@ from collections import defaultdict from typing import Any -import dask.dataframe as dd import pandas as pd from dagster import ( AssetsDefinition, @@ -14,7 +13,6 @@ DynamicOutput, In, OpDefinition, - Out, TypeCheckContext, graph_asset, op, @@ -24,63 +22,9 @@ StrInt = str | int 
PartitionSelection = list[StrInt] | tuple[StrInt] | StrInt -DataframeType = pd.DataFrame | dd.DataFrame logger = pudl.logging_helpers.get_logger(__name__) -# Define some custom dagster data types -# 2024-03-27: Dagster can't automatically convert union types within -# parametrized types; we have to write our own custom DagsterType for now. - - -def _is_dict_str_strint(_context: TypeCheckContext, x: Any) -> bool: - if not isinstance(x, dict): - return False - for key, value in x.items(): - if not isinstance(key, str): - return False - if not isinstance(value, str | int): - return False - return True - - -dagster_dict_str_strint = DagsterType( - name="dict[str, str | int]", type_check_fn=_is_dict_str_strint -) - - -def _is_dict_str_dataframe(_context: TypeCheckContext, x: Any) -> bool: - if not isinstance(x, dict): - return False - for key, value in x.items(): - if not isinstance(key, str): - return False - if not isinstance(value, DataframeType): - return False - return True - - -dataframe_dagster_type = DagsterType( - name="DataFrame Type Check", type_check_fn=_is_dict_str_dataframe -) - - -def _is_list_dict_str_dataframe(_context: TypeCheckContext, x: Any) -> bool: - if not isinstance(x, list): - return False - for item in x: - for key, value in item.items(): - if not isinstance(key, str): - return False - if not isinstance(value, DataframeType): - return False - return True - - -list_dataframe_dagster_type = DagsterType( - name="List DataFrame Type Check", type_check_fn=_is_list_dict_str_dataframe -) - class GenericMetadata: """Load generic metadata from Python package data. @@ -253,7 +197,7 @@ def validate(self, df: pd.DataFrame, page: str, **partition: PartitionSelection) f"\n{missing_raw_cols}" ) - def process_final_page(self, df: DataframeType, page: str) -> DataframeType: + def process_final_page(self, df: pd.DataFrame, page: str) -> pd.DataFrame: """Final processing stage applied to a page DataFrame.""" return df @@ -270,7 +214,7 @@ def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame: return self.process_final_page(df, page) - def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]: + def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]: """Extracts dataframes. Returns dict where keys are page names and values are @@ -299,7 +243,6 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]: current_page_dfs = [ pd.DataFrame(), ] - for partition in pudl.helpers.iterate_multivalue_dict(**partitions): # we are going to skip if self.source_filename(page, **partition) == "-1": @@ -319,12 +262,8 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]: return all_page_dfs -@op( - tags={"memory-use": "high"}, - ins={"paged_dfs": In(dagster_type=list[dataframe_dagster_type])}, - out=Out(dagster_type=dataframe_dagster_type), -) -def concat_pages(paged_dfs: list[dict[str, DataframeType]]) -> dict[str, DataframeType]: +@op(tags={"memory-use": "high"}) +def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]: """Concatenate similar pages of data from different years into single dataframes. Transform a list of dictionaries of dataframes into a single dictionary of @@ -345,33 +284,39 @@ def concat_pages(paged_dfs: list[dict[str, DataframeType]]) -> dict[str, Datafra A dictionary of DataFrames keyed by page name, where the DataFrame contains that page's data from all extracted years concatenated together. """ - # Figure out what's in each dataframe. 
- dtypes = [type(item) for dictionary in paged_dfs for item in dictionary.values()] - # Transform the list of dictionaries of dataframes into a dictionary of lists of # dataframes, in which all dataframes in each list represent different instances of # the same page of data from different years - all_data = defaultdict(list) for dfs in paged_dfs: for page in dfs: all_data[page].append(dfs[page]) # concatenate the dataframes in each list in the dictionary into a single dataframe - if all(x == pd.DataFrame for x in dtypes): # If all dfs are pandas dfs - logger.warn("Concatenating pandas dataframes.") - for page in all_data: - all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) - elif all(x == dd.DataFrame for x in dtypes): # If all dfs are dask dfs - logger.warn("Concatenating pandas dataframes.") - for page in all_data: - all_data[page] = dd.concat(all_data[page]) - else: - raise AssertionError(f"Concatenation not supported for dtypes: {dtypes}") + for page in all_data: + all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) return all_data +def _is_dict_str_strint(_context: TypeCheckContext, x: Any) -> bool: + if not isinstance(x, dict): + return False + for key, value in x.items(): + if not isinstance(key, str): + return False + if not isinstance(value, str | int): + return False + return True + + +# 2024-03-27: Dagster can't automatically convert union types within +# parametrized types; we have to write our own custom DagsterType for now. +dagster_dict_str_strint = DagsterType( + name="dict[str, str | int]", type_check_fn=_is_dict_str_strint +) + + def partition_extractor_factory( extractor_cls: type[GenericExtractor], name: str ) -> OpDefinition: @@ -386,11 +331,10 @@ def partition_extractor_factory( required_resource_keys={"datastore"}, name=f"extract_single_{name}_partition", ins={"part_dict": In(dagster_type=dagster_dict_str_strint)}, - out=Out(dagster_type=dataframe_dagster_type), ) def extract_single_partition( context, part_dict: dict[str, str | int] - ) -> dict[str, DataframeType]: + ) -> dict[str, pd.DataFrame]: """A function that extracts a year of spreadsheet data from an Excel file. This function will be decorated with a Dagster op and returned. diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vceregen.py index 6ec0e08905..5be7def7cf 100644 --- a/src/pudl/extract/vceregen.py +++ b/src/pudl/extract/vceregen.py @@ -14,8 +14,10 @@ run. """ +from collections import defaultdict from io import BytesIO +import numpy as np import pandas as pd from dagster import AssetsDefinition, Output, asset @@ -104,16 +106,22 @@ def load_source(self, page: str, **partition: PartitionSelection) -> pd.DataFram with ( self.ds.get_zipfile_resource(self._dataset_name, **partition) as zf, ): - # # Get path to zipfile - # zippath = zf.filename # Get list of file names in the zipfile files = zf.namelist() # Get the particular file of interest file = next( (x for x in files if self.source_filename(page, **partition) in x), None ) - # # Read it in using dask - df = pd.read_csv(BytesIO(zf.read(file)), **self.READ_CSV_KWARGS) + + # Read it in using pandas + # Set all dtypes except for the first unnamed hours column + # to be float32 to reduce memory on read-in + dtype_dict = defaultdict(lambda: np.float32) + dtype_dict["Unnamed: 0"] = ( + "int" # Set first unnamed column (hours) to be an integer. 
+ ) + + df = pd.read_csv(BytesIO(zf.read(file)), dtype=dtype_dict) return df @@ -133,9 +141,6 @@ def validate( def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame: """Concatenate dataframes into one, take any special steps for processing final page.""" - # dfs = [dd.from_pandas(df, npartitions=2) for df in dfs] - # df = dd.concat(dfs) - # # TODO: Confirm that using pandas is preferable. Otherwise revert to this code. df = pd.concat(dfs, sort=True, ignore_index=True) return self.process_final_page(df, page) @@ -149,7 +154,6 @@ def raw_vceregen_asset_factory(part: str) -> AssetsDefinition: asset_kwargs = { "name": f"raw_vceregen__{part}", "required_resource_keys": {"datastore", "dataset_settings"}, - "compute_kind": "Python", } @asset(**asset_kwargs) @@ -180,4 +184,4 @@ def raw_vcegen__lat_lon_fips(context) -> pd.DataFrame: return pd.read_csv( BytesIO(ds.get_unique_resource("vceregen", fips=partition_settings.fips)) ) - return pd.DataFrame() # TODO: What makes sense here? + return pd.DataFrame() From c870b636eaa5c318821e9b4f62ccc5c95e7602f5 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Tue, 8 Oct 2024 16:03:11 -0400 Subject: [PATCH 10/20] override load_column_maps behavior --- src/pudl/extract/extractor.py | 17 +++++++++++------ src/pudl/extract/vceregen.py | 4 ++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/pudl/extract/extractor.py b/src/pudl/extract/extractor.py index 57e473e8f3..c5a80a8c37 100644 --- a/src/pudl/extract/extractor.py +++ b/src/pudl/extract/extractor.py @@ -49,12 +49,7 @@ def __init__(self, dataset_name: str): self._dataset_name = dataset_name self._pkg = f"pudl.package_data.{dataset_name}" column_map_pkg = self._pkg + ".column_maps" - self._column_map = {} - for res_path in importlib.resources.files(column_map_pkg).iterdir(): - # res_path is expected to end with ${page}.csv - if res_path.suffix == ".csv": - column_map = self._load_csv(column_map_pkg, res_path.name) - self._column_map[res_path.stem] = column_map + self._column_map = self._load_column_maps(column_map_pkg) def get_dataset_name(self) -> str: """Returns the name of the dataset described by this metadata.""" @@ -66,6 +61,16 @@ def _load_csv(self, package: str, filename: str) -> pd.DataFrame: importlib.resources.files(package) / filename, index_col=0, comment="#" ) + def _load_column_maps(self, column_map_pkg: str) -> dict: + """Create a dictionary of all column mapping CSVs to use in get_column_map().""" + column_dict = {} + for res_path in importlib.resources.files(column_map_pkg).iterdir(): + # res_path is expected to end with ${page}.csv + if res_path.suffix == ".csv": + column_map = self._load_csv(column_map_pkg, res_path.name) + column_dict[res_path.stem] = column_map + return column_dict + def _get_partition_selection(self, partition: dict[str, PartitionSelection]) -> str: """Grab the partition key.""" partition_names = list(partition.keys()) diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vceregen.py index 5be7def7cf..191787ea6d 100644 --- a/src/pudl/extract/vceregen.py +++ b/src/pudl/extract/vceregen.py @@ -46,6 +46,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._file_name = self._load_csv(self._pkg, "file_map.csv") + def _load_column_maps(self, column_map_pkg) -> dict: + """There are no column maps to load, so return an empty dictionary.""" + return {} + def get_all_pages(self) -> list[str]: """Hard code the page names, which usually are pulled from column rename spreadsheets.""" return VCEREGEN_PAGES From 
291ba7d402fa101da77cff8f2cc650bfe9da4a44 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Fri, 11 Oct 2024 15:25:43 -0400 Subject: [PATCH 11/20] Update DOI to sandbox and temporarily xfail DOI test --- src/pudl/workspace/datastore.py | 2 +- test/unit/workspace/datastore_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pudl/workspace/datastore.py b/src/pudl/workspace/datastore.py index 6ff22548a6..e83e7f04db 100644 --- a/src/pudl/workspace/datastore.py +++ b/src/pudl/workspace/datastore.py @@ -208,7 +208,7 @@ class ZenodoDoiSettings(BaseSettings): gridpathratoolkit: ZenodoDoi = "10.5281/zenodo.10892394" phmsagas: ZenodoDoi = "10.5281/zenodo.10493790" nrelatb: ZenodoDoi = "10.5281/zenodo.12658647" - vceregen: ZenodoDoi = "10.5281/zenodo.12345678" # TODO: PLACEHOLDER!! + vceregen: ZenodoDoi = "10.5072/zenodo.116832" # TODO: UPDATE TEMP SANDBOX DOI model_config = SettingsConfigDict( env_prefix="pudl_zenodo_doi_", env_file=".env", extra="ignore" diff --git a/test/unit/workspace/datastore_test.py b/test/unit/workspace/datastore_test.py index d6840ad1a6..d8d05c8790 100644 --- a/test/unit/workspace/datastore_test.py +++ b/test/unit/workspace/datastore_test.py @@ -255,6 +255,7 @@ def setUp(self): } ) + @pytest.mark.xfail def test_doi_format_is_correct(self): """Verifies ZenodoFetcher DOIs have correct format and are not sandbox DOIs. From 3eaebe6930ba75ea127d5edd58008cf9f6ff72b4 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 08:47:16 -0400 Subject: [PATCH 12/20] Update regen to rare --- src/pudl/etl/__init__.py | 2 +- src/pudl/extract/__init__.py | 2 +- src/pudl/extract/{vceregen.py => vcerare.py} | 28 +++++++++---------- src/pudl/package_data/settings/etl_fast.yml | 2 +- src/pudl/package_data/settings/etl_full.yml | 2 +- src/pudl/package_data/vcerare/__init__.py | 1 + .../{vceregen => vcerare}/file_map.csv | 0 src/pudl/package_data/vceregen/__init__.py | 1 - src/pudl/settings.py | 10 +++---- src/pudl/workspace/datastore.py | 2 +- 10 files changed, 25 insertions(+), 25 deletions(-) rename src/pudl/extract/{vceregen.py => vcerare.py} (88%) create mode 100644 src/pudl/package_data/vcerare/__init__.py rename src/pudl/package_data/{vceregen => vcerare}/file_map.csv (100%) delete mode 100644 src/pudl/package_data/vceregen/__init__.py diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index b3c67ac438..5b9eaaa8ed 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -59,7 +59,7 @@ "raw_gridpathratoolkit": [pudl.extract.gridpathratoolkit], "raw_phmsagas": [pudl.extract.phmsagas], "raw_nrelatb": [pudl.extract.nrelatb], - "raw_vceregen": [pudl.extract.vceregen], + "raw_vcerare": [pudl.extract.vcerare], } diff --git a/src/pudl/extract/__init__.py b/src/pudl/extract/__init__.py index fb2b5c5d64..ebecf594b1 100644 --- a/src/pudl/extract/__init__.py +++ b/src/pudl/extract/__init__.py @@ -26,6 +26,6 @@ gridpathratoolkit, nrelatb, phmsagas, - vceregen, + vcerare, xbrl, ) diff --git a/src/pudl/extract/vceregen.py b/src/pudl/extract/vcerare.py similarity index 88% rename from src/pudl/extract/vceregen.py rename to src/pudl/extract/vcerare.py index 191787ea6d..8b8b1692c0 100644 --- a/src/pudl/extract/vceregen.py +++ b/src/pudl/extract/vcerare.py @@ -1,4 +1,4 @@ -"""Extract VCE renewable generation profile data from CSVs. +"""Extract VCE Resource Adequacy Renewable Energy (RARE) generation profile data. This dataset has 1,000s of columns, so we don't want to manually specify a rename on import because we'll pivot these to a column. 
We adapt the standard extraction @@ -9,7 +9,7 @@ Wind_Power_100m_Onshore_county.csv Fixed_SolarPV_Lat_UPV_county.csv -The drive also contains one more file: RA_county_lat_long_FIPS_table.csv. This file is +The drive also contains one more file: vce_county_lat_long_fips_table.csv. This file is not partitioned, so we always read it in regardless of the partitions configured for the run. """ @@ -27,7 +27,7 @@ logger = logging_helpers.get_logger(__name__) -VCEREGEN_PAGES = [ +VCERARE_PAGES = [ "offshore_wind_power_140m", "onshore_wind_power_100m", "fixed_solar_pv_lat_upv", @@ -35,7 +35,7 @@ class VCEMetadata(GenericMetadata): - """Special metadata class for VCE renewable generation profiles.""" + """Special metadata class for VCE RARE renewable generation profiles.""" def __init__(self, *args, **kwargs): """Initialize the module. @@ -52,7 +52,7 @@ def _load_column_maps(self, column_map_pkg) -> dict: def get_all_pages(self) -> list[str]: """Hard code the page names, which usually are pulled from column rename spreadsheets.""" - return VCEREGEN_PAGES + return VCERARE_PAGES def get_file_name(self, page, **partition): """Returns file name of given partition and page.""" @@ -68,7 +68,7 @@ def __init__(self, *args, **kwargs): Args: ds (:class:datastore.Datastore): Initialized datastore. """ - self.METADATA = VCEMetadata("vceregen") + self.METADATA = VCEMetadata("vcerare") super().__init__(*args, **kwargs) def get_column_map(self, page, **partition): @@ -150,29 +150,29 @@ def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame: return self.process_final_page(df, page) -raw_vceregen__all_dfs = raw_df_factory(Extractor, name="vceregen") +raw_vcerare__all_dfs = raw_df_factory(Extractor, name="vcerare") -def raw_vceregen_asset_factory(part: str) -> AssetsDefinition: +def raw_vcerare_asset_factory(part: str) -> AssetsDefinition: """An asset factory for VCE hourly renewable generation profiles.""" asset_kwargs = { - "name": f"raw_vceregen__{part}", + "name": f"raw_vcerare__{part}", "required_resource_keys": {"datastore", "dataset_settings"}, } @asset(**asset_kwargs) - def _extract(context, raw_vceregen__all_dfs): + def _extract(context, raw_vcerare__all_dfs): """Extract raw GridPath RA Toolkit renewable energy generation profiles. Args: context: dagster keyword that provides access to resources and config. """ - return Output(value=raw_vceregen__all_dfs[part]) + return Output(value=raw_vcerare__all_dfs[part]) return _extract -raw_vceregen_assets = [raw_vceregen_asset_factory(part) for part in VCEREGEN_PAGES] +raw_vcerare_assets = [raw_vcerare_asset_factory(part) for part in VCERARE_PAGES] @asset(required_resource_keys={"datastore", "dataset_settings"}) @@ -183,9 +183,9 @@ def raw_vcegen__lat_lon_fips(context) -> pd.DataFrame: its extraction is controlled by a boolean in the ETL run. 
""" ds = context.resources.datastore - partition_settings = context.resources.dataset_settings.vceregen + partition_settings = context.resources.dataset_settings.vcerare if partition_settings.fips: return pd.read_csv( - BytesIO(ds.get_unique_resource("vceregen", fips=partition_settings.fips)) + BytesIO(ds.get_unique_resource("vcerare", fips=partition_settings.fips)) ) return pd.DataFrame() diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml index c3592b5bb1..8a43f54693 100644 --- a/src/pudl/package_data/settings/etl_fast.yml +++ b/src/pudl/package_data/settings/etl_fast.yml @@ -97,6 +97,6 @@ datasets: technology_types: ["wind"] processing_levels: ["extended"] daily_weather: true - vceregen: + vcerare: years: [2023] fips: True diff --git a/src/pudl/package_data/settings/etl_full.yml b/src/pudl/package_data/settings/etl_full.yml index bfa8cb9a23..cf6e04fdac 100644 --- a/src/pudl/package_data/settings/etl_full.yml +++ b/src/pudl/package_data/settings/etl_full.yml @@ -360,6 +360,6 @@ datasets: technology_types: ["wind", "solar"] processing_levels: ["extended"] daily_weather: true - vceregen: + vcerare: years: [2019, 2020, 2021, 2022, 2023] fips: True diff --git a/src/pudl/package_data/vcerare/__init__.py b/src/pudl/package_data/vcerare/__init__.py new file mode 100644 index 0000000000..d27cd7506a --- /dev/null +++ b/src/pudl/package_data/vcerare/__init__.py @@ -0,0 +1 @@ +"""CSV file extraction maps for VCE RARE generation profiles.""" \ No newline at end of file diff --git a/src/pudl/package_data/vceregen/file_map.csv b/src/pudl/package_data/vcerare/file_map.csv similarity index 100% rename from src/pudl/package_data/vceregen/file_map.csv rename to src/pudl/package_data/vcerare/file_map.csv diff --git a/src/pudl/package_data/vceregen/__init__.py b/src/pudl/package_data/vceregen/__init__.py deleted file mode 100644 index 8aa0254b30..0000000000 --- a/src/pudl/package_data/vceregen/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""CSV file extraction maps for EIA 176.""" diff --git a/src/pudl/settings.py b/src/pudl/settings.py index d0e0830b95..1df038a96d 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -401,15 +401,15 @@ class EiaAeoSettings(GenericDatasetSettings): years: list[int] = data_source.working_partitions["years"] -class VCERegenSettings(GenericDatasetSettings): - """An immutable pydantic model to validate VCE Renewable Generation Profile settings. +class VCERARESettings(GenericDatasetSettings): + """An immutable pydantic model to validate VCE RARE generation profile settings. Args: data_source: DataSource metadata object years: VCE report years to use. 
""" - data_source: ClassVar[DataSource] = DataSource.from_id("vceregen") + data_source: ClassVar[DataSource] = DataSource.from_id("vcerare") years: list[int] = data_source.working_partitions["years"] fips: bool = True @@ -584,7 +584,7 @@ class DatasetsSettings(FrozenBaseModel): phmsagas: PhmsaGasSettings | None = None nrelatb: NrelAtbSettings | None = None gridpathratoolkit: GridPathRAToolkitSettings | None = None - vceregen: VCERegenSettings | None = None + vcerare: VCERARESettings | None = None @model_validator(mode="before") @classmethod @@ -606,7 +606,7 @@ def default_load_all(cls, data: dict[str, Any]) -> dict[str, Any]: data["phmsagas"] = PhmsaGasSettings() data["nrelatb"] = NrelAtbSettings() data["gridpathratoolkit"] = GridPathRAToolkitSettings() - data["vceregen"] = VCERegenSettings() + data["vcerare"] = VCERARESettings() return data diff --git a/src/pudl/workspace/datastore.py b/src/pudl/workspace/datastore.py index e83e7f04db..aa24b08455 100644 --- a/src/pudl/workspace/datastore.py +++ b/src/pudl/workspace/datastore.py @@ -208,7 +208,7 @@ class ZenodoDoiSettings(BaseSettings): gridpathratoolkit: ZenodoDoi = "10.5281/zenodo.10892394" phmsagas: ZenodoDoi = "10.5281/zenodo.10493790" nrelatb: ZenodoDoi = "10.5281/zenodo.12658647" - vceregen: ZenodoDoi = "10.5072/zenodo.116832" # TODO: UPDATE TEMP SANDBOX DOI + vcerare: ZenodoDoi = "10.5281/zenodo.13937523" model_config = SettingsConfigDict( env_prefix="pudl_zenodo_doi_", env_file=".env", extra="ignore" From 5b98e600bb71f775b2a90f00f42d0f045b20c2ca Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 16 Oct 2024 13:44:33 +0000 Subject: [PATCH 13/20] [pre-commit.ci] auto fixes from pre-commit.com hooks For more information, see https://pre-commit.ci --- src/pudl/package_data/vcerare/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pudl/package_data/vcerare/__init__.py b/src/pudl/package_data/vcerare/__init__.py index d27cd7506a..b749b7d3f1 100644 --- a/src/pudl/package_data/vcerare/__init__.py +++ b/src/pudl/package_data/vcerare/__init__.py @@ -1 +1 @@ -"""CSV file extraction maps for VCE RARE generation profiles.""" \ No newline at end of file +"""CSV file extraction maps for VCE RARE generation profiles.""" From 77d47a4e893fa12e552fa3bc0e190400e028da9e Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 10:02:44 -0400 Subject: [PATCH 14/20] Update gsutil in zenodo-cache-sync --- .github/workflows/zenodo-cache-sync.yml | 2 +- src/pudl/package_data/vcerare/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/zenodo-cache-sync.yml b/.github/workflows/zenodo-cache-sync.yml index e8da8164eb..b073841c14 100644 --- a/.github/workflows/zenodo-cache-sync.yml +++ b/.github/workflows/zenodo-cache-sync.yml @@ -66,7 +66,7 @@ jobs: - name: Sync internal and public caches run: | - gsutil -u catalyst-cooperative-pudl -m rsync -dr ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} + gcloud storage rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} zenodo-cache-sync-notify: runs-on: ubuntu-latest diff --git a/src/pudl/package_data/vcerare/__init__.py b/src/pudl/package_data/vcerare/__init__.py index d27cd7506a..b749b7d3f1 100644 --- a/src/pudl/package_data/vcerare/__init__.py +++ b/src/pudl/package_data/vcerare/__init__.py @@ -1 +1 @@ -"""CSV file extraction maps for VCE RARE generation profiles.""" \ No newline at end of file +"""CSV 
file extraction maps for VCE RARE generation profiles.""" From 6dea3327cd1bf288a73ddb6f7e3aecd2b442b459 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 14:15:54 -0400 Subject: [PATCH 15/20] Add back user project --- .github/workflows/zenodo-cache-sync.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/zenodo-cache-sync.yml b/.github/workflows/zenodo-cache-sync.yml index b073841c14..6956d37ddb 100644 --- a/.github/workflows/zenodo-cache-sync.yml +++ b/.github/workflows/zenodo-cache-sync.yml @@ -66,7 +66,7 @@ jobs: - name: Sync internal and public caches run: | - gcloud storage rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} + gcloud storage -u catalyst-cooperative-pudl rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} zenodo-cache-sync-notify: runs-on: ubuntu-latest From 7554a360069d9d5cfbfccc6dfb1fb682efd0638d Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 14:21:18 -0400 Subject: [PATCH 16/20] Update project path --- .github/workflows/zenodo-cache-sync.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/zenodo-cache-sync.yml b/.github/workflows/zenodo-cache-sync.yml index 6956d37ddb..d855aae55d 100644 --- a/.github/workflows/zenodo-cache-sync.yml +++ b/.github/workflows/zenodo-cache-sync.yml @@ -66,7 +66,7 @@ jobs: - name: Sync internal and public caches run: | - gcloud storage -u catalyst-cooperative-pudl rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} + gcloud storage --project=catalyst-cooperative-pudl rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} zenodo-cache-sync-notify: runs-on: ubuntu-latest From 9ada9f5eec0e005bfb79824433611d66c5402477 Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 14:24:50 -0400 Subject: [PATCH 17/20] Update project to billing project --- .github/workflows/zenodo-cache-sync.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/zenodo-cache-sync.yml b/.github/workflows/zenodo-cache-sync.yml index d855aae55d..ccad618328 100644 --- a/.github/workflows/zenodo-cache-sync.yml +++ b/.github/workflows/zenodo-cache-sync.yml @@ -66,7 +66,7 @@ jobs: - name: Sync internal and public caches run: | - gcloud storage --project=catalyst-cooperative-pudl rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} + gcloud storage --billing-project=catalyst-cooperative-pudl rsync -r ${{ env.INTERNAL_ZENODO_CACHE_BUCKET }} ${{ env.PUBLIC_ZENODO_CACHE_BUCKET }} zenodo-cache-sync-notify: runs-on: ubuntu-latest From 4158afd619fe019dec8e9a412ee3a88080d715ee Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 14:48:19 -0400 Subject: [PATCH 18/20] Update dockerfile to replace gsutil with gcloud storage --- docker/Dockerfile | 2 +- docker/gcp_pudl_etl.sh | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 6bd5268a43..0009a70167 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -15,7 +15,7 @@ RUN apt-get update && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* -# Configure gsutil authentication +# Configure gcloud authentication # hadolint ignore=DL3059 RUN printf '[GoogleCompute]\nservice_account = default' > /etc/boto.cfg diff --git a/docker/gcp_pudl_etl.sh b/docker/gcp_pudl_etl.sh index d972caa024..8569d92bef 100644 --- a/docker/gcp_pudl_etl.sh +++ 
b/docker/gcp_pudl_etl.sh @@ -71,7 +71,7 @@ function run_pudl_etl() { function save_outputs_to_gcs() { echo "Copying outputs to GCP bucket $PUDL_GCS_OUTPUT" && \ - gsutil -q -m cp -r "$PUDL_OUTPUT" "$PUDL_GCS_OUTPUT" && \ + gcloud storage --quiet cp -r "$PUDL_OUTPUT" "$PUDL_GCS_OUTPUT" && \ rm -f "$PUDL_OUTPUT/success" } @@ -85,12 +85,12 @@ function upload_to_dist_path() { # If the old outputs don't exist, these will exit with status 1, so we # don't && them with the rest of the commands. echo "Removing old outputs from $GCS_PATH." - gsutil -q -m -u "$GCP_BILLING_PROJECT" rm -r "$GCS_PATH" + gcloud storage --quiet --billing-project="$GCP_BILLING_PROJECT" rm -r "$GCS_PATH" echo "Removing old outputs from $AWS_PATH." aws s3 rm --quiet --recursive "$AWS_PATH" echo "Copying outputs to $GCS_PATH:" && \ - gsutil -q -m -u "$GCP_BILLING_PROJECT" cp -r "$PUDL_OUTPUT/*" "$GCS_PATH" && \ + gcloud storage --quiet --billing-project="$GCP_BILLING_PROJECT" cp -r "$PUDL_OUTPUT/*" "$GCS_PATH" && \ echo "Copying outputs to $AWS_PATH" && \ aws s3 cp --quiet --recursive "$PUDL_OUTPUT/" "$AWS_PATH" else @@ -113,12 +113,12 @@ function distribute_parquet() { DIST_PATH="$BUILD_REF" fi echo "Copying outputs to $PARQUET_BUCKET/$DIST_PATH" && \ - gsutil -q -m -u "$GCP_BILLING_PROJECT" cp -r "$PUDL_OUTPUT/parquet/*" "$PARQUET_BUCKET/$DIST_PATH" + gcloud storage --quiet --billing-project="$GCP_BILLING_PROJECT" cp -r "$PUDL_OUTPUT/parquet/*" "$PARQUET_BUCKET/$DIST_PATH" # If running a tagged release, ALSO update the stable distribution bucket path: if [[ "$GITHUB_ACTION_TRIGGER" == "push" && "$BUILD_REF" == v20* ]]; then echo "Copying outputs to $PARQUET_BUCKET/stable" && \ - gsutil -q -m -u "$GCP_BILLING_PROJECT" cp -r "$PUDL_OUTPUT/parquet/*" "$PARQUET_BUCKET/stable" + gcloud storage --quiet --billing-project="$GCP_BILLING_PROJECT" cp -r "$PUDL_OUTPUT/parquet/*" "$PARQUET_BUCKET/stable" fi fi } @@ -298,13 +298,13 @@ if [[ $ETL_SUCCESS == 0 ]]; then # If running a tagged release, ensure that outputs can't be accidentally deleted # It's not clear that an object lock can be applied in S3 with the AWS CLI if [[ "$GITHUB_ACTION_TRIGGER" == "push" && "$BUILD_REF" == v20* ]]; then - gsutil -m -u catalyst-cooperative-pudl retention temp set "gs://pudl.catalyst.coop/$BUILD_REF/*" 2>&1 | tee -a "$LOGFILE" + gcloud storage --billing-project="catalyst-cooperative-pudl" objects update "gs://pudl.catalyst.coop/$BUILD_REF/*" --temporary-hold 2>&1 | tee -a "$LOGFILE" GCS_TEMPORARY_HOLD_SUCCESS=${PIPESTATUS[0]} fi fi # This way we also save the logs from latter steps in the script -gsutil -q cp "$LOGFILE" "$PUDL_GCS_OUTPUT" +gcloud storage --quiet cp "$LOGFILE" "$PUDL_GCS_OUTPUT" # Notify slack about entire pipeline's success or failure; if [[ $ETL_SUCCESS == 0 && \ From 7f45bff31840a590d597ef47437cc1edaaf4ee4e Mon Sep 17 00:00:00 2001 From: e-belfer Date: Wed, 16 Oct 2024 16:33:13 -0400 Subject: [PATCH 19/20] Update dataset name, use GCP_BILLING_PROJECT var --- docker/gcp_pudl_etl.sh | 2 +- src/pudl/extract/vcerare.py | 23 +++++++++++------------ src/pudl/settings.py | 10 +++++----- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/docker/gcp_pudl_etl.sh b/docker/gcp_pudl_etl.sh index 8569d92bef..7a9b79e9e1 100644 --- a/docker/gcp_pudl_etl.sh +++ b/docker/gcp_pudl_etl.sh @@ -298,7 +298,7 @@ if [[ $ETL_SUCCESS == 0 ]]; then # If running a tagged release, ensure that outputs can't be accidentally deleted # It's not clear that an object lock can be applied in S3 with the AWS CLI if [[ "$GITHUB_ACTION_TRIGGER" == "push" && 
"$BUILD_REF" == v20* ]]; then - gcloud storage --billing-project="catalyst-cooperative-pudl" objects update "gs://pudl.catalyst.coop/$BUILD_REF/*" --temporary-hold 2>&1 | tee -a "$LOGFILE" + gcloud storage --billing-project="$GCP_BILLING_PROJECT" objects update "gs://pudl.catalyst.coop/$BUILD_REF/*" --temporary-hold 2>&1 | tee -a "$LOGFILE" GCS_TEMPORARY_HOLD_SUCCESS=${PIPESTATUS[0]} fi fi diff --git a/src/pudl/extract/vcerare.py b/src/pudl/extract/vcerare.py index 8b8b1692c0..ca962127a4 100644 --- a/src/pudl/extract/vcerare.py +++ b/src/pudl/extract/vcerare.py @@ -1,17 +1,16 @@ -"""Extract VCE Resource Adequacy Renewable Energy (RARE) generation profile data. +"""Extract VCE Resource Adequacy Renewable Energy (RARE) Power Dataset. This dataset has 1,000s of columns, so we don't want to manually specify a rename on -import because we'll pivot these to a column. We adapt the standard extraction -infrastructure to simply read in the data. +import because we'll pivot these to a column in the transform step. We adapt the +standard extraction infrastructure to simply read in the data. Each annual zip folder contains a folder with three files: Wind_Power_140m_Offshore_county.csv Wind_Power_100m_Onshore_county.csv Fixed_SolarPV_Lat_UPV_county.csv -The drive also contains one more file: vce_county_lat_long_fips_table.csv. This file is -not partitioned, so we always read it in regardless of the partitions configured for the -run. +The drive also contains one more CSV file: vce_county_lat_long_fips_table.csv. This gets +read in when the fips partition is set to True. """ from collections import defaultdict @@ -34,8 +33,8 @@ ] -class VCEMetadata(GenericMetadata): - """Special metadata class for VCE RARE renewable generation profiles.""" +class VCERareMetadata(GenericMetadata): + """Special metadata class for VCE RARE Power Dataset.""" def __init__(self, *args, **kwargs): """Initialize the module. @@ -60,7 +59,7 @@ def get_file_name(self, page, **partition): class Extractor(CsvExtractor): - """Extractor for VCE renewable generation profiles.""" + """Extractor for VCE RARE Power Dataset.""" def __init__(self, *args, **kwargs): """Initialize the module. @@ -68,7 +67,7 @@ def __init__(self, *args, **kwargs): Args: ds (:class:datastore.Datastore): Initialized datastore. """ - self.METADATA = VCEMetadata("vcerare") + self.METADATA = VCERareMetadata("vcerare") super().__init__(*args, **kwargs) def get_column_map(self, page, **partition): @@ -154,7 +153,7 @@ def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame: def raw_vcerare_asset_factory(part: str) -> AssetsDefinition: - """An asset factory for VCE hourly renewable generation profiles.""" + """An asset factory for VCE RARE Power Dataset.""" asset_kwargs = { "name": f"raw_vcerare__{part}", "required_resource_keys": {"datastore", "dataset_settings"}, @@ -162,7 +161,7 @@ def raw_vcerare_asset_factory(part: str) -> AssetsDefinition: @asset(**asset_kwargs) def _extract(context, raw_vcerare__all_dfs): - """Extract raw GridPath RA Toolkit renewable energy generation profiles. + """Extract VCE RARE Power Dataset. Args: context: dagster keyword that provides access to resources and config. 
diff --git a/src/pudl/settings.py b/src/pudl/settings.py index 1df038a96d..a1a344ed80 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -401,12 +401,12 @@ class EiaAeoSettings(GenericDatasetSettings): years: list[int] = data_source.working_partitions["years"] -class VCERARESettings(GenericDatasetSettings): - """An immutable pydantic model to validate VCE RARE generation profile settings. +class VCERareSettings(GenericDatasetSettings): + """An immutable pydantic model to validate VCE RARE Power Dataset settings. Args: data_source: DataSource metadata object - years: VCE report years to use. + years: VCE RARE report years to use. """ data_source: ClassVar[DataSource] = DataSource.from_id("vcerare") @@ -584,7 +584,7 @@ class DatasetsSettings(FrozenBaseModel): phmsagas: PhmsaGasSettings | None = None nrelatb: NrelAtbSettings | None = None gridpathratoolkit: GridPathRAToolkitSettings | None = None - vcerare: VCERARESettings | None = None + vcerare: VCERareSettings | None = None @model_validator(mode="before") @classmethod @@ -606,7 +606,7 @@ def default_load_all(cls, data: dict[str, Any]) -> dict[str, Any]: data["phmsagas"] = PhmsaGasSettings() data["nrelatb"] = NrelAtbSettings() data["gridpathratoolkit"] = GridPathRAToolkitSettings() - data["vcerare"] = VCERARESettings() + data["vcerare"] = VCERareSettings() return data From e16a4df8d3262563938ed8c37d99a795a4bc031a Mon Sep 17 00:00:00 2001 From: e-belfer Date: Thu, 17 Oct 2024 08:24:39 -0400 Subject: [PATCH 20/20] Rename fips csv and update init --- src/pudl/extract/vcerare.py | 2 +- src/pudl/package_data/vcerare/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pudl/extract/vcerare.py b/src/pudl/extract/vcerare.py index ca962127a4..371e072898 100644 --- a/src/pudl/extract/vcerare.py +++ b/src/pudl/extract/vcerare.py @@ -175,7 +175,7 @@ def _extract(context, raw_vcerare__all_dfs): @asset(required_resource_keys={"datastore", "dataset_settings"}) -def raw_vcegen__lat_lon_fips(context) -> pd.DataFrame: +def raw_vcerare__lat_lon_fips(context) -> pd.DataFrame: """Extract lat/lon to FIPS and county mapping CSV. This dataframe is static, so it has a distinct partition from the other datasets and diff --git a/src/pudl/package_data/vcerare/__init__.py b/src/pudl/package_data/vcerare/__init__.py index b749b7d3f1..34cf95cfd5 100644 --- a/src/pudl/package_data/vcerare/__init__.py +++ b/src/pudl/package_data/vcerare/__init__.py @@ -1 +1 @@ -"""CSV file extraction maps for VCE RARE generation profiles.""" +"""CSV file extraction maps for VCE RARE Power Dataset."""
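A minimal usage sketch of the settings interface these patches converge on, assuming the PUDL branch containing them is importable as `pudl`. The class and field names below come straight from the `src/pudl/settings.py` diff; the printed values are expectations based on the declared working partitions, not captured output.

from pudl.settings import VCERareSettings

# Mirror the etl_fast.yml configuration: a single report year, with the
# FIPS crosswalk extraction enabled.
fast_settings = VCERareSettings(years=[2023], fips=True)
print(fast_settings.years)  # expected: [2023]
print(fast_settings.fips)   # expected: True

# With no arguments the model falls back to the working partitions declared
# for the "vcerare" data source, i.e. all years from 2019 through 2023.
full_settings = VCERareSettings()
print(full_settings.years)  # expected: [2019, 2020, 2021, 2022, 2023]

When `fips` is set to True, the `raw_vcerare__lat_lon_fips` asset additionally reads the static vce_county_lat_long_fips_table.csv crosswalk alongside the hourly generation profiles, so the transform step can attach county identifiers when it pivots the county columns.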