Skip to content

Commit

Permalink
Merge pull request #193 from dataforgoodfr/fix/189
Browse files Browse the repository at this point in the history
Fix/189
  • Loading branch information
njouanin authored Jul 1, 2024
2 parents 8622642 + 62c4d41 commit 1a6f580
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
22 changes: 13 additions & 9 deletions backend/bloom/infra/repositories/repository_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from geoalchemy2.functions import ST_Within
from geoalchemy2.shape import from_shape, to_shape
from shapely import wkb
from sqlalchemy import and_, or_, select, update, text
from sqlalchemy import and_, or_, select, update, text, join
from sqlalchemy.orm import Session

from bloom.logger import logger
Expand Down Expand Up @@ -207,15 +207,19 @@ def batch_update_segment(self, session: Session, segments: list[Segment]) -> lis
# Mise à jour des derniers segments des excursions. En 2 étapes
# passe à False de la colonne last_vessel_segment pour tous les segments des excursions transmises
# passe à True la colonne last_vessel_segment pour tous les Id de segments les plus récent de chaque excursion
def update_last_segments(self, session: Session, excursion_ids: list[int]) -> int:
upd1 = update(sql_model.Segment).where(sql_model.Segment.excursion_id.in_(excursion_ids)).values(
last_vessel_segment=False)
session.execute(upd1)
def update_last_segments(self, session: Session, vessel_ids: list[int]) -> int:
for v_id in vessel_ids:
upd1 = (update(sql_model.Segment).
where(
sql_model.Segment.id.in_(select(sql_model.Segment.id).join(sql_model.Excursion).where(sql_model.Excursion.vessel_id == v_id))).
values(last_vessel_segment=False))
session.execute(upd1)
session.flush()
last_segments = session.execute(text("""SELECT DISTINCT ON (excursion_id) id FROM fct_segment
WHERE excursion_id in :excursion_ids
ORDER BY excursion_id, timestamp_start DESC"""),
{"excursion_ids": tuple(excursion_ids)}).all()
last_segments = session.execute(text("""SELECT DISTINCT ON (vessel_id) s.id FROM fct_segment s
JOIN fct_excursion e ON e.id = s.excursion_id
WHERE vessel_id in :vessel_ids
ORDER BY vessel_id, timestamp_start DESC"""),
{"vessel_ids": tuple(vessel_ids)}).all()
ids = [r[0] for r in last_segments]
upd2 = update(sql_model.Segment).where(sql_model.Segment.id.in_(ids)).values(
last_vessel_segment=True)
Expand Down
25 changes: 13 additions & 12 deletions backend/bloom/tasks/create_update_excursions_segments.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import warnings
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from time import perf_counter
from typing import Optional

Expand Down Expand Up @@ -281,6 +281,9 @@ def get_time_of_departure():
heading_at_end=result["heading_at_end"].iloc[i],
type=result["type"].iloc[i],
last_vessel_segment=result["last_vessel_segment"].iloc[i],
in_costal_waters=False,
in_amp_zone=False,
in_territorial_waters=False
)
new_segments.append(new_segment)
segment_repository.batch_create_segment(session, new_segments)
Expand All @@ -291,25 +294,26 @@ def get_time_of_departure():
new_segments = None
df = None

# Recherche des zones et calcul / mise ) jour des stats
# Recherche des zones et calcul / mise à jour des stats
logger.info("Mise en relation des segments avec les zones et calcul des statistiques d'excursion")
result = segment_repository.find_segments_in_zones_created_updated_after(session, point_in_time)
new_rels = []
excursions = {}
segments = []
max_created_updated = point_in_time
for segment, zones in result.items():
segment.in_costal_waters = False
segment.in_amp_zone = False
segment.in_territorial_waters = False
segment_in_zone = False
for zone in zones:
segment_in_zone = True
new_rels.append(RelSegmentZone(segment_id=segment.id, zone_id=zone.id))
if zone.category == "amp":
segment.in_amp_zone = True
elif zone.category.startswith("Fishing coastal waters"):
segment.in_costal_waters = True
elif zone.category == "Territorial seas":
segment.in_territorial_waters = True
if segment_in_zone:
segments.append(segment)
# Mise à jour de l'excursion avec le temps passé dans chaque type de zone
excursion = excursions.get(segment.excursion_id,
excursion_repository.get_excursion_by_id(session, segment.excursion_id))
Expand Down Expand Up @@ -343,22 +347,19 @@ def get_time_of_departure():
max_created_updated = segment.updated_at
elif segment.created_at > max_created_updated:
max_created_updated = segment.created_at
segments.append(segment)

excursion_repository.batch_update_excursion(session, excursions.values())
logger.info(f"{len(excursions.values())} excursions mises à jour")
segment_repository.batch_update_segment(session, segments)
logger.info(f"{len(segments)} segments mis à jour")
RelSegmentZoneRepository.batch_create_rel_segment_zone(session, new_rels)
logger.info(f"{len(new_rels)} associations(s) créées")
nb_last = segment_repository.update_last_segments(session, list(exc.id for exc in excursions.values()))
vessels_ids = set(exc.vessel_id for exc in excursions.values())
nb_last = segment_repository.update_last_segments(session, vessels_ids)
logger.info(f"{nb_last} derniers segments mis à jour")
TaskExecutionRepository.set_point_in_time(session, "rel_segments_zones", max_created_updated)
now = datetime.now(timezone.utc)
TaskExecutionRepository.set_point_in_time(session, "create_update_excursions_segments", now)

max_created = batch["created_at"].max()
if pd.notna(max_created):
TaskExecutionRepository.set_point_in_time(session, "create_update_excursions_segments",
max_created)
session.commit()


Expand Down

0 comments on commit 1a6f580

Please sign in to comment.