Skip to content

Commit

Permalink
add caching and loading of df_all_sky + df_raw prior to train / test …
Browse files Browse the repository at this point in the history
…split
  • Loading branch information
bnb32 committed Aug 22, 2024
1 parent 85f9347 commit 61bfdc9
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 89 deletions.
206 changes: 124 additions & 82 deletions mlclouds/data_handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""Data handlers for training and validation data."""

import copy
import logging
import os

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -35,6 +38,7 @@ def __init__(
config=CONFIG,
test_fraction=None,
nsrdb_files=None,
cache_pattern=None,
):
"""
Parameters
Expand All @@ -55,6 +59,9 @@ def __init__(
is used to compute the sky class for these locations which is then
used to filter cloud type data for false positives / negatives.
Each file needs to have a four digit year and east / west label.
cache_pattern : str
File path pattern for saving training data. e.g. ``./df_{}.csv``.
This will be used to save ``self.x``, ``self.y``, and ``self.p``
"""

self.fp_surfrad_data = FP_SURFRAD_DATA
Expand All @@ -76,12 +83,26 @@ def __init__(
self.train_files = train_files

logger.info("Loading training data")
self._load_data(test_fraction, nsrdb_files=nsrdb_files)
if self.cache_exists(cache_pattern):
self.df_raw, self.df_all_sky = self.load_all_data(cache_pattern)
else:
self._load_data(test_fraction, nsrdb_files=nsrdb_files)
if cache_pattern is not None and not self.cache_exists(cache_pattern):
self.save_all_data(cache_pattern)

logger.info("Prepping training data")
self._prep_data(
kwargs=config.get("training_prep_kwargs", TRAINING_PREP_KWARGS)
)

@staticmethod
def cache_exists(cache_pattern):
"""Check if cache files for ``df_raw`` and ``df_all_sky`` exist."""
return cache_pattern is not None and all(
os.path.exists(cache_pattern.format(name))
for name in ["raw", "all_sky"]
)

def _load_surf(self, gid, year, area, time_step, nsrdb_files=None):
"""Load surfrad data for a given gid, year and region. If nsrdb_files
are provided then also compute sky_class and add this to surfrad
Expand Down Expand Up @@ -128,6 +149,67 @@ def _load_surf(self, gid, year, area, time_step, nsrdb_files=None):
)
return temp_surf

@staticmethod
def _add_rest2_data(df_all_sky):
    """Run the REST2 clear-sky model and append its outputs.

    Extracts the atmospheric input columns from ``df_all_sky`` as
    single-column 2D arrays (the shape the rest2 interface expects),
    runs ``rest2`` / ``rest2_tuuclr``, and writes the results back as
    new columns: ``doy``, ``radius``, ``Tuuclr``, ``clearsky_ghi``,
    ``clearsky_dni``, ``Ruuclr``, ``Tddclr``, ``Tduclr``.

    Parameters
    ----------
    df_all_sky : pd.DataFrame
        Frame with a ``time_index`` column plus the REST2 inputs
        (aod, alpha, surface_pressure, surface_albedo, ssa, asymmetry,
        solar_zenith_angle, ozone, total_precipitable_water).

    Returns
    -------
    pd.DataFrame
        The same frame, mutated in place, with the REST2 clear-sky
        columns added.
    """
    logger.debug(
        "Extracting 2D arrays to run rest2 for clearsky PhyGNN inputs."
    )
    n_rows = len(df_all_sky)
    time_index = pd.DatetimeIndex(df_all_sky.time_index.astype(str))

    def _column(name):
        # One dataframe column as an (n_rows, 1) array for rest2.
        return df_all_sky[name].values.reshape((n_rows, 1))

    aod = _column("aod")
    alpha = _column("alpha")
    surface_pressure = _column("surface_pressure")
    surface_albedo = _column("surface_albedo")
    ssa = _column("ssa")
    asymmetry = _column("asymmetry")
    solar_zenith_angle = _column("solar_zenith_angle")
    ozone = _column("ozone")
    total_precipitable_water = _column("total_precipitable_water")

    logger.debug("Running rest2 for clearsky PhyGNN inputs.")
    radius = ti_to_radius(time_index, n_cols=1)
    beta = calc_beta(aod, alpha)
    rest_data = rest2(
        surface_pressure,
        surface_albedo,
        ssa,
        asymmetry,
        solar_zenith_angle,
        radius,
        alpha,
        beta,
        ozone,
        total_precipitable_water,
    )
    Tuuclr = rest2_tuuclr(
        surface_pressure,
        surface_albedo,
        ssa,
        radius,
        alpha,
        ozone,
        total_precipitable_water,
        parallel=False,
    )

    df_all_sky["doy"] = time_index.dayofyear.values
    df_all_sky["radius"] = radius
    df_all_sky["Tuuclr"] = Tuuclr
    df_all_sky["clearsky_ghi"] = rest_data.ghi
    df_all_sky["clearsky_dni"] = rest_data.dni
    df_all_sky["Ruuclr"] = rest_data.Ruuclr
    df_all_sky["Tddclr"] = rest_data.Tddclr
    df_all_sky["Tduclr"] = rest_data.Tduclr
    logger.debug("Completed rest2 run for clearsky PhyGNN inputs.")

    return df_all_sky

def _load_data(self, test_fraction, nsrdb_files=None):
"""
Load training data
Expand Down Expand Up @@ -163,11 +245,19 @@ def _load_data(self, test_fraction, nsrdb_files=None):
self.train_sites, train_file
)
)
year, area = extract_file_meta(train_file)
train_sites = []
for gid in self.train_sites:
surfrad_file = self.fp_surfrad_data.format(
year=year, code=surf_meta().loc[gid, "surfrad_id"]
)
if os.path.exists(surfrad_file):
train_sites.append(gid)

with NSRDBFeatures(train_file) as res:
temp_raw = res.extract_features(self.train_sites, var_names)
temp_raw = res.extract_features(train_sites, var_names)
temp_all_sky = res.extract_features(
self.train_sites,
self._config.get("all_sky_vars", ALL_SKY_VARS),
train_sites, self._config.get("all_sky_vars", ALL_SKY_VARS)
)

self.observation_sources += len(temp_raw) * [train_file]
Expand All @@ -190,13 +280,12 @@ def _load_data(self, test_fraction, nsrdb_files=None):
logger.debug("\tTime step is {} minutes".format(time_step))

# ------ Grab surface data
year, area = extract_file_meta(train_file)
logger.debug(
"\tGrabbing surface data for {} and {}".format(
year, self.train_sites
)
)
for gid in self.train_sites:
for gid in train_sites:
temp_surf = self._load_surf(
gid=gid,
year=year,
Expand Down Expand Up @@ -233,6 +322,7 @@ def _load_data(self, test_fraction, nsrdb_files=None):
df_all_sky = df_all_sky.join(df_surf)

assert len(df_raw) == len(df_all_sky)

if test_fraction:
np.random.seed(self._config["phygnn_seed"])

Expand All @@ -242,61 +332,7 @@ def _load_data(self, test_fraction, nsrdb_files=None):

assert len(df_raw) == len(df_all_sky)

logger.debug(
"Extracting 2D arrays to run rest2 for " "clearsky PhyGNN inputs."
)
n = len(df_all_sky)
time_index = pd.DatetimeIndex(df_all_sky.time_index.astype(str))
aod = df_all_sky.aod.values.reshape((n, 1))
alpha = df_all_sky.alpha.values.reshape((n, 1))
surface_pressure = df_all_sky.surface_pressure.values.reshape((n, 1))
surface_albedo = df_all_sky.surface_albedo.values.reshape((n, 1))
ssa = df_all_sky.ssa.values.reshape((n, 1))
asymmetry = df_all_sky.asymmetry.values.reshape((n, 1))
solar_zenith_angle = df_all_sky.solar_zenith_angle.values.reshape(
(n, 1)
)
ozone = df_all_sky.ozone.values.reshape((n, 1))
total_precipitable_water = (
df_all_sky.total_precipitable_water.values.reshape((n, 1))
)
doy = time_index.dayofyear.values

logger.debug("Running rest2 for clearsky PhyGNN inputs.")
radius = ti_to_radius(time_index, n_cols=1)
beta = calc_beta(aod, alpha)
rest_data = rest2(
surface_pressure,
surface_albedo,
ssa,
asymmetry,
solar_zenith_angle,
radius,
alpha,
beta,
ozone,
total_precipitable_water,
)
Tuuclr = rest2_tuuclr(
surface_pressure,
surface_albedo,
ssa,
radius,
alpha,
ozone,
total_precipitable_water,
parallel=False,
)

df_all_sky["doy"] = doy
df_all_sky["radius"] = radius
df_all_sky["Tuuclr"] = Tuuclr
df_all_sky["clearsky_ghi"] = rest_data.ghi
df_all_sky["clearsky_dni"] = rest_data.dni
df_all_sky["Ruuclr"] = rest_data.Ruuclr
df_all_sky["Tddclr"] = rest_data.Tddclr
df_all_sky["Tduclr"] = rest_data.Tduclr
logger.debug("Completed rest2 run for clearsky PhyGNN inputs.")
df_all_sky = self._add_rest2_data(df_all_sky)

self.df_raw = df_raw

Expand Down Expand Up @@ -366,8 +402,8 @@ def _prep_data(self, kwargs=TRAINING_PREP_KWARGS):
not_features = drop_list + list(self._config["y_labels"])
features = [f for f in features if f not in not_features]

self.y = self.df_train[self._config["y_labels"]]
self.x = self.df_train[features]
self.y = self.df_train[self._config["y_labels"]].astype(np.float32)
self.x = self.df_train[features].astype(np.float32)
self.p = self.df_all_sky

logger.debug(
Expand Down Expand Up @@ -439,32 +475,38 @@ def _test_train_split(
)
return df_raw, df_all_sky

@property
def all_data(self):
"""Get a joined dataframe of training data (x), physical feature data
(p), and output label data (y).
def save_all_data(self, fp_pattern):
"""Save all raw / all_sky data to disk
Returns
-------
pd.DataFrame
Parameters
----------
fp_pattern : str
.csv filepath pattern to save data to. e.g. ./df_{}.csv
"""
cols_p = [c for c in self.p.columns if c not in self.x]
out = self.x.join(self.p[cols_p])
cols_y = [c for c in self.y.columns if c not in out]
out = out.join(self.y[cols_y])
return out
if fp_pattern is not None:
for df, name in zip(
[self.df_raw, self.df_all_sky], ["raw", "all_sky"]
):
fp = fp_pattern.format(name)
logger.info("Saving training data to: {}".format(fp))
df.to_csv(fp)

def save_all_data(self, fp):
"""Save all x/y/p data to disk
def load_all_data(self, fp_pattern):
"""Load all df_raw / df_all_sky from csv files.
Parameters
----------
fp : str
.csv filepath to save data to.
fp_pattern : str
.csv filepath pattern to load data from. e.g. ./df_{}.csv
"""
if fp is not None:
logger.info("Saving training data to: {}".format(fp))
self.all_data.to_csv(fp)
if fp_pattern is not None:
df_raw_file = fp_pattern.format("raw")
logger.info("Loading df_raw from %s", df_raw_file)
df_raw = pd.read_csv(df_raw_file, index_col=0)
df_all_sky_file = fp_pattern.format("all_sky")
logger.info("Loading df_all_sky from %s", df_all_sky_file)
df_all_sky = pd.read_csv(df_all_sky_file, index_col=0)
return df_raw, df_all_sky


class ValidationData:
Expand Down
4 changes: 2 additions & 2 deletions mlclouds/model/production_model/train_n_val.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
if not os.path.exists(out_dir):
os.makedirs(out_dir)

years = range(2016, 2020)
years = range(2016, 2023)
fp_base = (
"/projects/pxs/mlclouds/training_data/{y}_{ew}_v322/"
"mlclouds_surfrad_{ew}_{y}.h5"
Expand Down Expand Up @@ -58,7 +58,7 @@
config=config,
test_fraction=0.2,
nsrdb_files=nsrdb_files,
fp_save_data="./mlclouds_training_data.csv",
cache_pattern="./mlclouds_df_{}.csv",
)

t.model.history.to_csv(fp_history)
Expand Down
12 changes: 7 additions & 5 deletions mlclouds/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(
config=CONFIG,
test_fraction=None,
nsrdb_files=None,
fp_save_data=None,
cache_pattern=None,
):
"""
Train PHYGNN model
Expand All @@ -43,8 +43,11 @@ def __init__(
Nsrdb files including irradiance data for the training sites. This
is used to compute the sky class for these locations which is then
used to filter cloud type data for false positives / negatives
fp_save_data : str
Optional .csv filepath to save training data to
cache_pattern : str
Optional .csv filepath pattern to save data to. e.g.
``./df_{}.csv``. This will be used to save
``self.train_data.df_raw`` and ``self.train_data.df_all_sky``
before they have been split into training and validation sets
"""

logger.info(
Expand Down Expand Up @@ -72,9 +75,8 @@ def __init__(
config=self._config,
test_fraction=test_fraction,
nsrdb_files=nsrdb_files,
cache_pattern=cache_pattern,
)
if fp_save_data is not None:
self.train_data.save_all_data(fp_save_data)

self.x = self.train_data.x
self.y = self.train_data.y
Expand Down

0 comments on commit 61bfdc9

Please sign in to comment.