# -*- coding: utf-8 -*- """Admin 认证模块 - JWT 和登录相关""" import base64 import os import time import hashlib import hmac from fastapi import APIRouter, HTTPException, Request, Depends from fastapi.responses import JSONResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from core.config import logger router = APIRouter() security = HTTPBearer(auto_error=False) # Admin Key 验证(默认值适用于开发/演示环境,生产环境请务必修改) ADMIN_KEY = os.getenv("DS2API_ADMIN_KEY", "your-admin-secret-key") # JWT 配置 JWT_SECRET = os.getenv("DS2API_JWT_SECRET", ADMIN_KEY or "ds2api-default-secret") JWT_EXPIRE_HOURS = int(os.getenv("DS2API_JWT_EXPIRE_HOURS", "24")) # ---------------------------------------------------------------------- # JWT 工具函数(轻量实现,无需额外依赖) # ---------------------------------------------------------------------- def _b64_encode(data: bytes) -> str: """Base64 URL 安全编码""" return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def _b64_decode(data: str) -> bytes: """Base64 URL 安全解码""" padding = 4 - len(data) % 4 if padding != 4: data += "=" * padding return base64.urlsafe_b64decode(data) def create_jwt_token(expire_hours: int = None) -> str: """创建 JWT Token""" import json if expire_hours is None: expire_hours = JWT_EXPIRE_HOURS header = {"alg": "HS256", "typ": "JWT"} payload = { "iat": int(time.time()), "exp": int(time.time()) + (expire_hours * 3600), "role": "admin" } header_b64 = _b64_encode(json.dumps(header, separators=(",", ":")).encode()) payload_b64 = _b64_encode(json.dumps(payload, separators=(",", ":")).encode()) message = f"{header_b64}.{payload_b64}" signature = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest() signature_b64 = _b64_encode(signature) return f"{message}.{signature_b64}" def verify_jwt_token(token: str) -> dict: """验证 JWT Token,返回 payload 或抛出异常""" import json try: parts = token.split(".") if len(parts) != 3: raise ValueError("Invalid token format") header_b64, payload_b64, signature_b64 = parts # 验证签名 message = f"{header_b64}.{payload_b64}" expected_sig = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest() actual_sig = _b64_decode(signature_b64) if not hmac.compare_digest(expected_sig, actual_sig): raise ValueError("Invalid signature") # 解析 payload payload = json.loads(_b64_decode(payload_b64)) # 验证过期时间 if payload.get("exp", 0) < time.time(): raise ValueError("Token expired") return payload except Exception as e: raise ValueError(f"Token verification failed: {str(e)}") # ---------------------------------------------------------------------- # 登录端点 # ---------------------------------------------------------------------- @router.post("/login") async def admin_login(request: Request): """管理员登录,返回 JWT Token""" try: data = await request.json() except: data = {} admin_key = data.get("admin_key", "") expire_hours = data.get("expire_hours", JWT_EXPIRE_HOURS) if admin_key != ADMIN_KEY: raise HTTPException(status_code=401, detail="Invalid admin key") token = create_jwt_token(expire_hours) return JSONResponse(content={ "success": True, "token": token, "expires_in": expire_hours * 3600 }) @router.get("/verify") async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """验证当前 Token 是否有效""" if not credentials: raise HTTPException(status_code=401, detail="No credentials provided") token = credentials.credentials try: payload = verify_jwt_token(token) return JSONResponse(content={ "valid": True, "expires_at": payload.get("exp"), "remaining_seconds": max(0, payload.get("exp", 0) - int(time.time())) }) except ValueError as e: raise HTTPException(status_code=401, detail=str(e)) def verify_admin(credentials: HTTPAuthorizationCredentials = Depends(security)): """验证 Admin 权限(支持 JWT 和直接 admin key)""" if not credentials: raise HTTPException(status_code=401, detail="Authentication required") token = credentials.credentials # 尝试 JWT 验证 try: verify_jwt_token(token) return True except ValueError: pass # 尝试直接 admin key if token == ADMIN_KEY: return True raise HTTPException(status_code=401, detail="Invalid credentials")