refactor: replace processed output comparison with baseline-based validation in SSE simulator

This commit is contained in:
CJACK
2026-04-05 01:34:06 +08:00
parent 93879c9808
commit 0bebb4b28d
20 changed files with 3014 additions and 3853 deletions

View File

@@ -65,18 +65,13 @@ func (h *Handler) captureRawSample(w http.ResponseWriter, r *http.Request) {
return
}
processedContentType := strings.TrimSpace(rec.Header().Get("Content-Type"))
saved, err := rawsample.Persist(rawsample.PersistOptions{
RootDir: config.RawStreamSampleRoot(),
SampleID: sampleID,
Source: "admin/dev/raw-samples/capture",
Request: payload,
Capture: captureSummaryFromEntry(captureEntry),
UpstreamBody: []byte(captureEntry.ResponseBody),
ProcessedBody: rec.Body.Bytes(),
ProcessedKind: processedKindFromContentType(processedContentType),
ProcessedStatusCode: rec.Code,
ProcessedContentType: processedContentType,
RootDir: config.RawStreamSampleRoot(),
SampleID: sampleID,
Source: "admin/dev/raw-samples/capture",
Request: payload,
Capture: captureSummaryFromEntry(captureEntry),
UpstreamBody: []byte(captureEntry.ResponseBody),
})
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": err.Error()})
@@ -88,8 +83,6 @@ func (h *Handler) captureRawSample(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Ds2-Sample-Dir", saved.Dir)
w.Header().Set("X-Ds2-Sample-Meta", saved.MetaPath)
w.Header().Set("X-Ds2-Sample-Upstream", saved.UpstreamPath)
w.Header().Set("X-Ds2-Sample-Processed", saved.ProcessedPath)
w.Header().Set("X-Ds2-Sample-Output", saved.OutputPath)
w.WriteHeader(rec.Code)
_, _ = io.Copy(w, bytes.NewReader(rec.Body.Bytes()))
}
@@ -183,18 +176,6 @@ func captureSummaryFromEntry(entry devcapture.Entry) rawsample.CaptureSummary {
}
}
// processedKindFromContentType maps an HTTP Content-Type header value to the
// persisted-sample kind label. "text/event-stream" wins over "application/json"
// when both appear; anything unrecognized (or empty) defaults to "stream".
func processedKindFromContentType(contentType string) string {
	normalized := strings.ToLower(strings.TrimSpace(contentType))
	if strings.Contains(normalized, "text/event-stream") {
		return "stream"
	}
	if strings.Contains(normalized, "application/json") {
		return "json"
	}
	return "stream"
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
dst.Del(k)

View File

@@ -56,8 +56,8 @@ func TestCaptureRawSampleWritesPersistentSample(t *testing.T) {
if got := rec.Header().Get("X-Ds2-Sample-Id"); got != "my-sample-01" {
t.Fatalf("expected sample id header my-sample-01, got %q", got)
}
if got := rec.Header().Get("X-Ds2-Sample-Output"); got != filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-01", "openai.output.txt") {
t.Fatalf("unexpected sample output header: %q", got)
if got := rec.Header().Get("X-Ds2-Sample-Upstream"); got != filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-01", "upstream.stream.sse") {
t.Fatalf("unexpected sample upstream header: %q", got)
}
if !strings.Contains(rec.Body.String(), `"content":"hello"`) {
t.Fatalf("expected proxied openai output, got %s", rec.Body.String())
@@ -85,11 +85,7 @@ func TestCaptureRawSampleWritesPersistentSample(t *testing.T) {
if got := int(capture["response_bytes"].(float64)); got == 0 {
t.Fatalf("expected capture bytes to be recorded, got %#v", capture)
}
processed, _ := meta["processed"].(map[string]any)
if processed == nil {
t.Fatalf("missing processed meta: %#v", meta)
}
if processed["file"] != "openai.stream.sse" {
t.Fatalf("unexpected processed file: %#v", processed["file"])
if _, ok := meta["processed"]; ok {
t.Fatalf("unexpected processed meta: %#v", meta["processed"])
}
}

View File

@@ -26,51 +26,29 @@ type CaptureSummary struct {
FinishedTokenCount int `json:"finished_token_count,omitempty"`
}
// ProcessedSummary describes the processed (downstream) response persisted
// alongside a raw upstream sample: which files hold it, how the request
// completed, and marker statistics extracted from the processed body.
type ProcessedSummary struct {
	// Kind is the processed payload form, e.g. "stream" or "json".
	Kind string `json:"kind"`
	// File is the filename of the persisted processed body inside the sample dir.
	File string `json:"file"`
	// TextFile is the filename of the extracted visible-text rendering, if any.
	TextFile string `json:"text_file,omitempty"`
	// StatusCode is the HTTP status the processed response was written with.
	StatusCode int `json:"status_code"`
	// ContentType is the processed response's Content-Type header, if set.
	ContentType string `json:"content_type,omitempty"`
	// ResponseBytes is the size of the processed body in bytes.
	ResponseBytes int `json:"response_bytes"`
	// ContainsReferenceMarkers / ReferenceMarkerCount record reference-marker
	// occurrences found when analyzing the processed body.
	ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"`
	ReferenceMarkerCount int `json:"reference_marker_count,omitempty"`
	// ContainsFinishedToken / FinishedTokenCount record FINISHED-token
	// occurrences found when analyzing the processed body.
	ContainsFinishedToken bool `json:"contains_finished_token,omitempty"`
	FinishedTokenCount int `json:"finished_token_count,omitempty"`
}
type Meta struct {
SampleID string `json:"sample_id"`
CapturedAtUTC string `json:"captured_at_utc"`
Source string `json:"source,omitempty"`
Request any `json:"request"`
Capture CaptureSummary `json:"capture"`
Processed ProcessedSummary `json:"processed"`
SampleID string `json:"sample_id"`
CapturedAtUTC string `json:"captured_at_utc"`
Source string `json:"source,omitempty"`
Request any `json:"request"`
Capture CaptureSummary `json:"capture"`
}
type PersistOptions struct {
RootDir string
SampleID string
Source string
Request any
Capture CaptureSummary
UpstreamBody []byte
ProcessedBody []byte
ProcessedKind string
ProcessedFile string
ProcessedStatusCode int
ProcessedContentType string
ProcessedText string
RootDir string
SampleID string
Source string
Request any
Capture CaptureSummary
UpstreamBody []byte
}
type SavedSample struct {
SampleID string
Dir string
MetaPath string
UpstreamPath string
ProcessedPath string
OutputPath string
Meta Meta
SampleID string
Dir string
MetaPath string
UpstreamPath string
Meta Meta
}
func Persist(opts PersistOptions) (SavedSample, error) {
@@ -81,9 +59,6 @@ func Persist(opts PersistOptions) (SavedSample, error) {
if len(opts.UpstreamBody) == 0 {
return SavedSample{}, errors.New("upstream body is required")
}
if len(opts.ProcessedBody) == 0 {
return SavedSample{}, errors.New("processed body is required")
}
if err := os.MkdirAll(root, 0o755); err != nil {
return SavedSample{}, fmt.Errorf("create root dir: %w", err)
@@ -114,56 +89,17 @@ func Persist(opts PersistOptions) (SavedSample, error) {
return SavedSample{}, fmt.Errorf("write upstream stream: %w", err)
}
processedKind := strings.TrimSpace(opts.ProcessedKind)
if processedKind == "" {
processedKind = inferProcessedKind(opts.ProcessedContentType)
}
processedFile := strings.TrimSpace(opts.ProcessedFile)
if processedFile == "" {
processedFile = defaultProcessedFile(processedKind, opts.ProcessedContentType)
}
if processedFile == "" {
cleanup()
return SavedSample{}, errors.New("processed file name is required")
}
processedPath := filepath.Join(tempDir, processedFile)
if err := os.WriteFile(processedPath, opts.ProcessedBody, 0o644); err != nil {
cleanup()
return SavedSample{}, fmt.Errorf("write processed output: %w", err)
}
processedText := opts.ProcessedText
if processedText == "" {
processedText = extractProcessedVisibleText(opts.ProcessedBody, processedKind, opts.ProcessedContentType)
}
textPath := filepath.Join(tempDir, "openai.output.txt")
if err := os.WriteFile(textPath, []byte(processedText), 0o644); err != nil {
cleanup()
return SavedSample{}, fmt.Errorf("write processed text: %w", err)
}
now := time.Now().UTC()
capture := opts.Capture
capture.ResponseBytes = len(opts.UpstreamBody)
capture.ContainsReferenceMarkers, capture.ReferenceMarkerCount, capture.ContainsFinishedToken, capture.FinishedTokenCount = analyzeBytes(opts.UpstreamBody)
processed := ProcessedSummary{
Kind: processedKind,
File: processedFile,
TextFile: "openai.output.txt",
StatusCode: opts.ProcessedStatusCode,
ContentType: strings.TrimSpace(opts.ProcessedContentType),
ResponseBytes: len(opts.ProcessedBody),
}
processed.ContainsReferenceMarkers, processed.ReferenceMarkerCount, processed.ContainsFinishedToken, processed.FinishedTokenCount = analyzeBytes(opts.ProcessedBody)
meta := Meta{
SampleID: sampleID,
CapturedAtUTC: now.Format(time.RFC3339),
Source: strings.TrimSpace(opts.Source),
Request: opts.Request,
Capture: capture,
Processed: processed,
}
metaBytes, err := json.MarshalIndent(meta, "", " ")
if err != nil {
@@ -182,13 +118,11 @@ func Persist(opts PersistOptions) (SavedSample, error) {
}
return SavedSample{
SampleID: sampleID,
Dir: finalDir,
MetaPath: filepath.Join(finalDir, "meta.json"),
UpstreamPath: filepath.Join(finalDir, "upstream.stream.sse"),
ProcessedPath: filepath.Join(finalDir, processedFile),
OutputPath: filepath.Join(finalDir, "openai.output.txt"),
Meta: meta,
SampleID: sampleID,
Dir: finalDir,
MetaPath: filepath.Join(finalDir, "meta.json"),
UpstreamPath: filepath.Join(finalDir, "upstream.stream.sse"),
Meta: meta,
}, nil
}
@@ -226,33 +160,6 @@ func DefaultSampleID(prefix string) string {
return fmt.Sprintf("%s-%s", prefix, time.Now().UTC().Format("20060102T150405Z"))
}
// inferProcessedKind derives the processed-sample kind from a Content-Type
// header. An SSE content type takes precedence over JSON, and any value that
// matches neither is treated as a stream.
func inferProcessedKind(contentType string) string {
	lowered := strings.ToLower(strings.TrimSpace(contentType))
	isStream := strings.Contains(lowered, "text/event-stream")
	isJSON := strings.Contains(lowered, "application/json")
	if !isStream && isJSON {
		return "json"
	}
	return "stream"
}
// defaultProcessedFile picks the on-disk filename for a processed sample.
// An explicit kind ("json" or "stream", case-insensitive) wins; otherwise the
// kind is inferred from the content type, with the SSE stream filename as the
// fallback for anything unrecognized.
func defaultProcessedFile(kind, contentType string) string {
	normalizedKind := strings.ToLower(strings.TrimSpace(kind))
	if normalizedKind == "json" {
		return "openai.response.json"
	}
	if normalizedKind == "stream" {
		return "openai.stream.sse"
	}
	if inferProcessedKind(contentType) == "json" {
		return "openai.response.json"
	}
	return "openai.stream.sse"
}
func uniqueSampleID(root, base string) (string, error) {
if base == "" {
base = DefaultSampleID("capture")

View File

@@ -35,9 +35,6 @@ func TestPersistWritesSampleFilesAndMeta(t *testing.T) {
},
UpstreamBody: []byte("data: {\"v\":\"hello [reference:1]\"}\n\n" +
"data: {\"v\":\"FINISHED\",\"p\":\"response/status\"}\n\n"),
ProcessedBody: []byte("data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n"),
ProcessedStatusCode: 200,
ProcessedContentType: "text/event-stream",
})
if err != nil {
t.Fatalf("Persist failed: %v", err)
@@ -52,14 +49,11 @@ func TestPersistWritesSampleFilesAndMeta(t *testing.T) {
if _, err := os.Stat(saved.UpstreamPath); err != nil {
t.Fatalf("upstream file missing: %v", err)
}
if _, err := os.Stat(saved.ProcessedPath); err != nil {
t.Fatalf("processed file missing: %v", err)
if _, err := os.Stat(filepath.Join(saved.Dir, "openai.stream.sse")); !os.IsNotExist(err) {
t.Fatalf("unexpected processed stream file: %v", err)
}
if saved.OutputPath != filepath.Join(saved.Dir, "openai.output.txt") {
t.Fatalf("unexpected processed text path: %s", saved.OutputPath)
}
if _, err := os.Stat(saved.OutputPath); err != nil {
t.Fatalf("processed text file missing: %v", err)
if _, err := os.Stat(filepath.Join(saved.Dir, "openai.output.txt")); !os.IsNotExist(err) {
t.Fatalf("unexpected processed text file: %v", err)
}
metaBytes, err := os.ReadFile(saved.MetaPath)
@@ -79,16 +73,7 @@ func TestPersistWritesSampleFilesAndMeta(t *testing.T) {
if meta.Capture.FinishedTokenCount != 1 {
t.Fatalf("expected one finished token, got %+v", meta.Capture)
}
if meta.Processed.File != "openai.stream.sse" {
t.Fatalf("expected stream processed file, got %+v", meta.Processed)
}
if meta.Processed.TextFile != "openai.output.txt" {
t.Fatalf("expected text file metadata, got %+v", meta.Processed)
}
if meta.Processed.ResponseBytes == 0 {
t.Fatalf("expected processed bytes to be recorded, got %+v", meta.Processed)
}
if !strings.HasSuffix(saved.ProcessedPath, filepath.Join(saved.SampleID, "openai.stream.sse")) {
t.Fatalf("unexpected processed path: %s", saved.ProcessedPath)
if strings.Contains(string(metaBytes), "\"processed\"") {
t.Fatalf("meta should not include processed payload: %s", string(metaBytes))
}
}

View File

@@ -8,28 +8,32 @@ func filterLeakedContentFilterParts(parts []ContentPart) []ContentPart {
}
out := make([]ContentPart, 0, len(parts))
for _, p := range parts {
cleaned := stripLeakedContentFilterSuffix(p.Text)
if shouldDropCleanedLeakedChunk(cleaned) {
cleaned, stripped := stripLeakedContentFilterSuffix(p.Text)
// Only drop the chunk when we actually stripped a leaked CONTENT_FILTER
// suffix. Plain whitespace chunks are valid SSE content and must stay.
if stripped && shouldDropCleanedLeakedChunk(cleaned) {
continue
}
p.Text = cleaned
if stripped {
p.Text = cleaned
}
out = append(out, p)
}
return out
}
func stripLeakedContentFilterSuffix(text string) string {
func stripLeakedContentFilterSuffix(text string) (string, bool) {
if text == "" {
return text
return text, false
}
upperText := strings.ToUpper(text)
idx := strings.Index(upperText, "CONTENT_FILTER")
if idx < 0 {
return text
return text, false
}
// Keep "\n" so we don't collapse line structure when the upstream model
// appends leaked CONTENT_FILTER markers after a line break.
return strings.TrimRight(text[:idx], " \t\r")
return strings.TrimRight(text[:idx], " \t\r"), true
}
func shouldDropCleanedLeakedChunk(cleaned string) bool {

View File

@@ -63,6 +63,16 @@ func TestParseDeepSeekContentLineContent(t *testing.T) {
}
}
// Verifies that a delta containing only a single space survives parsing as a
// one-element text part: whitespace-only chunks are valid SSE content and
// must not be dropped by the leaked CONTENT_FILTER cleanup.
func TestParseDeepSeekContentLinePreservesSpaceOnlyChunk(t *testing.T) {
	res := ParseDeepSeekContentLine([]byte(`data: {"v":" "}`), false, "text")
	if !res.Parsed || res.Stop {
		t.Fatalf("expected parsed non-stop result: %#v", res)
	}
	if len(res.Parts) != 1 || res.Parts[0].Text != " " || res.Parts[0].Type != "text" {
		t.Fatalf("unexpected parts for space-only chunk: %#v", res.Parts)
	}
}
func TestParseDeepSeekContentLineStripsLeakedContentFilterSuffix(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"p":"response/content","v":"正常输出CONTENT_FILTER你好这个问题我暂时无法回答"}`), false, "text")
if !res.Parsed || res.Stop {