From d54ec93f386559ca905ddd709d378a41cb0751f1 Mon Sep 17 00:00:00 2001 From: Tnix Date: Sat, 24 Aug 2024 02:53:41 +1200 Subject: [PATCH] feat: simpler signed tokens implementation --- database.py | 52 +++++++++++++++++++++++++++++------------------ rest_api/v0/me.py | 29 ++++++++++++++++++++++++++ security.py | 42 +++++++++++++++++++++++++++++++++----- 3 files changed, 98 insertions(+), 25 deletions(-) diff --git a/database.py b/database.py index f9301ea..8ee5b78 100644 --- a/database.py +++ b/database.py @@ -3,12 +3,10 @@ import redis import os import secrets -import time from radix import Radix -from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey -from utils import log, create_ed25519_keys, import_priv_ed25519_key, import_pub_ed25519_key -from signing import SigningKeys +from utils import log CURRENT_DB_VERSION = 9 @@ -219,24 +217,38 @@ # Load existing signing keys or create new ones -# The active private key should be rotated every 10 days by the background thread -# and public keys older than 90 days should be invalidated. -_priv_key = Ed25519PrivateKey.generate() -if db.config.count_documents({"_id": "signing_key"}, limit=1): - _priv_key = Ed25519PrivateKey.from_private_bytes(db.config.find_one({"_id": "signing_key"})["raw"]) +signing_keys = {} +if db.config.count_documents({"_id": "signing_keys"}, limit=1): + data = db.config.count_documents({"_id": "signing_keys"}, limit=1) + + acc_priv = Ed25519PrivateKey.from_private_bytes(data["acc_priv"]) + email_priv = Ed25519PrivateKey.from_private_bytes(data["email_priv"]) + + signing_keys.update({ + "acc_priv": acc_priv, + "acc_pub": acc_priv.public_key(), + + "email_priv": email_priv, + "email_pub": email_priv.public_key() + }) else: - db.config.update_one({"_id": "signing_key"}, {"$set": { - "raw": _priv_key.private_bytes_raw(), - "rotated_at": int(time.time()) - }}, upsert=True) - db.pub_signing_keys.insert_one({ - "raw": _priv_key.public_key().public_bytes_raw(), - "created_at": int(time.time()) + acc_priv = Ed25519PrivateKey.generate() + email_priv = Ed25519PrivateKey.generate() + + signing_keys.update({ + "acc_priv": acc_priv, + "acc_pub": acc_priv.public_key(), + + "email_priv": email_priv, + "email_pub": email_priv.public_key() }) -signing_keys = SigningKeys(_priv_key, [ - Ed25519PublicKey.from_public_bytes(pub_signing_key["raw"]) - for pub_signing_key in db.pub_signing_keys.find({}) -]) + + data = { + "_id": "signing_keys", + "acc_priv": acc_priv.private_bytes_raw(), + "email_priv": email_priv.private_bytes_raw() + } + db.confing.insert_one(signing_keys) # Load netblocks diff --git a/rest_api/v0/me.py b/rest_api/v0/me.py index b98ac62..a2ead10 100644 --- a/rest_api/v0/me.py +++ b/rest_api/v0/me.py @@ -12,6 +12,7 @@ import qrcode, qrcode.image.svg import uuid import secrets +import os import security from database import db, rdb, get_total_pages @@ -45,6 +46,10 @@ class Config: validate_assignment = True str_strip_whitespace = True +class UpdateEmailBody(BaseModel): + password: str = Field(min_length=1, max_length=255) # change in API v1 + email: Optional[str] = Field(default=None, max_length=255) + class ChangePasswordBody(BaseModel): old: str = Field(min_length=1, max_length=255) # change in API v1 new: str = Field(min_length=8, max_length=72) @@ -200,6 +205,30 @@ async def get_relationships(): }, 200 +@me_bp.patch("/email") +@validate_request(UpdateEmailBody) +async def update_email(data: UpdateEmailBody): + # Make sure email is enabled + if not os.getenv("EMAIL_SMTP_HOST"): + return {"error": True, "type": "featureDisabled"}, 503 + + # Check authorization + if not request.user: + abort(401) + + # Check ratelimits + if security.ratelimited(f"login:u:{request.user}") or security.ratelimited(f"emailch:{request.user}"): + abort(429) + + # Check password + account = db.usersv0.find_one({"_id": request.user}, projection={"email": 1, "pswd": 1}) + if not security.check_password_hash(data.old, account["pswd"]): + security.ratelimit(f"login:u:{request.user}", 5, 60) + return {"error": True, "type": "invalidCredentials"}, 401 + + # Send email + + @me_bp.patch("/password") @validate_request(ChangePasswordBody) async def change_password(data: ChangePasswordBody): diff --git a/security.py b/security.py index 6d59202..78279cc 100644 --- a/security.py +++ b/security.py @@ -1,5 +1,6 @@ from hashlib import sha256 -from typing import Optional, Any +from typing import Optional, Any, Literal +from base64 import urlsafe_b64encode, urlsafe_b64decode from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formataddr @@ -47,6 +48,12 @@ TOKEN_BYTES = 64 +TOKEN_TYPES = Literal[ + "acc", # account authorization + "email", # email actions (such as email verification or account recovery) +] + + email_file_loader = jinja2.FileSystemLoader("email_templates") email_env = jinja2.Environment(loader=email_file_loader) @@ -142,6 +149,7 @@ def create_account(username: str, password: str, ip: str): "avatar": "", "avatar_color": "000000", "quote": "", + "email": "", "pswd": hash_password(password), "mfa_recovery_code": secrets.token_hex(5), "tokens": [], @@ -262,12 +270,36 @@ def create_user_token(username: str, ip: str, used_token: Optional[str] = None) return new_token -def create_token(ttype: int, subject: Any, scopes: int, expires_in: Optional[int] = None) -> str: - pass +def create_token(ttype: TOKEN_TYPES, claims: Any, expires_in: Optional[int] = None) -> str: + token = b"miau_" + ttype.encode() + + # Add claims + token += b"." + urlsafe_b64encode(msgpack(claims)) + + # Add expiration + token += b"." + urlsafe_b64encode(str(int(time.time())+expires_in).encode()) + + # Sign token and add signature to token + token += b"." + urlsafe_b64encode(signing_keys[ttype + "_priv"].sign(token)) + + return token.decode() + + +def extract_token(token: str, expected_type: TOKEN_TYPES) -> Optional[Any]: + # Extract data from the token + ttype, claims, expires_at, signature = token.split(".") + + # Check type + if ttype.replace("miau_", "") != expected_type: + return None + # Check signature + signing_keys[ttype.replace("miau_", "") + "_pub"].verify( + urlsafe_b64decode(signature), + (ttype.encode() + b"." + claims.encode() + b"." + expires_at.encode()) + ) -def extract_token(token: str) -> tuple[int, Any, int]: - pass + return msgpack.unpack(urlsafe_b64decode(claims)) def update_settings(username, newdata):