From 3435de7c85657c121ed3f527e351006a9c8406e6 Mon Sep 17 00:00:00 2001 From: Austen Sharpe <49878195+aesharpe@users.noreply.github.com> Date: Fri, 6 Sep 2024 14:59:06 -0600 Subject: [PATCH] Extract 714 xbrl (#3822) * Add _csv suffix to current FERC714 raw assets and functions * Add create_raw_ferc714_xbrl_assets and raw_ferc714_xbrl__metadata_json functions to FERC 714 extract module and include ferc714 io_manager. * Update XBRL table dict to mimic the TABLE_NAME_MAP_FERC1 format and fix functions to mimic those of FERC 1 * Add .get_datasets() to ferc_settings definition in FercXBRLSQLiteIOManager * Update FERC714 working partitions and add xbrl_years property to Ferc714Settings * Fix < to > in xbrl_years * Change test output names to ferc1_xbrl from test_db * Fix ferc_settings definition in FercXBRLSQLiteIOManager load_input function to fix unit tests * Remove old FERC714_XBRL_FILES dict * Add and update dictionary descriptions * Update release notes * Fix ferc io_managers and test * Add csv_years property to Ferc714Settings * Add csv_years reference to raw_ferc714_csv_asset_factory function * Add fixtures for FERC 714 XBRL tests --- docs/release_notes.rst | 1 + src/pudl/etl/__init__.py | 2 + src/pudl/extract/ferc714.py | 129 ++++++++++++++++++++++++++++++---- src/pudl/io_managers.py | 21 ++++-- src/pudl/metadata/sources.py | 4 +- src/pudl/settings.py | 19 +++-- src/pudl/transform/ferc714.py | 18 ++--- test/conftest.py | 51 ++++++++++++++ test/unit/io_managers_test.py | 8 +-- 9 files changed, 214 insertions(+), 39 deletions(-) diff --git a/docs/release_notes.rst b/docs/release_notes.rst index e59b64a6b2..ff5930a2be 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -97,6 +97,7 @@ FERC 714 * Added :ref:`core_ferc714__yearly_planning_area_demand_forecast` based on FERC Form 714, Part III, Schedule 2b. Data includes forecasted demand and net energy load. See issue :issue:`3519` and PR :pr:`3670`. +* WIP: Adding XBRL (2021+) data for FERC 714 tables. 
Track progress in :issue:`3822`. NREL ATB ~~~~~~~~ diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index b941e45256..84f1c505e8 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -25,6 +25,7 @@ epacems_io_manager, ferc1_dbf_sqlite_io_manager, ferc1_xbrl_sqlite_io_manager, + ferc714_xbrl_sqlite_io_manager, parquet_io_manager, pudl_mixed_format_io_manager, ) @@ -201,6 +202,7 @@ def _get_keys_from_assets( "pudl_io_manager": pudl_mixed_format_io_manager, "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager, "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager, + "ferc714_xbrl_sqlite_io_manager": ferc714_xbrl_sqlite_io_manager, "dataset_settings": dataset_settings, "ferc_to_sqlite_settings": ferc_to_sqlite_settings, "epacems_io_manager": epacems_io_manager, diff --git a/src/pudl/extract/ferc714.py b/src/pudl/extract/ferc714.py index 6570881f3e..69bc616c71 100644 --- a/src/pudl/extract/ferc714.py +++ b/src/pudl/extract/ferc714.py @@ -1,15 +1,20 @@ """Routines used for extracting the raw FERC 714 data.""" +import json from collections import OrderedDict +from itertools import chain +from pathlib import Path +from typing import Any import pandas as pd -from dagster import AssetsDefinition, asset +from dagster import AssetKey, AssetsDefinition, SourceAsset, asset import pudl +from pudl.workspace.setup import PudlPaths logger = pudl.logging_helpers.get_logger(__name__) -FERC714_FILES: OrderedDict[str, dict[str, str]] = OrderedDict( +FERC714_CSV_ENCODING: OrderedDict[str, dict[str, str]] = OrderedDict( { "yearly_id_certification": { "name": "Part 1 Schedule 1 - Identification Certification.csv", @@ -61,19 +66,37 @@ }, } ) -"""Dictionary mapping PUDL tables to FERC-714 filenames and character encodings.""" +"""Dictionary mapping PUDL tables to FERC-714 CSV filenames and character encodings.""" +TABLE_NAME_MAP_FERC714: OrderedDict[str, dict[str, str]] = OrderedDict( + { + "core_ferc714__yearly_planning_area_demand_forecast": { 
+ "csv": "Part 3 Schedule 2 - Planning Area Forecast Demand.csv", + "xbrl": "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2", + }, + "out_ferc714__hourly_planning_area_demand": { + "csv": "Part 3 Schedule 2 - Planning Area Hourly Demand.csv", + "xbrl": "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2", + }, + "core_ferc714__respondent_id": { + "csv": "Respondent IDs.csv", + "xbrl": "identification_and_certification_01_1", + }, + } +) +"""A mapping of PUDL DB table names to their XBRL and CSV source table names.""" -def raw_ferc714_asset_factory(table_name: str) -> AssetsDefinition: - """Generates an asset for building the raw FERC 714 dataframe.""" - assert table_name in FERC714_FILES + +def raw_ferc714_csv_asset_factory(table_name: str) -> AssetsDefinition: + """Generates an asset for building the raw CSV-based FERC 714 dataframe.""" + assert table_name in FERC714_CSV_ENCODING @asset( - name=f"raw_ferc714__{table_name}", + name=f"raw_ferc714_csv__{table_name}", required_resource_keys={"datastore", "dataset_settings"}, compute_kind="pandas", ) - def _extract_raw_ferc714(context): + def _extract_raw_ferc714_csv(context): """Extract the raw FERC Form 714 dataframes from their original CSV files. Args: @@ -81,26 +104,104 @@ def _extract_raw_ferc714(context): """ ds = context.resources.datastore ferc714_settings = context.resources.dataset_settings.ferc714 - years = ", ".join(map(str, ferc714_settings.years)) + years = ", ".join(map(str, ferc714_settings.csv_years)) logger.info( f"Extracting {table_name} from CSV into pandas DataFrame (years: {years})." 
) with ( ds.get_zipfile_resource("ferc714", name="ferc714.zip") as zf, - zf.open(FERC714_FILES[table_name]["name"]) as csv_file, + zf.open(FERC714_CSV_ENCODING[table_name]["name"]) as csv_file, ): df = pd.read_csv( csv_file, - encoding=FERC714_FILES[table_name]["encoding"], + encoding=FERC714_CSV_ENCODING[table_name]["encoding"], ) if table_name != "respondent_id": df = df.query("report_yr in @ferc714_settings.years") return df - return _extract_raw_ferc714 + return _extract_raw_ferc714_csv + + +@asset +def raw_ferc714_xbrl__metadata_json( + context, +) -> dict[str, dict[str, list[dict[str, Any]]]]: + """Extract the FERC 714 XBRL Taxonomy metadata we've stored as JSON. + + Returns: + A dictionary keyed by PUDL table name, with an instant and a duration entry + for each table, corresponding to the metadata for each of the respective instant + or duration tables from XBRL if they exist. Table metadata is returned as a list + of dictionaries, each of which can be interpreted as a row in a tabular + structure, with each row annotating a separate XBRL concept from the FERC 714 + filings. 
+ """ + metadata_path = PudlPaths().output_dir / "ferc714_xbrl_taxonomy_metadata.json" + with Path.open(metadata_path) as f: + xbrl_meta_all = json.load(f) + + valid_tables = { + table_name: xbrl_table + for table_name in TABLE_NAME_MAP_FERC714 + if (xbrl_table := TABLE_NAME_MAP_FERC714.get(table_name, {}).get("xbrl")) + is not None + } + + def squash_period(xbrl_table: str | list[str], period, xbrl_meta_all): + if isinstance(xbrl_table, str): + xbrl_table = [xbrl_table] + return [ + metadata + for table in xbrl_table + for metadata in xbrl_meta_all.get(f"{table}_{period}", []) + if metadata + ] + + xbrl_meta_out = { + table_name: { + "instant": squash_period(xbrl_table, "instant", xbrl_meta_all), + "duration": squash_period(xbrl_table, "duration", xbrl_meta_all), + } + for table_name, xbrl_table in valid_tables.items() + } + + return xbrl_meta_out + + +def create_raw_ferc714_xbrl_assets() -> list[SourceAsset]: + """Create SourceAssets for raw FERC 714 XBRL tables. + SourceAssets allow you to access assets that are generated elsewhere. + In our case, the XBRL database contains the raw FERC 714 assets from + 2021 onward. Prior to that, the assets are distributed as CSVs and + are extracted with the ``raw_ferc714_csv_asset_factory`` function. -raw_ferc714_assets = [ - raw_ferc714_asset_factory(table_name) for table_name in FERC714_FILES + Returns: + A list of FERC 714 SourceAssets. 
+ """ + # Create assets for the duration and instant tables + xbrls = (v["xbrl"] for v in TABLE_NAME_MAP_FERC714.values()) + flattened_xbrls = chain.from_iterable( + x if isinstance(x, list) else [x] for x in xbrls + ) + xbrls_with_periods = chain.from_iterable( + (f"{tn}_instant", f"{tn}_duration") for tn in flattened_xbrls + ) + xbrl_table_names = tuple(set(xbrls_with_periods)) + raw_ferc714_xbrl_assets = [ + SourceAsset( + key=AssetKey(f"raw_ferc714_xbrl__{table_name}"), + io_manager_key="ferc714_xbrl_sqlite_io_manager", + ) + for table_name in xbrl_table_names + ] + return raw_ferc714_xbrl_assets + + +raw_ferc714_csv_assets = [ + raw_ferc714_csv_asset_factory(table_name) for table_name in FERC714_CSV_ENCODING ] + +raw_ferc714_xbrl_assets = create_raw_ferc714_xbrl_assets() diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 04f9497057..12e1069a91 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -855,12 +855,14 @@ def load_input(self, context: InputContext) -> pd.DataFrame: context: dagster keyword that provides access output information like asset name. """ - # TODO (daz): this is hard-coded to FERC1, though this is nominally for all FERC datasets. 
- ferc1_settings = context.resources.dataset_settings.ferc1 + ferc_settings = getattr( + context.resources.dataset_settings, + re.search(r"ferc\d+", self.db_name).group(), + ) table_name = get_table_name_from_context(context) # Remove preceeding asset name metadata - table_name = table_name.replace("raw_ferc1_xbrl__", "") + table_name = table_name.replace(f"raw_{self.db_name}__", "") # TODO (bendnorman): Figure out a better to handle tables that # don't have duration and instant @@ -887,7 +889,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame: ) .pipe( FercXBRLSQLiteIOManager.refine_report_year, - xbrl_years=ferc1_settings.xbrl_years, + xbrl_years=ferc_settings.xbrl_years, ) .drop(columns=["publication_time"]) ) @@ -895,13 +897,22 @@ def load_input(self, context: InputContext) -> pd.DataFrame: @io_manager(required_resource_keys={"dataset_settings"}) def ferc1_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager: - """Create a SQLiteManager dagster resource for the ferc1 dbf database.""" + """Create a SQLiteManager dagster resource for the ferc1 xbrl database.""" return FercXBRLSQLiteIOManager( base_dir=PudlPaths().output_dir, db_name="ferc1_xbrl", ) +@io_manager(required_resource_keys={"dataset_settings"}) +def ferc714_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager: + """Create a SQLiteManager dagster resource for the ferc714 xbrl database.""" + return FercXBRLSQLiteIOManager( + base_dir=PudlPaths().output_dir, + db_name="ferc714_xbrl", + ) + + class EpaCemsIOManager(UPathIOManager): """An IO Manager that dumps outputs to a parquet file.""" diff --git a/src/pudl/metadata/sources.py b/src/pudl/metadata/sources.py index c6d2153406..a8856bce16 100644 --- a/src/pudl/metadata/sources.py +++ b/src/pudl/metadata/sources.py @@ -631,9 +631,9 @@ ), "field_namespace": "ferc714", "working_partitions": { - # 2021 and later data is in XBRL and not yet supported. + # 2021 and later data is in XBRL. 
# 2006-2020 data is in monolithic CSV files, so any year means all years. - "years": sorted(set(range(2006, 2021))), + "years": sorted(set(range(2006, 2024))), }, "contributors": [ CONTRIBUTORS["catalyst-cooperative"], diff --git a/src/pudl/settings.py b/src/pudl/settings.py index b9c7e92c19..2f6752c63d 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -127,7 +127,7 @@ def dbf_years(self): @property def xbrl_years(self): - """Return validated years for which DBF data is available.""" + """Return validated years for which XBRL data is available.""" return [year for year in self.years if year >= 2021] @@ -139,12 +139,21 @@ class Ferc714Settings(GenericDatasetSettings): """ data_source: ClassVar[DataSource] = DataSource.from_id("ferc714") - - # Note: Only older data is currently supported. Starting in 2021 FERC-714 is being - # published as XBRL, and we haven't integrated it. The older data is published as - # monolithic CSV files, so asking for any year processes all of them. years: list[int] = data_source.working_partitions["years"] + # The older 714 data is distributed as CSV files and has a different extraction + # process than the FERC DBF extraction process. + + @property + def csv_years(self): + """Return validated years for which CSV data is available.""" + return [year for year in self.years if year < 2021] + + @property + def xbrl_years(self): + """Return validated years for which XBRL data is available.""" + return [year for year in self.years if year >= 2021] + class EpaCemsSettings(GenericDatasetSettings): """An immutable pydantic model to validate EPA CEMS settings. 
diff --git a/src/pudl/transform/ferc714.py b/src/pudl/transform/ferc714.py index ce2d249e6b..655b27c32e 100644 --- a/src/pudl/transform/ferc714.py +++ b/src/pudl/transform/ferc714.py @@ -383,7 +383,7 @@ def _standardize_offset_codes(df: pd.DataFrame, offset_fixes) -> pd.DataFrame: compute_kind="pandas", ) def core_ferc714__respondent_id( - raw_ferc714__respondent_id: pd.DataFrame, + raw_ferc714_csv__respondent_id: pd.DataFrame, ) -> pd.DataFrame: """Transform the FERC 714 respondent IDs, names, and EIA utility IDs. @@ -393,13 +393,13 @@ def core_ferc714__respondent_id( PacifiCorp). Args: - raw_ferc714__respondent_id: Raw table describing the FERC 714 Respondents. + raw_ferc714_csv__respondent_id: Raw table describing the FERC 714 Respondents. Returns: A clean(er) version of the FERC-714 respondents table. """ df = _pre_process( - raw_ferc714__respondent_id, table_name="core_ferc714__respondent_id" + raw_ferc714_csv__respondent_id, table_name="core_ferc714__respondent_id" ) df["respondent_name_ferc714"] = df.respondent_name_ferc714.str.strip() df.loc[df.eia_code == 0, "eia_code"] = pd.NA @@ -415,7 +415,7 @@ def core_ferc714__respondent_id( compute_kind="pandas", ) def out_ferc714__hourly_planning_area_demand( - raw_ferc714__hourly_planning_area_demand: pd.DataFrame, + raw_ferc714_csv__hourly_planning_area_demand: pd.DataFrame, ) -> pd.DataFrame: """Transform the hourly demand time series by Planning Area. @@ -429,7 +429,7 @@ def out_ferc714__hourly_planning_area_demand( - Flip negative signs for reported demand. Args: - raw_ferc714__hourly_planning_area_demand: Raw table containing hourly demand + raw_ferc714_csv__hourly_planning_area_demand: Raw table containing hourly demand time series by Planning Area. 
Returns: @@ -437,7 +437,7 @@ def out_ferc714__hourly_planning_area_demand( """ logger.info("Converting dates into pandas Datetime types.") df = _pre_process( - raw_ferc714__hourly_planning_area_demand, + raw_ferc714_csv__hourly_planning_area_demand, table_name="out_ferc714__hourly_planning_area_demand", ) @@ -549,7 +549,7 @@ def out_ferc714__hourly_planning_area_demand( compute_kind="pandas", ) def core_ferc714__yearly_planning_area_demand_forecast( - raw_ferc714__yearly_planning_area_demand_forecast: pd.DataFrame, + raw_ferc714_csv__yearly_planning_area_demand_forecast: pd.DataFrame, ) -> pd.DataFrame: """Transform the yearly planning area forecast data per Planning Area. @@ -559,7 +559,7 @@ def core_ferc714__yearly_planning_area_demand_forecast( - Remove duplicate rows and average out the metrics. Args: - raw_ferc714__yearly_planning_area_demand_forecast: Raw table containing, + raw_ferc714_csv__yearly_planning_area_demand_forecast: Raw table containing, for each year and each planning area, the forecasted summer and winter peak demand, in megawatts, and annual net energy for load, in megawatthours, for the next ten years. 
@@ -569,7 +569,7 @@ def core_ferc714__yearly_planning_area_demand_forecast( """ # Clean up columns df = _pre_process( - raw_ferc714__yearly_planning_area_demand_forecast, + raw_ferc714_csv__yearly_planning_area_demand_forecast, table_name="core_ferc714__yearly_planning_area_demand_forecast", ) diff --git a/test/conftest.py b/test/conftest.py index 3d1490c641..ef23a27a3e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -20,11 +20,13 @@ from pudl import resources from pudl.etl.cli import pudl_etl_job_factory from pudl.extract.ferc1 import Ferc1DbfExtractor, raw_ferc1_xbrl__metadata_json +from pudl.extract.ferc714 import raw_ferc714_xbrl__metadata_json from pudl.extract.xbrl import xbrl2sqlite_op_factory from pudl.io_managers import ( PudlMixedFormatIOManager, ferc1_dbf_sqlite_io_manager, ferc1_xbrl_sqlite_io_manager, + ferc714_xbrl_sqlite_io_manager, pudl_mixed_format_io_manager, ) from pudl.metadata import PUDL_PACKAGE @@ -260,6 +262,36 @@ def ferc1_dbf_sql_engine(ferc1_dbf_extract, dataset_settings_config) -> sa.Engin return ferc1_dbf_sqlite_io_manager(context).engine +@pytest.fixture(scope="session") +def ferc714_xbrl_extract( + live_dbs: bool, pudl_datastore_config, etl_settings: EtlSettings +): + """Runs ferc_to_sqlite dagster job for FERC Form 714 XBRL data.""" + + @graph + def local_xbrl_ferc714_graph(): + xbrl2sqlite_op_factory(XbrlFormNumber.FORM714)() + + if not live_dbs: + execute_result = local_xbrl_ferc714_graph.to_job( + name="ferc_to_sqlite_xbrl_ferc714", + resource_defs=pudl.ferc_to_sqlite.default_resources_defs, + ).execute_in_process( + run_config={ + "resources": { + "ferc_to_sqlite_settings": { + "config": etl_settings.ferc_to_sqlite_settings.model_dump(), + }, + "datastore": { + "config": pudl_datastore_config, + }, + "runtime_settings": {"config": {"xbrl_num_workers": 2}}, + }, + } + ) + assert execute_result.success, "ferc_to_sqlite_xbrl_ferc714 failed!" 
+ + @pytest.fixture(scope="session", name="ferc1_engine_xbrl") def ferc1_xbrl_sql_engine(ferc1_xbrl_extract, dataset_settings_config) -> sa.Engine: """Grab a connection to the FERC Form 1 DB clone.""" @@ -278,10 +310,29 @@ def ferc1_xbrl_taxonomy_metadata(ferc1_engine_xbrl: sa.Engine): return result.output_for_node("raw_ferc1_xbrl__metadata_json") +@pytest.fixture(scope="session", name="ferc714_engine_xbrl") +def ferc714_xbrl_sql_engine(ferc714_xbrl_extract, dataset_settings_config) -> sa.Engine: + """Grab a connection to the FERC Form 714 DB clone.""" + context = build_init_resource_context( + resources={"dataset_settings": dataset_settings_config} + ) + return ferc714_xbrl_sqlite_io_manager(context).engine + + +@pytest.fixture(scope="session", name="ferc714_xbrl_taxonomy_metadata") +def ferc714_xbrl_taxonomy_metadata(ferc714_engine_xbrl: sa.Engine): + """Read the FERC 714 XBRL taxonomy metadata from JSON.""" + result = materialize_to_memory([raw_ferc714_xbrl__metadata_json]) + assert result.success + + return result.output_for_node("raw_ferc714_xbrl__metadata_json") + + @pytest.fixture(scope="session") def pudl_io_manager( ferc1_engine_dbf: sa.Engine, # Implicit dependency ferc1_engine_xbrl: sa.Engine, # Implicit dependency + ferc714_engine_xbrl: sa.Engine, live_dbs: bool, pudl_datastore_config, dataset_settings_config, diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py index c78bd0fd83..4064bfd87a 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -360,12 +360,12 @@ def test_error_when_reading_view_without_metadata(fake_pudl_sqlite_io_manager_fi def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path): - db_path = tmp_path / "test_db.sqlite" + db_path = tmp_path / "ferc1_test_db.sqlite" # fake datapackage descriptor just to see if we can find the primary keys - # lots of optional stuff dropped. 
datapackage = json.dumps( { - "name": "test_db", + "name": "ferc1_test_db", "title": "Ferc1 data extracted from XBRL filings", "resources": [ { @@ -417,7 +417,7 @@ def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path): } ) - datapackage_path = tmp_path / "test_db_datapackage.json" + datapackage_path = tmp_path / "ferc1_test_db_datapackage.json" with datapackage_path.open("w") as f: f.write(datapackage) @@ -452,7 +452,7 @@ def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path): ) }, ) - io_manager = FercXBRLSQLiteIOManager(base_dir=tmp_path, db_name="test_db") + io_manager = FercXBRLSQLiteIOManager(base_dir=tmp_path, db_name="ferc1_test_db") observed_table = io_manager.load_input(input_context) assert len(observed_table) == 1