diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md index 2c5b7b3..6039e34 100644 --- a/docs/prompt-compatibility.md +++ b/docs/prompt-compatibility.md @@ -103,7 +103,7 @@ DS2API 当前的核心思路,不是把客户端传来的 `messages`、`tools` - OpenAI Chat / Responses 原生走统一 OpenAI 标准化与 DeepSeek payload 组装;Claude / Gemini 会尽量复用 OpenAI prompt/tool 语义,其中 Gemini 直接复用 `promptcompat.BuildOpenAIPromptForAdapter`,Claude 消息接口在可代理场景会转换为 OpenAI chat 形态再执行。 - 客户端传入的 thinking / reasoning 开关会被归一到下游 `thinking_enabled`。Gemini `generationConfig.thinkingConfig.thinkingBudget` 会翻译成同一套 thinking 开关;关闭时即使上游返回 `response/thinking_content`,兼容层也不会把它当作可见正文输出。若最终解析出的模型名带 `-nothinking` 后缀,则会无条件强制关闭 thinking,优先级高于请求体中的 `thinking` / `reasoning` / `reasoning_effort`。Claude surface 在流式请求且未显式声明 `thinking` 时,仍按 Anthropic 语义默认关闭;但在非流式代理场景,兼容层会内部开启一次下游 thinking,用于捕获“正文为空、工具调用落在 thinking 里”的情况,随后在回包前剥离用户不可见的 thinking block。 - 对 OpenAI Chat / Responses 的非流式收尾,如果最终可见正文为空,兼容层会优先尝试把思维链中的独立 DSML / XML 工具块当作真实工具调用解析出来。流式链路也会在收尾阶段做同样的 fallback 检测,但不会因为思维链内容去中途拦截或改写流式输出;thinking / reasoning 增量仍按原样先发,只有在结束收尾时才可能补发最终工具调用结果。补发结果会作为本轮 assistant 的结构化 `tool_calls` / `function_call` 输出返回,而不是塞进 `content` 文本;如果客户端没有开启 thinking / reasoning,思维链只用于检测,不会作为 `reasoning_content` 或可见正文暴露。只有正文为空且思维链里也没有可执行工具调用时,才继续按空回复错误处理。 -- OpenAI Chat / Responses 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token、PoW 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。该重试不会重新标准化消息、不会新建 session、不会切换账号,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍为空,终端错误码仍保持现有 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。 +- OpenAI Chat / Responses 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该重试不会重新标准化消息、不会新建 session、不会切换账号,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍为空,终端错误码仍保持现有 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW。 ## 5. prompt 是怎么拼出来的 diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go index d96c3d3..e83a488 100644 --- a/internal/httpapi/openai/chat/chat_stream_runtime.go +++ b/internal/httpapi/openai/chat/chat_stream_runtime.go @@ -38,6 +38,7 @@ type chatStreamRuntime struct { thinking strings.Builder toolDetectionThinking strings.Builder text strings.Builder + responseMessageID int finalThinking string finalText string @@ -234,6 +235,9 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD if !parsed.Parsed { return streamengine.ParsedDecision{} } + if parsed.ResponseMessageID > 0 { + s.responseMessageID = parsed.ResponseMessageID + } if parsed.ContentFilter { if strings.TrimSpace(s.text.String()) == "" { return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")} diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index 14fe2b0..28e67e9 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -23,6 +23,7 @@ type chatNonStreamResult struct { detectedCalls int body map[string]any finishReason string + responseMessageID int } func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) { @@ -50,9 +51,14 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co } attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts) - retryPayload := clonePayloadWithEmptyOutputRetryPrompt(payload) - nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, pow, 3) + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "parent_message_id", result.responseMessageID) + retryPow, powErr := h.DS.GetPow(ctx, a, 3) + if powErr != nil { + config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", powErr) + retryPow = pow + } + retryPayload := clonePayloadForEmptyOutputRetry(payload, result.responseMessageID) + nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, retryPow, 3) if err != nil { if historySession != nil { historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.thinking, result.text) @@ -91,6 +97,7 @@ func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http. detectedCalls: len(detected.Calls), body: respBody, finishReason: chatFinishReason(respBody), + responseMessageID: result.ResponseMessageID, }, true } @@ -152,8 +159,13 @@ func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, return } attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts) - nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3) + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID) + retryPow, powErr := h.DS.GetPow(r.Context(), a, 3) + if powErr != nil { + config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", powErr) + retryPow = pow + } + nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3) if err != nil { failChatStreamRetry(streamRuntime, historySession, http.StatusInternalServerError, "Failed to get completion.", "error") config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", err) diff --git a/internal/httpapi/openai/chat/handler.go b/internal/httpapi/openai/chat/handler.go index 6283731..da6c2ab 100644 --- a/internal/httpapi/openai/chat/handler.go +++ b/internal/httpapi/openai/chat/handler.go @@ -131,8 +131,8 @@ func emptyOutputRetryMaxAttempts() int { return shared.EmptyOutputRetryMaxAttempts() } -func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any { - return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload) +func clonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any { + return shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID) } func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string { diff --git a/internal/httpapi/openai/chat/handler_chat.go b/internal/httpapi/openai/chat/handler_chat.go index a5672b2..4ee77dc 100644 --- a/internal/httpapi/openai/chat/handler_chat.go +++ b/internal/httpapi/openai/chat/handler_chat.go @@ -22,6 +22,10 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { h.handleVercelStreamRelease(w, r) return } + if isVercelStreamPowRequest(r) { + h.handleVercelStreamPow(w, r) + return + } if isVercelStreamPrepareRequest(r) { h.handleVercelStreamPrepare(w, r) return diff --git a/internal/httpapi/openai/chat/vercel_stream.go b/internal/httpapi/openai/chat/vercel_stream.go index 1a3c00d..2a59410 100644 --- a/internal/httpapi/openai/chat/vercel_stream.go +++ b/internal/httpapi/openai/chat/vercel_stream.go @@ -150,6 +150,44 @@ func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Reque writeJSON(w, http.StatusOK, map[string]any{"success": true}) } +func (h *Handler) handleVercelStreamPow(w http.ResponseWriter, r *http.Request) { + if !config.IsVercel() { + http.NotFound(w, r) + return + } + internalSecret := vercelInternalSecret() + internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) + if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { + writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") + return + } + + var req map[string]any + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid json") + return + } + leaseID, _ := req["lease_id"].(string) + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") + return + } + leaseAuth := h.lookupStreamLeaseAuth(leaseID) + if leaseAuth == nil { + writeOpenAIError(w, http.StatusNotFound, "stream lease not found or expired") + return + } + powHeader, err := h.DS.GetPow(r.Context(), leaseAuth, 3) + if err != nil { + writeOpenAIError(w, http.StatusInternalServerError, "Failed to get PoW.") + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "pow_header": powHeader, + }) +} + func isVercelStreamPrepareRequest(r *http.Request) bool { if r == nil { return false @@ -164,6 +202,13 @@ func isVercelStreamReleaseRequest(r *http.Request) bool { return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" } +func isVercelStreamPowRequest(r *http.Request) bool { + if r == nil { + return false + } + return strings.TrimSpace(r.URL.Query().Get("__stream_pow")) == "1" +} + func vercelInternalSecret() string { if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" { return v @@ -199,6 +244,20 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { return leaseID } +func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth { + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + return nil + } + h.leaseMu.Lock() + lease, ok := h.streamLeases[leaseID] + h.leaseMu.Unlock() + if !ok || time.Now().After(lease.ExpiresAt) { + return nil + } + return lease.Auth +} + func (h *Handler) releaseStreamLease(leaseID string) bool { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go index 995b6b3..adad24a 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -24,6 +24,7 @@ type responsesNonStreamResult struct { contentFilter bool parsed toolcall.ToolCallParseResult body map[string]any + responseMessageID int } func (h *Handler) handleResponsesNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) { @@ -50,8 +51,13 @@ func (h *Handler) handleResponsesNonStreamWithRetry(w http.ResponseWriter, ctx c } attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", false, "retry_attempt", attempts) - nextResp, err := h.DS.CallCompletion(ctx, a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3) + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", false, "retry_attempt", attempts, "parent_message_id", result.responseMessageID) + retryPow, powErr := h.DS.GetPow(ctx, a, 3) + if powErr != nil { + config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", false, "retry_attempt", attempts, "error", powErr) + retryPow = pow + } + nextResp, err := h.DS.CallCompletion(ctx, a, clonePayloadForEmptyOutputRetry(payload, result.responseMessageID), retryPow, 3) if err != nil { writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.") config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", false, "retry_attempt", attempts, "error", err) @@ -86,6 +92,7 @@ func (h *Handler) collectResponsesNonStreamAttempt(w http.ResponseWriter, resp * contentFilter: result.ContentFilter, parsed: textParsed, body: responseObj, + responseMessageID: result.ResponseMessageID, }, true } @@ -135,8 +142,13 @@ func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http. return } attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts) - nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3) + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID) + retryPow, powErr := h.DS.GetPow(r.Context(), a, 3) + if powErr != nil { + config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", powErr) + retryPow = pow + } + nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3) if err != nil { streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error") config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err) diff --git a/internal/httpapi/openai/responses/handler.go b/internal/httpapi/openai/responses/handler.go index a73e655..fc00da4 100644 --- a/internal/httpapi/openai/responses/handler.go +++ b/internal/httpapi/openai/responses/handler.go @@ -121,8 +121,8 @@ func emptyOutputRetryMaxAttempts() int { return shared.EmptyOutputRetryMaxAttempts() } -func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any { - return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload) +func clonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any { + return shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID) } func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string { diff --git a/internal/httpapi/openai/responses/responses_stream_runtime_core.go b/internal/httpapi/openai/responses/responses_stream_runtime_core.go index 0d4e75b..984593d 100644 --- a/internal/httpapi/openai/responses/responses_stream_runtime_core.go +++ b/internal/httpapi/openai/responses/responses_stream_runtime_core.go @@ -39,6 +39,7 @@ type responsesStreamRuntime struct { toolDetectionThinking strings.Builder text strings.Builder visibleText strings.Builder + responseMessageID int streamToolCallIDs map[int]string functionItemIDs map[int]string functionOutputIDs map[int]int @@ -205,6 +206,9 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa if !parsed.Parsed { return streamengine.ParsedDecision{} } + if parsed.ResponseMessageID > 0 { + s.responseMessageID = parsed.ResponseMessageID + } if parsed.ContentFilter || parsed.ErrorMessage != "" { return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")} } diff --git a/internal/httpapi/openai/shared/empty_retry.go b/internal/httpapi/openai/shared/empty_retry.go index 7d70b77..a84e93e 100644 --- a/internal/httpapi/openai/shared/empty_retry.go +++ b/internal/httpapi/openai/shared/empty_retry.go @@ -13,12 +13,23 @@ func EmptyOutputRetryMaxAttempts() int { } func ClonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any { + return ClonePayloadForEmptyOutputRetry(payload, 0) +} + +// ClonePayloadForEmptyOutputRetry creates a retry payload with the suffix +// appended and, if parentMessageID > 0, sets parent_message_id so the +// retry is submitted as a proper follow-up turn in the same DeepSeek +// session rather than a disconnected root message. +func ClonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any { clone := make(map[string]any, len(payload)) for k, v := range payload { clone[k] = v } original, _ := payload["prompt"].(string) clone["prompt"] = AppendEmptyOutputRetrySuffix(original) + if parentMessageID > 0 { + clone["parent_message_id"] = parentMessageID + } return clone } diff --git a/internal/httpapi/openai/stream_status_test.go b/internal/httpapi/openai/stream_status_test.go index d65afc4..f34c11f 100644 --- a/internal/httpapi/openai/stream_status_test.go +++ b/internal/httpapi/openai/stream_status_test.go @@ -285,7 +285,7 @@ func TestChatCompletionsStreamEmitsFailureFrameWhenUpstreamOutputEmpty(t *testin func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":42,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -313,6 +313,10 @@ func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) { if !strings.Contains(retryPrompt, "Previous reply had no visible output. Please regenerate the visible final answer or tool call now.") { t.Fatalf("expected retry suffix in prompt, got %q", retryPrompt) } + // Verify multi-turn chaining: retry must set parent_message_id from first call's response_message_id. + if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 42 { + t.Fatalf("expected retry parent_message_id=42, got %#v", ds.payloads[1]["parent_message_id"]) + } frames, done := parseSSEDataFrames(t, rec.Body.String()) if !done { @@ -341,7 +345,7 @@ func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) { func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":99,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -362,6 +366,10 @@ func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) { if len(ds.payloads) != 2 { t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) } + // Verify multi-turn chaining. + if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 99 { + t.Fatalf("expected retry parent_message_id=99, got %#v", ds.payloads[1]["parent_message_id"]) + } var out map[string]any if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil { t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String()) @@ -452,7 +460,7 @@ func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) { func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":77,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -473,6 +481,10 @@ func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) { if len(ds.payloads) != 2 { t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) } + // Verify multi-turn chaining. + if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 77 { + t.Fatalf("expected retry parent_message_id=77, got %#v", ds.payloads[1]["parent_message_id"]) + } body := rec.Body.String() if strings.Contains(body, "response.failed") { t.Fatalf("did not expect premature response.failed, body=%s", body) @@ -487,7 +499,7 @@ func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) { func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":88,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -508,6 +520,10 @@ func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { if len(ds.payloads) != 2 { t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) } + // Verify multi-turn chaining. + if parentID, ok := ds.payloads[1]["parent_message_id"].(int); !ok || parentID != 88 { + t.Fatalf("expected retry parent_message_id=88, got %#v", ds.payloads[1]["parent_message_id"]) + } var out map[string]any if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil { t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String()) diff --git a/internal/js/chat-stream/http_internal.js b/internal/js/chat-stream/http_internal.js index 01caa8d..247e38c 100644 --- a/internal/js/chat-stream/http_internal.js +++ b/internal/js/chat-stream/http_internal.js @@ -58,6 +58,33 @@ async function fetchStreamPrepare(req, rawBody) { }; } +async function fetchStreamPow(req, leaseID) { + const url = buildInternalGoURL(req); + url.searchParams.set('__stream_pow', '1'); + + const upstream = await fetch(url.toString(), { + method: 'POST', + headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }), + body: Buffer.from(JSON.stringify({ lease_id: leaseID })), + }); + + const text = await upstream.text(); + let body = {}; + try { + body = JSON.parse(text || '{}'); + } catch (_err) { + body = {}; + } + + return { + ok: upstream.ok, + status: upstream.status, + contentType: upstream.headers.get('content-type') || 'application/json', + text, + body, + }; +} + function relayPreparedFailure(res, prep) { if (prep.status === 401 && looksLikeVercelAuthPage(prep.text)) { writeOpenAIError( @@ -195,6 +222,7 @@ module.exports = { header, readRawBody, fetchStreamPrepare, + fetchStreamPow, relayPreparedFailure, safeReadText, buildInternalGoURL, diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js index 33476db..dfd6aad 100644 --- a/internal/js/chat-stream/vercel_stream_impl.js +++ b/internal/js/chat-stream/vercel_stream_impl.js @@ -25,6 +25,7 @@ const { asString, isAbortError, fetchStreamPrepare, + fetchStreamPow, relayPreparedFailure, createLeaseReleaser, } = require('./http_internal'); @@ -49,7 +50,7 @@ async function handleVercelStream(req, res, rawBody, payload) { const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`; const leaseID = asString(prep.body.lease_id); const deepseekToken = asString(prep.body.deepseek_token); - const powHeader = asString(prep.body.pow_header); + const initialPowHeader = asString(prep.body.pow_header); const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null; const finalPrompt = asString(prep.body.final_prompt); const thinkingEnabled = toBool(prep.body.thinking_enabled); @@ -59,7 +60,7 @@ async function handleVercelStream(req, res, rawBody, payload) { const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas; const stripReferenceMarkers = boolDefaultTrue(prep.body.compat && prep.body.compat.strip_reference_markers); - if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) { + if (!model || !leaseID || !deepseekToken || !initialPowHeader || !completionPayload) { writeOpenAIError(res, 500, 'invalid vercel prepare response'); return; } @@ -88,7 +89,32 @@ async function handleVercelStream(req, res, rawBody, payload) { res.on('close', onResClose); try { - const fetchDeepSeekStream = async (url, bodyPayload) => { + let currentPowHeader = initialPowHeader; + const refreshPowHeader = async (roundType) => { + try { + const pow = await fetchStreamPow(req, leaseID); + const nextPowHeader = asString(pow.body && pow.body.pow_header); + if (pow.ok && nextPowHeader) { + currentPowHeader = nextPowHeader; + return currentPowHeader; + } + console.warn('[vercel_stream_pow] refresh failed, reusing previous PoW', { + round_type: roundType, + status: pow.status || 0, + }); + } catch (err) { + if (clientClosed || isAbortError(err)) { + return ''; + } + console.warn('[vercel_stream_pow] refresh failed, reusing previous PoW', { + round_type: roundType, + error: err, + }); + } + return currentPowHeader; + }; + + const fetchDeepSeekStream = async (url, bodyPayload, powHeader) => { try { return await fetch(url, { method: 'POST', @@ -107,12 +133,18 @@ async function handleVercelStream(req, res, rawBody, payload) { throw err; } }; - const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload); - const fetchContinue = (messageID) => fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, { - chat_session_id: sessionID, - message_id: messageID, - fallback_to_resume: true, - }); + const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload, currentPowHeader); + const fetchContinue = async (messageID) => { + const powHeader = await refreshPowHeader('continue'); + if (!powHeader) { + return null; + } + return fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, { + chat_session_id: sessionID, + message_id: messageID, + fallback_to_resume: true, + }, powHeader); + }; let completionRes = await fetchCompletion(completionPayload); if (completionRes === null) { @@ -371,7 +403,7 @@ async function handleVercelStream(req, res, rawBody, payload) { } const terminal = await finish('stop', { deferEmpty: allowDeferEmpty }); - return { terminal, retryable: !terminal && allowDeferEmpty }; + return { terminal, retryable: !terminal && allowDeferEmpty, responseMessageID: continueState.responseMessageID }; }; let retryAttempts = 0; @@ -390,9 +422,18 @@ async function handleVercelStream(req, res, rawBody, payload) { surface: 'chat.completions', stream: true, retry_attempt: retryAttempts, + parent_message_id: processed.responseMessageID || 0, }); usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, retryAttempts); - completionRes = await fetchCompletion(clonePayloadWithEmptyOutputRetryPrompt(completionPayload)); + const retryPowHeader = await refreshPowHeader('retry'); + if (!retryPowHeader) { + return; + } + completionRes = await fetchDeepSeekStream( + DEEPSEEK_COMPLETION_URL, + clonePayloadForEmptyOutputRetry(completionPayload, processed.responseMessageID), + retryPowHeader, + ); if (completionRes === null) { return; } @@ -412,11 +453,15 @@ function toBool(v) { return v === true; } -function clonePayloadWithEmptyOutputRetryPrompt(payload) { - return { +function clonePayloadForEmptyOutputRetry(payload, parentMessageID) { + const clone = { ...(payload || {}), prompt: appendEmptyOutputRetrySuffix(asString(payload && payload.prompt)), }; + if (parentMessageID && parentMessageID > 0) { + clone.parent_message_id = parentMessageID; + } + return clone; } function appendEmptyOutputRetrySuffix(prompt) { diff --git a/internal/sse/consumer.go b/internal/sse/consumer.go index d81d818..db42bf5 100644 --- a/internal/sse/consumer.go +++ b/internal/sse/consumer.go @@ -5,6 +5,7 @@ import ( "strings" dsprotocol "ds2api/internal/deepseek/protocol" + "ds2api/internal/util" ) // CollectResult holds the aggregated text and thinking content from a @@ -15,6 +16,7 @@ type CollectResult struct { ToolDetectionThinking string ContentFilter bool CitationLinks map[int]string + ResponseMessageID int } // CollectStream fully consumes a DeepSeek SSE response and separates @@ -33,6 +35,7 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co contentFilter := false stopped := false collector := newCitationLinkCollector() + responseMessageID := 0 currentType := "text" if thinkingEnabled { currentType = "thinking" @@ -41,6 +44,7 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co chunk, done, parsed := ParseDeepSeekSSELine(line) if parsed && !done { collector.ingestChunk(chunk) + observeResponseMessageID(chunk, &responseMessageID) } if done { return false @@ -84,5 +88,32 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co ToolDetectionThinking: toolDetectionThinking.String(), ContentFilter: contentFilter, CitationLinks: collector.build(), + ResponseMessageID: responseMessageID, + } +} + +// observeResponseMessageID extracts the response_message_id from a parsed SSE +// chunk. It mirrors the extraction logic in client_continue.go's observe +// method, checking top-level response_message_id, v.response.message_id, and +// message.response.message_id. +func observeResponseMessageID(chunk map[string]any, out *int) { + if chunk == nil || out == nil { + return + } + if id := util.IntFrom(chunk["response_message_id"]); id > 0 { + *out = id + } + v, _ := chunk["v"].(map[string]any) + if response, _ := v["response"].(map[string]any); response != nil { + if id := util.IntFrom(response["message_id"]); id > 0 { + *out = id + } + } + if message, _ := chunk["message"].(map[string]any); message != nil { + if response, _ := message["response"].(map[string]any); response != nil { + if id := util.IntFrom(response["message_id"]); id > 0 { + *out = id + } + } } } diff --git a/internal/sse/line.go b/internal/sse/line.go index 311a91f..a52a9ab 100644 --- a/internal/sse/line.go +++ b/internal/sse/line.go @@ -1,6 +1,8 @@ package sse -import "fmt" +import ( + "fmt" +) // LineResult is the normalized parse result for one DeepSeek SSE line. type LineResult struct { @@ -11,6 +13,7 @@ type LineResult struct { Parts []ContentPart ToolDetectionThinkingParts []ContentPart NextType string + ResponseMessageID int } // ParseDeepSeekContentLine centralizes one-line DeepSeek SSE parsing for both @@ -50,11 +53,14 @@ func ParseDeepSeekContentLine(raw []byte, thinkingEnabled bool, currentType stri parts, detectionThinkingParts, finished, nextType := ParseSSEChunkForContentDetailed(chunk, thinkingEnabled, currentType) parts = filterLeakedContentFilterParts(parts) detectionThinkingParts = filterLeakedContentFilterParts(detectionThinkingParts) + var respMsgID int + observeResponseMessageID(chunk, &respMsgID) return LineResult{ Parsed: true, Stop: finished, Parts: parts, ToolDetectionThinkingParts: detectionThinkingParts, NextType: nextType, + ResponseMessageID: respMsgID, } } diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js index f20f52e..dbfbe3e 100644 --- a/tests/node/chat-stream.test.js +++ b/tests/node/chat-stream.test.js @@ -153,6 +153,9 @@ async function runMockVercelStreamSequence(upstreamSequences, prepareOverrides = if (textURL.includes('__stream_prepare=1')) { return jsonResponse(prepareBody); } + if (textURL.includes('__stream_pow=1')) { + return jsonResponse({ pow_header: 'pow-header-refreshed' }); + } if (textURL.includes('__stream_release=1')) { return jsonResponse({ success: true }); } @@ -199,6 +202,7 @@ test('vercel stream retries empty output once and keeps one terminal frame', asy const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); const completionBodies = fetchBodies.filter((body) => Object.hasOwn(body, 'prompt')); assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 2); + assert.equal(fetchURLs.filter((url) => url.includes('__stream_pow=1')).length, 1); assert.equal(frames.filter((frame) => frame === '[DONE]').length, 1); assert.equal(parsed[0].choices[0].delta.content, 'visible'); assert.equal(parsed[1].choices[0].finish_reason, 'stop'); @@ -217,11 +221,67 @@ test('vercel stream exhausts DeepSeek continue before synthetic retry', async () const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 1); assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/continue').length, 1); + assert.equal(fetchURLs.filter((url) => url.includes('__stream_pow=1')).length, 1); assert.equal(parsed[0].choices[0].delta.content, 'continued'); assert.equal(parsed[1].choices[0].finish_reason, 'stop'); assert.equal(fetchBodies.some((body) => String(body.prompt || '').includes('Previous reply had no visible output')), false); }); +test('vercel stream reuses prior PoW when refresh fails', async () => { + const originalFetch = global.fetch; + const fetchURLs = []; + const completionPowHeaders = []; + let completionCalls = 0; + global.fetch = async (url, init = {}) => { + const textURL = String(url); + fetchURLs.push(textURL); + if (textURL.includes('__stream_prepare=1')) { + return jsonResponse({ + session_id: 'chatcmpl-test', + lease_id: 'lease-test', + model: 'gpt-test', + final_prompt: 'hello', + thinking_enabled: false, + search_enabled: false, + compat: { strip_reference_markers: true }, + tool_names: [], + deepseek_token: 'deepseek-token', + pow_header: 'pow-header-initial', + payload: { prompt: 'hello' }, + }); + } + if (textURL.includes('__stream_pow=1')) { + return jsonResponse({}, 500); + } + if (textURL.includes('__stream_release=1')) { + return jsonResponse({ success: true }); + } + if (textURL === 'https://chat.deepseek.com/api/v0/chat/completion') { + completionPowHeaders.push(init.headers['x-ds-pow-response']); + completionCalls += 1; + if (completionCalls === 1) { + return sseResponse(['data: [DONE]\n\n']); + } + return sseResponse(['data: {"p":"response/content","v":"visible"}\n\n', 'data: [DONE]\n\n']); + } + throw new Error(`unexpected fetch url: ${textURL}`); + }; + try { + const req = new MockStreamRequest(); + const res = new MockStreamResponse(); + const payload = { model: 'gpt-test', stream: true }; + await handleVercelStream(req, res, Buffer.from(JSON.stringify(payload)), payload); + const frames = parseSSEDataFrames(res.bodyText()); + const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); + assert.deepEqual(completionPowHeaders, ['pow-header-initial', 'pow-header-initial']); + assert.equal(fetchURLs.filter((url) => url.includes('__stream_pow=1')).length, 1); + assert.equal(parsed[0].choices[0].delta.content, 'visible'); + assert.equal(parsed[1].choices[0].finish_reason, 'stop'); + } finally { + global.fetch = originalFetch; + } +}); + test('vercel stream emits content_filter failure when upstream filters empty output', async () => { const { frames } = await runMockVercelStream(['data: {"code":"content_filter"}\n\n']); assert.equal(frames.length, 2);