Remove the ds2api application and update deployment and contributing documentation.

This commit is contained in:
CJACK
2026-02-15 20:08:21 +08:00
parent a50e2ef5cd
commit bd788a12b1
35 changed files with 113 additions and 7466 deletions

View File

@@ -1,138 +0,0 @@
# DS2API 测试文档
## 测试文件结构
```
tests/
├── __init__.py # 测试模块初始化
├── test_unit.py # 单元测试(不依赖网络)
├── test_all.py # API 集成测试
├── test_accounts.py # 账号池测试
└── run_tests.sh # 测试运行脚本
```
## 快速开始
### 运行所有测试
```bash
# 使用脚本
./tests/run_tests.sh all
# 或直接运行
python3 tests/test_unit.py # 单元测试
python3 tests/test_all.py # API 测试
```
### 运行单元测试
```bash
python3 tests/test_unit.py
```
测试内容:
- 配置加载
- 消息处理(`messages_prepare`
- WASM 缓存
- 模型配置获取
- 正则表达式模式
- 流式响应解析
- **工具调用解析**`parse_tool_calls`
- **Token 估算**
### 运行 API 集成测试
```bash
# 完整测试
python3 tests/test_all.py
# 快速测试(跳过耗时测试)
python3 tests/test_all.py --quick
# 指定端点
python3 tests/test_all.py --endpoint http://your-server.com
# 详细输出
python3 tests/test_all.py --verbose
```
测试覆盖:
| 类别 | 测试项 |
|-----|--------|
| 基础 | 服务健康检查 |
| OpenAI | 模型列表、非流式对话、流式对话、无效模型处理、认证错误、Reasoner 模式 |
| Claude | 模型列表、非流式消息、流式消息、Token 计数 |
| 高级 | 多轮对话、长输入处理 |
| **工具调用** | OpenAI 工具调用(流式/非流式、Claude 工具调用 |
| **搜索模式** | OpenAI 搜索模式 |
### 运行账号测试
```bash
# 测试所有账号登录
python3 tests/test_accounts.py --login
# 测试账号轮换
python3 tests/test_accounts.py --rotation
# 运行所有
python3 tests/test_accounts.py --all
```
## 配置
测试使用 `config.json` 中的配置:
```json
{
"keys": ["test-api-key-001"],
"accounts": [
{"email": "xxx@gmail.com", "password": "xxx", "token": ""}
]
}
```
## 预期输出
### 单元测试
```
Ran 32 tests in 9.0s
OK
```
### API 测试
```
📊 测试报告
总计: 18 个测试
✅ 通过: 18
❌ 失败: 0
⏱️ 耗时: ~60s
📈 通过率: 100.0%
```
## 故障排除
### 服务未运行
```
⚠️ 服务未运行,跳过其他测试
```
解决:先启动服务 `python dev.py`
### 认证失败
```
❌ 失败: 状态码: 401
```
解决:检查 `config.json` 中的 API key 和账号配置
### 流式测试超时
可能是 DeepSeek API 响应慢,可以尝试:
- 使用 `--quick` 模式
- 增加测试超时时间

View File

@@ -1 +0,0 @@
# DS2API 测试模块

View File

@@ -1,111 +0,0 @@
#!/bin/bash
# DS2API 测试运行器
set -e
cd "$(dirname "$0")/.."
echo "=================================================="
echo " 🧪 DS2API 测试套件"
echo "=================================================="
echo ""
# 颜色
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m'
# 检查服务是否运行
check_service() {
echo -e "${YELLOW}检查服务状态...${NC}"
if curl -s http://localhost:5001/ > /dev/null 2>&1; then
echo -e "${GREEN}✅ 服务运行中${NC}"
return 0
else
echo -e "${RED}❌ 服务未运行${NC}"
echo "请先启动服务: python dev.py"
return 1
fi
}
# 运行单元测试
run_unit_tests() {
echo ""
echo "=================================================="
echo " 📋 单元测试"
echo "=================================================="
python3 -m pytest tests/test_unit.py -v --tb=short 2>/dev/null || python3 tests/test_unit.py
}
# 运行 API 测试
run_api_tests() {
echo ""
echo "=================================================="
echo " 🌐 API 集成测试"
echo "=================================================="
python3 tests/test_all.py "$@"
}
# 运行账号测试
run_account_tests() {
echo ""
echo "=================================================="
echo " 🔑 账号测试"
echo "=================================================="
python3 tests/test_accounts.py --all
}
# 显示帮助
show_help() {
echo "用法: $0 [选项]"
echo ""
echo "选项:"
echo " unit 只运行单元测试"
echo " api 只运行 API 测试"
echo " api --quick 快速 API 测试"
echo " accounts 只运行账号测试"
echo " all 运行所有测试"
echo " help 显示此帮助"
echo ""
echo "示例:"
echo " $0 unit"
echo " $0 api --quick"
echo " $0 all"
}
# 主逻辑
case "${1:-all}" in
unit)
run_unit_tests
;;
api)
if check_service; then
shift
run_api_tests "$@"
fi
;;
accounts)
run_account_tests
;;
all)
run_unit_tests
echo ""
if check_service; then
run_api_tests --quick
fi
;;
help|--help|-h)
show_help
;;
*)
echo "未知选项: $1"
show_help
exit 1
;;
esac
echo ""
echo "=================================================="
echo " ✨ 测试完成"
echo "=================================================="

View File

@@ -1,189 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DS2API 账号池测试
测试账号登录和轮换功能
"""
import argparse
import json
import os
import sys
import time
from dataclasses import dataclass
from typing import Optional
# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@dataclass
class AccountTestResult:
email: str
login_success: bool
has_token: bool
token_preview: str
error: Optional[str] = None
def test_account_login(account: dict) -> AccountTestResult:
"""测试单个账号登录"""
from core.deepseek import login_deepseek_via_account
from core.config import logger
email = account.get("email", account.get("mobile", "unknown"))
print(f"\n📧 测试账号: {email}")
print("-" * 40)
try:
login_deepseek_via_account(account)
token = account.get("token", "")
if token:
print(f"✅ 登录成功")
print(f" Token: {token[:30]}...{token[-10:]}")
return AccountTestResult(
email=email,
login_success=True,
has_token=True,
token_preview=f"{token[:30]}...{token[-10:]}"
)
else:
print(f"⚠️ 登录完成但无 Token")
return AccountTestResult(
email=email,
login_success=True,
has_token=False,
token_preview=""
)
except Exception as e:
print(f"❌ 登录失败: {e}")
return AccountTestResult(
email=email,
login_success=False,
has_token=False,
token_preview="",
error=str(e)
)
def test_account_pool():
"""测试整个账号池"""
from core.config import CONFIG, logger
accounts = CONFIG.get("accounts", [])
if not accounts:
print("⚠️ 配置中没有账号")
return
print("\n" + "=" * 60)
print(" 🔑 DS2API 账号池测试")
print("=" * 60)
print(f"{len(accounts)} 个账号\n")
results = []
for account in accounts:
result = test_account_login(account)
results.append(result)
time.sleep(1) # 避免请求过快
# 打印汇总
print("\n" + "=" * 60)
print(" 📊 测试结果汇总")
print("=" * 60)
success_count = sum(1 for r in results if r.login_success)
token_count = sum(1 for r in results if r.has_token)
print(f"\n总计: {len(results)} 个账号")
print(f"✅ 登录成功: {success_count}")
print(f"🔑 获取Token: {token_count}")
print(f"❌ 登录失败: {len(results) - success_count}")
if any(not r.login_success for r in results):
print("\n失败的账号:")
for r in results:
if not r.login_success:
print(f"{r.email}: {r.error}")
print("\n" + "=" * 60)
# 保存更新后的配置(如果获取了新 token
if token_count > 0:
print("\n💾 更新配置文件中的 token...")
from core.config import save_config
save_config(CONFIG)
print("✅ 配置已保存")
return results
def test_account_rotation():
"""测试账号轮换功能"""
from core.auth import choose_account, release_account, account_queue
from core.config import CONFIG
accounts = CONFIG.get("accounts", [])
if len(accounts) < 2:
print("⚠️ 需要至少 2 个账号来测试轮换")
return
print("\n" + "=" * 60)
print(" 🔄 账号轮换测试")
print("=" * 60)
# 测试选择账号
print("\n选择账号 (连续3次):")
selected = []
for i in range(3):
account = choose_account()
if account:
email = account.get("email", account.get("mobile", "unknown"))
selected.append(email)
print(f"{i+1}次: {email}")
else:
print(f"{i+1}次: 无可用账号")
# 释放账号
print("\n释放账号:")
for i, email in enumerate(selected):
for acc in accounts:
if acc.get("email") == email:
release_account(acc)
print(f" 已释放: {email}")
break
# 再次选择
print("\n释放后再选择:")
for i in range(2):
account = choose_account()
if account:
email = account.get("email", account.get("mobile", "unknown"))
print(f"{i+1}次: {email}")
release_account(account)
print("\n✅ 账号轮换功能正常")
def main():
parser = argparse.ArgumentParser(description="DS2API 账号测试")
parser.add_argument("--login", action="store_true", help="测试账号登录")
parser.add_argument("--rotation", action="store_true", help="测试账号轮换")
parser.add_argument("--all", action="store_true", help="运行所有测试")
args = parser.parse_args()
if args.all or args.login:
test_account_pool()
if args.all or args.rotation:
test_account_rotation()
if not (args.all or args.login or args.rotation):
parser.print_help()
print("\n使用 --all 运行所有测试")
if __name__ == "__main__":
main()

View File

@@ -1,969 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DS2API 全面自动化测试套件
测试覆盖:
- 配置加载和认证
- 会话创建
- PoW 计算
- OpenAI 兼容 API
- Claude 兼容 API
- 流式和非流式响应
- 错误处理
- Token 计数
使用方法:
python tests/test_all.py # 运行所有测试
python tests/test_all.py --quick # 快速测试(跳过耗时测试)
python tests/test_all.py --verbose # 详细输出
python tests/test_all.py --endpoint URL # 指定测试端点
"""
import argparse
import json
import os
import sys
import time
from dataclasses import dataclass
from typing import Optional
import requests
# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 测试配置
DEFAULT_ENDPOINT = "http://localhost:5001"
TEST_API_KEY = "test-api-key-001" # 配置中的 API key
TEST_TIMEOUT = 120 # 超时时间(秒)
@dataclass
class TestResult:
"""测试结果"""
name: str
passed: bool
duration: float
message: str = ""
details: Optional[dict] = None
class TestRunner:
"""测试运行器"""
def __init__(self, endpoint: str, api_key: str, verbose: bool = False):
self.endpoint = endpoint.rstrip("/")
self.api_key = api_key
self.verbose = verbose
self.results: list[TestResult] = []
def log(self, message: str, level: str = "INFO"):
"""日志输出"""
colors = {
"INFO": "\033[94m",
"SUCCESS": "\033[92m",
"WARNING": "\033[93m",
"ERROR": "\033[91m",
"RESET": "\033[0m"
}
if self.verbose or level in ("ERROR", "SUCCESS"):
print(f"{colors.get(level, '')}{message}{colors['RESET']}")
def run_test(self, name: str, test_func):
"""运行单个测试"""
print(f"\n{'='*60}")
print(f"🧪 测试: {name}")
print('='*60)
start_time = time.time()
try:
result = test_func()
duration = time.time() - start_time
if result.get("success", False):
self.log(f"✅ 通过 ({duration:.2f}s)", "SUCCESS")
self.results.append(TestResult(
name=name,
passed=True,
duration=duration,
message=result.get("message", ""),
details=result.get("details")
))
else:
self.log(f"❌ 失败: {result.get('message', '未知错误')}", "ERROR")
self.results.append(TestResult(
name=name,
passed=False,
duration=duration,
message=result.get("message", ""),
details=result.get("details")
))
except Exception as e:
duration = time.time() - start_time
self.log(f"❌ 异常: {e}", "ERROR")
self.results.append(TestResult(
name=name,
passed=False,
duration=duration,
message=str(e)
))
def get_headers(self, is_claude: bool = False) -> dict:
"""获取请求头"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
if is_claude:
headers["anthropic-version"] = "2024-01-01"
return headers
# =====================================================================
# 基础测试
# =====================================================================
def test_health_check(self) -> dict:
"""测试服务健康状态"""
try:
resp = requests.get(f"{self.endpoint}/", timeout=10)
if resp.status_code == 200:
return {"success": True, "message": "服务运行正常"}
return {"success": False, "message": f"状态码: {resp.status_code}"}
except requests.exceptions.ConnectionError:
return {"success": False, "message": "无法连接到服务"}
# =====================================================================
# OpenAI 兼容 API 测试
# =====================================================================
def test_openai_models_list(self) -> dict:
"""测试 OpenAI /v1/models 端点"""
resp = requests.get(
f"{self.endpoint}/v1/models",
headers=self.get_headers(),
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
if data.get("object") != "list":
return {"success": False, "message": "响应格式错误"}
models = [m["id"] for m in data.get("data", [])]
expected_models = ["deepseek-chat", "deepseek-reasoner", "deepseek-chat-search", "deepseek-reasoner-search"]
for model in expected_models:
if model not in models:
return {"success": False, "message": f"缺少模型: {model}"}
return {
"success": True,
"message": f"返回 {len(models)} 个模型",
"details": {"models": models}
}
def test_openai_chat_non_stream(self) -> dict:
"""测试 OpenAI 非流式对话"""
payload = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": "请用一句话回答1+1等于多少"}
],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}", "details": {"response": resp.text}}
data = resp.json()
if "error" in data:
return {"success": False, "message": data["error"]}
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
if not content:
return {"success": False, "message": "响应内容为空"}
return {
"success": True,
"message": f"收到 {len(content)} 字符响应",
"details": {
"content_preview": content[:100] + "..." if len(content) > 100 else content,
"usage": data.get("usage", {})
}
}
def test_openai_chat_stream(self) -> dict:
"""测试 OpenAI 流式对话"""
payload = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": "'你好'"}
],
"stream": True
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
stream=True,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
chunks = []
content = ""
for line in resp.iter_lines():
if line:
line_str = line.decode("utf-8")
if line_str.startswith("data: "):
data_str = line_str[6:]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
chunks.append(chunk)
delta = chunk.get("choices", [{}])[0].get("delta", {})
if "content" in delta:
content += delta["content"]
except json.JSONDecodeError:
pass
if not chunks:
return {"success": False, "message": "未收到任何流式数据块"}
return {
"success": True,
"message": f"收到 {len(chunks)} 个数据块,内容: {content[:50]}",
"details": {"chunk_count": len(chunks), "content": content}
}
def test_openai_reasoner_stream(self) -> dict:
"""测试 OpenAI Reasoner 模式(思考链)"""
payload = {
"model": "deepseek-reasoner",
"messages": [
{"role": "user", "content": "1加2等于多少"}
],
"stream": True
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
stream=True,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
content = ""
reasoning = ""
for line in resp.iter_lines():
if line:
line_str = line.decode("utf-8")
if line_str.startswith("data: "):
data_str = line_str[6:]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
if "content" in delta:
content += delta["content"]
if "reasoning_content" in delta:
reasoning += delta["reasoning_content"]
except json.JSONDecodeError:
pass
return {
"success": True,
"message": f"思考: {len(reasoning)}字, 回答: {len(content)}",
"details": {
"reasoning_preview": reasoning[:100] + "..." if len(reasoning) > 100 else reasoning,
"content": content
}
}
def test_openai_invalid_model(self) -> dict:
"""测试无效模型错误处理"""
payload = {
"model": "invalid-model-name",
"messages": [{"role": "user", "content": "test"}],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
timeout=TEST_TIMEOUT
)
# 应该返回 503 或 400
if resp.status_code in (503, 400):
return {"success": True, "message": f"正确返回错误状态码 {resp.status_code}"}
return {"success": False, "message": f"期望 503/400实际: {resp.status_code}"}
def test_openai_missing_auth(self) -> dict:
"""测试缺少认证的错误处理"""
payload = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": "test"}]
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers={"Content-Type": "application/json"}, # 无 Authorization
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code == 401:
return {"success": True, "message": "正确返回 401 未认证"}
return {"success": False, "message": f"期望 401实际: {resp.status_code}"}
# =====================================================================
# Claude 兼容 API 测试
# =====================================================================
def test_claude_models_list(self) -> dict:
"""测试 Claude /anthropic/v1/models 端点"""
resp = requests.get(
f"{self.endpoint}/anthropic/v1/models",
headers=self.get_headers(is_claude=True),
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
models = [m["id"] for m in data.get("data", [])]
if not models:
return {"success": False, "message": "模型列表为空"}
return {
"success": True,
"message": f"返回 {len(models)} 个 Claude 模型",
"details": {"models": models}
}
def test_claude_messages_non_stream(self) -> dict:
"""测试 Claude 非流式消息"""
payload = {
"model": "claude-sonnet-4-20250514",
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Say 'Hello' in Chinese"}
],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/anthropic/v1/messages",
headers=self.get_headers(is_claude=True),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}", "details": {"response": resp.text}}
data = resp.json()
if "error" in data:
return {"success": False, "message": str(data["error"])}
content_blocks = data.get("content", [])
text_content = ""
for block in content_blocks:
if block.get("type") == "text":
text_content += block.get("text", "")
if not text_content:
return {"success": False, "message": "响应内容为空"}
return {
"success": True,
"message": f"收到 Claude 格式响应: {len(text_content)} 字符",
"details": {
"content": text_content[:100],
"stop_reason": data.get("stop_reason"),
"usage": data.get("usage", {})
}
}
def test_claude_messages_stream(self) -> dict:
"""测试 Claude 流式消息"""
payload = {
"model": "claude-sonnet-4-20250514",
"max_tokens": 50,
"messages": [
{"role": "user", "content": "Reply with just 'OK'"}
],
"stream": True
}
resp = requests.post(
f"{self.endpoint}/anthropic/v1/messages",
headers=self.get_headers(is_claude=True),
json=payload,
stream=True,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
events = []
for line in resp.iter_lines():
if line:
line_str = line.decode("utf-8")
if line_str.startswith("data: "):
try:
event = json.loads(line_str[6:])
events.append(event)
except json.JSONDecodeError:
pass
event_types = [e.get("type") for e in events]
# 检查必要的事件类型
required_types = ["message_start", "message_stop"]
for rt in required_types:
if rt not in event_types:
return {"success": False, "message": f"缺少事件类型: {rt}"}
return {
"success": True,
"message": f"收到 {len(events)} 个 Claude 流事件",
"details": {"event_types": event_types}
}
def test_claude_count_tokens(self) -> dict:
"""测试 Claude token 计数"""
payload = {
"model": "claude-sonnet-4-20250514",
"messages": [
{"role": "user", "content": "Hello, how are you today?"}
]
}
resp = requests.post(
f"{self.endpoint}/anthropic/v1/messages/count_tokens",
headers=self.get_headers(is_claude=True),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
input_tokens = data.get("input_tokens", 0)
if input_tokens <= 0:
return {"success": False, "message": f"token 计数无效: {input_tokens}"}
return {
"success": True,
"message": f"Token 计数: {input_tokens}",
"details": data
}
# =====================================================================
# 高级功能测试
# =====================================================================
def test_multi_turn_conversation(self) -> dict:
"""测试多轮对话"""
payload = {
"model": "deepseek-chat",
"messages": [
{"role": "system", "content": "你是一个数学助手"},
{"role": "user", "content": "我有3个苹果"},
{"role": "assistant", "content": "好的你有3个苹果。"},
{"role": "user", "content": "我又买了2个现在有多少"}
],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
# 检查是否包含"5"
if "5" in content:
return {"success": True, "message": f"AI 正确理解上下文", "details": {"content": content[:100]}}
return {
"success": True, # 即使没有5也算通过因为测试的是多轮对话功能
"message": f"多轮对话功能正常",
"details": {"content": content[:100]}
}
def test_long_input(self) -> dict:
"""测试长输入处理"""
# 生成约 1000 字的输入
long_text = "这是一段测试文本。" * 100
payload = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": f"请总结以下内容的主题:{long_text}"}
],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
if "error" in data:
return {"success": False, "message": str(data.get("error"))}
return {
"success": True,
"message": f"成功处理 {len(long_text)} 字符输入",
"details": {"input_length": len(long_text)}
}
# =====================================================================
# 管理 API 测试
# =====================================================================
def test_admin_config(self) -> dict:
"""测试管理配置 API"""
resp = requests.get(
f"{self.endpoint}/admin/config",
timeout=10
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
# 验证返回结构
if "accounts" not in data:
return {"success": False, "message": "响应缺少 accounts 字段"}
# 验证 token_preview 字段存在
accounts = data.get("accounts", [])
if accounts:
first_acc = accounts[0]
if "token_preview" not in first_acc:
return {"success": False, "message": "响应缺少 token_preview 字段"}
return {
"success": True,
"message": f"获取配置成功,{len(accounts)} 个账号",
"details": {"account_count": len(accounts)}
}
def test_admin_account_test(self) -> dict:
"""测试单账号 API 测试端点"""
# 先获取配置以获取账号
config_resp = requests.get(f"{self.endpoint}/admin/config", timeout=10)
if config_resp.status_code != 200:
return {"success": False, "message": "获取配置失败"}
accounts = config_resp.json().get("accounts", [])
if not accounts:
return {"success": False, "message": "没有可测试的账号"}
# 测试第一个账号
first_acc = accounts[0]
identifier = first_acc.get("email") or first_acc.get("mobile")
resp = requests.post(
f"{self.endpoint}/admin/accounts/test",
json={"identifier": identifier},
timeout=30
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
data = resp.json()
# 验证返回结构
required_fields = ["account", "success", "response_time", "message"]
for field in required_fields:
if field not in data:
return {"success": False, "message": f"响应缺少 {field} 字段"}
if not data["success"]:
return {"success": False, "message": f"账号测试失败: {data['message']}"}
return {
"success": True,
"message": f"账号 {identifier} 测试成功 ({data['response_time']}ms)",
"details": {"response_time": data["response_time"]}
}
# =====================================================================
# 工具调用测试
# =====================================================================
def test_openai_tool_calling(self) -> dict:
"""测试 OpenAI 工具调用"""
payload = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": "What's the weather in Beijing? Use the get_weather tool."}
],
"tools": [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "City name"}
},
"required": ["location"]
}
}
}],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}", "details": {"response": resp.text}}
data = resp.json()
if "error" in data:
return {"success": False, "message": data["error"]}
message = data.get("choices", [{}])[0].get("message", {})
tool_calls = message.get("tool_calls", [])
finish_reason = data.get("choices", [{}])[0].get("finish_reason", "")
content = message.get("content", "")
# AI 可能调用工具,也可能直接回复
if tool_calls:
return {
"success": True,
"message": f"检测到 {len(tool_calls)} 个工具调用, finish_reason={finish_reason}",
"details": {"tool_calls": tool_calls}
}
else:
return {
"success": True,
"message": f"AI 直接回复而非调用工具: {content[:50]}...",
"details": {"content": content[:100]}
}
def test_openai_tool_calling_stream(self) -> dict:
"""测试 OpenAI 流式工具调用"""
payload = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": "Use get_time tool to check current time in Tokyo."}
],
"tools": [{
"type": "function",
"function": {
"name": "get_time",
"description": "Get current time for a timezone",
"parameters": {
"type": "object",
"properties": {
"timezone": {"type": "string"}
},
"required": ["timezone"]
}
}
}],
"stream": True
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
stream=True,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
chunks = []
tool_calls_found = False
finish_reason = None
for line in resp.iter_lines():
if line:
line_str = line.decode("utf-8")
if line_str.startswith("data: "):
data_str = line_str[6:]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
chunks.append(chunk)
delta = chunk.get("choices", [{}])[0].get("delta", {})
if "tool_calls" in delta:
tool_calls_found = True
fr = chunk.get("choices", [{}])[0].get("finish_reason")
if fr:
finish_reason = fr
except json.JSONDecodeError:
pass
return {
"success": True,
"message": f"收到 {len(chunks)} 个数据块, 工具调用: {tool_calls_found}, finish: {finish_reason}",
"details": {"chunk_count": len(chunks), "tool_calls_found": tool_calls_found}
}
def test_claude_tool_calling(self) -> dict:
"""测试 Claude 工具调用"""
payload = {
"model": "claude-sonnet-4-20250514",
"max_tokens": 200,
"messages": [
{"role": "user", "content": "Use the calculator tool to compute 15 * 23"}
],
"tools": [{
"name": "calculator",
"description": "Perform arithmetic calculations",
"input_schema": {
"type": "object",
"properties": {
"expression": {"type": "string", "description": "Math expression"}
},
"required": ["expression"]
}
}],
"stream": False
}
resp = requests.post(
f"{self.endpoint}/anthropic/v1/messages",
headers=self.get_headers(is_claude=True),
json=payload,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}", "details": {"response": resp.text}}
data = resp.json()
if "error" in data:
return {"success": False, "message": str(data["error"])}
content_blocks = data.get("content", [])
stop_reason = data.get("stop_reason", "")
tool_use_blocks = [b for b in content_blocks if b.get("type") == "tool_use"]
text_blocks = [b for b in content_blocks if b.get("type") == "text"]
if tool_use_blocks:
return {
"success": True,
"message": f"检测到 {len(tool_use_blocks)} 个工具调用, stop_reason={stop_reason}",
"details": {"tool_use": tool_use_blocks}
}
else:
text_content = "".join(b.get("text", "") for b in text_blocks)
return {
"success": True,
"message": f"AI 直接回复: {text_content[:50]}...",
"details": {"content": text_content[:100]}
}
# =====================================================================
# 搜索模式测试
# =====================================================================
def test_openai_search_mode(self) -> dict:
"""测试 OpenAI 搜索模式"""
payload = {
"model": "deepseek-chat-search",
"messages": [
{"role": "user", "content": "今天的新闻有哪些?"}
],
"stream": True
}
resp = requests.post(
f"{self.endpoint}/v1/chat/completions",
headers=self.get_headers(),
json=payload,
stream=True,
timeout=TEST_TIMEOUT
)
if resp.status_code != 200:
return {"success": False, "message": f"状态码: {resp.status_code}"}
content = ""
for line in resp.iter_lines():
if line:
line_str = line.decode("utf-8")
if line_str.startswith("data: "):
data_str = line_str[6:]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
if "content" in delta:
content += delta["content"]
except json.JSONDecodeError:
pass
if not content:
return {"success": False, "message": "搜索模式无响应内容"}
return {
"success": True,
"message": f"搜索模式正常,收到 {len(content)} 字符",
"details": {"content_preview": content[:100]}
}
# =====================================================================
# 运行测试
# =====================================================================
def run_all_tests(self, quick: bool = False):
"""运行所有测试"""
print("\n" + "="*70)
print(" 🚀 DS2API 全面自动化测试")
print("="*70)
print(f"端点: {self.endpoint}")
print(f"API Key: {self.api_key[:10]}...")
print(f"模式: {'快速' if quick else '完整'}")
# 基础测试
self.run_test("服务健康检查", self.test_health_check)
if not self.results[-1].passed:
print("\n⚠️ 服务未运行,跳过其他测试")
return
# OpenAI API 测试
self.run_test("OpenAI 模型列表", self.test_openai_models_list)
self.run_test("OpenAI 非流式对话", self.test_openai_chat_non_stream)
self.run_test("OpenAI 流式对话", self.test_openai_chat_stream)
self.run_test("OpenAI 无效模型处理", self.test_openai_invalid_model)
self.run_test("OpenAI 缺少认证处理", self.test_openai_missing_auth)
if not quick:
self.run_test("OpenAI Reasoner 模式", self.test_openai_reasoner_stream)
# Claude API 测试
self.run_test("Claude 模型列表", self.test_claude_models_list)
self.run_test("Claude 非流式消息", self.test_claude_messages_non_stream)
self.run_test("Claude 流式消息", self.test_claude_messages_stream)
self.run_test("Claude Token 计数", self.test_claude_count_tokens)
# 高级功能测试
if not quick:
self.run_test("多轮对话", self.test_multi_turn_conversation)
self.run_test("长输入处理", self.test_long_input)
self.run_test("OpenAI 搜索模式", self.test_openai_search_mode)
# 工具调用测试
if not quick:
self.run_test("OpenAI 工具调用", self.test_openai_tool_calling)
self.run_test("OpenAI 流式工具调用", self.test_openai_tool_calling_stream)
self.run_test("Claude 工具调用", self.test_claude_tool_calling)
# 管理 API 测试
self.run_test("管理配置 API", self.test_admin_config)
self.run_test("账号测试 API", self.test_admin_account_test)
# 输出测试报告
self.print_report()
def print_report(self):
"""打印测试报告"""
print("\n" + "="*70)
print(" 📊 测试报告")
print("="*70)
passed = sum(1 for r in self.results if r.passed)
failed = len(self.results) - passed
total_time = sum(r.duration for r in self.results)
print(f"\n总计: {len(self.results)} 个测试")
print(f"✅ 通过: {passed}")
print(f"❌ 失败: {failed}")
print(f"⏱️ 耗时: {total_time:.2f}s")
print(f"📈 通过率: {passed/len(self.results)*100:.1f}%")
if failed > 0:
print("\n❌ 失败的测试:")
for r in self.results:
if not r.passed:
print(f"{r.name}: {r.message}")
print("\n" + "="*70)
# 返回退出码
return 0 if failed == 0 else 1
def main():
parser = argparse.ArgumentParser(description="DS2API 自动化测试")
parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT, help="API 端点")
parser.add_argument("--api-key", default=TEST_API_KEY, help="API Key")
parser.add_argument("--quick", action="store_true", help="快速测试模式")
parser.add_argument("--verbose", "-v", action="store_true", help="详细输出")
args = parser.parse_args()
runner = TestRunner(
endpoint=args.endpoint,
api_key=args.api_key,
verbose=args.verbose
)
exit_code = runner.run_all_tests(quick=args.quick)
sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@@ -1,565 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DS2API 单元测试
测试核心模块的功能,不依赖网络请求
"""
import json
import os
import sys
import unittest
# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class TestConfig(unittest.TestCase):
"""配置模块测试"""
def test_config_loading(self):
"""测试配置加载"""
from core.config import load_config, CONFIG
# 测试加载函数不会抛出异常
config = load_config()
self.assertIsInstance(config, dict)
def test_config_paths(self):
"""测试配置路径"""
from core.config import WASM_PATH, CONFIG_PATH
# 路径应该是字符串
self.assertIsInstance(WASM_PATH, str)
self.assertIsInstance(CONFIG_PATH, str)
class TestMessages(unittest.TestCase):
"""消息处理模块测试"""
def test_messages_prepare_simple(self):
"""测试简单消息处理"""
from core.messages import messages_prepare
messages = [
{"role": "user", "content": "Hello"}
]
result = messages_prepare(messages)
self.assertIn("Hello", result)
def test_messages_prepare_multi_turn(self):
"""测试多轮对话消息处理"""
from core.messages import messages_prepare
messages = [
{"role": "system", "content": "You are a helper."},
{"role": "user", "content": "Hi"},
{"role": "assistant", "content": "Hello!"},
{"role": "user", "content": "How are you?"}
]
result = messages_prepare(messages)
# 检查助手消息标签
self.assertIn("<Assistant>", result)
self.assertIn("<end▁of▁sentence>", result)
# 检查用户消息标签
self.assertIn("<User>", result)
def test_messages_prepare_array_content(self):
"""测试数组格式内容处理"""
from core.messages import messages_prepare
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "First part"},
{"type": "text", "text": "Second part"},
{"type": "image", "url": "http://example.com/image.png"}
]
}
]
result = messages_prepare(messages)
self.assertIn("First part", result)
self.assertIn("Second part", result)
def test_markdown_image_removal(self):
"""测试 markdown 图片格式移除"""
from core.messages import messages_prepare
messages = [
{"role": "user", "content": "Check this ![alt](http://example.com/image.png) image"}
]
result = messages_prepare(messages)
# 图片格式应该被改为链接格式
self.assertNotIn("![alt]", result)
self.assertIn("[alt]", result)
def test_merge_consecutive_messages(self):
"""测试连续相同角色消息合并"""
from core.messages import messages_prepare
messages = [
{"role": "user", "content": "Part 1"},
{"role": "user", "content": "Part 2"},
{"role": "user", "content": "Part 3"}
]
result = messages_prepare(messages)
self.assertIn("Part 1", result)
self.assertIn("Part 2", result)
self.assertIn("Part 3", result)
def test_convert_claude_to_deepseek(self):
"""测试 Claude 到 DeepSeek 格式转换"""
from core.messages import convert_claude_to_deepseek
claude_request = {
"model": "claude-sonnet-4-20250514",
"messages": [{"role": "user", "content": "Hi"}],
"system": "You are helpful.",
"temperature": 0.7,
"stream": True
}
result = convert_claude_to_deepseek(claude_request)
# 检查模型映射
self.assertIn("deepseek", result.get("model", "").lower())
# 检查 system 消息插入
self.assertEqual(result["messages"][0]["role"], "system")
self.assertEqual(result["messages"][0]["content"], "You are helpful.")
# 检查其他参数
self.assertEqual(result.get("temperature"), 0.7)
self.assertEqual(result.get("stream"), True)
class TestPow(unittest.TestCase):
"""PoW 模块测试"""
def test_wasm_caching(self):
"""测试 WASM 缓存功能"""
from core.pow import _get_cached_wasm_module, _wasm_module, _wasm_engine
from core.config import WASM_PATH
# 首次调用
engine1, module1 = _get_cached_wasm_module(WASM_PATH)
self.assertIsNotNone(engine1)
self.assertIsNotNone(module1)
# 再次调用应该返回相同的实例
engine2, module2 = _get_cached_wasm_module(WASM_PATH)
self.assertIs(engine1, engine2)
self.assertIs(module1, module2)
def test_get_account_identifier(self):
"""测试账号标识获取"""
from core.utils import get_account_identifier
# 测试邮箱
account1 = {"email": "test@example.com"}
self.assertEqual(get_account_identifier(account1), "test@example.com")
# 测试手机号
account2 = {"mobile": "13800138000"}
self.assertEqual(get_account_identifier(account2), "13800138000")
# 邮箱优先
account3 = {"email": "test@example.com", "mobile": "13800138000"}
self.assertEqual(get_account_identifier(account3), "test@example.com")
class TestSessionManager(unittest.TestCase):
"""会话管理器模块测试"""
def test_get_model_config(self):
"""测试模型配置获取"""
from core.session_manager import get_model_config
# deepseek-chat
thinking, search = get_model_config("deepseek-chat")
self.assertEqual(thinking, False)
self.assertEqual(search, False)
# deepseek-reasoner
thinking, search = get_model_config("deepseek-reasoner")
self.assertEqual(thinking, True)
self.assertEqual(search, False)
# deepseek-chat-search
thinking, search = get_model_config("deepseek-chat-search")
self.assertEqual(thinking, False)
self.assertEqual(search, True)
# deepseek-reasoner-search
thinking, search = get_model_config("deepseek-reasoner-search")
self.assertEqual(thinking, True)
self.assertEqual(search, True)
# 大小写不敏感
thinking, search = get_model_config("DeepSeek-CHAT")
self.assertEqual(thinking, False)
self.assertEqual(search, False)
# 无效模型
thinking, search = get_model_config("invalid-model")
self.assertIsNone(thinking)
self.assertIsNone(search)
class TestAuth(unittest.TestCase):
"""认证模块测试"""
def test_auth_key_check(self):
"""测试 API Key 检查"""
from core.config import CONFIG
# 检查配置中是否有 keys
keys = CONFIG.get("keys", [])
self.assertIsInstance(keys, list)
class TestRegexPatterns(unittest.TestCase):
"""正则表达式测试"""
def test_markdown_image_pattern(self):
"""测试 markdown 图片正则"""
from core.messages import _MARKDOWN_IMAGE_PATTERN
text = "Check ![alt text](http://example.com/image.png) here"
match = _MARKDOWN_IMAGE_PATTERN.search(text)
self.assertIsNotNone(match)
self.assertEqual(match.group(1), "alt text")
self.assertEqual(match.group(2), "http://example.com/image.png")
class TestStreamParsing(unittest.TestCase):
"""流式响应解析测试"""
def test_parse_simple_string_content(self):
"""测试简单字符串内容解析"""
# 模拟 DeepSeek V3 的简单字符串格式
chunk = {"v": "你好"}
v_value = chunk.get("v")
self.assertIsInstance(v_value, str)
self.assertEqual(v_value, "你好")
def test_parse_nested_list_content(self):
"""测试嵌套列表内容解析 (DeepSeek V3 格式)"""
# 模拟 DeepSeek V3 的嵌套列表格式
chunk = {
"p": "response/fragments",
"o": "APPEND",
"v": [
{"id": 1, "type": "RESPONSE", "content": "我是DeepSeek", "references": [], "stage_id": 1}
]
}
v_value = chunk.get("v")
self.assertIsInstance(v_value, list)
self.assertEqual(len(v_value), 1)
inner = v_value[0]
self.assertEqual(inner.get("type"), "RESPONSE")
self.assertEqual(inner.get("content"), "我是DeepSeek")
def test_parse_thinking_content(self):
"""测试 thinking 内容解析"""
# 模拟带有 THINK 类型的内容 (DeepSeek 使用 THINK 而不是 THINKING)
chunk = {
"p": "response/fragments",
"o": "APPEND",
"v": [
{"id": 1, "type": "THINK", "content": "让我思考一下...", "references": [], "stage_id": 1}
]
}
v_value = chunk.get("v")
inner = v_value[0]
inner_type = inner.get("type", "").upper()
self.assertEqual(inner_type, "THINK")
self.assertEqual(inner.get("content"), "让我思考一下...")
def test_parse_finished_status(self):
"""测试 FINISHED 状态解析"""
chunk = {"p": "response/status", "o": "SET", "v": "FINISHED"}
v_value = chunk.get("v")
self.assertEqual(v_value, "FINISHED")
def test_parse_batch_status(self):
"""测试批量状态解析"""
chunk = {
"p": "response",
"o": "BATCH",
"v": [
{"p": "accumulated_token_usage", "v": 54},
{"p": "quasi_status", "v": "FINISHED"}
]
}
v_value = chunk.get("v")
self.assertIsInstance(v_value, list)
# 检查是否包含 FINISHED 状态
has_finished = any(
item.get("p") == "quasi_status" and item.get("v") == "FINISHED"
for item in v_value if isinstance(item, dict)
)
self.assertTrue(has_finished)
def test_extract_content_from_nested_response(self):
"""测试从嵌套响应中提取内容"""
# 模拟完整的嵌套列表格式
items = [
{"p": "fragments", "o": "APPEND", "v": [
{"id": 1, "type": "RESPONSE", "content": "Hello", "references": []}
]},
{"p": "search_status", "v": "searching"}, # 应该被跳过
]
extracted = []
for item in items:
if not isinstance(item, dict):
continue
item_p = item.get("p", "")
item_v = item.get("v")
# 跳过搜索状态
if "search_status" in item_p:
continue
if isinstance(item_v, list):
for inner in item_v:
if isinstance(inner, dict):
content = inner.get("content", "")
if content:
inner_type = inner.get("type", "").upper()
extracted.append((content, inner_type))
self.assertEqual(len(extracted), 1)
self.assertEqual(extracted[0], ("Hello", "RESPONSE"))
def test_thinking_vs_text_classification(self):
"""测试 thinking 和 text 类型分类"""
# 测试不同路径的类型分类
test_cases = [
("response/thinking_content", "thinking"),
("response/content", "text"),
("response/fragments", "text"),
("", "text"), # 默认类型
]
for chunk_path, expected_type in test_cases:
if chunk_path == "response/thinking_content":
ptype = "thinking"
elif chunk_path == "response/content" or "response/fragments" in chunk_path:
ptype = "text"
else:
ptype = "text"
self.assertEqual(ptype, expected_type, f"Path '{chunk_path}' should be '{expected_type}'")
def test_handle_non_dict_items(self):
"""测试处理非字典类型的列表项"""
items = [
"plain string",
123,
None,
{"p": "content", "v": "valid"},
]
valid_items = [item for item in items if isinstance(item, dict)]
self.assertEqual(len(valid_items), 1)
self.assertEqual(valid_items[0].get("v"), "valid")
def test_empty_content_handling(self):
"""测试空内容处理"""
chunk = {"v": ""}
content = chunk.get("v", "")
# 空内容不应该被添加
self.assertFalse(bool(content))
def test_response_started_flag(self):
"""测试 response_started 标志逻辑 - 只有 RESPONSE 类型才触发"""
response_started = False
thinking_enabled = True
# 模拟处理流程 - 修复后的逻辑
chunks = [
{"v": "思考中..."}, # thinking (before response)
{"p": "response/fragments", "v": [{"type": "THINK", "content": "思考"}]}, # THINK 不触发 response_started
{"v": "继续思考..."}, # 仍然是 thinking
{"p": "response/fragments", "v": [{"type": "RESPONSE", "content": "回复"}]}, # RESPONSE 触发
{"v": "正式回复"}, # text (after response started)
]
results = []
for chunk in chunks:
chunk_path = chunk.get("p", "")
v_value = chunk.get("v")
# 只有当 fragments 包含 RESPONSE 类型时才设置 response_started
if "response/fragments" in chunk_path and isinstance(v_value, list):
for frag in v_value:
if isinstance(frag, dict) and frag.get("type", "").upper() == "RESPONSE":
response_started = True
break
if not chunk_path:
if thinking_enabled and not response_started:
ptype = "thinking"
else:
ptype = "text"
else:
ptype = "text"
results.append((ptype, response_started))
self.assertEqual(results[0], ("thinking", False)) # 第一个是 thinking
self.assertEqual(results[1], ("text", False)) # THINK fragment 不触发 response_started
self.assertEqual(results[2], ("thinking", False)) # THINK 之后仍是 thinking
self.assertEqual(results[3], ("text", True)) # RESPONSE fragment 触发
self.assertEqual(results[4], ("text", True)) # 之后是 text
def test_think_vs_response_fragment_types(self):
"""测试 THINK 和 RESPONSE fragment 类型的区分"""
# 模拟 DeepSeek 的 fragments 数据
think_fragment = {"p": "response/fragments", "v": [{"id": 1, "type": "THINK", "content": ""}]}
response_fragment = {"p": "response/fragments", "v": [{"id": 2, "type": "RESPONSE", "content": "你好"}]}
def check_response_started(chunk):
"""检查是否应该设置 response_started"""
chunk_path = chunk.get("p", "")
v_value = chunk.get("v")
if "response/fragments" in chunk_path and isinstance(v_value, list):
for frag in v_value:
if isinstance(frag, dict) and frag.get("type", "").upper() == "RESPONSE":
return True
return False
self.assertFalse(check_response_started(think_fragment)) # THINK 不触发
self.assertTrue(check_response_started(response_fragment)) # RESPONSE 触发
class TestToolCallParsing(unittest.TestCase):
"""工具调用解析测试"""
def test_parse_tool_calls_simple(self):
"""测试简单工具调用解析"""
from core.sse_parser import parse_tool_calls
response_text = '{"tool_calls": [{"name": "get_weather", "input": {"location": "Beijing"}}]}'
tools = [{"name": "get_weather"}]
result = parse_tool_calls(response_text, tools)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["name"], "get_weather")
self.assertEqual(result[0]["input"]["location"], "Beijing")
def test_parse_tool_calls_multiple(self):
"""测试多工具调用解析"""
from core.sse_parser import parse_tool_calls
response_text = '''{"tool_calls": [
{"name": "get_weather", "input": {"location": "Beijing"}},
{"name": "get_time", "input": {"timezone": "Asia/Shanghai"}}
]}'''
tools = [{"name": "get_weather"}, {"name": "get_time"}]
result = parse_tool_calls(response_text, tools)
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["name"], "get_weather")
self.assertEqual(result[1]["name"], "get_time")
def test_parse_tool_calls_no_match(self):
"""测试无工具调用时返回空列表"""
from core.sse_parser import parse_tool_calls
response_text = "这是一个普通的回复,没有工具调用。"
tools = [{"name": "get_weather"}]
result = parse_tool_calls(response_text, tools)
self.assertEqual(result, [])
def test_parse_tool_calls_with_surrounding_text(self):
"""测试带有周围文本的工具调用"""
from core.sse_parser import parse_tool_calls
response_text = '''好的,我来帮你查询天气。
{"tool_calls": [{"name": "get_weather", "input": {"location": "Shanghai"}}]}'''
tools = [{"name": "get_weather"}]
result = parse_tool_calls(response_text, tools)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["name"], "get_weather")
def test_parse_tool_calls_empty_input(self):
"""测试空输入"""
from core.sse_parser import parse_tool_calls
result = parse_tool_calls("", [])
self.assertEqual(result, [])
result = parse_tool_calls("some text", [])
self.assertEqual(result, [])
def test_parse_tool_calls_invalid_json(self):
"""测试无效 JSON"""
from core.sse_parser import parse_tool_calls
response_text = '{"tool_calls": [{"name": "get_weather", invalid json here}'
tools = [{"name": "get_weather"}]
result = parse_tool_calls(response_text, tools)
# 应该返回空列表而不是抛出异常
self.assertEqual(result, [])
class TestTokenEstimation(unittest.TestCase):
"""Token 估算测试"""
def test_estimate_tokens_string(self):
"""测试字符串 token 估算"""
from core.utils import estimate_tokens
# 8个字符应该约等于2个token
result = estimate_tokens("12345678")
self.assertEqual(result, 2)
# 空字符串应该返回1
result = estimate_tokens("")
self.assertEqual(result, 1)
def test_estimate_tokens_list(self):
"""测试列表 token 估算"""
from core.utils import estimate_tokens
content = [
{"text": "Hello"},
{"text": "World"}
]
result = estimate_tokens(content)
self.assertGreater(result, 0)
if __name__ == "__main__":
# 设置环境变量避免配置警告
os.environ.setdefault("DS2API_CONFIG_PATH",
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"))
unittest.main(verbosity=2)