Files
ds2api/routes/admin/auth.py

156 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""Admin 认证模块 - JWT 和登录相关"""
import base64
import os
import time
import hashlib
import hmac
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from core.config import logger
router = APIRouter()
security = HTTPBearer(auto_error=False)
# Admin Key 验证(默认值适用于开发/演示环境,生产环境请务必修改)
ADMIN_KEY = os.getenv("DS2API_ADMIN_KEY", "your-admin-secret-key")
# JWT 配置
JWT_SECRET = os.getenv("DS2API_JWT_SECRET", ADMIN_KEY or "ds2api-default-secret")
JWT_EXPIRE_HOURS = int(os.getenv("DS2API_JWT_EXPIRE_HOURS", "24"))
# ----------------------------------------------------------------------
# JWT 工具函数(轻量实现,无需额外依赖)
# ----------------------------------------------------------------------
def _b64_encode(data: bytes) -> str:
"""Base64 URL 安全编码"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64_decode(data: str) -> bytes:
"""Base64 URL 安全解码"""
padding = 4 - len(data) % 4
if padding != 4:
data += "=" * padding
return base64.urlsafe_b64decode(data)
def create_jwt_token(expire_hours: int = None) -> str:
"""创建 JWT Token"""
import json
if expire_hours is None:
expire_hours = JWT_EXPIRE_HOURS
header = {"alg": "HS256", "typ": "JWT"}
payload = {
"iat": int(time.time()),
"exp": int(time.time()) + (expire_hours * 3600),
"role": "admin"
}
header_b64 = _b64_encode(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = _b64_encode(json.dumps(payload, separators=(",", ":")).encode())
message = f"{header_b64}.{payload_b64}"
signature = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest()
signature_b64 = _b64_encode(signature)
return f"{message}.{signature_b64}"
def verify_jwt_token(token: str) -> dict:
"""验证 JWT Token返回 payload 或抛出异常"""
import json
try:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid token format")
header_b64, payload_b64, signature_b64 = parts
# 验证签名
message = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest()
actual_sig = _b64_decode(signature_b64)
if not hmac.compare_digest(expected_sig, actual_sig):
raise ValueError("Invalid signature")
# 解析 payload
payload = json.loads(_b64_decode(payload_b64))
# 验证过期时间
if payload.get("exp", 0) < time.time():
raise ValueError("Token expired")
return payload
except Exception as e:
raise ValueError(f"Token verification failed: {str(e)}")
# ----------------------------------------------------------------------
# 登录端点
# ----------------------------------------------------------------------
@router.post("/login")
async def admin_login(request: Request):
"""管理员登录,返回 JWT Token"""
try:
data = await request.json()
except:
data = {}
admin_key = data.get("admin_key", "")
expire_hours = data.get("expire_hours", JWT_EXPIRE_HOURS)
if admin_key != ADMIN_KEY:
raise HTTPException(status_code=401, detail="Invalid admin key")
token = create_jwt_token(expire_hours)
return JSONResponse(content={
"success": True,
"token": token,
"expires_in": expire_hours * 3600
})
@router.get("/verify")
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证当前 Token 是否有效"""
if not credentials:
raise HTTPException(status_code=401, detail="No credentials provided")
token = credentials.credentials
try:
payload = verify_jwt_token(token)
return JSONResponse(content={
"valid": True,
"expires_at": payload.get("exp"),
"remaining_seconds": max(0, payload.get("exp", 0) - int(time.time()))
})
except ValueError as e:
raise HTTPException(status_code=401, detail=str(e))
def verify_admin(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证 Admin 权限(支持 JWT 和直接 admin key"""
if not credentials:
raise HTTPException(status_code=401, detail="Authentication required")
token = credentials.credentials
# 尝试 JWT 验证
try:
verify_jwt_token(token)
return True
except ValueError:
pass
# 尝试直接 admin key
if token == ADMIN_KEY:
return True
raise HTTPException(status_code=401, detail="Invalid credentials")