feat: support multi-round upstream captures in raw sample generation

This commit is contained in:
CJACK
2026-04-05 22:48:41 +08:00
parent 2a6b787f38
commit 84813eca80
4 changed files with 222 additions and 36 deletions

1
.gitignore vendored
View File

@@ -9,6 +9,7 @@ config.json
*.swo
*~
.DS_Store
opencode.json
# Logs
*.log

View File

@@ -59,9 +59,9 @@ func (h *Handler) captureRawSample(w http.ResponseWriter, r *http.Request) {
return
}
captureEntry, ok := selectNewestCaptureEntry(before, after)
if !ok {
writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "no upstream capture was recorded"})
captureEntries, err := collectNewCaptureEntries(before, after)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": err.Error()})
return
}
@@ -70,8 +70,8 @@ func (h *Handler) captureRawSample(w http.ResponseWriter, r *http.Request) {
SampleID: sampleID,
Source: "admin/dev/raw-samples/capture",
Request: payload,
Capture: captureSummaryFromEntry(captureEntry),
UpstreamBody: []byte(captureEntry.ResponseBody),
Capture: captureSummaryFromEntries(captureEntries),
UpstreamBody: combineCaptureBodies(captureEntries),
})
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": err.Error()})
@@ -134,12 +134,13 @@ func prepareRawSampleCaptureRequest(store ConfigStore, req map[string]any) (map[
return payload, sampleID, apiKey, nil
}
func selectNewestCaptureEntry(before, after []devcapture.Entry) (devcapture.Entry, bool) {
func collectNewCaptureEntries(before, after []devcapture.Entry) ([]devcapture.Entry, error) {
beforeIDs := make(map[string]struct{}, len(before))
for _, entry := range before {
beforeIDs[entry.ID] = struct{}{}
}
candidates := make([]devcapture.Entry, 0, len(after))
entries := make([]devcapture.Entry, 0, len(after))
for _, entry := range after {
if _, ok := beforeIDs[entry.ID]; ok {
continue
@@ -147,33 +148,68 @@ func selectNewestCaptureEntry(before, after []devcapture.Entry) (devcapture.Entr
if strings.TrimSpace(entry.ResponseBody) == "" {
continue
}
candidates = append(candidates, entry)
entries = append(entries, entry)
}
if len(candidates) == 0 {
candidates = append(candidates, after...)
if len(entries) == 0 {
return nil, fmt.Errorf("no upstream capture was recorded")
}
if len(candidates) == 0 {
return devcapture.Entry{}, false
// Snapshot order is newest-first; reverse to preserve the actual request order.
for i, j := 0, len(entries)-1; i < j; i, j = i+1, j-1 {
entries[i], entries[j] = entries[j], entries[i]
}
best := candidates[0]
bestScore := len(best.ResponseBody)
for _, entry := range candidates[1:] {
score := len(entry.ResponseBody)
if entry.CreatedAt > best.CreatedAt || (entry.CreatedAt == best.CreatedAt && score > bestScore) {
best = entry
bestScore = score
}
}
return best, true
return entries, nil
}
func captureSummaryFromEntry(entry devcapture.Entry) rawsample.CaptureSummary {
return rawsample.CaptureSummary{
Label: strings.TrimSpace(entry.Label),
URL: strings.TrimSpace(entry.URL),
StatusCode: entry.StatusCode,
ResponseBytes: len(entry.ResponseBody),
func captureSummaryFromEntries(entries []devcapture.Entry) rawsample.CaptureSummary {
if len(entries) == 0 {
return rawsample.CaptureSummary{}
}
// Primary metadata comes from the first (initial) capture.
summary := rawsample.CaptureSummary{
Label: strings.TrimSpace(entries[0].Label),
URL: strings.TrimSpace(entries[0].URL),
StatusCode: entries[0].StatusCode,
}
// Record every round (initial + continuations) so replay/debug
// can reconstruct the full multi-round interaction.
totalBytes := 0
rounds := make([]rawsample.CaptureRound, 0, len(entries))
for _, entry := range entries {
n := len(entry.ResponseBody)
totalBytes += n
rounds = append(rounds, rawsample.CaptureRound{
Label: strings.TrimSpace(entry.Label),
URL: strings.TrimSpace(entry.URL),
StatusCode: entry.StatusCode,
ResponseBytes: n,
})
}
summary.ResponseBytes = totalBytes
if len(rounds) > 1 {
summary.Rounds = rounds
}
return summary
}
func combineCaptureBodies(entries []devcapture.Entry) []byte {
if len(entries) == 0 {
return nil
}
var buf bytes.Buffer
for _, entry := range entries {
if buf.Len() > 0 {
last := buf.Bytes()[buf.Len()-1]
if last != '\n' {
buf.WriteByte('\n')
}
}
buf.WriteString(entry.ResponseBody)
}
return buf.Bytes()
}
func copyHeader(dst, src http.Header) {

View File

@@ -1,6 +1,7 @@
package admin
import (
"bytes"
"encoding/json"
"io"
"net/http"
@@ -33,6 +34,36 @@ func (stubOpenAIChatCaller) ChatCompletions(w http.ResponseWriter, _ *http.Reque
_, _ = io.WriteString(w, "data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n")
}
type stubOpenAIChatCallerWithContinuations struct{}
func (stubOpenAIChatCallerWithContinuations) ChatCompletions(w http.ResponseWriter, _ *http.Request) {
recordCapturedResponse("deepseek_completion", "https://chat.deepseek.com/api/v0/chat/completion", http.StatusOK, map[string]any{"model": "deepseek-chat"}, "data: {\"v\":\"hello [reference:1]\"}\n\n"+"data: [DONE]\n\n")
recordCapturedResponse("deepseek_continue", "https://chat.deepseek.com/api/v0/chat/continue", http.StatusOK, map[string]any{"chat_session_id": "session-1", "message_id": 2}, "data: {\"v\":\"continued\"}\n\n"+"data: [DONE]\n\n")
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, "data: {\"choices\":[{\"delta\":{\"content\":\"hello continued\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n")
}
type stubOpenAIChatCallerWithoutCapture struct{}
func (stubOpenAIChatCallerWithoutCapture) ChatCompletions(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, "data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n")
}
func recordCapturedResponse(label, rawURL string, statusCode int, request any, body string) {
store := devcapture.Global()
session := store.Start(label, rawURL, "acct-test", request)
raw := io.NopCloser(strings.NewReader(body))
if session != nil {
raw = session.WrapBody(raw, statusCode)
}
_, _ = io.ReadAll(raw)
_ = raw.Close()
}
func TestCaptureRawSampleWritesPersistentSample(t *testing.T) {
t.Setenv("DS2API_RAW_STREAM_SAMPLE_ROOT", t.TempDir())
devcapture.Global().Clear()
@@ -89,3 +120,113 @@ func TestCaptureRawSampleWritesPersistentSample(t *testing.T) {
t.Fatalf("unexpected processed meta: %#v", meta["processed"])
}
}
func TestCaptureRawSampleCombinesContinuationCaptures(t *testing.T) {
t.Setenv("DS2API_RAW_STREAM_SAMPLE_ROOT", t.TempDir())
devcapture.Global().Clear()
defer devcapture.Global().Clear()
h := &Handler{OpenAI: stubOpenAIChatCallerWithContinuations{}}
reqBody := `{
"sample_id":"My Sample 02",
"api_key":"local-key",
"model":"deepseek-chat",
"message":"广州天气",
"stream":true
}`
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/admin/dev/raw-samples/capture", strings.NewReader(reqBody))
h.captureRawSample(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
sampleDir := filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-02")
upstreamBytes, err := os.ReadFile(filepath.Join(sampleDir, "upstream.stream.sse"))
if err != nil {
t.Fatalf("read upstream: %v", err)
}
upstream := string(upstreamBytes)
if !strings.Contains(upstream, "hello [reference:1]") {
t.Fatalf("expected initial capture in combined upstream, got %s", upstream)
}
if !strings.Contains(upstream, "continued") {
t.Fatalf("expected continuation capture in combined upstream, got %s", upstream)
}
if strings.Index(upstream, "hello [reference:1]") > strings.Index(upstream, "continued") {
t.Fatalf("expected initial capture before continuation, got %s", upstream)
}
metaBytes, err := os.ReadFile(filepath.Join(sampleDir, "meta.json"))
if err != nil {
t.Fatalf("read meta: %v", err)
}
var meta map[string]any
if err := json.Unmarshal(metaBytes, &meta); err != nil {
t.Fatalf("decode meta: %v", err)
}
capture, _ := meta["capture"].(map[string]any)
if capture == nil {
t.Fatalf("missing capture meta: %#v", meta)
}
if got := int(capture["response_bytes"].(float64)); got != len(upstreamBytes) {
t.Fatalf("expected combined response_bytes %d, got %#v", len(upstreamBytes), capture["response_bytes"])
}
rounds, _ := capture["rounds"].([]any)
if len(rounds) != 2 {
t.Fatalf("expected 2 capture rounds, got %d: %#v", len(rounds), capture)
}
r0, _ := rounds[0].(map[string]any)
r1, _ := rounds[1].(map[string]any)
if r0["label"] != "deepseek_completion" {
t.Fatalf("expected first round label deepseek_completion, got %v", r0["label"])
}
if r1["label"] != "deepseek_continue" {
t.Fatalf("expected second round label deepseek_continue, got %v", r1["label"])
}
}
func TestCaptureRawSampleReturnsErrorWhenNoNewCaptureRecorded(t *testing.T) {
root := t.TempDir()
t.Setenv("DS2API_RAW_STREAM_SAMPLE_ROOT", root)
devcapture.Global().Clear()
defer devcapture.Global().Clear()
recordCapturedResponse("preexisting", "https://chat.deepseek.com/api/v0/chat/completion", http.StatusOK, map[string]any{"model": "deepseek-chat"}, "data: {\"v\":\"old\"}\n\n")
h := &Handler{OpenAI: stubOpenAIChatCallerWithoutCapture{}}
reqBody := `{
"sample_id":"My Sample 03",
"api_key":"local-key",
"model":"deepseek-chat",
"message":"广州天气",
"stream":true
}`
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/admin/dev/raw-samples/capture", strings.NewReader(reqBody))
h.captureRawSample(rec, req)
if rec.Code != http.StatusInternalServerError {
t.Fatalf("expected 500, got %d body=%s", rec.Code, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "no upstream capture was recorded") {
t.Fatalf("expected no-capture error, got %s", rec.Body.String())
}
if _, err := os.Stat(filepath.Join(root, "my-sample-03")); !os.IsNotExist(err) {
t.Fatalf("expected no sample dir to be created, stat err=%v", err)
}
}
func TestCombineCaptureBodiesPreservesOrderAndSeparators(t *testing.T) {
entries := []devcapture.Entry{
{ResponseBody: "first"},
{ResponseBody: "second"},
}
got := combineCaptureBodies(entries)
if !bytes.Equal(got, []byte("first\nsecond")) {
t.Fatalf("unexpected combined body: %q", string(got))
}
}

View File

@@ -15,15 +15,23 @@ import (
var referenceMarkerRe = regexp.MustCompile(`(?i)\[reference:\s*\d+\]`)
type CaptureRound struct {
Label string `json:"label,omitempty"`
URL string `json:"url,omitempty"`
StatusCode int `json:"status_code"`
ResponseBytes int `json:"response_bytes"`
}
type CaptureSummary struct {
Label string `json:"label,omitempty"`
URL string `json:"url,omitempty"`
StatusCode int `json:"status_code"`
ResponseBytes int `json:"response_bytes"`
ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"`
ReferenceMarkerCount int `json:"reference_marker_count,omitempty"`
ContainsFinishedToken bool `json:"contains_finished_token,omitempty"`
FinishedTokenCount int `json:"finished_token_count,omitempty"`
Label string `json:"label,omitempty"`
URL string `json:"url,omitempty"`
StatusCode int `json:"status_code"`
ResponseBytes int `json:"response_bytes"`
Rounds []CaptureRound `json:"rounds,omitempty"`
ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"`
ReferenceMarkerCount int `json:"reference_marker_count,omitempty"`
ContainsFinishedToken bool `json:"contains_finished_token,omitempty"`
FinishedTokenCount int `json:"finished_token_count,omitempty"`
}
type Meta struct {