refactor: prioritize raw model output in chat history archiving to ensure accurate capture of tool call and thinking markup

This commit is contained in:
CJACK
2026-05-03 15:44:17 +08:00
parent 837dc74ffc
commit 5e55cf36d8
6 changed files with 142 additions and 16 deletions

View File

@@ -74,6 +74,7 @@ gofmt -w <changed-go-files>
- Admin API`/admin/chat-history``/admin/chat-history/{id}`
- 后端存储:`internal/chathistory/store.go`
- 输出归档Chat history 详情保存上游原始 assistant text / thinking即使工具调用已被对外响应转成结构化 `tool_calls` 并从可见正文剔除,后台历史仍应保留原始 DSML / XML 片段,方便排查格式漂移。
- 前端轮询和 ETag`webui/src/features/chatHistory/ChatHistoryContainer.jsx`
Tool call 问题优先跑:

View File

@@ -188,6 +188,23 @@ func (s *chatHistorySession) stopped(thinking, content, finishReason string) {
})
}
func historyTextForArchive(raw, visible string) string {
if strings.TrimSpace(raw) != "" {
return raw
}
return visible
}
func historyThinkingForArchive(raw, detection, visible string) string {
if strings.TrimSpace(raw) != "" {
return raw
}
if strings.TrimSpace(detection) != "" {
return detection
}
return visible
}
func (s *chatHistorySession) retryMissingEntry() bool {
if s == nil || s.store == nil || s.disabled {
return false

View File

@@ -6,6 +6,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
@@ -102,6 +103,86 @@ func TestChatCompletionsNonStreamPersistsHistory(t *testing.T) {
}
}
func TestChatHistoryNonStreamArchivesRawToolCallMarkup(t *testing.T) {
historyStore := newTestChatHistoryStore(t)
entry, err := historyStore.Start(chathistory.StartParams{
CallerID: "caller:test",
Model: "deepseek-v4-flash",
UserInput: "call tool",
})
if err != nil {
t.Fatalf("start history failed: %v", err)
}
session := &chatHistorySession{
store: historyStore,
entryID: entry.ID,
startedAt: time.Now(),
lastPersist: time.Now().Add(-time.Second),
finalPrompt: "call tool",
}
rawToolCall := `<tool_calls><invoke name="search"><parameter name="q">golang</parameter></invoke></tool_calls>`
h := &Handler{}
rec := httptest.NewRecorder()
resp := makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":`+strconv.Quote(rawToolCall)+`}`, `data: [DONE]`)
h.handleNonStream(rec, resp, "cid-tool-history", "deepseek-v4-flash", "prompt", 0, false, false, []string{"search"}, nil, session)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
full, err := historyStore.Get(entry.ID)
if err != nil {
t.Fatalf("get detail failed: %v", err)
}
if full.Content != rawToolCall {
t.Fatalf("expected raw tool markup archived, got %q", full.Content)
}
if full.FinishReason != "tool_calls" {
t.Fatalf("expected tool_calls finish reason, got %#v", full.FinishReason)
}
}
func TestChatHistoryStreamArchivesRawToolCallMarkup(t *testing.T) {
historyStore := newTestChatHistoryStore(t)
entry, err := historyStore.Start(chathistory.StartParams{
CallerID: "caller:test",
Model: "deepseek-v4-flash",
Stream: true,
UserInput: "call tool",
})
if err != nil {
t.Fatalf("start history failed: %v", err)
}
session := &chatHistorySession{
store: historyStore,
entryID: entry.ID,
startedAt: time.Now(),
lastPersist: time.Now().Add(-time.Second),
finalPrompt: "call tool",
}
rawToolCall := `<tool_calls><invoke name="search"><parameter name="q">golang</parameter></invoke></tool_calls>`
h := &Handler{}
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
rec := httptest.NewRecorder()
resp := makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":`+strconv.Quote(rawToolCall)+`}`, `data: [DONE]`)
h.handleStream(rec, req, resp, "cid-stream-tool-history", "deepseek-v4-flash", "prompt", 0, false, false, []string{"search"}, nil, session)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
full, err := historyStore.Get(entry.ID)
if err != nil {
t.Fatalf("get detail failed: %v", err)
}
if full.Content != rawToolCall {
t.Fatalf("expected raw streamed tool markup archived, got %q", full.Content)
}
if full.FinishReason != "tool_calls" {
t.Fatalf("expected tool_calls finish reason, got %#v", full.FinishReason)
}
}
func TestStartChatHistoryRecoversFromTransientWriteFailure(t *testing.T) {
historyStore := newTestChatHistoryStore(t)
restore := blockChatHistoryDetailDir(t, historyStore.DetailDir())

View File

@@ -195,6 +195,24 @@ func (s *chatStreamRuntime) markContextCancelled() {
s.finalFinishReason = string(streamengine.StopReasonContextCancelled)
}
func (s *chatStreamRuntime) historyText() string {
if s == nil {
return ""
}
return historyTextForArchive(s.accumulator.RawText.String(), s.finalText)
}
func (s *chatStreamRuntime) historyThinking() string {
if s == nil {
return ""
}
return historyThinkingForArchive(
s.accumulator.RawThinking.String(),
s.accumulator.ToolDetectionThinking.String(),
s.finalThinking,
)
}
func (s *chatStreamRuntime) resetStreamToolCallState() {
s.streamToolCallIDs = map[int]string{}
s.streamToolNames = map[int]string{}

View File

@@ -31,6 +31,14 @@ type chatNonStreamResult struct {
outputError *assistantturn.OutputError
}
func (r chatNonStreamResult) historyText() string {
return historyTextForArchive(r.rawText, r.text)
}
func (r chatNonStreamResult) historyThinking() string {
return historyThinkingForArchive(r.rawThinking, r.toolDetectionThinking, r.thinking)
}
func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) {
attempts := 0
currentResp := resp
@@ -70,7 +78,7 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co
nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, retryPow, 3)
if err != nil {
if historySession != nil {
historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.thinking, result.text)
historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.historyThinking(), result.historyText())
}
writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.")
config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", err)
@@ -120,14 +128,14 @@ func (h *Handler) finishChatNonStreamResult(w http.ResponseWriter, result chatNo
status, message, code = result.outputError.Status, result.outputError.Message, result.outputError.Code
}
if historySession != nil {
historySession.error(status, message, code, result.thinking, result.text)
historySession.error(status, message, code, result.historyThinking(), result.historyText())
}
writeOpenAIErrorWithCode(w, status, message, code)
config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter)
return
}
if historySession != nil {
historySession.success(http.StatusOK, result.thinking, result.text, result.finishReason, openaifmt.BuildChatUsageForModel("", usagePrompt, result.thinking, result.text, refFileTokens))
historySession.success(http.StatusOK, result.historyThinking(), result.historyText(), result.finishReason, openaifmt.BuildChatUsageForModel("", usagePrompt, result.thinking, result.text, refFileTokens))
}
writeJSON(w, http.StatusOK, result.body)
source := "first_attempt"
@@ -246,7 +254,7 @@ func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response,
OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision {
decision := streamRuntime.onParsed(parsed)
if historySession != nil {
historySession.progress(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String())
historySession.progress(streamRuntime.historyThinking(), streamRuntime.historyText())
}
return decision
},
@@ -258,7 +266,7 @@ func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response,
OnContextDone: func() {
streamRuntime.markContextCancelled()
if historySession != nil {
historySession.stopped(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String(), string(streamengine.StopReasonContextCancelled))
historySession.stopped(streamRuntime.historyThinking(), streamRuntime.historyText(), string(streamengine.StopReasonContextCancelled))
}
},
})
@@ -278,16 +286,16 @@ func recordChatStreamHistory(streamRuntime *chatStreamRuntime, historySession *c
return
}
if streamRuntime.finalErrorMessage != "" {
historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String())
historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.historyThinking(), streamRuntime.historyText())
return
}
historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage)
historySession.success(http.StatusOK, streamRuntime.historyThinking(), streamRuntime.historyText(), streamRuntime.finalFinishReason, streamRuntime.finalUsage)
}
func failChatStreamRetry(streamRuntime *chatStreamRuntime, historySession *chatHistorySession, status int, message, code string) {
streamRuntime.sendFailedChunk(status, message, code)
if historySession != nil {
historySession.error(status, message, code, streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String())
historySession.error(status, message, code, streamRuntime.historyThinking(), streamRuntime.historyText())
}
}

View File

@@ -87,7 +87,7 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
sessionID = result.SessionID
if outErr != nil {
if historySession != nil {
historySession.error(outErr.Status, outErr.Message, outErr.Code, result.Turn.Thinking, result.Turn.Text)
historySession.error(outErr.Status, outErr.Message, outErr.Code, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text))
}
writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code)
return
@@ -96,7 +96,7 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
respBody["usage"] = assistantturn.OpenAIChatUsage(result.Turn)
finishReason := assistantturn.FinalizeTurn(result.Turn, assistantturn.FinalizeOptions{}).FinishReason
if historySession != nil {
historySession.success(http.StatusOK, result.Turn.Thinking, result.Turn.Text, finishReason, assistantturn.OpenAIChatUsage(result.Turn))
historySession.success(http.StatusOK, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text), finishReason, assistantturn.OpenAIChatUsage(result.Turn))
}
writeJSON(w, http.StatusOK, respBody)
return
@@ -177,7 +177,7 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, co
if outcome.ShouldFail {
status, message, code := outcome.Error.Status, outcome.Error.Message, outcome.Error.Code
if historySession != nil {
historySession.error(status, message, code, turn.Thinking, turn.Text)
historySession.error(status, message, code, historyThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), historyTextForArchive(turn.RawText, turn.Text))
}
writeOpenAIErrorWithCode(w, status, message, code)
return
@@ -185,7 +185,7 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, co
respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, finalPrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw)
respBody["usage"] = assistantturn.OpenAIChatUsage(turn)
if historySession != nil {
historySession.success(http.StatusOK, turn.Thinking, turn.Text, outcome.FinishReason, assistantturn.OpenAIChatUsage(turn))
historySession.success(http.StatusOK, historyThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), historyTextForArchive(turn.RawText, turn.Text), outcome.FinishReason, assistantturn.OpenAIChatUsage(turn))
}
writeJSON(w, http.StatusOK, respBody)
}
@@ -253,7 +253,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision {
decision := streamRuntime.onParsed(parsed)
if historySession != nil {
historySession.progress(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String())
historySession.progress(streamRuntime.historyThinking(), streamRuntime.historyText())
}
return decision
},
@@ -267,14 +267,15 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
return
}
if streamRuntime.finalErrorMessage != "" {
historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String())
historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.historyThinking(), streamRuntime.historyText())
return
}
historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage)
historySession.success(http.StatusOK, streamRuntime.historyThinking(), streamRuntime.historyText(), streamRuntime.finalFinishReason, streamRuntime.finalUsage)
},
OnContextDone: func() {
streamRuntime.markContextCancelled()
if historySession != nil {
historySession.stopped(streamRuntime.accumulator.Thinking.String(), streamRuntime.accumulator.Text.String(), string(streamengine.StopReasonContextCancelled))
historySession.stopped(streamRuntime.historyThinking(), streamRuntime.historyText(), string(streamengine.StopReasonContextCancelled))
}
},
})