fix: use parent_message_id and fresh PoW headers for empty-output retry and continue

Previously retry/continue requests reused the initial PoW header and
lacked parent_message_id, causing them to land as disconnected root
messages in the DeepSeek session instead of proper follow-up turns.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
CJACK
2026-04-27 21:31:51 +08:00
parent fb43bd92f5
commit b82bc1311a
16 changed files with 324 additions and 32 deletions

View File

@@ -103,7 +103,7 @@ DS2API 当前的核心思路,不是把客户端传来的 `messages`、`tools`
- OpenAI Chat / Responses 原生走统一 OpenAI 标准化与 DeepSeek payload 组装;Claude / Gemini 会尽量复用 OpenAI prompt/tool 语义,其中 Gemini 直接复用 `promptcompat.BuildOpenAIPromptForAdapter`;Claude 消息接口在可代理场景会转换为 OpenAI chat 形态再执行。
- 客户端传入的 thinking / reasoning 开关会被归一到下游 `thinking_enabled`。Gemini `generationConfig.thinkingConfig.thinkingBudget` 会翻译成同一套 thinking 开关;关闭时即使上游返回 `response/thinking_content`,兼容层也不会把它当作可见正文输出。若最终解析出的模型名带 `-nothinking` 后缀,则会无条件强制关闭 thinking优先级高于请求体中的 `thinking` / `reasoning` / `reasoning_effort`。Claude surface 在流式请求且未显式声明 `thinking` 时,仍按 Anthropic 语义默认关闭;但在非流式代理场景,兼容层会内部开启一次下游 thinking用于捕获“正文为空、工具调用落在 thinking 里”的情况,随后在回包前剥离用户不可见的 thinking block。
- 对 OpenAI Chat / Responses 的非流式收尾,如果最终可见正文为空,兼容层会优先尝试把思维链中的独立 DSML / XML 工具块当作真实工具调用解析出来。流式链路也会在收尾阶段做同样的 fallback 检测但不会因为思维链内容去中途拦截或改写流式输出thinking / reasoning 增量仍按原样先发,只有在结束收尾时才可能补发最终工具调用结果。补发结果会作为本轮 assistant 的结构化 `tool_calls` / `function_call` 输出返回,而不是塞进 `content` 文本;如果客户端没有开启 thinking / reasoning思维链只用于检测不会作为 `reasoning_content` 或可见正文暴露。只有正文为空且思维链里也没有可执行工具调用时,才继续按空回复错误处理。
- OpenAI Chat / Responses 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token、PoW 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。该重试不会重新标准化消息、不会新建 session、不会切换账号也不会向流式客户端插入重试标记第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍为空,终端错误码仍保持现有 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。
- OpenAI Chat / Responses 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW,若 PoW 获取失败则回退到原始 PoW。该重试不会重新标准化消息、不会新建 session、不会切换账号,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍为空,终端错误码仍保持现有 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW。
## 5. prompt 是怎么拼出来的

View File

@@ -38,6 +38,7 @@ type chatStreamRuntime struct {
thinking strings.Builder
toolDetectionThinking strings.Builder
text strings.Builder
responseMessageID int
finalThinking string
finalText string
@@ -234,6 +235,9 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
if !parsed.Parsed {
return streamengine.ParsedDecision{}
}
if parsed.ResponseMessageID > 0 {
s.responseMessageID = parsed.ResponseMessageID
}
if parsed.ContentFilter {
if strings.TrimSpace(s.text.String()) == "" {
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")}

View File

@@ -23,6 +23,7 @@ type chatNonStreamResult struct {
detectedCalls int
body map[string]any
finishReason string
responseMessageID int
}
func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) {
@@ -50,9 +51,14 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co
}
attempts++
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts)
retryPayload := clonePayloadWithEmptyOutputRetryPrompt(payload)
nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, pow, 3)
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "parent_message_id", result.responseMessageID)
retryPow, powErr := h.DS.GetPow(ctx, a, 3)
if powErr != nil {
config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", powErr)
retryPow = pow
}
retryPayload := clonePayloadForEmptyOutputRetry(payload, result.responseMessageID)
nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, retryPow, 3)
if err != nil {
if historySession != nil {
historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.thinking, result.text)
@@ -91,6 +97,7 @@ func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http.
detectedCalls: len(detected.Calls),
body: respBody,
finishReason: chatFinishReason(respBody),
responseMessageID: result.ResponseMessageID,
}, true
}
@@ -152,8 +159,13 @@ func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request,
return
}
attempts++
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts)
nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3)
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID)
retryPow, powErr := h.DS.GetPow(r.Context(), a, 3)
if powErr != nil {
config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", powErr)
retryPow = pow
}
nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3)
if err != nil {
failChatStreamRetry(streamRuntime, historySession, http.StatusInternalServerError, "Failed to get completion.", "error")
config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", err)

View File

@@ -131,8 +131,8 @@ func emptyOutputRetryMaxAttempts() int {
return shared.EmptyOutputRetryMaxAttempts()
}
func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload)
func clonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any {
return shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID)
}
func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string {

View File

@@ -22,6 +22,10 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
h.handleVercelStreamRelease(w, r)
return
}
if isVercelStreamPowRequest(r) {
h.handleVercelStreamPow(w, r)
return
}
if isVercelStreamPrepareRequest(r) {
h.handleVercelStreamPrepare(w, r)
return

View File

@@ -150,6 +150,44 @@ func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Reque
writeJSON(w, http.StatusOK, map[string]any{"success": true})
}
// handleVercelStreamPow serves the internal __stream_pow endpoint the
// Vercel JS runtime calls to mint a fresh PoW header for an active stream
// lease. It is only reachable in the Vercel deployment mode, requires the
// shared internal secret, and resolves the lease to its stored auth before
// asking the DeepSeek client for a new PoW.
func (h *Handler) handleVercelStreamPow(w http.ResponseWriter, r *http.Request) {
	if !config.IsVercel() {
		http.NotFound(w, r)
		return
	}
	secret := vercelInternalSecret()
	token := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
	// Constant-time comparison so the check does not leak the secret via timing.
	if secret == "" || subtle.ConstantTimeCompare([]byte(token), []byte(secret)) != 1 {
		writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
		return
	}
	var body map[string]any
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		writeOpenAIError(w, http.StatusBadRequest, "invalid json")
		return
	}
	rawLease, _ := body["lease_id"].(string)
	leaseID := strings.TrimSpace(rawLease)
	if leaseID == "" {
		writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
		return
	}
	leaseAuth := h.lookupStreamLeaseAuth(leaseID)
	if leaseAuth == nil {
		writeOpenAIError(w, http.StatusNotFound, "stream lease not found or expired")
		return
	}
	powHeader, err := h.DS.GetPow(r.Context(), leaseAuth, 3)
	if err != nil {
		writeOpenAIError(w, http.StatusInternalServerError, "Failed to get PoW.")
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{"pow_header": powHeader})
}
func isVercelStreamPrepareRequest(r *http.Request) bool {
if r == nil {
return false
@@ -164,6 +202,13 @@ func isVercelStreamReleaseRequest(r *http.Request) bool {
return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
}
func isVercelStreamPowRequest(r *http.Request) bool {
if r == nil {
return false
}
return strings.TrimSpace(r.URL.Query().Get("__stream_pow")) == "1"
}
func vercelInternalSecret() string {
if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" {
return v
@@ -199,6 +244,20 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth) string {
return leaseID
}
// lookupStreamLeaseAuth resolves a stream lease ID to its associated
// RequestAuth. It returns nil for a blank ID, an unknown lease, or a
// lease whose ExpiresAt is already in the past. The map access is guarded
// by leaseMu; the expiry check happens outside the critical section.
func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth {
	id := strings.TrimSpace(leaseID)
	if id == "" {
		return nil
	}
	h.leaseMu.Lock()
	lease, found := h.streamLeases[id]
	h.leaseMu.Unlock()
	if !found {
		return nil
	}
	if time.Now().After(lease.ExpiresAt) {
		return nil
	}
	return lease.Auth
}
func (h *Handler) releaseStreamLease(leaseID string) bool {
leaseID = strings.TrimSpace(leaseID)
if leaseID == "" {

View File

@@ -24,6 +24,7 @@ type responsesNonStreamResult struct {
contentFilter bool
parsed toolcall.ToolCallParseResult
body map[string]any
responseMessageID int
}
func (h *Handler) handleResponsesNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
@@ -50,8 +51,13 @@ func (h *Handler) handleResponsesNonStreamWithRetry(w http.ResponseWriter, ctx c
}
attempts++
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", false, "retry_attempt", attempts)
nextResp, err := h.DS.CallCompletion(ctx, a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3)
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", false, "retry_attempt", attempts, "parent_message_id", result.responseMessageID)
retryPow, powErr := h.DS.GetPow(ctx, a, 3)
if powErr != nil {
config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", false, "retry_attempt", attempts, "error", powErr)
retryPow = pow
}
nextResp, err := h.DS.CallCompletion(ctx, a, clonePayloadForEmptyOutputRetry(payload, result.responseMessageID), retryPow, 3)
if err != nil {
writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.")
config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", false, "retry_attempt", attempts, "error", err)
@@ -86,6 +92,7 @@ func (h *Handler) collectResponsesNonStreamAttempt(w http.ResponseWriter, resp *
contentFilter: result.ContentFilter,
parsed: textParsed,
body: responseObj,
responseMessageID: result.ResponseMessageID,
}, true
}
@@ -135,8 +142,13 @@ func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.
return
}
attempts++
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts)
nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3)
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID)
retryPow, powErr := h.DS.GetPow(r.Context(), a, 3)
if powErr != nil {
config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", powErr)
retryPow = pow
}
nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3)
if err != nil {
streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error")
config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err)

View File

@@ -121,8 +121,8 @@ func emptyOutputRetryMaxAttempts() int {
return shared.EmptyOutputRetryMaxAttempts()
}
func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload)
func clonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any {
return shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID)
}
func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string {

View File

@@ -39,6 +39,7 @@ type responsesStreamRuntime struct {
toolDetectionThinking strings.Builder
text strings.Builder
visibleText strings.Builder
responseMessageID int
streamToolCallIDs map[int]string
functionItemIDs map[int]string
functionOutputIDs map[int]int
@@ -205,6 +206,9 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
if !parsed.Parsed {
return streamengine.ParsedDecision{}
}
if parsed.ResponseMessageID > 0 {
s.responseMessageID = parsed.ResponseMessageID
}
if parsed.ContentFilter || parsed.ErrorMessage != "" {
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")}
}

View File

@@ -13,12 +13,23 @@ func EmptyOutputRetryMaxAttempts() int {
}
// ClonePayloadWithEmptyOutputRetryPrompt is the legacy entry point that
// clones a retry payload without chaining to a parent message. It delegates
// to ClonePayloadForEmptyOutputRetry with a zero parentMessageID, which
// suppresses the parent_message_id field on the clone.
func ClonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
	const noParent = 0
	return ClonePayloadForEmptyOutputRetry(payload, noParent)
}
// ClonePayloadForEmptyOutputRetry builds a shallow copy of payload for the
// empty-output retry: the prompt has the retry suffix appended, and when
// parentMessageID is positive the clone also carries parent_message_id so
// the retry is submitted as a follow-up turn of the same DeepSeek session
// rather than a disconnected root message.
func ClonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any {
	retry := make(map[string]any, len(payload)+1)
	for key, value := range payload {
		retry[key] = value
	}
	prompt, _ := payload["prompt"].(string)
	retry["prompt"] = AppendEmptyOutputRetrySuffix(prompt)
	if parentMessageID > 0 {
		retry["parent_message_id"] = parentMessageID
	}
	return retry
}

View File

@@ -285,7 +285,7 @@ func TestChatCompletionsStreamEmitsFailureFrameWhenUpstreamOutputEmpty(t *testin
func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) {
ds := &streamStatusDSSeqStub{resps: []*http.Response{
makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"response_message_id":42,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
}}
h := &openAITestSurface{
@@ -313,6 +313,10 @@ func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) {
if !strings.Contains(retryPrompt, "Previous reply had no visible output. Please regenerate the visible final answer or tool call now.") {
t.Fatalf("expected retry suffix in prompt, got %q", retryPrompt)
}
// Verify multi-turn chaining: retry must set parent_message_id from first call's response_message_id.
if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 42 {
t.Fatalf("expected retry parent_message_id=42, got %#v", ds.payloads[1]["parent_message_id"])
}
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
@@ -341,7 +345,7 @@ func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) {
func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) {
ds := &streamStatusDSSeqStub{resps: []*http.Response{
makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"response_message_id":99,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
}}
h := &openAITestSurface{
@@ -362,6 +366,10 @@ func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) {
if len(ds.payloads) != 2 {
t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
}
// Verify multi-turn chaining.
if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 99 {
t.Fatalf("expected retry parent_message_id=99, got %#v", ds.payloads[1]["parent_message_id"])
}
var out map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String())
@@ -452,7 +460,7 @@ func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) {
func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) {
ds := &streamStatusDSSeqStub{resps: []*http.Response{
makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"response_message_id":77,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
}}
h := &openAITestSurface{
@@ -473,6 +481,10 @@ func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) {
if len(ds.payloads) != 2 {
t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
}
// Verify multi-turn chaining.
if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 77 {
t.Fatalf("expected retry parent_message_id=77, got %#v", ds.payloads[1]["parent_message_id"])
}
body := rec.Body.String()
if strings.Contains(body, "response.failed") {
t.Fatalf("did not expect premature response.failed, body=%s", body)
@@ -487,7 +499,7 @@ func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) {
func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) {
ds := &streamStatusDSSeqStub{resps: []*http.Response{
makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"response_message_id":88,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
}}
h := &openAITestSurface{
@@ -508,6 +520,10 @@ func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) {
if len(ds.payloads) != 2 {
t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
}
// Verify multi-turn chaining.
if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 88 {
t.Fatalf("expected retry parent_message_id=88, got %#v", ds.payloads[1]["parent_message_id"])
}
var out map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String())

View File

@@ -58,6 +58,33 @@ async function fetchStreamPrepare(req, rawBody) {
};
}
// fetchStreamPow calls the internal Go __stream_pow endpoint to obtain a
// fresh PoW header for the given stream lease. The upstream body is parsed
// as JSON on a best-effort basis; any parse failure yields an empty object
// so callers can probe `body.pow_header` without extra guards.
async function fetchStreamPow(req, leaseID) {
  const target = buildInternalGoURL(req);
  target.searchParams.set('__stream_pow', '1');
  const payload = Buffer.from(JSON.stringify({ lease_id: leaseID }));
  const upstream = await fetch(target.toString(), {
    method: 'POST',
    headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }),
    body: payload,
  });
  const text = await upstream.text();
  let body = {};
  try {
    body = JSON.parse(text || '{}');
  } catch (_err) {
    body = {};
  }
  const contentType = upstream.headers.get('content-type') || 'application/json';
  return { ok: upstream.ok, status: upstream.status, contentType, text, body };
}
function relayPreparedFailure(res, prep) {
if (prep.status === 401 && looksLikeVercelAuthPage(prep.text)) {
writeOpenAIError(
@@ -195,6 +222,7 @@ module.exports = {
header,
readRawBody,
fetchStreamPrepare,
fetchStreamPow,
relayPreparedFailure,
safeReadText,
buildInternalGoURL,

View File

@@ -25,6 +25,7 @@ const {
asString,
isAbortError,
fetchStreamPrepare,
fetchStreamPow,
relayPreparedFailure,
createLeaseReleaser,
} = require('./http_internal');
@@ -49,7 +50,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
const leaseID = asString(prep.body.lease_id);
const deepseekToken = asString(prep.body.deepseek_token);
const powHeader = asString(prep.body.pow_header);
const initialPowHeader = asString(prep.body.pow_header);
const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
const finalPrompt = asString(prep.body.final_prompt);
const thinkingEnabled = toBool(prep.body.thinking_enabled);
@@ -59,7 +60,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas;
const stripReferenceMarkers = boolDefaultTrue(prep.body.compat && prep.body.compat.strip_reference_markers);
if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) {
if (!model || !leaseID || !deepseekToken || !initialPowHeader || !completionPayload) {
writeOpenAIError(res, 500, 'invalid vercel prepare response');
return;
}
@@ -88,7 +89,32 @@ async function handleVercelStream(req, res, rawBody, payload) {
res.on('close', onResClose);
try {
const fetchDeepSeekStream = async (url, bodyPayload) => {
let currentPowHeader = initialPowHeader;
const refreshPowHeader = async (roundType) => {
try {
const pow = await fetchStreamPow(req, leaseID);
const nextPowHeader = asString(pow.body && pow.body.pow_header);
if (pow.ok && nextPowHeader) {
currentPowHeader = nextPowHeader;
return currentPowHeader;
}
console.warn('[vercel_stream_pow] refresh failed, reusing previous PoW', {
round_type: roundType,
status: pow.status || 0,
});
} catch (err) {
if (clientClosed || isAbortError(err)) {
return '';
}
console.warn('[vercel_stream_pow] refresh failed, reusing previous PoW', {
round_type: roundType,
error: err,
});
}
return currentPowHeader;
};
const fetchDeepSeekStream = async (url, bodyPayload, powHeader) => {
try {
return await fetch(url, {
method: 'POST',
@@ -107,12 +133,18 @@ async function handleVercelStream(req, res, rawBody, payload) {
throw err;
}
};
const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload);
const fetchContinue = (messageID) => fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, {
chat_session_id: sessionID,
message_id: messageID,
fallback_to_resume: true,
});
const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload, currentPowHeader);
const fetchContinue = async (messageID) => {
const powHeader = await refreshPowHeader('continue');
if (!powHeader) {
return null;
}
return fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, {
chat_session_id: sessionID,
message_id: messageID,
fallback_to_resume: true,
}, powHeader);
};
let completionRes = await fetchCompletion(completionPayload);
if (completionRes === null) {
@@ -371,7 +403,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
}
const terminal = await finish('stop', { deferEmpty: allowDeferEmpty });
return { terminal, retryable: !terminal && allowDeferEmpty };
return { terminal, retryable: !terminal && allowDeferEmpty, responseMessageID: continueState.responseMessageID };
};
let retryAttempts = 0;
@@ -390,9 +422,18 @@ async function handleVercelStream(req, res, rawBody, payload) {
surface: 'chat.completions',
stream: true,
retry_attempt: retryAttempts,
parent_message_id: processed.responseMessageID || 0,
});
usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, retryAttempts);
completionRes = await fetchCompletion(clonePayloadWithEmptyOutputRetryPrompt(completionPayload));
const retryPowHeader = await refreshPowHeader('retry');
if (!retryPowHeader) {
return;
}
completionRes = await fetchDeepSeekStream(
DEEPSEEK_COMPLETION_URL,
clonePayloadForEmptyOutputRetry(completionPayload, processed.responseMessageID),
retryPowHeader,
);
if (completionRes === null) {
return;
}
@@ -412,11 +453,15 @@ function toBool(v) {
return v === true;
}
function clonePayloadWithEmptyOutputRetryPrompt(payload) {
return {
function clonePayloadForEmptyOutputRetry(payload, parentMessageID) {
const clone = {
...(payload || {}),
prompt: appendEmptyOutputRetrySuffix(asString(payload && payload.prompt)),
};
if (parentMessageID && parentMessageID > 0) {
clone.parent_message_id = parentMessageID;
}
return clone;
}
function appendEmptyOutputRetrySuffix(prompt) {

View File

@@ -5,6 +5,7 @@ import (
"strings"
dsprotocol "ds2api/internal/deepseek/protocol"
"ds2api/internal/util"
)
// CollectResult holds the aggregated text and thinking content from a
@@ -15,6 +16,7 @@ type CollectResult struct {
ToolDetectionThinking string
ContentFilter bool
CitationLinks map[int]string
ResponseMessageID int
}
// CollectStream fully consumes a DeepSeek SSE response and separates
@@ -33,6 +35,7 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
contentFilter := false
stopped := false
collector := newCitationLinkCollector()
responseMessageID := 0
currentType := "text"
if thinkingEnabled {
currentType = "thinking"
@@ -41,6 +44,7 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
chunk, done, parsed := ParseDeepSeekSSELine(line)
if parsed && !done {
collector.ingestChunk(chunk)
observeResponseMessageID(chunk, &responseMessageID)
}
if done {
return false
@@ -84,5 +88,32 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
ToolDetectionThinking: toolDetectionThinking.String(),
ContentFilter: contentFilter,
CitationLinks: collector.build(),
ResponseMessageID: responseMessageID,
}
}
// observeResponseMessageID scans a parsed SSE chunk for a response message
// ID and stores any positive value it finds into out. It mirrors the
// extraction logic in client_continue.go's observe method, probing (in
// order) top-level response_message_id, v.response.message_id, and
// message.response.message_id; a later match overwrites an earlier one.
func observeResponseMessageID(chunk map[string]any, out *int) {
	if chunk == nil || out == nil {
		return
	}
	// record writes the candidate into out only when it decodes to a
	// positive integer, preserving any previously observed ID otherwise.
	record := func(candidate any) {
		if id := util.IntFrom(candidate); id > 0 {
			*out = id
		}
	}
	record(chunk["response_message_id"])
	if v, ok := chunk["v"].(map[string]any); ok && v != nil {
		if response, ok := v["response"].(map[string]any); ok && response != nil {
			record(response["message_id"])
		}
	}
	if message, ok := chunk["message"].(map[string]any); ok && message != nil {
		if response, ok := message["response"].(map[string]any); ok && response != nil {
			record(response["message_id"])
		}
	}
}

View File

@@ -1,6 +1,8 @@
package sse
import "fmt"
import (
"fmt"
)
// LineResult is the normalized parse result for one DeepSeek SSE line.
type LineResult struct {
@@ -11,6 +13,7 @@ type LineResult struct {
Parts []ContentPart
ToolDetectionThinkingParts []ContentPart
NextType string
ResponseMessageID int
}
// ParseDeepSeekContentLine centralizes one-line DeepSeek SSE parsing for both
@@ -50,11 +53,14 @@ func ParseDeepSeekContentLine(raw []byte, thinkingEnabled bool, currentType stri
parts, detectionThinkingParts, finished, nextType := ParseSSEChunkForContentDetailed(chunk, thinkingEnabled, currentType)
parts = filterLeakedContentFilterParts(parts)
detectionThinkingParts = filterLeakedContentFilterParts(detectionThinkingParts)
var respMsgID int
observeResponseMessageID(chunk, &respMsgID)
return LineResult{
Parsed: true,
Stop: finished,
Parts: parts,
ToolDetectionThinkingParts: detectionThinkingParts,
NextType: nextType,
ResponseMessageID: respMsgID,
}
}

View File

@@ -153,6 +153,9 @@ async function runMockVercelStreamSequence(upstreamSequences, prepareOverrides =
if (textURL.includes('__stream_prepare=1')) {
return jsonResponse(prepareBody);
}
if (textURL.includes('__stream_pow=1')) {
return jsonResponse({ pow_header: 'pow-header-refreshed' });
}
if (textURL.includes('__stream_release=1')) {
return jsonResponse({ success: true });
}
@@ -199,6 +202,7 @@ test('vercel stream retries empty output once and keeps one terminal frame', asy
const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
const completionBodies = fetchBodies.filter((body) => Object.hasOwn(body, 'prompt'));
assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 2);
assert.equal(fetchURLs.filter((url) => url.includes('__stream_pow=1')).length, 1);
assert.equal(frames.filter((frame) => frame === '[DONE]').length, 1);
assert.equal(parsed[0].choices[0].delta.content, 'visible');
assert.equal(parsed[1].choices[0].finish_reason, 'stop');
@@ -217,11 +221,67 @@ test('vercel stream exhausts DeepSeek continue before synthetic retry', async ()
const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 1);
assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/continue').length, 1);
assert.equal(fetchURLs.filter((url) => url.includes('__stream_pow=1')).length, 1);
assert.equal(parsed[0].choices[0].delta.content, 'continued');
assert.equal(parsed[1].choices[0].finish_reason, 'stop');
assert.equal(fetchBodies.some((body) => String(body.prompt || '').includes('Previous reply had no visible output')), false);
});
// Regression test: when the internal __stream_pow refresh endpoint fails
// (returns HTTP 500 here), the synthetic empty-output retry must fall back
// to the PoW header obtained at prepare time instead of aborting the stream.
test('vercel stream reuses prior PoW when refresh fails', async () => {
  const originalFetch = global.fetch;
  const fetchURLs = [];
  const completionPowHeaders = [];
  let completionCalls = 0;
  // Stub global.fetch: serve prepare, fail PoW refresh, ack release, and
  // emit two completion rounds (first empty, second with visible content).
  global.fetch = async (url, init = {}) => {
    const textURL = String(url);
    fetchURLs.push(textURL);
    if (textURL.includes('__stream_prepare=1')) {
      return jsonResponse({
        session_id: 'chatcmpl-test',
        lease_id: 'lease-test',
        model: 'gpt-test',
        final_prompt: 'hello',
        thinking_enabled: false,
        search_enabled: false,
        compat: { strip_reference_markers: true },
        tool_names: [],
        deepseek_token: 'deepseek-token',
        pow_header: 'pow-header-initial',
        payload: { prompt: 'hello' },
      });
    }
    if (textURL.includes('__stream_pow=1')) {
      // Simulate PoW refresh failure so the handler must reuse the initial header.
      return jsonResponse({}, 500);
    }
    if (textURL.includes('__stream_release=1')) {
      return jsonResponse({ success: true });
    }
    if (textURL === 'https://chat.deepseek.com/api/v0/chat/completion') {
      // Capture the PoW header actually sent on each completion attempt.
      completionPowHeaders.push(init.headers['x-ds-pow-response']);
      completionCalls += 1;
      if (completionCalls === 1) {
        // First round: stream ends with no visible output, triggering a retry.
        return sseResponse(['data: [DONE]\n\n']);
      }
      return sseResponse(['data: {"p":"response/content","v":"visible"}\n\n', 'data: [DONE]\n\n']);
    }
    throw new Error(`unexpected fetch url: ${textURL}`);
  };
  try {
    const req = new MockStreamRequest();
    const res = new MockStreamResponse();
    const payload = { model: 'gpt-test', stream: true };
    await handleVercelStream(req, res, Buffer.from(JSON.stringify(payload)), payload);
    const frames = parseSSEDataFrames(res.bodyText());
    const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
    // Both attempts must have used the initial PoW header (refresh failed).
    assert.deepEqual(completionPowHeaders, ['pow-header-initial', 'pow-header-initial']);
    // Exactly one refresh attempt should have been made.
    assert.equal(fetchURLs.filter((url) => url.includes('__stream_pow=1')).length, 1);
    assert.equal(parsed[0].choices[0].delta.content, 'visible');
    assert.equal(parsed[1].choices[0].finish_reason, 'stop');
  } finally {
    // Always restore the real fetch so later tests are unaffected.
    global.fetch = originalFetch;
  }
});
test('vercel stream emits content_filter failure when upstream filters empty output', async () => {
const { frames } = await runMockVercelStream(['data: {"code":"content_filter"}\n\n']);
assert.equal(frames.length, 2);