integrate sqlite db
blaise-muhirwa committed Oct 27, 2023
1 parent 858d1f5 commit afc2433
Showing 16 changed files with 469 additions and 116 deletions.
29 changes: 18 additions & 11 deletions app/api/routes/image_queries.py
@@ -2,6 +2,7 @@
from datetime import datetime
from io import BytesIO
from typing import Optional
import asyncio

import numpy as np
from fastapi import APIRouter, Depends, HTTPException, Query, Request
@@ -17,7 +18,6 @@
)
from app.core.motion_detection import MotionDetectionManager
from app.core.utils import prefixed_ksuid, safe_call_api
from app.core.database import db


logger = logging.getLogger(__name__)
@@ -103,7 +103,8 @@ async def post_image_query(
new_image_query = motion_detection_manager.get_image_query_response(detector_id=detector_id).copy(
deep=True, update={"id": prefixed_ksuid(prefix="iqe_")}
)
iqe_cache.update_cache(image_query=new_image_query)
# iqe_cache.update_cache(image_query=new_image_query)
asyncio.create_task(app_state.db_manager.create_iqe_record(record=new_image_query))
return new_image_query

image_query = None
@@ -121,7 +122,8 @@ async def post_image_query(
confidence=confidence,
query=detector_metadata.query,
)
iqe_cache.update_cache(image_query=image_query)
# iqe_cache.update_cache(image_query=image_query)
asyncio.create_task(app_state.db_manager.create_iqe_record(record=image_query))
else:
logger.info(
"Ran inference locally, but detector confidence is not high enough to return. Current confidence:"
@@ -131,12 +133,16 @@
else:
# Run an asynchronous task to create a record in the database table for this detector to indicate that
# edge inference for the given detector ID is not yet set up.
db.create_record(
record={
"detector_id": detector_id,
"api_token": gl.configuration.api_key["ApiToken"],
"deployment_created": False,
}
logger.info(f"Creating database record for {detector_id=}")
api_token = gl.api_client.configuration.api_key["ApiToken"]
asyncio.create_task(
app_state.db_manager.create_detector_deployment_record(
record={
"detector_id": detector_id,
"api_token": api_token,
"deployment_created": False,
}
)
)

# Finally, fall back to submitting the image to the cloud
@@ -161,9 +167,10 @@ async def get_image_query(
id: str, gl: Groundlight = Depends(get_groundlight_sdk_instance), app_state: AppState = Depends(get_app_state)
):
if id.startswith("iqe_"):
iqe_cache = app_state.iqe_cache
# iqe_cache = app_state.iqe_cache

image_query = iqe_cache.get_cached_image_query(image_query_id=id)
# image_query = iqe_cache.get_cached_image_query(image_query_id=id)
        image_query = await app_state.db_manager.get_iqe_record(image_query_id=id)
if not image_query:
raise HTTPException(status_code=404, detail=f"Image query with ID {id} not found")
return image_query
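The route now persists edge-only image queries with a fire-and-forget asyncio.create_task rather than blocking the response on the database write. A minimal standalone sketch of that pattern (the names below are illustrative, not from this commit):

import asyncio

async def save_record(record: dict) -> None:
    # Stand-in for db_manager.create_iqe_record(...); assume a real async DB write here.
    await asyncio.sleep(0.01)
    print(f"saved {record['id']}")

async def handle_request(record: dict) -> dict:
    # Schedule the write on the running event loop and return without waiting for it.
    asyncio.create_task(save_record(record))
    return record

async def main() -> None:
    await handle_request({"id": "iqe_123"})
    await asyncio.sleep(0.1)  # give the pending task a chance to finish before the loop closes

asyncio.run(main())

The trade-off: since the task is never awaited, a failed insert surfaces only in logs, and a record can be lost if the process exits before the task runs.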
4 changes: 4 additions & 0 deletions app/core/app_state.py
@@ -15,6 +15,7 @@
from .iqe_cache import IQECache
from .motion_detection import MotionDetectionManager
from .utils import safe_call_api
from .database import DatabaseManager

logger = logging.getLogger(__name__)

@@ -96,6 +97,9 @@ def __init__(self):
# Create global shared edge inference manager object in the app's state
self.edge_inference_manager = EdgeInferenceManager(config=self.inference_config)

# Initialize a database manager object
self.db_manager = DatabaseManager()


def get_app_state(request: Request) -> AppState:
return request.app.state.app_state
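The diff doesn't show where the new DatabaseManager's create_tables() and on_shutdown() get called; one plausible wiring into the FastAPI lifecycle (hypothetical, assuming app.state.app_state is populated at startup as above):

from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def on_startup() -> None:
    # Table creation needs a running event loop because the engine is async.
    await app.state.app_state.db_manager.create_tables()

@app.on_event("shutdown")
async def on_shutdown() -> None:
    # Dispose of the async engine's connection pool.
    app.state.app_state.db_manager.on_shutdown()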
1 change: 1 addition & 0 deletions app/core/configs.py
@@ -32,6 +32,7 @@ class LocalInferenceConfig(BaseModel):
"""

enabled: bool = Field(False, description="Determines if local edge inference is enabled for a specific detector.")
api_token: Optional[str] = Field(None, description="API token required to fetch inference models.")
refresh_rate: float = Field(
120.0,
description=(
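With the new field, each detector's inference config can carry its own API token instead of relying on a single environment variable. A small sketch of constructing one directly (placeholder values):

from app.core.configs import LocalInferenceConfig

config = LocalInferenceConfig(
    enabled=True,
    api_token="<placeholder-token>",  # in practice sourced from the edge config or the database
    refresh_rate=120.0,
)
assert config.enabled and config.api_token is not None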
181 changes: 145 additions & 36 deletions app/core/database.py
@@ -1,10 +1,17 @@
from sqlalchemy import Column, Integer, String, Boolean, create_engine
from sqlalchemy import Column, Integer, String, Boolean, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from .file_paths import DATABASE_FILEPATH
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.asyncio.engine import AsyncEngine
from app.core.file_paths import DATABASE_CONTAINER_NAME, DATABASE_FILEPATH
from model import ImageQuery
from typing import List, Dict
import os
from sqlalchemy.exc import OperationalError, IntegrityError
import logging
from sqlalchemy import select


logger = logging.getLogger(__name__)
Base = declarative_base()


@@ -17,58 +24,160 @@ class DetectorDeployment(Base):
    __tablename__ = "detector_deployments"

    id = Column(Integer, primary_key=True)
    detector_id = Column(String)
    detector_id = Column(String, unique=True)
    api_token = Column(String)
    deployment_created = Column(Boolean)

class IQECache(Base):
    """
    Schema for the `iqe_cache` database table
    """

    __tablename__ = "iqe_cache"

class ImageQueriesEdge(Base):
    """
    Schema for the `image_queries_edge` database table.
    """

    __tablename__ = "image_queries_edge"
    id = Column(Integer, primary_key=True)
    image_query_id = Column(String, unique=True)
    image_query = Column(JSON)

def __init__(self):
    self._engine = create_engine(f"sqlite:///{self.validate_filepath(DATABASE_FILEPATH)}")
    self.session = sessionmaker(bind=self.engine)

    tables = ["detector_deployments", "iqe_cache"]
    self._create_tables(tables=tables)

@staticmethod
def validate_filepath(filepath: str) -> str:
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Invalid filepath: {filepath}")

    return filepath

def __init__(self, verbose=False) -> None:
    """
    Sets up a database connection and creates the database tables if they don't exist.
    :param verbose: If True, will print out all executed database queries.
    :type verbose: bool
    :return: None
    :rtype: None
    """
    db_url = "sqlite+aiosqlite:////var/groundlight/sqlite/sqlite.db"
    # db_url = f"sqlite:///{DATABASE_CONTAINER_NAME}:/{DATABASE_FILEPATH}"
    self._engine: AsyncEngine = create_async_engine(db_url, echo=verbose)
    self.session = sessionmaker(bind=self._engine, expire_on_commit=False, class_=AsyncSession)

async def create_detector_deployment_record(self, record: Dict[str, str]) -> None:
    """
    Creates a new record in the `detector_deployments` table.
    :param record: A dictionary containing the detector_id, api_token, and deployment_created fields.
    :type record: Dict[str, str]
    :throws IntegrityError: If the detector_id already exists in the database.
    :return: None
    :rtype: None
    """
    try:
        async with AsyncSession(self._engine) as session:
            new_record = self.DetectorDeployment(
                detector_id=record["detector_id"],
                api_token=record["api_token"],
                deployment_created=record["deployment_created"],
            )
            session.add(new_record)
            await session.commit()

    except IntegrityError as e:
        await session.rollback()

        # Check if the error specifically occurred due to the unique constraint on the detector_id column.
        # If it did, then we can ignore the error.
        if "detector_id" in str(e.orig):
            logger.debug(f"Detector ID {record['detector_id']} already exists in the database.")
        else:
            logger.error("Integrity error occurred", exc_info=True)

async def update_detector_deployment_record(self, detector_id: str) -> None:
    """
    Check if detector_id is a record in the database. If it is, and the deployment_created field is False,
    update the deployment_created field to True.
    :param detector_id: Detector ID
    :type detector_id: str
    :return: None
    :rtype: None
    """
    try:
        async with AsyncSession(self._engine) as session:
            query = select(self.DetectorDeployment).filter_by(detector_id=detector_id)
            result = await session.execute(query)

            detector_record = result.scalar_one_or_none()
            if detector_record is None:
                return

            if not detector_record.deployment_created:
                detector_record.deployment_created = True
                await session.commit()
    except IntegrityError:
        logger.debug(f"Error occurred while updating database record for {detector_id=}.", exc_info=True)
        await session.rollback()

async def create_record(self, record: Dict[str, str]) -> None:
    with Session(self._engine) as session:
        new_record = self.DetectorDeployment(
            detector_id=record["detector_id"],
            api_token=record["api_token"],
            deployment_created=record["deployment_created"],
        )
        session.add(new_record)
        session.commit()

async def create_iqe_record(self, record: ImageQuery) -> None:
    """
    Creates a new record in the `image_queries_edge` table.
    :param record: An image query.
    :type record: ImageQuery
    :throws IntegrityError: If the image_query_id already exists in the database.
    :return: None
    :rtype: None
    """
    try:
        async with AsyncSession(self._engine) as session:
            image_query_id = record.id
            image_query_json = record.json()
            new_record = self.ImageQueriesEdge(
                image_query_id=image_query_id,
                image_query=image_query_json,
            )
            session.add(new_record)
            await session.commit()

    except IntegrityError:
        logger.debug(f"Image query {record.id} already exists in the database.")
        await session.rollback()

async def get_iqe_record(self, image_query_id: str) -> ImageQuery | None:
    """
    Gets a record from the `image_queries_edge` table.
    :param image_query_id: The ID of the image query.
    :type image_query_id: str
    :return: The image query record.
    :rtype: ImageQuery | None
    """
    async with AsyncSession(self._engine) as session:
        query = select(self.ImageQueriesEdge.image_query).filter_by(image_query_id=image_query_id)
        result = await session.execute(query)
        result_row: dict | None = result.scalar_one_or_none()
        if result_row is None:
            return None
        return ImageQuery(**result_row)

def get_detectors_without_deployments(self) -> List[str]:
    with Session(self._engine) as session:
        query = session.query(self.DetectorDeployment.detector_id).filter_by(deployment_created=False)
        undeployed_detectors = [result[0] for result in query.all()]
        return undeployed_detectors

async def get_detectors_without_deployments(self) -> List[Dict[str, str]] | None:
    async with AsyncSession(self._engine) as session:
        query = select(self.DetectorDeployment.detector_id, self.DetectorDeployment.api_token).filter_by(
            deployment_created=False
        )
        query_results = await session.execute(query)

        undeployed_detectors = [{"detector_id": row[0], "api_token": row[1]} for row in query_results.fetchall()]
        return undeployed_detectors

def _create_tables(self, tables: List[str]) -> None:
    """
    Checks if the database tables exist and creates them if they don't.
    :param tables: A list of database tables in the database
    :return: None
    :rtype: None
    """
    for table_name in tables:
        if not self.engine.dialect.has_table(self._engine, table_name):
            Base.metadata.create_all(self._engine)

async def create_tables(self) -> None:
    """
    Checks if the database tables exist and creates them if they don't.
    :return: None
    :rtype: None
    """
    try:
        async with self._engine.begin() as connection:
            await connection.run_sync(Base.metadata.create_all)
    except OperationalError:
        logger.error("Could not create database tables.", exc_info=True)

db = DatabaseManager()

def on_shutdown(self) -> None:
    """
    This ensures that we release the resources.
    """
    self._engine.dispose()
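Taken together, the new manager is used roughly like this (a sketch under the assumption that the methods behave as defined above; IDs and token are placeholders):

import asyncio
from app.core.database import DatabaseManager

async def main() -> None:
    db_manager = DatabaseManager()
    await db_manager.create_tables()

    # Record a detector whose edge inference deployment doesn't exist yet.
    await db_manager.create_detector_deployment_record(
        record={"detector_id": "det_abc123", "api_token": "<placeholder-token>", "deployment_created": False}
    )

    # Detectors still waiting on a deployment.
    pending = await db_manager.get_detectors_without_deployments()
    print(pending)

    # Once the deployment exists, flip the flag.
    await db_manager.update_detector_deployment_record(detector_id="det_abc123")

    db_manager.on_shutdown()

asyncio.run(main())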
18 changes: 14 additions & 4 deletions app/core/edge_inference.py
@@ -36,16 +36,26 @@ def __init__(self, config: Dict[str, LocalInferenceConfig], verbose: bool = False):
a specific detector and the model name and version to use for inference.
"""
self.inference_config = config
self.verbose = verbose

if self.inference_config:
self.inference_clients = {
detector_id: tritonclient.InferenceServerClient(
url=get_edge_inference_service_name(detector_id) + ":8000", verbose=verbose
url=get_edge_inference_service_name(detector_id) + ":8000", verbose=self.verbose
)
for detector_id in self.inference_config.keys()
if self.detector_configured_for_local_inference(detector_id)
}

def update_inference_config(self, detector_id: str, api_token: str) -> None:
    """
    Registers a detector for local inference: adds it to the inference config and
    creates a triton client for its inference service.
    """
    if detector_id not in self.inference_config.keys():
        self.inference_config[detector_id] = LocalInferenceConfig(enabled=True, api_token=api_token)

    self.inference_clients[detector_id] = tritonclient.InferenceServerClient(
        url=get_edge_inference_service_name(detector_id) + ":8000", verbose=self.verbose
    )

def detector_configured_for_local_inference(self, detector_id: str) -> bool:
"""
Checks if the detector is configured to run local inference.
@@ -131,7 +141,7 @@ def update_model(self, detector_id: str) -> bool:
Returns True if a new model was downloaded and saved, False otherwise.
"""
logger.info(f"Checking if there is a new model available for {detector_id}")
model_urls = fetch_model_urls(detector_id)
model_urls = fetch_model_urls(detector_id, api_token=self.inference_config[detector_id].api_token)

cloud_binary_ksuid = model_urls.get("model_binary_id", None)
if cloud_binary_ksuid is None:
@@ -158,9 +168,9 @@ def update_model(self, detector_id: str) -> bool:
return True


def fetch_model_urls(detector_id: str) -> dict[str, str]:
def fetch_model_urls(detector_id: str, api_token: Optional[str] = None) -> dict[str, str]:
try:
groundlight_api_token = os.environ["GROUNDLIGHT_API_TOKEN"]
groundlight_api_token = api_token or os.environ["GROUNDLIGHT_API_TOKEN"]
except KeyError as ex:
logger.error("GROUNDLIGHT_API_TOKEN environment variable is not set", exc_info=True)
raise ex
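A likely consumer of the detector_deployments table is whatever loop sets up inference deployments; nothing in this diff shows it, but a hypothetical sketch of how the new pieces could connect:

async def reconcile_deployments(db_manager, edge_inference_manager) -> None:
    # Hypothetical glue code: none of this loop appears in the commit.
    undeployed = await db_manager.get_detectors_without_deployments()
    for entry in undeployed or []:
        # Register the detector for local inference with its stored token...
        edge_inference_manager.update_inference_config(
            detector_id=entry["detector_id"], api_token=entry["api_token"]
        )
        # ...then mark the deployment as created so it isn't picked up again.
        await db_manager.update_detector_deployment_record(detector_id=entry["detector_id"])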
8 changes: 7 additions & 1 deletion app/core/file_paths.py
@@ -1,4 +1,10 @@
DEFAULT_EDGE_CONFIG_PATH = "/etc/groundlight/edge-config/edge-config.yaml"
INFERENCE_DEPLOYMENT_TEMPLATE_PATH = "/etc/groundlight/inference-deployment/inference_deployment_template.yaml"

DATABASE_FILEPATH = "sqlite:///edge-endpoint.db"
# Name of the database container in the edge-endpoint deployment.
# If you change this, you must also change it in the edge-endpoint deployment and vice versa.
DATABASE_CONTAINER_NAME = "sqlite-db"

# Path to the database file mounted into the sqlite-db container.
# This must also match the path used in the PersistentVolumeClaim definition for the database.
DATABASE_FILEPATH = "/var/groundlight/sqlite/sqlite.db"
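DatabaseManager.__init__ currently hard-codes its sqlite URL; composing it from this constant would produce the same string. A one-line sketch:

from app.core.file_paths import DATABASE_FILEPATH

# The scheme's three slashes plus the absolute path's own leading slash
# yield "sqlite+aiosqlite:////var/groundlight/sqlite/sqlite.db".
db_url = f"sqlite+aiosqlite:///{DATABASE_FILEPATH}"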
