diff --git a/internal/adapter/openai/handler.go b/internal/adapter/openai/handler.go index ce90804..5ef6e7b 100644 --- a/internal/adapter/openai/handler.go +++ b/internal/adapter/openai/handler.go @@ -206,13 +206,13 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt delta["role"] = "assistant" firstChunkSent = true } - sendChunk(map[string]any{ - "id": completionID, - "object": "chat.completion.chunk", - "created": created, - "model": model, - "choices": []map[string]any{{"delta": delta, "index": 0}}, - }) + sendChunk(util.BuildOpenAIChatStreamChunk( + completionID, + created, + model, + []map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)}, + nil, + )) } else if bufferToolContent { for _, evt := range flushToolSieve(&toolSieve, toolNames) { if evt.Content == "" { @@ -225,36 +225,25 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt delta["role"] = "assistant" firstChunkSent = true } - sendChunk(map[string]any{ - "id": completionID, - "object": "chat.completion.chunk", - "created": created, - "model": model, - "choices": []map[string]any{{"delta": delta, "index": 0}}, - }) + sendChunk(util.BuildOpenAIChatStreamChunk( + completionID, + created, + model, + []map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)}, + nil, + )) } } if len(detected) > 0 || toolCallsEmitted { finishReason = "tool_calls" } - promptTokens := util.EstimateTokens(finalPrompt) - reasoningTokens := util.EstimateTokens(finalThinking) - completionTokens := util.EstimateTokens(finalText) - sendChunk(map[string]any{ - "id": completionID, - "object": "chat.completion.chunk", - "created": created, - "model": model, - "choices": []map[string]any{{"delta": map[string]any{}, "index": 0, "finish_reason": finishReason}}, - "usage": map[string]any{ - "prompt_tokens": promptTokens, - "completion_tokens": reasoningTokens + completionTokens, - "total_tokens": promptTokens + reasoningTokens + completionTokens, - "completion_tokens_details": map[string]any{ - "reasoning_tokens": reasoningTokens, - }, - }, - }) + sendChunk(util.BuildOpenAIChatStreamChunk( + completionID, + created, + model, + []map[string]any{util.BuildOpenAIChatStreamFinishChoice(0, finishReason)}, + util.BuildOpenAIChatUsage(finalPrompt, finalThinking, finalText), + )) sendDone() } @@ -340,10 +329,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt tcDelta["role"] = "assistant" firstChunkSent = true } - newChoices = append(newChoices, map[string]any{ - "delta": tcDelta, - "index": 0, - }) + newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, tcDelta)) continue } if len(evt.ToolCalls) > 0 { @@ -355,10 +341,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt tcDelta["role"] = "assistant" firstChunkSent = true } - newChoices = append(newChoices, map[string]any{ - "delta": tcDelta, - "index": 0, - }) + newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, tcDelta)) continue } if evt.Content != "" { @@ -369,26 +352,17 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt contentDelta["role"] = "assistant" firstChunkSent = true } - newChoices = append(newChoices, map[string]any{ - "delta": contentDelta, - "index": 0, - }) + newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, contentDelta)) } } } } if len(delta) > 0 { - newChoices = append(newChoices, map[string]any{"delta": delta, "index": 0}) + newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, delta)) } } if len(newChoices) > 0 { - sendChunk(map[string]any{ - "id": completionID, - "object": "chat.completion.chunk", - "created": created, - "model": model, - "choices": newChoices, - }) + sendChunk(util.BuildOpenAIChatStreamChunk(completionID, created, model, newChoices, nil)) } } } diff --git a/internal/adapter/openai/responses_handler.go b/internal/adapter/openai/responses_handler.go index 92dd891..9aaa7cd 100644 --- a/internal/adapter/openai/responses_handler.go +++ b/internal/adapter/openai/responses_handler.go @@ -144,13 +144,7 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, } } - sendEvent("response.created", map[string]any{ - "type": "response.created", - "id": responseID, - "object": "response", - "model": model, - "status": "in_progress", - }) + sendEvent("response.created", util.BuildOpenAIResponsesCreatedPayload(responseID, model)) initialType := "text" if thinkingEnabled { @@ -172,19 +166,11 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, for _, evt := range flushToolSieve(&sieve, toolNames) { if evt.Content != "" { finalText += evt.Content - sendEvent("response.output_text.delta", map[string]any{ - "type": "response.output_text.delta", - "id": responseID, - "delta": evt.Content, - }) + sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, evt.Content)) } if len(evt.ToolCalls) > 0 { toolCallsEmitted = true - sendEvent("response.output_tool_call.done", map[string]any{ - "type": "response.output_tool_call.done", - "id": responseID, - "tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls), - }) + sendEvent("response.output_tool_call.done", util.BuildOpenAIResponsesToolCallDonePayload(responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls))) } } } @@ -193,10 +179,7 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, obj["status"] = "completed" } h.getResponseStore().put(owner, responseID, obj) - sendEvent("response.completed", map[string]any{ - "type": "response.completed", - "response": obj, - }) + sendEvent("response.completed", util.BuildOpenAIResponsesCompletedPayload(obj)) _, _ = w.Write([]byte("data: [DONE]\n\n")) if canFlush { _ = rc.Flush() @@ -232,48 +215,28 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request, continue } thinking.WriteString(p.Text) - sendEvent("response.reasoning.delta", map[string]any{ - "type": "response.reasoning.delta", - "id": responseID, - "delta": p.Text, - }) + sendEvent("response.reasoning.delta", util.BuildOpenAIResponsesReasoningDeltaPayload(responseID, p.Text)) continue } text.WriteString(p.Text) if !bufferToolContent { - sendEvent("response.output_text.delta", map[string]any{ - "type": "response.output_text.delta", - "id": responseID, - "delta": p.Text, - }) + sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, p.Text)) continue } for _, evt := range processToolSieveChunk(&sieve, p.Text, toolNames) { if evt.Content != "" { - sendEvent("response.output_text.delta", map[string]any{ - "type": "response.output_text.delta", - "id": responseID, - "delta": evt.Content, - }) + sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, evt.Content)) } if len(evt.ToolCallDeltas) > 0 { if !emitEarlyToolDeltas { continue } toolCallsEmitted = true - sendEvent("response.output_tool_call.delta", map[string]any{ - "type": "response.output_tool_call.delta", - "id": responseID, - "tool_calls": formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, streamToolCallIDs), - }) + sendEvent("response.output_tool_call.delta", util.BuildOpenAIResponsesToolCallDeltaPayload(responseID, formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, streamToolCallIDs))) } if len(evt.ToolCalls) > 0 { toolCallsEmitted = true - sendEvent("response.output_tool_call.done", map[string]any{ - "type": "response.output_tool_call.done", - "id": responseID, - "tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls), - }) + sendEvent("response.output_tool_call.done", util.BuildOpenAIResponsesToolCallDonePayload(responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls))) } } } diff --git a/internal/util/render_stream.go b/internal/util/render_stream.go new file mode 100644 index 0000000..716c158 --- /dev/null +++ b/internal/util/render_stream.go @@ -0,0 +1,93 @@ +package util + +func BuildOpenAIChatStreamDeltaChoice(index int, delta map[string]any) map[string]any { + return map[string]any{ + "delta": delta, + "index": index, + } +} + +func BuildOpenAIChatStreamFinishChoice(index int, finishReason string) map[string]any { + return map[string]any{ + "delta": map[string]any{}, + "index": index, + "finish_reason": finishReason, + } +} + +func BuildOpenAIChatStreamChunk(completionID string, created int64, model string, choices []map[string]any, usage map[string]any) map[string]any { + out := map[string]any{ + "id": completionID, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": choices, + } + if len(usage) > 0 { + out["usage"] = usage + } + return out +} + +func BuildOpenAIChatUsage(finalPrompt, finalThinking, finalText string) map[string]any { + promptTokens := EstimateTokens(finalPrompt) + reasoningTokens := EstimateTokens(finalThinking) + completionTokens := EstimateTokens(finalText) + return map[string]any{ + "prompt_tokens": promptTokens, + "completion_tokens": reasoningTokens + completionTokens, + "total_tokens": promptTokens + reasoningTokens + completionTokens, + "completion_tokens_details": map[string]any{ + "reasoning_tokens": reasoningTokens, + }, + } +} + +func BuildOpenAIResponsesCreatedPayload(responseID, model string) map[string]any { + return map[string]any{ + "type": "response.created", + "id": responseID, + "object": "response", + "model": model, + "status": "in_progress", + } +} + +func BuildOpenAIResponsesTextDeltaPayload(responseID, delta string) map[string]any { + return map[string]any{ + "type": "response.output_text.delta", + "id": responseID, + "delta": delta, + } +} + +func BuildOpenAIResponsesReasoningDeltaPayload(responseID, delta string) map[string]any { + return map[string]any{ + "type": "response.reasoning.delta", + "id": responseID, + "delta": delta, + } +} + +func BuildOpenAIResponsesToolCallDeltaPayload(responseID string, toolCalls []map[string]any) map[string]any { + return map[string]any{ + "type": "response.output_tool_call.delta", + "id": responseID, + "tool_calls": toolCalls, + } +} + +func BuildOpenAIResponsesToolCallDonePayload(responseID string, toolCalls []map[string]any) map[string]any { + return map[string]any{ + "type": "response.output_tool_call.done", + "id": responseID, + "tool_calls": toolCalls, + } +} + +func BuildOpenAIResponsesCompletedPayload(response map[string]any) map[string]any { + return map[string]any{ + "type": "response.completed", + "response": response, + } +} diff --git a/internal/util/render_stream_test.go b/internal/util/render_stream_test.go new file mode 100644 index 0000000..420a311 --- /dev/null +++ b/internal/util/render_stream_test.go @@ -0,0 +1,48 @@ +package util + +import "testing" + +func TestBuildOpenAIChatStreamChunk(t *testing.T) { + chunk := BuildOpenAIChatStreamChunk( + "cid", + 123, + "deepseek-chat", + []map[string]any{BuildOpenAIChatStreamDeltaChoice(0, map[string]any{"role": "assistant"})}, + nil, + ) + if chunk["object"] != "chat.completion.chunk" { + t.Fatalf("unexpected object: %#v", chunk["object"]) + } + choices, _ := chunk["choices"].([]map[string]any) + if len(choices) == 0 { + rawChoices, _ := chunk["choices"].([]any) + if len(rawChoices) == 0 { + t.Fatalf("expected choices") + } + } +} + +func TestBuildOpenAIChatUsage(t *testing.T) { + usage := BuildOpenAIChatUsage("prompt", "think", "answer") + if _, ok := usage["prompt_tokens"]; !ok { + t.Fatalf("expected prompt_tokens") + } + if _, ok := usage["completion_tokens_details"]; !ok { + t.Fatalf("expected completion_tokens_details") + } +} + +func TestBuildOpenAIResponsesEventPayloads(t *testing.T) { + created := BuildOpenAIResponsesCreatedPayload("resp_1", "gpt-4o") + if created["type"] != "response.created" { + t.Fatalf("unexpected type: %#v", created["type"]) + } + done := BuildOpenAIResponsesToolCallDonePayload("resp_1", []map[string]any{{"index": 0}}) + if done["type"] != "response.output_tool_call.done" { + t.Fatalf("unexpected type: %#v", done["type"]) + } + completed := BuildOpenAIResponsesCompletedPayload(map[string]any{"id": "resp_1"}) + if completed["type"] != "response.completed" { + t.Fatalf("unexpected type: %#v", completed["type"]) + } +}