Skip to content

Commit

Permalink
Setup sqlite database (#56)
Browse files Browse the repository at this point in the history
* initial commit

* add some database queries

* integrate sqlite db

* Automatically reformatting code with black and isort

* remove api token requirement

* add unit tests

* Automatically reformatting code with black and isort

* updat edge inference

* Automatically reformatting code with black and isort

* remove database directory

* remove iqe cache

* updating image

* updating image

* Automatically reformatting code with black and isort

* fix tests

* update image

* Automatically reformatting code with black and isort

* refactoring

* refactor model updater

* update image

* Automatically reformatting code with black and isort

* Apply suggested change

Co-authored-by: Tyler Romero <[email protected]>

* add more constraints to the database tables

* Automatically reformatting code with black and isort

* add more unit tests

* Automatically reformatting code with black and isort

* add more tests

* Automatically reformatting code with black and isort

* temporarily comment out flaky test

* Automatically reformatting code with black and isort

* update image

* fix failing tests

* update image

* Automatically reformatting code with black and isort

* clean up

* update image

* move sqlalchemy logging to its own file

* log database stuff to a file inside the container

* update image

* Automatically reformatting code with black and isort

* address PR feedback

* Automatically reformatting code with black and isort

* turn async code into sync

* Automatically reformatting code with black and isort

* code clean up

* fixing a bug in post_image_query

* update image

* Automatically reformatting code with black and isort

* better description for the inference config

* add metadata query param

* upgrade sdk version

* replace rotating file handler with a custom file handler

* Automatically reformatting code with black and isort

* changing the image for the inference model

* clean up the database impl

* update image

* Automatically reformatting code with black and isort

* refactor database query

* new image

* Automatically reformatting code with black and isort

* fix test

* update image

---------

Co-authored-by: Auto-format Bot <[email protected]>
Co-authored-by: Tyler Romero <[email protected]>
  • Loading branch information
3 people authored Dec 27, 2023
1 parent caae066 commit 34a8b3f
Show file tree
Hide file tree
Showing 21 changed files with 845 additions and 283 deletions.
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ RUN mkdir /etc/groundlight
RUN mkdir /etc/groundlight/edge-config && \
mkdir /etc/groundlight/inference-deployment

# Adding this here for testing purposes. In production, this will be mounted as persistent
# volume in the kubernetes cluster.
RUN mkdir -p /opt/groundlight/edge/sqlite

# Copy configs
COPY configs ${APP_ROOT}/configs

Expand Down
46 changes: 19 additions & 27 deletions app/api/routes/image_queries.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import logging
from datetime import datetime
from io import BytesIO
from typing import Optional

import numpy as np
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from groundlight import Groundlight
from model import ClassificationResult, Detector, ImageQuery, ImageQueryTypeEnum, ResultTypeEnum
from model import Detector, ImageQuery
from PIL import Image

from app.core.app_state import (
Expand All @@ -16,7 +15,7 @@
get_groundlight_sdk_instance,
)
from app.core.motion_detection import MotionDetectionManager
from app.core.utils import prefixed_ksuid, safe_call_api
from app.core.utils import create_iqe, prefixed_ksuid, safe_call_api

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -125,7 +124,6 @@ async def post_image_query(

img_numpy = np.asarray(image) # [H, W, C=3], dtype: uint8, RGB format

iqe_cache = app_state.iqe_cache
motion_detection_manager = app_state.motion_detection_manager
edge_inference_manager = app_state.edge_inference_manager
require_human_review = human_review == "ALWAYS"
Expand All @@ -147,13 +145,14 @@ async def post_image_query(
new_image_query = motion_detection_manager.get_image_query_response(detector_id=detector_id).copy(
deep=True, update={"id": prefixed_ksuid(prefix="iqe_")}
)

if new_image_query.result and _is_confident_enough(
confidence=new_image_query.result.confidence,
detector_metadata=get_detector_metadata(detector_id=detector_id, gl=gl),
confidence_threshold=confidence_threshold,
):
logger.debug("Motion detection confidence is high enough to return")
iqe_cache.update_cache(image_query=new_image_query)
app_state.db_manager.create_iqe_record(record=new_image_query)
return new_image_query

image_query = None
Expand All @@ -169,19 +168,30 @@ async def post_image_query(
):
logger.info("Edge detector confidence is high enough to return")

image_query = _create_iqe(
image_query = create_iqe(
detector_id=detector_id,
label=results["label"],
confidence=confidence,
query=detector_metadata.query,
)
iqe_cache.update_cache(image_query=image_query)
app_state.db_manager.create_iqe_record(record=image_query)
else:
logger.info(
"Ran inference locally, but detector confidence is not high enough to return. Current confidence:"
f" {confidence} is less than confidence threshold: {detector_metadata.confidence_threshold}."
" Escalating to the cloud API server."
)
else:
# Add a record to the inference deployments table to indicate that a k8s inference deployment has not yet been
# created for this detector.
api_token = gl.api_client.configuration.api_key["ApiToken"]
app_state.db_manager.create_inference_deployment_record(
record={
"detector_id": detector_id,
"api_token": api_token,
"deployment_created": False,
}
)

# Finally, fall back to submitting the image to the cloud
if not image_query:
Expand Down Expand Up @@ -214,32 +224,13 @@ async def get_image_query(
id: str, gl: Groundlight = Depends(get_groundlight_sdk_instance), app_state: AppState = Depends(get_app_state)
):
if id.startswith("iqe_"):
iqe_cache = app_state.iqe_cache

image_query = iqe_cache.get_cached_image_query(image_query_id=id)
image_query = app_state.db_manager.get_iqe_record(image_query_id=id)
if not image_query:
raise HTTPException(status_code=404, detail=f"Image query with ID {id} not found")
return image_query
return safe_call_api(gl.get_image_query, id=id)


def _create_iqe(detector_id: str, label: str, confidence: float, query: str = "") -> ImageQuery:
iq = ImageQuery(
id=prefixed_ksuid(prefix="iqe_"),
type=ImageQueryTypeEnum.image_query,
created_at=datetime.utcnow(),
query=query,
detector_id=detector_id,
result_type=ResultTypeEnum.binary_classification,
result=ClassificationResult(
confidence=confidence,
label=label,
),
metadata=None,
)
return iq


def _improve_cached_image_query_confidence(
gl: Groundlight,
detector_id: str,
Expand All @@ -253,6 +244,7 @@ def _improve_cached_image_query_confidence(
:param motion_detection_manager: Application's motion detection manager instance.
This manages the motion detection state for all detectors.
:param img: the image to submit.
:param metadata: Optional metadata to attach to the image query.
"""

detector_metadata: Detector = get_detector_metadata(detector_id=detector_id, gl=gl)
Expand Down
52 changes: 31 additions & 21 deletions app/core/app_state.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import os
from functools import lru_cache
from typing import Dict
from typing import Dict, Tuple

import cachetools
import yaml
Expand All @@ -10,9 +10,9 @@
from model import Detector

from .configs import LocalInferenceConfig, MotionDetectionConfig, RootEdgeConfig
from .database import DatabaseManager
from .edge_inference import EdgeInferenceManager
from .file_paths import DEFAULT_EDGE_CONFIG_PATH
from .iqe_cache import IQECache
from .motion_detection import MotionDetectionManager
from .utils import safe_call_api

Expand Down Expand Up @@ -44,6 +44,29 @@ def load_edge_config() -> RootEdgeConfig:
raise FileNotFoundError(f"Could not find edge config file in default location: {DEFAULT_EDGE_CONFIG_PATH}")


def get_inference_and_motion_detection_configs(
root_edge_config: RootEdgeConfig,
) -> Tuple[Dict[str, LocalInferenceConfig], Dict[str, MotionDetectionConfig]]:
motion_detection_templates: Dict[str, MotionDetectionConfig] = root_edge_config.motion_detection_templates
edge_inference_templates: Dict[str, LocalInferenceConfig] = root_edge_config.local_inference_templates

# Filter out detectors whose ID's are empty strings
detectors = list(filter(lambda detector: detector.detector_id != "", root_edge_config.detectors))

motion_detection_config = None
inference_config = None
if detectors:
motion_detection_config: Dict[str, MotionDetectionConfig] = {
detector.detector_id: motion_detection_templates[detector.motion_detection_template]
for detector in detectors
}
inference_config: Dict[str, LocalInferenceConfig] = {
detector.detector_id: edge_inference_templates[detector.local_inference_template] for detector in detectors
}

return inference_config, motion_detection_config


@lru_cache(maxsize=MAX_SDK_INSTANCES_CACHE_SIZE)
def _get_groundlight_sdk_instance_internal(api_token: str):
return Groundlight(api_token=api_token)
Expand Down Expand Up @@ -74,27 +97,14 @@ def get_detector_metadata(detector_id: str, gl: Groundlight) -> Detector:

class AppState:
def __init__(self):
# Create a global shared image query ID cache in the app's state
self.iqe_cache = IQECache()
self.edge_config = load_edge_config()
inference_config, motion_detection_config = get_inference_and_motion_detection_configs(
root_edge_config=self.edge_config
)

motion_detection_templates: Dict[str, MotionDetectionConfig] = self.edge_config.motion_detection_templates
edge_inference_templates: Dict[str, LocalInferenceConfig] = self.edge_config.local_inference_templates

self.motion_detection_config: Dict[str, MotionDetectionConfig] = {
detector.detector_id: motion_detection_templates[detector.motion_detection_template]
for detector in self.edge_config.detectors
}
self.inference_config: Dict[str, LocalInferenceConfig] = {
detector.detector_id: edge_inference_templates[detector.local_inference_template]
for detector in self.edge_config.detectors
}

# Create a global shared motion detection manager object in the app's state
self.motion_detection_manager = MotionDetectionManager(config=self.motion_detection_config)

# Create global shared edge inference manager object in the app's state
self.edge_inference_manager = EdgeInferenceManager(config=self.inference_config)
self.motion_detection_manager = MotionDetectionManager(config=motion_detection_config)
self.edge_inference_manager = EdgeInferenceManager(config=inference_config)
self.db_manager = DatabaseManager()


def get_app_state(request: Request) -> AppState:
Expand Down
1 change: 1 addition & 0 deletions app/core/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class LocalInferenceConfig(BaseModel):
"""

enabled: bool = Field(False, description="Determines if local edge inference is enabled for a specific detector.")
api_token: Optional[str] = Field(None, description="API token to fetch the inference model for this detector.")
refresh_rate: float = Field(
120.0,
description=(
Expand Down
Loading

0 comments on commit 34a8b3f

Please sign in to comment.