feat: add unified response history session management across Claude, Gemini, and OpenAI API backends

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
CJACK
2026-05-03 17:24:38 +08:00
parent 5e55cf36d8
commit c099a6f7bf
28 changed files with 776 additions and 57 deletions

View File

@@ -14,9 +14,6 @@ import (
"ds2api/internal/promptcompat"
)
const adminWebUISourceHeader = "X-Ds2-Source"
const adminWebUISourceValue = "admin-webui-api-tester"
type chatHistorySession struct {
store *chathistory.Store
entryID string
@@ -40,6 +37,7 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request
entry, err := store.Start(chathistory.StartParams{
CallerID: strings.TrimSpace(a.CallerID),
AccountID: strings.TrimSpace(a.AccountID),
Surface: "openai.chat_completions",
Model: strings.TrimSpace(stdReq.ResponseModel),
Stream: stdReq.Stream,
UserInput: extractSingleUserInput(stdReq.Messages),
@@ -50,6 +48,7 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request
startParams := chathistory.StartParams{
CallerID: strings.TrimSpace(a.CallerID),
AccountID: strings.TrimSpace(a.AccountID),
Surface: "openai.chat_completions",
Model: strings.TrimSpace(stdReq.ResponseModel),
Stream: stdReq.Stream,
UserInput: extractSingleUserInput(stdReq.Messages),
@@ -82,7 +81,7 @@ func shouldCaptureChatHistory(r *http.Request) bool {
if isVercelStreamPrepareRequest(r) || isVercelStreamReleaseRequest(r) {
return false
}
return strings.TrimSpace(r.Header.Get(adminWebUISourceHeader)) != adminWebUISourceValue
return true
}
func extractSingleUserInput(messages []any) string {

View File

@@ -294,7 +294,7 @@ func TestHandleStreamContextCancelledMarksHistoryStopped(t *testing.T) {
}
}
func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) {
func TestChatCompletionsRecordsAdminWebUISource(t *testing.T) {
historyStore := newTestChatHistoryStore(t)
h := &Handler{
Store: mockOpenAIConfig{},
@@ -307,7 +307,7 @@ func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
req.Header.Set("Authorization", "Bearer direct-token")
req.Header.Set("Content-Type", "application/json")
req.Header.Set(adminWebUISourceHeader, adminWebUISourceValue)
req.Header.Set("X-Ds2-Source", "admin-webui-api-tester")
rec := httptest.NewRecorder()
h.ChatCompletions(rec, req)
@@ -318,8 +318,8 @@ func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) {
if err != nil {
t.Fatalf("snapshot failed: %v", err)
}
if len(snapshot.Items) != 0 {
t.Fatalf("expected admin webui source to be skipped, got %#v", snapshot.Items)
if len(snapshot.Items) != 1 {
t.Fatalf("expected admin webui source to be recorded, got %#v", snapshot.Items)
}
}

View File

@@ -10,11 +10,12 @@ import (
"ds2api/internal/config"
dsprotocol "ds2api/internal/deepseek/protocol"
"ds2api/internal/promptcompat"
"ds2api/internal/responsehistory"
streamengine "ds2api/internal/stream"
)
func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID)
func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) {
streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID, historySession)
if !ok {
return
}
@@ -55,10 +56,13 @@ func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.
}
}
func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) (*responsesStreamRuntime, string, bool) {
func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) (*responsesStreamRuntime, string, bool) {
if resp.StatusCode != http.StatusOK {
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
if historySession != nil {
historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "")
}
writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body)))
return nil, "", false
}
@@ -78,7 +82,7 @@ func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *htt
h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(),
toolChoice, traceID, func(obj map[string]any) {
h.getResponseStore().put(owner, responseID, obj)
},
}, historySession,
)
streamRuntime.refFileTokens = refFileTokens
streamRuntime.sendCreated()

View File

@@ -47,6 +47,7 @@ func TestConsumeResponsesStreamAttemptMarksContextCancelledState(t *testing.T) {
promptcompat.DefaultToolChoicePolicy(),
"",
nil,
nil,
)
resp := makeResponsesOpenAISSEHTTPResponse(
`data: {"p":"response/content","v":"hello"}`,

View File

@@ -18,6 +18,7 @@ import (
dsprotocol "ds2api/internal/deepseek/protocol"
openaifmt "ds2api/internal/format/openai"
"ds2api/internal/promptcompat"
"ds2api/internal/responsehistory"
"ds2api/internal/sse"
streamengine "ds2api/internal/stream"
)
@@ -95,6 +96,13 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) {
}
responseID := "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "")
historySession := responsehistory.Start(responsehistory.StartParams{
Store: h.ChatHistory,
Request: r,
Auth: a,
Surface: "openai.responses",
Standard: stdReq,
})
if !stdReq.Stream {
result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{
StripReferenceMarkers: stripReferenceMarkersEnabled(),
@@ -102,9 +110,15 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) {
CurrentInputFile: h.Store,
})
if outErr != nil {
if historySession != nil {
historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn)
}
writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code)
return
}
if historySession != nil {
historySession.SuccessTurn(http.StatusOK, result.Turn, assistantturn.OpenAIResponsesUsage(result.Turn))
}
responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, stdReq.ResponseModel, result.Turn.Prompt, result.Turn.Thinking, result.Turn.Text, result.Turn.ToolCalls, stdReq.ToolsRaw)
responseObj["usage"] = assistantturn.OpenAIResponsesUsage(result.Turn)
h.getResponseStore().put(owner, responseID, responseObj)
@@ -116,13 +130,16 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) {
CurrentInputFile: h.Store,
})
if outErr != nil {
if historySession != nil {
historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "")
}
writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code)
return
}
streamReq := start.Request
refFileTokens := streamReq.RefFileTokens
h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID)
h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID, historySession)
}
func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
@@ -198,6 +215,7 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
func(obj map[string]any) {
h.getResponseStore().put(owner, responseID, obj)
},
nil,
)
streamRuntime.refFileTokens = refFileTokens
streamRuntime.sendCreated()

View File

@@ -0,0 +1,100 @@
package responses
import (
"context"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"github.com/go-chi/chi/v5"
"ds2api/internal/auth"
"ds2api/internal/chathistory"
dsclient "ds2api/internal/deepseek/client"
)
type responsesHistoryDS struct {
payload map[string]any
}
func (d *responsesHistoryDS) CreateSession(context.Context, *auth.RequestAuth, int) (string, error) {
return "session-id", nil
}
func (d *responsesHistoryDS) GetPow(context.Context, *auth.RequestAuth, int) (string, error) {
return "pow", nil
}
func (d *responsesHistoryDS) UploadFile(context.Context, *auth.RequestAuth, dsclient.UploadFileRequest, int) (*dsclient.UploadFileResult, error) {
return &dsclient.UploadFileResult{ID: "file-id"}, nil
}
func (d *responsesHistoryDS) CallCompletion(_ context.Context, _ *auth.RequestAuth, payload map[string]any, _ string, _ int) (*http.Response, error) {
d.payload = payload
return &http.Response{
StatusCode: http.StatusOK,
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader("data: {\"p\":\"response/content\",\"v\":\"ok\"}\n")),
}, nil
}
func (d *responsesHistoryDS) DeleteSessionForToken(context.Context, string, string) (*dsclient.DeleteSessionResult, error) {
return &dsclient.DeleteSessionResult{Success: true}, nil
}
func (d *responsesHistoryDS) DeleteAllSessionsForToken(context.Context, string) error {
return nil
}
func TestResponsesRecordsResponseHistory(t *testing.T) {
store, resolver := newDirectTokenResolver(t)
historyStore := chathistory.New(filepath.Join(t.TempDir(), "history.json"))
ds := &responsesHistoryDS{}
h := &Handler{
Store: store,
Auth: resolver,
DS: ds,
ChatHistory: historyStore,
}
r := chi.NewRouter()
RegisterRoutes(r, h)
req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(`{"model":"deepseek-v4-flash","input":"hello responses"}`))
req.Header.Set("Authorization", "Bearer direct-token")
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
r.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
if ds.payload == nil {
t.Fatalf("expected upstream payload to be sent")
}
snapshot, err := historyStore.Snapshot()
if err != nil {
t.Fatalf("snapshot history: %v", err)
}
if len(snapshot.Items) != 1 {
t.Fatalf("expected one history item, got %d", len(snapshot.Items))
}
item, err := historyStore.Get(snapshot.Items[0].ID)
if err != nil {
t.Fatalf("get history item: %v", err)
}
if item.Surface != "openai.responses" {
t.Fatalf("unexpected surface: %q", item.Surface)
}
if !strings.Contains(item.UserInput, "Continue from the latest state in the attached DS2API_HISTORY.txt context.") {
t.Fatalf("unexpected user input: %q", item.UserInput)
}
if !strings.Contains(item.HistoryText, "hello responses") {
t.Fatalf("expected original input in persisted history text, got %q", item.HistoryText)
}
if item.Content != "ok" {
t.Fatalf("expected raw upstream content, got %q", item.Content)
}
}

View File

@@ -10,6 +10,7 @@ import (
openaifmt "ds2api/internal/format/openai"
"ds2api/internal/httpapi/openai/shared"
"ds2api/internal/promptcompat"
"ds2api/internal/responsehistory"
"ds2api/internal/sse"
streamengine "ds2api/internal/stream"
"ds2api/internal/toolstream"
@@ -61,6 +62,7 @@ type responsesStreamRuntime struct {
finalErrorCode string
persistResponse func(obj map[string]any)
history *responsehistory.Session
}
func newResponsesStreamRuntime(
@@ -80,6 +82,7 @@ func newResponsesStreamRuntime(
toolChoice promptcompat.ToolChoicePolicy,
traceID string,
persistResponse func(obj map[string]any),
history *responsehistory.Session,
) *responsesStreamRuntime {
return &responsesStreamRuntime{
w: w,
@@ -106,6 +109,7 @@ func newResponsesStreamRuntime(
toolChoice: toolChoice,
traceID: traceID,
persistResponse: persistResponse,
history: history,
accumulator: shared.StreamAccumulator{
ThinkingEnabled: thinkingEnabled,
SearchEnabled: searchEnabled,
@@ -138,6 +142,9 @@ func (s *responsesStreamRuntime) failResponse(status int, message, code string)
if s.persistResponse != nil {
s.persistResponse(failedResp)
}
if s.history != nil {
s.history.Error(status, message, code, responsehistory.ThinkingForArchive(s.accumulator.RawThinking.String(), s.accumulator.ToolDetectionThinking.String(), s.accumulator.Thinking.String()), responsehistory.TextForArchive(s.accumulator.RawText.String(), s.accumulator.Text.String()))
}
s.sendEvent("response.failed", openaifmt.BuildResponsesFailedPayload(s.responseID, s.model, status, message, code))
s.sendDone()
}
@@ -214,6 +221,15 @@ func (s *responsesStreamRuntime) finalize(finishReason string, deferEmptyOutput
if s.persistResponse != nil {
s.persistResponse(obj)
}
if s.history != nil {
s.history.Success(
http.StatusOK,
responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking),
responsehistory.TextForArchive(turn.RawText, turn.Text),
outcome.FinishReason,
assistantturn.OpenAIResponsesUsage(turn),
)
}
s.sendEvent("response.completed", openaifmt.BuildResponsesCompletedPayload(obj))
s.sendDone()
return true
@@ -272,5 +288,11 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
}
batch.flush()
if s.history != nil {
s.history.Progress(
responsehistory.ThinkingForArchive(s.accumulator.RawThinking.String(), s.accumulator.ToolDetectionThinking.String(), s.accumulator.Thinking.String()),
responsehistory.TextForArchive(s.accumulator.RawText.String(), s.accumulator.Text.String()),
)
}
return streamengine.ParsedDecision{ContentSeen: accumulated.ContentSeen}
}