mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-02 07:25:26 +08:00
feat: Introduce stable call_id for OpenAI function_call and tool_calls events in streaming output, including reasoning text.
This commit is contained in:
@@ -99,7 +99,7 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
|
||||
if len(detected) > 0 && !s.toolCallsEmitted {
|
||||
finishReason = "tool_calls"
|
||||
delta := map[string]any{
|
||||
"tool_calls": util.FormatOpenAIStreamToolCalls(detected),
|
||||
"tool_calls": formatFinalStreamToolCallsWithStableIDs(detected, s.streamToolCallIDs),
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
@@ -203,7 +203,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
tcDelta := map[string]any{
|
||||
"tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls),
|
||||
"tool_calls": formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs),
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
tcDelta["role"] = "assistant"
|
||||
|
||||
@@ -280,6 +280,36 @@ func formatIncrementalStreamToolCallDeltas(deltas []toolCallDelta, ids map[int]s
|
||||
return out
|
||||
}
|
||||
|
||||
func formatFinalStreamToolCallsWithStableIDs(calls []util.ParsedToolCall, ids map[int]string) []map[string]any {
|
||||
if len(calls) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]map[string]any, 0, len(calls))
|
||||
for i, c := range calls {
|
||||
callID := ""
|
||||
if ids != nil {
|
||||
callID = strings.TrimSpace(ids[i])
|
||||
}
|
||||
if callID == "" {
|
||||
callID = "call_" + strings.ReplaceAll(uuid.NewString(), "-", "")
|
||||
if ids != nil {
|
||||
ids[i] = callID
|
||||
}
|
||||
}
|
||||
args, _ := json.Marshal(c.Input)
|
||||
out = append(out, map[string]any{
|
||||
"index": i,
|
||||
"id": callID,
|
||||
"type": "function",
|
||||
"function": map[string]any{
|
||||
"name": c.Name,
|
||||
"arguments": string(args),
|
||||
},
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func writeOpenAIError(w http.ResponseWriter, status int, message string) {
|
||||
writeJSON(w, status, map[string]any{
|
||||
"error": map[string]any{
|
||||
|
||||
@@ -3,12 +3,15 @@ package openai
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
openaifmt "ds2api/internal/format/openai"
|
||||
"ds2api/internal/sse"
|
||||
streamengine "ds2api/internal/stream"
|
||||
"ds2api/internal/util"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type responsesStreamRuntime struct {
|
||||
@@ -24,14 +27,19 @@ type responsesStreamRuntime struct {
|
||||
thinkingEnabled bool
|
||||
searchEnabled bool
|
||||
|
||||
bufferToolContent bool
|
||||
emitEarlyToolDeltas bool
|
||||
toolCallsEmitted bool
|
||||
bufferToolContent bool
|
||||
emitEarlyToolDeltas bool
|
||||
toolCallsEmitted bool
|
||||
toolCallsDoneEmitted bool
|
||||
|
||||
sieve toolStreamSieveState
|
||||
thinkingSieve toolStreamSieveState
|
||||
thinking strings.Builder
|
||||
text strings.Builder
|
||||
streamToolCallIDs map[int]string
|
||||
streamFunctionIDs map[int]string
|
||||
functionDone map[int]bool
|
||||
reasoningItemID string
|
||||
|
||||
persistResponse func(obj map[string]any)
|
||||
}
|
||||
@@ -63,6 +71,8 @@ func newResponsesStreamRuntime(
|
||||
bufferToolContent: bufferToolContent,
|
||||
emitEarlyToolDeltas: emitEarlyToolDeltas,
|
||||
streamToolCallIDs: map[int]string{},
|
||||
streamFunctionIDs: map[int]string{},
|
||||
functionDone: map[int]bool{},
|
||||
persistResponse: persistResponse,
|
||||
}
|
||||
}
|
||||
@@ -92,6 +102,9 @@ func (s *responsesStreamRuntime) sendDone() {
|
||||
func (s *responsesStreamRuntime) finalize() {
|
||||
finalThinking := s.thinking.String()
|
||||
finalText := s.text.String()
|
||||
if strings.TrimSpace(finalThinking) != "" {
|
||||
s.sendEvent("response.reasoning_text.done", openaifmt.BuildResponsesReasoningTextDonePayload(s.responseID, s.ensureReasoningItemID(), 0, 0, finalThinking))
|
||||
}
|
||||
if s.bufferToolContent {
|
||||
for _, evt := range flushToolSieve(&s.sieve, s.toolNames) {
|
||||
if evt.Content != "" {
|
||||
@@ -99,12 +112,45 @@ func (s *responsesStreamRuntime) finalize() {
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
|
||||
s.toolCallsDoneEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs)))
|
||||
s.emitFunctionCallDoneEvents(evt.ToolCalls)
|
||||
}
|
||||
}
|
||||
for _, evt := range flushToolSieve(&s.thinkingSieve, s.toolNames) {
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
s.toolCallsDoneEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs)))
|
||||
s.emitFunctionCallDoneEvents(evt.ToolCalls)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Compatibility fallback: some streams only emit incremental tool deltas.
|
||||
// Ensure final function_call_arguments.done is emitted at least once.
|
||||
if s.toolCallsEmitted {
|
||||
detected := util.ParseStandaloneToolCalls(finalText, s.toolNames)
|
||||
if len(detected) == 0 {
|
||||
detected = util.ParseToolCalls(finalText, s.toolNames)
|
||||
}
|
||||
if len(detected) == 0 {
|
||||
detected = util.ParseStandaloneToolCalls(finalThinking, s.toolNames)
|
||||
}
|
||||
if len(detected) == 0 {
|
||||
detected = util.ParseToolCalls(finalThinking, s.toolNames)
|
||||
}
|
||||
if len(detected) > 0 {
|
||||
if !s.toolCallsDoneEmitted {
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, formatFinalStreamToolCallsWithStableIDs(detected, s.streamToolCallIDs)))
|
||||
}
|
||||
s.emitFunctionCallDoneEvents(detected)
|
||||
}
|
||||
}
|
||||
|
||||
obj := openaifmt.BuildResponseObject(s.responseID, s.model, s.finalPrompt, finalThinking, finalText, s.toolNames)
|
||||
if s.toolCallsEmitted {
|
||||
s.alignCompletedOutputCallIDs(obj)
|
||||
}
|
||||
if s.toolCallsEmitted {
|
||||
obj["status"] = "completed"
|
||||
}
|
||||
@@ -138,6 +184,25 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
|
||||
}
|
||||
s.thinking.WriteString(p.Text)
|
||||
s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, p.Text))
|
||||
s.sendEvent("response.reasoning_text.delta", openaifmt.BuildResponsesReasoningTextDeltaPayload(s.responseID, s.ensureReasoningItemID(), 0, 0, p.Text))
|
||||
if s.bufferToolContent {
|
||||
for _, evt := range processToolSieveChunk(&s.thinkingSieve, p.Text, s.toolNames) {
|
||||
if len(evt.ToolCallDeltas) > 0 {
|
||||
if !s.emitEarlyToolDeltas {
|
||||
continue
|
||||
}
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.delta", openaifmt.BuildResponsesToolCallDeltaPayload(s.responseID, formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, s.streamToolCallIDs)))
|
||||
s.emitFunctionCallDeltaEvents(evt.ToolCallDeltas)
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
s.toolCallsDoneEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs)))
|
||||
s.emitFunctionCallDoneEvents(evt.ToolCalls)
|
||||
}
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -156,13 +221,138 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
|
||||
}
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.delta", openaifmt.BuildResponsesToolCallDeltaPayload(s.responseID, formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, s.streamToolCallIDs)))
|
||||
s.emitFunctionCallDeltaEvents(evt.ToolCallDeltas)
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
|
||||
s.toolCallsDoneEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs)))
|
||||
s.emitFunctionCallDoneEvents(evt.ToolCalls)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return streamengine.ParsedDecision{ContentSeen: contentSeen}
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) ensureReasoningItemID() string {
|
||||
if strings.TrimSpace(s.reasoningItemID) != "" {
|
||||
return s.reasoningItemID
|
||||
}
|
||||
s.reasoningItemID = "rs_" + strings.ReplaceAll(uuid.NewString(), "-", "")
|
||||
return s.reasoningItemID
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) ensureFunctionItemID(index int) string {
|
||||
if id, ok := s.streamFunctionIDs[index]; ok && strings.TrimSpace(id) != "" {
|
||||
return id
|
||||
}
|
||||
id := "fc_" + strings.ReplaceAll(uuid.NewString(), "-", "")
|
||||
s.streamFunctionIDs[index] = id
|
||||
return id
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) ensureToolCallID(index int) string {
|
||||
if id, ok := s.streamToolCallIDs[index]; ok && strings.TrimSpace(id) != "" {
|
||||
return id
|
||||
}
|
||||
id := "call_" + strings.ReplaceAll(uuid.NewString(), "-", "")
|
||||
s.streamToolCallIDs[index] = id
|
||||
return id
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) functionOutputBaseIndex() int {
|
||||
if strings.TrimSpace(s.thinking.String()) != "" {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) emitFunctionCallDeltaEvents(deltas []toolCallDelta) {
|
||||
for _, d := range deltas {
|
||||
if strings.TrimSpace(d.Arguments) == "" {
|
||||
continue
|
||||
}
|
||||
outputIndex := s.functionOutputBaseIndex() + d.Index
|
||||
itemID := s.ensureFunctionItemID(outputIndex)
|
||||
callID := s.ensureToolCallID(d.Index)
|
||||
s.sendEvent(
|
||||
"response.function_call_arguments.delta",
|
||||
openaifmt.BuildResponsesFunctionCallArgumentsDeltaPayload(s.responseID, itemID, outputIndex, callID, d.Arguments),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) emitFunctionCallDoneEvents(calls []util.ParsedToolCall) {
|
||||
base := s.functionOutputBaseIndex()
|
||||
for idx, tc := range calls {
|
||||
if strings.TrimSpace(tc.Name) == "" {
|
||||
continue
|
||||
}
|
||||
outputIndex := base + idx
|
||||
if s.functionDone[outputIndex] {
|
||||
continue
|
||||
}
|
||||
itemID := s.ensureFunctionItemID(outputIndex)
|
||||
callID := s.ensureToolCallID(idx)
|
||||
argsBytes, _ := json.Marshal(tc.Input)
|
||||
s.sendEvent(
|
||||
"response.function_call_arguments.done",
|
||||
openaifmt.BuildResponsesFunctionCallArgumentsDonePayload(s.responseID, itemID, outputIndex, callID, tc.Name, string(argsBytes)),
|
||||
)
|
||||
s.functionDone[outputIndex] = true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) alignCompletedOutputCallIDs(obj map[string]any) {
|
||||
if obj == nil || len(s.streamToolCallIDs) == 0 {
|
||||
return
|
||||
}
|
||||
output, _ := obj["output"].([]any)
|
||||
if len(output) == 0 {
|
||||
return
|
||||
}
|
||||
indices := make([]int, 0, len(s.streamToolCallIDs))
|
||||
for idx := range s.streamToolCallIDs {
|
||||
indices = append(indices, idx)
|
||||
}
|
||||
sort.Ints(indices)
|
||||
ordered := make([]string, 0, len(indices))
|
||||
for _, idx := range indices {
|
||||
id := strings.TrimSpace(s.streamToolCallIDs[idx])
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
ordered = append(ordered, id)
|
||||
}
|
||||
if len(ordered) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
functionIdx := 0
|
||||
for _, item := range output {
|
||||
m, _ := item.(map[string]any)
|
||||
if m == nil {
|
||||
continue
|
||||
}
|
||||
typ, _ := m["type"].(string)
|
||||
switch typ {
|
||||
case "function_call":
|
||||
if functionIdx < len(ordered) {
|
||||
m["call_id"] = ordered[functionIdx]
|
||||
functionIdx++
|
||||
}
|
||||
case "tool_calls":
|
||||
tcArr, _ := m["tool_calls"].([]any)
|
||||
for i, raw := range tcArr {
|
||||
tc, _ := raw.(map[string]any)
|
||||
if tc == nil {
|
||||
continue
|
||||
}
|
||||
if i < len(ordered) {
|
||||
tc["id"] = ordered[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,13 +45,29 @@ func TestHandleResponsesStreamToolCallsHideRawOutputTextInCompleted(t *testing.T
|
||||
if len(output) == 0 {
|
||||
t.Fatalf("expected structured output entries, got %#v", responseObj["output"])
|
||||
}
|
||||
first, _ := output[0].(map[string]any)
|
||||
if first["type"] != "tool_calls" {
|
||||
t.Fatalf("expected first output type tool_calls, got %#v", first["type"])
|
||||
var firstToolWrapper map[string]any
|
||||
hasFunctionCall := false
|
||||
for _, item := range output {
|
||||
m, _ := item.(map[string]any)
|
||||
if m == nil {
|
||||
continue
|
||||
}
|
||||
if m["type"] == "function_call" {
|
||||
hasFunctionCall = true
|
||||
}
|
||||
if m["type"] == "tool_calls" && firstToolWrapper == nil {
|
||||
firstToolWrapper = m
|
||||
}
|
||||
}
|
||||
toolCalls, _ := first["tool_calls"].([]any)
|
||||
if !hasFunctionCall {
|
||||
t.Fatalf("expected at least one function_call item for responses compatibility, got %#v", responseObj["output"])
|
||||
}
|
||||
if firstToolWrapper == nil {
|
||||
t.Fatalf("expected a tool_calls wrapper item, got %#v", responseObj["output"])
|
||||
}
|
||||
toolCalls, _ := firstToolWrapper["tool_calls"].([]any)
|
||||
if len(toolCalls) == 0 {
|
||||
t.Fatalf("expected at least one tool_call in output, got %#v", first["tool_calls"])
|
||||
t.Fatalf("expected at least one tool_call in output, got %#v", firstToolWrapper["tool_calls"])
|
||||
}
|
||||
call0, _ := toolCalls[0].(map[string]any)
|
||||
if call0["type"] != "function" {
|
||||
@@ -99,6 +115,137 @@ func TestHandleResponsesStreamIncompleteTailNotDuplicatedInCompletedOutputText(t
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleResponsesStreamEmitsReasoningCompatEvents(t *testing.T) {
|
||||
h := &Handler{}
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
b, _ := json.Marshal(map[string]any{
|
||||
"p": "response/thinking_content",
|
||||
"v": "thought",
|
||||
})
|
||||
streamBody := "data: " + string(b) + "\n" + "data: [DONE]\n"
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader(streamBody)),
|
||||
}
|
||||
|
||||
h.handleResponsesStream(rec, req, resp, "owner-a", "resp_test", "deepseek-reasoner", "prompt", true, false, nil)
|
||||
|
||||
body := rec.Body.String()
|
||||
if !strings.Contains(body, "event: response.reasoning.delta") {
|
||||
t.Fatalf("expected response.reasoning.delta event, body=%s", body)
|
||||
}
|
||||
if !strings.Contains(body, "event: response.reasoning_text.delta") {
|
||||
t.Fatalf("expected response.reasoning_text.delta compatibility event, body=%s", body)
|
||||
}
|
||||
if !strings.Contains(body, "event: response.reasoning_text.done") {
|
||||
t.Fatalf("expected response.reasoning_text.done compatibility event, body=%s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleResponsesStreamEmitsFunctionCallCompatEvents(t *testing.T) {
|
||||
h := &Handler{}
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
sseLine := func(v string) string {
|
||||
b, _ := json.Marshal(map[string]any{
|
||||
"p": "response/content",
|
||||
"v": v,
|
||||
})
|
||||
return "data: " + string(b) + "\n"
|
||||
}
|
||||
|
||||
streamBody := sseLine(`{"tool_calls":[{"name":"read_file","input":{"path":"README.MD"}}]}`) + "data: [DONE]\n"
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader(streamBody)),
|
||||
}
|
||||
|
||||
h.handleResponsesStream(rec, req, resp, "owner-a", "resp_test", "deepseek-chat", "prompt", false, false, []string{"read_file"})
|
||||
body := rec.Body.String()
|
||||
if !strings.Contains(body, "event: response.function_call_arguments.delta") {
|
||||
t.Fatalf("expected response.function_call_arguments.delta compatibility event, body=%s", body)
|
||||
}
|
||||
if !strings.Contains(body, "event: response.function_call_arguments.done") {
|
||||
t.Fatalf("expected response.function_call_arguments.done compatibility event, body=%s", body)
|
||||
}
|
||||
donePayload, ok := extractSSEEventPayload(body, "response.function_call_arguments.done")
|
||||
if !ok {
|
||||
t.Fatalf("expected to parse response.function_call_arguments.done payload, body=%s", body)
|
||||
}
|
||||
if strings.TrimSpace(asString(donePayload["call_id"])) == "" {
|
||||
t.Fatalf("expected call_id in response.function_call_arguments.done payload, payload=%#v", donePayload)
|
||||
}
|
||||
if strings.TrimSpace(asString(donePayload["response_id"])) == "" {
|
||||
t.Fatalf("expected response_id in response.function_call_arguments.done payload, payload=%#v", donePayload)
|
||||
}
|
||||
doneCallID := strings.TrimSpace(asString(donePayload["call_id"]))
|
||||
if doneCallID == "" {
|
||||
t.Fatalf("expected non-empty call_id in done payload, payload=%#v", donePayload)
|
||||
}
|
||||
completed, ok := extractSSEEventPayload(body, "response.completed")
|
||||
if !ok {
|
||||
t.Fatalf("expected response.completed payload, body=%s", body)
|
||||
}
|
||||
responseObj, _ := completed["response"].(map[string]any)
|
||||
output, _ := responseObj["output"].([]any)
|
||||
if len(output) == 0 {
|
||||
t.Fatalf("expected non-empty output in response.completed, response=%#v", responseObj)
|
||||
}
|
||||
var completedCallID string
|
||||
for _, item := range output {
|
||||
m, _ := item.(map[string]any)
|
||||
if m == nil || m["type"] != "function_call" {
|
||||
continue
|
||||
}
|
||||
completedCallID = strings.TrimSpace(asString(m["call_id"]))
|
||||
if completedCallID != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
if completedCallID == "" {
|
||||
t.Fatalf("expected function_call.call_id in completed output, output=%#v", output)
|
||||
}
|
||||
if completedCallID != doneCallID {
|
||||
t.Fatalf("expected completed call_id to match stream done call_id, done=%q completed=%q", doneCallID, completedCallID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleResponsesStreamDetectsToolCallsFromThinkingChannel(t *testing.T) {
|
||||
h := &Handler{}
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
sseLine := func(path, v string) string {
|
||||
b, _ := json.Marshal(map[string]any{
|
||||
"p": path,
|
||||
"v": v,
|
||||
})
|
||||
return "data: " + string(b) + "\n"
|
||||
}
|
||||
|
||||
streamBody := sseLine("response/thinking_content", `{"tool_calls":[{"name":"read_file","input":{"path":"README.MD"}}]}`) + "data: [DONE]\n"
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader(streamBody)),
|
||||
}
|
||||
|
||||
h.handleResponsesStream(rec, req, resp, "owner-a", "resp_test", "deepseek-reasoner", "prompt", true, false, []string{"read_file"})
|
||||
|
||||
body := rec.Body.String()
|
||||
if !strings.Contains(body, "event: response.reasoning_text.delta") {
|
||||
t.Fatalf("expected response.reasoning_text.delta event, body=%s", body)
|
||||
}
|
||||
if !strings.Contains(body, "event: response.function_call_arguments.done") {
|
||||
t.Fatalf("expected response.function_call_arguments.done event from thinking channel, body=%s", body)
|
||||
}
|
||||
if !strings.Contains(body, "event: response.output_tool_call.done") {
|
||||
t.Fatalf("expected response.output_tool_call.done event from thinking channel, body=%s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func extractSSEEventPayload(body, targetEvent string) (map[string]any, bool) {
|
||||
scanner := bufio.NewScanner(strings.NewReader(body))
|
||||
matched := false
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -47,28 +48,24 @@ func BuildResponseObject(responseID, model, finalPrompt, finalThinking, finalTex
|
||||
// produced a standalone structured payload. This prevents accidental
|
||||
// empty output_text on normal prose that merely contains tool_call-like text.
|
||||
detected := util.ParseStandaloneToolCalls(finalText, toolNames)
|
||||
toolCallsFromThinking := false
|
||||
if len(detected) == 0 && strings.TrimSpace(finalThinking) != "" {
|
||||
detected = util.ParseStandaloneToolCalls(finalThinking, toolNames)
|
||||
toolCallsFromThinking = len(detected) > 0
|
||||
}
|
||||
exposedOutputText := finalText
|
||||
output := make([]any, 0, 2)
|
||||
if len(detected) > 0 {
|
||||
if !toolCallsFromThinking || strings.TrimSpace(finalText) != "" {
|
||||
exposedOutputText = ""
|
||||
} else {
|
||||
exposedOutputText = finalThinking
|
||||
}
|
||||
exposedOutputText = ""
|
||||
if strings.TrimSpace(finalThinking) != "" {
|
||||
output = append(output, map[string]any{
|
||||
"type": "reasoning",
|
||||
"text": finalThinking,
|
||||
})
|
||||
}
|
||||
formatted := util.FormatOpenAIToolCalls(detected)
|
||||
output = append(output, toResponsesFunctionCallItems(formatted)...)
|
||||
output = append(output, map[string]any{
|
||||
"type": "tool_calls",
|
||||
"tool_calls": util.FormatOpenAIToolCalls(detected),
|
||||
"tool_calls": formatted,
|
||||
})
|
||||
} else {
|
||||
content := make([]any, 0, 2)
|
||||
@@ -114,6 +111,54 @@ func BuildResponseObject(responseID, model, finalPrompt, finalThinking, finalTex
|
||||
}
|
||||
}
|
||||
|
||||
func toResponsesFunctionCallItems(toolCalls []map[string]any) []any {
|
||||
if len(toolCalls) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]any, 0, len(toolCalls))
|
||||
for _, tc := range toolCalls {
|
||||
callID, _ := tc["id"].(string)
|
||||
if strings.TrimSpace(callID) == "" {
|
||||
callID = "call_" + strings.ReplaceAll(uuid.NewString(), "-", "")
|
||||
}
|
||||
name := ""
|
||||
args := "{}"
|
||||
if fn, ok := tc["function"].(map[string]any); ok {
|
||||
if n, _ := fn["name"].(string); strings.TrimSpace(n) != "" {
|
||||
name = n
|
||||
}
|
||||
if a, _ := fn["arguments"].(string); strings.TrimSpace(a) != "" {
|
||||
args = a
|
||||
}
|
||||
}
|
||||
out = append(out, map[string]any{
|
||||
"id": "fc_" + strings.ReplaceAll(uuid.NewString(), "-", ""),
|
||||
"type": "function_call",
|
||||
"call_id": callID,
|
||||
"name": name,
|
||||
"arguments": normalizeJSONString(args),
|
||||
"status": "completed",
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func normalizeJSONString(raw string) string {
|
||||
s := strings.TrimSpace(raw)
|
||||
if s == "" {
|
||||
return "{}"
|
||||
}
|
||||
var v any
|
||||
if err := json.Unmarshal([]byte(s), &v); err != nil {
|
||||
return raw
|
||||
}
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return raw
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func BuildChatStreamDeltaChoice(index int, delta map[string]any) map[string]any {
|
||||
return map[string]any{
|
||||
"delta": delta,
|
||||
@@ -159,49 +204,105 @@ func BuildChatUsage(finalPrompt, finalThinking, finalText string) map[string]any
|
||||
|
||||
func BuildResponsesCreatedPayload(responseID, model string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.created",
|
||||
"id": responseID,
|
||||
"object": "response",
|
||||
"model": model,
|
||||
"status": "in_progress",
|
||||
"type": "response.created",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"object": "response",
|
||||
"model": model,
|
||||
"status": "in_progress",
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesTextDeltaPayload(responseID, delta string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.output_text.delta",
|
||||
"id": responseID,
|
||||
"delta": delta,
|
||||
"type": "response.output_text.delta",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"delta": delta,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesReasoningDeltaPayload(responseID, delta string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.reasoning.delta",
|
||||
"id": responseID,
|
||||
"delta": delta,
|
||||
"type": "response.reasoning.delta",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"delta": delta,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesReasoningTextDeltaPayload(responseID, itemID string, outputIndex, contentIndex int, delta string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.reasoning_text.delta",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"item_id": itemID,
|
||||
"output_index": outputIndex,
|
||||
"content_index": contentIndex,
|
||||
"delta": delta,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesReasoningTextDonePayload(responseID, itemID string, outputIndex, contentIndex int, text string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.reasoning_text.done",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"item_id": itemID,
|
||||
"output_index": outputIndex,
|
||||
"content_index": contentIndex,
|
||||
"text": text,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesToolCallDeltaPayload(responseID string, toolCalls []map[string]any) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.output_tool_call.delta",
|
||||
"id": responseID,
|
||||
"tool_calls": toolCalls,
|
||||
"type": "response.output_tool_call.delta",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"tool_calls": toolCalls,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesToolCallDonePayload(responseID string, toolCalls []map[string]any) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.output_tool_call.done",
|
||||
"id": responseID,
|
||||
"tool_calls": toolCalls,
|
||||
"type": "response.output_tool_call.done",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"tool_calls": toolCalls,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesFunctionCallArgumentsDeltaPayload(responseID, itemID string, outputIndex int, callID, delta string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.function_call_arguments.delta",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"item_id": itemID,
|
||||
"output_index": outputIndex,
|
||||
"call_id": callID,
|
||||
"delta": delta,
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesFunctionCallArgumentsDonePayload(responseID, itemID string, outputIndex int, callID, name, arguments string) map[string]any {
|
||||
return map[string]any{
|
||||
"type": "response.function_call_arguments.done",
|
||||
"id": responseID,
|
||||
"response_id": responseID,
|
||||
"item_id": itemID,
|
||||
"output_index": outputIndex,
|
||||
"call_id": callID,
|
||||
"name": name,
|
||||
"arguments": normalizeJSONString(arguments),
|
||||
}
|
||||
}
|
||||
|
||||
func BuildResponsesCompletedPayload(response map[string]any) map[string]any {
|
||||
responseID, _ := response["id"].(string)
|
||||
return map[string]any{
|
||||
"type": "response.completed",
|
||||
"response": response,
|
||||
"type": "response.completed",
|
||||
"response_id": responseID,
|
||||
"response": response,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,16 +21,23 @@ func TestBuildResponseObjectToolCallsFollowChatShape(t *testing.T) {
|
||||
}
|
||||
|
||||
output, _ := obj["output"].([]any)
|
||||
if len(output) != 1 {
|
||||
t.Fatalf("expected one tool_calls wrapper, got %#v", obj["output"])
|
||||
if len(output) != 2 {
|
||||
t.Fatalf("expected function_call + tool_calls wrapper, got %#v", obj["output"])
|
||||
}
|
||||
|
||||
first, _ := output[0].(map[string]any)
|
||||
if first["type"] != "tool_calls" {
|
||||
t.Fatalf("expected first output item type tool_calls, got %#v", first["type"])
|
||||
if first["type"] != "function_call" {
|
||||
t.Fatalf("expected first output item type function_call, got %#v", first["type"])
|
||||
}
|
||||
if first["call_id"] == "" {
|
||||
t.Fatalf("expected function_call item to have call_id, got %#v", first)
|
||||
}
|
||||
second, _ := output[1].(map[string]any)
|
||||
if second["type"] != "tool_calls" {
|
||||
t.Fatalf("expected second output item type tool_calls, got %#v", second["type"])
|
||||
}
|
||||
var toolCalls []map[string]any
|
||||
switch v := first["tool_calls"].(type) {
|
||||
switch v := second["tool_calls"].(type) {
|
||||
case []map[string]any:
|
||||
toolCalls = v
|
||||
case []any:
|
||||
@@ -43,7 +50,7 @@ func TestBuildResponseObjectToolCallsFollowChatShape(t *testing.T) {
|
||||
}
|
||||
}
|
||||
if len(toolCalls) != 1 {
|
||||
t.Fatalf("expected one tool call, got %#v", first["tool_calls"])
|
||||
t.Fatalf("expected one tool call, got %#v", second["tool_calls"])
|
||||
}
|
||||
tc := toolCalls[0]
|
||||
if tc["type"] != "function" || tc["id"] == "" {
|
||||
@@ -132,15 +139,19 @@ func TestBuildResponseObjectDetectsToolCallFromThinkingChannel(t *testing.T) {
|
||||
)
|
||||
|
||||
output, _ := obj["output"].([]any)
|
||||
if len(output) != 2 {
|
||||
t.Fatalf("expected reasoning + tool_calls outputs, got %#v", obj["output"])
|
||||
if len(output) != 3 {
|
||||
t.Fatalf("expected reasoning + function_call + tool_calls outputs, got %#v", obj["output"])
|
||||
}
|
||||
first, _ := output[0].(map[string]any)
|
||||
if first["type"] != "reasoning" {
|
||||
t.Fatalf("expected first output reasoning, got %#v", first["type"])
|
||||
}
|
||||
second, _ := output[1].(map[string]any)
|
||||
if second["type"] != "tool_calls" {
|
||||
t.Fatalf("expected second output tool_calls, got %#v", second["type"])
|
||||
if second["type"] != "function_call" {
|
||||
t.Fatalf("expected second output function_call, got %#v", second["type"])
|
||||
}
|
||||
third, _ := output[2].(map[string]any)
|
||||
if third["type"] != "tool_calls" {
|
||||
t.Fatalf("expected third output tool_calls, got %#v", third["type"])
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user