Skip to content

Commit

Permalink
Fix exists check, add fuel stats table and processing
Browse files Browse the repository at this point in the history
  • Loading branch information
conbrad committed Nov 9, 2023
1 parent d3c14cb commit 16dff03
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 40 deletions.
61 changes: 61 additions & 0 deletions api/alembic/versions/7cd069b79aaa_add_advisory_fuel_stats_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""add advisory fuel stats table
Revision ID: 7cd069b79aaa
Revises: 8e85e2b291a9
Create Date: 2023-11-09 15:34:35.225016
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '7cd069b79aaa'
down_revision = '8e85e2b291a9'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic ###
op.create_table('advisory_fuel_stats',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('advisory_shape_id', sa.Integer(), nullable=False),
sa.Column('threshold', sa.Integer(), nullable=False),
sa.Column('run_parameters', sa.Integer(), nullable=False),
sa.Column('fuel_type', sa.Integer(), nullable=False),
sa.Column('area', sa.Float(), nullable=False),
sa.ForeignKeyConstraint(['advisory_shape_id'], ['advisory_shapes.id'], ),
sa.ForeignKeyConstraint(['fuel_type'], ['sfms_fuel_types.id'], ),
sa.ForeignKeyConstraint(['run_parameters'], ['run_parameters.id'], ),
sa.ForeignKeyConstraint(['threshold'], ['advisory_hfi_classification_threshold.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('advisory_shape_id', 'threshold', 'run_parameters', 'fuel_type'),
comment='Fuel type stats per fire shape by advisory threshold'
)
op.create_index(op.f('ix_advisory_fuel_stats_advisory_shape_id'), 'advisory_fuel_stats', ['advisory_shape_id'], unique=False)
op.create_index(op.f('ix_advisory_fuel_stats_fuel_type'), 'advisory_fuel_stats', ['fuel_type'], unique=False)
op.create_index(op.f('ix_advisory_fuel_stats_id'), 'advisory_fuel_stats', ['id'], unique=False)
op.create_index(op.f('ix_advisory_fuel_stats_run_parameters'), 'advisory_fuel_stats', ['run_parameters'], unique=False)
op.create_table_comment(
'advisory_elevation_stats',
'Elevation stats per fire shape by advisory threshold',
existing_comment='Elevation stats per fire zone by advisory threshold',
schema=None
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic ###
op.create_table_comment(
'advisory_elevation_stats',
'Elevation stats per fire zone by advisory threshold',
existing_comment='Elevation stats per fire shape by advisory threshold',
schema=None
)
op.drop_index(op.f('ix_advisory_fuel_stats_run_parameters'), table_name='advisory_fuel_stats')
op.drop_index(op.f('ix_advisory_fuel_stats_id'), table_name='advisory_fuel_stats')
op.drop_index(op.f('ix_advisory_fuel_stats_fuel_type'), table_name='advisory_fuel_stats')
op.drop_index(op.f('ix_advisory_fuel_stats_advisory_shape_id'), table_name='advisory_fuel_stats')
op.drop_table('advisory_fuel_stats')
# ### end Alembic commands ###
8 changes: 2 additions & 6 deletions api/app/auto_spatial_advisory/elevation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import text
from sqlalchemy.future import select
from sqlalchemy import cast, String
from app import config
from app.auto_spatial_advisory.classify_hfi import classify_hfi
from app.auto_spatial_advisory.run_type import RunType
from app.db.crud.auto_spatial_advisory import get_run_parameters_id, save_advisory_elevation_stats
from app.db.database import get_async_read_session_scope, get_async_write_session_scope, DB_READ_STRING
from app.db.models.auto_spatial_advisory import AdvisoryElevationStats, RunParameters
from app.db.models.auto_spatial_advisory import AdvisoryElevationStats
from app.utils.s3 import get_client


Expand All @@ -42,10 +41,7 @@ async def process_elevation(source_path: str, run_type: RunType, run_datetime: d
run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date)

stmt = select(AdvisoryElevationStats)\
.where(
cast(RunParameters.run_type, String) == run_type.value,
RunParameters.for_date == for_date,
RunParameters.run_datetime == run_datetime)
.where(AdvisoryElevationStats == run_parameters_id)

exists = (await session.execute(stmt)).scalars().first() is not None
if (not exists):
Expand Down
83 changes: 83 additions & 0 deletions api/app/auto_spatial_advisory/process_fuel_type_area.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
""" Code relating to processing high HFI area per fire zone
"""

import logging
from datetime import date, datetime
from time import perf_counter
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.auto_spatial_advisory.run_type import RunType
from app.db.database import get_async_write_session_scope
from app.db.models.auto_spatial_advisory import AdvisoryFuelStats, HighHfiArea
from app.db.crud.auto_spatial_advisory import (get_all_hfi_thresholds,
get_all_sfms_fuel_types,
get_high_hfi_fuel_types,
get_run_parameters_id,
save_advisory_fuel_stats,
save_high_hfi_area)


logger = logging.getLogger(__name__)


async def write_high_hfi_area(session: AsyncSession, row: any, run_parameters_id: int):
high_hfi_area = HighHfiArea(advisory_shape_id=row.shape_id,
run_parameters=run_parameters_id,
area=row.area,
threshold=row.threshold)
await save_high_hfi_area(session, high_hfi_area)


async def process_fuel_type_area(run_type: RunType, run_datetime: datetime, for_date: date):
""" Create new fuel type analysis records for the given date.
:param run_type: The type of run to process. (is it a forecast or actual run?)
:param run_date: The date of the run to process. (when was the hfi file created?)
:param for_date: The date of the hfi to process. (when is the hfi for?)
"""
logger.info('Processing fuel type area %s for run date: %s, for date: %s', run_type, run_datetime, for_date)
perf_start = perf_counter()

async with get_async_write_session_scope() as session:
run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date)

stmt = select(AdvisoryFuelStats)\
.where(AdvisoryFuelStats.run_parameters == run_parameters_id)

exists = (await session.execute(stmt)).scalars().first() is not None

if (not exists):
# get thresholds data
thresholds = await get_all_hfi_thresholds(session)
# get fuel type ids data
fuel_types = await get_all_sfms_fuel_types(session)
logger.info('Getting high hfi fuel types...')
fuel_type_high_hfi_areas = await get_high_hfi_fuel_types(session, run_type, run_datetime, for_date)
advisory_fuel_stats = []

for record in fuel_type_high_hfi_areas:
shape_id = record[0]
fuel_type_id = record[1]
threshold_id = record[2]
# area is stored in square metres in DB. For user convenience, convert to hectares
# 1 ha = 10,000 sq.m.
area = record[3] / 10000
fuel_type_obj = next((ft for ft in fuel_types if ft.fuel_type_id == fuel_type_id), None)
threshold_obj = next((th for th in thresholds if th.id == threshold_id), None)

advisory_fuel_stats.append(
AdvisoryFuelStats(advisory_shape_id=shape_id,
threshold=threshold_obj.id,
run_parameters=run_parameters_id,
fuel_type=fuel_type_obj.id,
area=area)
)

logger.info('Writing advisory fuel stats...')
await save_advisory_fuel_stats(session, advisory_fuel_stats)
else:
logger.info("Advisory fuel stats already processed")

perf_end = perf_counter()
delta = perf_end - perf_start
logger.info('%f delta count before and after processing high HFI area', delta)
10 changes: 5 additions & 5 deletions api/app/auto_spatial_advisory/process_high_hfi_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sqlalchemy.ext.asyncio import AsyncSession
from app.auto_spatial_advisory.run_type import RunType
from app.db.database import get_async_write_session_scope
from app.db.models.auto_spatial_advisory import ClassifiedHfi, HighHfiArea, RunParameters
from app.db.models.auto_spatial_advisory import ClassifiedHfi, HighHfiArea
from app.db.crud.auto_spatial_advisory import get_run_parameters_id, calculate_high_hfi_areas, save_high_hfi_area


Expand Down Expand Up @@ -40,9 +40,9 @@ async def process_high_hfi_area(run_type: RunType, run_datetime: datetime, for_d

stmt = select(ClassifiedHfi)\
.where(
cast(RunParameters.run_type, String) == run_type.value,
RunParameters.for_date == for_date,
RunParameters.run_datetime == run_datetime)
cast(ClassifiedHfi.run_type, String) == run_type.value,
ClassifiedHfi.for_date == for_date,
ClassifiedHfi.run_datetime == run_datetime)

exists = (await session.execute(stmt)).scalars().first() is not None

Expand All @@ -54,7 +54,7 @@ async def process_high_hfi_area(run_type: RunType, run_datetime: datetime, for_d
for row in high_hfi_areas:
await write_high_hfi_area(session, row, run_parameters_id)
else:
logger.info("High hfi area already computed")
logger.info("High hfi area already processed")

perf_end = perf_counter()
delta = perf_end - perf_start
Expand Down
52 changes: 28 additions & 24 deletions api/app/db/crud/auto_spatial_advisory.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from sqlalchemy.engine.row import Row
from app.auto_spatial_advisory.run_type import RunType
from app.db.models.auto_spatial_advisory import (
Shape, ClassifiedHfi, HfiClassificationThreshold, SFMSFuelType, RunTypeEnum,
FuelType, HighHfiArea, RunParameters, AdvisoryElevationStats)
AdvisoryFuelStats, Shape, ClassifiedHfi, HfiClassificationThreshold, SFMSFuelType, RunTypeEnum,
FuelType, HighHfiArea, RunParameters, AdvisoryElevationStats, ShapeType)

logger = logging.getLogger(__name__)

Expand All @@ -37,13 +37,16 @@ async def save_fuel_type(session: AsyncSession, fuel_type: FuelType):
session.add(fuel_type)


async def get_hfi(session: AsyncSession, run_type: RunTypeEnum, run_date: datetime, for_date: date):
stmt = select(ClassifiedHfi).where(
ClassifiedHfi.run_type == run_type,
ClassifiedHfi.for_date == for_date,
ClassifiedHfi.run_datetime == run_date)
result = await session.execute(stmt)
return result.scalars()
async def get_fire_zone_unit_shape_type_id(session: AsyncSession):
statement = select(ShapeType).where(ShapeType.name == 'fire_zone_unit')
result = await session.execute(statement)
return result.scalars().first().id


async def get_fire_zone_units(session: AsyncSession, fire_zone_type_id: int):
statement = select(Shape).where(Shape.shape_type == fire_zone_type_id)
result = await session.execute(statement)
return result.all()


async def get_combustible_area(session: AsyncSession):
Expand Down Expand Up @@ -121,26 +124,26 @@ async def get_all_sfms_fuel_types(session: AsyncSession) -> List[SFMSFuelType]:
return fuel_types


async def get_high_hfi_fuel_types_for_zone(session: AsyncSession,
async def get_high_hfi_fuel_types_for_shape(session: AsyncSession,
run_type: RunTypeEnum,
run_datetime: datetime,
for_date: date,
zone_id: int) -> List[Row]:
shape_id: int) -> List[Row]:
"""
Union of fuel types by fuel_type_id (1 multipolygon for each fuel type)
Intersected with union of ClassifiedHfi for given run_type, run_datetime, and for_date
for both 4K-10K and 10K+ HFI values
Intersected with fire zone geom for a specific fire zone identified by ID
"""
logger.info('starting fuel types/high hfi/zone intersection query for fire zone %s', str(zone_id))
logger.info('starting fuel types/high hfi/zone intersection query for fire zone %s', str(shape_id))
perf_start = perf_counter()

stmt = select(Shape.source_identifier, FuelType.fuel_type_id, ClassifiedHfi.threshold,
func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label('area'))\
.join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\
.join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom))\
.where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date,
ClassifiedHfi.run_datetime == run_datetime, Shape.source_identifier == str(zone_id))\
ClassifiedHfi.run_datetime == run_datetime, Shape.source_identifier == str(shape_id))\
.group_by(Shape.source_identifier)\
.group_by(FuelType.fuel_type_id)\
.group_by(ClassifiedHfi.threshold)\
Expand All @@ -154,15 +157,14 @@ async def get_high_hfi_fuel_types_for_zone(session: AsyncSession,
return result.all()


async def get_fuel_types_with_high_hfi(session: AsyncSession,
run_type: RunTypeEnum,
run_datetime: datetime,
for_date: date) -> List[Row]:
async def get_high_hfi_fuel_types(session: AsyncSession,
run_type: RunTypeEnum,
run_datetime: datetime,
for_date: date) -> List[Row]:
"""
Union of fuel types by fuel_type_id (1 multipolygon for each type of fuel)
Intersect with union of ClassifiedHfi for given run_type, run_datetime, and for_date
Union of fuel types by fuel_type_id (1 multipolygon for each fuel type)
Intersected with union of ClassifiedHfi for given run_type, run_datetime, and for_date
for both 4K-10K and 10K+ HFI values
Intersection with fire zone geom
"""
logger.info('starting fuel types/high hfi/zone intersection query')
perf_start = perf_counter()
Expand All @@ -171,22 +173,21 @@ async def get_fuel_types_with_high_hfi(session: AsyncSession,
func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label('area'))\
.join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\
.join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom))\
.where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, ClassifiedHfi.run_datetime == run_datetime)\
.where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date,
ClassifiedHfi.run_datetime == run_datetime)\
.group_by(Shape.source_identifier)\
.group_by(FuelType.fuel_type_id)\
.group_by(ClassifiedHfi.threshold)\
.order_by(Shape.source_identifier)\
.order_by(FuelType.fuel_type_id)\
.order_by(ClassifiedHfi.threshold)

logger.info(str(stmt))
result = await session.execute(stmt)

perf_end = perf_counter()
delta = perf_end - perf_start
logger.info('%f delta count before and after fuel types/high hfi/zone intersection query', delta)
return result.all()


async def get_hfi_area(session: AsyncSession,
run_type: RunTypeEnum,
run_datetime: datetime,
Expand Down Expand Up @@ -239,6 +240,9 @@ async def save_high_hfi_area(session: AsyncSession, high_hfi_area: HighHfiArea):
session.add(high_hfi_area)


async def save_advisory_fuel_stats(session: AsyncSession, advisory_fuel_stats: List[AdvisoryFuelStats]):
session.add_all(advisory_fuel_stats)

async def calculate_high_hfi_areas(session: AsyncSession, run_type: RunType, run_datetime: datetime,
for_date: date) -> List[Row]:
"""
Expand Down
24 changes: 22 additions & 2 deletions api/app/db/models/auto_spatial_advisory.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ class HighHfiArea(Base):

class AdvisoryElevationStats(Base):
"""
Summary statistics about the elevation of area with high hfi (4k-10k and >10k) per firezone
Summary statistics about the elevation of area with high hfi (4k-10k and >10k) per fire shape
based on the set run_type, for_date and run_datetime.
"""
__tablename__ = 'advisory_elevation_stats'
__table_args__ = (
{
'comment': 'Elevation stats per fire zone by advisory threshold'
'comment': 'Elevation stats per fire shape by advisory threshold'
}
)
id = Column(Integer, primary_key=True, index=True)
Expand All @@ -169,3 +169,23 @@ class AdvisoryElevationStats(Base):
median = Column(Float, nullable=False)
quartile_75 = Column(Float, nullable=False)
maximum = Column(Float, nullable=False)


class AdvisoryFuelStats(Base):
"""
Summary statistics about the fuel type of area with high hfi (4k-10k and >10k) per fire shape
based on the set run_type, for_date and run_datetime.
"""
__tablename__ = 'advisory_fuel_stats'
__table_args__ = (
UniqueConstraint('advisory_shape_id', 'threshold', 'run_parameters', 'fuel_type'),
{
'comment': 'Fuel type stats per fire shape by advisory threshold'
}
)
id = Column(Integer, primary_key=True, index=True)
advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True)
threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False)
run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True)
fuel_type = Column(Integer, ForeignKey(SFMSFuelType.id), nullable=False, index=True)
area = Column(Float, nullable=False)
6 changes: 3 additions & 3 deletions api/app/routers/fba.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from app.db.crud.auto_spatial_advisory import (get_all_sfms_fuel_types,
get_all_hfi_thresholds,
get_hfi_area,
get_high_hfi_fuel_types_for_zone,
get_high_hfi_fuel_types_for_shape,
get_run_datetimes,
get_zonal_elevation_stats)
from app.db.models.auto_spatial_advisory import RunTypeEnum
Expand Down Expand Up @@ -82,11 +82,11 @@ async def get_hfi_fuels_data_for_fire_zone(run_type: RunType,
fuel_types = await get_all_sfms_fuel_types(session)

# get HFI/fuels data for specific zone
hfi_fuel_type_ids_for_zone = await get_high_hfi_fuel_types_for_zone(session,
hfi_fuel_type_ids_for_zone = await get_high_hfi_fuel_types_for_shape(session,

Check warning on line 85 in api/app/routers/fba.py

View check run for this annotation

Codecov / codecov/patch

api/app/routers/fba.py#L85

Added line #L85 was not covered by tests
run_type=RunTypeEnum(run_type.value),
for_date=for_date,
run_datetime=run_datetime,
zone_id=zone_id)
shape_id=zone_id)
data = []

for record in hfi_fuel_type_ids_for_zone:
Expand Down

0 comments on commit 16dff03

Please sign in to comment.