Commit 135f653

Merge: 2 parents beaf219 + 77237c1
judtinzhang committed Apr 21, 2024
Showing 11 changed files with 105 additions and 59 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -33,7 +33,7 @@ pipenv run pre-commit install

This guide details the steps to set up `Redis`, `Redis Insight`, `postgres`, and `pgweb` instances using Docker, making local development easy.

> [!NOTE]
> [NOTE]
> Ensure Docker is installed on your system. If Docker and Docker Compose are not installed, follow the installation guide in [Docker's official documentation](https://docs.docker.com/get-docker/).
Run all the services with:
@@ -58,6 +58,8 @@ After ensuring that your .env file is properly configured, you can create the local database
pipenv run python src/database.py
```

😎 Happy Hacking!

### Development Guide

The structure of this project is set up based on [FastAPI Best Practices](https://github.com/zhanymkanov/fastapi-best-practices). We will try to adhere to it as much as possible; here are the most important conventions to follow. PRs that violate these rules may not be merged.
11 changes: 10 additions & 1 deletion k8s/main.ts
@@ -1,6 +1,8 @@
import { Construct } from 'constructs';
import { App } from 'cdk8s';
import { Application, PennLabsChart, RedisApplication } from '@pennlabs/kittyhawk';
import { Application, PennLabsChart, RedisApplication, CronJob } from '@pennlabs/kittyhawk';

const cronTime = require('cron-time-generator');

export class MyChart extends PennLabsChart {
  constructor(scope: Construct) {
@@ -42,6 +44,13 @@ export class MyChart extends PennLabsChart {
        ...ingressProps,
      }
    });
    new CronJob(this, 'load-flush-db', {
      schedule: cronTime.everyDayAt(7),
      image: backendImage,
      secret,
      cmd: ["python", "scripts/flush_db.py", "full"],
      env: []
    });
  }
}
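For reference, `cron-time-generator`'s `everyDayAt(7)` should expand to the cron expression `0 7 * * *`, so this job would run the flush script once a day at 07:00 cluster time (typically UTC).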

23 changes: 18 additions & 5 deletions scripts/flush_db.py
@@ -3,20 +3,21 @@
from datetime import datetime

import asyncpg
from settings.config import DB_SETTINGS, REDIS_BATCH_SIZE, REDIS_URL
from settings.config import DATABASE_URL, REDIS_BATCH_SIZE, REDIS_URL

from redis.asyncio import Redis


async def batch_insert(events):
    # TODO: Ensure number of events does not exceed SQL max statement tokens
    BATCH_INSERT_COMMAND = """
        INSERT INTO event (product, pennkey, datapoint, value, timestamp)
        VALUES ($1, $2, $3, $4, $5)
    """

    try:
        conn = await asyncpg.connect(**DB_SETTINGS)
        conn = await asyncpg.connect(dsn=DATABASE_URL)
        # executemany binds parameters rather than interpolating them,
        # so this should be safe from SQL injection; see:
        # https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L1901
        await conn.executemany(BATCH_INSERT_COMMAND, events)
    except Exception as error:
        print(f"Error: {error}")
@@ -53,9 +54,21 @@ async def main():

    await batch_insert(events)

    # await redis.flushall()
    await redis.flushall()


async def redis_count():
    # Redis.from_url is a synchronous constructor in redis.asyncio,
    # so it is not awaited.
    redis = Redis.from_url(str(REDIS_URL))
    return await redis.dbsize()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    count = loop.run_until_complete(redis_count())
    print(f"{count} items found in redis")
    # Each pass through main() drains up to REDIS_BATCH_SIZE events
    # from redis into postgres.
    while count > 0:
        loop.run_until_complete(main())
        count -= REDIS_BATCH_SIZE
        count = max(count, 0)
        print(f"{count} items left in redis")
    print("Redis flushed")
8 changes: 0 additions & 8 deletions scripts/settings/config.py
@@ -4,12 +4,4 @@
REDIS_URL = os.getenv("REDIS_URL")
REDIS_BATCH_SIZE = 1000

DB_SETTINGS = {
    "database": os.getenv("POSTGRES_DB"),
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
    "host": os.getenv("POSTGRES_HOST"),
    "port": os.getenv("POSTGRES_PORT"),
}

DATABASE_URL = os.getenv("DATABASE_URL")
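With the discrete `POSTGRES_*` settings gone, the connection is configured by a single DSN. An illustrative value (host, credentials, and database name invented for the example): `postgresql://user:password@localhost:5432/analytics`.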
Empty file added src/__init__.py
33 changes: 33 additions & 0 deletions src/database.py
@@ -0,0 +1,33 @@
import asyncio

from sqlalchemy import Column, DateTime, Identity, Integer, MetaData, String, Table
from sqlalchemy.ext.asyncio import create_async_engine

from src.config import settings


DATABASE_URL = settings.DATABASE_URL

engine = create_async_engine(DATABASE_URL)

metadata = MetaData()

event = Table(
    "event",
    metadata,
    Column("id", Integer, Identity(), primary_key=True),
    Column("product", String, nullable=False),
    Column("pennkey", String, nullable=True),
    Column("datapoint", String, nullable=False),
    Column("value", String, nullable=False),
    Column("timestamp", DateTime, nullable=False),
)


# Create all tables in the metadata
async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)


asyncio.run(create_tables())
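For reference, a hedged sketch of writing one row through the same table definition; the function name and all field values are invented for illustration:

```python
from datetime import datetime

from sqlalchemy import insert

from src.database import engine, event


async def insert_example_event():
    # Open a transaction, insert a single illustrative row, and
    # commit on exit of the context manager.
    async with engine.begin() as conn:
        await conn.execute(
            insert(event).values(
                product="example-product",  # invented value
                pennkey=None,
                datapoint="page_view",
                value="1",
                timestamp=datetime.utcnow(),
            )
        )
```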
2 changes: 1 addition & 1 deletion src/main.py
@@ -1,8 +1,8 @@
from fastapi import Depends, FastAPI, HTTPException, Request

from src.auth import verify_jwt
from src.models import AnalyticsTxn
from src.redis import set_redis_from_tx
from src.schemas import AnalyticsTxn


app = FastAPI()
38 changes: 38 additions & 0 deletions src/models.py
@@ -1,5 +1,8 @@
import hashlib
import json
from datetime import datetime
from enum import Enum
from typing import Optional

from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, ConfigDict
@@ -43,3 +46,38 @@ def __str__(self):
class RedisEvent(CustomModel):
    key: bytes | str
    value: bytes | str


class AnalyticsTxn(CustomModel):
    product: Product
    pennkey: Optional[str] = None
    timestamp: datetime
    data: list[RedisEvent]

    # Initialize from a raw JSON payload: convert the unix timestamp
    # and nested dicts into typed fields.
    def __init__(self, **data):
        super().__init__(**data)
        self.timestamp = datetime.fromtimestamp(data["timestamp"])
        self.data = [RedisEvent(**event) for event in data["data"]]
        self.product = Product(data["product"])
        self.pennkey = data.get("pennkey")

    def get_redis_key(self):
        return f"{self.product}.{self.hash_as_key()}"

    def build_redis_data(self) -> list[RedisEvent]:
        return [
            RedisEvent(
                key=f"{self.get_redis_key()}.{event.hash_as_key()}",
                value=json.dumps(
                    {
                        "product": str(self.product),
                        "pennkey": self.pennkey,
                        "timestamp": self.timestamp.timestamp(),
                        "datapoint": event.key,
                        "value": event.value,
                    }
                ),
            )
            for event in self.data
        ]
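A hedged usage sketch of the new model; the payload is invented and assumes `Product` defines a member whose value is `"platform"`:

```python
import time

from src.models import AnalyticsTxn

# Illustrative payload, shaped like the JSON the API would receive.
payload = {
    "product": "platform",  # assumes a matching Product enum member
    "pennkey": "jsmith",
    "timestamp": time.time(),
    "data": [{"key": "page_view", "value": "1"}],
}

txn = AnalyticsTxn(**payload)
events = txn.build_redis_data()  # one namespaced RedisEvent per datapoint
```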
3 changes: 1 addition & 2 deletions src/redis.py
@@ -2,8 +2,7 @@

from redis.asyncio import Redis
from src.config import settings
from src.models import RedisEvent
from src.schemas import AnalyticsTxn
from src.models import AnalyticsTxn, RedisEvent


redis_client: Redis = Redis.from_url(str(settings.REDIS_URL))
40 changes: 0 additions & 40 deletions src/schemas.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/test_redis.py
@@ -1,7 +1,7 @@
import pytest

from src.models import RedisEvent
from src.redis import get_by_key, set_redis_keys
from src.schemas import RedisEvent


@pytest.mark.asyncio
