Files
ds2api/core/pow.py

254 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""PoW (Proof of Work) 计算模块"""
import base64
import ctypes
import json
import struct
import threading
import time
from curl_cffi import requests
from wasmtime import Engine, Linker, Module, Store
from .config import CONFIG, WASM_PATH, logger
from .utils import get_account_identifier
# ----------------------------------------------------------------------
# WASM 模块缓存 - 避免每次请求都重新加载
# ----------------------------------------------------------------------
_wasm_cache_lock = threading.Lock()
_wasm_engine = None
_wasm_module = None
def _get_cached_wasm_module(wasm_path: str):
"""获取缓存的 WASM 模块,首次调用时加载"""
global _wasm_engine, _wasm_module
if _wasm_module is not None:
return _wasm_engine, _wasm_module
with _wasm_cache_lock:
# 双重检查锁定
if _wasm_module is not None:
return _wasm_engine, _wasm_module
try:
with open(wasm_path, "rb") as f:
wasm_bytes = f.read()
_wasm_engine = Engine()
_wasm_module = Module(_wasm_engine, wasm_bytes)
logger.info(f"[WASM] 已缓存 WASM 模块: {wasm_path}")
except Exception as e:
logger.error(f"[WASM] 加载 WASM 模块失败: {e}")
raise RuntimeError(f"加载 wasm 文件失败: {wasm_path}, 错误: {e}")
return _wasm_engine, _wasm_module
# 启动时预加载 WASM 模块
try:
_get_cached_wasm_module(WASM_PATH)
except Exception as e:
logger.warning(f"[WASM] 启动时预加载失败(将在首次使用时重试): {e}")
# get_account_identifier 已移至 core.utils
# ----------------------------------------------------------------------
# 使用 WASM 模块计算 PoW 答案的辅助函数
# ----------------------------------------------------------------------
def compute_pow_answer(
algorithm: str,
challenge_str: str,
salt: str,
difficulty: int,
expire_at: int,
signature: str,
target_path: str,
wasm_path: str,
) -> int:
"""
使用 WASM 模块计算 DeepSeekHash 答案answer
根据 JS 逻辑:
- 拼接前缀: "{salt}_{expire_at}_"
- 将 challenge 与前缀写入 wasm 内存后调用 wasm_solve 进行求解,
- 从 wasm 内存中读取状态与求解结果,
- 若状态非 0则返回整数形式的答案否则返回 None。
优化:使用缓存的 WASM 模块,避免每次请求都重新加载文件。
"""
if algorithm != "DeepSeekHashV1":
raise ValueError(f"不支持的算法:{algorithm}")
prefix = f"{salt}_{expire_at}_"
# 获取缓存的 WASM 模块(避免重复加载文件)
engine, module = _get_cached_wasm_module(wasm_path)
# 每次调用创建新的 Store 和实例(必须的,因为 Store 不是线程安全的)
store = Store(engine)
linker = Linker(engine)
instance = linker.instantiate(store, module)
exports = instance.exports(store)
try:
memory = exports["memory"]
add_to_stack = exports["__wbindgen_add_to_stack_pointer"]
alloc = exports["__wbindgen_export_0"]
wasm_solve = exports["wasm_solve"]
except KeyError as e:
raise RuntimeError(f"缺少 wasm 导出函数: {e}")
def write_memory(offset: int, data: bytes):
size = len(data)
base_addr = ctypes.cast(memory.data_ptr(store), ctypes.c_void_p).value
ctypes.memmove(base_addr + offset, data, size)
def read_memory(offset: int, size: int) -> bytes:
base_addr = ctypes.cast(memory.data_ptr(store), ctypes.c_void_p).value
return ctypes.string_at(base_addr + offset, size)
def encode_string(text: str):
data = text.encode("utf-8")
length = len(data)
ptr_val = alloc(store, length, 1)
ptr = int(ptr_val.value) if hasattr(ptr_val, "value") else int(ptr_val)
write_memory(ptr, data)
return ptr, length
# 1. 申请 16 字节栈空间
retptr = add_to_stack(store, -16)
# 2. 编码 challenge 与 prefix 到 wasm 内存中
ptr_challenge, len_challenge = encode_string(challenge_str)
ptr_prefix, len_prefix = encode_string(prefix)
# 3. 调用 wasm_solve注意difficulty 以 float 形式传入)
wasm_solve(
store,
retptr,
ptr_challenge,
len_challenge,
ptr_prefix,
len_prefix,
float(difficulty),
)
# 4. 从 retptr 处读取 4 字节状态和 8 字节求解结果
status_bytes = read_memory(retptr, 4)
if len(status_bytes) != 4:
add_to_stack(store, 16)
raise RuntimeError("读取状态字节失败")
status = struct.unpack("<i", status_bytes)[0]
value_bytes = read_memory(retptr + 8, 8)
if len(value_bytes) != 8:
add_to_stack(store, 16)
raise RuntimeError("读取结果字节失败")
value = struct.unpack("<d", value_bytes)[0]
# 5. 恢复栈指针
add_to_stack(store, 16)
if status == 0:
return None
return int(value)
def get_pow_response(request, max_attempts: int = 3):
"""获取 PoW 响应
Args:
request: FastAPI 请求对象
max_attempts: 最大重试次数
Returns:
Base64 编码的 PoW 响应,如果失败返回 None
"""
from .auth import get_auth_headers, choose_new_account
from .deepseek import BASE_HEADERS, login_deepseek_via_account, DEEPSEEK_CREATE_POW_URL
pow_url = DEEPSEEK_CREATE_POW_URL
attempts = 0
while attempts < max_attempts:
headers = get_auth_headers(request)
try:
resp = requests.post(
pow_url,
headers=headers,
json={"target_path": "/api/v0/chat/completion"},
timeout=30,
impersonate="safari15_3",
)
except Exception as e:
logger.error(f"[get_pow_response] 请求异常: {e}")
attempts += 1
continue
try:
data = resp.json()
except Exception as e:
logger.error(f"[get_pow_response] JSON解析异常: {e}")
data = {}
if resp.status_code == 200 and data.get("code") == 0:
challenge = data["data"]["biz_data"]["challenge"]
difficulty = challenge.get("difficulty", 144000)
expire_at = challenge.get("expire_at", 1680000000)
try:
answer = compute_pow_answer(
challenge["algorithm"],
challenge["challenge"],
challenge["salt"],
difficulty,
expire_at,
challenge["signature"],
challenge["target_path"],
WASM_PATH,
)
except Exception as e:
logger.error(f"[get_pow_response] PoW 答案计算异常: {e}")
answer = None
if answer is None:
logger.warning("[get_pow_response] PoW 答案计算失败,重试中...")
resp.close()
attempts += 1
continue
pow_dict = {
"algorithm": challenge["algorithm"],
"challenge": challenge["challenge"],
"salt": challenge["salt"],
"answer": answer,
"signature": challenge["signature"],
"target_path": challenge["target_path"],
}
pow_str = json.dumps(pow_dict, separators=(",", ":"), ensure_ascii=False)
encoded = base64.b64encode(pow_str.encode("utf-8")).decode("utf-8").rstrip()
resp.close()
return encoded
else:
code = data.get("code")
logger.warning(
f"[get_pow_response] 获取 PoW 失败, code={code}, msg={data.get('msg')}"
)
resp.close()
if request.state.use_config_token:
current_id = get_account_identifier(request.state.account)
if not hasattr(request.state, "tried_accounts"):
request.state.tried_accounts = []
if current_id not in request.state.tried_accounts:
request.state.tried_accounts.append(current_id)
new_account = choose_new_account(request.state.tried_accounts)
if new_account is None:
break
try:
login_deepseek_via_account(new_account)
except Exception as e:
logger.error(
f"[get_pow_response] 账号 {get_account_identifier(new_account)} 登录失败:{e}"
)
attempts += 1
continue
request.state.account = new_account
request.state.deepseek_token = new_account.get("token")
else:
attempts += 1
continue
attempts += 1
return None