From 5e55cf36d8dabd220e0c96abdd341ac88d894cbc Mon Sep 17 00:00:00 2001 From: CJACK Date: Sun, 3 May 2026 15:44:17 +0800 Subject: [PATCH] refactor: prioritize raw model output in chat history archiving to ensure accurate capture of tool call and thinking markup --- docs/DEVELOPMENT.md | 1 + internal/httpapi/openai/chat/chat_history.go | 17 ++++ .../httpapi/openai/chat/chat_history_test.go | 81 +++++++++++++++++++ .../openai/chat/chat_stream_runtime.go | 18 +++++ .../openai/chat/empty_retry_runtime.go | 24 ++++-- internal/httpapi/openai/chat/handler_chat.go | 17 ++-- 6 files changed, 142 insertions(+), 16 deletions(-) diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index d3fe7cf..68858c4 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -74,6 +74,7 @@ gofmt -w - Admin API:`/admin/chat-history`、`/admin/chat-history/{id}`。 - 后端存储:`internal/chathistory/store.go`。 +- 输出归档:Chat history 详情保存上游原始 assistant text / thinking;即使工具调用已被对外响应转成结构化 `tool_calls` 并从可见正文剔除,后台历史仍应保留原始 DSML / XML 片段,方便排查格式漂移。 - 前端轮询和 ETag:`webui/src/features/chatHistory/ChatHistoryContainer.jsx`。 Tool call 问题优先跑: diff --git a/internal/httpapi/openai/chat/chat_history.go b/internal/httpapi/openai/chat/chat_history.go index fb274fc..27a5f2e 100644 --- a/internal/httpapi/openai/chat/chat_history.go +++ b/internal/httpapi/openai/chat/chat_history.go @@ -188,6 +188,23 @@ func (s *chatHistorySession) stopped(thinking, content, finishReason string) { }) } +func historyTextForArchive(raw, visible string) string { + if strings.TrimSpace(raw) != "" { + return raw + } + return visible +} + +func historyThinkingForArchive(raw, detection, visible string) string { + if strings.TrimSpace(raw) != "" { + return raw + } + if strings.TrimSpace(detection) != "" { + return detection + } + return visible +} + func (s *chatHistorySession) retryMissingEntry() bool { if s == nil || s.store == nil || s.disabled { return false diff --git a/internal/httpapi/openai/chat/chat_history_test.go 
b/internal/httpapi/openai/chat/chat_history_test.go index 246abfa..33d784f 100644 --- a/internal/httpapi/openai/chat/chat_history_test.go +++ b/internal/httpapi/openai/chat/chat_history_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "strconv" "strings" "sync" "testing" @@ -102,6 +103,86 @@ func TestChatCompletionsNonStreamPersistsHistory(t *testing.T) { } } +func TestChatHistoryNonStreamArchivesRawToolCallMarkup(t *testing.T) { + historyStore := newTestChatHistoryStore(t) + entry, err := historyStore.Start(chathistory.StartParams{ + CallerID: "caller:test", + Model: "deepseek-v4-flash", + UserInput: "call tool", + }) + if err != nil { + t.Fatalf("start history failed: %v", err) + } + session := &chatHistorySession{ + store: historyStore, + entryID: entry.ID, + startedAt: time.Now(), + lastPersist: time.Now().Add(-time.Second), + finalPrompt: "call tool", + } + rawToolCall := `<search>golang</search>` + + h := &Handler{} + rec := httptest.NewRecorder() + resp := makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":`+strconv.Quote(rawToolCall)+`}`, `data: [DONE]`) + h.handleNonStream(rec, resp, "cid-tool-history", "deepseek-v4-flash", "prompt", 0, false, false, []string{"search"}, nil, session) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + full, err := historyStore.Get(entry.ID) + if err != nil { + t.Fatalf("get detail failed: %v", err) + } + if full.Content != rawToolCall { + t.Fatalf("expected raw tool markup archived, got %q", full.Content) + } + if full.FinishReason != "tool_calls" { + t.Fatalf("expected tool_calls finish reason, got %#v", full.FinishReason) + } +} + +func TestChatHistoryStreamArchivesRawToolCallMarkup(t *testing.T) { + historyStore := newTestChatHistoryStore(t) + entry, err := historyStore.Start(chathistory.StartParams{ + CallerID: "caller:test", + Model: "deepseek-v4-flash", + Stream: true, + UserInput: "call tool", + }) + if err != nil { + t.Fatalf("start 
history failed: %v", err) + } + session := &chatHistorySession{ + store: historyStore, + entryID: entry.ID, + startedAt: time.Now(), + lastPersist: time.Now().Add(-time.Second), + finalPrompt: "call tool", + } + rawToolCall := `<search>golang</search>` + + h := &Handler{} + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + rec := httptest.NewRecorder() + resp := makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":`+strconv.Quote(rawToolCall)+`}`, `data: [DONE]`) + h.handleStream(rec, req, resp, "cid-stream-tool-history", "deepseek-v4-flash", "prompt", 0, false, false, []string{"search"}, nil, session) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + full, err := historyStore.Get(entry.ID) + if err != nil { + t.Fatalf("get detail failed: %v", err) + } + if full.Content != rawToolCall { + t.Fatalf("expected raw streamed tool markup archived, got %q", full.Content) + } + if full.FinishReason != "tool_calls" { + t.Fatalf("expected tool_calls finish reason, got %#v", full.FinishReason) + } +} + func TestStartChatHistoryRecoversFromTransientWriteFailure(t *testing.T) { historyStore := newTestChatHistoryStore(t) restore := blockChatHistoryDetailDir(t, historyStore.DetailDir()) diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go index db34bcb..ee20405 100644 --- a/internal/httpapi/openai/chat/chat_stream_runtime.go +++ b/internal/httpapi/openai/chat/chat_stream_runtime.go @@ -195,6 +195,24 @@ func (s *chatStreamRuntime) markContextCancelled() { s.finalFinishReason = string(streamengine.StopReasonContextCancelled) } +func (s *chatStreamRuntime) historyText() string { + if s == nil { + return "" + } + return historyTextForArchive(s.accumulator.RawText.String(), s.finalText) +} + +func (s *chatStreamRuntime) historyThinking() string { + if s == nil { + return "" + } + return historyThinkingForArchive( + 
s.accumulator.RawThinking.String(), + s.accumulator.ToolDetectionThinking.String(), + s.finalThinking, + ) +} + func (s *chatStreamRuntime) resetStreamToolCallState() { s.streamToolCallIDs = map[int]string{} s.streamToolNames = map[int]string{} diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index d3fca68..72cbcdb 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -31,6 +31,14 @@ type chatNonStreamResult struct { outputError *assistantturn.OutputError } +func (r chatNonStreamResult) historyText() string { + return historyTextForArchive(r.rawText, r.text) +} + +func (r chatNonStreamResult) historyThinking() string { + return historyThinkingForArchive(r.rawThinking, r.toolDetectionThinking, r.thinking) +} + func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) { attempts := 0 currentResp := resp @@ -70,7 +78,7 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, retryPow, 3) if err != nil { if historySession != nil { - historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.thinking, result.text) + historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.historyThinking(), result.historyText()) } writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.") config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", err) @@ -120,14 +128,14 @@ func (h *Handler) 
finishChatNonStreamResult(w http.ResponseWriter, result chatNo status, message, code = result.outputError.Status, result.outputError.Message, result.outputError.Code } if historySession != nil { - historySession.error(status, message, code, result.thinking, result.text) + historySession.error(status, message, code, result.historyThinking(), result.historyText()) } writeOpenAIErrorWithCode(w, status, message, code) config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter) return } if historySession != nil { - historySession.success(http.StatusOK, result.thinking, result.text, result.finishReason, openaifmt.BuildChatUsageForModel("", usagePrompt, result.thinking, result.text, refFileTokens)) + historySession.success(http.StatusOK, result.historyThinking(), result.historyText(), result.finishReason, openaifmt.BuildChatUsageForModel("", usagePrompt, result.thinking, result.text, refFileTokens)) } writeJSON(w, http.StatusOK, result.body) source := "first_attempt" @@ -246,7 +254,7 @@ func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response, OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision { decision := streamRuntime.onParsed(parsed) if historySession != nil { - historySession.progress(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String()) + historySession.progress(streamRuntime.historyThinking(), streamRuntime.historyText()) } return decision }, @@ -258,7 +266,7 @@ func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response, OnContextDone: func() { streamRuntime.markContextCancelled() if historySession != nil { - historySession.stopped(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String(), string(streamengine.StopReasonContextCancelled)) + historySession.stopped(streamRuntime.historyThinking(), 
streamRuntime.historyText(), string(streamengine.StopReasonContextCancelled)) } }, }) @@ -278,16 +286,16 @@ func recordChatStreamHistory(streamRuntime *chatStreamRuntime, historySession *c return } if streamRuntime.finalErrorMessage != "" { - historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String()) + historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.historyThinking(), streamRuntime.historyText()) return } - historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage) + historySession.success(http.StatusOK, streamRuntime.historyThinking(), streamRuntime.historyText(), streamRuntime.finalFinishReason, streamRuntime.finalUsage) } func failChatStreamRetry(streamRuntime *chatStreamRuntime, historySession *chatHistorySession, status int, message, code string) { streamRuntime.sendFailedChunk(status, message, code) if historySession != nil { - historySession.error(status, message, code, streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String()) + historySession.error(status, message, code, streamRuntime.historyThinking(), streamRuntime.historyText()) } } diff --git a/internal/httpapi/openai/chat/handler_chat.go b/internal/httpapi/openai/chat/handler_chat.go index 5064869..2b6b24d 100644 --- a/internal/httpapi/openai/chat/handler_chat.go +++ b/internal/httpapi/openai/chat/handler_chat.go @@ -87,7 +87,7 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { sessionID = result.SessionID if outErr != nil { if historySession != nil { - historySession.error(outErr.Status, outErr.Message, outErr.Code, result.Turn.Thinking, result.Turn.Text) + historySession.error(outErr.Status, outErr.Message, outErr.Code, 
historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text)) } writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) return @@ -96,7 +96,7 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { respBody["usage"] = assistantturn.OpenAIChatUsage(result.Turn) finishReason := assistantturn.FinalizeTurn(result.Turn, assistantturn.FinalizeOptions{}).FinishReason if historySession != nil { - historySession.success(http.StatusOK, result.Turn.Thinking, result.Turn.Text, finishReason, assistantturn.OpenAIChatUsage(result.Turn)) + historySession.success(http.StatusOK, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text), finishReason, assistantturn.OpenAIChatUsage(result.Turn)) } writeJSON(w, http.StatusOK, respBody) return @@ -177,7 +177,7 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, co if outcome.ShouldFail { status, message, code := outcome.Error.Status, outcome.Error.Message, outcome.Error.Code if historySession != nil { - historySession.error(status, message, code, turn.Thinking, turn.Text) + historySession.error(status, message, code, historyThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), historyTextForArchive(turn.RawText, turn.Text)) } writeOpenAIErrorWithCode(w, status, message, code) return @@ -185,7 +185,7 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, co respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, finalPrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw) respBody["usage"] = assistantturn.OpenAIChatUsage(turn) if historySession != nil { - historySession.success(http.StatusOK, turn.Thinking, turn.Text, outcome.FinishReason, assistantturn.OpenAIChatUsage(turn)) + 
historySession.success(http.StatusOK, historyThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), historyTextForArchive(turn.RawText, turn.Text), outcome.FinishReason, assistantturn.OpenAIChatUsage(turn)) } writeJSON(w, http.StatusOK, respBody) } @@ -253,7 +253,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision { decision := streamRuntime.onParsed(parsed) if historySession != nil { - historySession.progress(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String()) + historySession.progress(streamRuntime.historyThinking(), streamRuntime.historyText()) } return decision }, @@ -267,14 +267,15 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt return } if streamRuntime.finalErrorMessage != "" { - historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String()) + historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.historyThinking(), streamRuntime.historyText()) return } - historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage) + historySession.success(http.StatusOK, streamRuntime.historyThinking(), streamRuntime.historyText(), streamRuntime.finalFinishReason, streamRuntime.finalUsage) }, OnContextDone: func() { + streamRuntime.markContextCancelled() if historySession != nil { - historySession.stopped(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String(), string(streamengine.StopReasonContextCancelled)) + historySession.stopped(streamRuntime.historyThinking(), streamRuntime.historyText(), string(streamengine.StopReasonContextCancelled)) } }, })