mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-11 11:47:43 +08:00
refactor: improve chat history persistence reliability with metadata-only migration, error handling, and optimized file updates
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -45,10 +46,6 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request
|
||||
Messages: extractAllMessages(stdReq.Messages),
|
||||
FinalPrompt: stdReq.FinalPrompt,
|
||||
})
|
||||
if err != nil {
|
||||
config.Logger.Warn("[chat_history] start failed", "error", err)
|
||||
return nil
|
||||
}
|
||||
startParams := chathistory.StartParams{
|
||||
CallerID: strings.TrimSpace(a.CallerID),
|
||||
AccountID: strings.TrimSpace(a.AccountID),
|
||||
@@ -58,7 +55,7 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request
|
||||
Messages: extractAllMessages(stdReq.Messages),
|
||||
FinalPrompt: stdReq.FinalPrompt,
|
||||
}
|
||||
return &chatHistorySession{
|
||||
session := &chatHistorySession{
|
||||
store: store,
|
||||
entryID: entry.ID,
|
||||
startedAt: time.Now(),
|
||||
@@ -66,6 +63,14 @@ func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.Request
|
||||
finalPrompt: stdReq.FinalPrompt,
|
||||
startParams: startParams,
|
||||
}
|
||||
if err != nil {
|
||||
if entry.ID == "" {
|
||||
config.Logger.Warn("[chat_history] start failed", "error", err)
|
||||
return nil
|
||||
}
|
||||
config.Logger.Warn("[chat_history] start persisted in memory after write failure", "error", err)
|
||||
}
|
||||
return session
|
||||
}
|
||||
|
||||
func shouldCaptureChatHistory(r *http.Request) bool {
|
||||
@@ -124,33 +129,20 @@ func (s *chatHistorySession) progress(thinking, content string) {
|
||||
return
|
||||
}
|
||||
s.lastPersist = now
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
s.persistUpdate(chathistory.UpdateParams{
|
||||
Status: "streaming",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "streaming",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) success(statusCode int, thinking, content, finishReason string, usage map[string]any) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
s.persistUpdate(chathistory.UpdateParams{
|
||||
Status: "success",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
@@ -159,30 +151,14 @@ func (s *chatHistorySession) success(statusCode int, thinking, content, finishRe
|
||||
FinishReason: finishReason,
|
||||
Usage: usage,
|
||||
Completed: true,
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "success",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: statusCode,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Usage: usage,
|
||||
Completed: true,
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) error(statusCode int, message, finishReason, thinking, content string) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
s.persistUpdate(chathistory.UpdateParams{
|
||||
Status: "error",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
@@ -191,30 +167,14 @@ func (s *chatHistorySession) error(statusCode int, message, finishReason, thinki
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Completed: true,
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "error",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
Error: message,
|
||||
StatusCode: statusCode,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Completed: true,
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) stopped(thinking, content, finishReason string) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
s.persistUpdate(chathistory.UpdateParams{
|
||||
Status: "stopped",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
@@ -223,23 +183,7 @@ func (s *chatHistorySession) stopped(thinking, content, finishReason string) {
|
||||
FinishReason: finishReason,
|
||||
Usage: openaifmt.BuildChatUsage(s.finalPrompt, thinking, content),
|
||||
Completed: true,
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "stopped",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Usage: openaifmt.BuildChatUsage(s.finalPrompt, thinking, content),
|
||||
Completed: true,
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) retryMissingEntry() bool {
|
||||
@@ -247,22 +191,60 @@ func (s *chatHistorySession) retryMissingEntry() bool {
|
||||
return false
|
||||
}
|
||||
entry, err := s.store.Start(s.startParams)
|
||||
if err != nil {
|
||||
s.disableOnMissing(err)
|
||||
if errors.Is(err, chathistory.ErrDisabled) {
|
||||
s.disabled = true
|
||||
return false
|
||||
}
|
||||
if entry.ID == "" {
|
||||
if err != nil {
|
||||
config.Logger.Warn("[chat_history] recreate missing entry failed", "error", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
s.entryID = entry.ID
|
||||
if err != nil {
|
||||
config.Logger.Warn("[chat_history] recreate missing entry persisted in memory after write failure", "error", err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) disableOnMissing(err error) {
|
||||
func (s *chatHistorySession) persistUpdate(params chathistory.UpdateParams) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, params); err != nil {
|
||||
s.handlePersistError(params, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) handlePersistError(params chathistory.UpdateParams, err error) {
|
||||
if err == nil || s == nil {
|
||||
return
|
||||
}
|
||||
if strings.Contains(strings.ToLower(err.Error()), "not found") {
|
||||
if errors.Is(err, chathistory.ErrDisabled) {
|
||||
s.disabled = true
|
||||
return
|
||||
}
|
||||
config.Logger.Warn("[chat_history] update disabled", "error", err)
|
||||
s.disabled = true
|
||||
if isChatHistoryMissingError(err) {
|
||||
if s.retryMissingEntry() {
|
||||
if _, retryErr := s.store.Update(s.entryID, params); retryErr != nil {
|
||||
if errors.Is(retryErr, chathistory.ErrDisabled) || isChatHistoryMissingError(retryErr) {
|
||||
s.disabled = true
|
||||
return
|
||||
}
|
||||
config.Logger.Warn("[chat_history] retry after missing entry failed", "error", retryErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
s.disabled = true
|
||||
return
|
||||
}
|
||||
config.Logger.Warn("[chat_history] update failed", "error", err)
|
||||
}
|
||||
|
||||
func isChatHistoryMissingError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(strings.ToLower(err.Error()), "not found")
|
||||
}
|
||||
|
||||
@@ -4,12 +4,16 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/chathistory"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
func newTestChatHistoryStore(t *testing.T) *chathistory.Store {
|
||||
@@ -21,6 +25,35 @@ func newTestChatHistoryStore(t *testing.T) *chathistory.Store {
|
||||
return store
|
||||
}
|
||||
|
||||
func blockChatHistoryDetailDir(t *testing.T, detailDir string) func() {
|
||||
t.Helper()
|
||||
blockedDir := detailDir + ".blocked"
|
||||
if err := os.RemoveAll(blockedDir); err != nil {
|
||||
t.Fatalf("remove blocked detail dir failed: %v", err)
|
||||
}
|
||||
if err := os.Rename(detailDir, blockedDir); err != nil {
|
||||
t.Fatalf("move detail dir aside failed: %v", err)
|
||||
}
|
||||
if err := os.RemoveAll(detailDir); err != nil {
|
||||
t.Fatalf("remove blocked detail path failed: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(detailDir, []byte("blocked"), 0o644); err != nil {
|
||||
t.Fatalf("write blocked detail path failed: %v", err)
|
||||
}
|
||||
var once sync.Once
|
||||
return func() {
|
||||
t.Helper()
|
||||
once.Do(func() {
|
||||
if err := os.RemoveAll(detailDir); err != nil {
|
||||
t.Fatalf("remove blocking detail path failed: %v", err)
|
||||
}
|
||||
if err := os.Rename(blockedDir, detailDir); err != nil {
|
||||
t.Fatalf("restore detail dir failed: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestChatCompletionsNonStreamPersistsHistory(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
h := &Handler{
|
||||
@@ -69,6 +102,72 @@ func TestChatCompletionsNonStreamPersistsHistory(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartChatHistoryRecoversFromTransientWriteFailure(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
restore := blockChatHistoryDetailDir(t, historyStore.DetailDir())
|
||||
t.Cleanup(restore)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||
req.Header.Set("Authorization", "Bearer direct-token")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
a := &auth.RequestAuth{
|
||||
CallerID: "caller:test",
|
||||
AccountID: "acct:test",
|
||||
}
|
||||
stdReq := util.StandardRequest{
|
||||
ResponseModel: "deepseek-chat",
|
||||
Stream: true,
|
||||
Messages: []any{
|
||||
map[string]any{"role": "user", "content": "hello"},
|
||||
},
|
||||
FinalPrompt: "hello",
|
||||
}
|
||||
|
||||
session := startChatHistory(historyStore, req, a, stdReq)
|
||||
if session == nil {
|
||||
t.Fatalf("expected session even when initial persistence fails")
|
||||
}
|
||||
if session.disabled {
|
||||
t.Fatalf("expected session to remain active after transient start failure")
|
||||
}
|
||||
if session.entryID == "" {
|
||||
t.Fatalf("expected session entry id to be retained")
|
||||
}
|
||||
if err := historyStore.Err(); err != nil {
|
||||
t.Fatalf("transient start failure should not latch store error: %v", err)
|
||||
}
|
||||
|
||||
session.lastPersist = time.Now().Add(-time.Second)
|
||||
session.progress("thinking", "partial")
|
||||
if session.disabled {
|
||||
t.Fatalf("expected session to remain active after transient update failure")
|
||||
}
|
||||
if session.entryID == "" {
|
||||
t.Fatalf("expected session entry id to remain set after update failure")
|
||||
}
|
||||
if err := historyStore.Err(); err != nil {
|
||||
t.Fatalf("transient update failure should not latch store error: %v", err)
|
||||
}
|
||||
|
||||
restore()
|
||||
|
||||
session.success(http.StatusOK, "thinking", "final answer", "stop", map[string]any{"total_tokens": 7})
|
||||
snapshot, err := historyStore.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("snapshot failed after restore: %v", err)
|
||||
}
|
||||
if len(snapshot.Items) != 1 {
|
||||
t.Fatalf("expected one persisted item after restore, got %#v", snapshot.Items)
|
||||
}
|
||||
full, err := historyStore.Get(session.entryID)
|
||||
if err != nil {
|
||||
t.Fatalf("get restored entry failed: %v", err)
|
||||
}
|
||||
if full.Status != "success" || full.Content != "final answer" {
|
||||
t.Fatalf("expected restored entry to persist final success, got %#v", full)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleStreamContextCancelledMarksHistoryStopped(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
entry, err := historyStore.Start(chathistory.StartParams{
|
||||
|
||||
Reference in New Issue
Block a user