Skip to content

Commit

Permalink
Merge pull request #2826 from InfinityPacer/feature/security
Browse files Browse the repository at this point in the history
  • Loading branch information
jxxghp authored Oct 9, 2024
2 parents cadc0b0 + 55403cd commit 5ba555e
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 42 deletions.
4 changes: 2 additions & 2 deletions app/api/endpoints/douban.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
from app.chain.douban import DoubanChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.security import verify_token
from app.core.security import verify_token, verify_resource_token
from app.schemas import MediaType
from app.utils.http import RequestUtils

router = APIRouter()


@router.get("/img", summary="豆瓣图片代理")
def douban_img(imgurl: str) -> Any:
def douban_img(imgurl: str, _: schemas.TokenPayload = Depends(verify_resource_token)) -> Any:
"""
豆瓣图片代理
"""
Expand Down
3 changes: 1 addition & 2 deletions app/api/endpoints/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ async def login_access_token(
super_user=user.is_superuser,
user_name=user.name,
avatar=user.avatar,
level=level,
permissions=user.permissions or {}
level=level
)


Expand Down
18 changes: 10 additions & 8 deletions app/api/endpoints/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from typing import Union, Any

import tailer
from fastapi import APIRouter, HTTPException, Depends, Response
from fastapi import APIRouter, Depends, Response
from fastapi.responses import StreamingResponse

from app import schemas
from app.chain.search import SearchChain
from app.chain.system import SystemChain
from app.core.config import settings, global_vars
from app.core.module import ModuleManager
from app.core.security import verify_token, verify_uri_token, verify_apitoken
from app.core.security import verify_token, verify_apitoken, verify_resource_token
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
Expand All @@ -31,7 +31,8 @@


@router.get("/img/{proxy}", summary="图片代理")
def proxy_img(imgurl: str, proxy: bool = False) -> Any:
def proxy_img(imgurl: str, proxy: bool = False,
_: schemas.TokenPayload = Depends(verify_resource_token)) -> Any:
"""
图片代理,可选是否使用代理服务器
"""
Expand All @@ -47,7 +48,7 @@ def proxy_img(imgurl: str, proxy: bool = False) -> Any:


@router.get("/cache/image", summary="图片缓存")
def cache_img(url: str) -> Any:
def cache_img(url: str, _: schemas.TokenPayload = Depends(verify_resource_token)) -> Any:
"""
本地缓存图片文件
"""
Expand Down Expand Up @@ -81,7 +82,7 @@ def get_global_setting():
"""
# FIXME: 新增敏感配置项时要在此处添加排除项
info = settings.dict(
exclude={"SECRET_KEY", "API_TOKEN", "TMDB_API_KEY", "TVDB_API_KEY", "FANART_API_KEY",
exclude={"SECRET_KEY", "RESOURCE_SECRET_KEY", "API_TOKEN", "TMDB_API_KEY", "TVDB_API_KEY", "FANART_API_KEY",
"COOKIECLOUD_KEY", "COOKIECLOUD_PASSWORD", "GITHUB_TOKEN", "REPO_GITHUB_TOKEN"}
)
return schemas.Response(success=True,
Expand All @@ -94,7 +95,7 @@ def get_env_setting(_: User = Depends(get_current_active_superuser)):
查询系统环境变量,包括当前版本号(仅管理员)
"""
info = settings.dict(
exclude={"SECRET_KEY", "API_TOKEN", "GITHUB_TOKEN", "REPO_GITHUB_TOKEN"}
exclude={"SECRET_KEY", "RESOURCE_SECRET_KEY", "API_TOKEN", "GITHUB_TOKEN", "REPO_GITHUB_TOKEN"}
)
info.update({
"VERSION": APP_VERSION,
Expand Down Expand Up @@ -186,7 +187,7 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None,


@router.get("/message", summary="实时消息")
def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_uri_token)):
def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统消息,返回格式为SSE
"""
Expand All @@ -204,7 +205,8 @@ def event_generator():


@router.get("/logging", summary="实时日志")
def get_logging(length: int = 50, logfile: str = "moviepilot.log", _: schemas.TokenPayload = Depends(verify_uri_token)):
def get_logging(length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统日志
length = -1 时, 返回text/plain
Expand Down
4 changes: 4 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ class Config:
FRONTEND_PATH: str = "/public"
# 密钥
SECRET_KEY: str = secrets.token_urlsafe(32)
# RESOURCE密钥
RESOURCE_SECRET_KEY: str = secrets.token_urlsafe(32)
# 允许的域名
ALLOWED_HOSTS: list = ["*"]
# TOKEN过期时间
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
# RESOURCE_TOKEN过期时间
RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS: int = 60 * 10
# 时区
TZ: str = "Asia/Shanghai"
# API监听地址
Expand Down
149 changes: 119 additions & 30 deletions app/core/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from cryptography.fernet import Fernet
from fastapi import HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery
from fastapi import HTTPException, status, Security, Request, Response
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery, APIKeyCookie
from passlib.context import CryptContext

from app import schemas
Expand All @@ -27,8 +27,8 @@
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)

# JWT TOKEN 通过 QUERY 认证
jwt_token_query = APIKeyQuery(name="token", auto_error=False, scheme_name="jwt_token_query")
# RESOURCE TOKEN 通过 Cookie 认证
resource_token_cookie = APIKeyCookie(name=settings.PROJECT_NAME, auto_error=False, scheme_name="resource_token_cookie")

# API TOKEN 通过 QUERY 认证
api_token_query = APIKeyQuery(name="token", auto_error=False, scheme_name="api_token_query")
Expand All @@ -45,50 +45,124 @@ def create_access_token(
username: str,
super_user: bool = False,
expires_delta: Optional[timedelta] = None,
level: int = 1
level: int = 1,
purpose: Optional[str] = "authentication"
) -> str:
"""
创建 JWT 访问令牌,包含用户 ID、用户名、是否为超级用户以及权限等级
:param userid: 用户的唯一标识符,通常是字符串或整数
:param username: 用户名,用于标识用户的账户名
:param super_user: 是否为超级用户,默认值为 False
:param expires_delta: 令牌的有效期时长,如果不提供则使用默认过期时间
:param expires_delta: 令牌的有效期时长,如果不提供则根据用途使用默认过期时间
:param level: 用户的权限级别,默认为 1
:param purpose: 令牌的用途,"authentication" 或 "resource"
:return: 编码后的 JWT 令牌字符串
:raises ValueError: 如果 expires_delta 为负数
"""
if purpose == "resource":
default_expire = timedelta(seconds=settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS)
secret_key = settings.RESOURCE_SECRET_KEY
else:
default_expire = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
secret_key = settings.SECRET_KEY

if expires_delta is not None:
if expires_delta.total_seconds() <= 0:
raise ValueError("过期时间必须为正数")
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
expire = datetime.utcnow() + default_expire

to_encode = {
"exp": expire,
"sub": str(userid),
"username": username,
"super_user": super_user,
"level": level
"level": level,
"purpose": purpose
}

encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
return encoded_jwt


def __verify_token(token: str) -> schemas.TokenPayload:
def __set_or_refresh_resource_token_cookie(request: Request, response: Response, payload: schemas.TokenPayload):
"""
设置资源令牌 Cookie
:param request: 包含请求相关的上下文数据
:param response: 用于在服务器响应时设置 Cookie
:param payload: 已通过身份验证的 TokenPayload 对象
"""
resource_token = request.cookies.get(settings.PROJECT_NAME)

if resource_token:
# 检查令牌剩余时间
try:
decoded_token = jwt.decode(resource_token, settings.RESOURCE_SECRET_KEY, algorithms=[ALGORITHM])
exp = decoded_token.get("exp")
if exp:
remaining_time = datetime.utcfromtimestamp(exp) - datetime.utcnow()
# 如果剩余时间少于 2 分钟,刷新令牌
if remaining_time < timedelta(minutes=2):
raise jwt.ExpiredSignatureError
except jwt.PyJWTError:
logger.debug(f"Token error occurred. refreshing token")
except Exception as e:
logger.debug(f"Unexpected error occurred while decoding token: {e}")
else:
# 如果令牌有效且没有即将过期,则不需要刷新
return

# 创建新的资源访问令牌
resource_token_expires = timedelta(seconds=settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS)
resource_token = create_access_token(
userid=payload.sub,
username=payload.username,
super_user=payload.super_user,
expires_delta=resource_token_expires,
level=payload.level,
purpose="resource"
)

# 设置会话级别的 HttpOnly Cookie
response.set_cookie(
key=settings.PROJECT_NAME,
value=resource_token,
httponly=True,
secure=request.url.scheme == "https",
samesite="strict"
)


def __verify_token(token: str, purpose: str = "authentication") -> schemas.TokenPayload:
"""
使用 JWT Token 进行身份认证并解析 Token 的内容
:param token: JWT 令牌
:param purpose: 期望的令牌用途,默认为 "authentication"
:return: 包含用户身份信息的 Token 负载数据
:raises HTTPException: 如果令牌无效或用途不匹配
"""
使用 JWT Token 进行身份认证并解析 Token 的内容
:param token: JWT 令牌
:return: 包含用户身份信息的 Token 负载数据
:raises HTTPException: 如果令牌无效或解码失败,抛出 403 错误
"""
try:
if purpose == "resource":
secret_key = settings.RESOURCE_SECRET_KEY
else:
secret_key = settings.SECRET_KEY

if not token:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"{purpose} token not found"
)

payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[ALGORITHM]
token, secret_key, algorithms=[ALGORITHM]
)

token_payload = schemas.TokenPayload(**payload)

if token_payload.purpose != purpose:
raise jwt.InvalidTokenError("令牌用途不匹配")

return schemas.TokenPayload(**payload)
except (jwt.DecodeError, jwt.InvalidTokenError, jwt.ImmatureSignatureError):
raise HTTPException(
Expand All @@ -97,24 +171,39 @@ def __verify_token(token: str) -> schemas.TokenPayload:
)


def verify_token(token: str = Security(oauth2_scheme)) -> schemas.TokenPayload:
def verify_token(
request: Request,
response: Response,
token: str = Security(oauth2_scheme)
) -> schemas.TokenPayload:
"""
使用 JWT Token 进行身份认证并解析 Token 的内容
:param token: JWT 令牌,从请求的 Authorization 头部获取
:return: 包含用户身份信息的 Token 负载数据
:raises HTTPException: 如果令牌无效或解码失败,抛出 403 错误
验证 JWT 令牌并自动处理 resource_token 写入
:param request: 请求对象,用于访问 Cookie 和请求信息
:param response: 响应对象,用于设置 Cookie
:param token: 从 Authorization 头部获取的 JWT 令牌
:return: 解析后的 TokenPayload
:raises HTTPException: 如果令牌无效或用途不匹配
"""
return __verify_token(token)
# 验证并解析 JWT 认证令牌
payload = __verify_token(token=token, purpose="authentication")

# 如果没有 resource_token,生成并写入到 Cookie
__set_or_refresh_resource_token_cookie(request, response, payload)

def verify_uri_token(token: str = Security(jwt_token_query)) -> schemas.TokenPayload:
return payload


def verify_resource_token(
resource_token: str = Security(resource_token_cookie)
) -> schemas.TokenPayload:
"""
使用 JWT Token 进行身份认证并解析 Token 的内容
:param token: JWT 令牌,从 URL 中的 `token` 查询参数获取
:return: 包含用户身份信息的 Token 负载数据
:raises HTTPException: 如果令牌无效或解码失败,抛出 403 错误
验证资源访问令牌(从 Cookie 中获取)
:param resource_token: 从 Cookie 中获取的资源访问令牌
:return: 解析后的 TokenPayload
:raises HTTPException: 如果资源访问令牌无效
"""
return __verify_token(token)
# 验证并解析资源访问令牌
return __verify_token(token=resource_token, purpose="resource")


def __get_api_token(
Expand Down
10 changes: 10 additions & 0 deletions app/schemas/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@


class Token(BaseModel):
# 令牌
access_token: str
# 令牌类型
token_type: str
# 超级用户
super_user: bool
# 用户名
user_name: str
# 头像
avatar: Optional[str] = None
# 权限级别
level: int = 1


Expand All @@ -19,3 +25,7 @@ class TokenPayload(BaseModel):
username: Optional[str] = None
# 超级用户
super_user: Optional[bool] = None
# 权限级别
level: Optional[int] = None
# 令牌用途 authentication\resource
purpose: Optional[str] = None

0 comments on commit 5ba555e

Please sign in to comment.