Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bias adjusted precip #3202

Merged
merged 11 commits into from
Nov 1, 2023
68 changes: 68 additions & 0 deletions api/alembic/versions/f2e027a47a3f_bias_adjusted_precip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""bias adjusted precip

Revision ID: f2e027a47a3f
Revises: 5b745fe0bd7a
Create Date: 2023-10-30 11:34:21.603046

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = 'f2e027a47a3f'
down_revision = '5b745fe0bd7a'
branch_labels = None
depends_on = None


def upgrade():
op.add_column('weather_station_model_predictions', sa.Column('bias_adjusted_precip_24h', sa.Float(), nullable=True))
# Drop morecast_2_materialized view and recreate with the bias_adjusted_precip_24h field
op.execute("DROP MATERIALIZED VIEW morecast_2_materialized_view;")
op.execute("""
CREATE MATERIALIZED VIEW morecast_2_materialized_view AS
SELECT weather_station_model_predictions.prediction_timestamp, prediction_models.abbreviation, weather_station_model_predictions.station_code,
weather_station_model_predictions.rh_tgl_2, weather_station_model_predictions.tmp_tgl_2, weather_station_model_predictions.bias_adjusted_temperature,
weather_station_model_predictions.bias_adjusted_rh, weather_station_model_predictions.precip_24h, weather_station_model_predictions.wdir_tgl_10,
weather_station_model_predictions.wind_tgl_10, weather_station_model_predictions.bias_adjusted_wind_speed, weather_station_model_predictions.bias_adjusted_wdir,
weather_station_model_predictions.bias_adjusted_precip_24h, weather_station_model_predictions.update_date,
weather_station_model_predictions.prediction_model_run_timestamp_id
FROM weather_station_model_predictions
JOIN prediction_model_run_timestamps
ON weather_station_model_predictions.prediction_model_run_timestamp_id = prediction_model_run_timestamps.id JOIN prediction_models
ON prediction_model_run_timestamps.prediction_model_id = prediction_models.id
JOIN (
SELECT max(weather_station_model_predictions.prediction_timestamp) AS latest_prediction, weather_station_model_predictions.station_code AS station_code,
date(weather_station_model_predictions.prediction_timestamp) AS unique_day
FROM weather_station_model_predictions
WHERE date_part('hour', weather_station_model_predictions.prediction_timestamp) = 20
GROUP BY weather_station_model_predictions.station_code, date(weather_station_model_predictions.prediction_timestamp)
) AS latest
ON weather_station_model_predictions.prediction_timestamp = latest.latest_prediction AND weather_station_model_predictions.station_code = latest.station_code
ORDER BY weather_station_model_predictions.update_date DESC;""")


def downgrade():
# Drop morecast_2_materialized view before dropping the column in the table in order to avoid dependency issues
op.execute("DROP MATERIALIZED VIEW morecast_2_materialized_view;")
op.drop_column('weather_station_model_predictions', 'bias_adjusted_precip_24h')
op.execute("""
CREATE MATERIALIZED VIEW morecast_2_materialized_view AS
SELECT weather_station_model_predictions.prediction_timestamp, prediction_models.abbreviation, weather_station_model_predictions.station_code,
weather_station_model_predictions.rh_tgl_2, weather_station_model_predictions.tmp_tgl_2, weather_station_model_predictions.bias_adjusted_temperature,
weather_station_model_predictions.bias_adjusted_rh, weather_station_model_predictions.precip_24h, weather_station_model_predictions.wdir_tgl_10,
weather_station_model_predictions.wind_tgl_10, weather_station_model_predictions.bias_adjusted_wind_speed, weather_station_model_predictions.bias_adjusted_wdir,
weather_station_model_predictions.update_date
FROM weather_station_model_predictions
JOIN prediction_model_run_timestamps
ON weather_station_model_predictions.prediction_model_run_timestamp_id = prediction_model_run_timestamps.id JOIN prediction_models
ON prediction_model_run_timestamps.prediction_model_id = prediction_models.id
JOIN (
SELECT max(weather_station_model_predictions.prediction_timestamp) AS latest_prediction, weather_station_model_predictions.station_code AS station_code,
date(weather_station_model_predictions.prediction_timestamp) AS unique_day
FROM weather_station_model_predictions
WHERE date_part('hour', weather_station_model_predictions.prediction_timestamp) = 20
GROUP BY weather_station_model_predictions.station_code, date(weather_station_model_predictions.prediction_timestamp)
) AS latest
ON weather_station_model_predictions.prediction_timestamp = latest.latest_prediction AND weather_station_model_predictions.station_code = latest.station_code
ORDER BY weather_station_model_predictions.update_date DESC;""")
76 changes: 74 additions & 2 deletions api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
"""
import datetime
from typing import List
from sqlalchemy import and_, select
from sqlalchemy import and_, select, text
from sqlalchemy.sql import func
from sqlalchemy.orm import Session
from app.db.models.weather_models import ModelRunPrediction, PredictionModelRunTimestamp
from app.db.models.weather_models import (ModelRunPrediction, PredictionModel, PredictionModelRunTimestamp,
WeatherStationModelPrediction)
from app.db.models.observations import HourlyActual


Expand Down Expand Up @@ -69,3 +70,74 @@ def get_accumulated_precipitation(session: Session, station_code: int, start_dat
if result is None:
return 0
return result


def get_accumulated_precip_by_24h_interval(session: Session, station_code: int, start_datetime: datetime, end_datetime: datetime):
""" Get the accumulated precip for 24 hour intervals for a given station code within the specified time interval.
:param session: The ORM/database session.
:param station_code: The numeric code identifying the weather station of interest.
:param start_datetime: The earliest date and time of interest.
:param end_datetime: The latest date and time of interest.

Note: I couldn't construct this query in SQLAlchemy, hence the need for the 'text' based query.

generate_series(\'{}\', \'{}\', '24 hours'::interval)

This gives us a one column table of dates separated by 24 hours between the start and end dates. For example, if start and end dates are 2023-10-31 20:00:00 to 2023-11-03 20:00:00 we would have a table like:

day
2023-10-31 20:00:00
2023-11-01 20:00:00
2023-11-02 20:00:00
2023-11-03 20:00:00

We then join the HourlyActuals table so that we can sum hourly precip in a 24 hour period. The join is based on the weather_date field in the HourlyActuals table being in a 24 hour range using this odd looking syntax:

weather_date <@ tstzrange(day, day + '24 hours', '(]')

Using 2023-10-31 20:00:00 as an example, rows with the following dates would match. The (] syntax means the lower bound is excluded but the upper bound is included.

2023-10-31 21:00:00
2023-10-31 22:00:00
2023-10-31 23:00:00
2023-11-01 00:00:00
2023-11-01 01:00:00
....
2023-11-01 19:00:00
2023-11-01 20:00:00
"""
stmt = """
SELECT day, station_code, sum(precipitation) actual_precip_24h
FROM
generate_series(\'{}\', \'{}\', '24 hours'::interval) day
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this syntax, but is this creating a series of days based on 24 hour intervals, then selecting each weather date based on it's hour being less than the subset of the series, then summing up the precip based on the hourly actuals that fit in that window? (There's probably a better way of expressing this in English)...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have/need an index on hourly_actuals.weather_date for this query?

Copy link
Collaborator Author

@dgboss dgboss Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, you have the gist of it. I'm glad you asked this question, I think I found an off-by-one day logic error.

I'll add to the comment with a description of what this is doing.

generate_series(\'{}\', \'{}\', '24 hours'::interval)
This gives us a one column table of dates separated by 24 hours between the start and end dates. For example, if start and end dates are 2023-10-31 20:00:00 to 2023-11-03 20:00:00 we would have a table like:

day
2023-10-31 20:00:00
2023-11-01 20:00:00
2023-11-02 20:00:00
2023-11-03 20:00:00

We then join the HourlyActuals table so that we can sum hourly precip in a 24 hour period. The join is based on the weather_date field in the HourlyActuals table being in a 24 hour range using this odd looking syntax:

weather_date <@ tstzrange(day, day + '24 hours', '(]')

Using 2023-10-31 20:00:00 as an example, rows with the following dates would match. The (] syntax means the lower bound is excluded but the upper bound is included.

2023-10-31 21:00:00
2023-10-31 22:00:00
2023-10-31 23:00:00
2023-11-01 00:00:00
2023-11-01 01:00:00
....
2023-11-01 19:00:00
2023-11-01 20:00:00

My off by one error is that right now I am attributing this accumulated precipitation to October 31, but it should actually be for Nov 1 (we need the preceding 24 hour precip).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An index on weather_date is a great idea!

LEFT JOIN
hourly_actuals
ON
weather_date <@ tstzrange(day, day - '24 hours', '(]')
WHERE
station_code = {}
GROUP BY
day, station_code;
""".format(start_datetime, end_datetime, station_code)
result = session.execute(text(stmt))
return result.all()


def get_predicted_daily_precip(session: Session, model: PredictionModel, station_code: int, start_datetime: datetime, end_datetime: datetime):
""" Gets rows from WeatherStationModelPrediction for the given model and station within the
specified time interval at 20:00:00 UTC each day.
:param session: The ORM/database session
:param model: The numeric weather prediction model
:param station_code: The code identifying the weather station.
:param start_datetime: The earliest date and time of interest.
:param end_datetime: The latest date and time of interest.
"""
result = session.query(WeatherStationModelPrediction)\
.join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id)\
.filter(PredictionModelRunTimestamp.prediction_model_id == model.id)\
.filter(WeatherStationModelPrediction.station_code == station_code)\
.filter(WeatherStationModelPrediction.prediction_timestamp >= start_datetime)\
.filter(WeatherStationModelPrediction.prediction_timestamp < end_datetime)\
.filter(func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20)\
.order_by(WeatherStationModelPrediction.prediction_timestamp)
return result.all()
1 change: 1 addition & 0 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def get_latest_station_prediction_mat_view(session: Session,
MoreCast2MaterializedView.bias_adjusted_wind_speed,
MoreCast2MaterializedView.bias_adjusted_wdir,
MoreCast2MaterializedView.precip_24h,
MoreCast2MaterializedView.bias_adjusted_precip_24h,
MoreCast2MaterializedView.wdir_tgl_10,
MoreCast2MaterializedView.wind_tgl_10,
MoreCast2MaterializedView.update_date).\
Expand Down
5 changes: 4 additions & 1 deletion api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,13 @@ class WeatherStationModelPrediction(Base):
default=time_utils.get_utc_now(), index=True)
# Precipitation predicted for the previous 24 hour period.
precip_24h = Column(Float, nullable=True)
# Precipitation predicted for the previous 24 hour period adjusted for bias.
bias_adjusted_precip_24h = Column(Float, nullable=True)

def __str__(self):
return ('{self.station_code} {self.prediction_timestamp} {self.tmp_tgl_2} {self.bias_adjusted_temperature} '
'{self.rh_tgl_2} {self.bias_adjusted_rh} {self.wdir_tgl_10} {self.bias_adjusted_wdir} {self.wind_tgl_10} '
'{self.bias_adjusted_wind_speed} {self.apcp_sfc_0} {self.delta_precip}').format(self=self)
'{self.bias_adjusted_wind_speed} {self.apcp_sfc_0} {self.delta_precip} {self.bias_adjusted_precip_24h}').format(self=self)


class MoreCast2MaterializedView(Base):
Expand All @@ -267,6 +269,7 @@ class MoreCast2MaterializedView(Base):
primary_key=True, nullable=False, index=True)
abbreviation = Column(String, nullable=False)
apcp_sfc_0 = Column(Float, nullable=False)
bias_adjusted_precip_24h = Column(Float, nullable=False)
bias_adjusted_rh = Column(Float, nullable=False)
bias_adjusted_temperature = Column(Float, nullable=False)
bias_adjusted_wind_speed = Column(Float, nullable=False)
Expand Down
7 changes: 6 additions & 1 deletion api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def _process_prediction(self,
station_prediction.apcp_sfc_0 = 0.0
else:
station_prediction.apcp_sfc_0 = prediction.apcp_sfc_0
# Calculate the delta_precipitation based on station's previous prediction_timestamp
# Calculate the delta_precipitation and 24 hour precip based on station's previous prediction_timestamp
# for the same model run
self.session.flush()
station_prediction.precip_24h = self._calculate_past_24_hour_precip(
Expand Down Expand Up @@ -280,6 +280,11 @@ def _process_prediction(self,
station_prediction.wdir_tgl_10,
station_prediction.prediction_timestamp
)
# Predict the 24 hour precipitation
station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(
station_prediction.precip_24h,
station_prediction.prediction_timestamp
)

# Update the update time (this might be an update)
station_prediction.update_date = time_utils.get_utc_now()
Expand Down
41 changes: 40 additions & 1 deletion api/app/tests/weather_models/crud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
""" Some crud responses used to mock our calls to app.db.crud
"""
from datetime import datetime
from app.db.models.weather_models import ModelRunPrediction
from app.db.models.weather_models import ModelRunPrediction, WeatherStationModelPrediction
from app.db.models.observations import HourlyActual


class MockActualPrecip:
day: datetime
actual_precip_24h: float

def __init__(self, day, actual_precip_24h):
self.day=day
self.actual_precip_24h=actual_precip_24h


def get_actuals_left_outer_join_with_predictions(*args):
""" Fixed response as replacement for app.db.crud.observations.get_actuals_left_outer_join_with_predictions
"""
Expand Down Expand Up @@ -89,3 +98,33 @@ def get_actuals_left_outer_join_with_predictions(*args):
prediction_timestamp=datetime(2020, 10, 11, 21))]
]
return result

def get_accumulated_precip_by_24h_interval(*args):
""" Fixed response as replacement for app.db.crud.observations.get_accumulated_precip_by_24h_interval
"""
return [
MockActualPrecip(
day=datetime(2023,10,10,20,0,0),
actual_precip_24h=3

),
MockActualPrecip(
day=datetime(2023,10,11,20,0,0),
actual_precip_24h=3
)
]


def get_predicted_daily_precip(*args):
return [
WeatherStationModelPrediction(
bias_adjusted_precip_24h=None,
precip_24h=3,
prediction_timestamp=datetime(2023,10,10,20,0,0)
),
WeatherStationModelPrediction(
bias_adjusted_precip_24h=None,
precip_24h=3,
prediction_timestamp=datetime(2023,10,11,20,0,0)
)
]
28 changes: 25 additions & 3 deletions api/app/tests/weather_models/test_machine_learning.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from datetime import datetime
import pytest
from app.weather_models import machine_learning
from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions
from app.tests.weather_models.crud import (get_actuals_left_outer_join_with_predictions,
get_accumulated_precip_by_24h_interval,
get_predicted_daily_precip)
from app.db.models.weather_models import PredictionModel
from app.weather_models.machine_learning import StationMachineLearning

Expand All @@ -11,9 +13,23 @@ def mock_get_actuals_left_outer_join_with_predictions(monkeypatch):
""" Mock out call to DB returning actuals macthed with predictions """
monkeypatch.setattr(machine_learning, 'get_actuals_left_outer_join_with_predictions',
get_actuals_left_outer_join_with_predictions)

@pytest.fixture()
def mock_get_accumulated_precip_by_24h_interval(monkeypatch):
""" Mock out call to DB returning actual 24 hour precipitation data """
monkeypatch.setattr(machine_learning, 'get_accumulated_precip_by_24h_interval',
get_accumulated_precip_by_24h_interval)


@pytest.fixture()
def mock_get_predicted_daily_precip(monkeypatch):
""" Mock out call to DB returning modelled/predicted 24 hour precipitation data """
monkeypatch.setattr(machine_learning, 'get_predicted_daily_precip',
get_predicted_daily_precip)

def test_bias_adjustment_with_samples(mock_get_actuals_left_outer_join_with_predictions):
def test_bias_adjustment_with_samples(mock_get_actuals_left_outer_join_with_predictions,
mock_get_accumulated_precip_by_24h_interval,
mock_get_predicted_daily_precip):
predict_date_with_samples = datetime.fromisoformat("2020-09-03T21:14:51.939836+00:00")

machine_learner = StationMachineLearning(
Expand All @@ -30,12 +46,16 @@ def test_bias_adjustment_with_samples(mock_get_actuals_left_outer_join_with_pred
temp_result = machine_learner.predict_temperature(20, predict_date_with_samples)
rh_result = machine_learner.predict_rh(50, predict_date_with_samples)
wdir_result = machine_learner.predict_wind_direction(10, 120, predict_date_with_samples)
precip_result = machine_learner.predict_precipitation(3, datetime(2023,10,26,20,0,0))
assert temp_result == 30
assert rh_result == 100
assert wdir_result == 120
assert precip_result == 3


def test_bias_adjustment_without_samples(mock_get_actuals_left_outer_join_with_predictions):
def test_bias_adjustment_without_samples(mock_get_actuals_left_outer_join_with_predictions,
mock_get_accumulated_precip_by_24h_interval,
mock_get_predicted_daily_precip):
predict_date_without_samples = datetime.fromisoformat("2020-09-03T01:14:51.939836+00:00")

machine_learner = StationMachineLearning(
Expand All @@ -52,6 +72,8 @@ def test_bias_adjustment_without_samples(mock_get_actuals_left_outer_join_with_p
temp_result = machine_learner.predict_temperature(20, predict_date_without_samples)
rh_result = machine_learner.predict_rh(50, predict_date_without_samples)
wdir_result = machine_learner.predict_wind_direction(10, 290, predict_date_without_samples)
precip_result = machine_learner.predict_precipitation(3, predict_date_without_samples)
assert temp_result is None
assert rh_result is None
assert wdir_result is None
assert precip_result is None
28 changes: 28 additions & 0 deletions api/app/tests/weather_models/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from app.weather_models.utils import construct_dictionary_from_list_by_property


class TestObject:
id: int
name: str

def __init__(self, id, name):
self.id = id
self.name = name


object_1 = TestObject(1, "foo")
object_2 = TestObject(2, "bar")
object_3 = TestObject(3, "foo")

object_list = [object_1, object_2, object_3]


def test_construct_dictionary_from_list_by_property():
result = construct_dictionary_from_list_by_property(object_list, "name")
assert result is not None
assert len(result.keys()) == 2
assert len(result["foo"]) == 2
assert len(result["bar"]) == 1
assert object_1 in result["foo"]
assert object_2 in result["bar"]
assert object_3 in result["foo"]
2 changes: 1 addition & 1 deletion api/app/weather_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

# Key values on ModelRunGridSubsetPrediction.
# Wind direction (wdir_tgl_10_b) is handled slightly differently, so not included here.
SCALAR_MODEL_VALUE_KEYS = ('tmp_tgl_2', 'rh_tgl_2', 'wind_tgl_10', 'apcp_sfc_0')
SCALAR_MODEL_VALUE_KEYS = ('tmp_tgl_2', 'rh_tgl_2', 'wind_tgl_10')


class ModelEnum(str, Enum):
Expand Down
3 changes: 2 additions & 1 deletion api/app/weather_models/fetch/predictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess

daily_result = get_latest_station_prediction_mat_view(
session, active_station_codes, day_start, day_end)
for timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, bias_adjusted_wind_speed, bias_adjusted_wdir, precip_24hours, wind_dir, wind_speed, update_date in daily_result:
for timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, bias_adjusted_wind_speed, bias_adjusted_wdir, precip_24hours, bias_adjusted_precip_24h, wind_dir, wind_speed, update_date in daily_result:

# Create two WeatherIndeterminates, one for model predictions and one for bias corrected predictions
results.append(
Expand All @@ -163,6 +163,7 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess
utc_timestamp=timestamp,
temperature=bias_adjusted_temp,
relative_humidity=bias_adjusted_rh,
precipitation=bias_adjusted_precip_24h,
wind_speed=bias_adjusted_wind_speed,
wind_direction=bias_adjusted_wdir
))
Expand Down
Loading