diff --git a/backend/bloom/infra/repositories/repository_segment.py b/backend/bloom/infra/repositories/repository_segment.py index 612fa5a0..4053c489 100644 --- a/backend/bloom/infra/repositories/repository_segment.py +++ b/backend/bloom/infra/repositories/repository_segment.py @@ -7,7 +7,7 @@ from geoalchemy2.functions import ST_Within from geoalchemy2.shape import from_shape, to_shape from shapely import wkb -from sqlalchemy import and_, or_, select, update, text +from sqlalchemy import and_, or_, select, update, text, join from sqlalchemy.orm import Session from bloom.logger import logger @@ -207,15 +207,19 @@ def batch_update_segment(self, session: Session, segments: list[Segment]) -> lis # Mise à jour des derniers segments des excursions. En 2 étapes # passe à False de la colonne last_vessel_segment pour tous les segments des excursions transmises # passe à True la colonne last_vessel_segment pour tous les Id de segments les plus récent de chaque excursion - def update_last_segments(self, session: Session, excursion_ids: list[int]) -> int: - upd1 = update(sql_model.Segment).where(sql_model.Segment.excursion_id.in_(excursion_ids)).values( - last_vessel_segment=False) - session.execute(upd1) + def update_last_segments(self, session: Session, vessel_ids: list[int]) -> int: + for v_id in vessel_ids: + upd1 = (update(sql_model.Segment). + where( + sql_model.Segment.id.in_(select(sql_model.Segment.id).join(sql_model.Excursion).where(sql_model.Excursion.vessel_id == v_id))). + values(last_vessel_segment=False)) + session.execute(upd1) session.flush() - last_segments = session.execute(text("""SELECT DISTINCT ON (excursion_id) id FROM fct_segment - WHERE excursion_id in :excursion_ids - ORDER BY excursion_id, timestamp_start DESC"""), - {"excursion_ids": tuple(excursion_ids)}).all() + last_segments = session.execute(text("""SELECT DISTINCT ON (vessel_id) s.id FROM fct_segment s + JOIN fct_excursion e ON e.id = s.excursion_id + WHERE vessel_id in :vessel_ids + ORDER BY vessel_id, timestamp_start DESC"""), + {"vessel_ids": tuple(vessel_ids)}).all() ids = [r[0] for r in last_segments] upd2 = update(sql_model.Segment).where(sql_model.Segment.id.in_(ids)).values( last_vessel_segment=True) diff --git a/backend/bloom/tasks/create_update_excursions_segments.py b/backend/bloom/tasks/create_update_excursions_segments.py index 22992ee0..6e3de2ab 100644 --- a/backend/bloom/tasks/create_update_excursions_segments.py +++ b/backend/bloom/tasks/create_update_excursions_segments.py @@ -1,5 +1,5 @@ import warnings -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from time import perf_counter from typing import Optional @@ -281,6 +281,9 @@ def get_time_of_departure(): heading_at_end=result["heading_at_end"].iloc[i], type=result["type"].iloc[i], last_vessel_segment=result["last_vessel_segment"].iloc[i], + in_costal_waters=False, + in_amp_zone=False, + in_territorial_waters=False ) new_segments.append(new_segment) segment_repository.batch_create_segment(session, new_segments) @@ -291,7 +294,7 @@ def get_time_of_departure(): new_segments = None df = None - # Recherche des zones et calcul / mise ) jour des stats + # Recherche des zones et calcul / mise à jour des stats logger.info("Mise en relation des segments avec les zones et calcul des statistiques d'excursion") result = segment_repository.find_segments_in_zones_created_updated_after(session, point_in_time) new_rels = [] @@ -299,10 +302,9 @@ def get_time_of_departure(): segments = [] max_created_updated = point_in_time for segment, zones in result.items(): - segment.in_costal_waters = False - segment.in_amp_zone = False - segment.in_territorial_waters = False + segment_in_zone = False for zone in zones: + segment_in_zone = True new_rels.append(RelSegmentZone(segment_id=segment.id, zone_id=zone.id)) if zone.category == "amp": segment.in_amp_zone = True @@ -310,6 +312,8 @@ def get_time_of_departure(): segment.in_costal_waters = True elif zone.category == "Territorial seas": segment.in_territorial_waters = True + if segment_in_zone: + segments.append(segment) # Mise à jour de l'excursion avec le temps passé dans chaque type de zone excursion = excursions.get(segment.excursion_id, excursion_repository.get_excursion_by_id(session, segment.excursion_id)) @@ -343,7 +347,6 @@ def get_time_of_departure(): max_created_updated = segment.updated_at elif segment.created_at > max_created_updated: max_created_updated = segment.created_at - segments.append(segment) excursion_repository.batch_update_excursion(session, excursions.values()) logger.info(f"{len(excursions.values())} excursions mises à jour") @@ -351,14 +354,12 @@ def get_time_of_departure(): logger.info(f"{len(segments)} segments mis à jour") RelSegmentZoneRepository.batch_create_rel_segment_zone(session, new_rels) logger.info(f"{len(new_rels)} associations(s) créées") - nb_last = segment_repository.update_last_segments(session, list(exc.id for exc in excursions.values())) + vessels_ids = set(exc.vessel_id for exc in excursions.values()) + nb_last = segment_repository.update_last_segments(session, vessels_ids) logger.info(f"{nb_last} derniers segments mis à jour") - TaskExecutionRepository.set_point_in_time(session, "rel_segments_zones", max_created_updated) + now = datetime.now(timezone.utc) + TaskExecutionRepository.set_point_in_time(session, "create_update_excursions_segments", now) - max_created = batch["created_at"].max() - if pd.notna(max_created): - TaskExecutionRepository.set_point_in_time(session, "create_update_excursions_segments", - max_created) session.commit()