feat: implement rawsample package for automated capture persistence and add admin handlers for sample management

This commit is contained in:
CJACK
2026-04-05 01:12:31 +08:00
parent c9201174f6
commit 93879c9808
17 changed files with 1216 additions and 69 deletions

View File

@@ -77,6 +77,14 @@ data: <json or text>
./tests/scripts/run-raw-stream-sim.sh
```
如果需要新增永久样本,可以直接走本地专用接口:
```bash
POST /admin/dev/raw-samples/capture
```
它会把原始上游流和项目最终输出一起落到 `tests/raw_stream_samples/<sample-id>/`,并生成 `openai.output.txt` 作为最终输出的对照基线,便于后续继续分析字段和做回放比对。
## 6. `CONTENT_FILTER` 终态样本
`content-filter-trigger-20260405-jwt3` 样本中,末尾会出现一组明确的风控终态字段:

View File

@@ -235,8 +235,19 @@ go run ./cmd/ds2api-tests --no-preflight
说明:
- 该工具默认重放 `tests/raw_stream_samples/manifest.json` 声明的 canonical 样本,按上游 SSE 顺序做 1:1 仿真解析。
- 默认校验不出现 `FINISHED` 文本泄露,并要求存在结束信号。
- 如果样本目录里存在 `openai.output.txt`,会直接拿它和重放结果做对照;否则会回退到 `openai.stream.sse` / `openai.response.json`。
- 结果会写入 `artifacts/raw-stream-sim/*.json`,可供其他测试脚本或排障流程复用。
### 采集永久样本
本地启动服务后,可以直接打:
```bash
POST /admin/dev/raw-samples/capture
```
这个接口会把请求、上游原始流和最终输出一起写入 `tests/raw_stream_samples/<sample-id>/`,以后可以直接拿来做回放和字段分析。
### 指定输出目录和超时
```bash

View File

@@ -42,6 +42,10 @@ type PoolController interface {
ApplyRuntimeLimits(maxInflightPerAccount, maxQueueSize, globalMaxInflight int)
}
type OpenAIChatCaller interface {
ChatCompletions(w http.ResponseWriter, r *http.Request)
}
type DeepSeekCaller interface {
Login(ctx context.Context, acc config.Account) (string, error)
CreateSession(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error)

View File

@@ -5,9 +5,10 @@ import (
)
type Handler struct {
Store ConfigStore
Pool PoolController
DS DeepSeekCaller
Store ConfigStore
Pool PoolController
DS DeepSeekCaller
OpenAI OpenAIChatCaller
}
func RegisterRoutes(r chi.Router, h *Handler) {
@@ -34,6 +35,7 @@ func RegisterRoutes(r chi.Router, h *Handler) {
pr.Post("/accounts/sessions/delete-all", h.deleteAllSessions)
pr.Post("/import", h.batchImport)
pr.Post("/test", h.testAPI)
pr.Post("/dev/raw-samples/capture", h.captureRawSample)
pr.Post("/vercel/sync", h.syncVercel)
pr.Get("/vercel/status", h.vercelStatus)
pr.Post("/vercel/status", h.vercelStatus)

View File

@@ -0,0 +1,216 @@
package admin
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"ds2api/internal/config"
"ds2api/internal/devcapture"
"ds2api/internal/rawsample"
)
// captureRawSample handles POST /admin/dev/raw-samples/capture.
//
// It replays the submitted chat-completions payload through the in-process
// OpenAI handler (no network hop), then diffs snapshots of the global
// devcapture store taken before/after the call to locate the upstream stream
// recorded during it. Both the raw upstream body and the processed response
// are persisted as a permanent sample under config.RawStreamSampleRoot(), and
// the processed response is proxied back verbatim with X-Ds2-Sample-* headers
// pointing at the files that were written.
func (h *Handler) captureRawSample(w http.ResponseWriter, r *http.Request) {
	if h.OpenAI == nil {
		writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "OpenAI handler is not configured"})
		return
	}
	var req map[string]any
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid json"})
		return
	}
	// Split the body into the forwarded chat payload plus capture controls
	// (sample_id, api_key), filling in defaults where needed.
	payload, sampleID, apiKey, err := prepareRawSampleCaptureRequest(h.Store, req)
	if err != nil {
		writeJSON(w, http.StatusBadRequest, map[string]any{"detail": err.Error()})
		return
	}
	body, err := json.Marshal(payload)
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "failed to encode capture request"})
		return
	}
	// Tag the replayed request with a trace id derived from the sample id so
	// the resulting devcapture entry can be correlated with this run.
	traceID := rawsample.NormalizeSampleID(sampleID)
	if traceID == "" {
		traceID = rawsample.DefaultSampleID("capture")
	}
	// Snapshot the capture store before and after the replay; the difference
	// identifies the upstream stream recorded by this call.
	before := devcapture.Global().Snapshot()
	rec := httptest.NewRecorder()
	captureReq := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__trace_id="+url.QueryEscape(traceID), bytes.NewReader(body))
	captureReq.Header.Set("Authorization", "Bearer "+apiKey)
	captureReq.Header.Set("Content-Type", "application/json")
	h.OpenAI.ChatCompletions(rec, captureReq)
	after := devcapture.Global().Snapshot()
	// Handler/upstream failure: proxy the error response without persisting.
	if rec.Code >= http.StatusBadRequest {
		copyHeader(w.Header(), rec.Header())
		w.WriteHeader(rec.Code)
		_, _ = io.Copy(w, bytes.NewReader(rec.Body.Bytes()))
		return
	}
	captureEntry, ok := selectNewestCaptureEntry(before, after)
	if !ok {
		writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "no upstream capture was recorded"})
		return
	}
	processedContentType := strings.TrimSpace(rec.Header().Get("Content-Type"))
	saved, err := rawsample.Persist(rawsample.PersistOptions{
		RootDir:              config.RawStreamSampleRoot(),
		SampleID:             sampleID,
		Source:               "admin/dev/raw-samples/capture",
		Request:              payload,
		Capture:              captureSummaryFromEntry(captureEntry),
		UpstreamBody:         []byte(captureEntry.ResponseBody),
		ProcessedBody:        rec.Body.Bytes(),
		ProcessedKind:        processedKindFromContentType(processedContentType),
		ProcessedStatusCode:  rec.Code,
		ProcessedContentType: processedContentType,
	})
	if err != nil {
		writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": err.Error()})
		return
	}
	// Proxy the processed response, adding headers that describe the sample.
	copyHeader(w.Header(), rec.Header())
	w.Header().Set("X-Ds2-Sample-Id", saved.SampleID)
	w.Header().Set("X-Ds2-Sample-Dir", saved.Dir)
	w.Header().Set("X-Ds2-Sample-Meta", saved.MetaPath)
	w.Header().Set("X-Ds2-Sample-Upstream", saved.UpstreamPath)
	w.Header().Set("X-Ds2-Sample-Processed", saved.ProcessedPath)
	w.Header().Set("X-Ds2-Sample-Output", saved.OutputPath)
	w.WriteHeader(rec.Code)
	_, _ = io.Copy(w, bytes.NewReader(rec.Body.Bytes()))
}
// prepareRawSampleCaptureRequest normalizes an incoming capture request: it
// separates the capture-control fields from the chat payload, resolves an API
// key (falling back to the first configured key), and fills in defaults for
// model, stream and messages. It returns the forwardable payload, the sample
// id, and the API key to use.
func prepareRawSampleCaptureRequest(store ConfigStore, req map[string]any) (map[string]any, string, string, error) {
	body := cloneMap(req)
	sampleID := strings.TrimSpace(fieldString(body, "sample_id"))
	key := strings.TrimSpace(fieldString(body, "api_key"))
	// Strip capture-control fields so only the chat payload goes upstream.
	for _, field := range []string{"sample_id", "api_key", "promote_default", "persist", "source"} {
		delete(body, field)
	}
	if key == "" {
		if store == nil {
			return nil, "", "", fmt.Errorf("no api key provided")
		}
		available := store.Keys()
		if len(available) == 0 {
			return nil, "", "", fmt.Errorf("no api key available")
		}
		key = strings.TrimSpace(available[0])
	}
	if strings.TrimSpace(fieldString(body, "model")) == "" {
		body["model"] = "deepseek-chat"
	}
	if _, present := body["stream"]; !present {
		body["stream"] = true
	}
	// Without an explicit messages array, synthesize one from the shorthand
	// "message" field (defaulting to a simple greeting).
	if msgs, ok := body["messages"].([]any); !ok || len(msgs) == 0 {
		prompt := strings.TrimSpace(fieldString(body, "message"))
		if prompt == "" {
			prompt = "你好"
		}
		body["messages"] = []map[string]any{{"role": "user", "content": prompt}}
	}
	delete(body, "message")
	if sampleID == "" {
		base := strings.TrimSpace(fieldString(body, "model"))
		if base == "" {
			base = "capture"
		}
		sampleID = rawsample.DefaultSampleID(base)
	}
	return body, sampleID, key, nil
}
// selectNewestCaptureEntry picks the capture entry most likely produced by
// the request that was just replayed: an entry that appeared after the
// "before" snapshot and has a non-empty body, preferring the newest one and
// breaking CreatedAt ties by the larger response body. When nothing new was
// recorded it falls back to considering every entry in "after".
func selectNewestCaptureEntry(before, after []devcapture.Entry) (devcapture.Entry, bool) {
	seen := make(map[string]struct{}, len(before))
	for _, e := range before {
		seen[e.ID] = struct{}{}
	}
	fresh := make([]devcapture.Entry, 0, len(after))
	for _, e := range after {
		if _, dup := seen[e.ID]; dup {
			continue
		}
		if strings.TrimSpace(e.ResponseBody) == "" {
			continue
		}
		fresh = append(fresh, e)
	}
	if len(fresh) == 0 {
		// Nothing new was recorded; consider everything we have.
		fresh = append(fresh, after...)
	}
	if len(fresh) == 0 {
		return devcapture.Entry{}, false
	}
	winner := fresh[0]
	for _, e := range fresh[1:] {
		newer := e.CreatedAt > winner.CreatedAt
		sameAgeLarger := e.CreatedAt == winner.CreatedAt && len(e.ResponseBody) > len(winner.ResponseBody)
		if newer || sameAgeLarger {
			winner = e
		}
	}
	return winner, true
}
// captureSummaryFromEntry condenses a devcapture entry into the summary that
// is embedded in the sample's meta.json.
func captureSummaryFromEntry(entry devcapture.Entry) rawsample.CaptureSummary {
	summary := rawsample.CaptureSummary{
		StatusCode:    entry.StatusCode,
		ResponseBytes: len(entry.ResponseBody),
	}
	summary.Label = strings.TrimSpace(entry.Label)
	summary.URL = strings.TrimSpace(entry.URL)
	return summary
}
// processedKindFromContentType maps a response Content-Type header to the
// persisted sample kind: "json" for application/json bodies, "stream" for
// everything else (including text/event-stream and unknown types).
func processedKindFromContentType(contentType string) string {
	normalized := strings.ToLower(strings.TrimSpace(contentType))
	if strings.Contains(normalized, "application/json") && !strings.Contains(normalized, "text/event-stream") {
		return "json"
	}
	return "stream"
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
dst.Del(k)
for _, v := range vv {
dst.Add(k, v)
}
}
}
// cloneMap returns a shallow copy of in. A nil or empty input yields a fresh,
// non-nil empty map so callers may mutate the result safely.
func cloneMap(in map[string]any) map[string]any {
	out := make(map[string]any, len(in))
	for key, value := range in {
		out[key] = value
	}
	return out
}

View File

@@ -0,0 +1,95 @@
package admin
import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"ds2api/internal/devcapture"
)
// stubOpenAIChatCaller is a test double for the OpenAI chat handler. Its
// ChatCompletions records a synthetic upstream DeepSeek SSE stream into the
// global devcapture store (mirroring what the real handler's capture hook
// does) and then writes a fixed OpenAI-style SSE chunk as the processed
// response.
type stubOpenAIChatCaller struct{}

func (stubOpenAIChatCaller) ChatCompletions(w http.ResponseWriter, _ *http.Request) {
	store := devcapture.Global()
	session := store.Start("deepseek_completion", "https://chat.deepseek.com/api/v0/chat/completion", "acct-test", map[string]any{"model": "deepseek-chat"})
	// Synthetic upstream body: one visible chunk containing a [reference:1]
	// marker plus a FINISHED status event.
	raw := io.NopCloser(strings.NewReader(
		"data: {\"v\":\"hello [reference:1]\"}\n\n" +
			"data: {\"v\":\"FINISHED\",\"p\":\"response/status\"}\n\n",
	))
	if session != nil {
		raw = session.WrapBody(raw, http.StatusOK)
	}
	// Drain through the wrapper so the capture session records the body.
	_, _ = io.ReadAll(raw)
	_ = raw.Close()
	w.Header().Set("Content-Type", "text/event-stream")
	w.WriteHeader(http.StatusOK)
	_, _ = io.WriteString(w, "data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n")
}
// TestCaptureRawSampleWritesPersistentSample drives the capture endpoint
// end-to-end against the stub OpenAI handler and verifies that a fully
// populated sample directory (meta.json plus stream files) lands under the
// configured sample root, and that the response proxies the processed output
// together with the X-Ds2-Sample-* headers.
func TestCaptureRawSampleWritesPersistentSample(t *testing.T) {
	// Redirect the sample root to a temp dir and isolate the capture store.
	t.Setenv("DS2API_RAW_STREAM_SAMPLE_ROOT", t.TempDir())
	devcapture.Global().Clear()
	defer devcapture.Global().Clear()
	h := &Handler{OpenAI: stubOpenAIChatCaller{}}
	reqBody := `{
"sample_id":"My Sample 01",
"api_key":"local-key",
"model":"deepseek-chat",
"message":"广州天气",
"stream":true
}`
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/admin/dev/raw-samples/capture", strings.NewReader(reqBody))
	h.captureRawSample(rec, req)
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
	}
	// "My Sample 01" should normalize to "my-sample-01".
	if got := rec.Header().Get("X-Ds2-Sample-Id"); got != "my-sample-01" {
		t.Fatalf("expected sample id header my-sample-01, got %q", got)
	}
	if got := rec.Header().Get("X-Ds2-Sample-Output"); got != filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-01", "openai.output.txt") {
		t.Fatalf("unexpected sample output header: %q", got)
	}
	// The endpoint proxies the processed OpenAI output as its response body.
	if !strings.Contains(rec.Body.String(), `"content":"hello"`) {
		t.Fatalf("expected proxied openai output, got %s", rec.Body.String())
	}
	sampleDir := filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-01")
	if _, err := os.Stat(sampleDir); err != nil {
		t.Fatalf("sample dir missing: %v", err)
	}
	metaBytes, err := os.ReadFile(filepath.Join(sampleDir, "meta.json"))
	if err != nil {
		t.Fatalf("read meta: %v", err)
	}
	var meta map[string]any
	if err := json.Unmarshal(metaBytes, &meta); err != nil {
		t.Fatalf("decode meta: %v", err)
	}
	if meta["sample_id"] != "my-sample-01" {
		t.Fatalf("unexpected meta sample_id: %#v", meta["sample_id"])
	}
	capture, _ := meta["capture"].(map[string]any)
	if capture == nil {
		t.Fatalf("missing capture meta: %#v", meta)
	}
	// JSON numbers decode as float64; a non-zero byte count proves the
	// upstream stream was captured and persisted.
	if got := int(capture["response_bytes"].(float64)); got == 0 {
		t.Fatalf("expected capture bytes to be recorded, got %#v", capture)
	}
	processed, _ := meta["processed"].(map[string]any)
	if processed == nil {
		t.Fatalf("missing processed meta: %#v", meta)
	}
	// text/event-stream responses must persist as openai.stream.sse.
	if processed["file"] != "openai.stream.sse" {
		t.Fatalf("unexpected processed file: %#v", processed["file"])
	}
}

View File

@@ -37,6 +37,10 @@ func WASMPath() string {
return ResolvePath("DS2API_WASM_PATH", "sha3_wasm_bg.7b9ca65ddd.wasm")
}
// RawStreamSampleRoot returns the directory holding permanent raw stream
// samples; override with the DS2API_RAW_STREAM_SAMPLE_ROOT environment
// variable.
func RawStreamSampleRoot() string {
	return ResolvePath("DS2API_RAW_STREAM_SAMPLE_ROOT", "tests/raw_stream_samples")
}
// StaticAdminDir returns the directory of the admin web UI static assets;
// override with the DS2API_STATIC_ADMIN_DIR environment variable.
func StaticAdminDir() string {
	return ResolvePath("DS2API_STATIC_ADMIN_DIR", "static/admin")
}

View File

@@ -0,0 +1,284 @@
package rawsample
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/google/uuid"
)
var referenceMarkerRe = regexp.MustCompile(`(?i)\[reference:\s*\d+\]`)
type CaptureSummary struct {
Label string `json:"label,omitempty"`
URL string `json:"url,omitempty"`
StatusCode int `json:"status_code"`
ResponseBytes int `json:"response_bytes"`
ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"`
ReferenceMarkerCount int `json:"reference_marker_count,omitempty"`
ContainsFinishedToken bool `json:"contains_finished_token,omitempty"`
FinishedTokenCount int `json:"finished_token_count,omitempty"`
}
type ProcessedSummary struct {
Kind string `json:"kind"`
File string `json:"file"`
TextFile string `json:"text_file,omitempty"`
StatusCode int `json:"status_code"`
ContentType string `json:"content_type,omitempty"`
ResponseBytes int `json:"response_bytes"`
ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"`
ReferenceMarkerCount int `json:"reference_marker_count,omitempty"`
ContainsFinishedToken bool `json:"contains_finished_token,omitempty"`
FinishedTokenCount int `json:"finished_token_count,omitempty"`
}
type Meta struct {
SampleID string `json:"sample_id"`
CapturedAtUTC string `json:"captured_at_utc"`
Source string `json:"source,omitempty"`
Request any `json:"request"`
Capture CaptureSummary `json:"capture"`
Processed ProcessedSummary `json:"processed"`
}
type PersistOptions struct {
RootDir string
SampleID string
Source string
Request any
Capture CaptureSummary
UpstreamBody []byte
ProcessedBody []byte
ProcessedKind string
ProcessedFile string
ProcessedStatusCode int
ProcessedContentType string
ProcessedText string
}
type SavedSample struct {
SampleID string
Dir string
MetaPath string
UpstreamPath string
ProcessedPath string
OutputPath string
Meta Meta
}
// Persist atomically writes one raw-stream sample to disk. Everything is
// staged in a hidden temp directory under RootDir and promoted with a single
// rename, so readers never observe a half-written sample. The returned
// SavedSample lists the final paths of all files written.
//
// Files written: upstream.stream.sse (raw upstream body), the processed
// output (name resolved from ProcessedFile/ProcessedKind/ProcessedContentType),
// openai.output.txt (plain-text baseline) and meta.json.
func Persist(opts PersistOptions) (SavedSample, error) {
	root := strings.TrimSpace(opts.RootDir)
	if root == "" {
		return SavedSample{}, errors.New("root dir is required")
	}
	if len(opts.UpstreamBody) == 0 {
		return SavedSample{}, errors.New("upstream body is required")
	}
	if len(opts.ProcessedBody) == 0 {
		return SavedSample{}, errors.New("processed body is required")
	}
	if err := os.MkdirAll(root, 0o755); err != nil {
		return SavedSample{}, fmt.Errorf("create root dir: %w", err)
	}
	// Normalize the requested id and de-duplicate against existing samples.
	baseID := NormalizeSampleID(opts.SampleID)
	if baseID == "" {
		baseID = DefaultSampleID("capture")
	}
	sampleID, err := uniqueSampleID(root, baseID)
	if err != nil {
		return SavedSample{}, err
	}
	// Stage under a dot-prefixed temp name so partially written samples are
	// never picked up by directory scans.
	tempID := ".tmp-" + sampleID + "-" + strings.ToLower(strings.ReplaceAll(uuid.NewString(), "-", ""))
	tempDir := filepath.Join(root, tempID)
	finalDir := filepath.Join(root, sampleID)
	if err := os.MkdirAll(tempDir, 0o755); err != nil {
		return SavedSample{}, fmt.Errorf("create temp dir: %w", err)
	}
	cleanup := func() {
		_ = os.RemoveAll(tempDir)
	}
	upstreamPath := filepath.Join(tempDir, "upstream.stream.sse")
	if err := os.WriteFile(upstreamPath, opts.UpstreamBody, 0o644); err != nil {
		cleanup()
		return SavedSample{}, fmt.Errorf("write upstream stream: %w", err)
	}
	// Resolve the processed output's kind and file name, preferring explicit
	// options over inference from the content type.
	processedKind := strings.TrimSpace(opts.ProcessedKind)
	if processedKind == "" {
		processedKind = inferProcessedKind(opts.ProcessedContentType)
	}
	processedFile := strings.TrimSpace(opts.ProcessedFile)
	if processedFile == "" {
		processedFile = defaultProcessedFile(processedKind, opts.ProcessedContentType)
	}
	if processedFile == "" {
		cleanup()
		return SavedSample{}, errors.New("processed file name is required")
	}
	processedPath := filepath.Join(tempDir, processedFile)
	if err := os.WriteFile(processedPath, opts.ProcessedBody, 0o644); err != nil {
		cleanup()
		return SavedSample{}, fmt.Errorf("write processed output: %w", err)
	}
	// openai.output.txt is the plain-text comparison baseline; derive it from
	// the processed body unless the caller supplied it explicitly.
	processedText := opts.ProcessedText
	if processedText == "" {
		processedText = extractProcessedVisibleText(opts.ProcessedBody, processedKind, opts.ProcessedContentType)
	}
	textPath := filepath.Join(tempDir, "openai.output.txt")
	if err := os.WriteFile(textPath, []byte(processedText), 0o644); err != nil {
		cleanup()
		return SavedSample{}, fmt.Errorf("write processed text: %w", err)
	}
	now := time.Now().UTC()
	// Recompute derived capture stats from the actual bytes being persisted,
	// overriding whatever the caller pre-filled.
	capture := opts.Capture
	capture.ResponseBytes = len(opts.UpstreamBody)
	capture.ContainsReferenceMarkers, capture.ReferenceMarkerCount, capture.ContainsFinishedToken, capture.FinishedTokenCount = analyzeBytes(opts.UpstreamBody)
	processed := ProcessedSummary{
		Kind:          processedKind,
		File:          processedFile,
		TextFile:      "openai.output.txt",
		StatusCode:    opts.ProcessedStatusCode,
		ContentType:   strings.TrimSpace(opts.ProcessedContentType),
		ResponseBytes: len(opts.ProcessedBody),
	}
	processed.ContainsReferenceMarkers, processed.ReferenceMarkerCount, processed.ContainsFinishedToken, processed.FinishedTokenCount = analyzeBytes(opts.ProcessedBody)
	meta := Meta{
		SampleID:      sampleID,
		CapturedAtUTC: now.Format(time.RFC3339),
		Source:        strings.TrimSpace(opts.Source),
		Request:       opts.Request,
		Capture:       capture,
		Processed:     processed,
	}
	metaBytes, err := json.MarshalIndent(meta, "", " ")
	if err != nil {
		cleanup()
		return SavedSample{}, fmt.Errorf("marshal meta: %w", err)
	}
	metaPath := filepath.Join(tempDir, "meta.json")
	if err := os.WriteFile(metaPath, append(metaBytes, '\n'), 0o644); err != nil {
		cleanup()
		return SavedSample{}, fmt.Errorf("write meta: %w", err)
	}
	// Atomic promotion: the rename makes the finished sample visible in one
	// step.
	if err := os.Rename(tempDir, finalDir); err != nil {
		cleanup()
		return SavedSample{}, fmt.Errorf("promote sample dir: %w", err)
	}
	return SavedSample{
		SampleID:      sampleID,
		Dir:           finalDir,
		MetaPath:      filepath.Join(finalDir, "meta.json"),
		UpstreamPath:  filepath.Join(finalDir, "upstream.stream.sse"),
		ProcessedPath: filepath.Join(finalDir, processedFile),
		OutputPath:    filepath.Join(finalDir, "openai.output.txt"),
		Meta:          meta,
	}, nil
}
// NormalizeSampleID converts raw into a filesystem-friendly identifier:
// lowercased, with every run of disallowed characters collapsed into a single
// '-', and leading/trailing '-', '_' and '.' stripped. It returns "" when
// nothing usable remains.
func NormalizeSampleID(raw string) string {
	cleaned := strings.TrimSpace(strings.ToLower(raw))
	if cleaned == "" {
		return ""
	}
	allowed := func(r rune) bool {
		return (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' || r == '_' || r == '.'
	}
	var sb strings.Builder
	lastWasDash := false
	for _, r := range cleaned {
		if allowed(r) {
			sb.WriteRune(r)
			lastWasDash = false
			continue
		}
		// Collapse consecutive disallowed runes into one dash.
		if !lastWasDash {
			sb.WriteRune('-')
			lastWasDash = true
		}
	}
	return strings.Trim(sb.String(), "-_.")
}
// DefaultSampleID builds a timestamped id "<prefix>-<UTC stamp>", falling
// back to the "capture" prefix when the normalized prefix is empty.
func DefaultSampleID(prefix string) string {
	base := NormalizeSampleID(prefix)
	if base == "" {
		base = "capture"
	}
	stamp := time.Now().UTC().Format("20060102T150405Z")
	return base + "-" + stamp
}
// inferProcessedKind guesses the processed-output kind from a Content-Type:
// "json" for application/json responses, otherwise "stream" (including
// text/event-stream and unknown types).
func inferProcessedKind(contentType string) string {
	normalized := strings.ToLower(strings.TrimSpace(contentType))
	if strings.Contains(normalized, "application/json") && !strings.Contains(normalized, "text/event-stream") {
		return "json"
	}
	return "stream"
}
// defaultProcessedFile chooses the canonical on-disk name for the processed
// output. An explicit, recognized kind wins; otherwise the kind is inferred
// from contentType.
func defaultProcessedFile(kind, contentType string) string {
	resolved := strings.ToLower(strings.TrimSpace(kind))
	if resolved != "json" && resolved != "stream" {
		resolved = inferProcessedKind(contentType)
	}
	if resolved == "json" {
		return "openai.response.json"
	}
	return "openai.stream.sse"
}
// uniqueSampleID returns base, or base-2, base-3, ... — the first candidate
// whose directory does not already exist under root.
func uniqueSampleID(root, base string) (string, error) {
	if base == "" {
		base = DefaultSampleID("capture")
	}
	for suffix := 1; ; suffix++ {
		candidate := base
		if suffix > 1 {
			candidate = fmt.Sprintf("%s-%d", base, suffix)
		}
		_, err := os.Stat(filepath.Join(root, candidate))
		switch {
		case os.IsNotExist(err):
			return candidate, nil
		case err != nil:
			return "", fmt.Errorf("stat sample dir: %w", err)
		}
		// Directory exists: try the next numbered candidate.
	}
}
// analyzeBytes reports how many "[reference:N]" markers and (case-insensitive)
// FINISHED tokens appear in raw, along with booleans for their presence.
// Empty input yields all zero values.
func analyzeBytes(raw []byte) (containsReferenceMarkers bool, referenceMarkerCount int, containsFinishedToken bool, finishedTokenCount int) {
	if len(raw) == 0 {
		return
	}
	text := string(raw)
	referenceMarkerCount = len(referenceMarkerRe.FindAllStringIndex(text, -1))
	finishedTokenCount = strings.Count(strings.ToUpper(text), "FINISHED")
	containsReferenceMarkers = referenceMarkerCount > 0
	containsFinishedToken = finishedTokenCount > 0
	return
}

View File

@@ -0,0 +1,94 @@
package rawsample
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
)
// TestNormalizeSampleID checks that punctuation and spaces collapse into
// single dashes and the result is lowercased and trimmed.
func TestNormalizeSampleID(t *testing.T) {
	got := NormalizeSampleID(" Hello, World! ")
	if got != "hello-world" {
		t.Fatalf("expected hello-world, got %q", got)
	}
}
// TestPersistWritesSampleFilesAndMeta exercises the Persist happy path: it
// writes a sample whose upstream body contains reference and FINISHED markers
// and verifies the directory layout, the normalized sample id, and the
// analysis fields recorded in meta.json.
func TestPersistWritesSampleFilesAndMeta(t *testing.T) {
	root := t.TempDir()
	saved, err := Persist(PersistOptions{
		RootDir:  root,
		SampleID: "My Sample! 01",
		Source:   "unit-test",
		Request: map[string]any{
			"model":  "deepseek-chat",
			"stream": true,
			"messages": []any{
				map[string]any{"role": "user", "content": "广州天气"},
			},
		},
		Capture: CaptureSummary{
			Label:      "deepseek_completion",
			URL:        "https://chat.deepseek.com/api/v0/chat/completion",
			StatusCode: 200,
		},
		// One [reference:1] marker and one FINISHED token in the upstream body.
		UpstreamBody: []byte("data: {\"v\":\"hello [reference:1]\"}\n\n" +
			"data: {\"v\":\"FINISHED\",\"p\":\"response/status\"}\n\n"),
		ProcessedBody:        []byte("data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n"),
		ProcessedStatusCode:  200,
		ProcessedContentType: "text/event-stream",
	})
	if err != nil {
		t.Fatalf("Persist failed: %v", err)
	}
	// "My Sample! 01" must normalize to "my-sample-01".
	if saved.SampleID != "my-sample-01" {
		t.Fatalf("expected normalized sample id, got %q", saved.SampleID)
	}
	if _, err := os.Stat(saved.Dir); err != nil {
		t.Fatalf("sample dir missing: %v", err)
	}
	if _, err := os.Stat(saved.UpstreamPath); err != nil {
		t.Fatalf("upstream file missing: %v", err)
	}
	if _, err := os.Stat(saved.ProcessedPath); err != nil {
		t.Fatalf("processed file missing: %v", err)
	}
	if saved.OutputPath != filepath.Join(saved.Dir, "openai.output.txt") {
		t.Fatalf("unexpected processed text path: %s", saved.OutputPath)
	}
	if _, err := os.Stat(saved.OutputPath); err != nil {
		t.Fatalf("processed text file missing: %v", err)
	}
	metaBytes, err := os.ReadFile(saved.MetaPath)
	if err != nil {
		t.Fatalf("read meta: %v", err)
	}
	var meta Meta
	if err := json.Unmarshal(metaBytes, &meta); err != nil {
		t.Fatalf("decode meta: %v", err)
	}
	if meta.SampleID != saved.SampleID {
		t.Fatalf("expected meta sample id %q, got %q", saved.SampleID, meta.SampleID)
	}
	// The analysis counters must reflect the markers in the upstream body.
	if meta.Capture.ReferenceMarkerCount != 1 {
		t.Fatalf("expected one reference marker, got %+v", meta.Capture)
	}
	if meta.Capture.FinishedTokenCount != 1 {
		t.Fatalf("expected one finished token, got %+v", meta.Capture)
	}
	// text/event-stream output should persist as openai.stream.sse.
	if meta.Processed.File != "openai.stream.sse" {
		t.Fatalf("expected stream processed file, got %+v", meta.Processed)
	}
	if meta.Processed.TextFile != "openai.output.txt" {
		t.Fatalf("expected text file metadata, got %+v", meta.Processed)
	}
	if meta.Processed.ResponseBytes == 0 {
		t.Fatalf("expected processed bytes to be recorded, got %+v", meta.Processed)
	}
	if !strings.HasSuffix(saved.ProcessedPath, filepath.Join(saved.SampleID, "openai.stream.sse")) {
		t.Fatalf("unexpected processed path: %s", saved.ProcessedPath)
	}
}

View File

@@ -0,0 +1,114 @@
package rawsample
import (
"encoding/json"
"strings"
)
// extractProcessedVisibleText derives the plain-text baseline (written to
// openai.output.txt) from the processed response body. A recognized kind
// ("json"/"stream") takes precedence; otherwise the Content-Type decides,
// defaulting to SSE stream parsing.
func extractProcessedVisibleText(raw []byte, kind, contentType string) string {
	if len(raw) == 0 {
		return ""
	}
	body := string(raw)
	switch strings.ToLower(strings.TrimSpace(kind)) {
	case "json":
		return parseOpenAIJSONText(body)
	case "stream":
		return parseOpenAIStreamText(body)
	}
	if strings.Contains(strings.ToLower(strings.TrimSpace(contentType)), "application/json") {
		return parseOpenAIJSONText(body)
	}
	return parseOpenAIStreamText(body)
}
// parseOpenAIStreamText extracts the user-visible text from an OpenAI-style
// SSE stream. Each event's "data:" payload is decoded as JSON and its visible
// text appended; empty payloads, the "[DONE]" sentinel and non-JSON payloads
// are skipped.
func parseOpenAIStreamText(raw string) string {
	if strings.TrimSpace(raw) == "" {
		return ""
	}
	// Normalize CRLF so streams recorded with \r\n line endings still split
	// into individual events on the blank-line boundary below. Without this,
	// a CRLF stream collapses into one block whose joined payload fails JSON
	// decoding and yields an empty baseline.
	raw = strings.ReplaceAll(raw, "\r\n", "\n")
	var out strings.Builder
	for _, block := range strings.Split(raw, "\n\n") {
		if strings.TrimSpace(block) == "" {
			continue
		}
		dataLines := make([]string, 0, 2)
		for _, line := range strings.Split(block, "\n") {
			if !strings.HasPrefix(line, "data:") {
				continue
			}
			dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:")))
		}
		if len(dataLines) == 0 {
			continue
		}
		// Multi-line data fields are joined with newlines per the SSE spec.
		payload := strings.TrimSpace(strings.Join(dataLines, "\n"))
		if payload == "" || payload == "[DONE]" || !strings.HasPrefix(payload, "{") {
			continue
		}
		var decoded any
		if err := json.Unmarshal([]byte(payload), &decoded); err != nil {
			continue
		}
		out.WriteString(extractOpenAIVisibleTextValue(decoded))
	}
	return out.String()
}
// parseOpenAIJSONText decodes a full (non-streaming) OpenAI JSON response and
// returns its visible text, or "" when the body is empty or not valid JSON.
func parseOpenAIJSONText(raw string) string {
	if strings.TrimSpace(raw) == "" {
		return ""
	}
	var payload any
	if json.Unmarshal([]byte(raw), &payload) != nil {
		return ""
	}
	return extractOpenAIVisibleTextValue(payload)
}
// extractOpenAIVisibleTextValue recursively collects the human-visible text
// from a decoded OpenAI response value. Strings are returned as-is, arrays
// are concatenated element-wise, and objects contribute — in order —
// output_text, output[], choices[], message, delta, content,
// reasoning_content and text. Anything else yields "".
func extractOpenAIVisibleTextValue(v any) string {
	switch node := v.(type) {
	case string:
		return node
	case []any:
		var sb strings.Builder
		for _, elem := range node {
			sb.WriteString(extractOpenAIVisibleTextValue(elem))
		}
		return sb.String()
	case map[string]any:
		var sb strings.Builder
		if direct, ok := node["output_text"].(string); ok {
			sb.WriteString(direct)
		}
		// Responses-API "output" and chat "choices" are both arrays of parts.
		for _, listKey := range []string{"output", "choices"} {
			if items, ok := node[listKey].([]any); ok {
				for _, item := range items {
					sb.WriteString(extractOpenAIVisibleTextValue(item))
				}
			}
		}
		for _, key := range []string{"message", "delta", "content", "reasoning_content", "text"} {
			if child, ok := node[key]; ok {
				sb.WriteString(extractOpenAIVisibleTextValue(child))
			}
		}
		return sb.String()
	default:
		// Covers nil, numbers, booleans and any other non-text value.
		return ""
	}
}

View File

@@ -46,7 +46,7 @@ func NewApp() *App {
openaiHandler := &openai.Handler{Store: store, Auth: resolver, DS: dsClient}
claudeHandler := &claude.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: openaiHandler}
geminiHandler := &gemini.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: openaiHandler}
adminHandler := &admin.Handler{Store: store, Pool: pool, DS: dsClient}
adminHandler := &admin.Handler{Store: store, Pool: pool, DS: dsClient, OpenAI: openaiHandler}
webuiHandler := webui.NewHandler()
r := chi.NewRouter()

View File

@@ -11,17 +11,38 @@
默认回放工具会优先读取 [`manifest.json`](./manifest.json) 中的 `default_samples`,以稳定固定回放集。
## 自动采集接口
本地启动服务后,可以直接调用专用接口自动落盘一份永久样本:
```bash
POST /admin/dev/raw-samples/capture
```
这个接口会:
- 接收一个普通的 OpenAI chat completions 请求体
- 走项目内同一条处理链
- 自动保存 `meta.json`
- 自动保存上游原始流 `upstream.stream.sse`
- 自动保存项目最终输出 `openai.stream.sse` 或 `openai.response.json`
响应体仍然是项目最终输出,同时会额外带上 `X-Ds2-Sample-*` 头,方便脚本和手工检查。
## 目录规范
每个样本一个子目录:
- `meta.json`:样本元信息(问题、模型、采集时间、备注)
- `upstream.stream.sse`:完整原始 SSE 文本(`event:` / `data:` 行)
- `openai.stream.sse`:项目最终输出的 SSE 形式,或
- `openai.response.json`:项目最终输出的非流式 JSON 形式
- `openai.output.txt`:项目最终输出的纯文本摘要,供回放工具直接比对
## 扩展方式
1. 抓取一次真实请求(建议开启 `DS2API_DEV_PACKET_CAPTURE=1`)。
2. 新建 `<sample-id>/` 目录并放入 `meta.json` + `upstream.stream.sse`
2. 直接调用 `/admin/dev/raw-samples/capture`,或者手工新建 `<sample-id>/` 目录并放入 `meta.json` + `upstream.stream.sse` + `openai.stream.sse` / `openai.response.json`
3. 运行独立仿真工具(可被其他测试脚本调用):
```bash
@@ -32,6 +53,7 @@
- 不会把上游 `status=FINISHED` 片段当正文输出(防泄露)。
- 能正确检测 `response/status=FINISHED` 流结束信号。
- 会把样本里的 `openai.stream.sse` / `openai.response.json` 当作最终输出基线,和重放后的结果比对。
- 生成可归档 JSON 报告(`artifacts/raw-stream-sim/`)。
如果 `manifest.json` 不存在,则回退为遍历目录中的全部样本。

View File

@@ -0,0 +1,30 @@
用户是一位历史老师,需要寻找文革期间历史书上没有记载、真实发生的派系斗争事例,特别是能体现历史人物复杂性的内容。用户明确要求提供多个具体事例,并强调要体现复杂性而非单一评价立场。为了全面获取信息,我将同时进行多项搜索,涵盖不同类型的派系斗争、具体事件、原始材料以及相关研究。搜索结果显示了一些相关页面。为了全面获取信息,我们需要同时打开这些页面。搜索结果显示了一些相关页面。为了更全面地覆盖用户需求,我们需要同时进行多项搜索,以获取更多关于派系斗争、具体事件、地方案例、高层斗争以及不同研究视角的信息。搜索结果提供了大量关于派系斗争的案例。我将基于这些资料,组织回答,涵盖基层群众派系斗争、高层权力博弈、历史人物的复杂性以及获取更多信息的建议。回答将强调这些事例的细节性和复杂性,并避免单一评价立场。历史老师您好。要在历史教材的概括性叙述之外,捕捉到文革期间派系斗争的细节与复杂性,我们可以从一些具体的历史切面入手。这些案例揭示了在宏大叙事之下,个体与群体的矛盾、挣扎与博弈。
### 🎭 派系众生相:群众组织的分化与斗争
随着运动的深入,原本统一的“革命群众”迅速分化。草根层面的派系斗争往往比高层博弈更加残酷,其复杂性在于,斗争的双方都坚信自己代表了真正的“革命方向”。
* **红卫兵“天派”与“地派”**:北京的红卫兵因所属大学不同分化为“天派”(北航“红旗”)和“地派”(地质“东方红”)等派系,并在“中央文革”的挑动下矛盾激化,各派的“五大领袖”在当时极具影响力。
* **广州“东风派”与“红旗派”**广州以“红总”为代表的“东风派”和以“广铁总司”为代表的“红旗派”势同水火。1967年8月20日爆发的“820武斗”中双方在市区利用高楼作为据点枪战造成大量伤亡。
* **上海“工总司”的权力之争**:“工总司”内部同样存在派系斗争。早期领导人潘国平与后来成为“四人帮”骨干的王洪文之间,为争夺组织领导权存在复杂博弈。
* **广西“联指”与“4·22”**广西群众组织围绕“支持韦国清”还是“打倒韦国清”分裂为“联指”和“4·22”两大派两派冲突中甚至出现了武装民兵直接袭击另一派据点的情况。
* **各地武斗的惨烈代价**派系斗争常升级为武斗。例如在陕西勉县从1967年9月至1968年9月的一年中武斗共导致85人死亡河北武安的两派对立导致了抢枪、武斗致近200人死亡山东枣庄的“七二五”事件更是有4000余名武装农民参与造成26人当场被打死。
### 🔥 失控的漩涡:高层的博弈与基层的共振
中央高层的权力斗争,通过与地方派系结合,迅速演变为全国性的动荡。
* **中央文革的“点火”**1967年2月后“中央文革”取代了中央政治局的职能。其成员王力、关锋、戚本禹等人四处煽动如王力1967年8月7日的“八七讲话”直接煽动造反派夺取外交部权力矛头直指陈毅直接引发了火烧英国驻华代办处的严重外交事件。
* **“反复旧”运动的荒诞**1968年的“反复旧”运动体现了运动的荒谬性。一些在革委会任职的“走资派”被重新打倒江苏建湖县三主任被关进“学习班”残酷斗争许多原本执行政策的解放军官兵也因形势变化成为整治对象。
* **地方派系与国家机器的共振**:地方派系斗争时常获得国家机器的支持。如山东的王效禹直接动用省革委会和军区名义支持一派,派飞机散发通告、在全省设卡抓捕对立派群众。
### 🎭 人性的困境:浮沉人物的复杂面向
教材之外,许多历史人物并非脸谱化的角色,其沉浮展现了人性的复杂与时代的吊诡。
* **陈伯达**他曾任“中央文革小组”组长是“文革”的关键人物但在1981年被判刑18年。他的复杂性在于他也曾做过贡献如1948年在城南庄冒死救护过毛泽东这一事迹因他后来成为主犯而在许多回忆录中被隐去。
* **陶铸**这位中南局第一书记在“文革”初期被提拔为政治局常委但仅4个多月后就被江青等人公开打倒。其悲剧在于他曾因在“七千人大会”上反对刘少奇的“人祸”论而得到毛泽东赏识最终却被更激进的浪潮吞噬。
* **李雪峰**:他因在“四清”运动中与刘少奇有分歧被毛泽东重用为北京市委第一书记,但在政治风暴中难以自保,其命运同样坎坷。
* **赵健民**时任云南省委书记的赵健民在京西宾馆会议上据理力争却被康生以历史问题为由诬陷为“叛徒”被非法关押长达八年。更令人痛心的是此冤案导致云南省超过17000人被牵连致死。
* **周恩来**:在极度混乱的局面中,他尽力“苦撑危局”。在中央文革挑动外交部夺权时,他一方面严厉批评,另一方面指挥部队营救被困人员,试图挽回失控的外交局面。
* **许广平**:在山东大学批判校长成仿吾时,许广平作为鲁迅夫人,她的态度和抉择也反映了那个时代知识分子面临的复杂处境。
* **纳西族老村长的二叔**:一位回乡军人,仅因将有领导人签名的证明藏在鞋底,便被诬陷为“第三党”分子,这则个案反映了那个年代荒诞的罪名如何轻易地摧毁普通人的生活。
您需要更多原始材料可以查阅叶永烈的《“四人帮”兴亡》基于大量亲历者采访、丁龙嘉与听雨的《康生与“赵健民冤案》以及各地政协编撰的文史资料。您可以根据教学重点从这些具体案例中筛选细节帮助学生理解那段历史的复杂肌理。CONTENT_FILTER你好这个问题我暂时无法回答让我们换个话题再聊聊吧。

View File

@@ -1 +0,0 @@
{"detail":"invalid credentials"}

View File

@@ -0,0 +1,9 @@
用户想知道广州的天气。今天是2026年4月5日。我需要提供广州当前的天气情况以及未来几天的预报。为了全面获取信息我将同时进行多项搜索以覆盖不同可能的天气信息来源和查询方式。搜索结果显示了一些相关页面。为了全面获取信息我将同时打开这些页面。这些搜索结果提供了广州的天气信息。我将组织回答提供当前天气、未来几天预报、详细天气情况以及生活建议和出行提示。广州今天4月5日周日有暴雨并伴有强对流天气全天体感湿凉。
* **天气与气温**阴有大到暴雨21℃ ~ 25℃。主要降水集中在中午至傍晚下午2点左右雨势可能最强。
* **风况**东风转东南风风力普遍在3级以下。但强对流天气发生时局部可能出现612级的短时大风。
* **生活建议****暴雨和强对流天气,建议减少外出**;湿度大体感凉,适合穿长袖衬衫;紫外线弱,可不防晒。
明天4月6日降雨强度会减弱转为中雨转多云气温回升至 23℃ ~ 30℃。后天起天气将逐渐稳定以多云和雷阵雨为主。
天气预报信息会实时更新,如需查询最新情况,可以随时再来问我~

View File

@@ -5,7 +5,7 @@ ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
cd "$ROOT_DIR"
CONFIG_PATH="${1:-config.json}"
SAMPLE_ID="${2:-sample-$(date -u +%Y%m%dT%H%M%SZ)}"
SAMPLE_ID="${2:-capture-$(date -u +%Y%m%dT%H%M%SZ)}"
QUESTION="${3:-广州天气}"
MODEL="${4:-deepseek-reasoner-search}"
API_KEY="${5:-}"
@@ -26,10 +26,11 @@ if [[ -z "$API_KEY" ]]; then
exit 1
fi
OUT_DIR="tests/raw_stream_samples/${SAMPLE_ID}"
mkdir -p "$OUT_DIR"
HDR_FILE="$(mktemp)"
BODY_FILE="$(mktemp)"
cleanup() {
rm -f "$HDR_FILE" "$BODY_FILE"
pkill -f "cmd/ds2api" >/dev/null 2>&1 || true
}
trap cleanup EXIT
@@ -47,52 +48,50 @@ for _ in $(seq 1 120); do
sleep 1
done
REQUEST_BODY="$(python3 - <<'PY' "$MODEL" "$QUESTION"
REQUEST_BODY="$(python3 - <<'PY' "$SAMPLE_ID" "$MODEL" "$QUESTION" "$API_KEY"
import json,sys
model,question=sys.argv[1:3]
sample_id,model,question,api_key=sys.argv[1:5]
payload={
'model':model,
'stream':True,
'messages':[{'role':'user','content':question}],
'sample_id': sample_id,
'api_key': api_key,
'model': model,
'stream': True,
'messages': [{'role': 'user', 'content': question}],
}
print(json.dumps(payload, ensure_ascii=False))
PY
)"
curl -sS http://127.0.0.1:5001/v1/chat/completions \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer ${API_KEY}" \
--data-binary "${REQUEST_BODY}" \
>"${OUT_DIR}/openai.stream.sse"
curl -sS http://127.0.0.1:5001/admin/dev/captures \
curl -sS \
-D "$HDR_FILE" \
http://127.0.0.1:5001/admin/dev/raw-samples/capture \
-H "Authorization: Bearer ${ADMIN_KEY}" \
>"${OUT_DIR}/captures.json"
-H 'Content-Type: application/json' \
--data-binary "${REQUEST_BODY}" \
>"$BODY_FILE"
python3 - <<'PY' "$OUT_DIR" "$SAMPLE_ID" "$QUESTION" "$MODEL"
import json,sys,pathlib,datetime
out=pathlib.Path(sys.argv[1])
sample_id,question,model=sys.argv[2:5]
captures=json.loads((out/'captures.json').read_text())
items=captures.get('items') or []
if not items:
raise SystemExit('no captured upstream stream found')
best=max(items,key=lambda x:len((x.get('response_body') or '')))
raw=best.get('response_body') or ''
(out/'upstream.stream.sse').write_text(raw)
meta={
'sample_id':sample_id,
'captured_at_utc':datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
'request':{'model':model,'stream':True,'messages':[{'role':'user','content':question}]},
'capture':{
'label':best.get('label'),'url':best.get('url'),'status_code':best.get('status_code'),
'response_bytes':len(raw),'contains_finished_token':('FINISHED' in raw),'finished_token_count':raw.count('FINISHED')
}
}
(out/'meta.json').write_text(json.dumps(meta,ensure_ascii=False,indent=2))
print(f'[capture] wrote sample to {out}')
print(f'[capture] upstream bytes={len(raw)} finished_count={raw.count("FINISHED")}')
SAMPLE_DIR="$(python3 - <<'PY' "$HDR_FILE"
import sys,pathlib
headers=pathlib.Path(sys.argv[1]).read_text().splitlines()
for line in headers:
if line.lower().startswith('x-ds2-sample-dir:'):
print(line.split(':',1)[1].strip())
raise SystemExit
print('')
PY
)"
rm -f "${OUT_DIR}/captures.json"
echo "[capture] done: ${OUT_DIR}"
SAMPLE_ID_HEADER="$(python3 - <<'PY' "$HDR_FILE"
import sys,pathlib
headers=pathlib.Path(sys.argv[1]).read_text().splitlines()
for line in headers:
if line.lower().startswith('x-ds2-sample-id:'):
print(line.split(':',1)[1].strip())
raise SystemExit
print('')
PY
)"
echo "[capture] sample_id=${SAMPLE_ID_HEADER:-$SAMPLE_ID}"
echo "[capture] sample_dir=${SAMPLE_DIR:-tests/raw_stream_samples/$SAMPLE_ID}"
cat "$BODY_FILE"

300
tests/tools/deepseek-sse-simulator.mjs Executable file → Normal file
View File

@@ -15,6 +15,9 @@ function parseArgs(argv) {
failOnLeak: true,
failOnReferenceLeak: true,
failOnMissingFinish: true,
failOnProcessedMismatch: true,
showOutput: false,
writeProcessedText: false,
};
for (let i = 2; i < argv.length; i += 1) {
const a = argv[i];
@@ -28,6 +31,12 @@ function parseArgs(argv) {
out.failOnReferenceLeak = false;
} else if (a === '--no-fail-on-missing-finish') {
out.failOnMissingFinish = false;
} else if (a === '--no-fail-on-processed-mismatch') {
out.failOnProcessedMismatch = false;
} else if (a === '--show-output') {
out.showOutput = true;
} else if (a === '--write-processed-text') {
out.writeProcessedText = true;
}
}
return out;
@@ -108,7 +117,46 @@ function parseSSE(raw) {
return events;
}
function replaySample(raw) {
// Recursively gather every human-visible text fragment from a payload value.
// Strings are taken verbatim, arrays are concatenated in order, and objects
// contribute their reasoning_content plus any nested text-bearing fields.
function collectVisibleText(value) {
  if (value == null) {
    return '';
  }
  if (typeof value === 'string') {
    return value;
  }
  if (Array.isArray(value)) {
    return value.map(collectVisibleText).join('');
  }
  if (typeof value !== 'object') {
    return '';
  }
  // reasoning_content is only ever a plain string, so it is not recursed into.
  let text = typeof value.reasoning_content === 'string' ? value.reasoning_content : '';
  // Recurse into the known text-carrying fields in a fixed order so the
  // concatenation result is deterministic.
  for (const key of ['text', 'content', 'output_text', 'message', 'delta']) {
    if (Object.prototype.hasOwnProperty.call(value, key)) {
      text += collectVisibleText(value[key]);
    }
  }
  return text;
}
function parseDeepSeekReplay(raw) {
const events = parseSSE(raw);
let currentType = 'thinking';
let sawFinish = false;
@@ -143,13 +191,208 @@ function replaySample(raw) {
events: events.length,
parsedChunks,
sawFinish,
outputText,
outputChars: outputText.length,
leakedFinishedText: outputText.includes('FINISHED'),
leakedReferenceMarkers: /\[reference:/i.test(outputText),
referenceLeakCount: (outputText.match(/\[reference:/gi) || []).length,
};
}
// Replay an OpenAI-compatible SSE stream and collect the visible output text,
// the number of JSON chunks parsed, and whether a finish signal was seen.
function parseOpenAIStream(raw) {
  const events = parseSSE(raw);
  const result = {
    events: events.length,
    parsedChunks: 0,
    sawFinish: false,
    outputText: '',
    outputChars: 0,
  };
  for (const evt of events) {
    if (evt.event === 'finish') {
      result.sawFinish = true;
    }
    const payload = evt.payload;
    // Skip keep-alives, the [DONE] sentinel, and anything that is not JSON.
    if (!payload || payload === '[DONE]' || payload[0] !== '{') {
      continue;
    }
    let chunk;
    try {
      chunk = JSON.parse(payload);
    } catch {
      continue;
    }
    result.parsedChunks += 1;
    if (!Array.isArray(chunk.choices)) {
      // Non-chat shapes (e.g. responses API) are scanned as a whole.
      result.outputText += collectVisibleText(chunk);
      continue;
    }
    for (const choice of chunk.choices) {
      if (!choice || typeof choice !== 'object') {
        continue;
      }
      if (choice.finish_reason) {
        result.sawFinish = true;
      }
      if (choice.delta) {
        result.outputText += collectVisibleText(choice.delta);
      }
      if (choice.message) {
        result.outputText += collectVisibleText(choice.message);
      }
    }
  }
  result.outputChars = result.outputText.length;
  return result;
}
// Parse a non-streaming OpenAI-style JSON response body and extract the
// visible output text plus a finish indicator. Returns a zeroed result when
// the body is not parseable as a JSON object/array.
function parseOpenAIJSON(raw) {
  const empty = {
    parsedChunks: 0,
    sawFinish: false,
    outputText: '',
    outputChars: 0,
  };
  let obj;
  try {
    obj = JSON.parse(raw);
  } catch {
    return empty;
  }
  // JSON.parse can legally yield null or a primitive ("null", "42", '"x"');
  // property access on null would throw, so treat non-objects as unparseable.
  if (obj === null || typeof obj !== 'object') {
    return empty;
  }
  let outputText = '';
  let sawFinish = false;
  if (typeof obj.output_text === 'string') {
    outputText += obj.output_text;
  }
  // Responses-API shape: an array of output items.
  if (Array.isArray(obj.output)) {
    for (const item of obj.output) {
      outputText += collectVisibleText(item);
    }
  }
  // Chat-completions shape: choices with message/delta payloads.
  if (Array.isArray(obj.choices)) {
    for (const choice of obj.choices) {
      if (!choice || typeof choice !== 'object') {
        continue;
      }
      if (choice.finish_reason) {
        sawFinish = true;
      }
      if (choice.message) {
        outputText += collectVisibleText(choice.message);
      }
      if (choice.delta) {
        outputText += collectVisibleText(choice.delta);
      }
    }
  }
  return {
    parsedChunks: 1,
    sawFinish,
    outputText,
    outputChars: outputText.length,
  };
}
// Locate the processed-output baseline for a sample directory. Candidates are
// probed in priority order: plain-text baseline first, then the SSE stream,
// then the JSON response. Returns null when the sample has no baseline.
function loadProcessedSample(dir) {
  const candidates = [
    ['openai.output.txt', 'text'],
    ['openai.stream.sse', 'stream'],
    ['openai.response.json', 'json'],
  ];
  for (const [name, kind] of candidates) {
    const candidatePath = path.join(dir, name);
    if (fs.existsSync(candidatePath)) {
      return {
        path: candidatePath,
        kind,
        raw: fs.readFileSync(candidatePath, 'utf8'),
      };
    }
  }
  return null;
}
// Replay one sample directory end-to-end: parse the raw upstream DeepSeek
// stream, optionally regenerate the text baseline, then compare the replay
// output against whichever processed baseline the sample ships with.
// Returns a flat report record; `ok` is false when any enabled check failed.
function replaySample(dir, opts) {
  const upstreamRaw = fs.readFileSync(path.join(dir, 'upstream.stream.sse'), 'utf8');
  const replay = parseDeepSeekReplay(upstreamRaw);
  if (opts.writeProcessedText) {
    // Explicit opt-in: overwrite the baseline with this replay's output.
    fs.writeFileSync(path.join(dir, 'openai.output.txt'), replay.outputText);
  }
  const processed = loadProcessedSample(dir);
  let processedResult = null;
  if (processed) {
    if (processed.kind === 'text') {
      // A text baseline is already final output; wrap it in a result shape.
      processedResult = {
        events: 0,
        parsedChunks: 0,
        sawFinish: false,
        outputText: processed.raw,
        outputChars: processed.raw.length,
      };
    } else if (processed.kind === 'stream') {
      processedResult = parseOpenAIStream(processed.raw);
    } else {
      processedResult = parseOpenAIJSON(processed.raw);
    }
  }
  // null = no baseline to compare; true/false = comparison verdict.
  const processedMatch = processedResult ? processedResult.outputText === replay.outputText : null;
  const errors = [];
  if (opts.failOnMissingFinish && !replay.sawFinish) {
    errors.push('missing finish signal');
  }
  if (opts.failOnLeak && replay.leakedFinishedText) {
    errors.push('FINISHED leaked into output text');
  }
  if (opts.failOnReferenceLeak && replay.leakedReferenceMarkers) {
    errors.push('reference markers leaked into output text');
  }
  if (processedResult && opts.failOnProcessedMismatch && !processedMatch) {
    errors.push('processed output mismatch');
  }
  return {
    sample_id: path.basename(dir),
    raw_events: replay.events,
    raw_parsed_chunks: replay.parsedChunks,
    raw_saw_finish: replay.sawFinish,
    raw_output_chars: replay.outputChars,
    raw_leaked_finished_text: replay.leakedFinishedText,
    raw_leaked_reference_markers: replay.leakedReferenceMarkers,
    raw_reference_leak_count: replay.referenceLeakCount,
    processed_available: Boolean(processedResult),
    processed_path: processed ? processed.path : '',
    processed_kind: processed ? processed.kind : '',
    processed_parsed_chunks: processedResult ? processedResult.parsedChunks : 0,
    processed_saw_finish: processedResult ? processedResult.sawFinish : false,
    processed_output_chars: processedResult ? processedResult.outputChars : 0,
    processed_output_matches_replay: processedMatch,
    processed_output_preview: processedResult ? previewText(processedResult.outputText, 280) : '',
    ok: errors.length === 0,
    errors,
    replay_output_text: replay.outputText,
    processed_output_text: processedResult ? processedResult.outputText : '',
  };
}
// Truncate text to at most `limit` characters, appending an ellipsis when the
// input was cut. Falsy input (empty string, null, undefined) yields ''.
function previewText(text, limit) {
  if (!text) {
    return '';
  }
  return text.length <= limit ? text : `${text.slice(0, limit)}...`;
}
function main() {
const opts = parseArgs(process.argv);
const { dirs, manifestPath } = resolveSampleDirs(opts.samplesRoot);
@@ -172,36 +415,49 @@ function main() {
}
for (const dir of dirs) {
const sampleID = path.basename(dir);
const raw = fs.readFileSync(path.join(dir, 'upstream.stream.sse'), 'utf8');
const r = replaySample(raw);
const errors = [];
if (opts.failOnMissingFinish && !r.sawFinish) {
errors.push('missing finish signal');
}
if (opts.failOnLeak && r.leakedFinishedText) {
errors.push('FINISHED leaked into output text');
}
if (opts.failOnReferenceLeak && r.leakedReferenceMarkers) {
errors.push('reference markers leaked into output text');
}
const sample = replaySample(dir, opts);
const errors = [...sample.errors];
if (errors.length > 0) {
report.failed += 1;
}
report.samples.push({ sample_id: sampleID, ...r, ok: errors.length === 0, errors });
report.samples.push({
sample_id: sample.sample_id,
raw_events: sample.raw_events,
raw_parsed_chunks: sample.raw_parsed_chunks,
raw_saw_finish: sample.raw_saw_finish,
raw_output_chars: sample.raw_output_chars,
raw_leaked_finished_text: sample.raw_leaked_finished_text,
raw_leaked_reference_markers: sample.raw_leaked_reference_markers,
raw_reference_leak_count: sample.raw_reference_leak_count,
processed_available: sample.processed_available,
processed_path: sample.processed_path,
processed_kind: sample.processed_kind,
processed_parsed_chunks: sample.processed_parsed_chunks,
processed_saw_finish: sample.processed_saw_finish,
processed_output_chars: sample.processed_output_chars,
processed_output_matches_replay: sample.processed_output_matches_replay,
processed_output_preview: sample.processed_output_preview,
ok: errors.length === 0,
errors,
});
const status = sample.ok ? 'OK' : 'FAIL';
const leakNote = sample.raw_leaked_reference_markers ? ` refLeaks=${sample.raw_reference_leak_count}` : '';
const matchNote = sample.processed_available
? ` processed=${sample.processed_output_matches_replay ? 'match' : 'mismatch'}`
: ' processed=missing';
const note = errors.length > 0 ? ` errors=${errors.join(';')}` : '';
console.log(`[sim] ${status} ${sample.sample_id} events=${sample.raw_events} parsed=${sample.raw_parsed_chunks} chars=${sample.raw_output_chars}${leakNote}${matchNote}${note}`);
if (opts.showOutput && sample.processed_available) {
console.log(`[sim] processed output for ${sample.sample_id}:`);
console.log(sample.processed_output_text || '(empty)');
}
}
if (opts.reportPath) {
fs.writeFileSync(opts.reportPath, JSON.stringify(report, null, 2));
}
for (const s of report.samples) {
const status = s.ok ? 'OK' : 'FAIL';
const leakNote = s.leakedReferenceMarkers ? ` refLeaks=${s.referenceLeakCount}` : '';
const note = s.errors.length > 0 ? ` errors=${s.errors.join(';')}` : '';
console.log(`[sim] ${status} ${s.sample_id} events=${s.events} parsed=${s.parsedChunks} chars=${s.outputChars}${leakNote}${note}`);
}
if (report.failed > 0) {
console.error(`[sim] ${report.failed}/${report.total} samples failed`);
process.exit(2);