package responses import ( "ds2api/internal/toolcall" "encoding/json" "io" "net/http" "strings" "time" "github.com/go-chi/chi/v5" "github.com/google/uuid" "ds2api/internal/assistantturn" "ds2api/internal/auth" "ds2api/internal/completionruntime" "ds2api/internal/config" dsprotocol "ds2api/internal/deepseek/protocol" openaifmt "ds2api/internal/format/openai" "ds2api/internal/promptcompat" "ds2api/internal/responsehistory" "ds2api/internal/sse" streamengine "ds2api/internal/stream" ) func (h *Handler) GetResponseByID(w http.ResponseWriter, r *http.Request) { a, err := h.Auth.DetermineCaller(r) if err != nil { writeOpenAIError(w, http.StatusUnauthorized, err.Error()) return } id := strings.TrimSpace(chi.URLParam(r, "response_id")) if id == "" { writeOpenAIError(w, http.StatusBadRequest, "response_id is required.") return } owner := responseStoreOwner(a) if owner == "" { writeOpenAIError(w, http.StatusUnauthorized, "unauthorized") return } st := h.getResponseStore() item, ok := st.get(owner, id) if !ok { writeOpenAIError(w, http.StatusNotFound, "Response not found.") return } writeJSON(w, http.StatusOK, item) } func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) { a, err := h.Auth.Determine(r) if err != nil { status := http.StatusUnauthorized detail := err.Error() if err == auth.ErrNoAccount { status = http.StatusTooManyRequests } writeOpenAIError(w, status, detail) return } defer h.Auth.Release(a) r = r.WithContext(auth.WithAuth(r.Context(), a)) owner := responseStoreOwner(a) if owner == "" { writeOpenAIError(w, http.StatusUnauthorized, "unauthorized") return } r.Body = http.MaxBytesReader(w, r.Body, openAIGeneralMaxSize) var req map[string]any if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if strings.Contains(strings.ToLower(err.Error()), "too large") { writeOpenAIError(w, http.StatusRequestEntityTooLarge, "request body too large") return } writeOpenAIError(w, http.StatusBadRequest, "invalid json") return } if err := h.preprocessInlineFileInputs(r.Context(), a, req); err != nil { writeOpenAIInlineFileError(w, err) return } traceID := requestTraceID(r) stdReq, err := promptcompat.NormalizeOpenAIResponsesRequest(h.Store, req, traceID) if err != nil { writeOpenAIError(w, http.StatusBadRequest, err.Error()) return } stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq) if err != nil { status, message := mapCurrentInputFileError(err) writeOpenAIError(w, status, message) return } responseID := "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "") historySession := responsehistory.Start(responsehistory.StartParams{ Store: h.ChatHistory, Request: r, Auth: a, Surface: "openai.responses", Standard: stdReq, }) if !stdReq.Stream { result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{ RetryEnabled: true, CurrentInputFile: h.Store, }) if outErr != nil { if historySession != nil { historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn) } writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) return } if historySession != nil { historySession.SuccessTurn(http.StatusOK, result.Turn, assistantturn.OpenAIResponsesUsage(result.Turn)) } responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, stdReq.ResponseModel, result.Turn.Prompt, result.Turn.Thinking, result.Turn.Text, result.Turn.ToolCalls, stdReq.ToolsRaw) responseObj["usage"] = assistantturn.OpenAIResponsesUsage(result.Turn) h.getResponseStore().put(owner, responseID, responseObj) writeJSON(w, http.StatusOK, responseObj) return } start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{ CurrentInputFile: h.Store, }) if outErr != nil { if historySession != nil { historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "") } writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) return } streamReq := start.Request refFileTokens := streamReq.RefFileTokens h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID, historySession) } func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body))) return } result := sse.CollectStream(resp, thinkingEnabled, true) turn := assistantturn.BuildTurnFromCollected(result, assistantturn.BuildOptions{ Model: model, Prompt: finalPrompt, RefFileTokens: refFileTokens, SearchEnabled: searchEnabled, ToolNames: toolNames, ToolsRaw: toolsRaw, ToolChoice: toolChoice, }) logResponsesToolPolicyRejection(traceID, toolChoice, turn.ParsedToolCalls, "text") outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{}) if outcome.ShouldFail { writeOpenAIErrorWithCode(w, outcome.Error.Status, outcome.Error.Message, outcome.Error.Code) return } responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, model, finalPrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw) responseObj["usage"] = assistantturn.OpenAIResponsesUsage(turn) h.getResponseStore().put(owner, responseID, responseObj) writeJSON(w, http.StatusOK, responseObj) } func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body))) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") rc := http.NewResponseController(w) _, canFlush := w.(http.Flusher) initialType := "text" if thinkingEnabled { initialType = "thinking" } bufferToolContent := len(toolNames) > 0 emitEarlyToolDeltas := h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence() stripReferenceMarkers := stripReferenceMarkersEnabled() streamRuntime := newResponsesStreamRuntime( w, rc, canFlush, responseID, model, finalPrompt, thinkingEnabled, searchEnabled, stripReferenceMarkers, toolNames, toolsRaw, bufferToolContent, emitEarlyToolDeltas, toolChoice, traceID, func(obj map[string]any) { h.getResponseStore().put(owner, responseID, obj) }, nil, ) streamRuntime.refFileTokens = refFileTokens streamRuntime.sendCreated() streamengine.ConsumeSSE(streamengine.ConsumeConfig{ Context: r.Context(), Body: resp.Body, ThinkingEnabled: thinkingEnabled, InitialType: initialType, KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, }, streamengine.ConsumeHooks{ OnParsed: streamRuntime.onParsed, OnFinalize: func(reason streamengine.StopReason, _ error) { if string(reason) == "content_filter" { streamRuntime.finalize("content_filter", false) return } streamRuntime.finalize("stop", false) }, }) } func logResponsesToolPolicyRejection(traceID string, policy promptcompat.ToolChoicePolicy, parsed toolcall.ToolCallParseResult, channel string) { rejected := filteredRejectedToolNamesForLog(parsed.RejectedToolNames) if !parsed.RejectedByPolicy || len(rejected) == 0 { return } config.Logger.Warn( "[responses] rejected tool calls by policy", "trace_id", strings.TrimSpace(traceID), "channel", channel, "tool_choice_mode", policy.Mode, "rejected_tool_names", strings.Join(rejected, ","), ) } func filteredRejectedToolNamesForLog(names []string) []string { if len(names) == 0 { return nil } out := make([]string, 0, len(names)) for _, name := range names { trimmed := strings.TrimSpace(name) switch strings.ToLower(trimmed) { case "", "tool_name": continue default: out = append(out, trimmed) } } return out }