diff --git a/src/alembic/versions/72af814ca33d_add_batch_reference.py b/src/alembic/versions/72af814ca33d_add_batch_reference.py new file mode 100644 index 00000000..5dd6c7b9 --- /dev/null +++ b/src/alembic/versions/72af814ca33d_add_batch_reference.py @@ -0,0 +1,31 @@ +"""add batch reference + +Revision ID: 72af814ca33d +Revises: 4e912be8a176 +Create Date: 2024-04-03 21:45:21.431765 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '72af814ca33d' +down_revision = '4e912be8a176' +branch_labels = None +depends_on = '4e912be8a176' + + +def upgrade() -> None: + op.add_column('dim_vessel',sa.Column("batch", sa.String),schema='public') + op.add_column('spire_ais_data',sa.Column("batch", sa.String), + schema='public') + op.rename_table('spire_ais_data','stg_spire_ais_data',schema='public') + pass + + +def downgrade() -> None: + op.rename_table('stg_spire_ais_data','spire_ais_data',schema='public') + op.drop_column('spire_ais_data','batch',schema='public') + op.drop_column('dim_vessel','batch',schema='public') + pass diff --git a/src/bloom/domain/excursion.py b/src/bloom/domain/excursion.py index f8c5d647..70c3ea78 100644 --- a/src/bloom/domain/excursion.py +++ b/src/bloom/domain/excursion.py @@ -26,3 +26,4 @@ class Excursion(BaseModel): total_time_extincting_amp: Union[timedelta, None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/port.py b/src/bloom/domain/port.py index 8e36d6a8..de26bd60 100644 --- a/src/bloom/domain/port.py +++ b/src/bloom/domain/port.py @@ -21,3 +21,4 @@ class Port(BaseModel): has_excursion: bool = False created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/rel_segment_zone.py b/src/bloom/domain/rel_segment_zone.py index 0c74431c..5650d1bb 100644 --- a/src/bloom/domain/rel_segment_zone.py +++ b/src/bloom/domain/rel_segment_zone.py @@ -9,3 +9,4 @@ class RelSegmentZone(BaseModel): segment_id: int zone_id: int created_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/segment.py b/src/bloom/domain/segment.py index 9e7fd2fa..a729f0b6 100644 --- a/src/bloom/domain/segment.py +++ b/src/bloom/domain/segment.py @@ -23,3 +23,4 @@ class Segment(BaseModel): last_vessel_segment: Union[bool , None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/spire_ais_data.py b/src/bloom/domain/spire_ais_data.py index 3c73ff5c..7ec450ea 100644 --- a/src/bloom/domain/spire_ais_data.py +++ b/src/bloom/domain/spire_ais_data.py @@ -40,6 +40,7 @@ class SpireAisData(BaseModel): voyage_timestamp: Union[datetime , None] = None # noqa: UP007 voyage_update_timestamp: Union[datetime , None] = None # noqa: UP007 created_at: Union[datetime, None] = None # noqa: UP007 + batch: Union[str, None] = None def map_from_spire(spire_update_timestamp: datetime, vessel: dict[str, Any]): # noqa: ANN201 def deep_get(dictionary: dict[str, Any], *keys) -> str: diff --git a/src/bloom/domain/zone.py b/src/bloom/domain/zone.py index 83d70b68..0f38369f 100644 --- a/src/bloom/domain/zone.py +++ b/src/bloom/domain/zone.py @@ -16,3 +16,4 @@ class Zone(BaseModel): json_data: Union[dict, None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/infra/database/sql_model.py b/src/bloom/infra/database/sql_model.py index 040fdfdb..653fd33f 100644 --- a/src/bloom/infra/database/sql_model.py +++ b/src/bloom/infra/database/sql_model.py @@ -44,6 +44,7 @@ class Vessel(Base): "created_at", DateTime(timezone=True), nullable=False, server_default=func.now(), ) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class VesselPositionSpire(Base): @@ -164,6 +165,7 @@ class Port(Base): has_excursion = Column("has_excursion", Boolean, default=False) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class SpireAisData(Base): @@ -201,6 +203,7 @@ class SpireAisData(Base): voyage_timestamp = Column("voyage_timestamp", DateTime(timezone=True)) voyage_update_timestamp = Column("voyage_update_timestamp", DateTime(timezone=True)) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) + batch = Column("batch", String) class Zone(Base): @@ -213,6 +216,7 @@ class Zone(Base): centroid = Column("centroid", Geometry(geometry_type="POINT", srid=settings.srid)) json_data = Column("json_data", JSONB) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) + batch = Column("batch", String) class WhiteZone(Base): @@ -293,6 +297,7 @@ class Excursion(Base): total_time_extincting_amp = Column("total_time_extincting_amp", Interval) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class Segment(Base): @@ -316,6 +321,7 @@ class Segment(Base): last_vessel_segment = Column("last_vessel_segment", Boolean) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class TaskExecution(Base): diff --git a/src/tasks/base.py b/src/tasks/base.py index 7000bbe7..73919f5e 100644 --- a/src/tasks/base.py +++ b/src/tasks/base.py @@ -1,5 +1,7 @@ from bloom.logger import logger +import hashlib +from datetime import datetime class BaseTask(): args = None @@ -15,6 +17,8 @@ def stop_on_error(self, value: bool) -> None: self._stop_on_error = value def __init__(self, *args, **kwargs): + if 'batch' not in kwargs: + kwargs['batch']=datetime.now().strftime(f"{self.__class__.__name__}-%y%m%d%H%M%S%f") self.args = args self.kwargs = kwargs @@ -22,15 +26,27 @@ def start(self) -> None: logger.info(f"Starting task {self.__class__.__name__}") try: self.run(*self.args, **self.kwargs) - logger.info(f"Task {self.__class__.__name__} finished") + logger.info(f"Task {self.__class__.__name__} sucess") + self.on_success(*self.args, **self.kwargs) except Exception as e: logger.error(f"Task {self} did not finished correctly. {e}") - if self.stop_on_error: - exit(e.args[0]) - else: - raise (e) + self.on_error(*self.args, **self.kwargs) + raise(e) + #if self.stop_on_error: + # exit(e.args[0]) + #else: + # raise (e) def run(self, *args, **kwargs) -> None: + logger.info(f"Task {self.__class__.__name__} success") + pass + + def on_success(self,*args, **kwargs) -> None: + logger.info(f"Task {self.__class__.__name__} success") + pass + + def on_error(self,*args, **kwargs) -> None: + logger.error(f"Task {self.__class__.__name__} on_error") pass def stop(self) -> None: diff --git a/src/tasks/dimensions/load_dim_port_from_csv.py b/src/tasks/dimensions/load_dim_port_from_csv.py index 0a1b47d1..9adf395d 100644 --- a/src/tasks/dimensions/load_dim_port_from_csv.py +++ b/src/tasks/dimensions/load_dim_port_from_csv.py @@ -50,8 +50,10 @@ def run(self, *args, **kwargs): except ValidationError as e: logger.error("Erreur de validation des données de port") logger.error(e.errors()) + raise(e) except DBException as e: logger.error("Erreur d'insertion en base") + raise(e) logger.info(f"{total} ports(s) créés") diff --git a/src/tasks/facts/extract_spire_data_from_csv.py b/src/tasks/facts/extract_spire_data_from_csv.py index 2dde4cdd..622e3b09 100644 --- a/src/tasks/facts/extract_spire_data_from_csv.py +++ b/src/tasks/facts/extract_spire_data_from_csv.py @@ -39,7 +39,13 @@ def run(self, *args, **kwargs) -> None: df = pd.read_csv(file_name, sep=";") #df['spire_update_statement']=pd.to_datetime(df['spire_update_statement'], format='%Y-%m-%d %H:%M:%S.%f %z') df['spire_update_statement']=df['spire_update_statement'].apply(datetime.fromisoformat) - logger.debug(df[['spire_update_statement']].info(verbose=True)) + df['vessel_timestamp']=df['vessel_timestamp'].apply(datetime.fromisoformat) + df['vessel_update_timestamp']=df['vessel_update_timestamp'].apply(datetime.fromisoformat) + df['position_timestamp']=df['position_timestamp'].apply(datetime.fromisoformat) + df['position_update_timestamp']=df['position_update_timestamp'].apply(datetime.fromisoformat) + df['voyage_timestamp']=df['voyage_timestamp'].apply(datetime.fromisoformat) + df['voyage_update_timestamp']=df['voyage_update_timestamp'].apply(datetime.fromisoformat) + df['created_at']=df['created_at'].apply(datetime.fromisoformat) #spire_ais_data = df.apply(self.map_to_domain, axis=1) #with Path(file_name).open() as csv_data, db.session() as session: # raw_vessels = json.load(json_data) diff --git a/src/tasks/load_dimensions.py b/src/tasks/load_dimensions.py index 2ba4cae3..68f517e5 100644 --- a/src/tasks/load_dimensions.py +++ b/src/tasks/load_dimensions.py @@ -1,9 +1,14 @@ import sys +from bloom.logger import logger from tasks.base import BaseTask from tasks.dimensions import LoadDimPortFromCsv, LoadDimVesselFromCsv, LoadDimZoneAmpFromCsv,\ ComputePortGeometryBuffer +import hashlib +from datetime import datetime + + class LoadDimensions(BaseTask): @@ -13,6 +18,11 @@ def run(self, *args, **kwargs): LoadDimPortFromCsv(*args, **kwargs).start() LoadDimVesselFromCsv(*args, **kwargs).start() + def on_error(self, *args, **kwargs): + logger.error("LoadDimensions::on_error") + logger.error(f"batch:{kwargs['batch']}") + pass + if __name__ == "__main__": task = LoadDimensions(*list(arg for arg in sys.argv[1:] if arg.find('=') <= 0),