diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go index 17ff0d5..a9270a1 100644 --- a/internal/httpapi/openai/chat/chat_stream_runtime.go +++ b/internal/httpapi/openai/chat/chat_stream_runtime.go @@ -173,6 +173,15 @@ func (s *chatStreamRuntime) sendFailedChunk(status int, message, code string) { s.sendDone() } +func (s *chatStreamRuntime) markContextCancelled() { + s.finalErrorStatus = 499 + s.finalErrorMessage = "request context cancelled" + s.finalErrorCode = string(streamengine.StopReasonContextCancelled) + s.finalThinking = s.thinking.String() + s.finalText = cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers) + s.finalFinishReason = string(streamengine.StopReasonContextCancelled) +} + func (s *chatStreamRuntime) resetStreamToolCallState() { s.streamToolCallIDs = map[int]string{} s.streamToolNames = map[int]string{} diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index 147024f..464dd2c 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -247,12 +247,13 @@ func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response, } }, OnContextDone: func() { + streamRuntime.markContextCancelled() if historySession != nil { historySession.stopped(streamRuntime.thinking.String(), streamRuntime.text.String(), string(streamengine.StopReasonContextCancelled)) } }, }) - if r.Context().Err() != nil { + if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { return true, false } terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter") @@ -286,6 +287,10 @@ func logChatStreamTerminal(streamRuntime *chatStreamRuntime, attempts int) { if attempts > 0 { source = "synthetic_retry" } + if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { + config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode) + return + } if streamRuntime.finalErrorMessage != "" { config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) return diff --git a/internal/httpapi/openai/chat/empty_retry_runtime_test.go b/internal/httpapi/openai/chat/empty_retry_runtime_test.go new file mode 100644 index 0000000..ff8155f --- /dev/null +++ b/internal/httpapi/openai/chat/empty_retry_runtime_test.go @@ -0,0 +1,85 @@ +package chat + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "ds2api/internal/chathistory" + "ds2api/internal/stream" +) + +func TestConsumeChatStreamAttemptMarksContextCancelledState(t *testing.T) { + historyStore := newTestChatHistoryStore(t) + entry, err := historyStore.Start(chathistory.StartParams{ + CallerID: "caller:test", + Model: "deepseek-v4-flash", + Stream: true, + UserInput: "hello", + }) + if err != nil { + t.Fatalf("start history failed: %v", err) + } + session := &chatHistorySession{ + store: historyStore, + entryID: entry.ID, + startedAt: time.Now(), + lastPersist: time.Now(), + finalPrompt: "prompt", + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil).WithContext(ctx) + rec := httptest.NewRecorder() + streamRuntime := newChatStreamRuntime( + rec, + http.NewResponseController(rec), + true, + "cid-cancelled", + time.Now().Unix(), + "deepseek-v4-flash", + "prompt", + false, + false, + true, + nil, + nil, + false, + false, + ) + resp := makeOpenAISSEHTTPResponse( + `data: {"p":"response/content","v":"hello"}`, + `data: [DONE]`, + ) + + h := &Handler{} + terminalWritten, retryable := h.consumeChatStreamAttempt(req, resp, streamRuntime, "text", false, session, true) + if !terminalWritten || retryable { + t.Fatalf("expected cancelled attempt to terminate without retry, got terminalWritten=%v retryable=%v", terminalWritten, retryable) + } + if got, want := streamRuntime.finalErrorCode, string(stream.StopReasonContextCancelled); got != want { + t.Fatalf("expected cancelled final error code %q, got %q", want, got) + } + if streamRuntime.finalErrorMessage == "" { + t.Fatalf("expected cancelled final error message to be preserved") + } + + snapshot, err := historyStore.Snapshot() + if err != nil { + t.Fatalf("snapshot failed: %v", err) + } + if len(snapshot.Items) != 1 { + t.Fatalf("expected one history item, got %d", len(snapshot.Items)) + } + full, err := historyStore.Get(snapshot.Items[0].ID) + if err != nil { + t.Fatalf("get detail failed: %v", err) + } + if full.Status != "stopped" { + t.Fatalf("expected stopped status, got %#v", full) + } +} diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go index 45d861d..25131e1 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -222,8 +222,11 @@ func (h *Handler) consumeResponsesStreamAttempt(r *http.Request, resp *http.Resp finalReason = "content_filter" } }, + OnContextDone: func() { + streamRuntime.markContextCancelled() + }, }) - if r.Context().Err() != nil { + if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { return true, false } terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter") @@ -238,6 +241,10 @@ func logResponsesStreamTerminal(streamRuntime *responsesStreamRuntime, attempts if attempts > 0 { source = "synthetic_retry" } + if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { + config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "responses", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode) + return + } if streamRuntime.failed { config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) return diff --git a/internal/httpapi/openai/responses/empty_retry_runtime_test.go b/internal/httpapi/openai/responses/empty_retry_runtime_test.go new file mode 100644 index 0000000..c40e983 --- /dev/null +++ b/internal/httpapi/openai/responses/empty_retry_runtime_test.go @@ -0,0 +1,70 @@ +package responses + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "ds2api/internal/promptcompat" + "ds2api/internal/stream" +) + +func makeResponsesOpenAISSEHTTPResponse(lines ...string) *http.Response { + body := strings.Join(lines, "\n") + if !strings.HasSuffix(body, "\n") { + body += "\n" + } + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(body)), + } +} + +func TestConsumeResponsesStreamAttemptMarksContextCancelledState(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil).WithContext(ctx) + rec := httptest.NewRecorder() + streamRuntime := newResponsesStreamRuntime( + rec, + http.NewResponseController(rec), + true, + "resp-cancelled", + "deepseek-v4-flash", + "prompt", + false, + false, + true, + nil, + nil, + false, + false, + promptcompat.DefaultToolChoicePolicy(), + "", + nil, + ) + resp := makeResponsesOpenAISSEHTTPResponse( + `data: {"p":"response/content","v":"hello"}`, + `data: [DONE]`, + ) + + h := &Handler{} + terminalWritten, retryable := h.consumeResponsesStreamAttempt(req, resp, streamRuntime, "text", false, true) + if !terminalWritten || retryable { + t.Fatalf("expected cancelled attempt to terminate without retry, got terminalWritten=%v retryable=%v", terminalWritten, retryable) + } + if !streamRuntime.failed { + t.Fatalf("expected cancelled response stream to be marked failed") + } + if got, want := streamRuntime.finalErrorCode, string(stream.StopReasonContextCancelled); got != want { + t.Fatalf("expected cancelled final error code %q, got %q", want, got) + } + if streamRuntime.finalErrorMessage == "" { + t.Fatalf("expected cancelled final error message to be preserved") + } +} diff --git a/internal/httpapi/openai/responses/responses_stream_runtime_core.go b/internal/httpapi/openai/responses/responses_stream_runtime_core.go index 7184c3f..a4749c0 100644 --- a/internal/httpapi/openai/responses/responses_stream_runtime_core.go +++ b/internal/httpapi/openai/responses/responses_stream_runtime_core.go @@ -139,6 +139,13 @@ func (s *responsesStreamRuntime) failResponse(status int, message, code string) s.sendDone() } +func (s *responsesStreamRuntime) markContextCancelled() { + s.failed = true + s.finalErrorStatus = 499 + s.finalErrorMessage = "request context cancelled" + s.finalErrorCode = string(streamengine.StopReasonContextCancelled) +} + func (s *responsesStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) bool { s.failed = false s.finalErrorStatus = 0