mirror of https://github.com/CJackHwang/ds2api.git
synced 2026-05-05 00:45:29 +08:00
refactor: Extract OpenAI streaming response payload construction into dedicated utility functions.
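In effect, every inline chunk/event map literal in the stream handlers is replaced by a call into new builders under internal/util. Below is a minimal sketch of the equivalence for a single delta chunk; the completion ID, timestamp, and model name are invented, and the program assumes it is compiled inside the ds2api module so the internal package is importable.

package main

import (
    "fmt"
    "reflect"

    "github.com/CJackHwang/ds2api/internal/util"
)

func main() {
    delta := map[string]any{"role": "assistant", "content": "Hi"}

    // Old style: the handler assembled the chunk map inline.
    inline := map[string]any{
        "id":      "chatcmpl-demo",
        "object":  "chat.completion.chunk",
        "created": int64(1700000000),
        "model":   "deepseek-chat",
        "choices": []map[string]any{{"delta": delta, "index": 0}},
    }

    // New style: the same payload produced by the extracted helpers.
    built := util.BuildOpenAIChatStreamChunk(
        "chatcmpl-demo",
        1700000000,
        "deepseek-chat",
        []map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)},
        nil, // intermediate chunks carry no usage block
    )

    fmt.Println(reflect.DeepEqual(inline, built)) // expected: true
}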
@@ -206,13 +206,13 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 delta["role"] = "assistant"
 firstChunkSent = true
 }
-sendChunk(map[string]any{
-"id": completionID,
-"object": "chat.completion.chunk",
-"created": created,
-"model": model,
-"choices": []map[string]any{{"delta": delta, "index": 0}},
-})
+sendChunk(util.BuildOpenAIChatStreamChunk(
+completionID,
+created,
+model,
+[]map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)},
+nil,
+))
 } else if bufferToolContent {
 for _, evt := range flushToolSieve(&toolSieve, toolNames) {
 if evt.Content == "" {
@@ -225,36 +225,25 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 delta["role"] = "assistant"
 firstChunkSent = true
 }
-sendChunk(map[string]any{
-"id": completionID,
-"object": "chat.completion.chunk",
-"created": created,
-"model": model,
-"choices": []map[string]any{{"delta": delta, "index": 0}},
-})
+sendChunk(util.BuildOpenAIChatStreamChunk(
+completionID,
+created,
+model,
+[]map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)},
+nil,
+))
 }
 }
 if len(detected) > 0 || toolCallsEmitted {
 finishReason = "tool_calls"
 }
-promptTokens := util.EstimateTokens(finalPrompt)
-reasoningTokens := util.EstimateTokens(finalThinking)
-completionTokens := util.EstimateTokens(finalText)
-sendChunk(map[string]any{
-"id": completionID,
-"object": "chat.completion.chunk",
-"created": created,
-"model": model,
-"choices": []map[string]any{{"delta": map[string]any{}, "index": 0, "finish_reason": finishReason}},
-"usage": map[string]any{
-"prompt_tokens": promptTokens,
-"completion_tokens": reasoningTokens + completionTokens,
-"total_tokens": promptTokens + reasoningTokens + completionTokens,
-"completion_tokens_details": map[string]any{
-"reasoning_tokens": reasoningTokens,
-},
-},
-})
+sendChunk(util.BuildOpenAIChatStreamChunk(
+completionID,
+created,
+model,
+[]map[string]any{util.BuildOpenAIChatStreamFinishChoice(0, finishReason)},
+util.BuildOpenAIChatUsage(finalPrompt, finalThinking, finalText),
+))
 sendDone()
 }
@@ -340,10 +329,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 tcDelta["role"] = "assistant"
 firstChunkSent = true
 }
-newChoices = append(newChoices, map[string]any{
-"delta": tcDelta,
-"index": 0,
-})
+newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, tcDelta))
 continue
 }
 if len(evt.ToolCalls) > 0 {
@@ -355,10 +341,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 tcDelta["role"] = "assistant"
 firstChunkSent = true
 }
-newChoices = append(newChoices, map[string]any{
-"delta": tcDelta,
-"index": 0,
-})
+newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, tcDelta))
 continue
 }
 if evt.Content != "" {
@@ -369,26 +352,17 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 contentDelta["role"] = "assistant"
 firstChunkSent = true
 }
-newChoices = append(newChoices, map[string]any{
-"delta": contentDelta,
-"index": 0,
-})
+newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, contentDelta))
 }
 }
 }
 }
 if len(delta) > 0 {
-newChoices = append(newChoices, map[string]any{"delta": delta, "index": 0})
+newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, delta))
 }
 }
 if len(newChoices) > 0 {
-sendChunk(map[string]any{
-"id": completionID,
-"object": "chat.completion.chunk",
-"created": created,
-"model": model,
-"choices": newChoices,
-})
+sendChunk(util.BuildOpenAIChatStreamChunk(completionID, created, model, newChoices, nil))
 }
 }
 }
@@ -144,13 +144,7 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
 }
 }

-sendEvent("response.created", map[string]any{
-"type": "response.created",
-"id": responseID,
-"object": "response",
-"model": model,
-"status": "in_progress",
-})
+sendEvent("response.created", util.BuildOpenAIResponsesCreatedPayload(responseID, model))

 initialType := "text"
 if thinkingEnabled {
@@ -172,19 +166,11 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
 for _, evt := range flushToolSieve(&sieve, toolNames) {
 if evt.Content != "" {
 finalText += evt.Content
-sendEvent("response.output_text.delta", map[string]any{
-"type": "response.output_text.delta",
-"id": responseID,
-"delta": evt.Content,
-})
+sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, evt.Content))
 }
 if len(evt.ToolCalls) > 0 {
 toolCallsEmitted = true
-sendEvent("response.output_tool_call.done", map[string]any{
-"type": "response.output_tool_call.done",
-"id": responseID,
-"tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls),
-})
+sendEvent("response.output_tool_call.done", util.BuildOpenAIResponsesToolCallDonePayload(responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
 }
 }
 }
@@ -193,10 +179,7 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
 obj["status"] = "completed"
 }
 h.getResponseStore().put(owner, responseID, obj)
-sendEvent("response.completed", map[string]any{
-"type": "response.completed",
-"response": obj,
-})
+sendEvent("response.completed", util.BuildOpenAIResponsesCompletedPayload(obj))
 _, _ = w.Write([]byte("data: [DONE]\n\n"))
 if canFlush {
 _ = rc.Flush()
@@ -232,48 +215,28 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
 continue
 }
 thinking.WriteString(p.Text)
-sendEvent("response.reasoning.delta", map[string]any{
-"type": "response.reasoning.delta",
-"id": responseID,
-"delta": p.Text,
-})
+sendEvent("response.reasoning.delta", util.BuildOpenAIResponsesReasoningDeltaPayload(responseID, p.Text))
 continue
 }
 text.WriteString(p.Text)
 if !bufferToolContent {
-sendEvent("response.output_text.delta", map[string]any{
-"type": "response.output_text.delta",
-"id": responseID,
-"delta": p.Text,
-})
+sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, p.Text))
 continue
 }
 for _, evt := range processToolSieveChunk(&sieve, p.Text, toolNames) {
 if evt.Content != "" {
-sendEvent("response.output_text.delta", map[string]any{
-"type": "response.output_text.delta",
-"id": responseID,
-"delta": evt.Content,
-})
+sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, evt.Content))
 }
 if len(evt.ToolCallDeltas) > 0 {
 if !emitEarlyToolDeltas {
 continue
 }
 toolCallsEmitted = true
-sendEvent("response.output_tool_call.delta", map[string]any{
-"type": "response.output_tool_call.delta",
-"id": responseID,
-"tool_calls": formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, streamToolCallIDs),
-})
+sendEvent("response.output_tool_call.delta", util.BuildOpenAIResponsesToolCallDeltaPayload(responseID, formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, streamToolCallIDs)))
 }
 if len(evt.ToolCalls) > 0 {
 toolCallsEmitted = true
-sendEvent("response.output_tool_call.done", map[string]any{
-"type": "response.output_tool_call.done",
-"id": responseID,
-"tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls),
-})
+sendEvent("response.output_tool_call.done", util.BuildOpenAIResponsesToolCallDonePayload(responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
 }
 }
 }
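For orientation, the Responses-API stream above is framed entirely from the payload builders added below in internal/util/render_stream.go. A rough, self-contained sketch of the event sequence follows; the response ID and delta text are invented, writeSSE is only a stand-in for the handler's sendEvent (whose exact SSE framing is not shown in this diff), and the import assumes the file lives inside this module.

package main

import (
    "encoding/json"
    "fmt"

    "github.com/CJackHwang/ds2api/internal/util"
)

// writeSSE stands in for the handler's sendEvent: one event/data pair per payload.
func writeSSE(event string, payload map[string]any) {
    b, _ := json.Marshal(payload)
    fmt.Printf("event: %s\ndata: %s\n\n", event, b)
}

func main() {
    responseID := "resp_demo" // illustrative only

    writeSSE("response.created", util.BuildOpenAIResponsesCreatedPayload(responseID, "deepseek-chat"))
    writeSSE("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, "Hello"))
    writeSSE("response.completed", util.BuildOpenAIResponsesCompletedPayload(map[string]any{
        "id":     responseID,
        "object": "response",
        "status": "completed",
    }))
    fmt.Print("data: [DONE]\n\n")
}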
internal/util/render_stream.go (new file, 93 lines)
@@ -0,0 +1,93 @@
package util

func BuildOpenAIChatStreamDeltaChoice(index int, delta map[string]any) map[string]any {
    return map[string]any{
        "delta": delta,
        "index": index,
    }
}

func BuildOpenAIChatStreamFinishChoice(index int, finishReason string) map[string]any {
    return map[string]any{
        "delta": map[string]any{},
        "index": index,
        "finish_reason": finishReason,
    }
}

func BuildOpenAIChatStreamChunk(completionID string, created int64, model string, choices []map[string]any, usage map[string]any) map[string]any {
    out := map[string]any{
        "id": completionID,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": choices,
    }
    if len(usage) > 0 {
        out["usage"] = usage
    }
    return out
}

func BuildOpenAIChatUsage(finalPrompt, finalThinking, finalText string) map[string]any {
    promptTokens := EstimateTokens(finalPrompt)
    reasoningTokens := EstimateTokens(finalThinking)
    completionTokens := EstimateTokens(finalText)
    return map[string]any{
        "prompt_tokens": promptTokens,
        "completion_tokens": reasoningTokens + completionTokens,
        "total_tokens": promptTokens + reasoningTokens + completionTokens,
        "completion_tokens_details": map[string]any{
            "reasoning_tokens": reasoningTokens,
        },
    }
}

func BuildOpenAIResponsesCreatedPayload(responseID, model string) map[string]any {
    return map[string]any{
        "type": "response.created",
        "id": responseID,
        "object": "response",
        "model": model,
        "status": "in_progress",
    }
}

func BuildOpenAIResponsesTextDeltaPayload(responseID, delta string) map[string]any {
    return map[string]any{
        "type": "response.output_text.delta",
        "id": responseID,
        "delta": delta,
    }
}

func BuildOpenAIResponsesReasoningDeltaPayload(responseID, delta string) map[string]any {
    return map[string]any{
        "type": "response.reasoning.delta",
        "id": responseID,
        "delta": delta,
    }
}

func BuildOpenAIResponsesToolCallDeltaPayload(responseID string, toolCalls []map[string]any) map[string]any {
    return map[string]any{
        "type": "response.output_tool_call.delta",
        "id": responseID,
        "tool_calls": toolCalls,
    }
}

func BuildOpenAIResponsesToolCallDonePayload(responseID string, toolCalls []map[string]any) map[string]any {
    return map[string]any{
        "type": "response.output_tool_call.done",
        "id": responseID,
        "tool_calls": toolCalls,
    }
}

func BuildOpenAIResponsesCompletedPayload(response map[string]any) map[string]any {
    return map[string]any{
        "type": "response.completed",
        "response": response,
    }
}
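To see what the final chat-completions chunk looks like once usage is attached, here is a short sketch built on the helpers above; the prompt/thinking/answer strings are placeholders, and the program again assumes it is compiled inside this module so the internal package is importable.

package main

import (
    "encoding/json"
    "fmt"

    "github.com/CJackHwang/ds2api/internal/util"
)

func main() {
    // Finish chunk: empty delta with a finish_reason plus the usage block,
    // mirroring what handleStream emits right before sendDone().
    finish := util.BuildOpenAIChatStreamChunk(
        "chatcmpl-demo",
        1700000000,
        "deepseek-chat",
        []map[string]any{util.BuildOpenAIChatStreamFinishChoice(0, "stop")}, // handler may also send "tool_calls"
        util.BuildOpenAIChatUsage("example prompt", "example thinking", "example answer"),
    )
    out, _ := json.MarshalIndent(finish, "", "  ")
    fmt.Println(string(out)) // reasoning tokens appear under completion_tokens_details
}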
internal/util/render_stream_test.go (new file, 48 lines)
@@ -0,0 +1,48 @@
package util

import "testing"

func TestBuildOpenAIChatStreamChunk(t *testing.T) {
    chunk := BuildOpenAIChatStreamChunk(
        "cid",
        123,
        "deepseek-chat",
        []map[string]any{BuildOpenAIChatStreamDeltaChoice(0, map[string]any{"role": "assistant"})},
        nil,
    )
    if chunk["object"] != "chat.completion.chunk" {
        t.Fatalf("unexpected object: %#v", chunk["object"])
    }
    choices, _ := chunk["choices"].([]map[string]any)
    if len(choices) == 0 {
        rawChoices, _ := chunk["choices"].([]any)
        if len(rawChoices) == 0 {
            t.Fatalf("expected choices")
        }
    }
}

func TestBuildOpenAIChatUsage(t *testing.T) {
    usage := BuildOpenAIChatUsage("prompt", "think", "answer")
    if _, ok := usage["prompt_tokens"]; !ok {
        t.Fatalf("expected prompt_tokens")
    }
    if _, ok := usage["completion_tokens_details"]; !ok {
        t.Fatalf("expected completion_tokens_details")
    }
}

func TestBuildOpenAIResponsesEventPayloads(t *testing.T) {
    created := BuildOpenAIResponsesCreatedPayload("resp_1", "gpt-4o")
    if created["type"] != "response.created" {
        t.Fatalf("unexpected type: %#v", created["type"])
    }
    done := BuildOpenAIResponsesToolCallDonePayload("resp_1", []map[string]any{{"index": 0}})
    if done["type"] != "response.output_tool_call.done" {
        t.Fatalf("unexpected type: %#v", done["type"])
    }
    completed := BuildOpenAIResponsesCompletedPayload(map[string]any{"id": "resp_1"})
    if completed["type"] != "response.completed" {
        t.Fatalf("unexpected type: %#v", completed["type"])
    }
}