Default to offline (#294)
tibdex authored Mar 4, 2025
1 parent 4d89dff commit 8559827
Showing 16 changed files with 73 additions and 40 deletions.
3 files renamed without changes.
22 changes: 7 additions & 15 deletions app/config.py
@@ -1,13 +1,9 @@
-from datetime import timedelta
 from pathlib import Path
 from typing import Annotated
 
 from pydantic import (
     AliasChoices,
-    DirectoryPath,
     Field,
-    FilePath,
-    HttpUrl,
     PostgresDsn,
     TypeAdapter,
     field_validator,
@@ -25,15 +21,17 @@ class Config(BaseSettings):
 
     model_config = SettingsConfigDict(frozen=True)
 
-    data_refresh_period: timedelta | None = timedelta(minutes=1)
+    check_mapping_lookups: bool = __debug__
+
+    data_refresh_period: float | None = None
+    """How often the station status data should be refreshed.
+
+    If ``None``, only local data will be used: no requests will be made to external APIs.
+    """
 
     # The $PORT environment variable is used by most PaaS to indicate the port the app server should bind to.
     port: int = 9090
 
-    reverse_geocoding_path: HttpUrl | FilePath = TypeAdapter(HttpUrl).validate_python(
-        "https://api-adresse.data.gouv.fr/reverse/csv/"
-    )
-
     user_content_storage: Annotated[
         PostgresDsn | Path | None,
         Field(
@@ -43,12 +41,6 @@ class Config(BaseSettings):
         ),
     ] = Path("content")
 
-    velib_data_base_path: HttpUrl | DirectoryPath = TypeAdapter(
-        HttpUrl
-    ).validate_python(
-        "https://velib-metropole-opendata.smovengo.cloud/opendata/Velib_Metropole"
-    )
-
     @field_validator("user_content_storage")
     @classmethod
     def normalize_postgres_dsn(cls, value: object) -> object:

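With this change, a `Config` built with no overrides runs fully offline. A minimal sketch of the resulting behavior, assuming `Config` is importable from `app.config` as above:

    from app.config import Config

    # No overrides: the new default is offline, so no requests are made to external APIs.
    config = Config()
    assert config.data_refresh_period is None

    # Opting back into periodic refreshes, in seconds (pydantic coerces the int to a float):
    online = Config(data_refresh_period=60)
    assert online.data_refresh_period == 60.0
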
27 changes: 22 additions & 5 deletions app/load_tables.py
@@ -6,9 +6,10 @@
 import atoti as tt
 import httpx
 import pandas as pd
-from pydantic import HttpUrl
+from pydantic import DirectoryPath, FilePath, HttpUrl
 
 from .config import Config
+from .path import RESOURCES_DIRECTORY
 from .skeleton import SKELETON
 from .util import read_json, reverse_geocode
 
@@ -106,19 +107,35 @@ async def load_tables(
     config: Config,
     http_client: httpx.AsyncClient,
 ) -> None:
+    if config.data_refresh_period is None:
+        reverse_geocoding_path: HttpUrl | FilePath = (
+            RESOURCES_DIRECTORY / "station_location.csv"
+        )
+        velib_data_base_path: HttpUrl | DirectoryPath = RESOURCES_DIRECTORY
+    else:
+        reverse_geocoding_path = HttpUrl(
+            "https://api-adresse.data.gouv.fr/reverse/csv/"
+        )
+        velib_data_base_path = HttpUrl(
+            "https://velib-metropole-opendata.smovengo.cloud/opendata/Velib_Metropole"
+        )
+
     station_details_df, station_status_df = await asyncio.gather(
         read_station_details(
             http_client=http_client,
-            reverse_geocoding_path=config.reverse_geocoding_path,
-            velib_data_base_path=config.velib_data_base_path,
+            reverse_geocoding_path=reverse_geocoding_path,
+            velib_data_base_path=velib_data_base_path,
         ),
         read_station_status(
-            config.velib_data_base_path,
+            velib_data_base_path,
             http_client=http_client,
         ),
     )
 
-    with tt.mapping_lookup(check=__debug__), session.tables.data_transaction():
+    with (
+        tt.mapping_lookup(check=config.check_mapping_lookups),
+        session.tables.data_transaction(),
+    ):
         await asyncio.gather(
             session.tables[SKELETON.tables.STATION_DETAILS.key].load_async(
                 station_details_df

4 changes: 4 additions & 0 deletions app/path.py
@@ -0,0 +1,4 @@
+from pathlib import Path
+
+APP_DIRECTORY = Path(__file__).parent
+RESOURCES_DIRECTORY = APP_DIRECTORY / "__resources__"

2 changes: 1 addition & 1 deletion app/start_session.py
@@ -39,7 +39,7 @@ async def start_session(
     """Start the session, declare the data model and load the initial data."""
     session_config = get_session_config(config)
     with tt.Session.start(session_config) as session:
-        with tt.mapping_lookup(check=__debug__):
+        with tt.mapping_lookup(check=config.check_mapping_lookups):
             create_and_join_tables(session)
             create_cubes(session)
             await load_tables(session, config=config, http_client=http_client)

6 changes: 2 additions & 4 deletions app/util/run_periodically.py
@@ -1,23 +1,21 @@
 import asyncio
 from collections.abc import AsyncGenerator, Awaitable, Callable
 from contextlib import asynccontextmanager
-from datetime import timedelta
 
 
 @asynccontextmanager
 async def run_periodically(
     callback: Callable[[], Awaitable[None]],
     /,
     *,
-    period: timedelta,
+    period: float,
 ) -> AsyncGenerator[None]:
-    period_in_seconds = period.total_seconds()
     stopped = asyncio.Event()
 
     async def loop() -> None:
         while not stopped.is_set():
             await callback()
-            await asyncio.sleep(period_in_seconds)
+            await asyncio.sleep(period)
 
     task = asyncio.create_task(loop())
 
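Callers now pass the period as a plain number of seconds instead of a `timedelta`. A usage sketch; the `refresh` callback is hypothetical:

    import asyncio

    from app.util.run_periodically import run_periodically


    async def refresh() -> None:
        print("refreshing station status")  # Stand-in for a real data reload.


    async def main() -> None:
        async with run_periodically(refresh, period=30.0):
            # refresh() runs on entry and then roughly every 30 seconds.
            await asyncio.sleep(90)


    asyncio.run(main())
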
1 change: 1 addition & 0 deletions pyproject.toml
@@ -51,6 +51,7 @@ ignore = [
   "PLC0414", # Redundant imports are used for re-exporting (See https://peps.python.org/pep-0484/#stub-files).
   "S101", # `assert` is useful when used correctly (https://realpython.com/python-assert-statement).
   "TCH", # Pydantic needs to resolve type annotations at runtime.
+  "TID252",
   "TRY003", # Not worth the annoyance.
 ]
 select = ["ALL"]

6 changes: 6 additions & 0 deletions task-definition.json
@@ -3,6 +3,12 @@
   "containerDefinitions": [
     {
       "name": "atoti-session",
+      "environment": [
+        {
+          "name": "DATA_REFRESH_PERIOD",
+          "value": "60"
+        }
+      ],
       "essential": true,
       "logConfiguration": {
         "logDriver": "awslogs",

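On ECS the refresh period is opted back in through the environment: pydantic-settings matches environment variables to `Config` fields case-insensitively, so `DATA_REFRESH_PERIOD=60` ends up as `data_refresh_period=60.0` and the containerized app stays online. A quick sketch of that mapping, assuming the `Config` class from this commit:

    import os

    from app.config import Config

    os.environ["DATA_REFRESH_PERIOD"] = "60"

    # pydantic-settings reads the variable and coerces the string "60" to a float.
    assert Config().data_refresh_period == 60.0
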
4 changes: 0 additions & 4 deletions tests/conftest.py
@@ -7,7 +7,6 @@
 from app import Config, start_app
 
 _TESTS_DIRECTORY = Path(__file__).parent
-_TESTS_RESOURCES_DIRECTORY = _TESTS_DIRECTORY / "__resources__"
 _PROJECT_DIRECTORY = _TESTS_DIRECTORY.parent
 
 
@@ -19,11 +18,8 @@ def project_name_fixture() -> str:
 @pytest.fixture(name="config", scope="session")
 def config_fixture() -> Config:
     return Config(
-        data_refresh_period=None,
-        reverse_geocoding_path=_TESTS_RESOURCES_DIRECTORY / "station_location.csv",
         port=0,
         user_content_storage=None,
-        velib_data_base_path=_TESTS_RESOURCES_DIRECTORY,
     )

4 changes: 3 additions & 1 deletion tests/docker/_docker_container.py
@@ -1,4 +1,4 @@
-from collections.abc import Generator
+from collections.abc import Generator, Mapping
 from contextlib import contextmanager
 from uuid import uuid4
 
@@ -13,10 +13,12 @@ def docker_container(
     *,
     client: docker.DockerClient,
     container_name: str | None = None,
+    env: Mapping[str, str] | None = None,
 ) -> Generator[Container, None, None]:
     container = client.containers.run(
         image_name,
         detach=True,
+        environment=env,
         name=container_name or str(uuid4()),
         publish_all_ports=True,
     )

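A usage sketch of the extended helper; the image name is hypothetical:

    import docker

    from tests.docker._docker_container import docker_container

    client = docker.from_env()

    # `env` is forwarded to `client.containers.run(environment=...)`, so the
    # container sees DATA_REFRESH_PERIOD and starts in online mode.
    with docker_container(
        "project-template:latest",  # Hypothetical image name.
        client=client,
        env={"DATA_REFRESH_PERIOD": "30"},
    ) as container:
        print(container.name)
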
9 changes: 8 additions & 1 deletion tests/docker/conftest.py
@@ -54,7 +54,14 @@ def session_inside_docker_container_fixture(
 ) -> Generator[tt.Session, None, None]:
     timeout = Timeout(timedelta(minutes=1))
 
-    with docker_container(docker_image_name, client=docker_client) as container:
+    with docker_container(
+        docker_image_name,
+        client=docker_client,
+        env={
+            # Test external APIs.
+            "DATA_REFRESH_PERIOD": "30"
+        },
+    ) as container:
         logs = container.logs(stream=True)
 
         while "Session listening on port" not in next(logs).decode():

16 changes: 12 additions & 4 deletions tests/docker/test_docker.py
@@ -1,12 +1,20 @@
 import atoti as tt
 
 from app import SKELETON
-from app.util.skeleton import CONTRIBUTORS_COUNT
+
+from ..total_capacity import TOTAL_CAPACITY
 
 
 def test_session_inside_docker_container(
     session_inside_docker_container: tt.Session,
 ) -> None:
-    cube = session_inside_docker_container.cubes[SKELETON.cubes.STATION.key]
-    result_df = cube.query(cube.measures[CONTRIBUTORS_COUNT])
-    assert result_df[CONTRIBUTORS_COUNT][0] > 0
+    skeleton = SKELETON.cubes.STATION
+    cube = session_inside_docker_container.cubes[skeleton.key]
+    result_df = cube.query(cube.measures[skeleton.measures.CAPACITY.key])
+    total_capacity = result_df[skeleton.measures.CAPACITY.name][0]
+    assert total_capacity > 0, (
+        "There should be at least one station with one dock or more."
+    )
+    assert total_capacity != TOTAL_CAPACITY, (
+        "The data fetched from the external API should lead to a different capacity than that of the local data, since new stations have been created since the data was snapshotted."
+    )

10 changes: 5 additions & 5 deletions tests/test_session.py
@@ -4,17 +4,17 @@
 from app import SKELETON
 from app.util.skeleton import CONTRIBUTORS_COUNT
 
+from .total_capacity import TOTAL_CAPACITY
+
 
 def test_total_capacity(session: tt.Session) -> None:
     skeleton = SKELETON.cubes.STATION
     cube = session.cubes[skeleton.key]
     result = cube.query(cube.measures[skeleton.measures.CAPACITY.key])
     expected_result = pd.DataFrame(
-        columns=[skeleton.measures.CAPACITY.name],
-        data=[
-            (45_850),
-        ],
-        dtype="Int32",
+        {
+            skeleton.measures.CAPACITY.name: pd.Series([TOTAL_CAPACITY], dtype="Int32"),
+        }
     )
     pd.testing.assert_frame_equal(result, expected_result)

2 changes: 2 additions & 0 deletions tests/total_capacity.py
@@ -0,0 +1,2 @@
+TOTAL_CAPACITY = 45_850
+"""The expected total capacity of the system when using local data."""
