feat: Implement admin routes for DeepSeek account validation and API testing, refactoring admin functionality into a package.

This commit is contained in:
CJACK
2026-02-01 04:02:47 +08:00
parent 31ca94204e
commit 70cd51309f
7 changed files with 1051 additions and 1047 deletions

File diff suppressed because it is too large Load Diff

20
routes/admin/__init__.py Normal file
View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
"""Admin 路由模块 - 合并所有子模块路由"""
from fastapi import APIRouter
from .auth import router as auth_router, verify_admin, ADMIN_KEY
from .config import router as config_router
from .accounts import router as accounts_router
from .vercel import router as vercel_router
# 创建主路由
router = APIRouter(prefix="/admin", tags=["admin"])
# 包含所有子路由
router.include_router(auth_router)
router.include_router(config_router)
router.include_router(accounts_router)
router.include_router(vercel_router)
# 导出常用依赖
__all__ = ["router", "verify_admin", "ADMIN_KEY"]

404
routes/admin/accounts.py Normal file
View File

@@ -0,0 +1,404 @@
# -*- coding: utf-8 -*-
"""Admin 账号管理模块 - 账号验证和测试"""
import asyncio
import json
import base64
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from core.config import CONFIG, save_config, logger, WASM_PATH
from core.auth import init_account_queue, get_account_identifier
from core.deepseek import (
login_deepseek_via_account,
DEEPSEEK_CREATE_SESSION_URL,
DEEPSEEK_COMPLETION_URL,
BASE_HEADERS,
)
from core.pow import compute_pow_answer
from core.models import get_model_config
from .auth import verify_admin
router = APIRouter()
# ----------------------------------------------------------------------
# 账号验证
# ----------------------------------------------------------------------
async def validate_single_account(account: dict) -> dict:
"""验证单个账号的有效性"""
acc_id = get_account_identifier(account)
result = {
"account": acc_id,
"valid": False,
"has_token": bool(account.get("token", "").strip()),
"message": "",
}
try:
if result["has_token"]:
result["valid"] = True
result["message"] = "已有有效 token"
else:
try:
login_deepseek_via_account(account)
result["valid"] = True
result["has_token"] = True
result["message"] = "登录成功"
except Exception as e:
result["valid"] = False
result["message"] = f"登录失败: {str(e)}"
except Exception as e:
result["message"] = f"验证出错: {str(e)}"
return result
@router.post("/accounts/validate")
async def validate_account(request: Request, _: bool = Depends(verify_admin)):
"""验证单个账号"""
data = await request.json()
identifier = data.get("identifier", "").strip()
if not identifier:
raise HTTPException(status_code=400, detail="需要账号标识email 或 mobile")
account = None
for acc in CONFIG.get("accounts", []):
if acc.get("email") == identifier or acc.get("mobile") == identifier:
account = acc
break
if not account:
raise HTTPException(status_code=404, detail="账号不存在")
result = await validate_single_account(account)
if result["valid"] and result["has_token"]:
save_config(CONFIG)
return JSONResponse(content=result)
@router.post("/accounts/validate-all")
async def validate_all_accounts(_: bool = Depends(verify_admin)):
"""批量验证所有账号"""
accounts = CONFIG.get("accounts", [])
if not accounts:
return JSONResponse(content={
"total": 0, "valid": 0, "invalid": 0, "results": [],
})
results = []
valid_count = 0
for acc in accounts:
result = await validate_single_account(acc)
results.append(result)
if result["valid"]:
valid_count += 1
await asyncio.sleep(0.5)
save_config(CONFIG)
return JSONResponse(content={
"total": len(accounts),
"valid": valid_count,
"invalid": len(accounts) - valid_count,
"results": results,
})
# ----------------------------------------------------------------------
# 账号 API 测试
# ----------------------------------------------------------------------
async def test_account_api(account: dict, model: str = "deepseek-chat", message: str = "") -> dict:
"""测试单个账号的 API 调用能力
如果提供 message会发送实际请求并返回 AI 回复;
否则只快速测试创建会话。
"""
from curl_cffi import requests as cffi_requests
import time
acc_id = get_account_identifier(account)
result = {
"account": acc_id,
"success": False,
"response_time": 0,
"message": "",
"model": model,
}
start_time = time.time()
try:
token = account.get("token", "").strip()
if not token:
try:
login_deepseek_via_account(account)
token = account.get("token", "")
except Exception as e:
result["message"] = f"登录失败: {str(e)}"
return result
headers = {**BASE_HEADERS, "authorization": f"Bearer {token}"}
session_resp = cffi_requests.post(
DEEPSEEK_CREATE_SESSION_URL,
headers=headers,
json={"agent": "chat"},
impersonate="safari15_3",
timeout=15,
)
if session_resp.status_code != 200:
result["message"] = f"创建会话失败: HTTP {session_resp.status_code}"
return result
session_data = session_resp.json()
if session_data.get("code") != 0:
result["message"] = f"创建会话失败: {session_data.get('msg', 'Unknown error')}"
account["token"] = ""
return result
session_id = session_data.get("data", {}).get("biz_data", {}).get("id")
if not message.strip():
result["success"] = True
result["message"] = "API 测试成功(仅会话创建)"
result["response_time"] = round((time.time() - start_time) * 1000)
return result
pow_url = "https://chat.deepseek.com/api/v0/chat/create_pow_challenge"
pow_resp = cffi_requests.post(
pow_url,
headers=headers,
json={"target_path": "/api/v0/chat/completion"},
timeout=30,
impersonate="safari15_3",
)
pow_data = pow_resp.json()
if pow_data.get("code") != 0:
result["message"] = f"获取 PoW 失败: {pow_data.get('msg')}"
return result
challenge = pow_data["data"]["biz_data"]["challenge"]
try:
answer = compute_pow_answer(
challenge["algorithm"],
challenge["challenge"],
challenge["salt"],
challenge.get("difficulty", 144000),
challenge.get("expire_at", 1680000000),
challenge["signature"],
challenge["target_path"],
WASM_PATH,
)
except Exception as e:
result["message"] = f"PoW 计算失败: {str(e)}"
return result
pow_dict = {
"algorithm": challenge["algorithm"],
"challenge": challenge["challenge"],
"salt": challenge["salt"],
"answer": answer,
"signature": challenge["signature"],
"target_path": challenge["target_path"],
}
pow_str = json.dumps(pow_dict, separators=(",", ":"), ensure_ascii=False)
pow_header = base64.b64encode(pow_str.encode("utf-8")).decode("utf-8").rstrip()
thinking_enabled, search_enabled = get_model_config(model)
if thinking_enabled is None:
thinking_enabled = False
search_enabled = False
payload = {
"chat_session_id": session_id,
"prompt": f"<User>{message}",
"ref_file_ids": [],
"thinking_enabled": thinking_enabled,
"search_enabled": search_enabled,
}
completion_headers = {**headers, "x-ds-pow-response": pow_header}
completion_resp = cffi_requests.post(
DEEPSEEK_COMPLETION_URL,
headers=completion_headers,
json=payload,
impersonate="safari15_4",
timeout=60,
stream=True,
)
if completion_resp.status_code != 200:
result["message"] = f"请求失败: HTTP {completion_resp.status_code}"
return result
thinking_parts = []
content_parts = []
for line in completion_resp.iter_lines():
if not line:
continue
try:
line_str = line.decode("utf-8")
except:
continue
if not line_str.startswith("data:"):
continue
data_str = line_str[5:].strip()
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
if "v" in chunk:
v_value = chunk["v"]
path = chunk.get("p", "")
if path == "response/search_status":
continue
ptype = "thinking" if "thinking" in path else "text"
if isinstance(v_value, str):
if v_value == "FINISHED":
break
if ptype == "thinking":
thinking_parts.append(v_value)
else:
content_parts.append(v_value)
elif isinstance(v_value, list):
for item in v_value:
if item.get("p") == "status" and item.get("v") == "FINISHED":
break
except:
continue
completion_resp.close()
result["success"] = True
result["response_time"] = round((time.time() - start_time) * 1000)
result["message"] = "".join(content_parts) or "(无回复内容)"
if thinking_parts:
result["thinking"] = "".join(thinking_parts)
except Exception as e:
result["message"] = f"测试失败: {str(e)}"
return result
@router.post("/accounts/test")
async def test_single_account(request: Request, _: bool = Depends(verify_admin)):
"""测试单个账号的 API 调用"""
data = await request.json()
identifier = data.get("identifier", "")
model = data.get("model", "deepseek-chat")
message = data.get("message", "")
if not identifier:
raise HTTPException(status_code=400, detail="需要账号标识email 或 mobile")
account = None
for acc in CONFIG.get("accounts", []):
if acc.get("email") == identifier or acc.get("mobile") == identifier:
account = acc
break
if not account:
raise HTTPException(status_code=404, detail="账号不存在")
result = await test_account_api(account, model, message)
save_config(CONFIG)
return JSONResponse(content=result)
@router.post("/accounts/test-all")
async def test_all_accounts(request: Request, _: bool = Depends(verify_admin)):
"""批量测试所有账号的 API 调用"""
data = await request.json()
model = data.get("model", "deepseek-chat")
accounts = CONFIG.get("accounts", [])
if not accounts:
return JSONResponse(content={
"total": 0, "success": 0, "failed": 0, "results": [],
})
results = []
success_count = 0
for acc in accounts:
result = await test_account_api(acc, model)
results.append(result)
if result["success"]:
success_count += 1
await asyncio.sleep(1)
save_config(CONFIG)
return JSONResponse(content={
"total": len(accounts),
"success": success_count,
"failed": len(accounts) - success_count,
"results": results,
})
# ----------------------------------------------------------------------
# 批量导入
# ----------------------------------------------------------------------
@router.post("/import")
async def batch_import(request: Request, _: bool = Depends(verify_admin)):
"""批量导入 keys 和 accounts"""
try:
data = await request.json()
imported_keys = 0
imported_accounts = 0
if "keys" in data:
for key in data["keys"]:
if key not in CONFIG.get("keys", []):
if "keys" not in CONFIG:
CONFIG["keys"] = []
CONFIG["keys"].append(key)
imported_keys += 1
if "accounts" in data:
existing_ids = set()
for acc in CONFIG.get("accounts", []):
existing_ids.add(acc.get("email", ""))
existing_ids.add(acc.get("mobile", ""))
for acc in data["accounts"]:
acc_id = acc.get("email", "") or acc.get("mobile", "")
if acc_id and acc_id not in existing_ids:
if "accounts" not in CONFIG:
CONFIG["accounts"] = []
CONFIG["accounts"].append(acc)
existing_ids.add(acc_id)
imported_accounts += 1
init_account_queue()
save_config(CONFIG)
return JSONResponse(content={
"success": True,
"imported_keys": imported_keys,
"imported_accounts": imported_accounts,
})
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="无效的 JSON 格式")
except Exception as e:
logger.error(f"[batch_import] 错误: {e}")
raise HTTPException(status_code=500, detail=str(e))

170
routes/admin/auth.py Normal file
View File

@@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
"""Admin 认证模块 - JWT 和登录相关"""
import base64
import os
import time
import hashlib
import hmac
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from core.config import logger
router = APIRouter()
security = HTTPBearer(auto_error=False)
# Admin Key 验证
ADMIN_KEY = os.getenv("DS2API_ADMIN_KEY", "")
# JWT 配置
JWT_SECRET = os.getenv("DS2API_JWT_SECRET", ADMIN_KEY or "ds2api-default-secret")
JWT_EXPIRE_HOURS = int(os.getenv("DS2API_JWT_EXPIRE_HOURS", "24"))
# ----------------------------------------------------------------------
# JWT 工具函数(轻量实现,无需额外依赖)
# ----------------------------------------------------------------------
def _b64_encode(data: bytes) -> str:
"""Base64 URL 安全编码"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64_decode(data: str) -> bytes:
"""Base64 URL 安全解码"""
padding = 4 - len(data) % 4
if padding != 4:
data += "=" * padding
return base64.urlsafe_b64decode(data)
def create_jwt_token(expire_hours: int = None) -> str:
"""创建 JWT Token"""
import json
if expire_hours is None:
expire_hours = JWT_EXPIRE_HOURS
header = {"alg": "HS256", "typ": "JWT"}
payload = {
"iat": int(time.time()),
"exp": int(time.time()) + (expire_hours * 3600),
"role": "admin"
}
header_b64 = _b64_encode(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = _b64_encode(json.dumps(payload, separators=(",", ":")).encode())
message = f"{header_b64}.{payload_b64}"
signature = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest()
signature_b64 = _b64_encode(signature)
return f"{message}.{signature_b64}"
def verify_jwt_token(token: str) -> dict:
"""验证 JWT Token返回 payload 或抛出异常"""
import json
try:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid token format")
header_b64, payload_b64, signature_b64 = parts
# 验证签名
message = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest()
actual_sig = _b64_decode(signature_b64)
if not hmac.compare_digest(expected_sig, actual_sig):
raise ValueError("Invalid signature")
# 解析 payload
payload = json.loads(_b64_decode(payload_b64))
# 验证过期时间
if payload.get("exp", 0) < time.time():
raise ValueError("Token expired")
return payload
except Exception as e:
raise ValueError(f"Token verification failed: {str(e)}")
# ----------------------------------------------------------------------
# 登录端点
# ----------------------------------------------------------------------
@router.post("/login")
async def admin_login(request: Request):
"""管理员登录,返回 JWT Token"""
try:
data = await request.json()
except:
data = {}
admin_key = data.get("admin_key", "")
expire_hours = data.get("expire_hours", JWT_EXPIRE_HOURS)
# 开发模式:如果没有配置 ADMIN_KEY允许任意登录
if not ADMIN_KEY:
logger.warning("[admin_login] 开发模式:未配置 ADMIN_KEY允许任意登录")
token = create_jwt_token(expire_hours)
return JSONResponse(content={
"success": True,
"token": token,
"expires_in": expire_hours * 3600,
"warning": "开发模式 - 未配置 ADMIN_KEY"
})
if admin_key != ADMIN_KEY:
raise HTTPException(status_code=401, detail="Invalid admin key")
token = create_jwt_token(expire_hours)
return JSONResponse(content={
"success": True,
"token": token,
"expires_in": expire_hours * 3600
})
@router.get("/verify")
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证当前 Token 是否有效"""
if not credentials:
raise HTTPException(status_code=401, detail="No credentials provided")
token = credentials.credentials
try:
payload = verify_jwt_token(token)
return JSONResponse(content={
"valid": True,
"expires_at": payload.get("exp"),
"remaining_seconds": max(0, payload.get("exp", 0) - int(time.time()))
})
except ValueError as e:
raise HTTPException(status_code=401, detail=str(e))
def verify_admin(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证 Admin 权限(支持 JWT 和直接 admin key"""
# 开发模式:如果没有配置 ADMIN_KEY允许所有操作
if not ADMIN_KEY:
return True
if not credentials:
raise HTTPException(status_code=401, detail="Authentication required")
token = credentials.credentials
# 尝试 JWT 验证
try:
verify_jwt_token(token)
return True
except ValueError:
pass
# 尝试直接 admin key
if token == ADMIN_KEY:
return True
raise HTTPException(status_code=401, detail="Invalid credentials")

183
routes/admin/config.py Normal file
View File

@@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
"""Admin 配置管理模块 - 配置、API Keys、账号管理"""
import os
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from core.config import CONFIG, save_config, logger
from core.auth import init_account_queue, get_queue_status, get_account_identifier
from core.deepseek import login_deepseek_via_account
from .auth import verify_admin
router = APIRouter()
# Vercel 预配置
VERCEL_TOKEN = os.getenv("VERCEL_TOKEN", "")
VERCEL_PROJECT_ID = os.getenv("VERCEL_PROJECT_ID", "")
VERCEL_TEAM_ID = os.getenv("VERCEL_TEAM_ID", "")
# ----------------------------------------------------------------------
# Vercel 预配置信息
# ----------------------------------------------------------------------
@router.get("/vercel/config")
async def get_vercel_config(_: bool = Depends(verify_admin)):
"""获取预配置的 Vercel 信息(脱敏)"""
return JSONResponse(content={
"has_token": bool(VERCEL_TOKEN),
"project_id": VERCEL_PROJECT_ID,
"team_id": VERCEL_TEAM_ID or None,
})
# ----------------------------------------------------------------------
# 配置管理
# ----------------------------------------------------------------------
@router.get("/config")
async def get_config(_: bool = Depends(verify_admin)):
"""获取当前配置(密码脱敏)"""
safe_config = {
"keys": CONFIG.get("keys", []),
"accounts": [],
"claude_mapping": CONFIG.get("claude_mapping", {}),
}
for acc in CONFIG.get("accounts", []):
safe_acc = {
"email": acc.get("email", ""),
"mobile": acc.get("mobile", ""),
"has_password": bool(acc.get("password")),
"has_token": bool(acc.get("token")),
"token_preview": acc.get("token", "")[:20] + "..." if acc.get("token") else "",
}
safe_config["accounts"].append(safe_acc)
return JSONResponse(content=safe_config)
@router.post("/config")
async def update_config(request: Request, _: bool = Depends(verify_admin)):
"""更新完整配置"""
data = await request.json()
if "keys" in data:
CONFIG["keys"] = data["keys"]
if "accounts" in data:
# 保留原有密码和 token
existing = {get_account_identifier(a): a for a in CONFIG.get("accounts", [])}
for acc in data["accounts"]:
acc_id = get_account_identifier(acc)
if acc_id in existing:
if not acc.get("password"):
acc["password"] = existing[acc_id].get("password", "")
if not acc.get("token"):
acc["token"] = existing[acc_id].get("token", "")
CONFIG["accounts"] = data["accounts"]
init_account_queue()
if "claude_mapping" in data:
CONFIG["claude_mapping"] = data["claude_mapping"]
save_config(CONFIG)
return JSONResponse(content={"success": True, "message": "配置已更新"})
# ----------------------------------------------------------------------
# API Keys 管理
# ----------------------------------------------------------------------
@router.post("/keys")
async def add_key(request: Request, _: bool = Depends(verify_admin)):
"""添加 API Key"""
data = await request.json()
key = data.get("key", "").strip()
if not key:
raise HTTPException(status_code=400, detail="Key 不能为空")
if key in CONFIG.get("keys", []):
raise HTTPException(status_code=400, detail="Key 已存在")
if "keys" not in CONFIG:
CONFIG["keys"] = []
CONFIG["keys"].append(key)
save_config(CONFIG)
return JSONResponse(content={"success": True, "total_keys": len(CONFIG["keys"])})
@router.delete("/keys/{key}")
async def delete_key(key: str, _: bool = Depends(verify_admin)):
"""删除 API Key"""
if key not in CONFIG.get("keys", []):
raise HTTPException(status_code=404, detail="Key 不存在")
CONFIG["keys"].remove(key)
save_config(CONFIG)
return JSONResponse(content={"success": True, "total_keys": len(CONFIG["keys"])})
# ----------------------------------------------------------------------
# 账号管理
# ----------------------------------------------------------------------
@router.post("/accounts")
async def add_account(request: Request, _: bool = Depends(verify_admin)):
"""添加账号"""
data = await request.json()
email = data.get("email", "").strip()
mobile = data.get("mobile", "").strip()
password = data.get("password", "").strip()
token = data.get("token", "").strip()
if not email and not mobile:
raise HTTPException(status_code=400, detail="需要 email 或 mobile")
# 检查是否已存在
for acc in CONFIG.get("accounts", []):
if email and acc.get("email") == email:
raise HTTPException(status_code=400, detail="邮箱已存在")
if mobile and acc.get("mobile") == mobile:
raise HTTPException(status_code=400, detail="手机号已存在")
new_account = {}
if email:
new_account["email"] = email
if mobile:
new_account["mobile"] = mobile
if password:
new_account["password"] = password
if token:
new_account["token"] = token
if "accounts" not in CONFIG:
CONFIG["accounts"] = []
CONFIG["accounts"].append(new_account)
init_account_queue()
save_config(CONFIG)
return JSONResponse(content={"success": True, "total_accounts": len(CONFIG["accounts"])})
@router.delete("/accounts/{identifier}")
async def delete_account(identifier: str, _: bool = Depends(verify_admin)):
"""删除账号(通过 email 或 mobile"""
accounts = CONFIG.get("accounts", [])
for i, acc in enumerate(accounts):
if acc.get("email") == identifier or acc.get("mobile") == identifier:
accounts.pop(i)
init_account_queue()
save_config(CONFIG)
return JSONResponse(content={"success": True, "total_accounts": len(accounts)})
raise HTTPException(status_code=404, detail="账号不存在")
# ----------------------------------------------------------------------
# 账号队列状态
# ----------------------------------------------------------------------
@router.get("/queue/status")
async def get_account_queue_status(_: bool = Depends(verify_admin)):
"""获取账号轮询队列状态"""
return JSONResponse(content=get_queue_status())

274
routes/admin/vercel.py Normal file
View File

@@ -0,0 +1,274 @@
# -*- coding: utf-8 -*-
"""Admin Vercel 模块 - Vercel 同步和部署"""
import asyncio
import base64
import json
import os
import httpx
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from core.config import CONFIG, save_config, logger
from core.auth import get_account_identifier, init_account_queue
from core.deepseek import login_deepseek_via_account
from .auth import verify_admin
router = APIRouter()
# Vercel 预配置
VERCEL_TOKEN = os.getenv("VERCEL_TOKEN", "")
VERCEL_PROJECT_ID = os.getenv("VERCEL_PROJECT_ID", "")
VERCEL_TEAM_ID = os.getenv("VERCEL_TEAM_ID", "")
# ----------------------------------------------------------------------
# API 测试(通过本地 API
# ----------------------------------------------------------------------
@router.post("/test")
async def test_api(request: Request, _: bool = Depends(verify_admin)):
"""测试 API 调用"""
try:
data = await request.json()
model = data.get("model", "deepseek-chat")
message = data.get("message", "你好")
api_key = data.get("api_key", "")
if not api_key:
keys = CONFIG.get("keys", [])
if not keys:
raise HTTPException(status_code=400, detail="没有可用的 API Key")
api_key = keys[0]
host = request.headers.get("host", "localhost:5001")
scheme = "https" if "vercel" in host.lower() else "http"
base_url = f"{scheme}://{host}"
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
f"{base_url}/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json={
"model": model,
"messages": [{"role": "user", "content": message}],
"stream": False,
},
)
return JSONResponse(content={
"success": response.status_code == 200,
"status_code": response.status_code,
"response": response.json() if response.status_code == 200 else response.text,
})
except Exception as e:
logger.error(f"[test_api] 错误: {e}")
return JSONResponse(content={"success": False, "error": str(e)})
# ----------------------------------------------------------------------
# Vercel 同步
# ----------------------------------------------------------------------
@router.post("/vercel/sync")
async def sync_to_vercel(request: Request, _: bool = Depends(verify_admin)):
"""同步配置到 Vercel 并触发重新部署"""
try:
data = await request.json()
vercel_token = data.get("vercel_token", "")
project_id = data.get("project_id", "")
team_id = data.get("team_id", "")
auto_validate = data.get("auto_validate", True)
save_vercel_credentials = data.get("save_credentials", True)
use_preconfig = vercel_token == "__USE_PRECONFIG__" or not vercel_token
if use_preconfig:
vercel_token = VERCEL_TOKEN
if not project_id:
project_id = VERCEL_PROJECT_ID
if not team_id:
team_id = VERCEL_TEAM_ID
if not vercel_token or not project_id:
raise HTTPException(status_code=400, detail="需要 Vercel Token 和 Project ID")
# 自动验证账号
validated_count = 0
failed_accounts = []
if auto_validate:
accounts = CONFIG.get("accounts", [])
for acc in accounts:
acc_id = get_account_identifier(acc)
if not acc.get("token", "").strip():
try:
logger.info(f"[sync_to_vercel] 自动验证账号: {acc_id}")
login_deepseek_via_account(acc)
validated_count += 1
except Exception as e:
logger.warning(f"[sync_to_vercel] 账号 {acc_id} 验证失败: {e}")
failed_accounts.append(acc_id)
await asyncio.sleep(0.5)
config_json = json.dumps(CONFIG, ensure_ascii=False, separators=(",", ":"))
config_b64 = base64.b64encode(config_json.encode("utf-8")).decode("utf-8")
headers = {"Authorization": f"Bearer {vercel_token}"}
base_url = "https://api.vercel.com"
async with httpx.AsyncClient(timeout=30.0) as client:
params = {"teamId": team_id} if team_id else {}
env_resp = await client.get(
f"{base_url}/v9/projects/{project_id}/env",
headers=headers,
params=params,
)
if env_resp.status_code != 200:
raise HTTPException(status_code=env_resp.status_code, detail=f"获取环境变量失败: {env_resp.text}")
env_vars = env_resp.json().get("envs", [])
existing_env = None
for env in env_vars:
if env.get("key") == "DS2API_CONFIG_JSON":
existing_env = env
break
if existing_env:
env_id = existing_env["id"]
update_resp = await client.patch(
f"{base_url}/v9/projects/{project_id}/env/{env_id}",
headers=headers,
params=params,
json={"value": config_b64},
)
if update_resp.status_code not in [200, 201]:
raise HTTPException(status_code=update_resp.status_code, detail=f"更新环境变量失败: {update_resp.text}")
else:
create_resp = await client.post(
f"{base_url}/v10/projects/{project_id}/env",
headers=headers,
params=params,
json={
"key": "DS2API_CONFIG_JSON",
"value": config_b64,
"type": "encrypted",
"target": ["production", "preview"],
},
)
if create_resp.status_code not in [200, 201]:
raise HTTPException(status_code=create_resp.status_code, detail=f"创建环境变量失败: {create_resp.text}")
# 保存 Vercel 凭证
saved_credentials = []
if save_vercel_credentials and not use_preconfig:
creds_to_save = [
("VERCEL_TOKEN", vercel_token),
("VERCEL_PROJECT_ID", project_id),
]
if team_id:
creds_to_save.append(("VERCEL_TEAM_ID", team_id))
for key, value in creds_to_save:
existing = None
for env in env_vars:
if env.get("key") == key:
existing = env
break
if existing:
upd_resp = await client.patch(
f"{base_url}/v9/projects/{project_id}/env/{existing['id']}",
headers=headers,
params=params,
json={"value": value},
)
if upd_resp.status_code in [200, 201]:
saved_credentials.append(key)
else:
crt_resp = await client.post(
f"{base_url}/v10/projects/{project_id}/env",
headers=headers,
params=params,
json={
"key": key,
"value": value,
"type": "encrypted",
"target": ["production", "preview"],
},
)
if crt_resp.status_code in [200, 201]:
saved_credentials.append(key)
# 触发重新部署
project_resp = await client.get(
f"{base_url}/v9/projects/{project_id}",
headers=headers,
params=params,
)
if project_resp.status_code == 200:
project_data = project_resp.json()
repo = project_data.get("link", {})
if repo.get("type") == "github":
deploy_resp = await client.post(
f"{base_url}/v13/deployments",
headers=headers,
params=params,
json={
"name": project_id,
"project": project_id,
"target": "production",
"gitSource": {
"type": "github",
"repoId": repo.get("repoId"),
"ref": repo.get("productionBranch", "main"),
},
},
)
if deploy_resp.status_code in [200, 201]:
deploy_data = deploy_resp.json()
result = {
"success": True,
"message": "配置已同步,正在重新部署...",
"deployment_url": deploy_data.get("url"),
"validated_accounts": validated_count,
}
if failed_accounts:
result["failed_accounts"] = failed_accounts
if saved_credentials:
result["saved_credentials"] = saved_credentials
return JSONResponse(content=result)
result = {
"success": True,
"message": "配置已同步到 Vercel请手动触发重新部署",
"manual_deploy_required": True,
"validated_accounts": validated_count,
}
if failed_accounts:
result["failed_accounts"] = failed_accounts
if saved_credentials:
result["saved_credentials"] = saved_credentials
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"[sync_to_vercel] 错误: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ----------------------------------------------------------------------
# 导出配置
# ----------------------------------------------------------------------
@router.get("/export")
async def export_config(_: bool = Depends(verify_admin)):
"""导出完整配置JSON 和 Base64"""
config_json = json.dumps(CONFIG, ensure_ascii=False, separators=(",", ":"))
config_b64 = base64.b64encode(config_json.encode("utf-8")).decode("utf-8")
return JSONResponse(content={
"json": config_json,
"base64": config_b64,
})

View File

@@ -1,10 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>🚀 服务已启动</title>
</head>
<body>
<p>deepseek-to-api已启动</p>
</body>
</html>