Skip to content

Commit

Permalink
feat: migrate compute_port_geometry
Browse files Browse the repository at this point in the history
Signed-off-by: herve.le-bars <[email protected]>
  • Loading branch information
herve.le-bars committed Mar 26, 2024
1 parent af721d1 commit 8eb4337
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 165 deletions.
4 changes: 4 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ LOGGING_LEVEL=INFO

AMP_DATA_CSV_PATH=
PORT_DATA_CSV_PATH=
PORT_POLYGON_DATA_CSV_PATH=
VESSEL_DATA_CSV_PATH=

VESSEL_POSITIONS_DATA_CSV_PATH=

#PORT_RADIUS_M=3000
#PORT_RESOLUTION=10


###############################################################################################
# DOCKER SPECIFIC VARIABLES
Expand Down
5 changes: 4 additions & 1 deletion src/bloom/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class Settings(BaseSettings):
)

amp_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/zones_subset.csv")
port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv")
port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports.csv")
port_polygon_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv")
port_radius_m:int = 3000 # Radius in meters
port_resolution:int = 10 # Number of points in the resulting polygon
vessel_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/chalutiers_pelagiques.csv")


Expand Down
7 changes: 5 additions & 2 deletions src/bloom/infra/repositories/repository_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]:
return [PortRepository.map_to_domain(entity) for entity in q]

def get_ports_updated_created_after(self, session: Session, created_updated_after: datetime) -> list[Port]:
stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after,
sql_model.Port.updated_at >= created_updated_after))
if created_updated_after is None:
stmt = select(sql_model.Port)
else:
stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after,
sql_model.Port.updated_at >= created_updated_after))
q = session.execute(stmt).scalars()
if not q:
return []
Expand Down
129 changes: 0 additions & 129 deletions src/bloom/tasks/compute_port_geometry_buffer.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/tasks/data/load_port_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def run(self,*args,**kwargs):
engine = create_engine(settings.db_url, echo=False)

df = pd.read_csv(
settings.port_data_csv_path,
settings.port_polygon_data_csv_path,
sep=";",
)

Expand Down
57 changes: 25 additions & 32 deletions src/tasks/dimensions/load_dim_port_from_csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from tasks.base import BaseTask
from bloom.config import settings
import logging

from pathlib import Path
from time import perf_counter
Expand All @@ -15,12 +14,10 @@
from pydantic import ValidationError
from shapely import wkt

logging.basicConfig()
logging.getLogger("bloom.tasks").setLevel(settings.logging_level)


class LoadDimPortFromCsv(BaseTask):
def map_to_domain(row) -> Port:
def map_to_domain(self,row) -> Port:
iso_code = pycountry.countries.get(name=row["country"])
iso_code = iso_code.alpha_3 if iso_code is not None else "XXX"

Expand All @@ -33,38 +30,34 @@ def map_to_domain(row) -> Port:
longitude=float(row["longitude"]),
geometry_point=row["geometry_point"],
)
def run(self):


def run(csv_file_name: str) -> None:
use_cases = UseCases()
port_repository = use_cases.port_repository()
db = use_cases.db()

ports = []
total = 0
try:
df = pd.read_csv(csv_file_name, sep=";")
df["geometry_point"] = df["geometry_point"].apply(wkt.loads)
gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid)
ports = gdf.apply(map_to_domain, axis=1)
with db.session() as session:
ports = port_repository.batch_create_port(session, list(ports))
session.commit()
total = len(ports)
except ValidationError as e:
logger.error("Erreur de validation des données de port")
logger.error(e.errors())
except DBException as e:
logger.error("Erreur d'insertion en base")
logger.info(f"{total} ports(s) créés")
def run(self,*args,**kwargs):
    """Load the port dimension table from the CSV configured in settings.

    Reads ``settings.port_data_csv_path`` (``;``-separated, with a WKT
    ``geometry_point`` column), maps each row to a ``Port`` domain object
    and bulk-inserts them in a single transaction.

    Validation or database errors are logged and swallowed so the task
    terminates cleanly; on failure 0 created ports are reported.
    """
    use_cases = UseCases()
    port_repository = use_cases.port_repository()
    db = use_cases.db()

    ports = []
    total = 0
    try:
        # CSV uses ';' as separator; geometry_point holds WKT point strings.
        df = pd.read_csv(settings.port_data_csv_path, sep=";")
        df["geometry_point"] = df["geometry_point"].apply(wkt.loads)
        gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid)
        ports = gdf.apply(self.map_to_domain, axis=1)
        with db.session() as session:
            ports = port_repository.batch_create_port(session, list(ports))
            session.commit()
            total = len(ports)
    except ValidationError as e:
        logger.error("Erreur de validation des données de port")
        logger.error(e.errors())
    except DBException as e:
        logger.error("Erreur d'insertion en base")
        # Log the exception itself: the original discarded the bound `e`,
        # hiding the actual database error from the logs.
        logger.error(e)
    logger.info(f"{total} ports(s) créés")


if __name__ == "__main__":
time_start = perf_counter()
file_name = Path(settings.data_folder).joinpath("./ports.csv")
logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {file_name}")
run(file_name)
logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {settings.port_data_csv_path}")
LoadDimPortFromCsv().start()
time_end = perf_counter()
duration = time_end - time_start
logger.info(f"FIN - Chargement des données de ports en {duration:.2f}s")
129 changes: 129 additions & 0 deletions src/tasks/transformations/compute_port_geometry_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from time import perf_counter
from tasks.base import BaseTask
import geopandas as gpd
import pandas as pd
import pyproj
import shapely
from bloom.config import settings
from bloom.container import UseCases
from bloom.logger import logger
from scipy.spatial import Voronoi
from shapely.geometry import LineString, Polygon
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository
from datetime import datetime, timezone



class ComputePort_GeometryBuffer(BaseTask):
    """Compute a non-overlapping geometry buffer for every port.

    Each port first receives a fixed-size buffer; overlaps between
    neighbouring ports are then removed by clipping each buffer to the
    port's Voronoi cell. Only ports created/updated since the last task
    execution (tracked via TaskExecutionRepository) are processed.
    """

    # Configuration defaults for geodesic_point_buffer callers.
    radius_m = settings.port_radius_m  # Radius in meters
    resolution = settings.port_resolution  # Number of points in the resulting polygon

    # Function to create geodesic buffer around a point.
    # NOTE(review): declared @staticmethod — the original had no `self`
    # parameter, so an instance call would have mis-bound `self` to `lat`.
    @staticmethod
    def geodesic_point_buffer(lat: float, lon: float, radius_m: int, resolution: int) -> Polygon:
        """Build a geodesic circle (as a shapely Polygon) around a point.

        Input
            lat: latitude of the center point
            lon: longitude of the center point
            radius_m: radius of the buffer in meters
            resolution: number of points in the resulting polygon
        """
        geod = pyproj.Geod(ellps="WGS84")  # Define the ellipsoid
        # Create a circle in geodesic coordinates. If `resolution` does not
        # divide 360 evenly, the vertex count may differ slightly.
        angles = range(0, 360, 360 // resolution)
        circle_points = []
        for angle in angles:
            # Calculate the point on the circle for this angle
            lon2, lat2, _ = geod.fwd(lon, lat, angle, radius_m)
            circle_points.append((lon2, lat2))
        # Create a polygon from these points
        return Polygon(circle_points)

    def assign_voronoi_buffer(self, ports: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
        """Computes a buffer around each port such as buffers do not overlap each other
        :param gpd.GeoDataFrame ports: fields "id", "latitude", "longitude", "geometry_point"
            from the table "dim_ports"
        :return gpd.GeoDataFrame: same as input but field "geometry_point" is replaced
            by "geometry_buffer"
        """
        # Explicit import: a bare `import shapely` does not guarantee that
        # the legacy `shapely.ops` submodule is loaded.
        from shapely.ops import polygonize

        # Convert to CRS 6933 to write distances as meters (instead of degrees)
        ports.to_crs(6933, inplace=True)

        # Create an 8km buffer around each port
        # FIXME: maybe put the buffer distance as a global parameter
        ports["buffer"] = ports["geometry_point"].apply(lambda p: shapely.buffer(p, 8000))

        # Convert points back to CRS 4326 (lat/lon) --> buffers are still expressed in 6933
        ports.to_crs(settings.srid, inplace=True)

        # Convert buffers to 4326
        ports = gpd.GeoDataFrame(ports, geometry="buffer", crs=6933).to_crs(settings.srid)

        # Create Voronoi polygons, i.e. match each port to the area which is closest to this port
        vor = Voronoi(list(zip(ports.longitude, ports.latitude)))
        lines = []
        for line in vor.ridge_vertices:
            if -1 not in line:
                lines.append(LineString(vor.vertices[line]))
            else:
                # Infinite ridge: keep an empty placeholder, ignored by polygonize
                lines.append(LineString())
        vor_polys = list(polygonize(lines))

        # Match each port to its Voronoi polygon
        vor_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(vor_polys), crs=4326)
        vor_gdf["voronoi_poly"] = vor_gdf["geometry"].copy()
        ports = gpd.GeoDataFrame(ports, geometry="geometry_point", crs=4326)
        vor_ports = ports.sjoin(vor_gdf, how="left").drop(columns=["index_right"])

        # Corner case: ports at extreme latitude and longitude have no Voronoi polygon
        # (15 / >5800) cases --> duplicate regular buffer instead
        vor_ports.loc[vor_ports.voronoi_poly.isna(), "voronoi_poly"] = vor_ports.loc[
            vor_ports.voronoi_poly.isna(), "buffer"
        ]

        # Get intersection between fixed-size buffer and Voronoi polygon to get final buffer
        vor_ports["geometry_buffer"] = vor_ports.apply(
            lambda row: shapely.intersection(row["buffer"], row["voronoi_poly"]),
            axis=1
        )
        vor_ports = gpd.GeoDataFrame(vor_ports, geometry="geometry_buffer", crs=4326)
        vor_ports = vor_ports[["id", "latitude", "longitude", "geometry_buffer"]].copy()

        return vor_ports

    def run(self, *args, **kwargs) -> None:
        """Compute and persist geometry buffers for recently changed ports.

        Uses the task's point-in-time marker to only reprocess ports
        created or updated since the previous execution, then advances
        that marker and commits everything in one transaction.
        """
        use_cases = UseCases()
        port_repository = use_cases.port_repository()
        db = use_cases.db()
        items = []
        with db.session() as session:
            point_in_time = TaskExecutionRepository.get_point_in_time(session, "compute_port_geometry_buffer")
            logger.info(f"Point in time={point_in_time}")
            # Capture `now` before processing so changes made during the run
            # are picked up by the next execution.
            now = datetime.now(timezone.utc)
            ports = port_repository.get_ports_updated_created_after(session, point_in_time)
            if ports:
                df = pd.DataFrame(
                    [[p.id, p.geometry_point, p.latitude, p.longitude] for p in ports],
                    columns=["id", "geometry_point", "latitude", "longitude"],
                )
                gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid)

                # Apply the buffer function to create buffers
                gdf = self.assign_voronoi_buffer(gdf)

                for row in gdf.itertuples():
                    items.append({"id": row.id, "geometry_buffer": row.geometry_buffer})
            port_repository.batch_update_geometry_buffer(session, items)
            TaskExecutionRepository.set_point_in_time(session, "compute_port_geometry_buffer", now)
            session.commit()
        logger.info(f"{len(items)} buffer de ports mis à jour")

if __name__ == "__main__":
    # Standalone entry point: run the task once and log its wall-clock duration.
    started_at = perf_counter()
    logger.info("DEBUT - Calcul des buffer de ports")
    ComputePort_GeometryBuffer().start()
    elapsed = perf_counter() - started_at
    logger.info(f"FIN - Calcul des buffer de ports en {elapsed:.2f}s")

0 comments on commit 8eb4337

Please sign in to comment.