mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-05 17:05:32 +08:00
- Stream: strip both [citation:N] and [reference:N] markers to prevent leaking partial link metadata during incremental output
- Non-stream: convert citation/reference markers to Markdown links for Claude Messages, Gemini generateContent, and OpenAI Chat/Responses
- Remove StripReferenceMarkers option from call sites; behavior is now determined automatically by stream vs non-stream context
- Extend JS runtime stripReferenceMarkersText() to also match [citation:N]
- Add tests for streaming marker stripping and non-stream link conversion

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
271 lines
8.8 KiB
Go
package responses
|
|
|
|
import (
|
|
"ds2api/internal/toolcall"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/google/uuid"
|
|
|
|
"ds2api/internal/assistantturn"
|
|
"ds2api/internal/auth"
|
|
"ds2api/internal/completionruntime"
|
|
"ds2api/internal/config"
|
|
dsprotocol "ds2api/internal/deepseek/protocol"
|
|
openaifmt "ds2api/internal/format/openai"
|
|
"ds2api/internal/promptcompat"
|
|
"ds2api/internal/responsehistory"
|
|
"ds2api/internal/sse"
|
|
streamengine "ds2api/internal/stream"
|
|
)
|
|
|
|
func (h *Handler) GetResponseByID(w http.ResponseWriter, r *http.Request) {
|
|
a, err := h.Auth.DetermineCaller(r)
|
|
if err != nil {
|
|
writeOpenAIError(w, http.StatusUnauthorized, err.Error())
|
|
return
|
|
}
|
|
|
|
id := strings.TrimSpace(chi.URLParam(r, "response_id"))
|
|
if id == "" {
|
|
writeOpenAIError(w, http.StatusBadRequest, "response_id is required.")
|
|
return
|
|
}
|
|
owner := responseStoreOwner(a)
|
|
if owner == "" {
|
|
writeOpenAIError(w, http.StatusUnauthorized, "unauthorized")
|
|
return
|
|
}
|
|
st := h.getResponseStore()
|
|
item, ok := st.get(owner, id)
|
|
if !ok {
|
|
writeOpenAIError(w, http.StatusNotFound, "Response not found.")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, item)
|
|
}
|
|
|
|
// Responses implements the OpenAI-compatible POST /v1/responses endpoint.
// It authenticates the caller, decodes and normalizes the JSON body into the
// internal standard request form, then runs the completion either
// non-streaming (single JSON object) or streaming (SSE). The finished
// response object is persisted in the response store under owner+responseID
// so GetResponseByID can serve it later.
func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) {
	a, err := h.Auth.Determine(r)
	if err != nil {
		status := http.StatusUnauthorized
		detail := err.Error()
		// No upstream account available is a capacity condition, not an
		// auth failure, so it maps to 429 instead of 401.
		if err == auth.ErrNoAccount {
			status = http.StatusTooManyRequests
		}
		writeOpenAIError(w, status, detail)
		return
	}
	// Release the acquired account/slot when the handler returns.
	defer h.Auth.Release(a)
	r = r.WithContext(auth.WithAuth(r.Context(), a))
	owner := responseStoreOwner(a)
	if owner == "" {
		writeOpenAIError(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	// Cap the body size before decoding; an oversize body surfaces as a
	// decode error whose message contains "too large" (checked below).
	r.Body = http.MaxBytesReader(w, r.Body, openAIGeneralMaxSize)
	var req map[string]any
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		if strings.Contains(strings.ToLower(err.Error()), "too large") {
			writeOpenAIError(w, http.StatusRequestEntityTooLarge, "request body too large")
			return
		}
		writeOpenAIError(w, http.StatusBadRequest, "invalid json")
		return
	}
	// Resolve inline file inputs in the raw request before normalization.
	if err := h.preprocessInlineFileInputs(r.Context(), a, req); err != nil {
		writeOpenAIInlineFileError(w, err)
		return
	}
	traceID := requestTraceID(r)
	stdReq, err := promptcompat.NormalizeOpenAIResponsesRequest(h.Store, req, traceID)
	if err != nil {
		writeOpenAIError(w, http.StatusBadRequest, err.Error())
		return
	}
	stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq)
	if err != nil {
		status, message := mapCurrentInputFileError(err)
		writeOpenAIError(w, status, message)
		return
	}

	// OpenAI-style opaque response id: "resp_" + 32 hex chars.
	responseID := "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "")
	historySession := responsehistory.Start(responsehistory.StartParams{
		Store:    h.ChatHistory,
		Request:  r,
		Auth:     a,
		Surface:  "openai.responses",
		Standard: stdReq,
	})
	if !stdReq.Stream {
		// Non-stream path: run the completion to the end (with retry) and
		// reply with one response object.
		result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{
			RetryEnabled:     true,
			CurrentInputFile: h.Store,
		})
		if outErr != nil {
			if historySession != nil {
				// result.Turn carries whatever partial turn was collected
				// before the failure, for history/debugging purposes.
				historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn)
			}
			writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code)
			return
		}
		if historySession != nil {
			historySession.SuccessTurn(http.StatusOK, result.Turn, assistantturn.OpenAIResponsesUsage(result.Turn))
		}
		responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, stdReq.ResponseModel, result.Turn.Prompt, result.Turn.Thinking, result.Turn.Text, result.Turn.ToolCalls, stdReq.ToolsRaw)
		responseObj["usage"] = assistantturn.OpenAIResponsesUsage(result.Turn)
		// Persist so the object can be fetched again via GetResponseByID.
		h.getResponseStore().put(owner, responseID, responseObj)
		writeJSON(w, http.StatusOK, responseObj)
		return
	}

	// Streaming path: open the upstream completion, then relay it as SSE.
	start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{
		CurrentInputFile: h.Store,
	})
	if outErr != nil {
		if historySession != nil {
			historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "")
		}
		writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code)
		return
	}

	streamReq := start.Request
	refFileTokens := streamReq.RefFileTokens
	h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID, historySession)
}
|
|
|
|
func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
|
|
defer func() { _ = resp.Body.Close() }()
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body)))
|
|
return
|
|
}
|
|
result := sse.CollectStream(resp, thinkingEnabled, true)
|
|
|
|
turn := assistantturn.BuildTurnFromCollected(result, assistantturn.BuildOptions{
|
|
Model: model,
|
|
Prompt: finalPrompt,
|
|
RefFileTokens: refFileTokens,
|
|
SearchEnabled: searchEnabled,
|
|
ToolNames: toolNames,
|
|
ToolsRaw: toolsRaw,
|
|
ToolChoice: toolChoice,
|
|
})
|
|
logResponsesToolPolicyRejection(traceID, toolChoice, turn.ParsedToolCalls, "text")
|
|
outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{})
|
|
if outcome.ShouldFail {
|
|
writeOpenAIErrorWithCode(w, outcome.Error.Status, outcome.Error.Message, outcome.Error.Code)
|
|
return
|
|
}
|
|
|
|
responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, model, finalPrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw)
|
|
responseObj["usage"] = assistantturn.OpenAIResponsesUsage(turn)
|
|
h.getResponseStore().put(owner, responseID, responseObj)
|
|
writeJSON(w, http.StatusOK, responseObj)
|
|
}
|
|
|
|
// handleResponsesStream relays an upstream SSE completion to the client as
// OpenAI Responses streaming events. On finalize, the completed response
// object is persisted via the runtime's store callback so it can later be
// fetched by id.
func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
	defer func() { _ = resp.Body.Close() }()
	// Relay upstream HTTP failures before committing to an SSE response.
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body)))
		return
	}
	// Standard SSE headers; X-Accel-Buffering disables reverse-proxy
	// buffering so deltas reach the client immediately.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache, no-transform")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	rc := http.NewResponseController(w)
	_, canFlush := w.(http.Flusher)

	// The first upstream chunk type depends on whether thinking is enabled.
	initialType := "text"
	if thinkingEnabled {
		initialType = "thinking"
	}
	// When tools are declared, content is buffered so tool-call markup can
	// be detected before text deltas are emitted.
	bufferToolContent := len(toolNames) > 0
	emitEarlyToolDeltas := h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence()
	stripReferenceMarkers := stripReferenceMarkersEnabled()

	// NOTE: newResponsesStreamRuntime takes positional arguments; keep the
	// order in sync with its declaration.
	streamRuntime := newResponsesStreamRuntime(
		w,
		rc,
		canFlush,
		responseID,
		model,
		finalPrompt,
		thinkingEnabled,
		searchEnabled,
		stripReferenceMarkers,
		toolNames,
		toolsRaw,
		bufferToolContent,
		emitEarlyToolDeltas,
		toolChoice,
		traceID,
		// Persist the completed response object for later GET by id.
		func(obj map[string]any) {
			h.getResponseStore().put(owner, responseID, obj)
		},
		nil,
	)
	streamRuntime.refFileTokens = refFileTokens
	// Emit the initial "response.created" event before consuming upstream.
	streamRuntime.sendCreated()

	streamengine.ConsumeSSE(streamengine.ConsumeConfig{
		Context:             r.Context(),
		Body:                resp.Body,
		ThinkingEnabled:     thinkingEnabled,
		InitialType:         initialType,
		KeepAliveInterval:   time.Duration(dsprotocol.KeepAliveTimeout) * time.Second,
		IdleTimeout:         time.Duration(dsprotocol.StreamIdleTimeout) * time.Second,
		MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount,
	}, streamengine.ConsumeHooks{
		OnParsed: streamRuntime.onParsed,
		OnFinalize: func(reason streamengine.StopReason, _ error) {
			// Only content_filter is surfaced distinctly; every other stop
			// reason (including errors) finalizes as a plain "stop".
			if string(reason) == "content_filter" {
				streamRuntime.finalize("content_filter", false)
				return
			}
			streamRuntime.finalize("stop", false)
		},
	})
}
|
|
|
|
func logResponsesToolPolicyRejection(traceID string, policy promptcompat.ToolChoicePolicy, parsed toolcall.ToolCallParseResult, channel string) {
|
|
rejected := filteredRejectedToolNamesForLog(parsed.RejectedToolNames)
|
|
if !parsed.RejectedByPolicy || len(rejected) == 0 {
|
|
return
|
|
}
|
|
config.Logger.Warn(
|
|
"[responses] rejected tool calls by policy",
|
|
"trace_id", strings.TrimSpace(traceID),
|
|
"channel", channel,
|
|
"tool_choice_mode", policy.Mode,
|
|
"rejected_tool_names", strings.Join(rejected, ","),
|
|
)
|
|
}
|
|
|
|
// filteredRejectedToolNamesForLog trims each name and drops entries that are
// empty or equal (case-insensitively) to the literal placeholder
// "tool_name", which carries no diagnostic value. It returns nil when the
// input is empty.
func filteredRejectedToolNamesForLog(names []string) []string {
	if len(names) == 0 {
		return nil
	}
	kept := make([]string, 0, len(names))
	for _, raw := range names {
		candidate := strings.TrimSpace(raw)
		if candidate == "" || strings.ToLower(candidate) == "tool_name" {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
|