Merge pull request #257 from lxiam26/main
Update model version 4 - January 2025
dwreeves authored Jan 31, 2025
2 parents b60b5d9 + 5b709cb commit 1d4350b
Showing 21 changed files with 1,028 additions and 88 deletions.
25 changes: 25 additions & 0 deletions alembic/versions/rev005.py
@@ -0,0 +1,25 @@
"""
empty message
Revision ID: 6ab68552a4a6
Revises: e433f34dd4bd
Create Date: 2025-01-24 10:25:29.083842
"""

from alembic import op


# revision identifiers, used by Alembic.
revision = "6ab68552a4a6"
down_revision = "e433f34dd4bd"
branch_labels = None
depends_on = None


def upgrade():
op.alter_column("prediction", "probability", new_column_name="predicted_ecoli_cfu_100ml")


def downgrade():
op.alter_column("prediction", "predicted_ecoli_cfu_100ml", new_column_name="probability")
68 changes: 61 additions & 7 deletions app/admin/views/data.py
@@ -17,11 +17,13 @@
from app.data.celery import combine_data_v1_task
from app.data.celery import combine_data_v2_task
from app.data.celery import combine_data_v3_task
+from app.data.celery import combine_data_v4_task
from app.data.celery import live_hobolink_data_task
from app.data.celery import live_usgs_data_task
from app.data.celery import predict_v1_task
from app.data.celery import predict_v2_task
from app.data.celery import predict_v3_task
+from app.data.celery import predict_v4_task
from app.data.celery import update_db_task
from app.data.database import execute_sql
from app.data.database import get_current_time
@@ -86,7 +88,15 @@ class DownloadView(BaseView):
are handy because they get around limitations of the Heroku free tier.
"""

TABLES = ["hobolink", "usgs", "processed_data", "prediction", "boathouse", "override_history"]
TABLES = [
"hobolink",
"usgs_w",
"usgs_b",
"processed_data",
"prediction",
"boathouse",
"override_history",
]

@expose("/")
def index(self):
@@ -122,11 +132,18 @@ def source_hobolink(self):
url_for("admin_downloadview.csv_wait", task_id=async_result.id, data_source="hobolink")
)

@expose("/csv/src/usgs_source")
def source_usgs(self):
@expose("/csv/src/usgs_w_source")
def source_usgs_w(self):
async_result = live_usgs_data_task.delay(days_ago=90)
return redirect(
url_for("admin_downloadview.csv_wait", task_id=async_result.id, data_source="usgs")
url_for("admin_downloadview.csv_wait", task_id=async_result.id, data_source="usgs_w")
)

@expose("/csv/src/usgs_b_source")
def source_usgs_b(self):
async_result = live_usgs_data_task.delay(days_ago=90)
return redirect(
url_for("admin_downloadview.csv_wait", task_id=async_result.id, data_source="usgs_b")
)

@expose("/csv/src/processed_data_v1_source")
@@ -156,6 +173,15 @@ def source_combine_data_v3(self):
url_for("admin_downloadview.csv_wait", task_id=async_result.id, data_source="combined")
)

@expose("/csv/src/processed_data_v4_source")
def source_combine_data_v4(self):
async_result = combine_data_v4_task.delay(
export_name="code_for_boston_export_90d", days_ago=90
)
return redirect(
url_for("admin_downloadview.csv_wait", task_id=async_result.id, data_source="combined")
)

@expose("/csv/src/prediction_v1_source")
def source_prediction_v1(self):
async_result = predict_v1_task.delay(export_name="code_for_boston_export_90d", days_ago=90)
@@ -183,6 +209,15 @@ def source_prediction_v3(self):
)
)

@expose("/csv/src/prediction_v4_source")
def source_prediction_v4(self):
async_result = predict_v4_task.delay(export_name="code_for_boston_export_90d", days_ago=90)
return redirect(
url_for(
"admin_downloadview.csv_wait", task_id=async_result.id, data_source="prediction"
)
)

@expose("/csv/wait")
def csv_wait(self):
task_id = request.args.get("task_id")
@@ -222,10 +257,15 @@ def sync_source_hobolink(self):
df = live_hobolink_data_task.run("code_for_boston_export_90d")
return send_csv_attachment_of_dataframe(df=pd.DataFrame(df), filename="hobolink_source.csv")

@expose("/csv/src_sync/usgs_source")
def sync_source_usgs(self):
@expose("/csv/src_sync/usgs_w_source")
def sync_source_usgs_w(self):
df = live_usgs_data_task.run(days_ago=90)
return send_csv_attachment_of_dataframe(df=pd.DataFrame(df), filename="usgs_source.csv")
return send_csv_attachment_of_dataframe(df=pd.DataFrame(df), filename="usgs_w_source.csv")

@expose("/csv/src_sync/usgs_b_source")
def sync_source_usgs_b(self):
df = live_usgs_data_task.run(days_ago=90)
return send_csv_attachment_of_dataframe(df=pd.DataFrame(df), filename="usgs_b_source.csv")

@expose("/csv/src_sync/processed_data_v1_source")
def sync_source_combine_data_v1(self):
@@ -248,6 +288,13 @@ def sync_source_combine_data_v3(self):
df=pd.DataFrame(df), filename="model_processed_data.csv"
)

@expose("/csv/src_sync/processed_data_v4_source")
def sync_source_combine_data_v4(self):
df = combine_data_v4_task.run(days_ago=90, export_name="code_for_boston_export_90d")
return send_csv_attachment_of_dataframe(
df=pd.DataFrame(df), filename="model_processed_data.csv"
)

@expose("/csv/src_sync/prediction_v1_source")
def sync_source_prediction_v1(self):
df = predict_v1_task.run(days_ago=90, export_name="code_for_boston_export_90d")
@@ -269,6 +316,13 @@ def sync_source_prediction_v3(self):
df=pd.DataFrame(df), filename="prediction_source.csv"
)

@expose("/csv/src_sync/prediction_v4_source")
def sync_source_prediction_v4(self):
df = predict_v4_task.run(days_ago=90, export_name="code_for_boston_export_90d")
return send_csv_attachment_of_dataframe(
df=pd.DataFrame(df), filename="prediction_source.csv"
)


class DatabaseView(BaseView):
"""Exposes an "update database" button to the user."""
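The v4 endpoints above reuse the async pattern of v1 through v3: .delay() enqueues the job on a Celery worker, and the browser is redirected to csv_wait, which polls the task by id. A rough sketch of that lifecycle outside the view layer follows; the polling loop is illustrative, and only the task import and the .delay() call appear in this diff.

# Illustrative sketch of the enqueue-then-poll flow behind the v4 CSV views.
import time

from app.data.celery import predict_v4_task

async_result = predict_v4_task.delay(export_name="code_for_boston_export_90d", days_ago=90)
while not async_result.ready():  # csv_wait performs this check via the task id
    time.sleep(1)
records = async_result.get()  # the RecordsType payload returned by the task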
18 changes: 18 additions & 0 deletions app/data/celery.py
@@ -100,6 +100,14 @@ def combine_data_v3_task(*args, **kwargs) -> RecordsType:
return df.to_dict(orient="records")


+@celery_app.task
+def combine_data_v4_task(*args, **kwargs) -> RecordsType:
+    from app.data.processing.core import combine_v4_job
+
+    df = combine_v4_job(*args, **kwargs)
+    return df.to_dict(orient="records")
+
+
@celery_app.task
def predict_v1_task(*args, **kwargs) -> RecordsType:
from app.data.processing.core import predict_v1_job
@@ -124,6 +132,14 @@ def predict_v3_task(*args, **kwargs) -> RecordsType:
return df.to_dict(orient="records")


+@celery_app.task
+def predict_v4_task(*args, **kwargs) -> RecordsType:
+    from app.data.processing.core import predict_v4_job
+
+    df = predict_v4_job(*args, **kwargs)
+    return df.to_dict(orient="records")
+
+
@celery_app.task
def update_db_task(tweet_status: bool = False) -> None:
from app.data.globals import website_options
@@ -150,9 +166,11 @@ def send_database_exports_task() -> None:
combine_data_v1_task: WithAppContextTask
combine_data_v2_task: WithAppContextTask
combine_data_v3_task: WithAppContextTask
+combine_data_v4_task: WithAppContextTask
clear_cache_task: WithAppContextTask
predict_v1_task: WithAppContextTask
predict_v2_task: WithAppContextTask
predict_v3_task: WithAppContextTask
+predict_v4_task: WithAppContextTask
update_db_task: WithAppContextTask
send_database_exports_task: WithAppContextTask
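Each of these tasks returns RecordsType, the list of per-row dicts produced by DataFrame.to_dict(orient="records"), which keeps results serializable in the Celery result backend. The admin views use both calling conventions; a minimal sketch of the two, illustrative rather than code from this commit:

import pandas as pd

from app.data.celery import combine_data_v4_task

# In-process call, as used by the /csv/src_sync/ views:
records = combine_data_v4_task.run(export_name="code_for_boston_export_90d", days_ago=90)
df = pd.DataFrame(records)  # rebuild a DataFrame from the records payload

# Brokered call, as used by the /csv/src/ views:
async_result = combine_data_v4_task.delay(export_name="code_for_boston_export_90d", days_ago=90)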
24 changes: 15 additions & 9 deletions app/data/models/prediction.py
@@ -14,19 +14,19 @@ class Prediction(db.Model):
__tablename__ = "prediction"
reach_id = db.Column(db.Integer, db.ForeignKey("reach.id"), primary_key=True, nullable=False)
time = db.Column(db.DateTime, primary_key=True, nullable=False)
-    # predicted_ecoli_cfu_100ml = db.Column(db.Numeric)
-    probability = db.Column(db.Numeric)
+    predicted_ecoli_cfu_100ml = db.Column(db.Numeric)
+    # probability = db.Column(db.Numeric)
safe = db.Column(db.Boolean)

reach = db.relationship("Reach", back_populates="predictions")

-    # @property
-    # def predicted_ecoli_cfu_100ml_rounded(self) -> float:
-    #     return round(self.predicted_ecoli_cfu_100ml, 1)
-
    @property
-    def probability_rounded_and_formatted(self) -> str:
-        return str(round(self.probability * 100, 1)) + "%"
+    def predicted_ecoli_cfu_100ml_rounded(self) -> float:
+        return round(self.predicted_ecoli_cfu_100ml, 1)
+
+    # @property
+    # def probability_rounded_and_formatted(self) -> str:
+    #     return str(round(self.probability * 100, 1)) + "%"

@classmethod
def _latest_ts_scalar_subquery(cls):
@@ -44,8 +44,14 @@ def get_latest(cls, reach: int) -> "Prediction":
def get_all_latest(cls) -> List["Prediction"]:
return db.session.query(cls).filter(cls.time == cls._latest_ts_scalar_subquery()).all()

+    # def api_v1_to_dict(self) -> Dict[str, Any]:
+    #     return {"prediction": float(self.probability), "safe": self.safe, "time": self.time}
    def api_v1_to_dict(self) -> Dict[str, Any]:
-        return {"prediction": float(self.probability), "safe": self.safe, "time": self.time}
+        return {
+            "prediction": float(self.predicted_ecoli_cfu_100ml),
+            "safe": self.safe,
+            "time": self.time,
+        }


def get_latest_prediction_time() -> datetime:
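Note the unit change here: the API's "prediction" field previously carried a 0-1 probability (rendered as a percentage), and it now carries a predicted E. coli concentration in CFU/100 mL. A small sketch of the new accessors, with literal values invented purely for illustration:

from decimal import Decimal

from app.data.models.prediction import Prediction

# Hypothetical instance; the values are made up for illustration.
p = Prediction(reach_id=2, predicted_ecoli_cfu_100ml=Decimal("630.47"), safe=False)
print(p.predicted_ecoli_cfu_100ml_rounded)  # 630.5
print(p.api_v1_to_dict())  # {'prediction': 630.47, 'safe': False, 'time': None}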
50 changes: 37 additions & 13 deletions app/data/processing/core.py
@@ -21,7 +21,8 @@
from app.data.processing.hobolink import HOBOLINK_ROWS_PER_HOUR
from app.data.processing.hobolink import get_live_hobolink_data
from app.data.processing.usgs import USGS_DEFAULT_DAYS_AGO
-from app.data.processing.usgs import USGS_ROWS_PER_HOUR
+from app.data.processing.usgs import USGS_ROWS_PER_HOUR_MUDDY_RIVER
+from app.data.processing.usgs import USGS_ROWS_PER_HOUR_WALTHAM
from app.data.processing.usgs import get_live_usgs_data
from app.mail import ExportEmail
from app.mail import mail
@@ -38,7 +39,9 @@ def _write_to_db(df: pd.DataFrame, table_name: str, rows: Optional[int] = None)
class ModelModule(Protocol):
MODEL_YEAR: str

-    def process_data(self, df_hobolink: pd.DataFrame, df_usgs: pd.DataFrame) -> pd.DataFrame: ...
+    def process_data(
+        self, df_hobolink: pd.DataFrame, df_usgs_w: pd.DataFrame, df_usgs_b: pd.DataFrame
+    ) -> pd.DataFrame: ...

def all_models(self, df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame: ...

Expand All @@ -47,6 +50,7 @@ class ModelVersion(str, Enum):
v1 = "v1"
v2 = "v2"
v3 = "v3"
v4 = "v4"

def get_module(self) -> ModelModule:
if self == self.__class__.v1:
@@ -61,11 +65,15 @@ def get_module(self) -> ModelModule:
from app.data.processing.predictive_models import v3

return v3
+        elif self == self.__class__.v4:
+            from app.data.processing.predictive_models import v4
+
+            return v4
else:
raise ValueError(f"Unclear what happened; {self} not supported")


-DEFAULT_MODEL_VERSION = ModelVersion.v1
+DEFAULT_MODEL_VERSION = ModelVersion.v4


@mail_on_fail
@@ -75,15 +83,19 @@ def _combine_job(
model_version: ModelVersion = DEFAULT_MODEL_VERSION,
) -> pd.DataFrame:
mod = model_version.get_module()
-    df_usgs = get_live_usgs_data(days_ago=days_ago)
+    df_usgs_w = get_live_usgs_data(days_ago=days_ago, site_no="01104500")
+    df_usgs_b = get_live_usgs_data(days_ago=days_ago, site_no="01104683")
    df_hobolink = get_live_hobolink_data(export_name=export_name)
-    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_combined = mod.process_data(
+        df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b
+    )
return df_combined


combine_v1_job = partial(_combine_job, model_version=ModelVersion.v1)
combine_v2_job = partial(_combine_job, model_version=ModelVersion.v2)
combine_v3_job = partial(_combine_job, model_version=ModelVersion.v3)
+combine_v4_job = partial(_combine_job, model_version=ModelVersion.v4)


@mail_on_fail
@@ -93,29 +105,37 @@ def _predict_job(
model_version: ModelVersion = DEFAULT_MODEL_VERSION,
) -> pd.DataFrame:
mod = model_version.get_module()
-    df_usgs = get_live_usgs_data(days_ago=days_ago)
+    df_usgs_w = get_live_usgs_data(days_ago=days_ago, site_no="01104500")
+    df_usgs_b = get_live_usgs_data(days_ago=days_ago, site_no="01104683")
    df_hobolink = get_live_hobolink_data(export_name=export_name)
-    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_combined = mod.process_data(
+        df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b
+    )
df_predictions = mod.all_models(df_combined)
return df_predictions


predict_v1_job = partial(_predict_job, model_version=ModelVersion.v1)
predict_v2_job = partial(_predict_job, model_version=ModelVersion.v2)
predict_v3_job = partial(_predict_job, model_version=ModelVersion.v3)
+predict_v4_job = partial(_predict_job, model_version=ModelVersion.v4)


@mail_on_fail
def update_db() -> None:
mod = DEFAULT_MODEL_VERSION.get_module()
-    df_usgs = get_live_usgs_data()
+    df_usgs_w = get_live_usgs_data(site_no="01104500")
+    df_usgs_b = get_live_usgs_data(site_no="01104683")
    df_hobolink = get_live_hobolink_data()
-    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_combined = mod.process_data(
+        df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b
+    )
df_predictions = mod.all_models(df_combined)

hours = current_app.config["STORAGE_HOURS"]
try:
_write_to_db(df_usgs, "usgs", rows=hours * USGS_ROWS_PER_HOUR)
_write_to_db(df_usgs_w, "usgs_w", rows=hours * USGS_ROWS_PER_HOUR_WALTHAM)
_write_to_db(df_usgs_b, "usgs_b", rows=hours * USGS_ROWS_PER_HOUR_MUDDY_RIVER)
_write_to_db(df_hobolink, "hobolink", rows=hours * HOBOLINK_ROWS_PER_HOUR)
_write_to_db(df_combined, "processed_data")
_write_to_db(df_predictions, Prediction.__tablename__)
@@ -129,17 +149,21 @@
@mail_on_fail
def send_database_exports() -> None:
mod = DEFAULT_MODEL_VERSION.get_module()
-    df_usgs = get_live_usgs_data(days_ago=90)
+    df_usgs_w = get_live_usgs_data(days_ago=90, site_no="01104500")
+    df_usgs_b = get_live_usgs_data(days_ago=90, site_no="01104683")
    df_hobolink = get_live_hobolink_data(export_name="code_for_boston_export_90d")
-    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_combined = mod.process_data(
+        df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b
+    )
df_predictions = mod.all_models(df_combined)
df_override_history = execute_sql("select * from override_history;")

todays_date = get_current_time().strftime("%Y_%m_%d")

msg = ExportEmail()

-    msg.attach_dataframe(df_usgs, f"{todays_date}-usgs.csv")
+    msg.attach_dataframe(df_usgs_w, f"{todays_date}-usgs_w.csv")
+    msg.attach_dataframe(df_usgs_b, f"{todays_date}-usgs_b.csv")
msg.attach_dataframe(df_hobolink, f"{todays_date}-hobolink.csv")
msg.attach_dataframe(df_combined, f"{todays_date}-combined.csv")
msg.attach_dataframe(df_predictions, f"{todays_date}-prediction.csv")
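A note on the shape of these changes: ModelVersion.get_module() defers importing a model package until it is selected, and every job now threads two USGS frames through process_data, df_usgs_w from gauge 01104500 (Waltham, per USGS_ROWS_PER_HOUR_WALTHAM) and df_usgs_b from gauge 01104683 (Muddy River). A compressed sketch of the v4 path these functions implement, assembled from calls in this diff for illustration:

from app.data.processing.core import ModelVersion
from app.data.processing.hobolink import get_live_hobolink_data
from app.data.processing.usgs import get_live_usgs_data

mod = ModelVersion.v4.get_module()  # lazy import of predictive_models.v4
df_usgs_w = get_live_usgs_data(days_ago=90, site_no="01104500")  # Waltham gauge
df_usgs_b = get_live_usgs_data(days_ago=90, site_no="01104683")  # Muddy River gauge
df_hobolink = get_live_hobolink_data(export_name="code_for_boston_export_90d")
df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs_w=df_usgs_w, df_usgs_b=df_usgs_b)
df_predictions = mod.all_models(df_combined)  # runs the reach models over the combined frame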