feat: implement context cancellation handling for chat and response stream runtimes to ensure clean termination without retries

This commit is contained in:
CJACK
2026-05-01 23:17:58 +08:00
parent 934b40e572
commit 0bca6e2cee
6 changed files with 185 additions and 2 deletions

View File

@@ -173,6 +173,15 @@ func (s *chatStreamRuntime) sendFailedChunk(status int, message, code string) {
s.sendDone()
}
func (s *chatStreamRuntime) markContextCancelled() {
s.finalErrorStatus = 499
s.finalErrorMessage = "request context cancelled"
s.finalErrorCode = string(streamengine.StopReasonContextCancelled)
s.finalThinking = s.thinking.String()
s.finalText = cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
s.finalFinishReason = string(streamengine.StopReasonContextCancelled)
}
func (s *chatStreamRuntime) resetStreamToolCallState() {
s.streamToolCallIDs = map[int]string{}
s.streamToolNames = map[int]string{}

View File

@@ -247,12 +247,13 @@ func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response,
}
},
OnContextDone: func() {
streamRuntime.markContextCancelled()
if historySession != nil {
historySession.stopped(streamRuntime.thinking.String(), streamRuntime.text.String(), string(streamengine.StopReasonContextCancelled))
}
},
})
if r.Context().Err() != nil {
if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) {
return true, false
}
terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter")
@@ -286,6 +287,10 @@ func logChatStreamTerminal(streamRuntime *chatStreamRuntime, attempts int) {
if attempts > 0 {
source = "synthetic_retry"
}
if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) {
config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode)
return
}
if streamRuntime.finalErrorMessage != "" {
config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
return

View File

@@ -0,0 +1,85 @@
package chat
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"ds2api/internal/chathistory"
"ds2api/internal/stream"
)
func TestConsumeChatStreamAttemptMarksContextCancelledState(t *testing.T) {
historyStore := newTestChatHistoryStore(t)
entry, err := historyStore.Start(chathistory.StartParams{
CallerID: "caller:test",
Model: "deepseek-v4-flash",
Stream: true,
UserInput: "hello",
})
if err != nil {
t.Fatalf("start history failed: %v", err)
}
session := &chatHistorySession{
store: historyStore,
entryID: entry.ID,
startedAt: time.Now(),
lastPersist: time.Now(),
finalPrompt: "prompt",
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil).WithContext(ctx)
rec := httptest.NewRecorder()
streamRuntime := newChatStreamRuntime(
rec,
http.NewResponseController(rec),
true,
"cid-cancelled",
time.Now().Unix(),
"deepseek-v4-flash",
"prompt",
false,
false,
true,
nil,
nil,
false,
false,
)
resp := makeOpenAISSEHTTPResponse(
`data: {"p":"response/content","v":"hello"}`,
`data: [DONE]`,
)
h := &Handler{}
terminalWritten, retryable := h.consumeChatStreamAttempt(req, resp, streamRuntime, "text", false, session, true)
if !terminalWritten || retryable {
t.Fatalf("expected cancelled attempt to terminate without retry, got terminalWritten=%v retryable=%v", terminalWritten, retryable)
}
if got, want := streamRuntime.finalErrorCode, string(stream.StopReasonContextCancelled); got != want {
t.Fatalf("expected cancelled final error code %q, got %q", want, got)
}
if streamRuntime.finalErrorMessage == "" {
t.Fatalf("expected cancelled final error message to be preserved")
}
snapshot, err := historyStore.Snapshot()
if err != nil {
t.Fatalf("snapshot failed: %v", err)
}
if len(snapshot.Items) != 1 {
t.Fatalf("expected one history item, got %d", len(snapshot.Items))
}
full, err := historyStore.Get(snapshot.Items[0].ID)
if err != nil {
t.Fatalf("get detail failed: %v", err)
}
if full.Status != "stopped" {
t.Fatalf("expected stopped status, got %#v", full)
}
}

View File

@@ -222,8 +222,11 @@ func (h *Handler) consumeResponsesStreamAttempt(r *http.Request, resp *http.Resp
finalReason = "content_filter"
}
},
OnContextDone: func() {
streamRuntime.markContextCancelled()
},
})
if r.Context().Err() != nil {
if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) {
return true, false
}
terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter")
@@ -238,6 +241,10 @@ func logResponsesStreamTerminal(streamRuntime *responsesStreamRuntime, attempts
if attempts > 0 {
source = "synthetic_retry"
}
if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) {
config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "responses", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode)
return
}
if streamRuntime.failed {
config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
return

View File

@@ -0,0 +1,70 @@
package responses
import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"ds2api/internal/promptcompat"
"ds2api/internal/stream"
)
func makeResponsesOpenAISSEHTTPResponse(lines ...string) *http.Response {
body := strings.Join(lines, "\n")
if !strings.HasSuffix(body, "\n") {
body += "\n"
}
return &http.Response{
StatusCode: http.StatusOK,
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader(body)),
}
}
func TestConsumeResponsesStreamAttemptMarksContextCancelledState(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil).WithContext(ctx)
rec := httptest.NewRecorder()
streamRuntime := newResponsesStreamRuntime(
rec,
http.NewResponseController(rec),
true,
"resp-cancelled",
"deepseek-v4-flash",
"prompt",
false,
false,
true,
nil,
nil,
false,
false,
promptcompat.DefaultToolChoicePolicy(),
"",
nil,
)
resp := makeResponsesOpenAISSEHTTPResponse(
`data: {"p":"response/content","v":"hello"}`,
`data: [DONE]`,
)
h := &Handler{}
terminalWritten, retryable := h.consumeResponsesStreamAttempt(req, resp, streamRuntime, "text", false, true)
if !terminalWritten || retryable {
t.Fatalf("expected cancelled attempt to terminate without retry, got terminalWritten=%v retryable=%v", terminalWritten, retryable)
}
if !streamRuntime.failed {
t.Fatalf("expected cancelled response stream to be marked failed")
}
if got, want := streamRuntime.finalErrorCode, string(stream.StopReasonContextCancelled); got != want {
t.Fatalf("expected cancelled final error code %q, got %q", want, got)
}
if streamRuntime.finalErrorMessage == "" {
t.Fatalf("expected cancelled final error message to be preserved")
}
}

View File

@@ -139,6 +139,13 @@ func (s *responsesStreamRuntime) failResponse(status int, message, code string)
s.sendDone()
}
func (s *responsesStreamRuntime) markContextCancelled() {
s.failed = true
s.finalErrorStatus = 499
s.finalErrorMessage = "request context cancelled"
s.finalErrorCode = string(streamengine.StopReasonContextCancelled)
}
func (s *responsesStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) bool {
s.failed = false
s.finalErrorStatus = 0