From 34a8b3fb49a7abbbe7876dfba7825897b3538906 Mon Sep 17 00:00:00 2001 From: blaise-muhirwa <135643310+blaise-muhirwa@users.noreply.github.com> Date: Wed, 27 Dec 2023 12:42:07 -0800 Subject: [PATCH] Setup sqlite database (#56) * initial commit * add some database queries * integrate sqlite db * Automatically reformatting code with black and isort * remove api token requirement * add unit tests * Automatically reformatting code with black and isort * updat edge inference * Automatically reformatting code with black and isort * remove database directory * remove iqe cache * updating image * updating image * Automatically reformatting code with black and isort * fix tests * update image * Automatically reformatting code with black and isort * refactoring * refactor model updater * update image * Automatically reformatting code with black and isort * Apply suggested change Co-authored-by: Tyler Romero * add more constraints to the database tables * Automatically reformatting code with black and isort * add more unit tests * Automatically reformatting code with black and isort * add more tests * Automatically reformatting code with black and isort * temporarily comment out flaky test * Automatically reformatting code with black and isort * update image * fix failing tests * update image * Automatically reformatting code with black and isort * clean up * update image * move sqlalchemy logging to its own file * log database stuff to a file inside the container * update image * Automatically reformatting code with black and isort * address PR feedback * Automatically reformatting code with black and isort * turn async code into sync * Automatically reformatting code with black and isort * code clean up * fixing a bug in post_image_query * update image * Automatically reformatting code with black and isort * better description for the inference config * add metadata query param * upgrade sdk version * replace rotating file handler with a custom file handler * Automatically reformatting code with black and isort * changing the image for the inference model * clean up the database impl * update image * Automatically reformatting code with black and isort * refactor database query * new image * Automatically reformatting code with black and isort * fix test * update image --------- Co-authored-by: Auto-format Bot Co-authored-by: Tyler Romero --- Dockerfile | 4 + app/api/routes/image_queries.py | 46 ++-- app/core/app_state.py | 52 ++-- app/core/configs.py | 1 + app/core/database.py | 233 ++++++++++++++++++ app/core/edge_inference.py | 50 +++- app/core/file_paths.py | 8 + app/core/iqe_cache.py | 34 --- app/core/motion_detection.py | 22 +- app/core/utils.py | 18 ++ app/main.py | 45 ++++ app/model_updater/update_models.py | 173 +++++++++---- deploy/README.md | 75 ++---- deploy/bin/cluster_setup.sh | 15 +- deploy/bin/install-k3s.sh | 3 +- deploy/bin/make-gl-api-token-secret.sh | 11 - deploy/bin/setup_db.sh | 65 +++++ .../k3s/edge_deployment/edge_deployment.yaml | 63 +++-- pyproject.toml | 2 + .../database_manager/test_database_manager.py | 170 +++++++++++++ test/setup_inference_test_env.sh | 38 --- 21 files changed, 845 insertions(+), 283 deletions(-) create mode 100644 app/core/database.py delete mode 100644 app/core/iqe_cache.py delete mode 100755 deploy/bin/make-gl-api-token-secret.sh create mode 100755 deploy/bin/setup_db.sh create mode 100644 test/database_manager/test_database_manager.py delete mode 100755 test/setup_inference_test_env.sh diff --git a/Dockerfile b/Dockerfile index bf2a1619..bdff0261 
100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,6 +53,10 @@ RUN mkdir /etc/groundlight RUN mkdir /etc/groundlight/edge-config && \ mkdir /etc/groundlight/inference-deployment +# Adding this here for testing purposes. In production, this will be mounted as persistent +# volume in the kubernetes cluster. +RUN mkdir -p /opt/groundlight/edge/sqlite + # Copy configs COPY configs ${APP_ROOT}/configs diff --git a/app/api/routes/image_queries.py b/app/api/routes/image_queries.py index dd075487..b8922a03 100644 --- a/app/api/routes/image_queries.py +++ b/app/api/routes/image_queries.py @@ -1,12 +1,11 @@ import logging -from datetime import datetime from io import BytesIO from typing import Optional import numpy as np from fastapi import APIRouter, Depends, HTTPException, Query, Request from groundlight import Groundlight -from model import ClassificationResult, Detector, ImageQuery, ImageQueryTypeEnum, ResultTypeEnum +from model import Detector, ImageQuery from PIL import Image from app.core.app_state import ( @@ -16,7 +15,7 @@ get_groundlight_sdk_instance, ) from app.core.motion_detection import MotionDetectionManager -from app.core.utils import prefixed_ksuid, safe_call_api +from app.core.utils import create_iqe, prefixed_ksuid, safe_call_api logger = logging.getLogger(__name__) @@ -125,7 +124,6 @@ async def post_image_query( img_numpy = np.asarray(image) # [H, W, C=3], dtype: uint8, RGB format - iqe_cache = app_state.iqe_cache motion_detection_manager = app_state.motion_detection_manager edge_inference_manager = app_state.edge_inference_manager require_human_review = human_review == "ALWAYS" @@ -147,13 +145,14 @@ async def post_image_query( new_image_query = motion_detection_manager.get_image_query_response(detector_id=detector_id).copy( deep=True, update={"id": prefixed_ksuid(prefix="iqe_")} ) + if new_image_query.result and _is_confident_enough( confidence=new_image_query.result.confidence, detector_metadata=get_detector_metadata(detector_id=detector_id, gl=gl), confidence_threshold=confidence_threshold, ): logger.debug("Motion detection confidence is high enough to return") - iqe_cache.update_cache(image_query=new_image_query) + app_state.db_manager.create_iqe_record(record=new_image_query) return new_image_query image_query = None @@ -169,19 +168,30 @@ async def post_image_query( ): logger.info("Edge detector confidence is high enough to return") - image_query = _create_iqe( + image_query = create_iqe( detector_id=detector_id, label=results["label"], confidence=confidence, query=detector_metadata.query, ) - iqe_cache.update_cache(image_query=image_query) + app_state.db_manager.create_iqe_record(record=image_query) else: logger.info( "Ran inference locally, but detector confidence is not high enough to return. Current confidence:" f" {confidence} is less than confidence threshold: {detector_metadata.confidence_threshold}." " Escalating to the cloud API server." ) + else: + # Add a record to the inference deployments table to indicate that a k8s inference deployment has not yet been + # created for this detector. 
+ api_token = gl.api_client.configuration.api_key["ApiToken"] + app_state.db_manager.create_inference_deployment_record( + record={ + "detector_id": detector_id, + "api_token": api_token, + "deployment_created": False, + } + ) # Finally, fall back to submitting the image to the cloud if not image_query: @@ -214,32 +224,13 @@ async def get_image_query( id: str, gl: Groundlight = Depends(get_groundlight_sdk_instance), app_state: AppState = Depends(get_app_state) ): if id.startswith("iqe_"): - iqe_cache = app_state.iqe_cache - - image_query = iqe_cache.get_cached_image_query(image_query_id=id) + image_query = app_state.db_manager.get_iqe_record(image_query_id=id) if not image_query: raise HTTPException(status_code=404, detail=f"Image query with ID {id} not found") return image_query return safe_call_api(gl.get_image_query, id=id) -def _create_iqe(detector_id: str, label: str, confidence: float, query: str = "") -> ImageQuery: - iq = ImageQuery( - id=prefixed_ksuid(prefix="iqe_"), - type=ImageQueryTypeEnum.image_query, - created_at=datetime.utcnow(), - query=query, - detector_id=detector_id, - result_type=ResultTypeEnum.binary_classification, - result=ClassificationResult( - confidence=confidence, - label=label, - ), - metadata=None, - ) - return iq - - def _improve_cached_image_query_confidence( gl: Groundlight, detector_id: str, @@ -253,6 +244,7 @@ def _improve_cached_image_query_confidence( :param motion_detection_manager: Application's motion detection manager instance. This manages the motion detection state for all detectors. :param img: the image to submit. + :param metadata: Optional metadata to attach to the image query. """ detector_metadata: Detector = get_detector_metadata(detector_id=detector_id, gl=gl) diff --git a/app/core/app_state.py b/app/core/app_state.py index 2dd4165e..76d9544f 100644 --- a/app/core/app_state.py +++ b/app/core/app_state.py @@ -1,7 +1,7 @@ import logging import os from functools import lru_cache -from typing import Dict +from typing import Dict, Tuple import cachetools import yaml @@ -10,9 +10,9 @@ from model import Detector from .configs import LocalInferenceConfig, MotionDetectionConfig, RootEdgeConfig +from .database import DatabaseManager from .edge_inference import EdgeInferenceManager from .file_paths import DEFAULT_EDGE_CONFIG_PATH -from .iqe_cache import IQECache from .motion_detection import MotionDetectionManager from .utils import safe_call_api @@ -44,6 +44,29 @@ def load_edge_config() -> RootEdgeConfig: raise FileNotFoundError(f"Could not find edge config file in default location: {DEFAULT_EDGE_CONFIG_PATH}") +def get_inference_and_motion_detection_configs( + root_edge_config: RootEdgeConfig, +) -> Tuple[Dict[str, LocalInferenceConfig], Dict[str, MotionDetectionConfig]]: + motion_detection_templates: Dict[str, MotionDetectionConfig] = root_edge_config.motion_detection_templates + edge_inference_templates: Dict[str, LocalInferenceConfig] = root_edge_config.local_inference_templates + + # Filter out detectors whose ID's are empty strings + detectors = list(filter(lambda detector: detector.detector_id != "", root_edge_config.detectors)) + + motion_detection_config = None + inference_config = None + if detectors: + motion_detection_config: Dict[str, MotionDetectionConfig] = { + detector.detector_id: motion_detection_templates[detector.motion_detection_template] + for detector in detectors + } + inference_config: Dict[str, LocalInferenceConfig] = { + detector.detector_id: edge_inference_templates[detector.local_inference_template] for detector in 
detectors
+        }
+
+    return inference_config, motion_detection_config
+
+
 @lru_cache(maxsize=MAX_SDK_INSTANCES_CACHE_SIZE)
 def _get_groundlight_sdk_instance_internal(api_token: str):
     return Groundlight(api_token=api_token)
@@ -74,27 +97,14 @@ def get_detector_metadata(detector_id: str, gl: Groundlight) -> Detector:
 
 class AppState:
     def __init__(self):
-        # Create a global shared image query ID cache in the app's state
-        self.iqe_cache = IQECache()
         self.edge_config = load_edge_config()
+        inference_config, motion_detection_config = get_inference_and_motion_detection_configs(
+            root_edge_config=self.edge_config
+        )
 
-        motion_detection_templates: Dict[str, MotionDetectionConfig] = self.edge_config.motion_detection_templates
-        edge_inference_templates: Dict[str, LocalInferenceConfig] = self.edge_config.local_inference_templates
-
-        self.motion_detection_config: Dict[str, MotionDetectionConfig] = {
-            detector.detector_id: motion_detection_templates[detector.motion_detection_template]
-            for detector in self.edge_config.detectors
-        }
-        self.inference_config: Dict[str, LocalInferenceConfig] = {
-            detector.detector_id: edge_inference_templates[detector.local_inference_template]
-            for detector in self.edge_config.detectors
-        }
-
-        # Create a global shared motion detection manager object in the app's state
-        self.motion_detection_manager = MotionDetectionManager(config=self.motion_detection_config)
-
-        # Create global shared edge inference manager object in the app's state
-        self.edge_inference_manager = EdgeInferenceManager(config=self.inference_config)
+        self.motion_detection_manager = MotionDetectionManager(config=motion_detection_config)
+        self.edge_inference_manager = EdgeInferenceManager(config=inference_config)
+        self.db_manager = DatabaseManager()
 
 
 def get_app_state(request: Request) -> AppState:
diff --git a/app/core/configs.py b/app/core/configs.py
index 1d0c6e8f..c009f74c 100644
--- a/app/core/configs.py
+++ b/app/core/configs.py
@@ -32,6 +32,7 @@ class LocalInferenceConfig(BaseModel):
     """
 
     enabled: bool = Field(False, description="Determines if local edge inference is enabled for a specific detector.")
+    api_token: Optional[str] = Field(None, description="API token to fetch the inference model for this detector.")
     refresh_rate: float = Field(
         120.0,
         description=(
diff --git a/app/core/database.py b/app/core/database.py
new file mode 100644
index 00000000..653bdb06
--- /dev/null
+++ b/app/core/database.py
@@ -0,0 +1,233 @@
+import datetime
+import json
+import logging
+from logging.handlers import RotatingFileHandler
+from typing import Dict, List
+
+from model import ImageQuery
+from sqlalchemy import JSON, Boolean, Column, DateTime, String, create_engine, select
+from sqlalchemy.engine.base import Engine
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import declarative_base, sessionmaker
+
+from .file_paths import DATABASE_FILEPATH, DATABASE_ORM_LOG_FILE, DATABASE_ORM_LOG_FILE_SIZE
+
+logger = logging.getLogger(__name__)
+Base = declarative_base()
+
+
+class DatabaseManager:
+    class InferenceDeployment(Base):
+        """
+        Schema for the `inference_deployments` database table.
+        This is used by both the `edge-endpoint` and `inference-model-updater` containers.
+
+        - The `edge-endpoint` container uses this table to add new detector IDs for which
+          kubernetes deployments need to be created.
+        - The `inference-model-updater` container uses it to create inference deployments for
+          new detectors.
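+
+        For illustration, `Base.metadata.create_all` will emit SQLite DDL roughly
+        equivalent to the following (a sketch; the exact statement is generated by
+        SQLAlchemy from the column definitions below):
+
+            CREATE TABLE inference_deployments (
+                detector_id VARCHAR(44) NOT NULL,
+                api_token VARCHAR(66) NOT NULL,
+                deployment_created BOOLEAN NOT NULL,
+                deployment_name VARCHAR(100),
+                created_at DATETIME,
+                updated_at DATETIME,
+                PRIMARY KEY (detector_id)
+            );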
+ + """ + + __tablename__ = "inference_deployments" + detector_id = Column(String(44), primary_key=True, unique=True, nullable=False, comment="Detector ID") + + api_token = Column(String(66), nullable=False, comment="API token") + deployment_created = Column( + Boolean, + default=False, + nullable=False, + comment=( + "Indicates whether the given detector already has an inference deployment in the kubernetes cluster." + ), + ) + deployment_name = Column( + String(100), + nullable=True, + comment="Name of the kubernetes deployment for the inference server.", + ) + + created_at = Column( + DateTime, nullable=True, default=datetime.datetime.utcnow, comment="Timestamp of record creation" + ) + updated_at = Column( + DateTime, + nullable=True, + default=datetime.datetime.utcnow, + onupdate=datetime.datetime.utcnow, + comment="Timestamp of record update", + ) + + class ImageQueriesEdge(Base): + """ + Schema for the `image_queries_edge` database table. + This table is used by the `edge-endpoint` container to store image queries created from the + `POST /image-queries` endpoint on the edge. + + This is necessary because the core Groundlight service does not recognize these image queries. + Storing them in this table allows us to properly handle `GET /image-queries/{image_query_id}` on the edge. + + """ + + __tablename__ = "image_queries_edge" + image_query_id = Column( + String, + primary_key=True, + unique=True, + nullable=False, + index=True, + comment="Image query ID. This is expected to be prefixed with `iqe_`.", + ) + image_query = Column(JSON, nullable=False, comment="JSON representation of the ImageQuery data model.") + + def __init__(self, verbose: bool = False) -> None: + """ + Initializes the database engine which manages creating and closing connection pools. + :param verbose: If True, it will log all executed database queries. + """ + + log_level = logging.DEBUG if verbose else logging.INFO + self._setup_logging(level=log_level) + + db_url = f"sqlite:///{DATABASE_FILEPATH}" + self._engine: Engine = create_engine(db_url, echo=verbose) + + # Factory for creating new Session objects. + # A session is a mutable, stateful object that represents a single database transaction in progress. + self.session_maker = sessionmaker(bind=self._engine) + + def _setup_logging(self, level) -> None: + """ + Configures logging for SQLAlchemy. This is just so we can declutter the logs. + Logs from the database will be written to the file specified by `DATABASE_ORM_LOG_FILE`. + :param level: The logging level. + """ + # configure SQLAlchemy logging + sqlalchemy_logger = logging.getLogger("sqlalchemy.engine") + sqlalchemy_logger.setLevel(level) + + file_handler = RotatingFileHandler(DATABASE_ORM_LOG_FILE, maxBytes=DATABASE_ORM_LOG_FILE_SIZE, backupCount=1) + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + file_handler.setFormatter(formatter) + sqlalchemy_logger.addHandler(file_handler) + + # Ensure that other handlers do not propagate here + sqlalchemy_logger.propagate = False + + def create_inference_deployment_record(self, record: Dict[str, str]) -> None: + """ + Creates a new record in the `inference_deployments` table. If the record exists, but the API token has + changed, we will update the record with the new API token. + :param record: A dictionary containing a subset of the fields in the `inference_deployments` table. + + TODO: Use a pydantic model for the record. 
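+
+        Example call (hypothetical detector ID and token), mirroring what the
+        `POST /image-queries` route does for a detector it has not seen before:
+
+            db_manager.create_inference_deployment_record(
+                record={
+                    "detector_id": "det_abc123",  # hypothetical
+                    "api_token": "api_xyz789",  # hypothetical
+                    "deployment_created": False,
+                }
+            )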
+        """
+        api_token = record["api_token"]
+        try:
+            with self.session_maker() as session:
+                new_record = self.InferenceDeployment(**record)
+                session.add(new_record)
+                session.commit()
+
+        except IntegrityError as e:
+            # Check if the error specifically occurred due to the unique constraint on the detector_id column.
+            # If it did, then we can ignore the error.
+
+            if "detector_id" in str(e.orig):
+                logger.debug(f"Detector ID {record['detector_id']} already exists in the database.")
+
+                detectors = self.query_inference_deployments(detector_id=record["detector_id"])
+                if len(detectors) != 1:
+                    raise AssertionError("Expected exactly one detector to be returned.")
+
+                existing_api_token = detectors[0].api_token
+
+                if existing_api_token != api_token:
+                    logger.info(f"Updating API token for detector ID {record['detector_id']}.")
+                    self.update_inference_deployment_record(detector_id=record["detector_id"], new_record=record)
+
+            else:
+                raise e
+
+    def update_inference_deployment_record(self, detector_id: str, new_record: Dict[str, str]) -> None:
+        """
+        Update the record for the given detector.
+        :param detector_id: Detector ID
+        :param new_record: A dictionary containing the new values for the record. This is expected to be
+            a subset of the fields in the `inference_deployments` table.
+        """
+
+        if not new_record:
+            return
+
+        with self.session_maker() as session:
+            query = select(self.InferenceDeployment).filter_by(detector_id=detector_id)
+            result = session.execute(query)
+
+            detector_record = result.scalar_one_or_none()
+            if detector_record is None:
+                return
+
+            for field, value in new_record.items():
+                if hasattr(detector_record, field):
+                    setattr(detector_record, field, value)
+            session.commit()
+
+    def query_inference_deployments(self, **kwargs) -> List[InferenceDeployment]:
+        """
+        Query the database table for detectors based on a given query predicate.
+        :param kwargs: Keyword arguments (column=value pairs) forming the query predicate.
+        """
+        with self.session_maker() as session:
+            query = select(self.InferenceDeployment).filter_by(**kwargs)
+            query_results = session.execute(query)
+            query_results = query_results.fetchall()
+
+            # SQLAlchemy returns single element tuples for each query result.
+            query_results = [result[0] for result in query_results]
+            return query_results
+
+    def create_iqe_record(self, record: ImageQuery) -> None:
+        """
+        Creates a new record in the `image_queries_edge` table.
+        :param record: An image query to store.
+        """
+        with self.session_maker() as session:
+            image_query_id = record.id
+            image_query_json = json.loads(record.json())
+
+            new_record = self.ImageQueriesEdge(
+                image_query_id=image_query_id,
+                image_query=image_query_json,
+            )
+            session.add(new_record)
+            session.commit()
+
+    def get_iqe_record(self, image_query_id: str) -> ImageQuery | None:
+        """
+        Gets a record from the `image_queries_edge` table.
+        :param image_query_id: The ID of the image query.
+        """
+        with self.session_maker() as session:
+            query = select(self.ImageQueriesEdge.image_query).filter_by(image_query_id=image_query_id)
+            result = session.execute(query)
+            result_row: dict | None = result.scalar_one_or_none()
+            if result_row is None:
+                return None
+
+            return ImageQuery(**result_row)
+
+    def create_tables(self) -> None:
+        """
+        Create the database tables if they don't exist.
+        `Base.metadata.create_all` will create tables from all classes that inherit from `Base`.
+        If the tables already exist, this will do nothing.
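+
+        Typical call site, mirroring the FastAPI startup hook in app/main.py:
+
+            db_manager = DatabaseManager()
+            db_manager.create_tables()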
+ """ + with self._engine.begin() as connection: + Base.metadata.create_all(connection) + + def shutdown(self) -> None: + """ + This ensures that we release the resources. + """ + self._engine.dispose() diff --git a/app/core/edge_inference.py b/app/core/edge_inference.py index d680aa9c..ec40540a 100644 --- a/app/core/edge_inference.py +++ b/app/core/edge_inference.py @@ -24,7 +24,7 @@ class EdgeInferenceManager: INFERENCE_SERVER_URL = "inference-service:8000" MODEL_REPOSITORY = "/mnt/models" - def __init__(self, config: Dict[str, LocalInferenceConfig], verbose: bool = False) -> None: + def __init__(self, config: Dict[str, LocalInferenceConfig] | None, verbose: bool = False) -> None: """ Initializes the edge inference manager. Args: @@ -35,17 +35,34 @@ def __init__(self, config: Dict[str, LocalInferenceConfig], verbose: bool = Fals 2) the `LocalInferenceConfig` object determines if local inference is enabled for a specific detector and the model name and version to use for inference. """ - self.inference_config = config - - if self.inference_config: + self.verbose = verbose + self.inference_config, self.inference_clients = {}, {} + if config: + self.inference_config = config self.inference_clients = { detector_id: tritonclient.InferenceServerClient( - url=get_edge_inference_service_name(detector_id) + ":8000", verbose=verbose + url=get_edge_inference_service_name(detector_id) + ":8000", verbose=self.verbose ) for detector_id in self.inference_config.keys() if self.detector_configured_for_local_inference(detector_id) } + def update_inference_config(self, detector_id: str, api_token: str) -> None: + """ + Adds a new detector to the inference config at runtime. This is useful when new + detectors are added to the database and we want to create an inference deployment for them. + Args: + detector_id: ID of the detector on which to run local edge inference + api_token: API token required to fetch inference models + + """ + if detector_id not in self.inference_config.keys(): + self.inference_config[detector_id] = LocalInferenceConfig(enabled=True, api_token=api_token) + + self.inference_clients[detector_id] = tritonclient.InferenceServerClient( + url=get_edge_inference_service_name(detector_id) + ":8000", verbose=self.verbose + ) + def detector_configured_for_local_inference(self, detector_id: str) -> bool: """ Checks if the detector is configured to run local inference. @@ -54,6 +71,9 @@ def detector_configured_for_local_inference(self, detector_id: str) -> bool: Returns: True if the detector is configured to run local inference, False otherwise """ + if not self.inference_config: + return False + return detector_id in self.inference_config.keys() and self.inference_config[detector_id].enabled def inference_is_available(self, detector_id: str, model_version: str = "") -> bool: @@ -131,7 +151,14 @@ def update_model(self, detector_id: str) -> bool: Returns True if a new model was downloaded and saved, False otherwise. 
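+
+        A sketch of how the model updater consumes the return value (the detector
+        ID here is hypothetical):
+
+            if edge_inference_manager.update_model(detector_id="det_abc123"):
+                # A new model version was written to the model repository on disk,
+                # so roll the inference deployment to pick it up.
+                deployment_manager.update_inference_deployment(detector_id="det_abc123")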
""" logger.info(f"Checking if there is a new model available for {detector_id}") - model_urls = fetch_model_urls(detector_id) + + api_token = ( + self.inference_config[detector_id].api_token + if self.detector_configured_for_local_inference(detector_id) + else None + ) + + model_urls = fetch_model_urls(detector_id, api_token=api_token) cloud_binary_ksuid = model_urls.get("model_binary_id", None) if cloud_binary_ksuid is None: @@ -158,18 +185,15 @@ def update_model(self, detector_id: str) -> bool: return True -def fetch_model_urls(detector_id: str) -> dict[str, str]: - try: - groundlight_api_token = os.environ["GROUNDLIGHT_API_TOKEN"] - except KeyError as ex: - logger.error("GROUNDLIGHT_API_TOKEN environment variable is not set", exc_info=True) - raise ex +def fetch_model_urls(detector_id: str, api_token: Optional[str] = None) -> dict[str, str]: + if not api_token: + raise ValueError(f"No API token provided for {detector_id=}") logger.debug(f"Fetching model URLs for {detector_id}") url = f"https://api.groundlight.ai/edge-api/v1/fetch-model-urls/{detector_id}/" headers = { - "x-api-token": groundlight_api_token, + "x-api-token": api_token, } response = requests.get(url, headers=headers, timeout=10) logger.debug(f"fetch-model-urls response = {response}") diff --git a/app/core/file_paths.py b/app/core/file_paths.py index 64da49d4..5de538f4 100644 --- a/app/core/file_paths.py +++ b/app/core/file_paths.py @@ -1,2 +1,10 @@ DEFAULT_EDGE_CONFIG_PATH = "/etc/groundlight/edge-config/edge-config.yaml" INFERENCE_DEPLOYMENT_TEMPLATE_PATH = "/etc/groundlight/inference-deployment/inference_deployment_template.yaml" + +# Path to the database file. +# This must also match the path used in the PersistentVolumeClaim definition for the database. +DATABASE_FILEPATH = "/opt/groundlight/edge/sqlite/sqlite.db" + +# Path to the database log file. This will contain all SQL queries executed by the ORM. +DATABASE_ORM_LOG_FILE = "sqlalchemy.log" +DATABASE_ORM_LOG_FILE_SIZE = 10_000_000 # 10 MB diff --git a/app/core/iqe_cache.py b/app/core/iqe_cache.py deleted file mode 100644 index 13bbbb9a..00000000 --- a/app/core/iqe_cache.py +++ /dev/null @@ -1,34 +0,0 @@ -from cachetools import LRUCache -from model import ImageQuery - - -class IQECache: - """ - The cache allows us to control the SDK's polling behavior for image queries - without having to change the SDK itself. - - Note(on default cache size): Assuming a 16GB RAM machine and assuming further that - we want to use at most 1/10 of the available RAM for the cache, we will need - approximately 1,000,000 entries in the cache before we start evicting old entries. - - """ - - def __init__(self, cache_size=1_000_000) -> None: - # Cache for image query responses whose IDs are prefixed with "iqe_". This is needed - # because the cloud API does not currently recognize such IDs. - # The cache maintains a mapping from "iqe"'s to image query objects. - # NOTE: This cache is not thread-safe and it is global across all detectors. - self.global_cache = LRUCache(maxsize=cache_size) - - def get_cached_image_query(self, image_query_id: str) -> ImageQuery | None: - return self.global_cache.get(image_query_id, None) - - def update_cache(self, image_query: ImageQuery) -> None: - if image_query.id in self.global_cache: - return - - # Add to the global cache. 
- self.global_cache[image_query.id] = image_query - - def __str__(self) -> str: - return str(self.global_cache) diff --git a/app/core/motion_detection.py b/app/core/motion_detection.py index 32e79cc5..83aee8b8 100644 --- a/app/core/motion_detection.py +++ b/app/core/motion_detection.py @@ -51,10 +51,6 @@ def unconfident_iq_reescalation_interval_exceeded(self) -> bool: return False - def enable(self) -> None: - if not self._motion_detection_enabled: - self._motion_detection_enabled = True - def motion_detected(self, new_img: np.ndarray) -> bool: if self._previous_motion_detection_time is not None: current_time = time.monotonic() @@ -71,7 +67,7 @@ def motion_detected(self, new_img: np.ndarray) -> bool: class MotionDetectionManager: - def __init__(self, config: Dict[str, MotionDetectionConfig]) -> None: + def __init__(self, config: Dict[str, MotionDetectionConfig] | None) -> None: """ Initializes the motion detection manager. Args: @@ -79,10 +75,12 @@ def __init__(self, config: Dict[str, MotionDetectionConfig]) -> None: `MotionDetectionConfig` objects consist of different parameters needed to run motion detection. """ - self.detectors = { - detector_id: MotionDetectorWrapper(parameters=motion_detection_config) - for detector_id, motion_detection_config in config.items() - } + self.detectors = {} + if config: + self.detectors = { + detector_id: MotionDetectorWrapper(parameters=motion_detection_config) + for detector_id, motion_detection_config in config.items() + } def motion_detection_is_enabled(self, detector_id: str) -> bool: """ @@ -103,9 +101,15 @@ def motion_detection_is_available(self, detector_id: str) -> bool: ) def update_image_query_response(self, detector_id: str, response: ImageQuery) -> None: + if not self.motion_detection_is_enabled(detector_id=detector_id): + raise AssertionError( + f"Cannot update image query response because motion detection is not enabled for {detector_id=}" + ) self.detectors[detector_id].image_query_response = response def get_image_query_response(self, detector_id: str) -> Optional[ImageQuery]: + if detector_id not in self.detectors.keys(): + return None return self.detectors[detector_id].image_query_response def run_motion_detection(self, detector_id: str, new_img: np.ndarray) -> bool: diff --git a/app/core/utils.py b/app/core/utils.py index 67fd223a..ce06a82d 100644 --- a/app/core/utils.py +++ b/app/core/utils.py @@ -1,11 +1,29 @@ +from datetime import datetime from io import BytesIO from typing import Callable import ksuid from fastapi import HTTPException +from model import ClassificationResult, ImageQuery, ImageQueryTypeEnum, ResultTypeEnum from PIL import Image +def create_iqe(detector_id: str, label: str, confidence: float, query: str = "") -> ImageQuery: + iq = ImageQuery( + id=prefixed_ksuid(prefix="iqe_"), + type=ImageQueryTypeEnum.image_query, + created_at=datetime.utcnow(), + query=query, + detector_id=detector_id, + result_type=ResultTypeEnum.binary_classification, + result=ClassificationResult( + confidence=confidence, + label=label, + ), + ) + return iq + + def safe_call_api(api_method: Callable, **kwargs): """ This ensures that we correctly handle HTTP error status codes. 
In some cases,
diff --git a/app/main.py b/app/main.py
index 0f4ec843..795644d6 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,6 +1,8 @@
 import logging
 import os
 
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from fastapi import FastAPI
 
 from app.api.api import api_router, ping_router
@@ -9,6 +11,7 @@ from .core.app_state import AppState
 
 LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
+DEPLOY_DETECTOR_LEVEL_INFERENCE = bool(int(os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", 0)))
 
 logging.basicConfig(level=LOG_LEVEL)
 
@@ -18,3 +21,45 @@ app.include_router(router=ping_router)
 
 app.state.app_state = AppState()
+scheduler = AsyncIOScheduler()
+
+
+def update_inference_config(app_state: AppState) -> None:
+    """
+    Update the edge inference config by querying the database for new detectors.
+
+    :param app_state: Application's state manager.
+    """
+
+    db_manager = app_state.db_manager
+    detectors = db_manager.query_inference_deployments(deployment_created=True)
+    if detectors:
+        for detector_record in detectors:
+            detector_id, api_token = detector_record.detector_id, detector_record.api_token
+            app_state.edge_inference_manager.update_inference_config(detector_id=detector_id, api_token=api_token)
+
+
+@app.on_event("startup")
+async def startup_event():
+    # Initialize the database tables
+    db_manager = app.state.app_state.db_manager
+    db_manager.create_tables()
+
+    if DEPLOY_DETECTOR_LEVEL_INFERENCE:
+        # Add a job to periodically update the inference config
+        scheduler.add_job(update_inference_config, "interval", seconds=30, args=[app.state.app_state])
+
+        # Start the scheduler
+        scheduler.start()
+
+
+@app.on_event("shutdown")
+async def shutdown_event():
+    # Dispose of the database engine
+    app.state.app_state.db_manager.shutdown()
+
+    if DEPLOY_DETECTOR_LEVEL_INFERENCE:
+        scheduler.shutdown()
diff --git a/app/model_updater/update_models.py b/app/model_updater/update_models.py
index 8ce02d0d..668212d3 100644
--- a/app/model_updater/update_models.py
+++ b/app/model_updater/update_models.py
@@ -1,9 +1,11 @@
 import logging
 import os
 import time
 
-from app.core.app_state import load_edge_config
+from app.core.app_state import get_inference_and_motion_detection_configs, load_edge_config
 from app.core.configs import RootEdgeConfig
+from app.core.database import DatabaseManager
 from app.core.edge_inference import EdgeInferenceManager, delete_old_model_versions
 from app.core.kubernetes_management import InferenceDeploymentManager
 
@@ -19,72 +21,141 @@ def sleep_forever(message: str | None = None):
         time.sleep(TEN_MINUTES)
 
 
-def update_models(edge_inference_manager: EdgeInferenceManager, deployment_manager: InferenceDeploymentManager):
-    if not os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", None) or not edge_inference_manager.inference_config:
-        sleep_forever("Edge inference is disabled globally... sleeping forever.")
+def get_refresh_rate(root_edge_config: RootEdgeConfig) -> float:
+    """
+    Get the time interval (in seconds) between model update calls.
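+
+    The value comes from the `default` local inference template in the edge config,
+    which looks roughly like this in YAML (the refresh rate defaults to 120 seconds):
+
+        local_inference_templates:
+          default:
+            enabled: true
+            refresh_rate: 120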
+ """ + if not root_edge_config or not root_edge_config.local_inference_templates: + raise ValueError("Invalid root edge config") + + default_inference_config = root_edge_config.local_inference_templates["default"] + return default_inference_config.refresh_rate + + +def _check_new_models_and_inference_deployments( + detector_id: str, + edge_inference_manager: EdgeInferenceManager, + deployment_manager: InferenceDeploymentManager, + db_manager: DatabaseManager, +) -> None: + """ + Check that a new model is available for the detector_id. If so, update the inference deployment + to reflect the new state. This is also the entrypoint for creating a new inference deployment + and updating the database record for the detector_id (i.e., setting deployment_created to True + when we have successfully rolled out the inference deployment). + + :param detector_id: the detector_id for which we are checking for new models and inference deployments. + :param edge_inference_manager: the edge inference manager object. + :param deployment_manager: the inference deployment manager object. + :param db_manager: the database manager object. + + """ + # Download and write new model to model repo on disk + new_model = edge_inference_manager.update_model(detector_id=detector_id) + + deployment = deployment_manager.get_inference_deployment(detector_id=detector_id) + if deployment is None: + logging.info(f"Creating a new inference deployment for {detector_id}") + deployment_manager.create_inference_deployment(detector_id=detector_id) return - inference_config = edge_inference_manager.inference_config - - if not any([config.enabled for config in inference_config.values()]): - sleep_forever("Edge inference is not enabled for any detectors... sleeping forever.") + if new_model: + # Update inference deployment and rollout a new pod + logging.info(f"Updating inference deployment for {detector_id}") + deployment_manager.update_inference_deployment(detector_id=detector_id) + + poll_start = time.time() + while not deployment_manager.is_inference_deployment_rollout_complete(detector_id): + time.sleep(5) + if time.time() - poll_start > TEN_MINUTES: + raise TimeoutError("Inference deployment is not ready within time limit") + + # Now that we have successfully rolled out a new model version, we can clean up our model repository a bit. + # To be a bit conservative, we keep the current model version as well as the version before that. Older + # versions of the model for the current detector_id will be removed from disk. + logging.info(f"Cleaning up old model versions for {detector_id}") + delete_old_model_versions(detector_id, repository_root=edge_inference_manager.MODEL_REPOSITORY, num_to_keep=2) + + if deployment_manager.is_inference_deployment_rollout_complete(detector_id): + # Database transaction to update the deployment_created field for the detector_id + # At this time, we are sure that the deployment for the detector has been successfully created and rolled out. + db_manager.update_inference_deployment_record( + detector_id=detector_id, + new_record={"deployment_created": True, "deployment_name": deployment.metadata.name}, + ) + + +def update_models( + edge_inference_manager: EdgeInferenceManager, + deployment_manager: InferenceDeploymentManager, + db_manager: DatabaseManager, + refresh_rate: float, +) -> None: + """ + Periodically update inference models for detectors. 
+ + - For existing inference deployments, if a new model is available (i.e., it was fetched + successfully from the edge-api/v1/fetch-model-urls endpoint), then we will rollout a new + pod with the new model. If a new model is not available, then we will do nothing. + + - We will also look for new detectors that need to be deployed. These are expected to be + found in the database. Found detectors will be added to the queue of detectors that need + an inference deployment. + + NOTE: The periodicity of this task is controlled by the refresh_rate parameter. + It is settable in the edge config file (defaults to 2 minutes). + + :param edge_inference_manager: the edge inference manager object. + :param deployment_manager: the inference deployment manager object. + :param db_manager: the database manager object. + :param refresh_rate: the time interval (in seconds) between model update calls. + """ + deploy_detector_level_inference = bool(int(os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", 0))) + if not deploy_detector_level_inference: + sleep_forever("Edge inference is disabled globally... sleeping forever.") return - # Filter to only detectors that have inference enabled - inference_config = {detector_id: config for detector_id, config in inference_config.items() if config.enabled} - - # All enabled detectors should have the same refresh rate. - refresh_rates = [config.refresh_rate for config in inference_config.values()] - if len(set(refresh_rates)) != 1: - logging.error("Detectors have different refresh rates.") - refresh_rate_s = refresh_rates[0] - while True: start = time.time() - for detector_id in inference_config.keys(): + for detector_id in edge_inference_manager.inference_config.keys(): try: - # Download and write new model to model repo on disk - new_model = edge_inference_manager.update_model(detector_id=detector_id) - - deployment = deployment_manager.get_inference_deployment(detector_id=detector_id) - if deployment is None: - logging.info(f"Creating a new inference deployment for {detector_id}") - deployment_manager.create_inference_deployment(detector_id=detector_id) - elif new_model: - # Update inference deployment and rollout a new pod - logging.info(f"Updating inference deployment for {detector_id}") - deployment_manager.update_inference_deployment(detector_id=detector_id) - - poll_start = time.time() - while not deployment_manager.is_inference_deployment_rollout_complete(detector_id): - time.sleep(5) - if time.time() - poll_start > TEN_MINUTES: - raise TimeoutError("Inference deployment is not ready within time limit") - - # Now that we have successfully rolled out a new model version, we can clean up our model repository a bit. - # To be a bit conservative, we keep the current model version as well as the version before that. Older - # versions of the model for the current detector_id will be removed from disk. 
-                logging.info(f"Cleaning up old model versions for {detector_id}")
-                delete_old_model_versions(
-                    detector_id, repository_root=edge_inference_manager.MODEL_REPOSITORY, num_to_keep=2
-                )
+                _check_new_models_and_inference_deployments(
+                    detector_id=detector_id,
+                    edge_inference_manager=edge_inference_manager,
+                    deployment_manager=deployment_manager,
+                    db_manager=db_manager,
+                )
             except Exception:
                 logging.error(f"Failed to update model for {detector_id}", exc_info=True)
 
         elapsed_s = time.time() - start
-        if elapsed_s < refresh_rate_s:
-            time.sleep(refresh_rate_s - elapsed_s)
+        if elapsed_s < refresh_rate:
+            time.sleep(refresh_rate - elapsed_s)
+
+        # Fetch deployment records for detectors that still need an inference deployment and add them to the config
+        undeployed_detectors = db_manager.query_inference_deployments(deployment_created=False)
+        if undeployed_detectors:
+            for detector_record in undeployed_detectors:
+                edge_inference_manager.update_inference_config(
+                    detector_id=detector_record.detector_id, api_token=detector_record.api_token
+                )
 
 
 if __name__ == "__main__":
     edge_config: RootEdgeConfig = load_edge_config()
-    edge_inference_templates = edge_config.local_inference_templates
-    inference_config = {
-        detector.detector_id: edge_inference_templates[detector.local_inference_template]
-        for detector in edge_config.detectors
-    }
+    refresh_rate = get_refresh_rate(root_edge_config=edge_config)
+    inference_config, _ = get_inference_and_motion_detection_configs(root_edge_config=edge_config)
 
     edge_inference_manager = EdgeInferenceManager(config=inference_config, verbose=True)
     deployment_manager = InferenceDeploymentManager()
 
-    update_models(edge_inference_manager=edge_inference_manager, deployment_manager=deployment_manager)
+    # We will delegate creation of database tables to the edge-endpoint container.
+    # So here we don't run a task to create the tables if they don't already exist.
+    db_manager = DatabaseManager()
+
+    update_models(
+        edge_inference_manager=edge_inference_manager,
+        deployment_manager=deployment_manager,
+        db_manager=db_manager,
+        refresh_rate=refresh_rate,
+    )
diff --git a/deploy/README.md b/deploy/README.md
index 81676062..a7711602 100644
--- a/deploy/README.md
+++ b/deploy/README.md
@@ -7,61 +7,35 @@ If you don't have [k3s](https://docs.k3s.io/) installed, go ahead and install it
 ```
 > ./deploy/bin/install_k3s.sh
 ```
 
-Before we can start the edge-endpoint kubernetes deployment, we need to first ensure that
-the `GROUNDLIGHT_API_TOKEN` is set as well as have the AWS secret for accessing the container
-registry. Once you have the token, go ahead and run the following
-script to create a kubernetes secret for it.
-```shell
-> ./deploy/bin/make-gl-api-token-secret.sh
-```
-
-Then run the following script to register credentials for accessing ECR in k3s.
+If you intend to run motion detection, make sure to add the detector IDs to the [edge config file](/configs/edge-config.yaml).
+If you only intend to run edge inference, you don't need to configure any detectors. By default, edge inference will be set up for
+each detector ID for which the Groundlight service receives requests.
 
-```shell
-> ./deploy/bin/make-aws-secret.sh
+To start the cluster, run
+```shell
+> ./deploy/bin/cluster_setup.sh
 ```
 
-The edge endpoint application requires a [YAML config file](/configs/edge-config.yaml) (currently for setting up necessary parameters
-for motion detection and local edge inference).
In our k3s cluster, we mount this into the edge-endpoint deployment as a configmap, so we need to first
-create this configmap. In order to do so, run
-
-The edge endpoint application requires a [YAML config file](/configs/edge-config.yaml) (currently for setting up necessary parameters
-for motion detection and local edge inference). In addition, it also requires a [templated inference model deployment](/deploy/k3s/inference_deployment.yaml). In the k3s cluster, we mount these files as configmaps by running
+Sometimes it might be desirable to reset all database tables (i.e., delete all existing data) for a fresh start. In that case,
+you will need to start the cluster with an extra argument:
 
 ```shell
-> k3s kubectl create configmap edge-config --from-file=configs/edge-config.yaml
-> k3s kubectl create configmap inference-deployment-template --from-file=deploy/k3s/inference_deployment.yaml
+> ./deploy/bin/cluster_setup.sh db_reset
 ```
 
-NOTE: These configmaps will not be automatically synced with the edge logic deployment. Thus, every time one needs to
-edit them, they shoud first delete the existing ones and then run the above commands and other
-necessary commands as needed.
+This will create the edge-endpoint deployment with two containers: one for the edge logic and another one for creating/updating inference
+deployments. After a while you should be able to see something like this if you run `kubectl get pods`:
 
 ```shell
-> k3s kubectl delete configmap edge-config
-> k3s kubectl delete configmap inference-deployment-template
-```
-
-For now we have a hard-coded docker image from ECR in the [edge-endpoint](/edge-endpoint/deploy/k3s/edge_deployment.yaml)
-deployment. If you want to make modifications to the code inside the endpoint and push a different
-image to ECR see [Pushing/Pulling Images from ECR](#pushingpulling-images-from-elastic-container-registry-ecr).
-
-To start the kubernetes deployment, run
-```shell
-> kubectl apply -f deploy/k3s/edge_deployment.yaml
+NAME                             READY   STATUS    RESTARTS   AGE
+edge-endpoint-594d645588-5mf28   2/2     Running   0          4s
 ```
 
-This will create two k3s resources: our edge-endpoint service of type NodePort and our edge-endpoint
-deployment. For simplicity, the service, deployment and pod names (even container names) are all
-edge-endpoint. Hopefully this will not be a source of confusion.
-After a while you should be able to see something like this if you run `kubectl get pods`:
-
-```shell
-NAME                             READY   STATUS    RESTARTS   AGE
-edge-endpoint-594d645588-5mf28   1/1     Running   0          4s
-```
+We currently have a hard-coded docker image from ECR in the [edge-endpoint](/edge-endpoint/deploy/k3s/edge_deployment.yaml)
+deployment. If you want to make modifications to the edge endpoint code and push a different
+image to ECR, see [Pushing/Pulling Images from ECR](#pushingpulling-images-from-elastic-container-registry-ecr).
 
 ## Pushing/Pulling Images from Elastic Container Registry (ECR)
 
@@ -74,21 +48,8 @@ then using that ID in the [edge_deployment](/edge-endpoint/deploy/k3s/edge_deployment.yaml) file.
 
 Follow the following steps:
 
 ```shell
-# Creating our edge-endpoint docker image. Make sure you are in the root directory
-> docker build --tag edge-endpoint .
- -# Check that the image was created successfully -> docker images - -# Push the image to ECR -> ./deploy/bin/push-edge-endpoint-image.sh - +# Build and push image to ECR +> ./deploy/bin/build-push-edge-endpoint-image.sh ``` -Once you've pushed the image to the remote registry, you can retrieve the image ID and add temporarily -use it in the deployment file. To apply the changes to the deployment, run - -```shell -kubectl scale deployment edge-endpoint --replicas=1 --namespace=default -``` diff --git a/deploy/bin/cluster_setup.sh b/deploy/bin/cluster_setup.sh index 8fdbc72e..22d045dd 100755 --- a/deploy/bin/cluster_setup.sh +++ b/deploy/bin/cluster_setup.sh @@ -10,12 +10,20 @@ fail() { exit 1 } - K="k3s kubectl" INFERENCE_FLAVOR=${INFERENCE_FLAVOR:-"GPU"} +DB_RESTART=$1 + +# Ensure database file has been correctly setup. If the first argument is "db_reset", +# all the data in the database will be deleted first. +# For now, this means all +# - detectors in the `inference_deployments` table +# - image queries in the `image_queries_edge` table +# For more on these tables you can examine the database file at +# /opt/groundlight/edge/sqlite/sqlite.db +./deploy/bin/setup_db.sh $DB_RESTART # Secrets -./deploy/bin/make-gl-api-token-secret.sh ./deploy/bin/make-aws-secret.sh # Verify secrets have been properly created @@ -23,9 +31,6 @@ if ! $K get secret registry-credentials; then fail "registry-credentials secret not found" fi -if ! $K get secret groundlight-secrets; then - fail "groundlight-secrets secret not found" -fi # Configmaps and deployments $K delete configmap --ignore-not-found edge-config diff --git a/deploy/bin/install-k3s.sh b/deploy/bin/install-k3s.sh index 79a5b311..ebf71868 100755 --- a/deploy/bin/install-k3s.sh +++ b/deploy/bin/install-k3s.sh @@ -7,9 +7,10 @@ K="k3s kubectl" # Update system sudo apt update && sudo apt upgrade -y + # Install k3s echo "Installing k3s..." -curl -sfL https://get.k3s.io | K3S_KUBECONFIG_MODE="644" sh - +curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=v1.28.2+k3s1 K3S_KUBECONFIG_MODE="644" sh - check_k3s_is_running() { local TIMEOUT=30 # Maximum wait time of 30 seconds diff --git a/deploy/bin/make-gl-api-token-secret.sh b/deploy/bin/make-gl-api-token-secret.sh deleted file mode 100755 index cd6cc1dd..00000000 --- a/deploy/bin/make-gl-api-token-secret.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash - -K="k3s kubectl" - -$K delete --ignore-not-found secret groundlight-secrets - -# Create a kubernetes secret for the groundlight api token -# Make sure that you have the groundlight api token set in your environment - -$K create secret generic groundlight-secrets \ - --from-literal=api-token=${GROUNDLIGHT_API_TOKEN} \ No newline at end of file diff --git a/deploy/bin/setup_db.sh b/deploy/bin/setup_db.sh new file mode 100755 index 00000000..d9102869 --- /dev/null +++ b/deploy/bin/setup_db.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +set -ex + +cd "$(dirname "$0")" + + +DATABASE_DIRECTORY="/opt/groundlight/edge/sqlite" +DATABASE_PATH="${DATABASE_DIRECTORY}/sqlite.db" +ENTRY_QUERY="CREATE TABLE IF NOT EXISTS test_table (id INTEGER);" +DROP_QUERY="DROP TABLE IF EXISTS test_table;" + + +reset_tables() { + TABLES=( "inference_deployments" "image_queries_edge" ) + + for TABLE_NAME in "${TABLES[@]}"; do + if [[ $(sqlite3 "${DATABASE_PATH}" "SELECT name FROM sqlite_master WHERE type='table' AND name='${TABLE_NAME}';") == "${TABLE_NAME}" ]]; then + echo "${TABLE_NAME} table exists. Deleting records..." 
+            sqlite3 "${DATABASE_PATH}" "DELETE FROM ${TABLE_NAME};"
+        else
+            echo "${TABLE_NAME} table doesn't exist."
+        fi
+    done
+}
+
+
+# Check if we have the sqlite3 CLI installed. If not, install it
+if [ ! -x "/usr/bin/sqlite3" ]; then
+    echo "sqlite3 could not be found. Installing it now..."
+    sudo apt-get update
+    sudo apt-get install -y sqlite3
+
+else
+    echo "sqlite3 is already installed."
+    which sqlite3
+fi
+
+
+# If the database already exists, exit. Otherwise, create it
+if [[ -f "${DATABASE_PATH}" ]]; then
+    echo "SQLite database file exists and is mounted correctly."
+else
+    echo "SQLite database file doesn't exist or wasn't mounted correctly. Creating it now..."
+    sudo mkdir -p ${DATABASE_DIRECTORY}
+    sudo chown -R "$(id -u)":"$(id -g)" "${DATABASE_DIRECTORY}"
+
+    # SQLite is eccentric in the sense that if you just invoke `sqlite3 <database-path>`, it won't
+    # actually create the file. We are using a hack here to initialize the database with
+    # a test table and then drop it.
+    echo "${ENTRY_QUERY}" | sqlite3 "${DATABASE_PATH}"
+    echo "${DROP_QUERY}" | sqlite3 "${DATABASE_PATH}"
+
+    # Set journal mode to Write-Ahead Logging. This makes it much faster, at the risk of
+    # possibly losing data if the machine crashes suddenly.
+    # https://www.sqlite.org/wal.html
+    echo "PRAGMA journal_mode=WAL;" | sqlite3 "${DATABASE_PATH}"
+fi
+
+
+# Reset tables if the first argument is "db_reset"
+if [ "$1" == "db_reset" ]; then
+    echo "Resetting database tables..."
+    reset_tables
+fi
\ No newline at end of file
diff --git a/deploy/k3s/edge_deployment/edge_deployment.yaml b/deploy/k3s/edge_deployment/edge_deployment.yaml
index 0f4d53e9..3c40a2d6 100644
--- a/deploy/k3s/edge_deployment/edge_deployment.yaml
+++ b/deploy/k3s/edge_deployment/edge_deployment.yaml
@@ -5,6 +5,32 @@ metadata:
   name: nvidia
 handler: nvidia
 ---
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: sqlite-pv
+spec:
+  capacity:
+    storage: 1Gi
+  accessModes:
+    - ReadWriteOnce
+  persistentVolumeReclaimPolicy: Retain
+  storageClassName: local-path
+  hostPath:
+    path: /opt/groundlight/edge/sqlite
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: sqlite-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+  storageClassName: local-path
+---
 apiVersion: v1
 kind: Service
 metadata:
@@ -40,39 +66,38 @@ spec:
       serviceAccountName: edge-endpoint-service-account
       containers:
         - name: edge-endpoint
-          image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:7280f93c7-triton-memory-mgmt-issue
+          image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:9a0d8be12-setup-sqlite-database
           imagePullPolicy: IfNotPresent
           ports:
             - containerPort: 6717
           env:
             - name: LOG_LEVEL
-              value: "DEBUG"
+              value: "INFO"
+            # We need this feature flag since we run the edge logic server in two separate environments:
+            # 1. In docker (on the GitHub Actions runner) for testing
+            # 2. In kubernetes (currently a dedicated EC2 instance)
+            # This feature flag tells the server whether to use the Python Kubernetes API
+            # (e.g., for creating inference deployments). We don't want to use the Kubernetes API
+            # if we are only running the edge logic server in docker.
+            # TODO: Once we have kubernetes-based tests, we can remove this feature flag.
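+            # For reference, app/main.py parses this flag as
+            #     bool(int(os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", 0)))
+            # so the value must be the string "0" or "1" (int("True") would raise),
+            # which is why it is set to "1" below.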
            - name: DEPLOY_DETECTOR_LEVEL_INFERENCE
-              value: "True"
-            - name: GROUNDLIGHT_API_TOKEN
-              valueFrom:
-                secretKeyRef:
-                  name: groundlight-secrets
-                  key: api-token
+              value: "1"
           volumeMounts:
             - name: edge-config-volume
               mountPath: /etc/groundlight/edge-config
+            - name: sqlite-data
+              mountPath: /opt/groundlight/edge/sqlite
         - name: inference-model-updater
-          image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:7280f93c7-triton-memory-mgmt-issue
+          image: 723181461334.dkr.ecr.us-west-2.amazonaws.com/edge-endpoint:9a0d8be12-setup-sqlite-database
           imagePullPolicy: IfNotPresent
           command: ["/bin/bash", "-c"]
           args: ["poetry run python -m app.model_updater.update_models"]
           env:
             - name: LOG_LEVEL
-              value: "DEBUG"
+              value: "INFO"
             - name: DEPLOY_DETECTOR_LEVEL_INFERENCE
-              value: "True"
-            - name: GROUNDLIGHT_API_TOKEN
-              valueFrom:
-                secretKeyRef:
-                  name: groundlight-secrets
-                  key: api-token
+              value: "1"
           volumeMounts:
             - name: edge-config-volume
               mountPath: /etc/groundlight/edge-config
@@ -80,6 +105,8 @@ spec:
               mountPath: /etc/groundlight/inference-deployment
             - name: model-repo
               mountPath: /mnt/models
+            - name: sqlite-data
+              mountPath: /opt/groundlight/edge/sqlite
       imagePullSecrets:
         - name: registry-credentials
@@ -97,3 +124,7 @@ spec:
           # TODO: check out k3s local path provisioner: https://docs.k3s.io/storage#setting-up-the-local-storage-provider
           path: /var/groundlight/serving/model_repository
           type: DirectoryOrCreate
+
+        - name: sqlite-data
+          persistentVolumeClaim:
+            claimName: sqlite-pvc
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 02a197e4..012649d4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,6 +19,8 @@ cachetools = "^5.3.1"
 tritonclient = {extras = ["all"], version = "2.36.0"}
 kubernetes = "^27.2.0"
 jinja2 = "^3.1.2"
+SQLAlchemy = "2.0.22"
+APScheduler = "3.10.4"
 groundlight = "^0.13.1"
 
 [tool.poetry.group.dev.dependencies]
diff --git a/test/database_manager/test_database_manager.py b/test/database_manager/test_database_manager.py
new file mode 100644
index 00000000..49481ac5
--- /dev/null
+++ b/test/database_manager/test_database_manager.py
@@ -0,0 +1,170 @@
+import pytest
+from model import ImageQuery
+from sqlalchemy import create_engine, text
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm import sessionmaker
+
+from app.core.database import DatabaseManager
+from app.core.utils import create_iqe, prefixed_ksuid
+
+NUM_TESTING_RECORDS = 100
+
+
+@pytest.fixture(scope="module")
+def db_manager():
+    """
+    Create a database manager for the entire test module.
+    """
+    db_manager = DatabaseManager(verbose=False)
+
+    # Create an in-memory database.
+    # sqlite:///:memory: means that the database will be created in memory, so it's ephemeral.
+    engine = create_engine("sqlite:///:memory:", echo=False)
+    db_manager._engine = engine
+    db_manager.session_maker = sessionmaker(bind=db_manager._engine)
+    db_manager.create_tables()
+
+    yield db_manager
+
+    # Tear down
+    db_manager.shutdown()
+
+
+@pytest.fixture(scope="function")
+def database_reset(db_manager: DatabaseManager):
+    """
+    Reset the database before every test function and yield control to the test function.
+    """
+    with db_manager.session_maker() as session:
+        session.execute(text("DELETE FROM inference_deployments"))
+        session.execute(text("DELETE FROM image_queries_edge"))
+        session.commit()
+    yield
+
+
+def test_create_inference_deployment_record(db_manager: DatabaseManager, database_reset):
+    """
+    Test creating a new detector deployment record.
+ """ + + records = [ + { + "detector_id": prefixed_ksuid("det_"), + "api_token": prefixed_ksuid("api_"), + "deployment_created": False, + } + for _ in range(NUM_TESTING_RECORDS) + ] + + for record in records: + db_manager.create_inference_deployment_record(record=record) + with db_manager.session_maker() as session: + query_text = f"SELECT * FROM inference_deployments WHERE detector_id = '{record['detector_id']}'" + query = session.execute(text(query_text)) + result = query.first() + assert result.detector_id == record["detector_id"] + assert result.api_token == record["api_token"] + assert result.deployment_created == record["deployment_created"] is False + + +def test_get_detectors_without_deployments(db_manager, database_reset): + """ + Check that when we retrieve detector deployment records we get what we expect. + """ + records = [ + { + "detector_id": prefixed_ksuid("det_"), + "api_token": prefixed_ksuid("api_"), + "deployment_created": False, + } + for _ in range(NUM_TESTING_RECORDS) + ] + + for record in records: + db_manager.create_inference_deployment_record(record=record) + + undeployed_detectors = db_manager.query_inference_deployments(deployment_created=False) + assert len(undeployed_detectors) == NUM_TESTING_RECORDS + for record in undeployed_detectors: + assert record.detector_id in [r["detector_id"] for r in records] + assert record.api_token in [r["api_token"] for r in records] + + +def test_get_iqe_record(db_manager, database_reset): + image_query: ImageQuery = create_iqe( + detector_id=prefixed_ksuid("det_"), label="test_label", confidence=0.5, query="test_query" + ) + db_manager.create_iqe_record(record=image_query) + + # Get the record + retrieved_record = db_manager.get_iqe_record(image_query_id=image_query.id) + assert retrieved_record == image_query + + +def test_update_inference_deployment_record(db_manager, database_reset): + """ + Create a few testing records, update the deployment_created field, and check that the update was successful. 
+ """ + records = [ + { + "detector_id": prefixed_ksuid("det_"), + "api_token": prefixed_ksuid("api_"), + "deployment_created": False, + } + for _ in range(NUM_TESTING_RECORDS) + ] + + for record in records: + db_manager.create_inference_deployment_record(record=record) + db_manager.update_inference_deployment_record( + detector_id=record["detector_id"], new_record={"deployment_created": True} + ) + + with db_manager.session_maker() as session: + query_text = f"SELECT * FROM inference_deployments WHERE detector_id = '{record['detector_id']}'" + query = session.execute(text(query_text)) + result = query.first() + assert result.detector_id == record["detector_id"] + assert result.api_token == record["api_token"] + assert bool(result.deployment_created) is True + + +def test_update_api_token_for_detector(db_manager, database_reset): + record = { + "detector_id": prefixed_ksuid("det_"), + "api_token": prefixed_ksuid("api_"), + "deployment_created": False, + } + db_manager.create_inference_deployment_record(record=record) + detectors = db_manager.query_inference_deployments(detector_id=record["detector_id"]) + assert len(detectors) == 1 + assert detectors[0].api_token == record["api_token"] + assert bool(detectors[0].deployment_created) is False + + # Now change the API token + new_api_token = prefixed_ksuid("api_") + db_manager.update_inference_deployment_record( + detector_id=record["detector_id"], new_record={"api_token": new_api_token} + ) + + # Check that the API token has been updated + detectors = db_manager.query_inference_deployments(detector_id=record["detector_id"]) + assert len(detectors) == 1 + assert detectors[0].api_token == new_api_token + assert bool(detectors[0].deployment_created) is False + + +def test_query_inference_deployments_raises_sqlalchemy_error(db_manager: DatabaseManager, database_reset): + detector_record = { + "detector_id": prefixed_ksuid("det_"), + "api_token": prefixed_ksuid("api_"), + "deployment_created": False, + } + db_manager.create_inference_deployment_record(record=detector_record) + + # We will query with invalid parameters and make sure that we get an error + # Here `image_query_id` is not a valid field in the `inference_deployments` table, so + # we should get an error. + with pytest.raises(SQLAlchemyError): + db_manager.query_inference_deployments(detector_id=detector_record["detector_id"], image_query_id="invalid_id") diff --git a/test/setup_inference_test_env.sh b/test/setup_inference_test_env.sh deleted file mode 100755 index 9cc443a1..00000000 --- a/test/setup_inference_test_env.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/bash - -EDGE_CONFIG=$(cat <<- EOM -motion_detection_templates: - default: - enabled: true - val_threshold: 50 - percentage_threshold: 5.0 - max_time_between_images: 45 - - super-sensitive: - enabled: true - val_threshold: 5 - percentage_threshold: 0.05 - max_time_between_images: 45 - - disabled: - enabled: false - -local_inference_templates: - default: - enabled: true - refresh_rate: 120 - disabled: - enabled: false - -detectors: - - detector_id: 'det_2UOxalD1gegjk4TnyLbtGggiJ8p' - motion_detection_template: 'disabled' - local_inference_template: 'default' - - - detector_id: 'det_2UOxao4HZyB9gv4ZVtwMOvdqgh9' - motion_detection_template: 'disabled' - local_inference_template: 'default' -EOM -) - -export EDGE_CONFIG \ No newline at end of file