diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md
index 19b6dde..2b7dd2d 100644
--- a/docs/prompt-compatibility.md
+++ b/docs/prompt-compatibility.md
@@ -101,6 +101,7 @@ DS2API's current core approach is not to take the client-supplied `messages`, `tools`
 - OpenAI Chat / Responses natively go through the unified OpenAI normalization and DeepSeek payload assembly; Claude / Gemini reuse the OpenAI prompt/tool semantics wherever possible: Gemini directly reuses `promptcompat.BuildOpenAIPromptForAdapter`, and the Claude messages surface is converted to OpenAI chat form before execution in proxyable scenarios.
 - Client-supplied thinking / reasoning switches are normalized to the downstream `thinking_enabled`. Gemini's `generationConfig.thinkingConfig.thinkingBudget` is translated into the same thinking switch; when it is off, the compatibility layer never emits upstream `response/thinking_content` as visible body text. If the resolved model name carries a `-nothinking` suffix, thinking is force-disabled unconditionally, taking precedence over `thinking` / `reasoning` / `reasoning_effort` in the request body. The Claude surface keeps Anthropic's default of thinking-off for streaming requests that do not declare `thinking` explicitly; in non-streaming proxy scenarios, however, the layer internally enables downstream thinking once to catch the "body empty, tool call landed in thinking" case, then strips the user-invisible thinking block before replying.
 - For OpenAI Chat / Responses non-streaming finalization, if the final visible body is empty, the compatibility layer first tries to parse standalone DSML / XML tool blocks in the chain of thought as real tool calls. The streaming path runs the same fallback detection at finalization, but never intercepts or rewrites the stream mid-flight because of thinking content; thinking / reasoning deltas are forwarded as-is, and only at finalization may a final tool-call result be emitted. That result is returned as this turn's structured assistant `tool_calls` / `function_call`, not stuffed into `content` text; if the client did not enable thinking / reasoning, the chain of thought is used for detection only and is never exposed as `reasoning_content` or visible body. Only when the body is empty and the chain of thought holds no executable tool call does the empty-reply error handling proceed.
+- OpenAI Chat / Responses empty-reply error handling is now preceded by one internal compensation retry, on by default: after the first upstream attempt fully ends, if the final visible body is empty, no tool call was parsed, no tool call was already streamed to the client, and the stop reason is not `content_filter`, the compatibility layer reuses the same `chat_session_id`, account, token, PoW, and tool policy, appends the fixed suffix `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` to the original completion `prompt`, and resubmits once. The retry does not re-normalize messages, create a new session, switch accounts, or inject retry markers into the client stream; the second attempt's thinking / reasoning deltas follow the first attempt's as ordinary increments, deduplicated with overlap trim. If the second attempt is still empty, the terminal error code remains the existing `upstream_empty_output`; if either attempt hits an empty `content_filter`, no compensation retry is made and the `content_filter` error is kept.
 
 ## 5. How the prompt is assembled
 
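Reviewer note: the added bullet above is easier to verify against a condensed model of the gate it describes. The sketch below is illustrative only — `attempt`, `shouldCompensate`, and `retryPrompt` are stand-in names, not identifiers from this PR; the real logic lives in the `empty_retry_runtime.go` and `shared/empty_retry.go` files further down.

```go
package main

import (
	"fmt"
	"strings"
)

// The fixed suffix quoted in the doc bullet and defined in shared/empty_retry.go.
const retrySuffix = "Previous reply had no visible output. Please regenerate the visible final answer or tool call now."

// attempt is a hypothetical condensed view of one finished upstream pass.
type attempt struct {
	visibleText       string
	parsedToolCalls   int
	toolCallsStreamed bool
	contentFilter     bool
}

// shouldCompensate mirrors the documented gate: visible body empty, no parsed
// or already-streamed tool calls, stop reason not content_filter, and at most
// a fixed number of synthetic retries per request.
func shouldCompensate(a attempt, attempts, maxAttempts int) bool {
	return attempts < maxAttempts &&
		!a.contentFilter &&
		a.parsedToolCalls == 0 &&
		!a.toolCallsStreamed &&
		strings.TrimSpace(a.visibleText) == ""
}

// retryPrompt appends the fixed suffix to the original completion prompt.
// Session, account, token, PoW, and tool policy are reused unchanged.
func retryPrompt(prompt string) string {
	return strings.TrimRight(prompt, "\r\n\t ") + "\n\n" + retrySuffix
}

func main() {
	first := attempt{} // a thinking-only reply: nothing visible, no tool calls
	if shouldCompensate(first, 0, 1) {
		fmt.Println(retryPrompt("<user>hi</user>"))
	}
}
```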
diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go
index 4bfb5d3..d96c3d3 100644
--- a/internal/httpapi/openai/chat/chat_stream_runtime.go
+++ b/internal/httpapi/openai/chat/chat_stream_runtime.go
@@ -128,7 +128,10 @@ func (s *chatStreamRuntime) resetStreamToolCallState() {
 	s.streamToolNames = map[int]string{}
 }
 
-func (s *chatStreamRuntime) finalize(finishReason string) {
+func (s *chatStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) bool {
+	s.finalErrorStatus = 0
+	s.finalErrorMessage = ""
+	s.finalErrorCode = ""
 	finalThinking := s.thinking.String()
 	finalToolDetectionThinking := s.toolDetectionThinking.String()
 	finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
@@ -204,8 +207,14 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
 	}
 	if len(detected.Calls) == 0 && !s.toolCallsEmitted && strings.TrimSpace(finalText) == "" {
 		status, message, code := upstreamEmptyOutputDetail(finishReason == "content_filter", finalText, finalThinking)
+		if deferEmptyOutput {
+			s.finalErrorStatus = status
+			s.finalErrorMessage = message
+			s.finalErrorCode = code
+			return false
+		}
 		s.sendFailedChunk(status, message, code)
-		return
+		return true
 	}
 	usage := openaifmt.BuildChatUsage(s.finalPrompt, finalThinking, finalText)
 	s.finalFinishReason = finishReason
@@ -218,6 +227,7 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
 		usage,
 	))
 	s.sendDone()
+	return true
 }
 
 func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision {
diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go
new file mode 100644
index 0000000..14fe2b0
--- /dev/null
+++ b/internal/httpapi/openai/chat/empty_retry_runtime.go
@@ -0,0 +1,271 @@
+package chat
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+
+	"ds2api/internal/auth"
+	"ds2api/internal/config"
+	dsprotocol "ds2api/internal/deepseek/protocol"
+	openaifmt "ds2api/internal/format/openai"
+	"ds2api/internal/sse"
+	streamengine "ds2api/internal/stream"
+)
+
+type chatNonStreamResult struct {
+	thinking              string
+	toolDetectionThinking string
+	text                  string
+	contentFilter         bool
+	detectedCalls         int
+	body                  map[string]any
+	finishReason          string
+}
+
+func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) {
+	attempts := 0
+	currentResp := resp
+	usagePrompt := finalPrompt
+	accumulatedThinking := ""
+	accumulatedToolDetectionThinking := ""
+	for {
+		result, ok := h.collectChatNonStreamAttempt(w, currentResp, completionID, model, usagePrompt, thinkingEnabled, searchEnabled, toolNames)
+		if !ok {
+			return
+		}
+		accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, result.thinking)
+		accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, result.toolDetectionThinking)
+		result.thinking = accumulatedThinking
+		result.toolDetectionThinking = accumulatedToolDetectionThinking
+		detected := detectAssistantToolCalls(result.text, result.thinking, result.toolDetectionThinking, toolNames)
+		result.detectedCalls = len(detected.Calls)
+		result.body = openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, result.thinking, result.text,
detected.Calls) + result.finishReason = chatFinishReason(result.body) + if !shouldRetryChatNonStream(result, attempts) { + h.finishChatNonStreamResult(w, result, attempts, usagePrompt, historySession) + return + } + + attempts++ + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts) + retryPayload := clonePayloadWithEmptyOutputRetryPrompt(payload) + nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, pow, 3) + if err != nil { + if historySession != nil { + historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.thinking, result.text) + } + writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.") + config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", err) + return + } + usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) + currentResp = nextResp + } +} + +func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http.Response, completionID, model, usagePrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) (chatNonStreamResult, bool) { + if resp.StatusCode != http.StatusOK { + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(resp.Body) + writeOpenAIError(w, resp.StatusCode, string(body)) + return chatNonStreamResult{}, false + } + result := sse.CollectStream(resp, thinkingEnabled, true) + stripReferenceMarkers := h.compatStripReferenceMarkers() + finalThinking := cleanVisibleOutput(result.Thinking, stripReferenceMarkers) + finalToolDetectionThinking := cleanVisibleOutput(result.ToolDetectionThinking, stripReferenceMarkers) + finalText := cleanVisibleOutput(result.Text, stripReferenceMarkers) + if searchEnabled { + finalText = replaceCitationMarkersWithLinks(finalText, result.CitationLinks) + } + detected := detectAssistantToolCalls(finalText, finalThinking, finalToolDetectionThinking, toolNames) + respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, finalThinking, finalText, detected.Calls) + return chatNonStreamResult{ + thinking: finalThinking, + toolDetectionThinking: finalToolDetectionThinking, + text: finalText, + contentFilter: result.ContentFilter, + detectedCalls: len(detected.Calls), + body: respBody, + finishReason: chatFinishReason(respBody), + }, true +} + +func (h *Handler) finishChatNonStreamResult(w http.ResponseWriter, result chatNonStreamResult, attempts int, usagePrompt string, historySession *chatHistorySession) { + if result.detectedCalls == 0 && shouldWriteUpstreamEmptyOutputError(result.text) { + status, message, code := upstreamEmptyOutputDetail(result.contentFilter, result.text, result.thinking) + if historySession != nil { + historySession.error(status, message, code, result.thinking, result.text) + } + writeUpstreamEmptyOutputError(w, result.text, result.thinking, result.contentFilter) + config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter) + return + } + if historySession != nil { + historySession.success(http.StatusOK, result.thinking, result.text, result.finishReason, openaifmt.BuildChatUsage(usagePrompt, result.thinking, result.text)) + } + writeJSON(w, http.StatusOK, result.body) + source := "first_attempt" + if attempts > 0 { + source = 
"synthetic_retry" + } + config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", source) +} + +func chatFinishReason(respBody map[string]any) string { + if choices, ok := respBody["choices"].([]map[string]any); ok && len(choices) > 0 { + if fr, _ := choices[0]["finish_reason"].(string); strings.TrimSpace(fr) != "" { + return fr + } + } + return "stop" +} + +func shouldRetryChatNonStream(result chatNonStreamResult, attempts int) bool { + return emptyOutputRetryEnabled() && + attempts < emptyOutputRetryMaxAttempts() && + !result.contentFilter && + result.detectedCalls == 0 && + strings.TrimSpace(result.text) == "" +} + +func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) { + streamRuntime, initialType, ok := h.prepareChatStreamRuntime(w, resp, completionID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames, historySession) + if !ok { + return + } + attempts := 0 + currentResp := resp + for { + terminalWritten, retryable := h.consumeChatStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, historySession, attempts < emptyOutputRetryMaxAttempts()) + if terminalWritten { + logChatStreamTerminal(streamRuntime, attempts) + return + } + if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() { + streamRuntime.finalize("stop", false) + recordChatStreamHistory(streamRuntime, historySession) + config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none") + return + } + attempts++ + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts) + nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3) + if err != nil { + failChatStreamRetry(streamRuntime, historySession, http.StatusInternalServerError, "Failed to get completion.", "error") + config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", err) + return + } + if nextResp.StatusCode != http.StatusOK { + defer func() { _ = nextResp.Body.Close() }() + body, _ := io.ReadAll(nextResp.Body) + failChatStreamRetry(streamRuntime, historySession, nextResp.StatusCode, string(body), "error") + return + } + streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) + currentResp = nextResp + } +} + +func (h *Handler) prepareChatStreamRuntime(w http.ResponseWriter, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) (*chatStreamRuntime, string, bool) { + if resp.StatusCode != http.StatusOK { + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(resp.Body) + if historySession != nil { + historySession.error(resp.StatusCode, string(body), "error", "", "") + } + writeOpenAIError(w, resp.StatusCode, string(body)) + return nil, "", false + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache, no-transform") + w.Header().Set("Connection", "keep-alive") + 
w.Header().Set("X-Accel-Buffering", "no") + rc := http.NewResponseController(w) + _, canFlush := w.(http.Flusher) + if !canFlush { + config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered") + } + initialType := "text" + if thinkingEnabled { + initialType = "thinking" + } + streamRuntime := newChatStreamRuntime( + w, rc, canFlush, completionID, time.Now().Unix(), model, finalPrompt, + thinkingEnabled, searchEnabled, h.compatStripReferenceMarkers(), toolNames, + len(toolNames) > 0, h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(), + ) + return streamRuntime, initialType, true +} + +func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *chatStreamRuntime, initialType string, thinkingEnabled bool, historySession *chatHistorySession, allowDeferEmpty bool) (bool, bool) { + defer func() { _ = resp.Body.Close() }() + finalReason := "stop" + streamengine.ConsumeSSE(streamengine.ConsumeConfig{ + Context: r.Context(), + Body: resp.Body, + ThinkingEnabled: thinkingEnabled, + InitialType: initialType, + KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, + IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, + MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, + }, streamengine.ConsumeHooks{ + OnKeepAlive: streamRuntime.sendKeepAlive, + OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision { + decision := streamRuntime.onParsed(parsed) + if historySession != nil { + historySession.progress(streamRuntime.thinking.String(), streamRuntime.text.String()) + } + return decision + }, + OnFinalize: func(reason streamengine.StopReason, _ error) { + if string(reason) == "content_filter" { + finalReason = "content_filter" + } + }, + OnContextDone: func() { + if historySession != nil { + historySession.stopped(streamRuntime.thinking.String(), streamRuntime.text.String(), string(streamengine.StopReasonContextCancelled)) + } + }, + }) + terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter") + if terminalWritten { + recordChatStreamHistory(streamRuntime, historySession) + return true, false + } + return false, true +} + +func recordChatStreamHistory(streamRuntime *chatStreamRuntime, historySession *chatHistorySession) { + if historySession == nil { + return + } + if streamRuntime.finalErrorMessage != "" { + historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.thinking.String(), streamRuntime.text.String()) + return + } + historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage) +} + +func failChatStreamRetry(streamRuntime *chatStreamRuntime, historySession *chatHistorySession, status int, message, code string) { + streamRuntime.sendFailedChunk(status, message, code) + if historySession != nil { + historySession.error(status, message, code, streamRuntime.thinking.String(), streamRuntime.text.String()) + } +} + +func logChatStreamTerminal(streamRuntime *chatStreamRuntime, attempts int) { + source := "first_attempt" + if attempts > 0 { + source = "synthetic_retry" + } + if streamRuntime.finalErrorMessage != "" { + config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) + return + } + 
config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", source) +} diff --git a/internal/httpapi/openai/chat/handler.go b/internal/httpapi/openai/chat/handler.go index a682cc6..6283731 100644 --- a/internal/httpapi/openai/chat/handler.go +++ b/internal/httpapi/openai/chat/handler.go @@ -123,6 +123,22 @@ func writeUpstreamEmptyOutputError(w http.ResponseWriter, text, thinking string, return shared.WriteUpstreamEmptyOutputError(w, text, thinking, contentFilter) } +func emptyOutputRetryEnabled() bool { + return shared.EmptyOutputRetryEnabled() +} + +func emptyOutputRetryMaxAttempts() int { + return shared.EmptyOutputRetryMaxAttempts() +} + +func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any { + return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload) +} + +func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string { + return shared.UsagePromptWithEmptyOutputRetry(originalPrompt, retryAttempts) +} + func formatIncrementalStreamToolCallDeltas(deltas []toolstream.ToolCallDelta, ids map[int]string) []map[string]any { return shared.FormatIncrementalStreamToolCallDeltas(deltas, ids) } diff --git a/internal/httpapi/openai/chat/handler_chat.go b/internal/httpapi/openai/chat/handler_chat.go index da048fa..a5672b2 100644 --- a/internal/httpapi/openai/chat/handler_chat.go +++ b/internal/httpapi/openai/chat/handler_chat.go @@ -105,10 +105,10 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { return } if stdReq.Stream { - h.handleStream(w, r, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession) + h.handleStreamWithRetry(w, r, a, resp, payload, pow, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession) return } - h.handleNonStream(w, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession) + h.handleNonStreamWithRetry(w, r.Context(), a, resp, payload, pow, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession) } func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAuth, sessionID string) { @@ -251,9 +251,9 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt }, OnFinalize: func(reason streamengine.StopReason, _ error) { if string(reason) == "content_filter" { - streamRuntime.finalize("content_filter") + streamRuntime.finalize("content_filter", false) } else { - streamRuntime.finalize("stop") + streamRuntime.finalize("stop", false) } if historySession == nil { return diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go new file mode 100644 index 0000000..995b6b3 --- /dev/null +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -0,0 +1,221 @@ +package responses + +import ( + "context" + "io" + "net/http" + "strings" + "time" + + "ds2api/internal/auth" + "ds2api/internal/config" + dsprotocol "ds2api/internal/deepseek/protocol" + openaifmt "ds2api/internal/format/openai" + "ds2api/internal/promptcompat" + "ds2api/internal/sse" + streamengine "ds2api/internal/stream" + "ds2api/internal/toolcall" +) + +type responsesNonStreamResult struct { + thinking string + toolDetectionThinking string + text string + contentFilter bool + 
parsed toolcall.ToolCallParseResult + body map[string]any +} + +func (h *Handler) handleResponsesNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) { + attempts := 0 + currentResp := resp + usagePrompt := finalPrompt + accumulatedThinking := "" + accumulatedToolDetectionThinking := "" + for { + result, ok := h.collectResponsesNonStreamAttempt(w, currentResp, responseID, model, usagePrompt, thinkingEnabled, searchEnabled, toolNames) + if !ok { + return + } + accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, result.thinking) + accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, result.toolDetectionThinking) + result.thinking = accumulatedThinking + result.toolDetectionThinking = accumulatedToolDetectionThinking + result.parsed = detectAssistantToolCalls(result.text, result.thinking, result.toolDetectionThinking, toolNames) + result.body = openaifmt.BuildResponseObjectWithToolCalls(responseID, model, usagePrompt, result.thinking, result.text, result.parsed.Calls) + + if !shouldRetryResponsesNonStream(result, attempts) { + h.finishResponsesNonStreamResult(w, result, attempts, owner, responseID, toolChoice, traceID) + return + } + + attempts++ + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", false, "retry_attempt", attempts) + nextResp, err := h.DS.CallCompletion(ctx, a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3) + if err != nil { + writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.") + config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", false, "retry_attempt", attempts, "error", err) + return + } + usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) + currentResp = nextResp + } +} + +func (h *Handler) collectResponsesNonStreamAttempt(w http.ResponseWriter, resp *http.Response, responseID, model, usagePrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) (responsesNonStreamResult, bool) { + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body))) + return responsesNonStreamResult{}, false + } + result := sse.CollectStream(resp, thinkingEnabled, false) + stripReferenceMarkers := h.compatStripReferenceMarkers() + sanitizedThinking := cleanVisibleOutput(result.Thinking, stripReferenceMarkers) + toolDetectionThinking := cleanVisibleOutput(result.ToolDetectionThinking, stripReferenceMarkers) + sanitizedText := cleanVisibleOutput(result.Text, stripReferenceMarkers) + if searchEnabled { + sanitizedText = replaceCitationMarkersWithLinks(sanitizedText, result.CitationLinks) + } + textParsed := detectAssistantToolCalls(sanitizedText, sanitizedThinking, toolDetectionThinking, toolNames) + responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, model, usagePrompt, sanitizedThinking, sanitizedText, textParsed.Calls) + return responsesNonStreamResult{ + thinking: sanitizedThinking, + toolDetectionThinking: toolDetectionThinking, + text: sanitizedText, + contentFilter: result.ContentFilter, + parsed: textParsed, + body: responseObj, + }, true +} + +func (h *Handler) 
finishResponsesNonStreamResult(w http.ResponseWriter, result responsesNonStreamResult, attempts int, owner, responseID string, toolChoice promptcompat.ToolChoicePolicy, traceID string) { + if len(result.parsed.Calls) == 0 && writeUpstreamEmptyOutputError(w, result.text, result.thinking, result.contentFilter) { + config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter) + return + } + logResponsesToolPolicyRejection(traceID, toolChoice, result.parsed, "text") + if toolChoice.IsRequired() && len(result.parsed.Calls) == 0 { + writeOpenAIErrorWithCode(w, http.StatusUnprocessableEntity, "tool_choice requires at least one valid tool call.", "tool_choice_violation") + return + } + h.getResponseStore().put(owner, responseID, result.body) + writeJSON(w, http.StatusOK, result.body) + source := "first_attempt" + if attempts > 0 { + source = "synthetic_retry" + } + config.Logger.Info("[openai_empty_retry] completed", "surface", "responses", "stream", false, "retry_attempts", attempts, "success_source", source) +} + +func shouldRetryResponsesNonStream(result responsesNonStreamResult, attempts int) bool { + return emptyOutputRetryEnabled() && + attempts < emptyOutputRetryMaxAttempts() && + !result.contentFilter && + len(result.parsed.Calls) == 0 && + strings.TrimSpace(result.text) == "" +} + +func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) { + streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames, toolChoice, traceID) + if !ok { + return + } + attempts := 0 + currentResp := resp + for { + terminalWritten, retryable := h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, attempts < emptyOutputRetryMaxAttempts()) + if terminalWritten { + logResponsesStreamTerminal(streamRuntime, attempts) + return + } + if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() { + streamRuntime.finalize("stop", false) + config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) + return + } + attempts++ + config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts) + nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3) + if err != nil { + streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error") + config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err) + return + } + if nextResp.StatusCode != http.StatusOK { + defer func() { _ = nextResp.Body.Close() }() + body, _ := io.ReadAll(nextResp.Body) + streamRuntime.failResponse(nextResp.StatusCode, strings.TrimSpace(string(body)), "error") + return + } + streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) + currentResp = nextResp + } +} + +func (h *Handler) 
prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) (*responsesStreamRuntime, string, bool) { + if resp.StatusCode != http.StatusOK { + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(resp.Body) + writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body))) + return nil, "", false + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache, no-transform") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + rc := http.NewResponseController(w) + _, canFlush := w.(http.Flusher) + initialType := "text" + if thinkingEnabled { + initialType = "thinking" + } + streamRuntime := newResponsesStreamRuntime( + w, rc, canFlush, responseID, model, finalPrompt, thinkingEnabled, searchEnabled, + h.compatStripReferenceMarkers(), toolNames, len(toolNames) > 0, + h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(), + toolChoice, traceID, func(obj map[string]any) { + h.getResponseStore().put(owner, responseID, obj) + }, + ) + streamRuntime.sendCreated() + return streamRuntime, initialType, true +} + +func (h *Handler) consumeResponsesStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *responsesStreamRuntime, initialType string, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) { + defer func() { _ = resp.Body.Close() }() + finalReason := "stop" + streamengine.ConsumeSSE(streamengine.ConsumeConfig{ + Context: r.Context(), + Body: resp.Body, + ThinkingEnabled: thinkingEnabled, + InitialType: initialType, + KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, + IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, + MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, + }, streamengine.ConsumeHooks{ + OnParsed: streamRuntime.onParsed, + OnFinalize: func(reason streamengine.StopReason, _ error) { + if string(reason) == "content_filter" { + finalReason = "content_filter" + } + }, + }) + terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter") + if terminalWritten { + return true, false + } + return false, true +} + +func logResponsesStreamTerminal(streamRuntime *responsesStreamRuntime, attempts int) { + source := "first_attempt" + if attempts > 0 { + source = "synthetic_retry" + } + if streamRuntime.failed { + config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) + return + } + config.Logger.Info("[openai_empty_retry] completed", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", source) +} diff --git a/internal/httpapi/openai/responses/handler.go b/internal/httpapi/openai/responses/handler.go index 0ad75bf..a73e655 100644 --- a/internal/httpapi/openai/responses/handler.go +++ b/internal/httpapi/openai/responses/handler.go @@ -113,6 +113,22 @@ func writeUpstreamEmptyOutputError(w http.ResponseWriter, text, thinking string, return shared.WriteUpstreamEmptyOutputError(w, text, thinking, contentFilter) } +func emptyOutputRetryEnabled() bool { + return shared.EmptyOutputRetryEnabled() +} + +func emptyOutputRetryMaxAttempts() int { + return shared.EmptyOutputRetryMaxAttempts() +} + +func 
clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any { + return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload) +} + +func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string { + return shared.UsagePromptWithEmptyOutputRetry(originalPrompt, retryAttempts) +} + func filterIncrementalToolCallDeltasByAllowed(deltas []toolstream.ToolCallDelta, seenNames map[int]string) []toolstream.ToolCallDelta { return shared.FilterIncrementalToolCallDeltasByAllowed(deltas, seenNames) } diff --git a/internal/httpapi/openai/responses/responses_handler.go b/internal/httpapi/openai/responses/responses_handler.go index 0054b8e..f32e3ec 100644 --- a/internal/httpapi/openai/responses/responses_handler.go +++ b/internal/httpapi/openai/responses/responses_handler.go @@ -115,10 +115,10 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) { responseID := "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "") if stdReq.Stream { - h.handleResponsesStream(w, r, resp, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID) + h.handleResponsesStreamWithRetry(w, r, a, resp, payload, pow, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID) return } - h.handleResponsesNonStream(w, resp, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID) + h.handleResponsesNonStreamWithRetry(w, r.Context(), a, resp, payload, pow, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID) } func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) { @@ -206,8 +206,12 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, }, streamengine.ConsumeHooks{ OnParsed: streamRuntime.onParsed, - OnFinalize: func(_ streamengine.StopReason, _ error) { - streamRuntime.finalize() + OnFinalize: func(reason streamengine.StopReason, _ error) { + if string(reason) == "content_filter" { + streamRuntime.finalize("content_filter", false) + return + } + streamRuntime.finalize("stop", false) }, }) } diff --git a/internal/httpapi/openai/responses/responses_stream_runtime_core.go b/internal/httpapi/openai/responses/responses_stream_runtime_core.go index 4f65ce2..0d4e75b 100644 --- a/internal/httpapi/openai/responses/responses_stream_runtime_core.go +++ b/internal/httpapi/openai/responses/responses_stream_runtime_core.go @@ -53,6 +53,9 @@ type responsesStreamRuntime struct { messagePartAdded bool sequence int failed bool + finalErrorStatus int + finalErrorMessage string + finalErrorCode string persistResponse func(obj map[string]any) } @@ -103,6 +106,9 @@ func newResponsesStreamRuntime( func (s *responsesStreamRuntime) failResponse(status int, message, code string) { s.failed = true + s.finalErrorStatus = status + s.finalErrorMessage = message + s.finalErrorCode = code failedResp := map[string]any{ "id": s.responseID, "type": "response", @@ -126,7 +132,11 @@ func (s *responsesStreamRuntime) failResponse(status int, message, code string) s.sendDone() } -func (s *responsesStreamRuntime) finalize() 
{
+func (s *responsesStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) bool {
+	s.failed = false
+	s.finalErrorStatus = 0
+	s.finalErrorMessage = ""
+	s.finalErrorCode = ""
 	finalThinking := s.thinking.String()
 	finalToolDetectionThinking := s.toolDetectionThinking.String()
 	finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
@@ -150,12 +160,18 @@ func (s *responsesStreamRuntime) finalize() {
 
 	if s.toolChoice.IsRequired() && len(detected) == 0 {
 		s.failResponse(http.StatusUnprocessableEntity, "tool_choice requires at least one valid tool call.", "tool_choice_violation")
-		return
+		return true
 	}
 	if len(detected) == 0 && strings.TrimSpace(finalText) == "" {
-		status, message, code := upstreamEmptyOutputDetail(false, finalText, finalThinking)
+		status, message, code := upstreamEmptyOutputDetail(finishReason == "content_filter", finalText, finalThinking)
+		if deferEmptyOutput {
+			s.finalErrorStatus = status
+			s.finalErrorMessage = message
+			s.finalErrorCode = code
+			return false
+		}
 		s.failResponse(status, message, code)
-		return
+		return true
 	}
 
 	s.closeIncompleteFunctionItems()
@@ -165,6 +181,7 @@ func (s *responsesStreamRuntime) finalize() {
 	}
 	s.sendEvent("response.completed", openaifmt.BuildResponsesCompletedPayload(obj))
 	s.sendDone()
+	return true
 }
 
 func (s *responsesStreamRuntime) logToolPolicyRejections(textParsed toolcall.ToolCallParseResult) {
@@ -188,7 +205,10 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
 	if !parsed.Parsed {
 		return streamengine.ParsedDecision{}
 	}
-	if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop {
+	if parsed.ContentFilter || parsed.ErrorMessage != "" {
+		return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")}
+	}
+	if parsed.Stop {
 		return streamengine.ParsedDecision{Stop: true}
 	}
 
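Reviewer note: both stream runtimes now share the same deferral contract — `finalize` reports whether it wrote a terminal frame, and the `*WithRetry` drivers park the empty-output error, resubmit once on the same open SSE connection, and only emit the parked error when retries are exhausted. A minimal control-flow sketch of that handshake; `runtime` here is a stand-in, not the PR's `chatStreamRuntime` / `responsesStreamRuntime`.

```go
package main

import "fmt"

// runtime stands in for the stream runtimes: finalize either writes a
// terminal frame (true) or parks the empty-output error for a retry (false).
type runtime struct {
	text           string
	finalErrorCode string
}

func (r *runtime) finalize(finishReason string, deferEmptyOutput bool) bool {
	if r.text == "" {
		if deferEmptyOutput && finishReason != "content_filter" {
			r.finalErrorCode = "upstream_empty_output" // parked, not sent
			return false
		}
		fmt.Println("send failure frame:", "upstream_empty_output")
		return true
	}
	fmt.Println("send finish frame:", finishReason)
	return true
}

func main() {
	r := &runtime{}
	const maxRetries = 1
	attempts := 0
	for {
		// Consume one upstream SSE pass (stubbed: the second pass yields text).
		if attempts == 1 {
			r.text = "visible"
		}
		if r.finalize("stop", attempts < maxRetries) {
			return // terminal frame already written on the shared stream
		}
		if attempts >= maxRetries {
			r.finalize("stop", false) // give up: emit the parked error now
			return
		}
		attempts++ // resubmit the same session with the suffixed prompt
	}
}
```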
diff --git a/internal/httpapi/openai/shared/empty_retry.go b/internal/httpapi/openai/shared/empty_retry.go
new file mode 100644
index 0000000..7d70b77
--- /dev/null
+++ b/internal/httpapi/openai/shared/empty_retry.go
@@ -0,0 +1,45 @@
+package shared
+
+import "strings"
+
+const EmptyOutputRetrySuffix = "Previous reply had no visible output. Please regenerate the visible final answer or tool call now."
+
+func EmptyOutputRetryEnabled() bool {
+	return true
+}
+
+func EmptyOutputRetryMaxAttempts() int {
+	return 1
+}
+
+func ClonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
+	clone := make(map[string]any, len(payload))
+	for k, v := range payload {
+		clone[k] = v
+	}
+	original, _ := payload["prompt"].(string)
+	clone["prompt"] = AppendEmptyOutputRetrySuffix(original)
+	return clone
+}
+
+func AppendEmptyOutputRetrySuffix(prompt string) string {
+	prompt = strings.TrimRight(prompt, "\r\n\t ")
+	if prompt == "" {
+		return EmptyOutputRetrySuffix
+	}
+	return prompt + "\n\n" + EmptyOutputRetrySuffix
+}
+
+func UsagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string {
+	if retryAttempts <= 0 {
+		return originalPrompt
+	}
+	parts := make([]string, 0, retryAttempts+1)
+	parts = append(parts, originalPrompt)
+	next := originalPrompt
+	for i := 0; i < retryAttempts; i++ {
+		next = AppendEmptyOutputRetrySuffix(next)
+		parts = append(parts, next)
+	}
+	return strings.Join(parts, "\n")
+}
diff --git a/internal/httpapi/openai/stream_status_test.go b/internal/httpapi/openai/stream_status_test.go
index 3c2827f..d65afc4 100644
--- a/internal/httpapi/openai/stream_status_test.go
+++ b/internal/httpapi/openai/stream_status_test.go
@@ -66,6 +66,44 @@ func (m streamStatusDSStub) DeleteAllSessionsForToken(_ context.Context, _ strin
 	return nil
 }
 
+type streamStatusDSSeqStub struct {
+	resps    []*http.Response
+	payloads []map[string]any
+}
+
+func (m *streamStatusDSSeqStub) CreateSession(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
+	return "session-id", nil
+}
+
+func (m *streamStatusDSSeqStub) GetPow(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
+	return "pow", nil
+}
+
+func (m *streamStatusDSSeqStub) UploadFile(_ context.Context, _ *auth.RequestAuth, _ dsclient.UploadFileRequest, _ int) (*dsclient.UploadFileResult, error) {
+	return &dsclient.UploadFileResult{ID: "file-id", Filename: "file.txt", Bytes: 1, Status: "uploaded"}, nil
+}
+
+func (m *streamStatusDSSeqStub) CallCompletion(_ context.Context, _ *auth.RequestAuth, payload map[string]any, _ string, _ int) (*http.Response, error) {
+	clone := make(map[string]any, len(payload))
+	for k, v := range payload {
+		clone[k] = v
+	}
+	m.payloads = append(m.payloads, clone)
+	idx := len(m.payloads) - 1
+	if idx >= len(m.resps) {
+		idx = len(m.resps) - 1
+	}
+	return m.resps[idx], nil
+}
+
+func (m *streamStatusDSSeqStub) DeleteSessionForToken(_ context.Context, _ string, _ string) (*dsclient.DeleteSessionResult, error) {
+	return &dsclient.DeleteSessionResult{Success: true}, nil
+}
+
+func (m *streamStatusDSSeqStub) DeleteAllSessionsForToken(_ context.Context, _ string) error {
+	return nil
+}
+
 func makeOpenAISSEHTTPResponse(lines ...string) *http.Response {
 	body := strings.Join(lines, "\n")
 	if !strings.HasSuffix(body, "\n") {
@@ -78,6 +116,12 @@
 	}
 }
 
+func newOpenAITestRouter(h *openAITestSurface) http.Handler {
+	r := chi.NewRouter()
+	registerOpenAITestRoutes(r, h)
+	return r
+}
+
 func captureStatusMiddleware(statuses *[]int) func(http.Handler) http.Handler {
 	return func(next http.Handler) http.Handler {
 		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -239,6 +283,125 @@
 	}
 }
 
+func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) {
+	ds := &streamStatusDSSeqStub{resps:
[]*http.Response{ + makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), + }} + h := &openAITestSurface{ + Store: mockOpenAIConfig{wideInput: true}, + Auth: streamStatusAuthStub{}, + DS: ds, + } + reqBody := `{"model":"deepseek-v4-pro","messages":[{"role":"user","content":"hi"}],"stream":true}` + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody)) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + newOpenAITestRouter(h).ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.payloads) != 2 { + t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) + } + if ds.payloads[0]["chat_session_id"] != ds.payloads[1]["chat_session_id"] { + t.Fatalf("expected retry to reuse session, payloads=%#v", ds.payloads) + } + retryPrompt := asString(ds.payloads[1]["prompt"]) + if !strings.Contains(retryPrompt, "Previous reply had no visible output. Please regenerate the visible final answer or tool call now.") { + t.Fatalf("expected retry suffix in prompt, got %q", retryPrompt) + } + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + doneCount := strings.Count(rec.Body.String(), "data: [DONE]") + if doneCount != 1 { + t.Fatalf("expected one [DONE], got %d body=%s", doneCount, rec.Body.String()) + } + if len(frames) != 3 { + t.Fatalf("expected reasoning, content, finish frames, got %#v body=%s", frames, rec.Body.String()) + } + id := asString(frames[0]["id"]) + for _, frame := range frames[1:] { + if asString(frame["id"]) != id { + t.Fatalf("expected same completion id across retry stream, frames=%#v", frames) + } + } + choices, _ := frames[1]["choices"].([]any) + choice, _ := choices[0].(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if asString(delta["content"]) != "visible" { + t.Fatalf("expected retry content delta, got %#v body=%s", delta, rec.Body.String()) + } +} + +func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) { + ds := &streamStatusDSSeqStub{resps: []*http.Response{ + makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), + }} + h := &openAITestSurface{ + Store: mockOpenAIConfig{wideInput: true}, + Auth: streamStatusAuthStub{}, + DS: ds, + } + reqBody := `{"model":"deepseek-v4-pro","messages":[{"role":"user","content":"hi"}],"stream":false}` + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody)) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + newOpenAITestRouter(h).ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200 after retry, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.payloads) != 2 { + t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) + } + var out map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil { + t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String()) + } + choices, _ := out["choices"].([]any) + choice, _ := 
choices[0].(map[string]any) + message, _ := choice["message"].(map[string]any) + if asString(message["content"]) != "visible" { + t.Fatalf("expected retry visible content, got %#v", message) + } + if !strings.Contains(asString(message["reasoning_content"]), "plan") { + t.Fatalf("expected first-attempt reasoning to be preserved, got %#v", message) + } +} + +func TestChatCompletionsContentFilterDoesNotRetry(t *testing.T) { + ds := &streamStatusDSSeqStub{resps: []*http.Response{ + makeOpenAISSEHTTPResponse(`data: {"code":"content_filter"}`), + makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), + }} + h := &openAITestSurface{ + Store: mockOpenAIConfig{wideInput: true}, + Auth: streamStatusAuthStub{}, + DS: ds, + } + reqBody := `{"model":"deepseek-v4-flash","messages":[{"role":"user","content":"hi"}],"stream":false}` + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody)) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + newOpenAITestRouter(h).ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("expected content_filter 400, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.payloads) != 1 { + t.Fatalf("expected no retry on content_filter, got %d calls", len(ds.payloads)) + } +} + func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) { statuses := make([]int, 0, 1) h := &openAITestSurface{ @@ -287,6 +450,86 @@ func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) { } } +func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) { + ds := &streamStatusDSSeqStub{resps: []*http.Response{ + makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), + }} + h := &openAITestSurface{ + Store: mockOpenAIConfig{wideInput: true}, + Auth: streamStatusAuthStub{}, + DS: ds, + } + reqBody := `{"model":"deepseek-v4-pro","input":"hi","stream":true}` + req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(reqBody)) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + newOpenAITestRouter(h).ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.payloads) != 2 { + t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) + } + body := rec.Body.String() + if strings.Contains(body, "response.failed") { + t.Fatalf("did not expect premature response.failed, body=%s", body) + } + if !strings.Contains(body, "response.reasoning.delta") || !strings.Contains(body, "response.output_text.delta") || !strings.Contains(body, "response.completed") { + t.Fatalf("expected reasoning, text delta, and completed events, body=%s", body) + } + if strings.Count(body, "data: [DONE]") != 1 { + t.Fatalf("expected one [DONE], body=%s", body) + } +} + +func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { + ds := &streamStatusDSSeqStub{resps: []*http.Response{ + makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), + }} + h := &openAITestSurface{ + Store: mockOpenAIConfig{wideInput: true}, + Auth: 
streamStatusAuthStub{}, + DS: ds, + } + reqBody := `{"model":"deepseek-v4-pro","input":"hi","stream":false}` + req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(reqBody)) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + newOpenAITestRouter(h).ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200 after retry, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.payloads) != 2 { + t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads)) + } + var out map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil { + t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String()) + } + if asString(out["output_text"]) != "visible" { + t.Fatalf("expected retry visible output_text, got %#v", out["output_text"]) + } + output, _ := out["output"].([]any) + if len(output) == 0 { + t.Fatalf("expected output items, got %#v", out) + } + item, _ := output[0].(map[string]any) + content, _ := item["content"].([]any) + if len(content) == 0 { + t.Fatalf("expected content entries, got %#v", item) + } + reasoning, _ := content[0].(map[string]any) + if asString(reasoning["type"]) != "reasoning" || !strings.Contains(asString(reasoning["text"]), "plan") { + t.Fatalf("expected preserved reasoning entry, got %#v", content) + } +} + func TestResponsesNonStreamUsageIgnoresPromptAndOutputTokenUsage(t *testing.T) { statuses := make([]int, 0, 1) h := &openAITestSurface{ diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js index 553af69..33476db 100644 --- a/internal/js/chat-stream/vercel_stream_impl.js +++ b/internal/js/chat-stream/vercel_stream_impl.js @@ -33,6 +33,10 @@ const { } = require('./dedupe'); const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion'; +const DEEPSEEK_CONTINUE_URL = 'https://chat.deepseek.com/api/v0/chat/continue'; +const EMPTY_OUTPUT_RETRY_SUFFIX = 'Previous reply had no visible output. 
Please regenerate the visible final answer or tool call now.'; +const EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS = 1; +const AUTO_CONTINUE_MAX_ROUNDS = 8; async function handleVercelStream(req, res, rawBody, payload) { const prep = await fetchStreamPrepare(req, rawBody); @@ -84,23 +88,35 @@ async function handleVercelStream(req, res, rawBody, payload) { res.on('close', onResClose); try { - let completionRes; - try { - completionRes = await fetch(DEEPSEEK_COMPLETION_URL, { - method: 'POST', - headers: { - ...BASE_HEADERS, - authorization: `Bearer ${deepseekToken}`, - 'x-ds-pow-response': powHeader, - }, - body: JSON.stringify(completionPayload), - signal: upstreamController.signal, - }); - } catch (err) { - if (clientClosed || isAbortError(err)) { - return; + const fetchDeepSeekStream = async (url, bodyPayload) => { + try { + return await fetch(url, { + method: 'POST', + headers: { + ...BASE_HEADERS, + authorization: `Bearer ${deepseekToken}`, + 'x-ds-pow-response': powHeader, + }, + body: JSON.stringify(bodyPayload), + signal: upstreamController.signal, + }); + } catch (err) { + if (clientClosed || isAbortError(err)) { + return null; + } + throw err; } - throw err; + }; + const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload); + const fetchContinue = (messageID) => fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, { + chat_session_id: sessionID, + message_id: messageID, + fallback_to_resume: true, + }); + + let completionRes = await fetchCompletion(completionPayload); + if (completionRes === null) { + return; } if (clientClosed) { return; @@ -126,6 +142,7 @@ async function handleVercelStream(req, res, rawBody, payload) { let currentType = thinkingEnabled ? 'thinking' : 'text'; let thinkingText = ''; let outputText = ''; + let usagePrompt = finalPrompt; const toolSieveEnabled = toolPolicy.toolSieveEnabled; const toolSieveState = createToolSieveState(); let toolCallsEmitted = false; @@ -133,7 +150,6 @@ async function handleVercelStream(req, res, rawBody, payload) { const streamToolCallIDs = new Map(); const streamToolNames = new Map(); const decoder = new TextDecoder(); - reader = completionRes.body.getReader(); let buffered = ''; let ended = false; const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({ @@ -144,14 +160,14 @@ async function handleVercelStream(req, res, rawBody, payload) { isClosed: () => clientClosed, }); - const finish = async (reason) => { + const finish = async (reason, options = {}) => { if (ended) { - return; + return true; } - ended = true; if (clientClosed || res.writableEnded || res.destroyed) { + ended = true; await releaseLease(); - return; + return true; } const detected = parseStandaloneToolCalls(outputText, toolNames); if (detected.length > 0 && !toolCallsDoneEmitted) { @@ -177,21 +193,26 @@ async function handleVercelStream(req, res, rawBody, payload) { reason = 'tool_calls'; } if (detected.length === 0 && !toolCallsEmitted && outputText.trim() === '') { + if (options.deferEmpty && reason !== 'content_filter') { + return false; + } + ended = true; const detail = upstreamEmptyOutputDetail(reason === 'content_filter', outputText, thinkingText); sendFailedChunk(res, detail.status, detail.message, detail.code); await releaseLease(); if (!res.writableEnded && !res.destroyed) { res.end(); } - return; + return true; } + ended = true; sendFrame({ id: sessionID, object: 'chat.completion.chunk', created, model, choices: [{ delta: {}, index: 0, finish_reason: reason }], - usage: buildUsage(finalPrompt, thinkingText, outputText), + 
usage: buildUsage(usagePrompt, thinkingText, outputText), }); if (!res.writableEnded && !res.destroyed) { res.write('data: [DONE]\n\n'); @@ -200,122 +221,185 @@ async function handleVercelStream(req, res, rawBody, payload) { if (!res.writableEnded && !res.destroyed) { res.end(); } + return true; }; - try { + const processStream = async (initialResponse, allowDeferEmpty) => { + let currentResponse = initialResponse; + let continueState = createContinueState(sessionID); + let continueRounds = 0; // eslint-disable-next-line no-constant-condition while (true) { - if (clientClosed) { - await finish('stop'); - return; - } - const { value, done } = await reader.read(); - if (done) { - break; - } - buffered += decoder.decode(value, { stream: true }); - const lines = buffered.split('\n'); - buffered = lines.pop() || ''; - - for (const rawLine of lines) { - const line = rawLine.trim(); - if (!line.startsWith('data:')) { - continue; - } - const dataStr = line.slice(5).trim(); - if (!dataStr) { - continue; - } - if (dataStr === '[DONE]') { - await finish('stop'); - return; - } - let chunk; - try { - chunk = JSON.parse(dataStr); - } catch (_err) { - continue; - } - const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers); - if (!parsed.parsed) { - continue; - } - currentType = parsed.newType; - if (parsed.errorMessage) { - await finish('content_filter'); - return; - } - if (parsed.contentFilter) { - await finish(outputText.trim() === '' ? 'content_filter' : 'stop'); - return; - } - if (parsed.finished) { - await finish('stop'); - return; - } - - for (const p of parsed.parts) { - if (!p.text) { - continue; + reader = currentResponse.body.getReader(); + buffered = ''; + let streamEnded = false; + try { + // eslint-disable-next-line no-constant-condition + while (true) { + if (clientClosed) { + await finish('stop'); + return { terminal: true, retryable: false }; } - if (p.type === 'thinking') { - if (thinkingEnabled) { - const trimmed = trimContinuationOverlap(thinkingText, p.text); - if (!trimmed) { + const { value, done } = await reader.read(); + if (done) { + break; + } + buffered += decoder.decode(value, { stream: true }); + const lines = buffered.split('\n'); + buffered = lines.pop() || ''; + + for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line.startsWith('data:')) { + continue; + } + const dataStr = line.slice(5).trim(); + if (!dataStr) { + continue; + } + if (dataStr === '[DONE]') { + streamEnded = true; + break; + } + let chunk; + try { + chunk = JSON.parse(dataStr); + } catch (_err) { + continue; + } + observeContinueState(continueState, chunk); + const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers); + if (!parsed.parsed) { + continue; + } + currentType = parsed.newType; + if (parsed.errorMessage) { + return { terminal: await finish('content_filter'), retryable: false }; + } + if (parsed.contentFilter) { + return { terminal: await finish(outputText.trim() === '' ? 
'content_filter' : 'stop'), retryable: false }; + } + if (parsed.finished) { + streamEnded = true; + break; + } + + for (const p of parsed.parts) { + if (!p.text) { continue; } - thinkingText += trimmed; - sendDeltaFrame({ reasoning_content: trimmed }); - } - } else { - const trimmed = trimContinuationOverlap(outputText, p.text); - if (!trimmed) { - continue; - } - if (searchEnabled && isCitation(trimmed)) { - continue; - } - outputText += trimmed; - if (!toolSieveEnabled) { - sendDeltaFrame({ content: trimmed }); - continue; - } - const events = processToolSieveChunk(toolSieveState, trimmed, toolNames); - for (const evt of events) { - if (evt.type === 'tool_call_deltas') { - if (!emitEarlyToolDeltas) { + if (p.type === 'thinking') { + if (thinkingEnabled) { + const trimmed = trimContinuationOverlap(thinkingText, p.text); + if (!trimmed) { + continue; + } + thinkingText += trimmed; + sendDeltaFrame({ reasoning_content: trimmed }); + } + } else { + const trimmed = trimContinuationOverlap(outputText, p.text); + if (!trimmed) { continue; } - const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames); - const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs); - if (formatted.length > 0) { - toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatted }); + if (searchEnabled && isCitation(trimmed)) { + continue; + } + outputText += trimmed; + if (!toolSieveEnabled) { + sendDeltaFrame({ content: trimmed }); + continue; + } + const events = processToolSieveChunk(toolSieveState, trimmed, toolNames); + for (const evt of events) { + if (evt.type === 'tool_call_deltas') { + if (!emitEarlyToolDeltas) { + continue; + } + const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames); + const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs); + if (formatted.length > 0) { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatted }); + } + continue; + } + if (evt.type === 'tool_calls') { + toolCallsEmitted = true; + toolCallsDoneEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) }); + resetStreamToolCallState(streamToolCallIDs, streamToolNames); + continue; + } + if (evt.text) { + sendDeltaFrame({ content: evt.text }); + } } - continue; - } - if (evt.type === 'tool_calls') { - toolCallsEmitted = true; - toolCallsDoneEmitted = true; - sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) }); - resetStreamToolCallState(streamToolCallIDs, streamToolNames); - continue; - } - if (evt.text) { - sendDeltaFrame({ content: evt.text }); } } + if (streamEnded) { + break; + } + } + if (streamEnded) { + break; } } + } catch (err) { + if (clientClosed || isAbortError(err)) { + await finish('stop'); + return { terminal: true, retryable: false }; + } + await finish('stop'); + return { terminal: true, retryable: false }; } + + if (shouldAutoContinue(continueState) && continueRounds < AUTO_CONTINUE_MAX_ROUNDS) { + continueRounds += 1; + const nextRes = await fetchContinue(continueState.responseMessageID); + if (nextRes === null) { + return { terminal: true, retryable: false }; + } + if (!nextRes.ok || !nextRes.body) { + return { terminal: await finish('stop'), retryable: false }; + } + continueState = prepareContinueStateForNextRound(continueState); + currentResponse = nextRes; + continue; + } + break; } - await finish('stop'); - } catch (err) { - if (clientClosed || isAbortError(err)) { + + const 
@@ -328,6 +412,109 @@ function toBool(v) {
   return v === true;
 }
 
+function clonePayloadWithEmptyOutputRetryPrompt(payload) {
+  return {
+    ...(payload || {}),
+    prompt: appendEmptyOutputRetrySuffix(asString(payload && payload.prompt)),
+  };
+}
+
+function appendEmptyOutputRetrySuffix(prompt) {
+  const base = asString(prompt).trimEnd();
+  if (!base) {
+    return EMPTY_OUTPUT_RETRY_SUFFIX;
+  }
+  return `${base}\n\n${EMPTY_OUTPUT_RETRY_SUFFIX}`;
+}
+
+function usagePromptWithEmptyOutputRetry(originalPrompt, attempts) {
+  if (!attempts || attempts <= 0) {
+    return originalPrompt;
+  }
+  const parts = [originalPrompt];
+  let next = originalPrompt;
+  for (let i = 0; i < attempts; i += 1) {
+    next = appendEmptyOutputRetrySuffix(next);
+    parts.push(next);
+  }
+  return parts.join('\n');
+}
+
+function createContinueState(sessionID) {
+  return {
+    sessionID: asString(sessionID),
+    responseMessageID: 0,
+    lastStatus: '',
+    finished: false,
+  };
+}
+
+function prepareContinueStateForNextRound(state) {
+  return {
+    ...state,
+    lastStatus: '',
+    finished: false,
+  };
+}
+
+function observeContinueState(state, chunk) {
+  if (!state || !chunk || typeof chunk !== 'object') {
+    return;
+  }
+  const topID = numberValue(chunk.response_message_id);
+  if (topID > 0) {
+    state.responseMessageID = topID;
+  }
+  if (chunk.p === 'response/status') {
+    setContinueStatus(state, asString(chunk.v));
+  }
+  const response = chunk.v && typeof chunk.v === 'object' ? chunk.v.response : null;
+  if (response && typeof response === 'object') {
+    const id = numberValue(response.message_id);
+    if (id > 0) {
+      state.responseMessageID = id;
+    }
+    setContinueStatus(state, asString(response.status));
+    if (response.auto_continue === true) {
+      state.lastStatus = 'AUTO_CONTINUE';
+    }
+  }
+  const messageResponse = chunk.message && typeof chunk.message === 'object' && chunk.message.response;
+  if (messageResponse && typeof messageResponse === 'object') {
+    const id = numberValue(messageResponse.message_id);
+    if (id > 0) {
+      state.responseMessageID = id;
+    }
+    setContinueStatus(state, asString(messageResponse.status));
+  }
+}
+
+function setContinueStatus(state, status) {
+  const normalized = asString(status).trim();
+  if (!normalized) {
+    return;
+  }
+  state.lastStatus = normalized;
+  if (normalized.toUpperCase() === 'FINISHED') {
+    state.finished = true;
+  }
+}
+
+function shouldAutoContinue(state) {
+  if (!state || state.finished || !state.sessionID || state.responseMessageID <= 0) {
+    return false;
+  }
+  return ['WIP', 'INCOMPLETE', 'AUTO_CONTINUE'].includes(asString(state.lastStatus).trim().toUpperCase());
+}
+
+function numberValue(v) {
+  if (typeof v === 'number' && Number.isFinite(v)) {
+    return Math.trunc(v);
+  }
+  const parsed = Number.parseInt(asString(v), 10);
+  return Number.isFinite(parsed) ? parsed : 0;
+}
+
 function upstreamEmptyOutputDetail(contentFilter, _text, thinking) {
   if (contentFilter) {
     return {
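// Editor's sketch (illustrative, not part of the patch): the auto-continue
// decision above, reduced to a standalone reducer so the contract is visible
// without the plumbing. Field names mirror the helpers in the hunk; the input
// chunk is invented and only the `response/status` path is modeled.
function reduceContinueStateSketch(state, chunk) {
  const next = { ...state };
  const id = Number(chunk.response_message_id) || 0;
  if (id > 0) {
    next.responseMessageID = id;
  }
  const status = chunk.p === 'response/status' ? String(chunk.v || '') : '';
  if (status) {
    next.lastStatus = status;
    next.finished = status.toUpperCase() === 'FINISHED';
  }
  return next;
}

let s = { sessionID: 'session-123', responseMessageID: 0, lastStatus: '', finished: false };
s = reduceContinueStateSketch(s, { response_message_id: 7, p: 'response/status', v: 'WIP' });
// s.lastStatus === 'WIP' and s.responseMessageID === 7: 'WIP' is one of the
// continuable statuses, so the runtime would POST /api/v0/chat/continue with
// response_message_id 7 before ever considering the synthetic empty retry.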
diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js
index 50e94ee..f20f52e 100644
--- a/tests/node/chat-stream.test.js
+++ b/tests/node/chat-stream.test.js
@@ -121,8 +121,15 @@ function parseSSEDataFrames(body) {
 }
 
 async function runMockVercelStream(upstreamLines, prepareOverrides = {}) {
+  return runMockVercelStreamSequence([upstreamLines], prepareOverrides);
+}
+
+async function runMockVercelStreamSequence(upstreamSequences, prepareOverrides = {}) {
   const originalFetch = global.fetch;
   const fetchURLs = [];
+  const fetchBodies = [];
+  let completionCalls = 0;
+  let continueCalls = 0;
   const prepareBody = {
     session_id: 'chatcmpl-test',
     lease_id: 'lease-test',
@@ -137,23 +144,33 @@ async function runMockVercelStream(upstreamLines, prepareOverrides = {}) {
     payload: { prompt: 'hello' },
     ...prepareOverrides,
   };
-  global.fetch = async (url) => {
+  global.fetch = async (url, init = {}) => {
     const textURL = String(url);
     fetchURLs.push(textURL);
+    if (init && init.body) {
+      fetchBodies.push(JSON.parse(String(init.body)));
+    }
     if (textURL.includes('__stream_prepare=1')) {
       return jsonResponse(prepareBody);
     }
     if (textURL.includes('__stream_release=1')) {
       return jsonResponse({ success: true });
     }
-    return sseResponse(upstreamLines);
+    if (textURL.includes('/continue')) {
+      const idx = Math.min(continueCalls + 1, upstreamSequences.length - 1);
+      continueCalls += 1;
+      return sseResponse(upstreamSequences[idx]);
+    }
+    const idx = Math.min(completionCalls, upstreamSequences.length - 1);
+    completionCalls += 1;
+    return sseResponse(upstreamSequences[idx]);
   };
   try {
     const req = new MockStreamRequest();
     const res = new MockStreamResponse();
     const payload = { model: 'gpt-test', stream: true };
     await handleVercelStream(req, res, Buffer.from(JSON.stringify(payload)), payload);
-    return { res, frames: parseSSEDataFrames(res.bodyText()), fetchURLs };
+    return { res, frames: parseSSEDataFrames(res.bodyText()), fetchURLs, fetchBodies };
   } finally {
     global.fetch = originalFetch;
   }
 }
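// Editor's sketch (illustrative, not part of the patch): the routing rule the
// mock fetch above implements, isolated as a pure function. Index 0 always
// feeds the first completion call; later sequences feed either /continue
// rounds or synthetic retry completions, clamped to the last sequence so
// tests never run out of upstream data. Names here are invented.
function pickSequenceIndexSketch(kind, completionCalls, continueCalls, total) {
  const raw = kind === 'continue' ? continueCalls + 1 : completionCalls;
  return Math.min(raw, total - 1);
}
// pickSequenceIndexSketch('completion', 0, 0, 2) === 0  // first upstream attempt
// pickSequenceIndexSketch('continue', 1, 0, 2) === 1    // first continue round
// pickSequenceIndexSketch('completion', 1, 0, 2) === 1  // synthetic retry attempt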
@@ -174,6 +191,37 @@ test('vercel stream emits Go-parity empty-output failure on DONE', async () => {
   assert.equal(frames[1], '[DONE]');
 });
 
+test('vercel stream retries empty output once and keeps one terminal frame', async () => {
+  const { frames, fetchURLs, fetchBodies } = await runMockVercelStreamSequence([
+    ['data: [DONE]\n\n'],
+    ['data: {"p":"response/content","v":"visible"}\n\n', 'data: [DONE]\n\n'],
+  ]);
+  const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
+  const completionBodies = fetchBodies.filter((body) => Object.hasOwn(body, 'prompt'));
+  assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 2);
+  assert.equal(frames.filter((frame) => frame === '[DONE]').length, 1);
+  assert.equal(parsed[0].choices[0].delta.content, 'visible');
+  assert.equal(parsed[1].choices[0].finish_reason, 'stop');
+  assert.equal(parsed[0].id, parsed[1].id);
+  assert.match(completionBodies[1].prompt, /Previous reply had no visible output\. Please regenerate the visible final answer or tool call now\.$/);
+});
+
+test('vercel stream exhausts DeepSeek continue before synthetic retry', async () => {
+  const { frames, fetchURLs, fetchBodies } = await runMockVercelStreamSequence([
+    [
+      'data: {"response_message_id":7,"v":{"response":{"message_id":7,"status":"WIP","auto_continue":true}}}\n\n',
+      'data: [DONE]\n\n',
+    ],
+    ['data: {"p":"response/content","v":"continued"}\n\n', 'data: [DONE]\n\n'],
+  ]);
+  const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
+  assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 1);
+  assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/continue').length, 1);
+  assert.equal(parsed[0].choices[0].delta.content, 'continued');
+  assert.equal(parsed[1].choices[0].finish_reason, 'stop');
+  assert.equal(fetchBodies.some((body) => String(body.prompt || '').includes('Previous reply had no visible output')), false);
+});
+
 test('vercel stream emits content_filter failure when upstream filters empty output', async () => {
   const { frames } = await runMockVercelStream(['data: {"code":"content_filter"}\n\n']);
   assert.equal(frames.length, 2);
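// Editor's sketch (illustrative, not part of the patch): a runnable check of
// the prompt-suffix behavior the retry test above asserts on. The suffix
// string is the fixed one documented in docs/prompt-compatibility.md;
// EMPTY_OUTPUT_RETRY_SUFFIX in the patch is assumed to hold the same value.
const SUFFIX = 'Previous reply had no visible output. Please regenerate the visible final answer or tool call now.';

function withRetrySuffixSketch(prompt) {
  const base = String(prompt || '').trimEnd();
  // Blank prompts get the bare suffix; otherwise it is appended after a
  // blank line, matching appendEmptyOutputRetrySuffix above.
  return base ? `${base}\n\n${SUFFIX}` : SUFFIX;
}

const retried = withRetrySuffixSketch('hello');
console.assert(retried.endsWith(SUFFIX), 'retry prompt must end with the fixed suffix');
console.assert(retried.startsWith('hello'), 'original prompt is preserved');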