Skip to content

Commit

Permalink
Extract 714 xbrl (#3822)
Browse files Browse the repository at this point in the history
* Add _csv suffix to current FERC714 raw assets and functions

* Add create_raw_ferc714_xbrl_assets and raw_ferc714_xbrl__metadata_json functions to FERC 714 extract module and include ferc714 io_manager.

* Update XBRL table dict to mimic the TABLE_NAME_MAP_FERC1 format and fix functions mimic those of FERC 1

* Add .get_datasets() to ferc_settings definition in FercXBRLSQLiteIOManager

* Update FERC714 working paritions and add xbrl_year function to Ferc714Settings

* Fix < to > in xbrl_years

* Change test output names to ferc1_xbrl from test_db

* Fix ferc_settings definition in FercXBRLSQLiteIOManager load_input function to fix unit tests

* Remove old FERC714_XBRL_FILES dict

* Add and update dictionary descriptions

* Update release notes

* Fix ferc io_managers and test

* Add csv_years property to Ferc714Settings

* add csv_years reference to raw_ferc714_csv_asset_factory function

* Add fixtures for FERC 714 XBRL tests
  • Loading branch information
aesharpe authored Sep 6, 2024
1 parent 1d6363d commit 3435de7
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 39 deletions.
1 change: 1 addition & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ FERC 714
* Added :ref:`core_ferc714__yearly_planning_area_demand_forecast` based on FERC
Form 714, Part III, Schedule 2b. Data includes forecasted demand and net energy load.
See issue :issue:`3519` and PR :pr:`3670`.
* WIP: Adding XBRL(2021+) data for FERC 714 tables. Track progress in :issue:`3822`.

NREL ATB
~~~~~~~~
Expand Down
2 changes: 2 additions & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
epacems_io_manager,
ferc1_dbf_sqlite_io_manager,
ferc1_xbrl_sqlite_io_manager,
ferc714_xbrl_sqlite_io_manager,
parquet_io_manager,
pudl_mixed_format_io_manager,
)
Expand Down Expand Up @@ -201,6 +202,7 @@ def _get_keys_from_assets(
"pudl_io_manager": pudl_mixed_format_io_manager,
"ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager,
"ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager,
"ferc714_xbrl_sqlite_io_manager": ferc714_xbrl_sqlite_io_manager,
"dataset_settings": dataset_settings,
"ferc_to_sqlite_settings": ferc_to_sqlite_settings,
"epacems_io_manager": epacems_io_manager,
Expand Down
129 changes: 115 additions & 14 deletions src/pudl/extract/ferc714.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""Routines used for extracting the raw FERC 714 data."""

import json
from collections import OrderedDict
from itertools import chain
from pathlib import Path
from typing import Any

import pandas as pd
from dagster import AssetsDefinition, asset
from dagster import AssetKey, AssetsDefinition, SourceAsset, asset

import pudl
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)

FERC714_FILES: OrderedDict[str, dict[str, str]] = OrderedDict(
FERC714_CSV_ENCODING: OrderedDict[str, dict[str, str]] = OrderedDict(
{
"yearly_id_certification": {
"name": "Part 1 Schedule 1 - Identification Certification.csv",
Expand Down Expand Up @@ -61,46 +66,142 @@
},
}
)
"""Dictionary mapping PUDL tables to FERC-714 filenames and character encodings."""
"""Dictionary mapping PUDL tables to FERC-714 CSV filenames and character encodings."""

TABLE_NAME_MAP_FERC714: OrderedDict[str, dict[str, str]] = OrderedDict(
{
"core_ferc714__yearly_planning_area_demand_forecast": {
"csv": "Part 3 Schedule 2 - Planning Area Forecast Demand.csv",
"xbrl": "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2",
},
"out_ferc714__hourly_planning_area_demand": {
"csv": "Part 3 Schedule 2 - Planning Area Hourly Demand.csv",
"xbrl": "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2",
},
"core_ferc714_respondent_id": {
"csv": "Respondent IDs.csv",
"xbrl": "identification_and_certification_01_1",
},
}
)
"""A mapping of PUDL DB table names to their XBRL and CSV source table names."""

def raw_ferc714_asset_factory(table_name: str) -> AssetsDefinition:
"""Generates an asset for building the raw FERC 714 dataframe."""
assert table_name in FERC714_FILES

def raw_ferc714_csv_asset_factory(table_name: str) -> AssetsDefinition:
"""Generates an asset for building the raw CSV-based FERC 714 dataframe."""
assert table_name in FERC714_CSV_ENCODING

@asset(
name=f"raw_ferc714__{table_name}",
name=f"raw_ferc714_csv__{table_name}",
required_resource_keys={"datastore", "dataset_settings"},
compute_kind="pandas",
)
def _extract_raw_ferc714(context):
def _extract_raw_ferc714_csv(context):
"""Extract the raw FERC Form 714 dataframes from their original CSV files.
Args:
context: dagster keyword that provides access to resources and config.
"""
ds = context.resources.datastore
ferc714_settings = context.resources.dataset_settings.ferc714
years = ", ".join(map(str, ferc714_settings.years))
years = ", ".join(map(str, ferc714_settings.csv_years))

logger.info(
f"Extracting {table_name} from CSV into pandas DataFrame (years: {years})."
)
with (
ds.get_zipfile_resource("ferc714", name="ferc714.zip") as zf,
zf.open(FERC714_FILES[table_name]["name"]) as csv_file,
zf.open(FERC714_CSV_ENCODING[table_name]["name"]) as csv_file,
):
df = pd.read_csv(
csv_file,
encoding=FERC714_FILES[table_name]["encoding"],
encoding=FERC714_CSV_ENCODING[table_name]["encoding"],
)
if table_name != "respondent_id":
df = df.query("report_yr in @ferc714_settings.years")
return df

return _extract_raw_ferc714
return _extract_raw_ferc714_csv


@asset
def raw_ferc714_xbrl__metadata_json(
context,
) -> dict[str, dict[str, list[dict[str, Any]]]]:
"""Extract the FERC 714 XBRL Taxonomy metadata we've stored as JSON.
Returns:
A dictionary keyed by PUDL table name, with an instant and a duration entry
for each table, corresponding to the metadata for each of the respective instant
or duration tables from XBRL if they exist. Table metadata is returned as a list
of dictionaries, each of which can be interpreted as a row in a tabular
structure, with each row annotating a separate XBRL concept from the FERC 714
filings.
"""
metadata_path = PudlPaths().output_dir / "ferc714_xbrl_taxonomy_metadata.json"
with Path.open(metadata_path) as f:
xbrl_meta_all = json.load(f)

valid_tables = {
table_name: xbrl_table
for table_name in TABLE_NAME_MAP_FERC714
if (xbrl_table := TABLE_NAME_MAP_FERC714.get(table_name, {}).get("xbrl"))
is not None
}

def squash_period(xbrl_table: str | list[str], period, xbrl_meta_all):
if isinstance(xbrl_table, str):
xbrl_table = [xbrl_table]
return [
metadata
for table in xbrl_table
for metadata in xbrl_meta_all.get(f"{table}_{period}", [])
if metadata
]

xbrl_meta_out = {
table_name: {
"instant": squash_period(xbrl_table, "instant", xbrl_meta_all),
"duration": squash_period(xbrl_table, "duration", xbrl_meta_all),
}
for table_name, xbrl_table in valid_tables.items()
}

return xbrl_meta_out


def create_raw_ferc714_xbrl_assets() -> list[SourceAsset]:
"""Create SourceAssets for raw FERC 714 XBRL tables.
SourceAssets allow you to access assets that are generated elsewhere.
In our case, the XBRL database contains the raw FERC 714 assets from
2021 onward. Prior to that, the assets are distributed as CSVs and
are extracted with the ``raw_ferc714_csv_asset_factory`` function.
raw_ferc714_assets = [
raw_ferc714_asset_factory(table_name) for table_name in FERC714_FILES
Returns:
A list of FERC 714 SourceAssets.
"""
# Create assets for the duration and instant tables
xbrls = (v["xbrl"] for v in TABLE_NAME_MAP_FERC714.values())
flattened_xbrls = chain.from_iterable(
x if isinstance(x, list) else [x] for x in xbrls
)
xbrls_with_periods = chain.from_iterable(
(f"{tn}_instant", f"{tn}_duration") for tn in flattened_xbrls
)
xbrl_table_names = tuple(set(xbrls_with_periods))
raw_ferc714_xbrl_assets = [
SourceAsset(
key=AssetKey(f"raw_ferc714_xbrl__{table_name}"),
io_manager_key="ferc714_xbrl_sqlite_io_manager",
)
for table_name in xbrl_table_names
]
return raw_ferc714_xbrl_assets


raw_ferc714_csv_assets = [
raw_ferc714_csv_asset_factory(table_name) for table_name in FERC714_CSV_ENCODING
]

raw_ferc714_xbrl_assets = create_raw_ferc714_xbrl_assets()
21 changes: 16 additions & 5 deletions src/pudl/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,12 +855,14 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
context: dagster keyword that provides access output information like asset
name.
"""
# TODO (daz): this is hard-coded to FERC1, though this is nominally for all FERC datasets.
ferc1_settings = context.resources.dataset_settings.ferc1
ferc_settings = getattr(
context.resources.dataset_settings,
re.search(r"ferc\d+", self.db_name).group(),
)

table_name = get_table_name_from_context(context)
# Remove preceeding asset name metadata
table_name = table_name.replace("raw_ferc1_xbrl__", "")
table_name = table_name.replace(f"raw_{self.db_name}__", "")

# TODO (bendnorman): Figure out a better to handle tables that
# don't have duration and instant
Expand All @@ -887,21 +889,30 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
)
.pipe(
FercXBRLSQLiteIOManager.refine_report_year,
xbrl_years=ferc1_settings.xbrl_years,
xbrl_years=ferc_settings.xbrl_years,
)
.drop(columns=["publication_time"])
)


@io_manager(required_resource_keys={"dataset_settings"})
def ferc1_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager:
"""Create a SQLiteManager dagster resource for the ferc1 dbf database."""
"""Create a SQLiteManager dagster resource for the ferc1 xbrl database."""
return FercXBRLSQLiteIOManager(
base_dir=PudlPaths().output_dir,
db_name="ferc1_xbrl",
)


@io_manager(required_resource_keys={"dataset_settings"})
def ferc714_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager:
"""Create a SQLiteManager dagster resource for the ferc714 xbrl database."""
return FercXBRLSQLiteIOManager(
base_dir=PudlPaths().output_dir,
db_name="ferc714_xbrl",
)


class EpaCemsIOManager(UPathIOManager):
"""An IO Manager that dumps outputs to a parquet file."""

Expand Down
4 changes: 2 additions & 2 deletions src/pudl/metadata/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,9 @@
),
"field_namespace": "ferc714",
"working_partitions": {
# 2021 and later data is in XBRL and not yet supported.
# 2021 and later data is in XBRL.
# 2006-2020 data is in monolithic CSV files, so any year means all years.
"years": sorted(set(range(2006, 2021))),
"years": sorted(set(range(2006, 2024))),
},
"contributors": [
CONTRIBUTORS["catalyst-cooperative"],
Expand Down
19 changes: 14 additions & 5 deletions src/pudl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def dbf_years(self):

@property
def xbrl_years(self):
"""Return validated years for which DBF data is available."""
"""Return validated years for which XBRL data is available."""
return [year for year in self.years if year >= 2021]


Expand All @@ -139,12 +139,21 @@ class Ferc714Settings(GenericDatasetSettings):
"""

data_source: ClassVar[DataSource] = DataSource.from_id("ferc714")

# Note: Only older data is currently supported. Starting in 2021 FERC-714 is being
# published as XBRL, and we haven't integrated it. The older data is published as
# monolithic CSV files, so asking for any year processes all of them.
years: list[int] = data_source.working_partitions["years"]

# The older 714 data is distributed as CSV files and has a different extraction
# process than the FERC DBF extraction process.

@property
def csv_years(self):
"""Return validated years for which CSV data is available."""
return [year for year in self.years if year < 2021]

@property
def xbrl_years(self):
"""Return validated years for which XBRL data is available."""
return [year for year in self.years if year >= 2021]


class EpaCemsSettings(GenericDatasetSettings):
"""An immutable pydantic model to validate EPA CEMS settings.
Expand Down
18 changes: 9 additions & 9 deletions src/pudl/transform/ferc714.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def _standardize_offset_codes(df: pd.DataFrame, offset_fixes) -> pd.DataFrame:
compute_kind="pandas",
)
def core_ferc714__respondent_id(
raw_ferc714__respondent_id: pd.DataFrame,
raw_ferc714_csv__respondent_id: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the FERC 714 respondent IDs, names, and EIA utility IDs.
Expand All @@ -393,13 +393,13 @@ def core_ferc714__respondent_id(
PacifiCorp).
Args:
raw_ferc714__respondent_id: Raw table describing the FERC 714 Respondents.
raw_ferc714_csv__respondent_id: Raw table describing the FERC 714 Respondents.
Returns:
A clean(er) version of the FERC-714 respondents table.
"""
df = _pre_process(
raw_ferc714__respondent_id, table_name="core_ferc714__respondent_id"
raw_ferc714_csv__respondent_id, table_name="core_ferc714__respondent_id"
)
df["respondent_name_ferc714"] = df.respondent_name_ferc714.str.strip()
df.loc[df.eia_code == 0, "eia_code"] = pd.NA
Expand All @@ -415,7 +415,7 @@ def core_ferc714__respondent_id(
compute_kind="pandas",
)
def out_ferc714__hourly_planning_area_demand(
raw_ferc714__hourly_planning_area_demand: pd.DataFrame,
raw_ferc714_csv__hourly_planning_area_demand: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the hourly demand time series by Planning Area.
Expand All @@ -429,15 +429,15 @@ def out_ferc714__hourly_planning_area_demand(
- Flip negative signs for reported demand.
Args:
raw_ferc714__hourly_planning_area_demand: Raw table containing hourly demand
raw_ferc714_csv__hourly_planning_area_demand: Raw table containing hourly demand
time series by Planning Area.
Returns:
Clean(er) version of the hourly demand time series by Planning Area.
"""
logger.info("Converting dates into pandas Datetime types.")
df = _pre_process(
raw_ferc714__hourly_planning_area_demand,
raw_ferc714_csv__hourly_planning_area_demand,
table_name="out_ferc714__hourly_planning_area_demand",
)

Expand Down Expand Up @@ -549,7 +549,7 @@ def out_ferc714__hourly_planning_area_demand(
compute_kind="pandas",
)
def core_ferc714__yearly_planning_area_demand_forecast(
raw_ferc714__yearly_planning_area_demand_forecast: pd.DataFrame,
raw_ferc714_csv__yearly_planning_area_demand_forecast: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the yearly planning area forecast data per Planning Area.
Expand All @@ -559,7 +559,7 @@ def core_ferc714__yearly_planning_area_demand_forecast(
- Remove duplicate rows and average out the metrics.
Args:
raw_ferc714__yearly_planning_area_demand_forecast: Raw table containing,
raw_ferc714_csv__yearly_planning_area_demand_forecast: Raw table containing,
for each year and each planning area, the forecasted summer and winter peak demand,
in megawatts, and annual net energy for load, in megawatthours, for the next
ten years.
Expand All @@ -569,7 +569,7 @@ def core_ferc714__yearly_planning_area_demand_forecast(
"""
# Clean up columns
df = _pre_process(
raw_ferc714__yearly_planning_area_demand_forecast,
raw_ferc714_csv__yearly_planning_area_demand_forecast,
table_name="core_ferc714__yearly_planning_area_demand_forecast",
)

Expand Down
Loading

0 comments on commit 3435de7

Please sign in to comment.