Skip to content

Commit

Permalink
feat: + add batch log in columns
Browse files Browse the repository at this point in the history
  • Loading branch information
herve.le-bars committed Apr 5, 2024
1 parent c088c14 commit c09b6ba
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 6 deletions.
31 changes: 31 additions & 0 deletions src/alembic/versions/72af814ca33d_add_batch_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""add batch reference
Revision ID: 72af814ca33d
Revises: 4e912be8a176
Create Date: 2024-04-03 21:45:21.431765
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '72af814ca33d'
down_revision = '4e912be8a176'
branch_labels = None
depends_on = '4e912be8a176'


def upgrade() -> None:
op.add_column('dim_vessel',sa.Column("batch", sa.String),schema='public')
op.add_column('spire_ais_data',sa.Column("batch", sa.String),
schema='public')
op.rename_table('spire_ais_data','stg_spire_ais_data',schema='public')
pass


def downgrade() -> None:
op.rename_table('stg_spire_ais_data','spire_ais_data',schema='public')
op.drop_column('spire_ais_data','batch',schema='public')
op.drop_column('dim_vessel','batch',schema='public')
pass
1 change: 1 addition & 0 deletions src/bloom/domain/excursion.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ class Excursion(BaseModel):
total_time_extincting_amp: Union[timedelta, None] = None
created_at: Union[datetime, None] = None
updated_at: Union[datetime, None] = None
batch: Union[str, None] = None
1 change: 1 addition & 0 deletions src/bloom/domain/port.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ class Port(BaseModel):
has_excursion: bool = False
created_at: Union[datetime, None] = None
updated_at: Union[datetime, None] = None
batch: Union[str, None] = None
1 change: 1 addition & 0 deletions src/bloom/domain/rel_segment_zone.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ class RelSegmentZone(BaseModel):
segment_id: int
zone_id: int
created_at: Union[datetime, None] = None
batch: Union[str, None] = None
1 change: 1 addition & 0 deletions src/bloom/domain/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ class Segment(BaseModel):
last_vessel_segment: Union[bool , None] = None
created_at: Union[datetime, None] = None
updated_at: Union[datetime, None] = None
batch: Union[str, None] = None
1 change: 1 addition & 0 deletions src/bloom/domain/spire_ais_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class SpireAisData(BaseModel):
voyage_timestamp: Union[datetime , None] = None # noqa: UP007
voyage_update_timestamp: Union[datetime , None] = None # noqa: UP007
created_at: Union[datetime, None] = None # noqa: UP007
batch: Union[str, None] = None

def map_from_spire(spire_update_timestamp: datetime, vessel: dict[str, Any]): # noqa: ANN201
def deep_get(dictionary: dict[str, Any], *keys) -> str:
Expand Down
1 change: 1 addition & 0 deletions src/bloom/domain/zone.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ class Zone(BaseModel):
json_data: Union[dict, None] = None
created_at: Union[datetime, None] = None
updated_at: Union[datetime, None] = None
batch: Union[str, None] = None
6 changes: 6 additions & 0 deletions src/bloom/infra/database/sql_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Vessel(Base):
"created_at", DateTime(timezone=True), nullable=False, server_default=func.now(),
)
updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
batch = Column("batch", String)


class VesselPositionSpire(Base):
Expand Down Expand Up @@ -164,6 +165,7 @@ class Port(Base):
has_excursion = Column("has_excursion", Boolean, default=False)
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
batch = Column("batch", String)


class SpireAisData(Base):
Expand Down Expand Up @@ -201,6 +203,7 @@ class SpireAisData(Base):
voyage_timestamp = Column("voyage_timestamp", DateTime(timezone=True))
voyage_update_timestamp = Column("voyage_update_timestamp", DateTime(timezone=True))
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
batch = Column("batch", String)


class Zone(Base):
Expand All @@ -213,6 +216,7 @@ class Zone(Base):
centroid = Column("centroid", Geometry(geometry_type="POINT", srid=settings.srid))
json_data = Column("json_data", JSONB)
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
batch = Column("batch", String)


class WhiteZone(Base):
Expand Down Expand Up @@ -293,6 +297,7 @@ class Excursion(Base):
total_time_extincting_amp = Column("total_time_extincting_amp", Interval)
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
batch = Column("batch", String)


class Segment(Base):
Expand All @@ -316,6 +321,7 @@ class Segment(Base):
last_vessel_segment = Column("last_vessel_segment", Boolean)
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
batch = Column("batch", String)


class TaskExecution(Base):
Expand Down
26 changes: 21 additions & 5 deletions src/tasks/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from bloom.logger import logger

import hashlib
from datetime import datetime

class BaseTask():
args = None
Expand All @@ -15,22 +17,36 @@ def stop_on_error(self, value: bool) -> None:
self._stop_on_error = value

def __init__(self, *args, **kwargs):
if 'batch' not in kwargs:
kwargs['batch']=datetime.now().strftime(f"{self.__class__.__name__}-%y%m%d%H%M%S%f")
self.args = args
self.kwargs = kwargs

def start(self) -> None:
logger.info(f"Starting task {self.__class__.__name__}")
try:
self.run(*self.args, **self.kwargs)
logger.info(f"Task {self.__class__.__name__} finished")
logger.info(f"Task {self.__class__.__name__} sucess")
self.on_success(*self.args, **self.kwargs)
except Exception as e:
logger.error(f"Task {self} did not finished correctly. {e}")
if self.stop_on_error:
exit(e.args[0])
else:
raise (e)
self.on_error(*self.args, **self.kwargs)
raise(e)
#if self.stop_on_error:
# exit(e.args[0])
#else:
# raise (e)

def run(self, *args, **kwargs) -> None:
logger.info(f"Task {self.__class__.__name__} success")
pass

def on_success(self,*args, **kwargs) -> None:
logger.info(f"Task {self.__class__.__name__} success")
pass

def on_error(self,*args, **kwargs) -> None:
logger.error(f"Task {self.__class__.__name__} on_error")
pass

def stop(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/tasks/dimensions/load_dim_port_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ def run(self, *args, **kwargs):
except ValidationError as e:
logger.error("Erreur de validation des données de port")
logger.error(e.errors())
raise(e)
except DBException as e:
logger.error("Erreur d'insertion en base")
raise(e)
logger.info(f"{total} ports(s) créés")


Expand Down
8 changes: 7 additions & 1 deletion src/tasks/facts/extract_spire_data_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ def run(self, *args, **kwargs) -> None:
df = pd.read_csv(file_name, sep=";")
#df['spire_update_statement']=pd.to_datetime(df['spire_update_statement'], format='%Y-%m-%d %H:%M:%S.%f %z')
df['spire_update_statement']=df['spire_update_statement'].apply(datetime.fromisoformat)
logger.debug(df[['spire_update_statement']].info(verbose=True))
df['vessel_timestamp']=df['vessel_timestamp'].apply(datetime.fromisoformat)
df['vessel_update_timestamp']=df['vessel_update_timestamp'].apply(datetime.fromisoformat)
df['position_timestamp']=df['position_timestamp'].apply(datetime.fromisoformat)
df['position_update_timestamp']=df['position_update_timestamp'].apply(datetime.fromisoformat)
df['voyage_timestamp']=df['voyage_timestamp'].apply(datetime.fromisoformat)
df['voyage_update_timestamp']=df['voyage_update_timestamp'].apply(datetime.fromisoformat)
df['created_at']=df['created_at'].apply(datetime.fromisoformat)
#spire_ais_data = df.apply(self.map_to_domain, axis=1)
#with Path(file_name).open() as csv_data, db.session() as session:
# raw_vessels = json.load(json_data)
Expand Down
10 changes: 10 additions & 0 deletions src/tasks/load_dimensions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import sys

from bloom.logger import logger
from tasks.base import BaseTask
from tasks.dimensions import LoadDimPortFromCsv, LoadDimVesselFromCsv, LoadDimZoneAmpFromCsv,\
ComputePortGeometryBuffer

import hashlib
from datetime import datetime



class LoadDimensions(BaseTask):

Expand All @@ -13,6 +18,11 @@ def run(self, *args, **kwargs):
LoadDimPortFromCsv(*args, **kwargs).start()
LoadDimVesselFromCsv(*args, **kwargs).start()

def on_error(self, *args, **kwargs):
logger.error("LoadDimensions::on_error")
logger.error(f"batch:{kwargs['batch']}")
pass


if __name__ == "__main__":
task = LoadDimensions(*list(arg for arg in sys.argv[1:] if arg.find('=') <= 0),
Expand Down

0 comments on commit c09b6ba

Please sign in to comment.