feat: Enhance test_account_api to optionally send actual messages and parse streaming AI responses, including PoW challenge/answer, or only test session creation.

This commit is contained in:
CJACK
2026-02-01 03:58:46 +08:00
parent b90901d5a0
commit 31ca94204e

View File

@@ -444,9 +444,17 @@ async def validate_all_accounts(_: bool = Depends(verify_admin)):
# 账号 API 测试(实际发送请求)
# ----------------------------------------------------------------------
async def test_account_api(account: dict, model: str = "deepseek-chat", message: str = "") -> dict:
"""测试单个账号的 API 调用能力(通过创建会话验证)"""
"""测试单个账号的 API 调用能力
如果提供 message会发送实际请求并返回 AI 回复;
否则只快速测试创建会话。
"""
from curl_cffi import requests as cffi_requests
from core.deepseek import DEEPSEEK_CREATE_SESSION_URL, BASE_HEADERS
from core.deepseek import DEEPSEEK_CREATE_SESSION_URL, DEEPSEEK_COMPLETION_URL, BASE_HEADERS
from core.pow import get_pow_response, compute_pow_answer, WASM_PATH
from core.config import WASM_PATH
from core.models import get_model_config
import json
acc_id = get_account_identifier(account)
result = {
@@ -493,9 +501,145 @@ async def test_account_api(account: dict, model: str = "deepseek-chat", message:
account["token"] = ""
return result
session_id = session_data.get("data", {}).get("biz_data", {}).get("id")
# 如果没有消息,只测试会话创建
if not message.strip():
result["success"] = True
result["message"] = "API 测试成功(仅会话创建)"
result["response_time"] = round((time.time() - start_time) * 1000)
return result
# 获取 PoW
pow_url = "https://chat.deepseek.com/api/v0/chat/create_pow_challenge"
pow_resp = cffi_requests.post(
pow_url,
headers=headers,
json={"target_path": "/api/v0/chat/completion"},
timeout=30,
impersonate="safari15_3",
)
pow_data = pow_resp.json()
if pow_data.get("code") != 0:
result["message"] = f"获取 PoW 失败: {pow_data.get('msg')}"
return result
# 计算 PoW 答案
import base64
challenge = pow_data["data"]["biz_data"]["challenge"]
try:
answer = compute_pow_answer(
challenge["algorithm"],
challenge["challenge"],
challenge["salt"],
challenge.get("difficulty", 144000),
challenge.get("expire_at", 1680000000),
challenge["signature"],
challenge["target_path"],
WASM_PATH,
)
except Exception as e:
result["message"] = f"PoW 计算失败: {str(e)}"
return result
pow_dict = {
"algorithm": challenge["algorithm"],
"challenge": challenge["challenge"],
"salt": challenge["salt"],
"answer": answer,
"signature": challenge["signature"],
"target_path": challenge["target_path"],
}
pow_str = json.dumps(pow_dict, separators=(",", ":"), ensure_ascii=False)
pow_header = base64.b64encode(pow_str.encode("utf-8")).decode("utf-8").rstrip()
# 准备请求参数
thinking_enabled, search_enabled = get_model_config(model)
if thinking_enabled is None:
thinking_enabled = False
search_enabled = False
# 发送实际请求
payload = {
"chat_session_id": session_id,
"prompt": f"<User>{message}",
"ref_file_ids": [],
"thinking_enabled": thinking_enabled,
"search_enabled": search_enabled,
}
completion_headers = {**headers, "x-ds-pow-response": pow_header}
completion_resp = cffi_requests.post(
DEEPSEEK_COMPLETION_URL,
headers=completion_headers,
json=payload,
impersonate="safari15_3",
timeout=60,
stream=True,
)
if completion_resp.status_code != 200:
result["message"] = f"请求失败: HTTP {completion_resp.status_code}"
return result
# 收集响应
thinking_parts = []
content_parts = []
for line in completion_resp.iter_lines():
if not line:
continue
try:
line_str = line.decode("utf-8")
except:
continue
if not line_str.startswith("data:"):
continue
data_str = line_str[5:].strip()
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
if "v" in chunk:
v_value = chunk["v"]
path = chunk.get("p", "")
# 跳过搜索状态
if path == "response/search_status":
continue
# 判断内容类型
ptype = "text"
if "thinking" in path:
ptype = "thinking"
if isinstance(v_value, str):
if v_value == "FINISHED":
break
# 收集内容
if ptype == "thinking":
thinking_parts.append(v_value)
else:
content_parts.append(v_value)
elif isinstance(v_value, list):
for item in v_value:
if item.get("p") == "status" and item.get("v") == "FINISHED":
break
except:
continue
completion_resp.close()
result["success"] = True
result["message"] = "API 测试成功"
result["response_time"] = round((time.time() - start_time) * 1000)
result["message"] = "".join(content_parts) or "(无回复内容)"
if thinking_parts:
result["thinking"] = "".join(thinking_parts)
except Exception as e:
result["message"] = f"测试失败: {str(e)}"