diff --git a/app/api/routes/image_queries.py b/app/api/routes/image_queries.py
index e3611275..18e959bb 100644
--- a/app/api/routes/image_queries.py
+++ b/app/api/routes/image_queries.py
@@ -2,6 +2,7 @@
 from datetime import datetime
 from io import BytesIO
 from typing import Optional
+import asyncio
 
 import numpy as np
 from fastapi import APIRouter, Depends, HTTPException, Query, Request
@@ -17,7 +18,6 @@
 )
 from app.core.motion_detection import MotionDetectionManager
 from app.core.utils import prefixed_ksuid, safe_call_api
-from app.core.database import db
 
 logger = logging.getLogger(__name__)
 
@@ -103,7 +103,8 @@ async def post_image_query(
         new_image_query = motion_detection_manager.get_image_query_response(detector_id=detector_id).copy(
             deep=True, update={"id": prefixed_ksuid(prefix="iqe_")}
         )
-        iqe_cache.update_cache(image_query=new_image_query)
+        asyncio.create_task(app_state.db_manager.create_iqe_record(record=new_image_query))
         return new_image_query
 
     image_query = None
@@ -121,7 +122,8 @@ async def post_image_query(
             confidence=confidence,
             query=detector_metadata.query,
         )
-        iqe_cache.update_cache(image_query=image_query)
+        asyncio.create_task(app_state.db_manager.create_iqe_record(record=image_query))
     else:
         logger.info(
             "Ran inference locally, but detector confidence is not high enough to return. Current confidence:"
@@ -131,12 +133,16 @@ async def post_image_query(
     else:
         # Run an asynchronous task to create a record in the database table for this detector to indicate that
        # edge inference for the given detector ID is not yet set up.
-        db.create_record(
-            record={
-                "detector_id": detector_id,
-                "api_token": gl.configuration.api_key["ApiToken"],
-                "deployment_created": False,
-            }
+        logger.info(f"Creating database record for {detector_id=}")
+        api_token = gl.api_client.configuration.api_key["ApiToken"]
+        asyncio.create_task(
+            app_state.db_manager.create_detector_deployment_record(
+                record={
+                    "detector_id": detector_id,
+                    "api_token": api_token,
+                    "deployment_created": False,
+                }
+            )
         )
 
     # Finally, fall back to submitting the image to the cloud
@@ -161,9 +167,8 @@ async def get_image_query(
     id: str, gl: Groundlight = Depends(get_groundlight_sdk_instance), app_state: AppState = Depends(get_app_state)
 ):
     if id.startswith("iqe_"):
-        iqe_cache = app_state.iqe_cache
-
-        image_query = iqe_cache.get_cached_image_query(image_query_id=id)
+        # `get_iqe_record` is a coroutine, so it must be awaited.
+        image_query = await app_state.db_manager.get_iqe_record(image_query_id=id)
         if not image_query:
             raise HTTPException(status_code=404, detail=f"Image query with ID {id} not found")
         return image_query
diff --git a/app/core/app_state.py b/app/core/app_state.py
index 2dd4165e..f7b8584c 100644
--- a/app/core/app_state.py
+++ b/app/core/app_state.py
@@ -15,6 +15,7 @@
 from .iqe_cache import IQECache
 from .motion_detection import MotionDetectionManager
 from .utils import safe_call_api
+from .database import DatabaseManager
 
 logger = logging.getLogger(__name__)
 
@@ -96,6 +97,9 @@ def __init__(self):
         # Create global shared edge inference manager object in the app's state
         self.edge_inference_manager = EdgeInferenceManager(config=self.inference_config)
 
+        # Initialize a database manager object
+        self.db_manager = DatabaseManager()
+
 
 def get_app_state(request: Request) -> AppState:
     return request.app.state.app_state
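A note on the route changes above: the bare `asyncio.create_task(...)` calls are fire-and-forget, so a failed database write is only reported when the task is garbage-collected, and a task with no strong reference can even be collected before it finishes. A minimal sketch of a safer wrapper, assuming a running event loop (the helper names here are illustrative, not part of the codebase):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Keep strong references so pending tasks are not garbage-collected mid-flight.
_background_tasks: set[asyncio.Task] = set()


def _log_task_errors(task: asyncio.Task) -> None:
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logger.error("Background DB write failed", exc_info=task.exception())


def create_logged_task(coro) -> asyncio.Task:
    # Schedule the coroutine and surface any exception through the done-callback.
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_log_task_errors)
    return task
```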
diff --git a/app/core/configs.py b/app/core/configs.py
index 1d0c6e8f..9e36639c 100644
--- a/app/core/configs.py
+++ b/app/core/configs.py
@@ -32,6 +32,7 @@ class LocalInferenceConfig(BaseModel):
     """
 
     enabled: bool = Field(False, description="Determines if local edge inference is enabled for a specific detector.")
+    api_token: Optional[str] = Field(None, description="API token required to fetch inference models.")
     refresh_rate: float = Field(
         120.0,
         description=(
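For reference, this is how the new field fits into a per-detector config entry (the values below are hypothetical):

```python
from app.core.configs import LocalInferenceConfig

config = LocalInferenceConfig(
    enabled=True,
    api_token="api_29imEXAMPLE",  # hypothetical; in practice read from the detector_deployments record
    refresh_rate=120.0,
)
```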
+ if "detector_id" in str(e.orig): + logger.debug(f"Detector ID {record['detector_id']} already exists in the database.") + else: + logger.error(f"Integrity error occured", exc_info=True) + + async def update_detector_deployment_record(self, detector_id: str) -> None: """ + Check if detector_id is a record in the database. If it is, and the deployment_created field is False, + update the deployment_created field to True. + :param detector_id: Detector ID + :type detector_id: str - __tablename__ = "iqe_cache" + :return: None + :rtype: None + """ - def __init__(self): - self._engine = create_engine(f"sqlite:///{self.validate_filepath(DATABASE_FILEPATH)}") - self.session = sessionmaker(bind=self.engine) + try: + async with AsyncSession(self._engine) as session: + query = select(self.DetectorDeployment).filter_by(detector_id=detector_id) + result = await session.execute(query) - tables = ["detector_deployments", "iqe_cache"] - self._create_tables(tables=tables) + detector_record = result.scalar_one_or_none() + if detector_record is None: + return - @staticmethod - def validate_filepath(filepath: str) -> str: - if not os.path.exists(filepath): - raise FileNotFoundError(f"Invalid filepath: {filepath}") + if not detector_record.deployment_created: + detector_record.deployment_created = True + await session.commit() + except IntegrityError: + logger.debug(f"Error occured while updating database record for {detector_id=}.", exc_info=True) + await session.rollback() - return filepath + async def create_iqe_record(self, record: ImageQuery) -> None: + """ + Creates a new record in the `image_queries_edge` table. + :param record: A image query . + :type record: ImageQuery - async def create_record(self, record: Dict[str, str]) -> None: - with Session(self._engine) as session: - new_record = self.DetectorDeployment( - detector_id=record["detector_id"], - api_token=record["api_token"], - deployment_created=record["deployment_created"], + :throws IntegrityError: If the image_query_id already exists in the database. + :return: None + :rtype: None + """ + try: + async with AsyncSession(self._engine) as session: + image_query_id = record.id + image_query_json = record.json() + new_record = self.ImageQueriesEdge( + image_query_id=image_query_id, + image_query=image_query_json, + ) + session.add(new_record) + await session.commit() + + except IntegrityError: + logger.debug(f"Image query {record['image_query_id']} already exists in the database.") + await session.rollback() + + async def get_iqe_record(self, image_query_id: str) -> ImageQuery | None: + """ + Gets a record from the `image_queries_edge` table. + :param image_query_id: The ID of the image query. + :type image_query_id: str + + :return: The image query record. 
diff --git a/app/core/edge_inference.py b/app/core/edge_inference.py
index d680aa9c..2513089c 100644
--- a/app/core/edge_inference.py
+++ b/app/core/edge_inference.py
@@ -36,16 +36,26 @@ def __init__(self, config: Dict[str, LocalInferenceConfig], verbose: bool = False):
         a specific detector and the model name and version to use for inference.
         """
         self.inference_config = config
+        self.verbose = verbose
+        # Always initialize the client map so update_inference_config can add
+        # clients at runtime, even when no detectors are configured at startup.
+        self.inference_clients = {}
         if self.inference_config:
             self.inference_clients = {
                 detector_id: tritonclient.InferenceServerClient(
-                    url=get_edge_inference_service_name(detector_id) + ":8000", verbose=verbose
+                    url=get_edge_inference_service_name(detector_id) + ":8000", verbose=self.verbose
                 )
                 for detector_id in self.inference_config.keys()
                 if self.detector_configured_for_local_inference(detector_id)
             }
 
+    def update_inference_config(self, detector_id: str, api_token: str) -> None:
+        """
+        Registers a detector for local inference at runtime and (re)creates its
+        inference client.
+        """
+        if detector_id not in self.inference_config.keys():
+            self.inference_config[detector_id] = LocalInferenceConfig(enabled=True, api_token=api_token)
+
+        self.inference_clients[detector_id] = tritonclient.InferenceServerClient(
+            url=get_edge_inference_service_name(detector_id) + ":8000", verbose=self.verbose
+        )
+
     def detector_configured_for_local_inference(self, detector_id: str) -> bool:
         """
         Checks if the detector is configured to run local inference.
@@ -131,7 +141,7 @@ def update_model(self, detector_id: str) -> bool:
         Returns True if a new model was downloaded and saved, False otherwise.
         """
         logger.info(f"Checking if there is a new model available for {detector_id}")
-        model_urls = fetch_model_urls(detector_id)
+        model_urls = fetch_model_urls(detector_id, api_token=self.inference_config[detector_id].api_token)
 
         cloud_binary_ksuid = model_urls.get("model_binary_id", None)
         if cloud_binary_ksuid is None:
@@ -158,9 +168,9 @@
         return True
 
 
-def fetch_model_urls(detector_id: str) -> dict[str, str]:
+def fetch_model_urls(detector_id: str, api_token: Optional[str] = None) -> dict[str, str]:
     try:
-        groundlight_api_token = os.environ["GROUNDLIGHT_API_TOKEN"]
+        groundlight_api_token = api_token or os.environ["GROUNDLIGHT_API_TOKEN"]
     except KeyError as ex:
-        logger.error("GROUNDLIGHT_API_TOKEN environment variable is not set", exc_info=True)
+        logger.error("No API token was provided and GROUNDLIGHT_API_TOKEN is not set", exc_info=True)
         raise ex
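Taken together, a detector that first appears at runtime is registered roughly like this (a sketch; the detector ID and token are placeholders):

```python
from app.core.edge_inference import EdgeInferenceManager

manager = EdgeInferenceManager(config={})  # no detectors configured at startup
manager.update_inference_config(detector_id="det_abc123", api_token="api_29imEXAMPLE")

# Subsequent model refreshes use the per-detector token instead of the
# GROUNDLIGHT_API_TOKEN environment variable.
manager.update_model(detector_id="det_abc123")
```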
""" logger.info(f"Checking if there is a new model available for {detector_id}") - model_urls = fetch_model_urls(detector_id) + model_urls = fetch_model_urls(detector_id, api_token=self.inference_config[detector_id].api_token) cloud_binary_ksuid = model_urls.get("model_binary_id", None) if cloud_binary_ksuid is None: @@ -158,9 +168,9 @@ def update_model(self, detector_id: str) -> bool: return True -def fetch_model_urls(detector_id: str) -> dict[str, str]: +def fetch_model_urls(detector_id: str, api_token: Optional[str] = None) -> dict[str, str]: try: - groundlight_api_token = os.environ["GROUNDLIGHT_API_TOKEN"] + groundlight_api_token = api_token or os.environ["GROUNDLIGHT_API_TOKEN"] except KeyError as ex: logger.error("GROUNDLIGHT_API_TOKEN environment variable is not set", exc_info=True) raise ex diff --git a/app/core/file_paths.py b/app/core/file_paths.py index 7d20c341..f89d0ccc 100644 --- a/app/core/file_paths.py +++ b/app/core/file_paths.py @@ -1,4 +1,10 @@ DEFAULT_EDGE_CONFIG_PATH = "/etc/groundlight/edge-config/edge-config.yaml" INFERENCE_DEPLOYMENT_TEMPLATE_PATH = "/etc/groundlight/inference-deployment/inference_deployment_template.yaml" -DATABASE_FILEPATH = "sqlite:///edge-endpoint.db" +# Name of the database container in the edge-endpoint deployment. +# If you change this, you must also change it in the edge-endpoint deployment and vice versa. +DATABASE_CONTAINER_NAME = "sqlite-db" + +# Path to the database file mounted into the sqlite-db container. +# This must also match the path used in the PersistentVolumeClaim definition for the database. +DATABASE_FILEPATH = "/var/groundlight/sqlite/sqlite.db" diff --git a/app/main.py b/app/main.py index 0f4ec843..4429decb 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,6 @@ import logging import os - +from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI from app.api.api import api_router, ping_router @@ -18,3 +18,43 @@ app.include_router(router=ping_router) app.state.app_state = AppState() + +scheduler = AsyncIOScheduler() + + +async def update_inference_config(app_state: AppState) -> None: + """ + Update the edge inference config by checking the database for detectors for which no deployments exist. + :param app_state: Application's state manager. 
diff --git a/app/model_updater/update_models.py b/app/model_updater/update_models.py
index 367e644e..13296fb1 100644
--- a/app/model_updater/update_models.py
+++ b/app/model_updater/update_models.py
@@ -6,74 +6,93 @@
 from app.core.configs import RootEdgeConfig
 from app.core.edge_inference import EdgeInferenceManager, delete_old_model_versions
 from app.core.kubernetes_management import InferenceDeploymentManager
-from typing import List
-from app.core.database import db
+from typing import List, Dict
+import asyncio
+from app.core.database import DatabaseManager
 
 log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
 logging.basicConfig(level=log_level)
 
 TEN_MINUTES = 60 * 10
+DATABASE_CHECK_INTERVAL = 60
 
 
 def sleep_forever(message: str | None = None):
     while True:
         logging.info(message)
         time.sleep(TEN_MINUTES)
 
 
-def get_detectors_without_deployments() -> List[str]:
-    return db.get_detectors_without_deployments()
+def get_detector_ids_without_deployments(db_manager: DatabaseManager) -> List[Dict[str, str]]:
+    return asyncio.run(db_manager.get_detectors_without_deployments())
 
 
+def update_models_helper(
+    detector_id: str,
+    edge_inference_manager: EdgeInferenceManager,
+    deployment_manager: InferenceDeploymentManager,
+    db_manager: DatabaseManager,
+):
+    # Download and write new model to model repo on disk
+    new_model = edge_inference_manager.update_model(detector_id=detector_id)
+
+    deployment = deployment_manager.get_inference_deployment(detector_id=detector_id)
+    if deployment is None:
+        logging.info(f"Creating a new inference deployment for {detector_id}")
+        deployment_manager.create_inference_deployment(detector_id=detector_id)
+    elif new_model:
+        # Update inference deployment and roll out a new pod
+        logging.info(f"Updating inference deployment for {detector_id}")
+        deployment_manager.update_inference_deployment(detector_id=detector_id)
+
+        poll_start = time.time()
+        while not deployment_manager.is_inference_deployment_rollout_complete(detector_id):
+            time.sleep(5)
+            if time.time() - poll_start > TEN_MINUTES:
+                raise TimeoutError("Inference deployment was not ready within the time limit")
+
+    # Database transaction to update the deployment_created field for the detector_id.
+    # At this point we are sure that the deployment for the detector has been successfully created and rolled out.
+    asyncio.run(db_manager.update_detector_deployment_record(detector_id=detector_id))
+
+    # Now that we have successfully rolled out a new model version, we can clean up our model repository a bit.
+    # To be a bit conservative, we keep the current model version as well as the version before that. Older
+    # versions of the model for the current detector_id will be removed from disk.
+    logging.info(f"Cleaning up old model versions for {detector_id}")
+    delete_old_model_versions(detector_id, repository_root=edge_inference_manager.MODEL_REPOSITORY, num_to_keep=2)
+
+
+def update_models(
+    edge_inference_manager: EdgeInferenceManager,
+    deployment_manager: InferenceDeploymentManager,
+    db_manager: DatabaseManager,
+) -> None:
     if not os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", None) or not edge_inference_manager.inference_config:
         sleep_forever("Edge inference is disabled globally... sleeping forever.")
         return
 
-    inference_config = edge_inference_manager.inference_config
-
-    if not any([config.enabled for config in inference_config.values()]):
+    if not any(config.enabled for config in edge_inference_manager.inference_config.values()):
         sleep_forever("Edge inference is not enabled for any detectors... sleeping forever.")
-        return
 
-    # Filter to only detectors that have inference enabled
-    inference_config = {detector_id: config for detector_id, config in inference_config.items() if config.enabled}
-
     # All detectors should have the same refresh rate.
-    refresh_rates = [config.refresh_rate for config in inference_config.values()]
+    refresh_rates = [config.refresh_rate for config in edge_inference_manager.inference_config.values()]
     if len(set(refresh_rates)) != 1:
         logging.error("Detectors have different refresh rates.")
 
     refresh_rate_s = refresh_rates[0]
 
     while True:
         start = time.time()
-        for detector_id in inference_config.keys():
+        for detector_id in edge_inference_manager.inference_config.keys():
             try:
-                # Download and write new model to model repo on disk
-                new_model = edge_inference_manager.update_model(detector_id=detector_id)
-
-                deployment = deployment_manager.get_inference_deployment(detector_id=detector_id)
-                if deployment is None:
-                    logging.info(f"Creating a new inference deployment for {detector_id}")
-                    deployment_manager.create_inference_deployment(detector_id=detector_id)
-                elif new_model:
-                    # Update inference deployment and rollout a new pod
-                    logging.info(f"Updating inference deployment for {detector_id}")
-                    deployment_manager.update_inference_deployment(detector_id=detector_id)
-
-                    poll_start = time.time()
-                    while not deployment_manager.is_inference_deployment_rollout_complete(detector_id):
-                        time.sleep(5)
-                        if time.time() - poll_start > TEN_MINUTES:
-                            raise TimeoutError("Inference deployment is not ready within time limit")
-
-                # Now that we have successfully rolled out a new model version, we can clean up our model repository a bit.
-                # To be a bit conservative, we keep the current model version as well as the version before that. Older
-                # versions of the model for the current detector_id will be removed from disk.
- logging.info(f"Cleaning up old model versions for {detector_id}") - delete_old_model_versions( - detector_id, repository_root=edge_inference_manager.MODEL_REPOSITORY, num_to_keep=2 - ) + update_models_helper( + detector_id=detector_id, + edge_inference_manager=edge_inference_manager, + deployment_manager=deployment_manager, + db_manager=db_manager, + ) except Exception: logging.error(f"Failed to update model for {detector_id}", exc_info=True) @@ -81,6 +100,15 @@ def update_models(edge_inference_manager: EdgeInferenceManager, deployment_manag if elapsed_s < refresh_rate_s: time.sleep(refresh_rate_s - elapsed_s) + # Fetch detector IDs that need to be deployed from the database + undeployed_detector_ids = get_detector_ids_without_deployments(db_manager=db_manager) + if undeployed_detector_ids: + logging.info("Found detectors that need to be deployed from the database.") + logging.info(f"Record = {undeployed_detector_ids}") + for detector_record in undeployed_detector_ids: + detector_id, api_token = detector_record["detector_id"], detector_record["api_token"] + edge_inference_manager.update_inference_config(detector_id=detector_id, api_token=api_token) + if __name__ == "__main__": edge_config: RootEdgeConfig = load_edge_config() @@ -93,4 +121,10 @@ def update_models(edge_inference_manager: EdgeInferenceManager, deployment_manag edge_inference_manager = EdgeInferenceManager(config=inference_config, verbose=True) deployment_manager = InferenceDeploymentManager() - update_models(edge_inference_manager=edge_inference_manager, deployment_manager=deployment_manager) + # We will delegate creation of database tables to the edge-endpoint container. + # So here we don't run a task to create the tables if they don't exist. + db_manager = DatabaseManager() + + update_models( + edge_inference_manager=edge_inference_manager, deployment_manager=deployment_manager, db_manager=db_manager + ) diff --git a/database/Dockerfile b/database/Dockerfile new file mode 100644 index 00000000..0f9ca692 --- /dev/null +++ b/database/Dockerfile @@ -0,0 +1,29 @@ + +ARG ROOT_DIR="/data" +ARG DB_PORT=1433 +ARG DB_PATH="/var/groundlight/sqlite" + +# use a lightweight Linux distribution as the base image + +FROM alpine:latest + +# Install SQLite +RUN apk add --no-cache sqlite + +# Install python3 and pip +RUN apk add --no-cache python3 py3-pip + +RUN mkdir /var/groundlight +RUN mkdir /var/groundlight/sqlite + +# Set the working directory to ROOT_DIR +WORKDIR ${ROOT_DIR} + +# COPY start.py ${ROOT_DIR}/start.py +COPY start_db.sh ${ROOT_DIR}/start_db.sh +RUN chmod +x ${ROOT_DIR}/start_db.sh + +# Expose the SQLite port +EXPOSE ${DB_PORT} + +ENTRYPOINT ["sh", "start_db.sh"] diff --git a/database/__init__.py b/database/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/database/build_database_image.sh b/database/build_database_image.sh new file mode 100755 index 00000000..17acf3cb --- /dev/null +++ b/database/build_database_image.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +set -ex + +# Ensure that you're in the same directory as this script before running it +cd "$(dirname "$0")" + +pwd + +TAG=$(../deploy/bin/git-tag-name.sh) +SQLITE_DB_IMAGE="edge-sqlite-db" +ECR_URL="723181461334.dkr.ecr.us-west-2.amazonaws.com" + +# Authenticate docker to ECR +aws ecr get-login-password --region us-west-2 | docker login \ + --username AWS \ + --password-stdin ${ECR_URL} + +# Check if the first argument is "dev". 
diff --git a/database/Dockerfile b/database/Dockerfile
new file mode 100644
index 00000000..0f9ca692
--- /dev/null
+++ b/database/Dockerfile
@@ -0,0 +1,24 @@
+# Use a lightweight Linux distribution as the base image
+FROM alpine:latest
+
+# ARGs declared before FROM are only visible to FROM lines, so they are
+# declared here, where the rest of the Dockerfile can use them.
+ARG ROOT_DIR="/data"
+ARG DB_PATH="/var/groundlight/sqlite"
+
+# Install SQLite
+RUN apk add --no-cache sqlite
+
+# Install python3 and pip
+RUN apk add --no-cache python3 py3-pip
+
+RUN mkdir -p ${DB_PATH}
+
+# Set the working directory to ROOT_DIR
+WORKDIR ${ROOT_DIR}
+
+COPY start_db.sh ${ROOT_DIR}/start_db.sh
+RUN chmod +x ${ROOT_DIR}/start_db.sh
+
+# SQLite is file-based and reached through the shared volume, so no port is exposed.
+ENTRYPOINT ["sh", "start_db.sh"]
diff --git a/database/__init__.py b/database/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/database/build_database_image.sh b/database/build_database_image.sh
new file mode 100755
index 00000000..17acf3cb
--- /dev/null
+++ b/database/build_database_image.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+set -ex
+
+# Ensure that you're in the same directory as this script before running it
+cd "$(dirname "$0")"
+
+TAG=$(../deploy/bin/git-tag-name.sh)
+SQLITE_DB_IMAGE="edge-sqlite-db"
+ECR_URL="723181461334.dkr.ecr.us-west-2.amazonaws.com"
+
+# Authenticate docker to ECR
+aws ecr get-login-password --region us-west-2 | docker login \
+    --username AWS \
+    --password-stdin ${ECR_URL}
+
+# If the first argument is "dev", only build the image for the current platform.
+if [ "$1" == "dev" ]; then
+    docker build --tag ${SQLITE_DB_IMAGE} .
+    docker tag ${SQLITE_DB_IMAGE}:latest ${ECR_URL}/${SQLITE_DB_IMAGE}:${TAG}
+    docker push ${ECR_URL}/${SQLITE_DB_IMAGE}:${TAG}
+    exit 0
+fi
+
+# Otherwise, build a multi-platform image and push it in one step.
+docker buildx build --platform linux/amd64,linux/arm64 \
+    --tag ${ECR_URL}/${SQLITE_DB_IMAGE}:${TAG} --push .
diff --git a/database/start.py b/database/start.py
new file mode 100644
index 00000000..3d72939f
--- /dev/null
+++ b/database/start.py
@@ -0,0 +1,17 @@
+import subprocess
+import logging
+import time
+
+DATABASE_PATH = "/var/groundlight/sqlite/sqlite.db"
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    while True:
+        logging.info("Attempting to start sqlite3 database")
+        try:
+            subprocess.run(["sqlite3", DATABASE_PATH])
+        except Exception:
+            logging.error("Failed to start database", exc_info=True)
+
+        time.sleep(30)
diff --git a/database/start_db.sh b/database/start_db.sh
new file mode 100755
index 00000000..9aebb67e
--- /dev/null
+++ b/database/start_db.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+DATABASE_DIRECTORY="/var/groundlight/sqlite"
+ENTRY_QUERY="CREATE TABLE IF NOT EXISTS test_table (id INTEGER);"
+DROP_QUERY="DROP TABLE IF EXISTS test_table;"
+SLEEP_TIME=3600
+
+# If the database file already exists, leave it alone. Otherwise, create it.
+# Note: use POSIX [ ] here rather than [[ ]], because the shebang is /bin/sh.
+if [ -f "${DATABASE_DIRECTORY}/sqlite.db" ]; then
+    echo "SQLite database file exists and is mounted correctly."
+else
+    echo "SQLite database file doesn't exist or wasn't mounted correctly. Creating it now..."
+    mkdir -p "${DATABASE_DIRECTORY}"
+    chmod -R 777 "${DATABASE_DIRECTORY}"
+    echo "${ENTRY_QUERY}" | sqlite3 "${DATABASE_DIRECTORY}/sqlite.db"
+    echo "${DROP_QUERY}" | sqlite3 "${DATABASE_DIRECTORY}/sqlite.db"
+fi
+
+# Loop indefinitely to keep the container running
+while true; do
+    echo "SQLite database is running..."
+    sleep ${SLEEP_TIME}
+done
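The create/drop probe in `start_db.sh` has a direct Python equivalent that the app containers could use to confirm the shared volume is mounted and writable (a sketch; the path matches `DATABASE_FILEPATH` from `app/core/file_paths.py`):

```python
import sqlite3

# Opens (or creates) the database file on the shared volume, then performs
# the same throwaway create/drop that the shell script uses as a probe.
conn = sqlite3.connect("/var/groundlight/sqlite/sqlite.db")
conn.execute("CREATE TABLE IF NOT EXISTS mount_check (id INTEGER)")
conn.execute("DROP TABLE IF EXISTS mount_check")
conn.close()
```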
diff --git a/deploy/bin/cluster_setup.sh b/deploy/bin/cluster_setup.sh
index 8fdbc72e..12f00b42 100755
--- a/deploy/bin/cluster_setup.sh
+++ b/deploy/bin/cluster_setup.sh
@@ -10,12 +10,11 @@ fail() {
     exit 1
 }
 
-
 K="k3s kubectl"
 INFERENCE_FLAVOR=${INFERENCE_FLAVOR:-"GPU"}
 
 # Secrets
-./deploy/bin/make-gl-api-token-secret.sh
+# ./deploy/bin/make-gl-api-token-secret.sh
 ./deploy/bin/make-aws-secret.sh
 
 # Verify secrets have been properly created
@@ -23,9 +22,9 @@
 if ! $K get secret registry-credentials; then
     fail "registry-credentials secret not found"
 fi
 
-if ! $K get secret groundlight-secrets; then
-    fail "groundlight-secrets secret not found"
-fi
+# if ! $K get secret groundlight-secrets; then
+#     fail "groundlight-secrets secret not found"
+# fi
 
 # Configmaps and deployments
 $K delete configmap --ignore-not-found edge-config
diff --git a/deploy/k3s/edge_deployment/edge_deployment.yaml b/deploy/k3s/edge_deployment/edge_deployment.yaml
index 922fc206..91b0c6a7 100644
--- a/deploy/k3s/edge_deployment/edge_deployment.yaml
+++ b/deploy/k3s/edge_deployment/edge_deployment.yaml
@@ -5,6 +5,32 @@
 metadata:
   name: nvidia
 handler: nvidia
 ---
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: sqlite-pv
+spec:
+  capacity:
+    storage: 1Gi
+  accessModes:
+    - ReadWriteOnce
+  persistentVolumeReclaimPolicy: Retain
+  storageClassName: local-path
+  hostPath:
+    path: /var/groundlight/sqlite
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: sqlite-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+  storageClassName: local-path
+---
 apiVersion: v1
 kind: Service
 metadata:
@@ -40,39 +66,41 @@
     spec:
       serviceAccountName: edge-endpoint-service-account
      containers:
      - name: edge-endpoint
-        image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:c4e05f237-tyler-small-updates
+        image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:setup-sqlite-database-858d1f585-dirty-735c30c4f0bd243
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 6717
        env:
        - name: LOG_LEVEL
-          value: "DEBUG"
+          value: "INFO"
        - name: DEPLOY_DETECTOR_LEVEL_INFERENCE
          value: "True"
-        - name: GROUNDLIGHT_API_TOKEN
-          valueFrom:
-            secretKeyRef:
-              name: groundlight-secrets
-              key: api-token
+        # The API token now comes from the database rather than the
+        # groundlight-secrets secret, so the env var is no longer injected.
        volumeMounts:
        - name: edge-config-volume
          mountPath: /etc/groundlight/edge-config
+        - name: sqlite-data
+          mountPath: /var/groundlight/sqlite
      - name: inference-model-updater
-        image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:c4e05f237-tyler-small-updates
+        image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:setup-sqlite-database-858d1f585-dirty-735c30c4f0bd243
        imagePullPolicy: IfNotPresent
        command: ["/bin/bash", "-c"]
        args: ["poetry run python -m app.model_updater.update_models"]
        env:
        - name: LOG_LEVEL
-          value: "DEBUG"
+          value: "INFO"
        - name: DEPLOY_DETECTOR_LEVEL_INFERENCE
          value: "True"
-        - name: GROUNDLIGHT_API_TOKEN
-          valueFrom:
-            secretKeyRef:
-              name: groundlight-secrets
-              key: api-token
+        # The API token now comes from the database rather than the
+        # groundlight-secrets secret, so the env var is no longer injected.
        volumeMounts:
        - name: edge-config-volume
          mountPath: /etc/groundlight/edge-config
@@ -80,12 +108,18 @@
          mountPath: /etc/groundlight/inference-deployment
        - name: model-repo
          mountPath: /mnt/models
-
+        - name: sqlite-data
+          mountPath: /var/groundlight/sqlite
+
+      # The sqlite-db container runs in the same pod as the edge-endpoint and
+      # inference-model-updater containers, so all three mount the same sqlite-data
+      # volume and reach the database through a plain file path. It is the shared
+      # volume, not the pod's shared network namespace, that makes this possible.
      - name: sqlite-db
-        image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:c4e05f237-tyler-small-updates
+        image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-sqlite-db:setup-sqlite-database-858d1f585-dirty-c17ebe68e26d0a6
        imagePullPolicy: IfNotPresent
-        command: ["/bin/bash", "-c"]
-        args: ["poetry run python -c app.core.database"]
+        volumeMounts:
+        - name: sqlite-data
+          mountPath: /var/groundlight/sqlite
 
      imagePullSecrets:
      - name: registry-credentials
@@ -103,3 +137,7 @@
        # TODO: check out k3s local path provisioner: https://docs.k3s.io/storage#setting-up-the-local-storage-provider
          path: /var/groundlight/serving/model_repository
          type: DirectoryOrCreate
+
+      - name: sqlite-data
+        persistentVolumeClaim:
+          claimName: sqlite-pvc
\ No newline at end of file
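Because two containers (edge-endpoint and inference-model-updater) write to the same SQLite file through this volume, "database is locked" errors are possible under concurrent writes. A common mitigation, not part of this change and offered here only as an assumption, is enabling WAL mode on every new connection:

```python
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("sqlite+aiosqlite:////var/groundlight/sqlite/sqlite.db")


@event.listens_for(engine.sync_engine, "connect")
def _enable_wal(dbapi_connection, connection_record):
    # WAL lets one writer coexist with concurrent readers instead of
    # locking the whole database file for every write.
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.close()
```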
diff --git a/pyproject.toml b/pyproject.toml
index d911888b..f14d53d0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,6 +21,8 @@
 tritonclient = {extras = ["all"], version = "2.36.0"}
 kubernetes = "^27.2.0"
 jinja2 = "^3.1.2"
 SQLAlchemy = "2.0.22"
+aiosqlite = "0.19.0"
+APScheduler = "3.10.4"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.2.0"