From 7628b1245f2042699fe3fa55adf9a85d1c326fbb Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Tue, 3 Dec 2024 14:48:19 -0800 Subject: [PATCH 01/12] Hourly FFMC --- api/app/jobs/rdps_sfms.py | 2 +- api/app/jobs/sfms_calculations.py | 42 ++++- api/app/sfms/daily_fwi_processor.py | 3 +- api/app/sfms/fwi_processor.py | 2 +- api/app/sfms/hourly_ffmc_processor.py | 95 +++++++++++ api/app/sfms/raster_addresser.py | 50 +++++- api/app/tests/dataset_common.py | 39 +++++ api/app/tests/jobs/test_sfms_calculations.py | 2 + .../tests/sfms/test_daily_fwi_processor.py | 48 +----- api/app/tests/sfms/test_fwi_processor.py | 4 +- .../tests/sfms/test_hourly_ffmc_processor.py | 156 ++++++++++++++++++ api/app/tests/sfms/test_raster_addresser.py | 39 +++++ .../rdps_filename_marshaller.py | 15 ++ 13 files changed, 441 insertions(+), 56 deletions(-) create mode 100644 api/app/sfms/hourly_ffmc_processor.py create mode 100644 api/app/tests/sfms/test_hourly_ffmc_processor.py diff --git a/api/app/jobs/rdps_sfms.py b/api/app/jobs/rdps_sfms.py index 2538a103b5..3e8ee931f8 100644 --- a/api/app/jobs/rdps_sfms.py +++ b/api/app/jobs/rdps_sfms.py @@ -37,7 +37,7 @@ DAYS_TO_RETAIN = 7 -MAX_MODEL_RUN_HOUR = 37 +MAX_MODEL_RUN_HOUR = 45 GRIB_LAYERS = {"temp": "TMP_TGL_2", "rh": "RH_TGL_2", "precip": "APCP_SFC_0", "wind_speed": "WIND_TGL_10"} diff --git a/api/app/jobs/sfms_calculations.py b/api/app/jobs/sfms_calculations.py index 2deb590e71..fea68d7c20 100644 --- a/api/app/jobs/sfms_calculations.py +++ b/api/app/jobs/sfms_calculations.py @@ -6,8 +6,10 @@ from app import configure_logging from app.geospatial.wps_dataset import multi_wps_dataset_context +from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR from app.rocketchat_notifications import send_rocketchat_notification from app.sfms.daily_fwi_processor import DailyFWIProcessor +from app.sfms.hourly_ffmc_processor import HourlyFFMCProcessor from app.sfms.raster_addresser import RasterKeyAddresser from app.utils.s3_client import S3Client from app.utils.time import get_utc_now @@ -18,12 +20,44 @@ class SFMSCalcJob: - async def calculate_daily_fwi(self, start_time: datetime): + async def calculate_fwi_rasters(self, start_time: datetime) -> None: """ - Entry point for processing SFMS daily FWI rasters. To run from a specific date manually in openshift, + Entry point for processing SFMS daily FWI rasters and hFFMC raster. To run from a specific date manually in openshift, see openshift/sfms-calculate/README.md + + :param start_time: The RDPS model run time to use for processing. + """ + + await self.calculate_daily_fwi(start_time) + await self.calculate_hffmc(start_time) + + async def calculate_hffmc(self, start_time: datetime) -> None: + """ + Entry point for calculating hourly FFMC rasters. Uses a 04:00 or 16:00 PST (12:00 or 24:00 UTC) hFFMC raster from SFMS as a base input. + + :param start_time: The date time to use for processing. Calculations will begin at the most recent RDPS model run (00Z or 12Z). + """ + logger.info("Begin hFFMC raster calculations.") + + start_exec = get_utc_now() + + hffmc_processor = HourlyFFMCProcessor(start_time, RasterKeyAddresser()) + + async with S3Client() as s3_client: + await hffmc_processor.process(s3_client, multi_wps_dataset_context, MAX_MODEL_RUN_HOUR) + + # calculate the execution time. + execution_time = get_utc_now() - start_exec + hours, remainder = divmod(execution_time.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + logger.info(f"hFFMC raster processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds") + + async def calculate_daily_fwi(self, start_time: datetime): + """ + Entry point for processing SFMS daily FWI rasters. """ - logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward") + logger.info(f"Begin FWI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward") start_exec = get_utc_now() @@ -55,7 +89,7 @@ def main(): job = SFMSCalcJob() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.run_until_complete(job.calculate_daily_fwi(start_time)) + loop.run_until_complete(job.calculate_fwi_rasters(start_time)) except Exception as e: logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e) rc_message = ":scream: Encountered an error while processing SFMS raster data." diff --git a/api/app/sfms/daily_fwi_processor.py b/api/app/sfms/daily_fwi_processor.py index b958ff6778..815d553259 100644 --- a/api/app/sfms/daily_fwi_processor.py +++ b/api/app/sfms/daily_fwi_processor.py @@ -46,7 +46,7 @@ async def process( # Get and check existence of weather s3 keys temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour) - weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key) + weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key) if not weather_keys_exist: logging.warning(f"Missing weather keys for {model_run_for_hour(self.start_datetime.hour):02} model run") break @@ -76,6 +76,7 @@ async def process( precip_ds.close() rh_ds.close() temp_ds.close() + wind_speed_ds.close() # Create latitude and month arrays needed for calculations latitude_array = dmc_ds.generate_latitude_array() month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month) diff --git a/api/app/sfms/fwi_processor.py b/api/app/sfms/fwi_processor.py index 2f2ca2ead5..5255fca751 100644 --- a/api/app/sfms/fwi_processor.py +++ b/api/app/sfms/fwi_processor.py @@ -66,7 +66,7 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0) start = perf_counter() - ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, precip_array, wind_speed_array) + ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, wind_speed_array, precip_array) logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start) nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask() diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py new file mode 100644 index 0000000000..e4bb513fbb --- /dev/null +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -0,0 +1,95 @@ +import logging +import os +import tempfile +from datetime import datetime, timedelta +from osgeo import gdal +from typing import List, cast + +from app.weather_models.rdps_filename_marshaller import model_run_for_hour + +from app.geospatial.wps_dataset import WPSDataset +from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR +from app.sfms.daily_fwi_processor import MultiDatasetContext +from app.sfms.fwi_processor import calculate_ffmc +from app.sfms.raster_addresser import RasterKeyAddresser +from app.utils.geospatial import GDALResamplingMethod +from app.utils.s3 import set_s3_gdal_config +from app.utils.s3_client import S3Client + + +logger = logging.getLogger(__name__) + + +class HourlyFFMCProcessor: + """ + Class for calculating/generating forecasted hourly FFMC rasters. + """ + + def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser): + self.start_datetime = start_datetime + self.addresser = addresser + + async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, hours_to_process: int = MAX_MODEL_RUN_HOUR): + set_s3_gdal_config() + + # 1 - Determine starting hFFMC (4am or 4pm) from SFMS and get key, confirm exists, if not, exit + # 2 - Determine what would be last key of run and check if exists, if exists, exit + # 3 - Get all weather variable keys and check if last one exists, if not, exit + # 4 - Use seed hFFMC plus: + # - rh, temp and wind speed from RDPS model run hour n = 000 + # - computed precip at n = 0Z or 12Z + # 5 - Use newly calculated hFFMC to calculate next hFFMC using: + # - rh, temp and wind speed from RDPS model run hour n + 1 + # - computed precip at n + 1 + # hFFMC files from SFMS use PST datetimes + + # Determine most recent RDPS model run + rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour) + rdps_model_run_start = datetime( + year=self.start_datetime.year, month=self.start_datetime.month, day=self.start_datetime.day, hour=rdps_model_run_hour, tzinfo=self.start_datetime.tzinfo + ) + + # Determine key to the initial/seed hFFMC from SFMS and check if it exists. Initial hffmc will be a 04 or 16 hour hffmc from SFMS. + hffmc_key = self.addresser.get_uploaded_hffmc_key(rdps_model_run_start) + hffmc_key_exists = await s3_client.all_objects_exist(hffmc_key) + if not hffmc_key_exists: + logger.warning(f"Missing initial hFFMC raster from SFMS for date {self.start_datetime}. Missing key is {hffmc_key}.") + return + + for hour in range(0, hours_to_process): + with tempfile.TemporaryDirectory() as temp_dir: + # Get and check existence of weather s3 keys + temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys_hffmc(rdps_model_run_start, hour) + weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key) + if not weather_keys_exist: + logging.warning(f"Missing weather keys for model run: {rdps_model_run_start}") + break + + # Prefix our S3 keys for access via gdal + temp_key, rh_key, wind_speed_key, precip_key, hffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, hffmc_key) + with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, hffmc_key]) as input_datasets: + input_datasets = cast(List[WPSDataset], input_datasets) # Ensure correct type inference + temp_ds, rh_ds, wind_speed_ds, precip_ds, hffmc_ds = input_datasets + # Warp weather datasets to match hffmc + warped_temp_ds = temp_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR) + warped_rh_ds = rh_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR) + warped_wind_speed_ds = wind_speed_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR) + warped_precip_ds = precip_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR) + + # Create and store new hFFMC dataset + hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds) + new_hffmc_datetime = rdps_model_run_start + timedelta(hours=hour) + hffmc_key = self.addresser.get_calculated_hffmc_index_key(new_hffmc_datetime) + geotransform = hffmc_ds.as_gdal_ds().GetGeoTransform() + projection = hffmc_ds.as_gdal_ds().GetProjection() + hffmc_ds.close() + await s3_client.persist_raster_data( + temp_dir, + hffmc_key, + geotransform, + projection, + hffmc_values, + hffmc_no_data_value, + ) + # Clear gdal virtual file system cache of S3 metadata in order to allow newly uploaded hffmc rasters to be opened immediately. + gdal.VSICurlClearCache() diff --git a/api/app/sfms/raster_addresser.py b/api/app/sfms/raster_addresser.py index 3a99bd90a5..5d92f4c7c5 100644 --- a/api/app/sfms/raster_addresser.py +++ b/api/app/sfms/raster_addresser.py @@ -1,10 +1,11 @@ import os import enum -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo from app import config +from app.utils.time import convert_utc_to_pdt from app.weather_models import ModelEnum -from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key +from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key, compose_rdps_key_hffmc class WeatherParameter(enum.Enum): @@ -36,15 +37,16 @@ class RasterKeyAddresser: def __init__(self): self.sfms_calculated_prefix = "sfms/calculated" self.s3_prefix = f"/vsis3/{config.get('OBJECT_STORE_BUCKET')}" + self.smfs_hourly_upload_prefix = "sfms/uploads/hourlies" self.sfms_upload_prefix = "sfms/uploads/actual" self.weather_model_prefix = f"weather_models/{ModelEnum.RDPS.lower()}" def get_uploaded_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter): assert_all_utc(datetime_utc) iso_date = datetime_utc.date().isoformat() - return f"{self.sfms_upload_prefix}/{iso_date}/{fwi_param.value}{iso_date.replace('-', '')}.tif" + def get_calculated_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter): """ Generates the calculated fire weather index key that points to the associated raster artifact in the object store. @@ -105,3 +107,45 @@ def gdal_prefix_keys(self, *keys): :return: A tuple of all strings provided, prefixed with vsis3/{bucket} """ return tuple(f"{self.s3_prefix}/{key}" for key in keys) + + def get_uploaded_hffmc_key(self, datetime_utc: datetime): + """ + Given the start time of an RDPS model run, return a key to the most recent hFFMC raster which will be + equivalent to RDPS model run start time minus one hour in PDT. Note that the hFFMC rasters are stored according + to PDT times. hFFMC keys will end with 04 or 16 for their hour. + + :param datetime_utc: The RDPS model run start date and time. + :return: A key to the most recent hFFMC raster. + """ + assert_all_utc(datetime_utc) + + # Convert utc into pdt and substract one hour to get hFFMC source raster time. sfms only produces hFFMC from Apr - Oct which is always PDT + datetime_pdt = convert_utc_to_pdt(datetime_utc) - timedelta(hours=1) + iso_date = datetime_pdt.date().isoformat() + return f"{self.smfs_hourly_upload_prefix}/{iso_date}/fine_fuel_moisture_code{iso_date.replace('-', '')}{datetime_pdt.hour:02d}.tif" + + def get_weather_data_keys_hffmc(self, rdps_model_run_start: datetime, offset_hour): + assert_all_utc(rdps_model_run_start) + non_precip_keys = tuple(self.get_model_data_key_hffmc(rdps_model_run_start, offset_hour, param) for param in WeatherParameter) + datetime_to_calculate_utc = rdps_model_run_start + timedelta(hours=offset_hour) + precip_key = self.get_calculated_precip_key(datetime_to_calculate_utc) + all_weather_data_keys = non_precip_keys + (precip_key,) + return all_weather_data_keys + + def get_model_data_key_hffmc(self, start_time_utc: datetime, offset_hour: int, weather_param: WeatherParameter): + assert_all_utc(start_time_utc) + weather_model_date_prefix = f"{self.weather_model_prefix}/{start_time_utc.date().isoformat()}/" + return os.path.join(weather_model_date_prefix, compose_rdps_key_hffmc(start_time_utc, offset_hour, weather_param.value)) + + def get_calculated_hffmc_index_key(self, datetime_utc: datetime): + """ + Given a UTC datetime return a calculated key based on PDT time as hFFMC rasters are named according to PDT. + + :param datetime_utc: A UTC datetime. + :return: An S3 key for hFFMC using PDT time. + """ + assert_all_utc(datetime_utc) + datetime_pdt = convert_utc_to_pdt(datetime_utc) + iso_date = datetime_pdt.date().isoformat() + weather_param_prefix = "fine_fuel_moisture_code" + return f"{self.sfms_calculated_prefix}/hourlies/{iso_date}/{weather_param_prefix}{iso_date.replace('-', '')}{datetime_pdt.hour:02d}.tif" \ No newline at end of file diff --git a/api/app/tests/dataset_common.py b/api/app/tests/dataset_common.py index 32b7353788..961f51ac11 100644 --- a/api/app/tests/dataset_common.py +++ b/api/app/tests/dataset_common.py @@ -1,5 +1,7 @@ +from contextlib import ExitStack, contextmanager import numpy as np from osgeo import osr, gdal +from typing import List import uuid from app.geospatial.wps_dataset import WPSDataset @@ -47,3 +49,40 @@ def create_mock_gdal_dataset(): def create_mock_wps_dataset(): mock_ds = create_mock_gdal_dataset() return WPSDataset(ds=mock_ds, ds_path=None) + +def create_mock_wps_datasets(num: int) -> List[WPSDataset]: + return [create_mock_wps_dataset() for _ in range(num)] + + +def create_mock_input_dataset_context(num: int): + input_datasets = create_mock_wps_datasets(num) + + @contextmanager + def mock_input_dataset_context(_: List[str]): + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in input_datasets] + finally: + # Close all datasets to ensure cleanup + for ds in input_datasets: + ds.close() + + return input_datasets, mock_input_dataset_context + + +def create_mock_new_ds_context(number_of_datasets: int): + new_datasets = create_mock_wps_datasets(number_of_datasets) + + @contextmanager + def mock_new_datasets_context(_: List[str]): + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in new_datasets] + finally: + # Close all datasets to ensure cleanup + for ds in new_datasets: + ds.close() + + return new_datasets, mock_new_datasets_context diff --git a/api/app/tests/jobs/test_sfms_calculations.py b/api/app/tests/jobs/test_sfms_calculations.py index 478cf104d4..2e68ad43e4 100644 --- a/api/app/tests/jobs/test_sfms_calculations.py +++ b/api/app/tests/jobs/test_sfms_calculations.py @@ -28,6 +28,7 @@ async def mock_job_error(): def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture): daily_fwi_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_fwi", return_value=None) + hffmc_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_hffmc", return_value=None) test_datetime = "2024-10-10 5" monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime]) @@ -35,6 +36,7 @@ def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture): sfms_calculations.main() daily_fwi_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)) + hffmc_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)) @pytest.mark.anyio diff --git a/api/app/tests/sfms/test_daily_fwi_processor.py b/api/app/tests/sfms/test_daily_fwi_processor.py index 0f0bf8dc11..db22c94253 100644 --- a/api/app/tests/sfms/test_daily_fwi_processor.py +++ b/api/app/tests/sfms/test_daily_fwi_processor.py @@ -1,6 +1,4 @@ -from contextlib import ExitStack, contextmanager from datetime import datetime, timedelta, timezone -from typing import List from unittest.mock import AsyncMock import pytest @@ -10,7 +8,7 @@ from app.sfms import daily_fwi_processor from app.sfms.daily_fwi_processor import DailyFWIProcessor from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser -from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_wps_dataset +from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_input_dataset_context, create_mock_new_ds_context from app.utils.geospatial import GDALResamplingMethod from app.utils.s3_client import S3Client @@ -19,44 +17,6 @@ EXPECTED_SECOND_DAY = TEST_DATETIME.replace(hour=20, minute=0, second=0, microsecond=0) + timedelta(days=1) -def create_mock_wps_datasets(num: int) -> List[WPSDataset]: - return [create_mock_wps_dataset() for _ in range(num)] - - -def create_mock_input_dataset_context(): - input_datasets = create_mock_wps_datasets(7) - - @contextmanager - def mock_input_dataset_context(_: List[str]): - try: - # Enter each dataset's context and yield the list of instances - with ExitStack() as stack: - yield [stack.enter_context(ds) for ds in input_datasets] - finally: - # Close all datasets to ensure cleanup - for ds in input_datasets: - ds.close() - - return input_datasets, mock_input_dataset_context - - -def create_mock_new_ds_context(number_of_datasets: int): - new_datasets = create_mock_wps_datasets(number_of_datasets) - - @contextmanager - def mock_new_datasets_context(_: List[str]): - try: - # Enter each dataset's context and yield the list of instances - with ExitStack() as stack: - yield [stack.enter_context(ds) for ds in new_datasets] - finally: - # Close all datasets to ensure cleanup - for ds in new_datasets: - ds.close() - - return new_datasets, mock_new_datasets_context - - @pytest.mark.anyio async def test_daily_fwi_processor(mocker: MockerFixture): mock_key_addresser = RasterKeyAddresser() @@ -64,11 +24,11 @@ async def test_daily_fwi_processor(mocker: MockerFixture): get_weather_data_key_spy = mocker.spy(mock_key_addresser, "get_weather_data_keys") gdal_prefix_keys_spy = mocker.spy(mock_key_addresser, "gdal_prefix_keys") get_calculated_index_key_spy = mocker.spy(mock_key_addresser, "get_calculated_index_key") + fwi_processor = DailyFWIProcessor(TEST_DATETIME, 2, mock_key_addresser) - # mock/spy dataset storage # mock weather index, param datasets used for calculations - input_datasets, mock_input_dataset_context = create_mock_input_dataset_context() + input_datasets, mock_input_dataset_context = create_mock_input_dataset_context(7) mock_temp_ds, mock_rh_ds, mock_precip_ds, mock_wind_speed_ds, mock_dc_ds, mock_dmc_ds, mock_ffmc_ds = input_datasets temp_ds_spy = mocker.spy(mock_temp_ds, "warp_to_match") rh_ds_spy = mocker.spy(mock_rh_ds, "warp_to_match") @@ -234,7 +194,7 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[side_effect_1, side_effect_2]) - _, mock_input_dataset_context = create_mock_input_dataset_context() + _, mock_input_dataset_context = create_mock_input_dataset_context(7) _, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2) _, mock_new_ffmc_dataset_context = create_mock_new_ds_context(1) diff --git a/api/app/tests/sfms/test_fwi_processor.py b/api/app/tests/sfms/test_fwi_processor.py index ea95639f57..8ef3b956c5 100644 --- a/api/app/tests/sfms/test_fwi_processor.py +++ b/api/app/tests/sfms/test_fwi_processor.py @@ -193,7 +193,7 @@ def test_calculate_ffmc_values(input_datasets): previous_ffmc_sample = temp_sample = rh_sample = precip_sample = wind_speed_sample = FWI_ARRAY[0, 0] - daily_ffmc_values, _ = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, precip_wps, wind_speed_wps) + daily_ffmc_values, _ = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, wind_speed_wps, precip_wps) static_ffmc = ffmc(previous_ffmc_sample, temp_sample, rh_sample, wind_speed_sample, precip_sample) @@ -207,7 +207,7 @@ def test_calculate_ffmc_masked_correctly(input_datasets): precip_wps = input_datasets.precip wind_speed_wps = input_datasets.wind_speed - daily_ffmc_values, nodata_value = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, precip_wps, wind_speed_wps) + daily_ffmc_values, nodata_value = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, wind_speed_wps, precip_wps) # validate output shape and nodata masking assert daily_ffmc_values.shape == (2, 2) diff --git a/api/app/tests/sfms/test_hourly_ffmc_processor.py b/api/app/tests/sfms/test_hourly_ffmc_processor.py new file mode 100644 index 0000000000..000a285065 --- /dev/null +++ b/api/app/tests/sfms/test_hourly_ffmc_processor.py @@ -0,0 +1,156 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock + +import pytest +from pytest_mock import MockerFixture + +from app.geospatial.wps_dataset import WPSDataset +from app.sfms import hourly_ffmc_processor +from app.sfms.hourly_ffmc_processor import HFFMC_HOURS, HourlyFFMCProcessor +from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser +from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_input_dataset_context, create_mock_new_ds_context +from app.utils.geospatial import GDALResamplingMethod +from app.utils.s3_client import S3Client + +TEST_DATETIME = datetime(2024, 10, 10, 10, tzinfo=timezone.utc) +RDPS_MODEL_RUN_DATETIME = datetime(2024, 10, 10, 0, tzinfo=timezone.utc) + + +@pytest.mark.anyio +async def test_source_hffmc_key_not_exist(mocker: MockerFixture): + mock_s3_client = S3Client() + mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[False]) + _, mock_input_dataset_context = create_mock_new_ds_context(2) + + # calculation spies + calculate_hffmc_spy = mocker.spy(hourly_ffmc_processor, "calculate_ffmc") + + hffmc_processor = HourlyFFMCProcessor(TEST_DATETIME, RasterKeyAddresser()) + await hffmc_processor.process(mock_s3_client, mock_input_dataset_context) + + calculate_hffmc_spy.assert_not_called() + + +@pytest.mark.anyio +async def test_no_weather_keys_exist(mocker: MockerFixture): + mock_s3_client = S3Client() + mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[True, False]) + _, mock_input_dataset_context = create_mock_new_ds_context(2) + + # calculation spy + calculate_hffmc_spy = mocker.spy(hourly_ffmc_processor, "calculate_ffmc") + + hffmc_processor = HourlyFFMCProcessor(TEST_DATETIME, RasterKeyAddresser()) + await hffmc_processor.process(mock_s3_client, mock_input_dataset_context) + + calculate_hffmc_spy.assert_not_called() + + +@pytest.mark.anyio +async def test_hourly_ffmc_processor(mocker: MockerFixture): + num_hours_to_process = 2 + mock_key_addresser = RasterKeyAddresser() + # key address spies + get_weather_data_keys_hffmc_spy = mocker.spy(mock_key_addresser, "get_weather_data_keys_hffmc") + gdal_prefix_keys_spy = mocker.spy(mock_key_addresser, "gdal_prefix_keys") + get_uploaded_hffmc_key_spy = mocker.spy(mock_key_addresser, "get_uploaded_hffmc_key") + get_calculated_hffmc_index_key_spy = mocker.spy(mock_key_addresser, "get_calculated_hffmc_index_key") + + # The processor instance + hffmc_processor = HourlyFFMCProcessor(TEST_DATETIME, mock_key_addresser) + + # mock weather index and source hffmc dataset used for calculations + input_datasets, mock_input_dataset_context = create_mock_input_dataset_context(5) + mock_temp_ds, mock_rh_ds, mock_precip_ds, mock_wind_speed_ds, mock_hffmc_ds = input_datasets + + # dataset spies + temp_ds_spy = mocker.spy(mock_temp_ds, "warp_to_match") + rh_ds_spy = mocker.spy(mock_rh_ds, "warp_to_match") + wind_speed_ds_spy = mocker.spy(mock_wind_speed_ds, "warp_to_match") + precip_ds_spy = mocker.spy(mock_precip_ds, "warp_to_match") + + # mock gdal open + mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset()) + + # calculation spy + calculate_hffmc_spy = mocker.spy(hourly_ffmc_processor, "calculate_ffmc") + + async with S3Client() as mock_s3_client: + # mock s3 client + mock_all_objects_exist = AsyncMock(return_value=True) + mocker.patch.object(mock_s3_client, "all_objects_exist", new=mock_all_objects_exist) + persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif") + + await hffmc_processor.process(mock_s3_client, mock_input_dataset_context, num_hours_to_process) + + # Verify weather model keys and actual keys are checked for both days + assert mock_all_objects_exist.call_count == num_hours_to_process + 1 + + # Verify retrivel of hffmc + assert get_uploaded_hffmc_key_spy.call_args_list == [mocker.call(TEST_DATETIME)] + + # Verify the arguments for each call for get_weather_data_keys + assert get_weather_data_keys_hffmc_spy.call_args_list == [ + mocker.call(RDPS_MODEL_RUN_DATETIME, 0), + mocker.call(RDPS_MODEL_RUN_DATETIME, 1), + ] + + for x in gdal_prefix_keys_spy.call_args: + for y in x: + print(y) + + # Verify the arguments for each call for gdal_prefix_keys + assert gdal_prefix_keys_spy.call_args_list == [ + # first hour weather models and source hffmc + mocker.call( + "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P000.grib2", + "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P000.grib2", + "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P000.grib2", + "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_00z.tif", + "sfms/uploads/hourlies/2024-10-10/fine_fuel_moisture_code2024101004.tif", + ), + mocker.call( + "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P001.grib2", + "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P001.grib2", + "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P001.grib2", + "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_01z.tif", + "sfms/calculated/hourlies/2024-10-10/fine_fuel_moisture_code2024101000.tif", + ), + ] + + # Verify calculated keys are generated in order + assert get_calculated_hffmc_index_key_spy.call_args_list == [ + # first day + mocker.call(RDPS_MODEL_RUN_DATETIME + timedelta(hours=0)), + mocker.call(RDPS_MODEL_RUN_DATETIME + timedelta(hours=1)), + ] + + # Verify weather inputs are warped to match source hffmc raster + assert temp_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert rh_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert wind_speed_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert precip_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + for hffmc_calls in calculate_hffmc_spy.call_args_list: + hffmc_ds = hffmc_calls.args[0] + assert hffmc_ds == mock_hffmc_ds + wps_datasets = hffmc_calls[0] # Extract dataset arguments + assert all(isinstance(ds, WPSDataset) for ds in wps_datasets) + + # 1 hffmc per day + assert persist_raster_spy.call_count == 2 \ No newline at end of file diff --git a/api/app/tests/sfms/test_raster_addresser.py b/api/app/tests/sfms/test_raster_addresser.py index fcf24aee24..c42e8da2cc 100644 --- a/api/app/tests/sfms/test_raster_addresser.py +++ b/api/app/tests/sfms/test_raster_addresser.py @@ -1,12 +1,22 @@ from app.sfms.raster_addresser import RasterKeyAddresser, FWIParameter import pytest from datetime import datetime, timezone +from app.sfms.raster_addresser import WeatherParameter TEST_DATETIME_1 = datetime(2024, 10, 10, 23, tzinfo=timezone.utc) TEST_DATE_1_ISO = TEST_DATETIME_1.date().isoformat() +TEST_DATETIME_2 = datetime(2024, 10, 10, 11, tzinfo=timezone.utc) +TEST_DATE_2_ISO = TEST_DATETIME_2.date().isoformat() + TEST_DATETIME_TO_CALC = TEST_DATETIME_1.replace(hour=20) +RDPS_MODEL_RUN_HOUR = 0 +RDPS_MODEL_RUN_00_START = datetime(2024, 10, 10, 0, tzinfo=timezone.utc) +RDPS_MODEL_RUN_12_START = datetime(2024, 10, 10, 12, tzinfo=timezone.utc) +HOUR_OFFSET = 3 +HFFMC_DATETIME = datetime(2024, 10, 10, 5, tzinfo=timezone.utc) + @pytest.fixture def raster_key_addresser(): @@ -27,3 +37,32 @@ def test_get_weather_data_keys(raster_key_addresser): result = raster_key_addresser.get_weather_data_keys(TEST_DATETIME_1, TEST_DATETIME_TO_CALC, 20) assert len(result) == 4 + +def test_get_uploaded_hffmc_key_00_hour(raster_key_addresser): + result = raster_key_addresser.get_uploaded_hffmc_key(RDPS_MODEL_RUN_00_START) + assert result == "sfms/uploads/hourlies/2024-10-09/fine_fuel_moisture_code2024100916.tif" + + +def test_get_uploaded_hffmc_key_afternoon(raster_key_addresser): + result = raster_key_addresser.get_uploaded_hffmc_key(RDPS_MODEL_RUN_12_START) + assert result == "sfms/uploads/hourlies/2024-10-10/fine_fuel_moisture_code2024101004.tif" + + +def test_get_weather_data_keys_hffmc(raster_key_addresser: RasterKeyAddresser): + result = raster_key_addresser.get_weather_data_keys_hffmc(RDPS_MODEL_RUN_00_START, HOUR_OFFSET) + assert len(result) == 4 + assert result[0] == "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P003.grib2" + assert result[1] == "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P003.grib2" + assert result[2] == "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P003.grib2" + assert result[3] == "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_03z.tif" + + +def test_get_model_data_key_hffmc(raster_key_addresser): + weather_param = WeatherParameter.TEMP + result = raster_key_addresser.get_model_data_key_hffmc(RDPS_MODEL_RUN_00_START, HOUR_OFFSET, weather_param) + assert result == "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P003.grib2" + + +def test_get_calculated_hffmc_index_key(raster_key_addresser: RasterKeyAddresser): + result = raster_key_addresser.get_calculated_hffmc_index_key(HFFMC_DATETIME) + assert result == "sfms/calculated/hourlies/2024-10-09/fine_fuel_moisture_code2024100922.tif" diff --git a/api/app/weather_models/rdps_filename_marshaller.py b/api/app/weather_models/rdps_filename_marshaller.py index c2f4203383..a2f14a3a13 100644 --- a/api/app/weather_models/rdps_filename_marshaller.py +++ b/api/app/weather_models/rdps_filename_marshaller.py @@ -144,3 +144,18 @@ def compose_computed_precip_rdps_key(accumulation_end_datetime: datetime): """Compose and return a computed RDPS url given the datetime that precip is being accumulated to.""" model_hour = model_run_for_hour(accumulation_end_datetime.hour) return f"{model_hour:02d}/precip/{compose_computed_rdps_filename(accumulation_end_datetime)}" + + +def compose_rdps_key_hffmc(model_run_start: datetime, offset_hour: int, weather_parameter: str): + """Compose and return a computed RDPS url given a forecast start date and hour offset.""" + model_hour = model_run_for_hour(model_run_start.hour) + return f"{model_hour:02d}/{weather_parameter}/{compose_rdps_filename_hffmc(model_run_start, offset_hour, weather_parameter)}" + + +def compose_rdps_filename_hffmc(model_run_start: datetime, offset_hour: int, weather_parameter: str): + key_params = get_weather_key_params(weather_parameter) + file_ext = ".grib2" + return ( + f"{SourcePrefix.CMC.value}{DELIMITER}{REG}{DELIMITER}{key_params.variable}{DELIMITER}{key_params.level_type}{DELIMITER}{key_params.level}{DELIMITER}{PS10KM}{DELIMITER}" + f"{model_run_start.date().isoformat().replace('-','')}{model_run_start.hour:02d}{DELIMITER}P{offset_hour:03d}{file_ext}" + ) From 5239ae25dd9ad4dac169662dfcd4d1bf17d67d31 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Tue, 3 Dec 2024 15:24:16 -0800 Subject: [PATCH 02/12] parameter order --- api/app/sfms/daily_fwi_processor.py | 2 +- api/app/sfms/hourly_ffmc_processor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/sfms/daily_fwi_processor.py b/api/app/sfms/daily_fwi_processor.py index 815d553259..9d645175dd 100644 --- a/api/app/sfms/daily_fwi_processor.py +++ b/api/app/sfms/daily_fwi_processor.py @@ -106,7 +106,7 @@ async def process( ) # Create and store FFMC dataset - ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds) + ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds) new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC) new_ffmc_path = await s3_client.persist_raster_data( temp_dir, diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py index e4bb513fbb..7169a1be08 100644 --- a/api/app/sfms/hourly_ffmc_processor.py +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -77,7 +77,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset warped_precip_ds = precip_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR) # Create and store new hFFMC dataset - hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds) + hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds) new_hffmc_datetime = rdps_model_run_start + timedelta(hours=hour) hffmc_key = self.addresser.get_calculated_hffmc_index_key(new_hffmc_datetime) geotransform = hffmc_ds.as_gdal_ds().GetGeoTransform() From 7454cbf999d1dfe6e1b02247e93ca5b566e96ecc Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Tue, 3 Dec 2024 15:34:24 -0800 Subject: [PATCH 03/12] minor fixes --- api/app/tests/sfms/test_fwi_processor.py | 4 ++-- api/app/tests/sfms/test_hourly_ffmc_processor.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/api/app/tests/sfms/test_fwi_processor.py b/api/app/tests/sfms/test_fwi_processor.py index 8ef3b956c5..ea95639f57 100644 --- a/api/app/tests/sfms/test_fwi_processor.py +++ b/api/app/tests/sfms/test_fwi_processor.py @@ -193,7 +193,7 @@ def test_calculate_ffmc_values(input_datasets): previous_ffmc_sample = temp_sample = rh_sample = precip_sample = wind_speed_sample = FWI_ARRAY[0, 0] - daily_ffmc_values, _ = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, wind_speed_wps, precip_wps) + daily_ffmc_values, _ = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, precip_wps, wind_speed_wps) static_ffmc = ffmc(previous_ffmc_sample, temp_sample, rh_sample, wind_speed_sample, precip_sample) @@ -207,7 +207,7 @@ def test_calculate_ffmc_masked_correctly(input_datasets): precip_wps = input_datasets.precip wind_speed_wps = input_datasets.wind_speed - daily_ffmc_values, nodata_value = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, wind_speed_wps, precip_wps) + daily_ffmc_values, nodata_value = calculate_ffmc(previous_ffmc_wps, temp_wps, rh_wps, precip_wps, wind_speed_wps) # validate output shape and nodata masking assert daily_ffmc_values.shape == (2, 2) diff --git a/api/app/tests/sfms/test_hourly_ffmc_processor.py b/api/app/tests/sfms/test_hourly_ffmc_processor.py index 000a285065..99674e677d 100644 --- a/api/app/tests/sfms/test_hourly_ffmc_processor.py +++ b/api/app/tests/sfms/test_hourly_ffmc_processor.py @@ -6,8 +6,8 @@ from app.geospatial.wps_dataset import WPSDataset from app.sfms import hourly_ffmc_processor -from app.sfms.hourly_ffmc_processor import HFFMC_HOURS, HourlyFFMCProcessor -from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser +from app.sfms.hourly_ffmc_processor import HourlyFFMCProcessor +from app.sfms.raster_addresser import RasterKeyAddresser from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_input_dataset_context, create_mock_new_ds_context from app.utils.geospatial import GDALResamplingMethod from app.utils.s3_client import S3Client From bcd5ec052cdc72112093858afc486aef9d5e172c Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Tue, 3 Dec 2024 15:53:49 -0800 Subject: [PATCH 04/12] Fix test --- api/app/tests/sfms/test_hourly_ffmc_processor.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/api/app/tests/sfms/test_hourly_ffmc_processor.py b/api/app/tests/sfms/test_hourly_ffmc_processor.py index 99674e677d..18569d6556 100644 --- a/api/app/tests/sfms/test_hourly_ffmc_processor.py +++ b/api/app/tests/sfms/test_hourly_ffmc_processor.py @@ -87,7 +87,7 @@ async def test_hourly_ffmc_processor(mocker: MockerFixture): assert mock_all_objects_exist.call_count == num_hours_to_process + 1 # Verify retrivel of hffmc - assert get_uploaded_hffmc_key_spy.call_args_list == [mocker.call(TEST_DATETIME)] + assert get_uploaded_hffmc_key_spy.call_args_list == [mocker.call(RDPS_MODEL_RUN_DATETIME)] # Verify the arguments for each call for get_weather_data_keys assert get_weather_data_keys_hffmc_spy.call_args_list == [ @@ -95,10 +95,6 @@ async def test_hourly_ffmc_processor(mocker: MockerFixture): mocker.call(RDPS_MODEL_RUN_DATETIME, 1), ] - for x in gdal_prefix_keys_spy.call_args: - for y in x: - print(y) - # Verify the arguments for each call for gdal_prefix_keys assert gdal_prefix_keys_spy.call_args_list == [ # first hour weather models and source hffmc @@ -107,14 +103,14 @@ async def test_hourly_ffmc_processor(mocker: MockerFixture): "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P000.grib2", "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P000.grib2", "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_00z.tif", - "sfms/uploads/hourlies/2024-10-10/fine_fuel_moisture_code2024101004.tif", + "sfms/uploads/hourlies/2024-10-09/fine_fuel_moisture_code2024100916.tif", ), mocker.call( "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P001.grib2", "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P001.grib2", "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P001.grib2", "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_01z.tif", - "sfms/calculated/hourlies/2024-10-10/fine_fuel_moisture_code2024101000.tif", + "sfms/calculated/hourlies/2024-10-09/fine_fuel_moisture_code2024100917.tif", ), ] From c1b74d7ec8a9ebf8549af0e7f88e8801f87c09be Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Wed, 4 Dec 2024 08:17:42 -0800 Subject: [PATCH 05/12] docstrings --- api/app/jobs/sfms_calculations.py | 2 +- api/app/sfms/raster_addresser.py | 23 +++++++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/api/app/jobs/sfms_calculations.py b/api/app/jobs/sfms_calculations.py index fea68d7c20..37d42b5d91 100644 --- a/api/app/jobs/sfms_calculations.py +++ b/api/app/jobs/sfms_calculations.py @@ -22,7 +22,7 @@ class SFMSCalcJob: async def calculate_fwi_rasters(self, start_time: datetime) -> None: """ - Entry point for processing SFMS daily FWI rasters and hFFMC raster. To run from a specific date manually in openshift, + Entry point for processing SFMS daily FWI rasters and hFFMC rasters. To run from a specific date manually in openshift, see openshift/sfms-calculate/README.md :param start_time: The RDPS model run time to use for processing. diff --git a/api/app/sfms/raster_addresser.py b/api/app/sfms/raster_addresser.py index 5d92f4c7c5..99ed56540d 100644 --- a/api/app/sfms/raster_addresser.py +++ b/api/app/sfms/raster_addresser.py @@ -125,6 +125,13 @@ def get_uploaded_hffmc_key(self, datetime_utc: datetime): return f"{self.smfs_hourly_upload_prefix}/{iso_date}/fine_fuel_moisture_code{iso_date.replace('-', '')}{datetime_pdt.hour:02d}.tif" def get_weather_data_keys_hffmc(self, rdps_model_run_start: datetime, offset_hour): + """ + Gets temp, rh, wind speed and calculated accumulated precip for the specified RDPS model run start date and hour. + + :param rdps_model_run_start: The RDPS model run start date and time. + :param offset_hour: The hour offset from the RDPS model run start hour. + :return: Keys to rasters in S3 storage for temp, rh, wind speed and calculated precip rasters. + """ assert_all_utc(rdps_model_run_start) non_precip_keys = tuple(self.get_model_data_key_hffmc(rdps_model_run_start, offset_hour, param) for param in WeatherParameter) datetime_to_calculate_utc = rdps_model_run_start + timedelta(hours=offset_hour) @@ -132,10 +139,18 @@ def get_weather_data_keys_hffmc(self, rdps_model_run_start: datetime, offset_hou all_weather_data_keys = non_precip_keys + (precip_key,) return all_weather_data_keys - def get_model_data_key_hffmc(self, start_time_utc: datetime, offset_hour: int, weather_param: WeatherParameter): - assert_all_utc(start_time_utc) - weather_model_date_prefix = f"{self.weather_model_prefix}/{start_time_utc.date().isoformat()}/" - return os.path.join(weather_model_date_prefix, compose_rdps_key_hffmc(start_time_utc, offset_hour, weather_param.value)) + def get_model_data_key_hffmc(self, rdps_model_run_start: datetime, offset_hour: int, weather_param: WeatherParameter): + """ + Gets a S3 key for the weather parameter of interest for the specified RDPS model run start date and time at the provided offset. + + :param rdps_model_run_start: The RDPS model run start date and time. + :param offset_hour: The hour offset from the RDPS model run start hour. + :param weather_param: The weather parameter of interest (temp, rh, or wind speed). + :return: A key to a raster in S3 storage. + """ + assert_all_utc(rdps_model_run_start) + weather_model_date_prefix = f"{self.weather_model_prefix}/{rdps_model_run_start.date().isoformat()}/" + return os.path.join(weather_model_date_prefix, compose_rdps_key_hffmc(rdps_model_run_start, offset_hour, weather_param.value)) def get_calculated_hffmc_index_key(self, datetime_utc: datetime): """ From e2b9ad10a524ea4408aef10f9c4f8cd602e7d336 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Wed, 4 Dec 2024 08:57:03 -0800 Subject: [PATCH 06/12] Fix rh over 100 --- api/app/sfms/fwi_processor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/app/sfms/fwi_processor.py b/api/app/sfms/fwi_processor.py index 5255fca751..0d1ff459e0 100644 --- a/api/app/sfms/fwi_processor.py +++ b/api/app/sfms/fwi_processor.py @@ -65,6 +65,10 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS precip_array, _ = precip_ds.replace_nodata_with(0) wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0) + # Due to warping of the rh dataset, rh values can exceed 100 which breaks the ffmc calculation. + # Set rh values greater than 100 to the max allowable which is 100. + rh_array[rh_array > 100] = 100 + start = perf_counter() ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, wind_speed_array, precip_array) logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start) From 0e7bda3053128b3150a09055b2964353eb2a82a9 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Wed, 4 Dec 2024 09:57:53 -0800 Subject: [PATCH 07/12] Fix comments --- api/app/sfms/hourly_ffmc_processor.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py index 7169a1be08..91e08bfb6f 100644 --- a/api/app/sfms/hourly_ffmc_processor.py +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -32,16 +32,13 @@ def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser): async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, hours_to_process: int = MAX_MODEL_RUN_HOUR): set_s3_gdal_config() - # 1 - Determine starting hFFMC (4am or 4pm) from SFMS and get key, confirm exists, if not, exit - # 2 - Determine what would be last key of run and check if exists, if exists, exit - # 3 - Get all weather variable keys and check if last one exists, if not, exit - # 4 - Use seed hFFMC plus: - # - rh, temp and wind speed from RDPS model run hour n = 000 - # - computed precip at n = 0Z or 12Z - # 5 - Use newly calculated hFFMC to calculate next hFFMC using: - # - rh, temp and wind speed from RDPS model run hour n + 1 - # - computed precip at n + 1 - # hFFMC files from SFMS use PST datetimes + # hFFMC general process + # 1. cron job kicks off the job and we use current UTC time as start time + # 2. Create HourlyFFMCProcessor with the start time and begin processing + # 3. Use job start time to determine most recent RDPS model run start time (date and 00z or 12z) + # 4. Use most recent RDPS model run start time to determine most recent hFFMC key to use as source which is always one hour before the RDPS start time (04 or 16 PDT) + # 5. Start calculating hFFMC from model run hour 0 through to 47. Save the calculated hFFMCs to S3. Most recently calculated hFFMC is used as input to the next hour's hFFMC calculation. + # 6. hFFMC rasters are saved to S3 with PDT based keys. # Determine most recent RDPS model run rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour) From 51059b18f152bc506f52acc7ef5cc9b3059cc964 Mon Sep 17 00:00:00 2001 From: dgboss Date: Wed, 4 Dec 2024 15:11:11 -0800 Subject: [PATCH 08/12] Consistency Co-authored-by: Conor Brady --- api/app/sfms/hourly_ffmc_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py index 91e08bfb6f..fa09f012ce 100644 --- a/api/app/sfms/hourly_ffmc_processor.py +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -33,7 +33,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset set_s3_gdal_config() # hFFMC general process - # 1. cron job kicks off the job and we use current UTC time as start time + # 1. Cron job kicks off the job and we use current UTC time as start time # 2. Create HourlyFFMCProcessor with the start time and begin processing # 3. Use job start time to determine most recent RDPS model run start time (date and 00z or 12z) # 4. Use most recent RDPS model run start time to determine most recent hFFMC key to use as source which is always one hour before the RDPS start time (04 or 16 PDT) From c3487675581d4eea03fe3178097ca1a5f6c9c2f5 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Thu, 5 Dec 2024 09:34:53 -0800 Subject: [PATCH 09/12] name hffmc with UTC based time --- api/app/sfms/hourly_ffmc_processor.py | 2 +- api/app/sfms/raster_addresser.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py index fa09f012ce..b566820f3c 100644 --- a/api/app/sfms/hourly_ffmc_processor.py +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -59,7 +59,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys_hffmc(rdps_model_run_start, hour) weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key) if not weather_keys_exist: - logging.warning(f"Missing weather keys for model run: {rdps_model_run_start}") + logging.warning(f"Missing weather keys for model run: {rdps_model_run_start} and hour {hour}") break # Prefix our S3 keys for access via gdal diff --git a/api/app/sfms/raster_addresser.py b/api/app/sfms/raster_addresser.py index 99ed56540d..c87f146097 100644 --- a/api/app/sfms/raster_addresser.py +++ b/api/app/sfms/raster_addresser.py @@ -160,7 +160,6 @@ def get_calculated_hffmc_index_key(self, datetime_utc: datetime): :return: An S3 key for hFFMC using PDT time. """ assert_all_utc(datetime_utc) - datetime_pdt = convert_utc_to_pdt(datetime_utc) - iso_date = datetime_pdt.date().isoformat() + iso_date = datetime_utc.date().isoformat() weather_param_prefix = "fine_fuel_moisture_code" - return f"{self.sfms_calculated_prefix}/hourlies/{iso_date}/{weather_param_prefix}{iso_date.replace('-', '')}{datetime_pdt.hour:02d}.tif" \ No newline at end of file + return f"{self.sfms_calculated_prefix}/hourlies/{iso_date}/{weather_param_prefix}{iso_date.replace('-', '')}{datetime_utc.hour:02d}.tif" \ No newline at end of file From c0d3524a4132747b92228bc766c87dc8cf78a8b5 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Thu, 5 Dec 2024 09:39:50 -0800 Subject: [PATCH 10/12] Update docs to reflect UTC based naming of calculated hFFMC --- api/app/sfms/hourly_ffmc_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py index b566820f3c..13ed119c72 100644 --- a/api/app/sfms/hourly_ffmc_processor.py +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -38,7 +38,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset # 3. Use job start time to determine most recent RDPS model run start time (date and 00z or 12z) # 4. Use most recent RDPS model run start time to determine most recent hFFMC key to use as source which is always one hour before the RDPS start time (04 or 16 PDT) # 5. Start calculating hFFMC from model run hour 0 through to 47. Save the calculated hFFMCs to S3. Most recently calculated hFFMC is used as input to the next hour's hFFMC calculation. - # 6. hFFMC rasters are saved to S3 with PDT based keys. + # 6. hFFMC rasters are saved to S3 with UTC based keys. # Determine most recent RDPS model run rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour) From dda121e05894b99e5eac93d0c2990bcb61090f50 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Thu, 5 Dec 2024 09:51:02 -0800 Subject: [PATCH 11/12] Fix test --- api/app/tests/sfms/test_raster_addresser.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/app/tests/sfms/test_raster_addresser.py b/api/app/tests/sfms/test_raster_addresser.py index c42e8da2cc..130b62c078 100644 --- a/api/app/tests/sfms/test_raster_addresser.py +++ b/api/app/tests/sfms/test_raster_addresser.py @@ -16,6 +16,7 @@ RDPS_MODEL_RUN_12_START = datetime(2024, 10, 10, 12, tzinfo=timezone.utc) HOUR_OFFSET = 3 HFFMC_DATETIME = datetime(2024, 10, 10, 5, tzinfo=timezone.utc) +HFFMC_DATETIME_ISO = HFFMC_DATETIME.date().isoformat() @pytest.fixture @@ -65,4 +66,4 @@ def test_get_model_data_key_hffmc(raster_key_addresser): def test_get_calculated_hffmc_index_key(raster_key_addresser: RasterKeyAddresser): result = raster_key_addresser.get_calculated_hffmc_index_key(HFFMC_DATETIME) - assert result == "sfms/calculated/hourlies/2024-10-09/fine_fuel_moisture_code2024100922.tif" + assert result == f"sfms/calculated/hourlies/{HFFMC_DATETIME_ISO}/fine_fuel_moisture_code{HFFMC_DATETIME_ISO.replace('-','')}{HFFMC_DATETIME.hour:02d}.tif" From 972be1452898ca59429ca73732abd3cf8b298252 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Thu, 5 Dec 2024 10:03:01 -0800 Subject: [PATCH 12/12] Fix one more test --- api/app/tests/sfms/test_hourly_ffmc_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/tests/sfms/test_hourly_ffmc_processor.py b/api/app/tests/sfms/test_hourly_ffmc_processor.py index 18569d6556..fbead32f04 100644 --- a/api/app/tests/sfms/test_hourly_ffmc_processor.py +++ b/api/app/tests/sfms/test_hourly_ffmc_processor.py @@ -110,7 +110,7 @@ async def test_hourly_ffmc_processor(mocker: MockerFixture): "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P001.grib2", "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P001.grib2", "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_01z.tif", - "sfms/calculated/hourlies/2024-10-09/fine_fuel_moisture_code2024100917.tif", + "sfms/calculated/hourlies/2024-10-10/fine_fuel_moisture_code2024101000.tif", ), ]