Files
ds2api/internal/httpapi/openai/responses/empty_retry_runtime.go

139 lines
6.3 KiB
Go

package responses
import (
"io"
"net/http"
"strings"
"time"
"ds2api/internal/auth"
"ds2api/internal/config"
dsprotocol "ds2api/internal/deepseek/protocol"
"ds2api/internal/promptcompat"
"ds2api/internal/responsehistory"
streamengine "ds2api/internal/stream"
)
func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) {
streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID, historySession)
if !ok {
return
}
attempts := 0
currentResp := resp
for {
terminalWritten, retryable := h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, attempts < emptyOutputRetryMaxAttempts())
if terminalWritten {
logResponsesStreamTerminal(streamRuntime, attempts)
return
}
if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() {
streamRuntime.finalize("stop", false)
config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
return
}
attempts++
config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID)
retryPow, powErr := h.DS.GetPow(r.Context(), a, 3)
if powErr != nil {
config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", powErr)
retryPow = pow
}
nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3)
if err != nil {
streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error")
config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err)
return
}
if nextResp.StatusCode != http.StatusOK {
defer func() { _ = nextResp.Body.Close() }()
body, _ := io.ReadAll(nextResp.Body)
streamRuntime.failResponse(nextResp.StatusCode, strings.TrimSpace(string(body)), "error")
return
}
streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts)
currentResp = nextResp
}
}
func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) (*responsesStreamRuntime, string, bool) {
if resp.StatusCode != http.StatusOK {
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
if historySession != nil {
historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "")
}
writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body)))
return nil, "", false
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
_, canFlush := w.(http.Flusher)
initialType := "text"
if thinkingEnabled {
initialType = "thinking"
}
streamRuntime := newResponsesStreamRuntime(
w, rc, canFlush, responseID, model, finalPrompt, thinkingEnabled, searchEnabled,
stripReferenceMarkersEnabled(), toolNames, toolsRaw, len(toolNames) > 0,
h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(),
toolChoice, traceID, func(obj map[string]any) {
h.getResponseStore().put(owner, responseID, obj)
}, historySession,
)
streamRuntime.refFileTokens = refFileTokens
streamRuntime.sendCreated()
return streamRuntime, initialType, true
}
func (h *Handler) consumeResponsesStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *responsesStreamRuntime, initialType string, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) {
defer func() { _ = resp.Body.Close() }()
finalReason := "stop"
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
Context: r.Context(),
Body: resp.Body,
ThinkingEnabled: thinkingEnabled,
InitialType: initialType,
KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second,
IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second,
MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount,
}, streamengine.ConsumeHooks{
OnParsed: streamRuntime.onParsed,
OnFinalize: func(reason streamengine.StopReason, _ error) {
if string(reason) == "content_filter" {
finalReason = "content_filter"
}
},
OnContextDone: func() {
streamRuntime.markContextCancelled()
},
})
if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) {
return true, false
}
terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter")
if terminalWritten {
return true, false
}
return false, true
}
func logResponsesStreamTerminal(streamRuntime *responsesStreamRuntime, attempts int) {
source := "first_attempt"
if attempts > 0 {
source = "synthetic_retry"
}
if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) {
config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "responses", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode)
return
}
if streamRuntime.failed {
config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
return
}
config.Logger.Info("[openai_empty_retry] completed", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", source)
}