mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-13 04:38:00 +08:00
增加“对话记录”
This commit is contained in:
268
internal/adapter/openai/chat_history.go
Normal file
268
internal/adapter/openai/chat_history.go
Normal file
@@ -0,0 +1,268 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/chathistory"
|
||||
"ds2api/internal/config"
|
||||
openaifmt "ds2api/internal/format/openai"
|
||||
"ds2api/internal/prompt"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
const adminWebUISourceHeader = "X-Ds2-Source"
|
||||
const adminWebUISourceValue = "admin-webui-api-tester"
|
||||
|
||||
type chatHistorySession struct {
|
||||
store *chathistory.Store
|
||||
entryID string
|
||||
startedAt time.Time
|
||||
lastPersist time.Time
|
||||
finalPrompt string
|
||||
startParams chathistory.StartParams
|
||||
disabled bool
|
||||
}
|
||||
|
||||
func startChatHistory(store *chathistory.Store, r *http.Request, a *auth.RequestAuth, stdReq util.StandardRequest) *chatHistorySession {
|
||||
if store == nil || r == nil || a == nil {
|
||||
return nil
|
||||
}
|
||||
if !store.Enabled() {
|
||||
return nil
|
||||
}
|
||||
if !shouldCaptureChatHistory(r) {
|
||||
return nil
|
||||
}
|
||||
entry, err := store.Start(chathistory.StartParams{
|
||||
CallerID: strings.TrimSpace(a.CallerID),
|
||||
AccountID: strings.TrimSpace(a.AccountID),
|
||||
Model: strings.TrimSpace(stdReq.ResponseModel),
|
||||
Stream: stdReq.Stream,
|
||||
UserInput: extractSingleUserInput(stdReq.Messages),
|
||||
Messages: extractAllMessages(stdReq.Messages),
|
||||
FinalPrompt: stdReq.FinalPrompt,
|
||||
})
|
||||
if err != nil {
|
||||
config.Logger.Warn("[chat_history] start failed", "error", err)
|
||||
return nil
|
||||
}
|
||||
startParams := chathistory.StartParams{
|
||||
CallerID: strings.TrimSpace(a.CallerID),
|
||||
AccountID: strings.TrimSpace(a.AccountID),
|
||||
Model: strings.TrimSpace(stdReq.ResponseModel),
|
||||
Stream: stdReq.Stream,
|
||||
UserInput: extractSingleUserInput(stdReq.Messages),
|
||||
Messages: extractAllMessages(stdReq.Messages),
|
||||
FinalPrompt: stdReq.FinalPrompt,
|
||||
}
|
||||
return &chatHistorySession{
|
||||
store: store,
|
||||
entryID: entry.ID,
|
||||
startedAt: time.Now(),
|
||||
lastPersist: time.Now(),
|
||||
finalPrompt: stdReq.FinalPrompt,
|
||||
startParams: startParams,
|
||||
}
|
||||
}
|
||||
|
||||
func shouldCaptureChatHistory(r *http.Request) bool {
|
||||
if r == nil {
|
||||
return false
|
||||
}
|
||||
if isVercelStreamPrepareRequest(r) || isVercelStreamReleaseRequest(r) {
|
||||
return false
|
||||
}
|
||||
return strings.TrimSpace(r.Header.Get(adminWebUISourceHeader)) != adminWebUISourceValue
|
||||
}
|
||||
|
||||
func extractSingleUserInput(messages []any) string {
|
||||
for i := len(messages) - 1; i >= 0; i-- {
|
||||
msg, ok := messages[i].(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
role := strings.ToLower(strings.TrimSpace(asString(msg["role"])))
|
||||
if role != "user" {
|
||||
continue
|
||||
}
|
||||
if normalized := strings.TrimSpace(prompt.NormalizeContent(msg["content"])); normalized != "" {
|
||||
return normalized
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func extractAllMessages(messages []any) []chathistory.Message {
|
||||
out := make([]chathistory.Message, 0, len(messages))
|
||||
for _, raw := range messages {
|
||||
msg, ok := raw.(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
role := strings.ToLower(strings.TrimSpace(asString(msg["role"])))
|
||||
content := strings.TrimSpace(prompt.NormalizeContent(msg["content"]))
|
||||
if role == "" || content == "" {
|
||||
continue
|
||||
}
|
||||
out = append(out, chathistory.Message{
|
||||
Role: role,
|
||||
Content: content,
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) progress(thinking, content string) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
if now.Sub(s.lastPersist) < 250*time.Millisecond {
|
||||
return
|
||||
}
|
||||
s.lastPersist = now
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "streaming",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "streaming",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) success(statusCode int, thinking, content, finishReason string, usage map[string]any) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "success",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: statusCode,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Usage: usage,
|
||||
Completed: true,
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "success",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: statusCode,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Usage: usage,
|
||||
Completed: true,
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) error(statusCode int, message, finishReason, thinking, content string) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "error",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
Error: message,
|
||||
StatusCode: statusCode,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Completed: true,
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "error",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
Error: message,
|
||||
StatusCode: statusCode,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Completed: true,
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) stopped(thinking, content, finishReason string) {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return
|
||||
}
|
||||
if _, err := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "stopped",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Usage: openaifmt.BuildChatUsage(s.finalPrompt, thinking, content),
|
||||
Completed: true,
|
||||
}); err != nil {
|
||||
if !s.retryMissingEntry() {
|
||||
s.disableOnMissing(err)
|
||||
return
|
||||
}
|
||||
_, retryErr := s.store.Update(s.entryID, chathistory.UpdateParams{
|
||||
Status: "stopped",
|
||||
ReasoningContent: thinking,
|
||||
Content: content,
|
||||
StatusCode: http.StatusOK,
|
||||
ElapsedMs: time.Since(s.startedAt).Milliseconds(),
|
||||
FinishReason: finishReason,
|
||||
Usage: openaifmt.BuildChatUsage(s.finalPrompt, thinking, content),
|
||||
Completed: true,
|
||||
})
|
||||
s.disableOnMissing(retryErr)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) retryMissingEntry() bool {
|
||||
if s == nil || s.store == nil || s.disabled {
|
||||
return false
|
||||
}
|
||||
entry, err := s.store.Start(s.startParams)
|
||||
if err != nil {
|
||||
s.disableOnMissing(err)
|
||||
return false
|
||||
}
|
||||
s.entryID = entry.ID
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *chatHistorySession) disableOnMissing(err error) {
|
||||
if err == nil || s == nil {
|
||||
return
|
||||
}
|
||||
if strings.Contains(strings.ToLower(err.Error()), "not found") {
|
||||
s.disabled = true
|
||||
return
|
||||
}
|
||||
config.Logger.Warn("[chat_history] update disabled", "error", err)
|
||||
s.disabled = true
|
||||
}
|
||||
174
internal/adapter/openai/chat_history_test.go
Normal file
174
internal/adapter/openai/chat_history_test.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"ds2api/internal/chathistory"
|
||||
)
|
||||
|
||||
func newTestChatHistoryStore(t *testing.T) *chathistory.Store {
|
||||
t.Helper()
|
||||
store := chathistory.New(filepath.Join(t.TempDir(), "chat_history.json"))
|
||||
if err := store.Err(); err != nil {
|
||||
t.Fatalf("chat history store unavailable: %v", err)
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
func TestChatCompletionsNonStreamPersistsHistory(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
h := &Handler{
|
||||
Store: mockOpenAIConfig{wideInput: true},
|
||||
Auth: streamStatusAuthStub{},
|
||||
DS: streamStatusDSStub{resp: makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"hello world"}`, `data: [DONE]`)},
|
||||
ChatHistory: historyStore,
|
||||
}
|
||||
|
||||
reqBody := `{"model":"deepseek-chat","messages":[{"role":"system","content":"be precise"},{"role":"user","content":"hi there"},{"role":"assistant","content":"previous answer"}],"stream":false}`
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
|
||||
req.Header.Set("Authorization", "Bearer direct-token")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
rec := httptest.NewRecorder()
|
||||
h.ChatCompletions(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
snapshot, err := historyStore.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("snapshot failed: %v", err)
|
||||
}
|
||||
if len(snapshot.Items) != 1 {
|
||||
t.Fatalf("expected one history item, got %d", len(snapshot.Items))
|
||||
}
|
||||
item := snapshot.Items[0]
|
||||
if item.Status != "success" || item.UserInput != "hi there" {
|
||||
t.Fatalf("unexpected persisted history summary: %#v", item)
|
||||
}
|
||||
full, err := historyStore.Get(item.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("expected detail item, got %v", err)
|
||||
}
|
||||
if full.Content != "hello world" {
|
||||
t.Fatalf("expected detail content persisted, got %#v", full)
|
||||
}
|
||||
if len(full.Messages) != 3 {
|
||||
t.Fatalf("expected all request messages persisted, got %#v", full.Messages)
|
||||
}
|
||||
if full.FinalPrompt == "" {
|
||||
t.Fatalf("expected final prompt to be persisted")
|
||||
}
|
||||
if item.CallerID != "caller:test" {
|
||||
t.Fatalf("expected caller hash persisted in summary, got %#v", item.CallerID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleStreamContextCancelledMarksHistoryStopped(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
entry, err := historyStore.Start(chathistory.StartParams{
|
||||
CallerID: "caller:test",
|
||||
Model: "deepseek-chat",
|
||||
Stream: true,
|
||||
UserInput: "hello",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start history failed: %v", err)
|
||||
}
|
||||
session := &chatHistorySession{
|
||||
store: historyStore,
|
||||
entryID: entry.ID,
|
||||
startedAt: time.Now(),
|
||||
lastPersist: time.Now(),
|
||||
finalPrompt: "hello",
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
h := &Handler{}
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil).WithContext(ctx)
|
||||
rec := httptest.NewRecorder()
|
||||
resp := makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"hello"}`, `data: [DONE]`)
|
||||
|
||||
h.handleStream(rec, req, resp, "cid-stop", "deepseek-chat", "prompt", false, false, nil, session)
|
||||
|
||||
snapshot, err := historyStore.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("snapshot failed: %v", err)
|
||||
}
|
||||
if len(snapshot.Items) != 1 {
|
||||
t.Fatalf("expected one history item, got %d", len(snapshot.Items))
|
||||
}
|
||||
full, err := historyStore.Get(snapshot.Items[0].ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get detail failed: %v", err)
|
||||
}
|
||||
if full.Status != "stopped" {
|
||||
t.Fatalf("expected stopped status, got %#v", full)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChatCompletionsSkipsAdminWebUISource(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
h := &Handler{
|
||||
Store: mockOpenAIConfig{wideInput: true},
|
||||
Auth: streamStatusAuthStub{},
|
||||
DS: streamStatusDSStub{resp: makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"hello world"}`, `data: [DONE]`)},
|
||||
ChatHistory: historyStore,
|
||||
}
|
||||
|
||||
reqBody := `{"model":"deepseek-chat","messages":[{"role":"user","content":"hi there"}],"stream":false}`
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
|
||||
req.Header.Set("Authorization", "Bearer direct-token")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set(adminWebUISourceHeader, adminWebUISourceValue)
|
||||
rec := httptest.NewRecorder()
|
||||
h.ChatCompletions(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
snapshot, err := historyStore.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("snapshot failed: %v", err)
|
||||
}
|
||||
if len(snapshot.Items) != 0 {
|
||||
t.Fatalf("expected admin webui source to be skipped, got %#v", snapshot.Items)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChatCompletionsSkipsHistoryWhenDisabled(t *testing.T) {
|
||||
historyStore := newTestChatHistoryStore(t)
|
||||
if _, err := historyStore.SetLimit(chathistory.DisabledLimit); err != nil {
|
||||
t.Fatalf("disable history store failed: %v", err)
|
||||
}
|
||||
h := &Handler{
|
||||
Store: mockOpenAIConfig{wideInput: true},
|
||||
Auth: streamStatusAuthStub{},
|
||||
DS: streamStatusDSStub{resp: makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"hello world"}`, `data: [DONE]`)},
|
||||
ChatHistory: historyStore,
|
||||
}
|
||||
|
||||
reqBody := `{"model":"deepseek-chat","messages":[{"role":"user","content":"hi there"}],"stream":false}`
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
|
||||
req.Header.Set("Authorization", "Bearer direct-token")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
rec := httptest.NewRecorder()
|
||||
h.ChatCompletions(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
snapshot, err := historyStore.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("snapshot failed: %v", err)
|
||||
}
|
||||
if len(snapshot.Items) != 0 {
|
||||
t.Fatalf("expected disabled history to stay empty, got %#v", snapshot.Items)
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,14 @@ type chatStreamRuntime struct {
|
||||
streamToolNames map[int]string
|
||||
thinking strings.Builder
|
||||
text strings.Builder
|
||||
|
||||
finalThinking string
|
||||
finalText string
|
||||
finalFinishReason string
|
||||
finalUsage map[string]any
|
||||
finalErrorStatus int
|
||||
finalErrorMessage string
|
||||
finalErrorCode string
|
||||
}
|
||||
|
||||
func newChatStreamRuntime(
|
||||
@@ -99,6 +107,9 @@ func (s *chatStreamRuntime) sendDone() {
|
||||
}
|
||||
|
||||
func (s *chatStreamRuntime) sendFailedChunk(status int, message, code string) {
|
||||
s.finalErrorStatus = status
|
||||
s.finalErrorMessage = message
|
||||
s.finalErrorCode = code
|
||||
s.sendChunk(map[string]any{
|
||||
"status_code": status,
|
||||
"error": map[string]any{
|
||||
@@ -114,6 +125,8 @@ func (s *chatStreamRuntime) sendFailedChunk(status int, message, code string) {
|
||||
func (s *chatStreamRuntime) finalize(finishReason string) {
|
||||
finalThinking := s.thinking.String()
|
||||
finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
|
||||
s.finalThinking = finalThinking
|
||||
s.finalText = finalText
|
||||
detected := toolcall.ParseStandaloneToolCallsDetailed(finalText, s.toolNames)
|
||||
if len(detected.Calls) > 0 && !s.toolCallsDoneEmitted {
|
||||
finishReason = "tool_calls"
|
||||
@@ -197,6 +210,8 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
|
||||
return
|
||||
}
|
||||
usage := openaifmt.BuildChatUsage(s.finalPrompt, finalThinking, finalText)
|
||||
s.finalFinishReason = finishReason
|
||||
s.finalUsage = usage
|
||||
s.sendChunk(openaifmt.BuildChatStreamChunk(
|
||||
s.completionID,
|
||||
s.created,
|
||||
|
||||
@@ -63,32 +63,45 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
|
||||
writeOpenAIError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
historySession := startChatHistory(h.ChatHistory, r, a, stdReq)
|
||||
|
||||
sessionID, err = h.DS.CreateSession(r.Context(), a, 3)
|
||||
if err != nil {
|
||||
if a.UseConfigToken {
|
||||
if historySession != nil {
|
||||
historySession.error(http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.", "error", "", "")
|
||||
}
|
||||
writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.")
|
||||
} else {
|
||||
if historySession != nil {
|
||||
historySession.error(http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.", "error", "", "")
|
||||
}
|
||||
writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
|
||||
}
|
||||
return
|
||||
}
|
||||
pow, err := h.DS.GetPow(r.Context(), a, 3)
|
||||
if err != nil {
|
||||
if historySession != nil {
|
||||
historySession.error(http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).", "error", "", "")
|
||||
}
|
||||
writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).")
|
||||
return
|
||||
}
|
||||
payload := stdReq.CompletionPayload(sessionID)
|
||||
resp, err := h.DS.CallCompletion(r.Context(), a, payload, pow, 3)
|
||||
if err != nil {
|
||||
if historySession != nil {
|
||||
historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", "", "")
|
||||
}
|
||||
writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.")
|
||||
return
|
||||
}
|
||||
if stdReq.Stream {
|
||||
h.handleStream(w, r, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames)
|
||||
h.handleStream(w, r, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession)
|
||||
return
|
||||
}
|
||||
h.handleNonStream(w, r.Context(), resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames)
|
||||
h.handleNonStream(w, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession)
|
||||
}
|
||||
|
||||
func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAuth, sessionID string) {
|
||||
@@ -124,14 +137,16 @@ func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAu
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) {
|
||||
func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if historySession != nil {
|
||||
historySession.error(resp.StatusCode, string(body), "error", "", "")
|
||||
}
|
||||
writeOpenAIError(w, resp.StatusCode, string(body))
|
||||
return
|
||||
}
|
||||
_ = ctx
|
||||
result := sse.CollectStream(resp, thinkingEnabled, true)
|
||||
|
||||
stripReferenceMarkers := h.compatStripReferenceMarkers()
|
||||
@@ -140,17 +155,34 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, re
|
||||
if searchEnabled {
|
||||
finalText = replaceCitationMarkersWithLinks(finalText, result.CitationLinks)
|
||||
}
|
||||
if writeUpstreamEmptyOutputError(w, finalText, result.ContentFilter) {
|
||||
if shouldWriteUpstreamEmptyOutputError(finalText, result.ContentFilter) {
|
||||
status, message, code := upstreamEmptyOutputDetail(result.ContentFilter, finalText, finalThinking)
|
||||
if historySession != nil {
|
||||
historySession.error(status, message, code, finalThinking, finalText)
|
||||
}
|
||||
writeUpstreamEmptyOutputError(w, finalText, result.ContentFilter)
|
||||
return
|
||||
}
|
||||
respBody := openaifmt.BuildChatCompletion(completionID, model, finalPrompt, finalThinking, finalText, toolNames)
|
||||
finishReason := "stop"
|
||||
if choices, ok := respBody["choices"].([]map[string]any); ok && len(choices) > 0 {
|
||||
if fr, _ := choices[0]["finish_reason"].(string); strings.TrimSpace(fr) != "" {
|
||||
finishReason = fr
|
||||
}
|
||||
}
|
||||
if historySession != nil {
|
||||
historySession.success(http.StatusOK, finalThinking, finalText, finishReason, openaifmt.BuildChatUsage(finalPrompt, finalThinking, finalText))
|
||||
}
|
||||
writeJSON(w, http.StatusOK, respBody)
|
||||
}
|
||||
|
||||
func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) {
|
||||
func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) {
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if historySession != nil {
|
||||
historySession.error(resp.StatusCode, string(body), "error", "", "")
|
||||
}
|
||||
writeOpenAIError(w, resp.StatusCode, string(body))
|
||||
return
|
||||
}
|
||||
@@ -201,13 +233,32 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
|
||||
OnKeepAlive: func() {
|
||||
streamRuntime.sendKeepAlive()
|
||||
},
|
||||
OnParsed: streamRuntime.onParsed,
|
||||
OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision {
|
||||
decision := streamRuntime.onParsed(parsed)
|
||||
if historySession != nil {
|
||||
historySession.progress(streamRuntime.thinking.String(), streamRuntime.text.String())
|
||||
}
|
||||
return decision
|
||||
},
|
||||
OnFinalize: func(reason streamengine.StopReason, _ error) {
|
||||
if string(reason) == "content_filter" {
|
||||
streamRuntime.finalize("content_filter")
|
||||
} else {
|
||||
streamRuntime.finalize("stop")
|
||||
}
|
||||
if historySession == nil {
|
||||
return
|
||||
}
|
||||
streamRuntime.finalize("stop")
|
||||
if streamRuntime.finalErrorMessage != "" {
|
||||
historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.thinking.String(), streamRuntime.text.String())
|
||||
return
|
||||
}
|
||||
historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage)
|
||||
},
|
||||
OnContextDone: func() {
|
||||
if historySession != nil {
|
||||
historySession.stopped(streamRuntime.thinking.String(), streamRuntime.text.String(), string(streamengine.StopReasonContextCancelled))
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/chathistory"
|
||||
"ds2api/internal/config"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
@@ -25,9 +26,10 @@ const (
|
||||
var writeJSON = util.WriteJSON
|
||||
|
||||
type Handler struct {
|
||||
Store ConfigReader
|
||||
Auth AuthResolver
|
||||
DS DeepSeekCaller
|
||||
Store ConfigReader
|
||||
Auth AuthResolver
|
||||
DS DeepSeekCaller
|
||||
ChatHistory *chathistory.Store
|
||||
|
||||
leaseMu sync.Mutex
|
||||
streamLeases map[string]streamLease
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -94,7 +93,7 @@ func TestHandleNonStreamReturns429WhenUpstreamOutputEmpty(t *testing.T) {
|
||||
)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
h.handleNonStream(rec, context.Background(), resp, "cid-empty", "deepseek-chat", "prompt", false, false, nil)
|
||||
h.handleNonStream(rec, resp, "cid-empty", "deepseek-chat", "prompt", false, false, nil, nil)
|
||||
if rec.Code != http.StatusTooManyRequests {
|
||||
t.Fatalf("expected status 429 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
@@ -113,7 +112,7 @@ func TestHandleNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutp
|
||||
)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
h.handleNonStream(rec, context.Background(), resp, "cid-empty-filtered", "deepseek-chat", "prompt", false, false, nil)
|
||||
h.handleNonStream(rec, resp, "cid-empty-filtered", "deepseek-chat", "prompt", false, false, nil, nil)
|
||||
if rec.Code != http.StatusBadRequest {
|
||||
t.Fatalf("expected status 400 for filtered upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
@@ -132,7 +131,7 @@ func TestHandleNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testing.T) {
|
||||
)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
h.handleNonStream(rec, context.Background(), resp, "cid-thinking-only", "deepseek-reasoner", "prompt", true, false, nil)
|
||||
h.handleNonStream(rec, resp, "cid-thinking-only", "deepseek-reasoner", "prompt", true, false, nil, nil)
|
||||
if rec.Code != http.StatusTooManyRequests {
|
||||
t.Fatalf("expected status 429 for thinking-only upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
@@ -153,7 +152,7 @@ func TestHandleStreamToolsPlainTextStreamsBeforeFinish(t *testing.T) {
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||
|
||||
h.handleStream(rec, req, resp, "cid6", "deepseek-chat", "prompt", false, false, []string{"search"})
|
||||
h.handleStream(rec, req, resp, "cid6", "deepseek-chat", "prompt", false, false, []string{"search"}, nil)
|
||||
|
||||
frames, done := parseSSEDataFrames(t, rec.Body.String())
|
||||
if !done {
|
||||
@@ -190,7 +189,7 @@ func TestHandleStreamIncompleteCapturedToolJSONFlushesAsTextOnFinalize(t *testin
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||
|
||||
h.handleStream(rec, req, resp, "cid10", "deepseek-chat", "prompt", false, false, []string{"search"})
|
||||
h.handleStream(rec, req, resp, "cid10", "deepseek-chat", "prompt", false, false, []string{"search"}, nil)
|
||||
|
||||
frames, done := parseSSEDataFrames(t, rec.Body.String())
|
||||
if !done {
|
||||
|
||||
@@ -2,14 +2,26 @@ package openai
|
||||
|
||||
import "net/http"
|
||||
|
||||
func shouldWriteUpstreamEmptyOutputError(text string, contentFilter bool) bool {
|
||||
return text == ""
|
||||
}
|
||||
|
||||
func upstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, string, string) {
|
||||
_ = text
|
||||
if contentFilter {
|
||||
return http.StatusBadRequest, "Upstream content filtered the response and returned no output.", "content_filter"
|
||||
}
|
||||
if thinking != "" {
|
||||
return http.StatusTooManyRequests, "Upstream model returned reasoning without visible output.", "upstream_empty_output"
|
||||
}
|
||||
return http.StatusTooManyRequests, "Upstream model returned empty output.", "upstream_empty_output"
|
||||
}
|
||||
|
||||
func writeUpstreamEmptyOutputError(w http.ResponseWriter, text string, contentFilter bool) bool {
|
||||
if text != "" {
|
||||
if !shouldWriteUpstreamEmptyOutputError(text, contentFilter) {
|
||||
return false
|
||||
}
|
||||
if contentFilter {
|
||||
writeOpenAIErrorWithCode(w, http.StatusBadRequest, "Upstream content filtered the response and returned no output.", "content_filter")
|
||||
return true
|
||||
}
|
||||
writeOpenAIErrorWithCode(w, http.StatusTooManyRequests, "Upstream model returned empty output.", "upstream_empty_output")
|
||||
status, message, code := upstreamEmptyOutputDetail(contentFilter, text, "")
|
||||
writeOpenAIErrorWithCode(w, status, message, code)
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user