Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Precompute fuel type area by hfi threshold #3261

Merged
merged 13 commits into from
Nov 24, 2023
61 changes: 61 additions & 0 deletions api/alembic/versions/7cd069b79aaa_add_advisory_fuel_stats_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""add advisory fuel stats table

Revision ID: 7cd069b79aaa
Revises: 8e85e2b291a9
Create Date: 2023-11-09 15:34:35.225016

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '7cd069b79aaa'  # this migration's unique id
down_revision = '8e85e2b291a9'  # parent revision in the migration chain
branch_labels = None
depends_on = None


def upgrade():
    """Create the advisory_fuel_stats table, its indexes, and update a table comment."""
    # ### commands auto generated by Alembic ###
    # Records burnable area per fuel type, keyed by fire shape, HFI
    # threshold and SFMS run; one row per unique combination.
    op.create_table(
        'advisory_fuel_stats',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('advisory_shape_id', sa.Integer(), nullable=False),
        sa.Column('threshold', sa.Integer(), nullable=False),
        sa.Column('run_parameters', sa.Integer(), nullable=False),
        sa.Column('fuel_type', sa.Integer(), nullable=False),
        sa.Column('area', sa.Float(), nullable=False),
        sa.ForeignKeyConstraint(['advisory_shape_id'], ['advisory_shapes.id'], ),
        sa.ForeignKeyConstraint(['fuel_type'], ['sfms_fuel_types.id'], ),
        sa.ForeignKeyConstraint(['run_parameters'], ['run_parameters.id'], ),
        sa.ForeignKeyConstraint(['threshold'], ['advisory_hfi_classification_threshold.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('advisory_shape_id', 'threshold', 'run_parameters', 'fuel_type'),
        comment='Fuel type stats per fire shape by advisory threshold'
    )
    # Non-unique indexes on the columns used to look stats up for a run.
    fuel_stats_indexes = (
        ('ix_advisory_fuel_stats_advisory_shape_id', 'advisory_shape_id'),
        ('ix_advisory_fuel_stats_fuel_type', 'fuel_type'),
        ('ix_advisory_fuel_stats_id', 'id'),
        ('ix_advisory_fuel_stats_run_parameters', 'run_parameters'),
    )
    for index_name, column_name in fuel_stats_indexes:
        op.create_index(op.f(index_name), 'advisory_fuel_stats', [column_name], unique=False)
    # Reword the elevation stats comment: "fire zone" -> "fire shape".
    op.create_table_comment(
        'advisory_elevation_stats',
        'Elevation stats per fire shape by advisory threshold',
        existing_comment='Elevation stats per fire zone by advisory threshold',
        schema=None
    )
    # ### end Alembic commands ###


def downgrade():
    """Revert the table comment change and drop advisory_fuel_stats."""
    # ### commands auto generated by Alembic ###
    # Restore the previous "fire zone" wording on advisory_elevation_stats.
    op.create_table_comment(
        'advisory_elevation_stats',
        'Elevation stats per fire zone by advisory threshold',
        existing_comment='Elevation stats per fire shape by advisory threshold',
        schema=None
    )
    # Drop the indexes (reverse of creation order), then the table itself.
    for index_name in (
        'ix_advisory_fuel_stats_run_parameters',
        'ix_advisory_fuel_stats_id',
        'ix_advisory_fuel_stats_fuel_type',
        'ix_advisory_fuel_stats_advisory_shape_id',
    ):
        op.drop_index(op.f(index_name), table_name='advisory_fuel_stats')
    op.drop_table('advisory_fuel_stats')
    # ### end Alembic commands ###
6 changes: 3 additions & 3 deletions api/app/auto_spatial_advisory/common.py
conbrad marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@


def get_s3_key(run_type: RunType, run_date: date, for_date: date) -> str:
    """Return the GDAL /vsis3 path of the HFI GeoTIFF in our object store.

    :param run_type: The type of SFMS run (e.g. forecast or actual).
    :param run_date: The date the SFMS run was produced.
    :param for_date: The date the HFI raster applies to.
    :return: A "/vsis3/..." key GDAL can open directly from S3.
    """
    # Defect fixed: the diff artifact left duplicated old/new assignments for
    # bucket, for_date_string and key; only the final (double-quoted) versions
    # are kept here.
    bucket = config.get("OBJECT_STORE_BUCKET")
    # TODO what really has to happen, is that we grab the most recent prediction for the given date,
    # but this method doesn't even belong here, it's just a shortcut for now!
    for_date_string = f"{for_date.year}{for_date.month:02d}{for_date.day:02d}"

    # The filename in our object store, prepended with "vsis3" - which tells GDAL to use
    # its S3 virtual file system driver to read the file.
    # https://gdal.org/user/virtual_file_systems.html
    key = f"/vsis3/{bucket}/sfms/uploads/{run_type.value}/{run_date.isoformat()}/hfi{for_date_string}.tif"
    return key
29 changes: 19 additions & 10 deletions api/app/auto_spatial_advisory/elevation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from osgeo import gdal
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import text
from sqlalchemy.future import select
from app import config
from app.auto_spatial_advisory.classify_hfi import classify_hfi
from app.auto_spatial_advisory.run_type import RunType
Expand Down Expand Up @@ -39,16 +40,24 @@ async def process_elevation(source_path: str, run_type: RunType, run_datetime: d
async with get_async_read_session_scope() as session:
run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date)

# The filename in our object store, prepended with "vsis3" - which tells GDAL to use
# it's S3 virtual file system driver to read the file.
# https://gdal.org/user/virtual_file_systems.html
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, 'classified.tif')
classify_hfi(source_path, temp_filename)
# thresholds: 1 = 4k-10k, 2 = >10k
thresholds = [1, 2]
for threshold in thresholds:
await process_threshold(threshold, temp_filename, temp_dir, run_parameters_id)
stmt = select(AdvisoryElevationStats)\
.where(AdvisoryElevationStats == run_parameters_id)

exists = (await session.execute(stmt)).scalars().first() is not None
if (not exists):
# The filename in our object store, prepended with "vsis3" - which tells GDAL to use
# it's S3 virtual file system driver to read the file.
# https://gdal.org/user/virtual_file_systems.html
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, 'classified.tif')
classify_hfi(source_path, temp_filename)
# thresholds: 1 = 4k-10k, 2 = >10k
thresholds = [1, 2]
for threshold in thresholds:
await process_threshold(threshold, temp_filename, temp_dir, run_parameters_id)
else:
logger.info("Elevation stats already computed")


perf_end = perf_counter()
delta = perf_end - perf_start
Expand Down
28 changes: 13 additions & 15 deletions api/app/auto_spatial_advisory/nats_consumer.py
conbrad marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from app.auto_spatial_advisory.process_elevation_hfi import process_hfi_elevation
from app.auto_spatial_advisory.process_hfi import RunType, process_hfi
from app.auto_spatial_advisory.process_high_hfi_area import process_high_hfi_area
from app.auto_spatial_advisory.process_fuel_type_area import process_fuel_type_hfi_by_shape
from app.nats_publish import publish
from app import configure_logging
from app.utils.time import get_utc_datetime
Expand All @@ -24,17 +25,16 @@

def parse_nats_message(msg: Msg):
    """
    Parse the fields from the messages to drive the processing

    :param msg: A NATS message whose data is a JSON-encoded JSON string
        carrying run_type, run_date, for_date and create_time fields.
    :return: Tuple of (run_type, run_date, run_datetime, for_date) for an
        SFMS file message; implicitly None for any other subject.
    """
    # Defect fixed: the diff artifact left duplicated old/new assignment lines;
    # only the final (double-quoted) versions are kept here.
    if msg.subject == sfms_file_subject:
        # The payload is doubly JSON-encoded: a JSON string containing JSON.
        decoded_msg = json.loads(json.loads(msg.data.decode()))
        run_type = RunType.from_str(decoded_msg["run_type"])
        run_date = datetime.strptime(decoded_msg["run_date"], "%Y-%m-%d").date()
        for_date = datetime.strptime(decoded_msg["for_date"], "%Y-%m-%d").date()
        # SFMS doesn't give us a timezone, but from the 2022 data it runs in local time
        # so we localize it as such then convert it to UTC
        run_datetime = get_utc_datetime(datetime.fromisoformat(decoded_msg["create_time"]))
        return (run_type, run_date, run_datetime, for_date)


Expand Down Expand Up @@ -63,29 +63,27 @@ async def closed_cb():
jetstream = nats_connection.jetstream()
# we create a stream, this is important, we need to messages to stick around for a while!
# idempotent operation, IFF stream with same configuration is added each time
await jetstream.add_stream(name=stream_name,
config=StreamConfig(retention=RetentionPolicy.WORK_QUEUE),
subjects=subjects)
sfms_sub = await jetstream.pull_subscribe(stream=stream_name,
subject=sfms_file_subject,
durable=hfi_classify_durable_group)
await jetstream.add_stream(name=stream_name, config=StreamConfig(retention=RetentionPolicy.WORK_QUEUE), subjects=subjects)
sfms_sub = await jetstream.pull_subscribe(stream=stream_name, subject=sfms_file_subject, durable=hfi_classify_durable_group)
while True:
msgs: List[Msg] = await sfms_sub.fetch(batch=1, timeout=None)
for msg in msgs:
try:
logger.info('Msg received - {}\n'.format(msg))
logger.info("Msg received - {}\n".format(msg))
await msg.ack()
run_type, run_date, run_datetime, for_date = parse_nats_message(msg)
logger.info('Awaiting process_hfi({}, {}, {})\n'.format(run_type, run_date, for_date))
logger.info("Awaiting process_hfi({}, {}, {})\n".format(run_type, run_date, for_date))
await process_hfi(run_type, run_date, run_datetime, for_date)
await process_hfi_elevation(run_type, run_date, run_datetime, for_date)
await process_high_hfi_area(run_type, run_datetime, for_date)
await process_fuel_type_hfi_by_shape(run_type, run_datetime, for_date)
except Exception as e:
logger.error("Error processing HFI message: %s, adding back to queue", msg.data, exc_info=e)
background_tasks = BackgroundTasks()
background_tasks.add_task(publish, stream_name, sfms_file_subject, msg, subjects)

if __name__ == '__main__':

if __name__ == "__main__":
configure_logging()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Expand Down
1 change: 0 additions & 1 deletion api/app/auto_spatial_advisory/process_elevation_hfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ async def process_hfi_elevation(run_type: RunType, run_date: date, run_datetime:
:param for_date: The date of the hfi to process. (when is the hfi for?)
"""

# TODO: check for already processed HFI elevation data based on run parameters
logger.info('Processing HFI elevation %s for run date: %s, for date: %s', run_type, run_date, for_date)
perf_start = perf_counter()

Expand Down
Loading
Loading