From c099a6f7bfa7b23d768a2b32b64cbbf5492d4b7b Mon Sep 17 00:00:00 2001 From: CJACK Date: Sun, 3 May 2026 17:24:38 +0800 Subject: [PATCH] feat: add unified response history session management across Claude, Gemini, and OpenAI API backends Co-Authored-By: Claude Opus 4.7 --- API.md | 2 + docs/DEVELOPMENT.md | 2 +- internal/chathistory/store.go | 14 +- internal/chathistory/store_test.go | 31 ++ .../httpapi/claude/current_input_file_test.go | 79 ++++- internal/httpapi/claude/handler_messages.go | 58 +++- internal/httpapi/claude/handler_routes.go | 10 +- .../httpapi/claude/stream_runtime_core.go | 10 + .../httpapi/claude/stream_runtime_finalize.go | 16 + internal/httpapi/gemini/handler_generate.go | 42 ++- internal/httpapi/gemini/handler_routes.go | 10 +- .../httpapi/gemini/handler_stream_runtime.go | 31 +- internal/httpapi/gemini/handler_test.go | 33 +- internal/httpapi/openai/chat/chat_history.go | 7 +- .../httpapi/openai/chat/chat_history_test.go | 8 +- .../openai/responses/empty_retry_runtime.go | 12 +- .../responses/empty_retry_runtime_test.go | 1 + .../openai/responses/responses_handler.go | 20 +- .../responses/responses_history_test.go | 100 ++++++ .../responses_stream_runtime_core.go | 22 ++ internal/responsehistory/session.go | 289 ++++++++++++++++++ internal/server/router.go | 4 +- .../features/apiTester/useChatStreamClient.js | 1 - .../chatHistory/ChatHistoryContainer.jsx | 7 +- .../chatHistory/ChatHistoryDetail.jsx | 4 + .../chatHistory/ChatHistoryPanels.jsx | 2 +- webui/src/locales/en.json | 9 +- webui/src/locales/zh.json | 9 +- 28 files changed, 776 insertions(+), 57 deletions(-) create mode 100644 internal/httpapi/openai/responses/responses_history_test.go create mode 100644 internal/responsehistory/session.go diff --git a/API.md b/API.md index 3b8bbc8..7d668ed 100644 --- a/API.md +++ b/API.md @@ -168,6 +168,8 @@ Gemini 兼容客户端还可以使用 `x-goog-api-key`、`?key=` 或 `?api_key=` | GET | `/admin/chat-history/{id}` | Admin | 查看单条服务器端对话记录 | | DELETE | `/admin/chat-history/{id}` | Admin | 删除单条服务器端对话记录 | | PUT | `/admin/chat-history/settings` | Admin | 更新对话记录保留条数 | + +服务器端记录本质上是 DeepSeek 上游响应归档:OpenAI Chat、OpenAI Responses、Claude Messages、Gemini GenerateContent 等直连 DeepSeek 的生成接口,在收到上游响应后会于各协议回译/裁剪前写入记录;列表按请求创建时间倒序展示,流式请求会在生成过程中持续刷新状态与详情。WebUI「API 测试」发出的请求也会进入该记录。 | GET | `/admin/version` | Admin | 查询当前版本与最新 Release | OpenAI `/v1/*` 仍是规范路径。对于只配置 DS2API 根地址的客户端,同一套 OpenAI handler 也通过根路径快捷路由暴露:`/models`、`/models/{id}`、`/chat/completions`、`/responses`、`/responses/{response_id}`、`/embeddings`、`/files`、`/files/{file_id}`。 diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index 68858c4..dee5b07 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -74,7 +74,7 @@ gofmt -w - Admin API:`/admin/chat-history`、`/admin/chat-history/{id}`。 - 后端存储:`internal/chathistory/store.go`。 -- 输出归档:Chat history 详情保存上游原始 assistant text / thinking;即使工具调用已被对外响应转成结构化 `tool_calls` 并从可见正文剔除,后台历史仍应保留原始 DSML / XML 片段,方便排查格式漂移。 +- 输出归档:`internal/responsehistory` 在协议回译/裁剪前记录 DeepSeek 上游 assistant text / thinking;即使工具调用已被对外响应转成结构化 `tool_calls` 并从可见正文剔除,后台历史仍应保留原始 DSML / XML 片段,方便排查格式漂移。 - 前端轮询和 ETag:`webui/src/features/chatHistory/ChatHistoryContainer.jsx`。 Tool call 问题优先跑: diff --git a/internal/chathistory/store.go b/internal/chathistory/store.go index 823cc24..85228dc 100644 --- a/internal/chathistory/store.go +++ b/internal/chathistory/store.go @@ -43,6 +43,7 @@ type Entry struct { Status string `json:"status"` CallerID string `json:"caller_id,omitempty"` AccountID string `json:"account_id,omitempty"` + Surface string `json:"surface,omitempty"` Model string `json:"model,omitempty"` Stream bool `json:"stream"` UserInput string `json:"user_input,omitempty"` @@ -72,6 +73,7 @@ type SummaryEntry struct { Status string `json:"status"` CallerID string `json:"caller_id,omitempty"` AccountID string `json:"account_id,omitempty"` + Surface string `json:"surface,omitempty"` Model string `json:"model,omitempty"` Stream bool `json:"stream"` UserInput string `json:"user_input,omitempty"` @@ -92,6 +94,7 @@ type File struct { type StartParams struct { CallerID string AccountID string + Surface string Model string Stream bool UserInput string @@ -271,6 +274,7 @@ func (s *Store) Start(params StartParams) (Entry, error) { Status: "streaming", CallerID: strings.TrimSpace(params.CallerID), AccountID: strings.TrimSpace(params.AccountID), + Surface: strings.TrimSpace(params.Surface), Model: strings.TrimSpace(params.Model), Stream: params.Stream, UserInput: strings.TrimSpace(params.UserInput), @@ -546,10 +550,13 @@ func (s *Store) rebuildIndexLocked() { summaries = append(summaries, summaryFromEntry(item)) } sort.Slice(summaries, func(i, j int) bool { - if summaries[i].UpdatedAt == summaries[j].UpdatedAt { - return summaries[i].CreatedAt > summaries[j].CreatedAt + if summaries[i].CreatedAt == summaries[j].CreatedAt { + if summaries[i].Revision == summaries[j].Revision { + return summaries[i].UpdatedAt > summaries[j].UpdatedAt + } + return summaries[i].Revision > summaries[j].Revision } - return summaries[i].UpdatedAt > summaries[j].UpdatedAt + return summaries[i].CreatedAt > summaries[j].CreatedAt }) if s.state.Limit < DisabledLimit || !isAllowedLimit(s.state.Limit) { s.state.Limit = DefaultLimit @@ -593,6 +600,7 @@ func summaryFromEntry(item Entry) SummaryEntry { Status: item.Status, CallerID: item.CallerID, AccountID: item.AccountID, + Surface: item.Surface, Model: item.Model, Stream: item.Stream, UserInput: item.UserInput, diff --git a/internal/chathistory/store_test.go b/internal/chathistory/store_test.go index 79483a9..14da001 100644 --- a/internal/chathistory/store_test.go +++ b/internal/chathistory/store_test.go @@ -8,6 +8,7 @@ import ( "strings" "sync" "testing" + "time" "unicode/utf8" ) @@ -494,6 +495,36 @@ func TestStoreWritesOnlyChangedDetailFiles(t *testing.T) { } } +func TestStoreOrdersByCreationTimeNotStreamingUpdates(t *testing.T) { + path := filepath.Join(t.TempDir(), "chat_history.json") + store := New(path) + + first, err := store.Start(StartParams{UserInput: "first"}) + if err != nil { + t.Fatalf("start first failed: %v", err) + } + time.Sleep(time.Millisecond) + second, err := store.Start(StartParams{UserInput: "second"}) + if err != nil { + t.Fatalf("start second failed: %v", err) + } + time.Sleep(time.Millisecond) + if _, err := store.Update(first.ID, UpdateParams{Status: "streaming", Content: "still running"}); err != nil { + t.Fatalf("update first failed: %v", err) + } + + snapshot, err := store.Snapshot() + if err != nil { + t.Fatalf("snapshot failed: %v", err) + } + if len(snapshot.Items) != 2 { + t.Fatalf("expected two items, got %#v", snapshot.Items) + } + if snapshot.Items[0].ID != second.ID || snapshot.Items[1].ID != first.ID { + t.Fatalf("expected creation-time order to stay stable, got %#v", snapshot.Items) + } +} + func TestUpdatePreservesContentWhenNewContentIsEmpty(t *testing.T) { path := filepath.Join(t.TempDir(), "chat_history.json") store := New(path) diff --git a/internal/httpapi/claude/current_input_file_test.go b/internal/httpapi/claude/current_input_file_test.go index 4a5be55..fa6b34b 100644 --- a/internal/httpapi/claude/current_input_file_test.go +++ b/internal/httpapi/claude/current_input_file_test.go @@ -5,15 +5,25 @@ import ( "io" "net/http" "net/http/httptest" + "path/filepath" "strings" "testing" "ds2api/internal/auth" + "ds2api/internal/chathistory" dsclient "ds2api/internal/deepseek/client" ) type claudeCurrentInputAuth struct{} +type claudeHistoryConfig struct { + aliases map[string]string +} + +func (m claudeHistoryConfig) ModelAliases() map[string]string { return m.aliases } +func (claudeHistoryConfig) CurrentInputFileEnabled() bool { return false } +func (claudeHistoryConfig) CurrentInputFileMinChars() int { return 0 } + func (claudeCurrentInputAuth) Determine(*http.Request) (*auth.RequestAuth, error) { return &auth.RequestAuth{ DeepSeekToken: "direct-token", @@ -22,6 +32,50 @@ func (claudeCurrentInputAuth) Determine(*http.Request) (*auth.RequestAuth, error }, nil } +func TestClaudeDirectRecordsResponseHistory(t *testing.T) { + ds := &claudeCurrentInputDS{} + historyStore := chathistory.New(filepath.Join(t.TempDir(), "history.json")) + h := &Handler{ + Store: claudeHistoryConfig{aliases: map[string]string{"claude-sonnet-4-6": "deepseek-v4-flash"}}, + Auth: claudeCurrentInputAuth{}, + DS: ds, + ChatHistory: historyStore, + } + reqBody := `{"model":"claude-sonnet-4-6","messages":[{"role":"user","content":"hello from claude"}],"max_tokens":1024}` + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + h.Messages(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + snapshot, err := historyStore.Snapshot() + if err != nil { + t.Fatalf("snapshot history: %v", err) + } + if len(snapshot.Items) != 1 { + t.Fatalf("expected one history item, got %d", len(snapshot.Items)) + } + item, err := historyStore.Get(snapshot.Items[0].ID) + if err != nil { + t.Fatalf("get history item: %v", err) + } + if item.Surface != "claude.messages" { + t.Fatalf("unexpected surface: %q", item.Surface) + } + if item.Model != "claude-sonnet-4-6" { + t.Fatalf("unexpected model: %q", item.Model) + } + if item.UserInput != "hello from claude" { + t.Fatalf("unexpected user input: %q", item.UserInput) + } + if item.Content != "ok" { + t.Fatalf("expected raw upstream content, got %q", item.Content) + } +} + func (claudeCurrentInputAuth) Release(*auth.RequestAuth) {} type claudeCurrentInputDS struct { @@ -53,10 +107,12 @@ func (d *claudeCurrentInputDS) CallCompletion(_ context.Context, _ *auth.Request func TestClaudeDirectAppliesCurrentInputFile(t *testing.T) { ds := &claudeCurrentInputDS{} + historyStore := chathistory.New(filepath.Join(t.TempDir(), "history.json")) h := &Handler{ - Store: mockClaudeConfig{aliases: map[string]string{"claude-sonnet-4-6": "deepseek-v4-flash"}}, - Auth: claudeCurrentInputAuth{}, - DS: ds, + Store: mockClaudeConfig{aliases: map[string]string{"claude-sonnet-4-6": "deepseek-v4-flash"}}, + Auth: claudeCurrentInputAuth{}, + DS: ds, + ChatHistory: historyStore, } reqBody := `{"model":"claude-sonnet-4-6","messages":[{"role":"user","content":"hello from claude"}],"max_tokens":1024}` req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(reqBody)) @@ -82,4 +138,21 @@ func TestClaudeDirectAppliesCurrentInputFile(t *testing.T) { if !strings.Contains(prompt, "Continue from the latest state in the attached DS2API_HISTORY.txt context.") { t.Fatalf("expected continuation prompt, got %q", prompt) } + snapshot, err := historyStore.Snapshot() + if err != nil { + t.Fatalf("snapshot history: %v", err) + } + if len(snapshot.Items) != 1 { + t.Fatalf("expected one history item, got %d", len(snapshot.Items)) + } + full, err := historyStore.Get(snapshot.Items[0].ID) + if err != nil { + t.Fatalf("get history item: %v", err) + } + if full.HistoryText != string(ds.uploads[0].Data) { + t.Fatalf("expected uploaded current input file to be persisted in history text") + } + if len(full.Messages) != 1 || !strings.Contains(full.Messages[0].Content, "Continue from the latest state in the attached DS2API_HISTORY.txt context.") { + t.Fatalf("expected persisted message to match upstream continuation prompt, got %#v", full.Messages) + } } diff --git a/internal/httpapi/claude/handler_messages.go b/internal/httpapi/claude/handler_messages.go index 8c6f063..0e8f94f 100644 --- a/internal/httpapi/claude/handler_messages.go +++ b/internal/httpapi/claude/handler_messages.go @@ -2,6 +2,7 @@ package claude import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -15,8 +16,10 @@ import ( "ds2api/internal/completionruntime" "ds2api/internal/config" claudefmt "ds2api/internal/format/claude" + "ds2api/internal/httpapi/openai/history" "ds2api/internal/httpapi/requestbody" "ds2api/internal/promptcompat" + "ds2api/internal/responsehistory" streamengine "ds2api/internal/stream" "ds2api/internal/translatorcliproxy" "ds2api/internal/util" @@ -79,38 +82,71 @@ func (h *Handler) handleClaudeDirect(w http.ResponseWriter, r *http.Request) boo return true } defer h.Auth.Release(a) - if norm.Standard.Stream { - h.handleClaudeDirectStream(w, r, a, norm.Standard) + stdReq, err := h.applyCurrentInputFile(r.Context(), a, norm.Standard) + if err != nil { + status, message := mapCurrentInputFileError(err) + writeClaudeError(w, status, message) return true } - result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, norm.Standard, completionruntime.Options{ + historySession := responsehistory.Start(responsehistory.StartParams{ + Store: h.ChatHistory, + Request: r, + Auth: a, + Surface: "claude.messages", + Standard: stdReq, + }) + if stdReq.Stream { + h.handleClaudeDirectStream(w, r, a, stdReq, historySession) + return true + } + result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{ StripReferenceMarkers: stripReferenceMarkersEnabled(), RetryEnabled: true, CurrentInputFile: h.Store, }) if outErr != nil { + if historySession != nil { + historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn) + } writeClaudeError(w, outErr.Status, outErr.Message) return true } + if historySession != nil { + historySession.SuccessTurn(http.StatusOK, result.Turn, responsehistory.GenericUsage(result.Turn)) + } writeJSON(w, http.StatusOK, claudefmt.BuildMessageResponseFromTurn( fmt.Sprintf("msg_%d", time.Now().UnixNano()), - norm.Standard.ResponseModel, + stdReq.ResponseModel, result.Turn, exposeThinking, )) return true } -func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) { +func (h *Handler) applyCurrentInputFile(ctx context.Context, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) (promptcompat.StandardRequest, error) { + if h == nil { + return stdReq, nil + } + return (history.Service{Store: h.Store, DS: h.DS}).ApplyCurrentInputFile(ctx, a, stdReq) +} + +func mapCurrentInputFileError(err error) (int, string) { + return history.MapError(err) +} + +func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, historySession *responsehistory.Session) { start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{ CurrentInputFile: h.Store, }) if outErr != nil { + if historySession != nil { + historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "") + } writeClaudeError(w, outErr.Status, outErr.Message) return } streamReq := start.Request - h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw) + h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool { @@ -264,10 +300,17 @@ func stripClaudeThinkingBlocks(raw []byte) []byte { return out } -func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any) { +func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySessions ...*responsehistory.Session) { + var historySession *responsehistory.Session + if len(historySessions) > 0 { + historySession = historySessions[0] + } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + if historySession != nil { + historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") + } writeClaudeError(w, http.StatusInternalServerError, string(body)) return } @@ -294,6 +337,7 @@ func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Requ toolNames, toolsRaw, buildClaudePromptTokenText(messages, thinkingEnabled), + historySession, ) streamRuntime.sendMessageStart() diff --git a/internal/httpapi/claude/handler_routes.go b/internal/httpapi/claude/handler_routes.go index 6875c9d..257fc08 100644 --- a/internal/httpapi/claude/handler_routes.go +++ b/internal/httpapi/claude/handler_routes.go @@ -6,6 +6,7 @@ import ( "github.com/go-chi/chi/v5" + "ds2api/internal/chathistory" "ds2api/internal/config" dsprotocol "ds2api/internal/deepseek/protocol" "ds2api/internal/textclean" @@ -16,10 +17,11 @@ import ( var writeJSON = util.WriteJSON type Handler struct { - Store ConfigReader - Auth AuthResolver - DS DeepSeekCaller - OpenAI OpenAIChatRunner + Store ConfigReader + Auth AuthResolver + DS DeepSeekCaller + OpenAI OpenAIChatRunner + ChatHistory *chathistory.Store } func stripReferenceMarkersEnabled() bool { diff --git a/internal/httpapi/claude/stream_runtime_core.go b/internal/httpapi/claude/stream_runtime_core.go index df73599..9c9e656 100644 --- a/internal/httpapi/claude/stream_runtime_core.go +++ b/internal/httpapi/claude/stream_runtime_core.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "ds2api/internal/responsehistory" "ds2api/internal/sse" streamengine "ds2api/internal/stream" "ds2api/internal/toolcall" @@ -46,6 +47,7 @@ type claudeStreamRuntime struct { textEmitted bool ended bool upstreamErr string + history *responsehistory.Session } func newClaudeStreamRuntime( @@ -60,6 +62,7 @@ func newClaudeStreamRuntime( toolNames []string, toolsRaw any, promptTokenText string, + history *responsehistory.Session, ) *claudeStreamRuntime { return &claudeStreamRuntime{ w: w, @@ -74,6 +77,7 @@ func newClaudeStreamRuntime( toolNames: toolNames, toolsRaw: toolsRaw, promptTokenText: promptTokenText, + history: history, messageID: fmt.Sprintf("msg_%d", time.Now().UnixNano()), thinkingBlockIndex: -1, textBlockIndex: -1, @@ -232,5 +236,11 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse } } + if s.history != nil { + s.history.Progress( + responsehistory.ThinkingForArchive(s.rawThinking.String(), s.toolDetectionThinking.String(), s.thinking.String()), + responsehistory.TextForArchive(s.rawText.String(), s.text.String()), + ) + } return streamengine.ParsedDecision{ContentSeen: contentSeen} } diff --git a/internal/httpapi/claude/stream_runtime_finalize.go b/internal/httpapi/claude/stream_runtime_finalize.go index 1d4e512..f63b125 100644 --- a/internal/httpapi/claude/stream_runtime_finalize.go +++ b/internal/httpapi/claude/stream_runtime_finalize.go @@ -2,6 +2,7 @@ package claude import ( "ds2api/internal/assistantturn" + "ds2api/internal/responsehistory" "ds2api/internal/sse" "ds2api/internal/toolcall" "ds2api/internal/toolstream" @@ -175,6 +176,15 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { if outcome.HasToolCalls { stopReason = "tool_use" } + if s.history != nil { + s.history.Success( + 200, + responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), + responsehistory.TextForArchive(turn.RawText, turn.Text), + stopReason, + responsehistory.GenericUsage(turn), + ) + } s.send("message_delta", map[string]any{ "type": "message_delta", @@ -191,10 +201,16 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { func (s *claudeStreamRuntime) onFinalize(reason streamengine.StopReason, scannerErr error) { if string(reason) == "upstream_error" { + if s.history != nil { + s.history.Error(500, s.upstreamErr, "upstream_error", responsehistory.ThinkingForArchive(s.rawThinking.String(), s.toolDetectionThinking.String(), s.thinking.String()), responsehistory.TextForArchive(s.rawText.String(), s.text.String())) + } s.sendError(s.upstreamErr) return } if scannerErr != nil { + if s.history != nil { + s.history.Error(500, scannerErr.Error(), "error", responsehistory.ThinkingForArchive(s.rawThinking.String(), s.toolDetectionThinking.String(), s.thinking.String()), responsehistory.TextForArchive(s.rawText.String(), s.text.String())) + } s.sendError(scannerErr.Error()) return } diff --git a/internal/httpapi/gemini/handler_generate.go b/internal/httpapi/gemini/handler_generate.go index 4945d6e..9161036 100644 --- a/internal/httpapi/gemini/handler_generate.go +++ b/internal/httpapi/gemini/handler_generate.go @@ -2,6 +2,7 @@ package gemini import ( "bytes" + "context" "encoding/json" "errors" "io" @@ -14,8 +15,10 @@ import ( "ds2api/internal/assistantturn" "ds2api/internal/auth" "ds2api/internal/completionruntime" + "ds2api/internal/httpapi/openai/history" "ds2api/internal/httpapi/requestbody" "ds2api/internal/promptcompat" + "ds2api/internal/responsehistory" "ds2api/internal/sse" "ds2api/internal/toolcall" "ds2api/internal/translatorcliproxy" @@ -76,8 +79,21 @@ func (h *Handler) handleGeminiDirect(w http.ResponseWriter, r *http.Request, str return true } defer h.Auth.Release(a) + stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq) + if err != nil { + status, message := mapCurrentInputFileError(err) + writeGeminiError(w, status, message) + return true + } + historySession := responsehistory.Start(responsehistory.StartParams{ + Store: h.ChatHistory, + Request: r, + Auth: a, + Surface: "gemini.generate_content", + Standard: stdReq, + }) if stream { - h.handleGeminiDirectStream(w, r, a, stdReq) + h.handleGeminiDirectStream(w, r, a, stdReq, historySession) return true } result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{ @@ -86,23 +102,43 @@ func (h *Handler) handleGeminiDirect(w http.ResponseWriter, r *http.Request, str CurrentInputFile: h.Store, }) if outErr != nil { + if historySession != nil { + historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn) + } writeGeminiError(w, outErr.Status, outErr.Message) return true } + if historySession != nil { + historySession.SuccessTurn(http.StatusOK, result.Turn, responsehistory.GenericUsage(result.Turn)) + } writeJSON(w, http.StatusOK, buildGeminiGenerateContentResponseFromTurn(result.Turn)) return true } -func (h *Handler) handleGeminiDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) { +func (h *Handler) applyCurrentInputFile(ctx context.Context, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) (promptcompat.StandardRequest, error) { + if h == nil { + return stdReq, nil + } + return (history.Service{Store: h.Store, DS: h.DS}).ApplyCurrentInputFile(ctx, a, stdReq) +} + +func mapCurrentInputFileError(err error) (int, string) { + return history.MapError(err) +} + +func (h *Handler) handleGeminiDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, historySession *responsehistory.Session) { start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{ CurrentInputFile: h.Store, }) if outErr != nil { + if historySession != nil { + historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "") + } writeGeminiError(w, outErr.Status, outErr.Message) return } streamReq := start.Request - h.handleStreamGenerateContent(w, r, start.Response, streamReq.ResponseModel, streamReq.PromptTokenText, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw) + h.handleStreamGenerateContent(w, r, start.Response, streamReq.ResponseModel, streamReq.PromptTokenText, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, stream bool) bool { diff --git a/internal/httpapi/gemini/handler_routes.go b/internal/httpapi/gemini/handler_routes.go index e4a6cbc..6f6c56e 100644 --- a/internal/httpapi/gemini/handler_routes.go +++ b/internal/httpapi/gemini/handler_routes.go @@ -5,6 +5,7 @@ import ( "github.com/go-chi/chi/v5" + "ds2api/internal/chathistory" "ds2api/internal/textclean" "ds2api/internal/util" ) @@ -12,10 +13,11 @@ import ( var writeJSON = util.WriteJSON type Handler struct { - Store ConfigReader - Auth AuthResolver - DS DeepSeekCaller - OpenAI OpenAIChatRunner + Store ConfigReader + Auth AuthResolver + DS DeepSeekCaller + OpenAI OpenAIChatRunner + ChatHistory *chathistory.Store } //nolint:unused // used by native Gemini stream/non-stream runtime helpers. diff --git a/internal/httpapi/gemini/handler_stream_runtime.go b/internal/httpapi/gemini/handler_stream_runtime.go index c005d92..de80fab 100644 --- a/internal/httpapi/gemini/handler_stream_runtime.go +++ b/internal/httpapi/gemini/handler_stream_runtime.go @@ -9,15 +9,23 @@ import ( "ds2api/internal/assistantturn" dsprotocol "ds2api/internal/deepseek/protocol" + "ds2api/internal/responsehistory" "ds2api/internal/sse" streamengine "ds2api/internal/stream" ) //nolint:unused // retained for native Gemini stream handling path. -func (h *Handler) handleStreamGenerateContent(w http.ResponseWriter, r *http.Request, resp *http.Response, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any) { +func (h *Handler) handleStreamGenerateContent(w http.ResponseWriter, r *http.Request, resp *http.Response, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySessions ...*responsehistory.Session) { + var historySession *responsehistory.Session + if len(historySessions) > 0 { + historySession = historySessions[0] + } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + if historySession != nil { + historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") + } writeGeminiError(w, resp.StatusCode, strings.TrimSpace(string(body))) return } @@ -29,7 +37,7 @@ func (h *Handler) handleStreamGenerateContent(w http.ResponseWriter, r *http.Req rc := http.NewResponseController(w) _, canFlush := w.(http.Flusher) - runtime := newGeminiStreamRuntime(w, rc, canFlush, model, finalPrompt, thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw) + runtime := newGeminiStreamRuntime(w, rc, canFlush, model, finalPrompt, thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw, historySession) initialType := "text" if thinkingEnabled { @@ -70,6 +78,7 @@ type geminiStreamRuntime struct { accumulator *assistantturn.Accumulator contentFilter bool responseMessageID int + history *responsehistory.Session } //nolint:unused // retained for native Gemini stream handling path. @@ -84,6 +93,7 @@ func newGeminiStreamRuntime( stripReferenceMarkers bool, toolNames []string, toolsRaw any, + history *responsehistory.Session, ) *geminiStreamRuntime { return &geminiStreamRuntime{ w: w, @@ -97,6 +107,7 @@ func newGeminiStreamRuntime( stripReferenceMarkers: stripReferenceMarkers, toolNames: toolNames, toolsRaw: toolsRaw, + history: history, accumulator: assistantturn.NewAccumulator(assistantturn.AccumulatorOptions{ ThinkingEnabled: thinkingEnabled, SearchEnabled: searchEnabled, @@ -170,6 +181,13 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "modelVersion": s.model, }) } + if s.history != nil { + rawText, text, rawThinking, thinking, detectionThinking := s.accumulator.Snapshot() + s.history.Progress( + responsehistory.ThinkingForArchive(rawThinking, detectionThinking, thinking), + responsehistory.TextForArchive(rawText, text), + ) + } return streamengine.ParsedDecision{ContentSeen: accumulated.ContentSeen} } @@ -193,6 +211,15 @@ func (s *geminiStreamRuntime) finalize() { ToolsRaw: s.toolsRaw, }) outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{}) + if s.history != nil { + s.history.Success( + http.StatusOK, + responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), + responsehistory.TextForArchive(turn.RawText, turn.Text), + assistantturn.FinishReason(turn), + responsehistory.GenericUsage(turn), + ) + } if s.bufferContent { parts := buildGeminiPartsFromTurn(turn) diff --git a/internal/httpapi/gemini/handler_test.go b/internal/httpapi/gemini/handler_test.go index a7e974b..90a1fe9 100644 --- a/internal/httpapi/gemini/handler_test.go +++ b/internal/httpapi/gemini/handler_test.go @@ -7,12 +7,14 @@ import ( "io" "net/http" "net/http/httptest" + "path/filepath" "strings" "testing" "github.com/go-chi/chi/v5" "ds2api/internal/auth" + "ds2api/internal/chathistory" dsclient "ds2api/internal/deepseek/client" ) @@ -138,10 +140,12 @@ func TestGeminiDirectAppliesCurrentInputFile(t *testing.T) { ds := &testGeminiDS{ resp: makeGeminiUpstreamResponse(`data: {"p":"response/content","v":"ok"}`), } + historyStore := chathistory.New(filepath.Join(t.TempDir(), "history.json")) h := &Handler{ - Store: testGeminiConfig{}, - Auth: testGeminiAuth{}, - DS: ds, + Store: testGeminiConfig{}, + Auth: testGeminiAuth{}, + DS: ds, + ChatHistory: historyStore, } reqBody := `{"contents":[{"role":"user","parts":[{"text":"hello from gemini"}]}]}` req := httptest.NewRequest(http.MethodPost, "/v1beta/models/gemini-2.5-pro:generateContent", strings.NewReader(reqBody)) @@ -172,6 +176,29 @@ func TestGeminiDirectAppliesCurrentInputFile(t *testing.T) { if !strings.Contains(prompt, "Continue from the latest state in the attached DS2API_HISTORY.txt context.") { t.Fatalf("expected continuation prompt, got %q", prompt) } + snapshot, err := historyStore.Snapshot() + if err != nil { + t.Fatalf("snapshot history: %v", err) + } + if len(snapshot.Items) != 1 { + t.Fatalf("expected one history item, got %d", len(snapshot.Items)) + } + full, err := historyStore.Get(snapshot.Items[0].ID) + if err != nil { + t.Fatalf("get history item: %v", err) + } + if full.Surface != "gemini.generate_content" { + t.Fatalf("unexpected surface: %q", full.Surface) + } + if full.Content != "ok" { + t.Fatalf("expected raw upstream content, got %q", full.Content) + } + if full.HistoryText != string(ds.uploadCalls[0].Data) { + t.Fatalf("expected uploaded current input file to be persisted in history text") + } + if len(full.Messages) != 1 || !strings.Contains(full.Messages[0].Content, "Continue from the latest state in the attached DS2API_HISTORY.txt context.") { + t.Fatalf("expected persisted message to match upstream continuation prompt, got %#v", full.Messages) + } } func TestGeminiRoutesRegistered(t *testing.T) { diff --git a/internal/httpapi/openai/chat/chat_history.go b/internal/httpapi/openai/chat/chat_history.go index 27a5f2e..fe97a69 100644 --- a/internal/httpapi/openai/chat/chat_history.go +++ b/internal/httpapi/openai/chat/chat_history.go @@ -14,9 +14,6 @@ import ( "ds2api/internal/promptcompat" ) -const adminWebUISourceHeader = "X-Ds2-Source" -const adminWebUISourceValue = "admin-webui-api-tester" - type chatHistorySession struct { store *chathistory.Store entryID string @@ -40,6 +37,7 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request entry, err := store.Start(chathistory.StartParams{ CallerID: strings.TrimSpace(a.CallerID), AccountID: strings.TrimSpace(a.AccountID), + Surface: "openai.chat_completions", Model: strings.TrimSpace(stdReq.ResponseModel), Stream: stdReq.Stream, UserInput: extractSingleUserInput(stdReq.Messages), @@ -50,6 +48,7 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request startParams := chathistory.StartParams{ CallerID: strings.TrimSpace(a.CallerID), AccountID: strings.TrimSpace(a.AccountID), + Surface: "openai.chat_completions", Model: strings.TrimSpace(stdReq.ResponseModel), Stream: stdReq.Stream, UserInput: extractSingleUserInput(stdReq.Messages), @@ -82,7 +81,7 @@ func shouldCaptureChatHistory(r *http.Request) bool { if isVercelStreamPrepareRequest(r) || isVercelStreamReleaseRequest(r) { return false } - return strings.TrimSpace(r.Header.Get(adminWebUISourceHeader)) != adminWebUISourceValue + return true } func extractSingleUserInput(messages []any) string { diff --git a/internal/httpapi/openai/chat/chat_history_test.go b/internal/httpapi/openai/chat/chat_history_test.go index 33d784f..ca9a772 100644 --- a/internal/httpapi/openai/chat/chat_history_test.go +++ b/internal/httpapi/openai/chat/chat_history_test.go @@ -294,7 +294,7 @@ func TestHandleStreamContextCancelledMarksHistoryStopped(t *testing.T) { } } -func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) { +func TestChatCompletionsRecordsAdminWebUISource(t *testing.T) { historyStore := newTestChatHistoryStore(t) h := &Handler{ Store: mockOpenAIConfig{}, @@ -307,7 +307,7 @@ func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody)) req.Header.Set("Authorization", "Bearer direct-token") req.Header.Set("Content-Type", "application/json") - req.Header.Set(adminWebUISourceHeader, adminWebUISourceValue) + req.Header.Set("X-Ds2-Source", "admin-webui-api-tester") rec := httptest.NewRecorder() h.ChatCompletions(rec, req) @@ -318,8 +318,8 @@ func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) { if err != nil { t.Fatalf("snapshot failed: %v", err) } - if len(snapshot.Items) != 0 { - t.Fatalf("expected admin webui source to be skipped, got %#v", snapshot.Items) + if len(snapshot.Items) != 1 { + t.Fatalf("expected admin webui source to be recorded, got %#v", snapshot.Items) } } diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go index ccf2f06..b0cb205 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -10,11 +10,12 @@ import ( "ds2api/internal/config" dsprotocol "ds2api/internal/deepseek/protocol" "ds2api/internal/promptcompat" + "ds2api/internal/responsehistory" streamengine "ds2api/internal/stream" ) -func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) { - streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID) +func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) { + streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID, historySession) if !ok { return } @@ -55,10 +56,13 @@ func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http. } } -func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) (*responsesStreamRuntime, string, bool) { +func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) (*responsesStreamRuntime, string, bool) { if resp.StatusCode != http.StatusOK { defer func() { _ = resp.Body.Close() }() body, _ := io.ReadAll(resp.Body) + if historySession != nil { + historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") + } writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body))) return nil, "", false } @@ -78,7 +82,7 @@ func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *htt h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(), toolChoice, traceID, func(obj map[string]any) { h.getResponseStore().put(owner, responseID, obj) - }, + }, historySession, ) streamRuntime.refFileTokens = refFileTokens streamRuntime.sendCreated() diff --git a/internal/httpapi/openai/responses/empty_retry_runtime_test.go b/internal/httpapi/openai/responses/empty_retry_runtime_test.go index c40e983..00aefec 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime_test.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime_test.go @@ -47,6 +47,7 @@ func TestConsumeResponsesStreamAttemptMarksContextCancelledState(t *testing.T) { promptcompat.DefaultToolChoicePolicy(), "", nil, + nil, ) resp := makeResponsesOpenAISSEHTTPResponse( `data: {"p":"response/content","v":"hello"}`, diff --git a/internal/httpapi/openai/responses/responses_handler.go b/internal/httpapi/openai/responses/responses_handler.go index 9ec27e3..4b45a36 100644 --- a/internal/httpapi/openai/responses/responses_handler.go +++ b/internal/httpapi/openai/responses/responses_handler.go @@ -18,6 +18,7 @@ import ( dsprotocol "ds2api/internal/deepseek/protocol" openaifmt "ds2api/internal/format/openai" "ds2api/internal/promptcompat" + "ds2api/internal/responsehistory" "ds2api/internal/sse" streamengine "ds2api/internal/stream" ) @@ -95,6 +96,13 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) { } responseID := "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "") + historySession := responsehistory.Start(responsehistory.StartParams{ + Store: h.ChatHistory, + Request: r, + Auth: a, + Surface: "openai.responses", + Standard: stdReq, + }) if !stdReq.Stream { result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{ StripReferenceMarkers: stripReferenceMarkersEnabled(), @@ -102,9 +110,15 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) { CurrentInputFile: h.Store, }) if outErr != nil { + if historySession != nil { + historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn) + } writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) return } + if historySession != nil { + historySession.SuccessTurn(http.StatusOK, result.Turn, assistantturn.OpenAIResponsesUsage(result.Turn)) + } responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, stdReq.ResponseModel, result.Turn.Prompt, result.Turn.Thinking, result.Turn.Text, result.Turn.ToolCalls, stdReq.ToolsRaw) responseObj["usage"] = assistantturn.OpenAIResponsesUsage(result.Turn) h.getResponseStore().put(owner, responseID, responseObj) @@ -116,13 +130,16 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) { CurrentInputFile: h.Store, }) if outErr != nil { + if historySession != nil { + historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "") + } writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) return } streamReq := start.Request refFileTokens := streamReq.RefFileTokens - h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID) + h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID, historySession) } func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) { @@ -198,6 +215,7 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, func(obj map[string]any) { h.getResponseStore().put(owner, responseID, obj) }, + nil, ) streamRuntime.refFileTokens = refFileTokens streamRuntime.sendCreated() diff --git a/internal/httpapi/openai/responses/responses_history_test.go b/internal/httpapi/openai/responses/responses_history_test.go new file mode 100644 index 0000000..0d659cb --- /dev/null +++ b/internal/httpapi/openai/responses/responses_history_test.go @@ -0,0 +1,100 @@ +package responses + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "testing" + + "github.com/go-chi/chi/v5" + + "ds2api/internal/auth" + "ds2api/internal/chathistory" + dsclient "ds2api/internal/deepseek/client" +) + +type responsesHistoryDS struct { + payload map[string]any +} + +func (d *responsesHistoryDS) CreateSession(context.Context, *auth.RequestAuth, int) (string, error) { + return "session-id", nil +} + +func (d *responsesHistoryDS) GetPow(context.Context, *auth.RequestAuth, int) (string, error) { + return "pow", nil +} + +func (d *responsesHistoryDS) UploadFile(context.Context, *auth.RequestAuth, dsclient.UploadFileRequest, int) (*dsclient.UploadFileResult, error) { + return &dsclient.UploadFileResult{ID: "file-id"}, nil +} + +func (d *responsesHistoryDS) CallCompletion(_ context.Context, _ *auth.RequestAuth, payload map[string]any, _ string, _ int) (*http.Response, error) { + d.payload = payload + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader("data: {\"p\":\"response/content\",\"v\":\"ok\"}\n")), + }, nil +} + +func (d *responsesHistoryDS) DeleteSessionForToken(context.Context, string, string) (*dsclient.DeleteSessionResult, error) { + return &dsclient.DeleteSessionResult{Success: true}, nil +} + +func (d *responsesHistoryDS) DeleteAllSessionsForToken(context.Context, string) error { + return nil +} + +func TestResponsesRecordsResponseHistory(t *testing.T) { + store, resolver := newDirectTokenResolver(t) + historyStore := chathistory.New(filepath.Join(t.TempDir(), "history.json")) + ds := &responsesHistoryDS{} + h := &Handler{ + Store: store, + Auth: resolver, + DS: ds, + ChatHistory: historyStore, + } + r := chi.NewRouter() + RegisterRoutes(r, h) + + req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(`{"model":"deepseek-v4-flash","input":"hello responses"}`)) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if ds.payload == nil { + t.Fatalf("expected upstream payload to be sent") + } + snapshot, err := historyStore.Snapshot() + if err != nil { + t.Fatalf("snapshot history: %v", err) + } + if len(snapshot.Items) != 1 { + t.Fatalf("expected one history item, got %d", len(snapshot.Items)) + } + item, err := historyStore.Get(snapshot.Items[0].ID) + if err != nil { + t.Fatalf("get history item: %v", err) + } + if item.Surface != "openai.responses" { + t.Fatalf("unexpected surface: %q", item.Surface) + } + if !strings.Contains(item.UserInput, "Continue from the latest state in the attached DS2API_HISTORY.txt context.") { + t.Fatalf("unexpected user input: %q", item.UserInput) + } + if !strings.Contains(item.HistoryText, "hello responses") { + t.Fatalf("expected original input in persisted history text, got %q", item.HistoryText) + } + if item.Content != "ok" { + t.Fatalf("expected raw upstream content, got %q", item.Content) + } +} diff --git a/internal/httpapi/openai/responses/responses_stream_runtime_core.go b/internal/httpapi/openai/responses/responses_stream_runtime_core.go index 4d3a3ab..524808e 100644 --- a/internal/httpapi/openai/responses/responses_stream_runtime_core.go +++ b/internal/httpapi/openai/responses/responses_stream_runtime_core.go @@ -10,6 +10,7 @@ import ( openaifmt "ds2api/internal/format/openai" "ds2api/internal/httpapi/openai/shared" "ds2api/internal/promptcompat" + "ds2api/internal/responsehistory" "ds2api/internal/sse" streamengine "ds2api/internal/stream" "ds2api/internal/toolstream" @@ -61,6 +62,7 @@ type responsesStreamRuntime struct { finalErrorCode string persistResponse func(obj map[string]any) + history *responsehistory.Session } func newResponsesStreamRuntime( @@ -80,6 +82,7 @@ func newResponsesStreamRuntime( toolChoice promptcompat.ToolChoicePolicy, traceID string, persistResponse func(obj map[string]any), + history *responsehistory.Session, ) *responsesStreamRuntime { return &responsesStreamRuntime{ w: w, @@ -106,6 +109,7 @@ func newResponsesStreamRuntime( toolChoice: toolChoice, traceID: traceID, persistResponse: persistResponse, + history: history, accumulator: shared.StreamAccumulator{ ThinkingEnabled: thinkingEnabled, SearchEnabled: searchEnabled, @@ -138,6 +142,9 @@ func (s *responsesStreamRuntime) failResponse(status int, message, code string) if s.persistResponse != nil { s.persistResponse(failedResp) } + if s.history != nil { + s.history.Error(status, message, code, responsehistory.ThinkingForArchive(s.accumulator.RawThinking.String(), s.accumulator.ToolDetectionThinking.String(), s.accumulator.Thinking.String()), responsehistory.TextForArchive(s.accumulator.RawText.String(), s.accumulator.Text.String())) + } s.sendEvent("response.failed", openaifmt.BuildResponsesFailedPayload(s.responseID, s.model, status, message, code)) s.sendDone() } @@ -214,6 +221,15 @@ func (s *responsesStreamRuntime) finalize(finishReason string, deferEmptyOutput if s.persistResponse != nil { s.persistResponse(obj) } + if s.history != nil { + s.history.Success( + http.StatusOK, + responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), + responsehistory.TextForArchive(turn.RawText, turn.Text), + outcome.FinishReason, + assistantturn.OpenAIResponsesUsage(turn), + ) + } s.sendEvent("response.completed", openaifmt.BuildResponsesCompletedPayload(obj)) s.sendDone() return true @@ -272,5 +288,11 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa } batch.flush() + if s.history != nil { + s.history.Progress( + responsehistory.ThinkingForArchive(s.accumulator.RawThinking.String(), s.accumulator.ToolDetectionThinking.String(), s.accumulator.Thinking.String()), + responsehistory.TextForArchive(s.accumulator.RawText.String(), s.accumulator.Text.String()), + ) + } return streamengine.ParsedDecision{ContentSeen: accumulated.ContentSeen} } diff --git a/internal/responsehistory/session.go b/internal/responsehistory/session.go new file mode 100644 index 0000000..dd10f16 --- /dev/null +++ b/internal/responsehistory/session.go @@ -0,0 +1,289 @@ +package responsehistory + +import ( + "errors" + "net/http" + "strings" + "time" + + "ds2api/internal/assistantturn" + "ds2api/internal/auth" + "ds2api/internal/chathistory" + "ds2api/internal/config" + "ds2api/internal/prompt" + "ds2api/internal/promptcompat" +) + +type Session struct { + store *chathistory.Store + entryID string + startedAt time.Time + lastPersist time.Time + startParams chathistory.StartParams + disabled bool +} + +type StartParams struct { + Store *chathistory.Store + Request *http.Request + Auth *auth.RequestAuth + Surface string + Standard promptcompat.StandardRequest +} + +func Start(params StartParams) *Session { + if params.Store == nil || params.Request == nil || params.Auth == nil { + return nil + } + if !params.Store.Enabled() || !shouldCapture(params.Request) { + return nil + } + startParams := chathistory.StartParams{ + CallerID: strings.TrimSpace(params.Auth.CallerID), + AccountID: strings.TrimSpace(params.Auth.AccountID), + Surface: strings.TrimSpace(params.Surface), + Model: strings.TrimSpace(params.Standard.ResponseModel), + Stream: params.Standard.Stream, + UserInput: ExtractSingleUserInput(params.Standard.Messages), + Messages: ExtractAllMessages(params.Standard.Messages), + HistoryText: params.Standard.HistoryText, + FinalPrompt: params.Standard.FinalPrompt, + } + entry, err := params.Store.Start(startParams) + session := &Session{ + store: params.Store, + entryID: entry.ID, + startedAt: time.Now(), + lastPersist: time.Now(), + startParams: startParams, + } + if err != nil { + if entry.ID == "" { + config.Logger.Warn("[response_history] start failed", "surface", startParams.Surface, "error", err) + return nil + } + config.Logger.Warn("[response_history] start persisted in memory after write failure", "surface", startParams.Surface, "error", err) + } + return session +} + +func shouldCapture(r *http.Request) bool { + if r == nil || r.URL == nil { + return false + } + if strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" { + return false + } + if strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" { + return false + } + return true +} + +func ExtractSingleUserInput(messages []any) string { + for i := len(messages) - 1; i >= 0; i-- { + msg, ok := messages[i].(map[string]any) + if !ok { + continue + } + role := strings.ToLower(strings.TrimSpace(asString(msg["role"]))) + if role != "user" { + continue + } + if normalized := strings.TrimSpace(prompt.NormalizeContent(msg["content"])); normalized != "" { + return normalized + } + } + return "" +} + +func ExtractAllMessages(messages []any) []chathistory.Message { + out := make([]chathistory.Message, 0, len(messages)) + for _, raw := range messages { + msg, ok := raw.(map[string]any) + if !ok { + continue + } + role := strings.ToLower(strings.TrimSpace(asString(msg["role"]))) + content := strings.TrimSpace(prompt.NormalizeContent(msg["content"])) + if role == "" || content == "" { + continue + } + out = append(out, chathistory.Message{ + Role: role, + Content: content, + }) + } + return out +} + +func (s *Session) Progress(thinking, content string) { + if s == nil || s.store == nil || s.disabled { + return + } + now := time.Now() + if now.Sub(s.lastPersist) < 250*time.Millisecond { + return + } + s.lastPersist = now + s.persistUpdate(chathistory.UpdateParams{ + Status: "streaming", + ReasoningContent: thinking, + Content: content, + StatusCode: http.StatusOK, + ElapsedMs: time.Since(s.startedAt).Milliseconds(), + }) +} + +func (s *Session) Success(statusCode int, thinking, content, finishReason string, usage map[string]any) { + if s == nil || s.store == nil || s.disabled { + return + } + s.persistUpdate(chathistory.UpdateParams{ + Status: "success", + ReasoningContent: thinking, + Content: content, + StatusCode: statusCode, + ElapsedMs: time.Since(s.startedAt).Milliseconds(), + FinishReason: finishReason, + Usage: usage, + Completed: true, + }) +} + +func (s *Session) Error(statusCode int, message, finishReason, thinking, content string) { + if s == nil || s.store == nil || s.disabled { + return + } + s.persistUpdate(chathistory.UpdateParams{ + Status: "error", + ReasoningContent: thinking, + Content: content, + Error: message, + StatusCode: statusCode, + ElapsedMs: time.Since(s.startedAt).Milliseconds(), + FinishReason: finishReason, + Completed: true, + }) +} + +func (s *Session) SuccessTurn(statusCode int, turn assistantturn.Turn, usage map[string]any) { + outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{}) + s.Success( + statusCode, + ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), + TextForArchive(turn.RawText, turn.Text), + outcome.FinishReason, + usage, + ) +} + +func (s *Session) ErrorTurn(statusCode int, message, finishReason string, turn assistantturn.Turn) { + s.Error( + statusCode, + message, + finishReason, + ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), + TextForArchive(turn.RawText, turn.Text), + ) +} + +func TextForArchive(raw, visible string) string { + if strings.TrimSpace(raw) != "" { + return raw + } + return visible +} + +func ThinkingForArchive(raw, detection, visible string) string { + if strings.TrimSpace(raw) != "" { + return raw + } + if strings.TrimSpace(detection) != "" { + return detection + } + return visible +} + +func GenericUsage(turn assistantturn.Turn) map[string]any { + return map[string]any{ + "input_tokens": turn.Usage.InputTokens, + "output_tokens": turn.Usage.OutputTokens, + "reasoning_tokens": turn.Usage.ReasoningTokens, + "total_tokens": turn.Usage.TotalTokens, + } +} + +func (s *Session) retryMissingEntry() bool { + if s == nil || s.store == nil || s.disabled { + return false + } + entry, err := s.store.Start(s.startParams) + if errors.Is(err, chathistory.ErrDisabled) { + s.disabled = true + return false + } + if entry.ID == "" { + if err != nil { + config.Logger.Warn("[response_history] recreate missing entry failed", "surface", s.startParams.Surface, "error", err) + } + return false + } + s.entryID = entry.ID + if err != nil { + config.Logger.Warn("[response_history] recreate missing entry persisted in memory after write failure", "surface", s.startParams.Surface, "error", err) + } + return true +} + +func (s *Session) persistUpdate(params chathistory.UpdateParams) { + if s == nil || s.store == nil || s.disabled { + return + } + if _, err := s.store.Update(s.entryID, params); err != nil { + s.handlePersistError(params, err) + } +} + +func (s *Session) handlePersistError(params chathistory.UpdateParams, err error) { + if err == nil || s == nil { + return + } + if errors.Is(err, chathistory.ErrDisabled) { + s.disabled = true + return + } + if isMissingError(err) { + if s.retryMissingEntry() { + if _, retryErr := s.store.Update(s.entryID, params); retryErr != nil { + if errors.Is(retryErr, chathistory.ErrDisabled) || isMissingError(retryErr) { + s.disabled = true + return + } + config.Logger.Warn("[response_history] retry after missing entry failed", "surface", s.startParams.Surface, "error", retryErr) + } + return + } + s.disabled = true + return + } + config.Logger.Warn("[response_history] update failed", "surface", s.startParams.Surface, "error", err) +} + +func isMissingError(err error) bool { + if err == nil { + return false + } + return strings.Contains(strings.ToLower(err.Error()), "not found") +} + +func asString(v any) string { + switch x := v.(type) { + case string: + return x + case nil: + return "" + default: + return strings.TrimSpace(prompt.NormalizeContent(x)) + } +} diff --git a/internal/server/router.go b/internal/server/router.go index 5d8fae2..7ec7eef 100644 --- a/internal/server/router.go +++ b/internal/server/router.go @@ -65,8 +65,8 @@ func NewApp() (*App, error) { responsesHandler := &responses.Handler{Store: store, Auth: resolver, DS: dsClient, ChatHistory: chatHistoryStore} filesHandler := &files.Handler{Store: store, Auth: resolver, DS: dsClient, ChatHistory: chatHistoryStore} embeddingsHandler := &embeddings.Handler{Store: store, Auth: resolver, DS: dsClient, ChatHistory: chatHistoryStore} - claudeHandler := &claude.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: chatHandler} - geminiHandler := &gemini.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: chatHandler} + claudeHandler := &claude.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: chatHandler, ChatHistory: chatHistoryStore} + geminiHandler := &gemini.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: chatHandler, ChatHistory: chatHistoryStore} adminHandler := &admin.Handler{Store: store, Pool: pool, DS: dsClient, OpenAI: chatHandler, ChatHistory: chatHistoryStore} webuiHandler := webui.NewHandler() diff --git a/webui/src/features/apiTester/useChatStreamClient.js b/webui/src/features/apiTester/useChatStreamClient.js index d1a2229..b6fad86 100644 --- a/webui/src/features/apiTester/useChatStreamClient.js +++ b/webui/src/features/apiTester/useChatStreamClient.js @@ -123,7 +123,6 @@ export function useChatStreamClient({ const headers = { 'Content-Type': 'application/json', 'Authorization': `Bearer ${effectiveKey}`, - 'X-Ds2-Source': 'admin-webui-api-tester', } if (requestAccount) { headers['X-Ds2-Target-Account'] = requestAccount diff --git a/webui/src/features/chatHistory/ChatHistoryContainer.jsx b/webui/src/features/chatHistory/ChatHistoryContainer.jsx index 81a8834..0bddf02 100644 --- a/webui/src/features/chatHistory/ChatHistoryContainer.jsx +++ b/webui/src/features/chatHistory/ChatHistoryContainer.jsx @@ -10,6 +10,9 @@ import { VIEW_MODE_KEY, } from './chatHistoryUtils' +const LIST_REFRESH_MS = 1500 +const STREAMING_DETAIL_REFRESH_MS = 750 + export default function ChatHistoryContainer({ authFetch, onMessage }) { const { t, lang } = useI18n() const apiFetch = authFetch || fetch @@ -136,7 +139,7 @@ export default function ChatHistoryContainer({ authFetch, onMessage }) { if (!autoRefreshReady || limit === DISABLED_LIMIT) return undefined const timer = window.setInterval(() => { loadList({ mode: 'silent', announceError: false }) - }, 5000) + }, LIST_REFRESH_MS) return () => window.clearInterval(timer) }, [autoRefreshReady, limit]) @@ -144,7 +147,7 @@ export default function ChatHistoryContainer({ authFetch, onMessage }) { if (!autoRefreshReady || !selectedId || selectedSummary?.status !== 'streaming') return undefined const timer = window.setInterval(() => { loadDetail(selectedId, { announceError: false }) - }, 1000) + }, STREAMING_DETAIL_REFRESH_MS) return () => window.clearInterval(timer) }, [autoRefreshReady, selectedId, selectedSummary?.status]) diff --git a/webui/src/features/chatHistory/ChatHistoryDetail.jsx b/webui/src/features/chatHistory/ChatHistoryDetail.jsx index 785359b..880a70b 100644 --- a/webui/src/features/chatHistory/ChatHistoryDetail.jsx +++ b/webui/src/features/chatHistory/ChatHistoryDetail.jsx @@ -207,6 +207,10 @@ function MetaGrid({ selectedItem, t }) { {formatElapsed(selectedItem.elapsed_ms, t)} +
+
{t('chatHistory.metaSurface')}
+
{selectedItem.surface || t('chatHistory.metaUnknown')}
+
{t('chatHistory.metaModel')}
{selectedItem.model || t('chatHistory.metaUnknown')}
diff --git a/webui/src/features/chatHistory/ChatHistoryPanels.jsx b/webui/src/features/chatHistory/ChatHistoryPanels.jsx index cd42a72..2d0fd6c 100644 --- a/webui/src/features/chatHistory/ChatHistoryPanels.jsx +++ b/webui/src/features/chatHistory/ChatHistoryPanels.jsx @@ -69,7 +69,7 @@ export function ChatHistoryListPane({ items, selectedItem, deletingId, t, lang, {item.user_input || t('chatHistory.untitled')}
- {item.model || '-'} + {[item.surface, item.model].filter(Boolean).join(' · ') || '-'}
diff --git a/webui/src/locales/en.json b/webui/src/locales/en.json index 36abab2..3e609d5 100644 --- a/webui/src/locales/en.json +++ b/webui/src/locales/en.json @@ -18,8 +18,8 @@ "desc": "Test API connectivity and responses" }, "history": { - "label": "Conversations", - "desc": "Browse server-side external chat history" + "label": "Responses", + "desc": "Browse server-side upstream response records" }, "import": { "label": "Batch Import", @@ -261,7 +261,7 @@ "loading": "Loading conversation history...", "loadFailed": "Failed to load conversation history.", "retentionTitle": "Retention", - "retentionDesc": "The server keeps only the latest N external /v1/chat/completions conversations.", + "retentionDesc": "The server keeps only the latest N DeepSeek upstream response records across OpenAI Chat, OpenAI Responses, Claude, and Gemini direct interfaces.", "off": "OFF", "refresh": "Refresh", "clearAll": "Clear all", @@ -277,7 +277,7 @@ "viewModeList": "List mode", "viewModeMerged": "Merged mode", "emptyTitle": "No conversation history yet", - "emptyDesc": "When external clients call /v1/chat/completions, the server will save the results here automatically.", + "emptyDesc": "When a supported interface talks to DeepSeek upstream and receives a response, the server saves the result here automatically.", "untitled": "Untitled conversation", "noPreview": "No preview available.", "selectPrompt": "Select a record on the left to view details.", @@ -303,6 +303,7 @@ "metaTitle": "Metadata", "metaAccount": "Account", "metaElapsed": "Elapsed", + "metaSurface": "Surface", "metaModel": "Model", "metaStatusCode": "Status code", "metaStream": "Output mode", diff --git a/webui/src/locales/zh.json b/webui/src/locales/zh.json index 8e72487..596824c 100644 --- a/webui/src/locales/zh.json +++ b/webui/src/locales/zh.json @@ -18,8 +18,8 @@ "desc": "测试 API 连接与响应" }, "history": { - "label": "对话记录", - "desc": "查看服务器保存的外部对话历史" + "label": "响应记录", + "desc": "查看服务器保存的上游响应归档" }, "import": { "label": "批量导入", @@ -261,7 +261,7 @@ "loading": "正在加载对话记录...", "loadFailed": "加载对话记录失败", "retentionTitle": "保留条数", - "retentionDesc": "服务器端只保留最新 N 条外部 /v1/chat/completions 对话记录。", + "retentionDesc": "服务器端只保留最新 N 条 DeepSeek 上游响应记录,覆盖 OpenAI Chat、OpenAI Responses、Claude 和 Gemini 直连接口。", "off": "OFF", "refresh": "刷新", "clearAll": "清空全部", @@ -277,7 +277,7 @@ "viewModeList": "列表模式", "viewModeMerged": "合并模式", "emptyTitle": "还没有可用的对话记录", - "emptyDesc": "当外部客户端调用 /v1/chat/completions 时,服务端会自动把结果写入这里。", + "emptyDesc": "当支持的接口与 DeepSeek 上游交互并收到响应时,服务端会自动把结果写入这里。", "untitled": "未命名对话", "noPreview": "暂无预览内容", "selectPrompt": "从左侧选择一条记录查看详情。", @@ -303,6 +303,7 @@ "metaTitle": "元信息", "metaAccount": "使用账号", "metaElapsed": "耗时", + "metaSurface": "接口", "metaModel": "模型", "metaStatusCode": "状态码", "metaStream": "输出模式",