diff --git a/docs/DeepSeekSSE流格式字段分析-2026-04-03.md b/docs/DeepSeekSSE流格式字段分析-2026-04-03.md index e7f31e0..97b4432 100644 --- a/docs/DeepSeekSSE流格式字段分析-2026-04-03.md +++ b/docs/DeepSeekSSE流格式字段分析-2026-04-03.md @@ -77,6 +77,14 @@ data: ./tests/scripts/run-raw-stream-sim.sh ``` +如果需要新增永久样本,可以直接走本地专用接口: + +```bash +POST /admin/dev/raw-samples/capture +``` + +它会把原始上游流和项目最终输出一起落到 `tests/raw_stream_samples//`,并生成 `openai.output.txt` 作为最终输出的对照基线,便于后续继续分析字段和做回放比对。 + ## 6. `CONTENT_FILTER` 终态样本 在 `content-filter-trigger-20260405-jwt3` 样本中,末尾会出现一组明确的风控终态字段: diff --git a/docs/TESTING.md b/docs/TESTING.md index af1bd25..ab41027 100644 --- a/docs/TESTING.md +++ b/docs/TESTING.md @@ -235,8 +235,19 @@ go run ./cmd/ds2api-tests --no-preflight 说明: - 该工具默认重放 `tests/raw_stream_samples/manifest.json` 声明的 canonical 样本,按上游 SSE 顺序做 1:1 仿真解析。 - 默认校验不出现 `FINISHED` 文本泄露,并要求存在结束信号。 +- 如果样本目录里存在 `openai.output.txt`,会直接拿它和重放结果做对照;否则会回退到 `openai.stream.sse` / `openai.response.json`。 - 结果会写入 `artifacts/raw-stream-sim/*.json`,可供其他测试脚本或排障流程复用。 +### 采集永久样本 + +本地启动服务后,可以直接打: + +```bash +POST /admin/dev/raw-samples/capture +``` + +这个接口会把请求、上游原始流和最终输出一起写入 `tests/raw_stream_samples//`,以后可以直接拿来做回放和字段分析。 + ### 指定输出目录和超时 ```bash diff --git a/internal/admin/deps.go b/internal/admin/deps.go index d61a67d..8686038 100644 --- a/internal/admin/deps.go +++ b/internal/admin/deps.go @@ -42,6 +42,10 @@ type PoolController interface { ApplyRuntimeLimits(maxInflightPerAccount, maxQueueSize, globalMaxInflight int) } +type OpenAIChatCaller interface { + ChatCompletions(w http.ResponseWriter, r *http.Request) +} + type DeepSeekCaller interface { Login(ctx context.Context, acc config.Account) (string, error) CreateSession(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error) diff --git a/internal/admin/handler.go b/internal/admin/handler.go index 125abd0..9c67492 100644 --- a/internal/admin/handler.go +++ b/internal/admin/handler.go @@ -5,9 +5,10 @@ import ( ) type Handler struct { - Store ConfigStore - Pool PoolController - DS DeepSeekCaller + Store ConfigStore + Pool PoolController + DS DeepSeekCaller + OpenAI OpenAIChatCaller } func RegisterRoutes(r chi.Router, h *Handler) { @@ -34,6 +35,7 @@ func RegisterRoutes(r chi.Router, h *Handler) { pr.Post("/accounts/sessions/delete-all", h.deleteAllSessions) pr.Post("/import", h.batchImport) pr.Post("/test", h.testAPI) + pr.Post("/dev/raw-samples/capture", h.captureRawSample) pr.Post("/vercel/sync", h.syncVercel) pr.Get("/vercel/status", h.vercelStatus) pr.Post("/vercel/status", h.vercelStatus) diff --git a/internal/admin/handler_raw_samples.go b/internal/admin/handler_raw_samples.go new file mode 100644 index 0000000..c165424 --- /dev/null +++ b/internal/admin/handler_raw_samples.go @@ -0,0 +1,216 @@ +package admin + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + + "ds2api/internal/config" + "ds2api/internal/devcapture" + "ds2api/internal/rawsample" +) + +func (h *Handler) captureRawSample(w http.ResponseWriter, r *http.Request) { + if h.OpenAI == nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "OpenAI handler is not configured"}) + return + } + + var req map[string]any + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid json"}) + return + } + + payload, sampleID, apiKey, err := prepareRawSampleCaptureRequest(h.Store, req) + if err != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"detail": err.Error()}) + return + } + + body, err := json.Marshal(payload) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "failed to encode capture request"}) + return + } + + traceID := rawsample.NormalizeSampleID(sampleID) + if traceID == "" { + traceID = rawsample.DefaultSampleID("capture") + } + + before := devcapture.Global().Snapshot() + rec := httptest.NewRecorder() + captureReq := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__trace_id="+url.QueryEscape(traceID), bytes.NewReader(body)) + captureReq.Header.Set("Authorization", "Bearer "+apiKey) + captureReq.Header.Set("Content-Type", "application/json") + h.OpenAI.ChatCompletions(rec, captureReq) + after := devcapture.Global().Snapshot() + + if rec.Code >= http.StatusBadRequest { + copyHeader(w.Header(), rec.Header()) + w.WriteHeader(rec.Code) + _, _ = io.Copy(w, bytes.NewReader(rec.Body.Bytes())) + return + } + + captureEntry, ok := selectNewestCaptureEntry(before, after) + if !ok { + writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": "no upstream capture was recorded"}) + return + } + + processedContentType := strings.TrimSpace(rec.Header().Get("Content-Type")) + saved, err := rawsample.Persist(rawsample.PersistOptions{ + RootDir: config.RawStreamSampleRoot(), + SampleID: sampleID, + Source: "admin/dev/raw-samples/capture", + Request: payload, + Capture: captureSummaryFromEntry(captureEntry), + UpstreamBody: []byte(captureEntry.ResponseBody), + ProcessedBody: rec.Body.Bytes(), + ProcessedKind: processedKindFromContentType(processedContentType), + ProcessedStatusCode: rec.Code, + ProcessedContentType: processedContentType, + }) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"detail": err.Error()}) + return + } + + copyHeader(w.Header(), rec.Header()) + w.Header().Set("X-Ds2-Sample-Id", saved.SampleID) + w.Header().Set("X-Ds2-Sample-Dir", saved.Dir) + w.Header().Set("X-Ds2-Sample-Meta", saved.MetaPath) + w.Header().Set("X-Ds2-Sample-Upstream", saved.UpstreamPath) + w.Header().Set("X-Ds2-Sample-Processed", saved.ProcessedPath) + w.Header().Set("X-Ds2-Sample-Output", saved.OutputPath) + w.WriteHeader(rec.Code) + _, _ = io.Copy(w, bytes.NewReader(rec.Body.Bytes())) +} + +func prepareRawSampleCaptureRequest(store ConfigStore, req map[string]any) (map[string]any, string, string, error) { + payload := cloneMap(req) + sampleID := strings.TrimSpace(fieldString(payload, "sample_id")) + apiKey := strings.TrimSpace(fieldString(payload, "api_key")) + + for _, k := range []string{"sample_id", "api_key", "promote_default", "persist", "source"} { + delete(payload, k) + } + + if apiKey == "" { + if store == nil { + return nil, "", "", fmt.Errorf("no api key provided") + } + keys := store.Keys() + if len(keys) == 0 { + return nil, "", "", fmt.Errorf("no api key available") + } + apiKey = strings.TrimSpace(keys[0]) + } + + if model := strings.TrimSpace(fieldString(payload, "model")); model == "" { + payload["model"] = "deepseek-chat" + } + if _, ok := payload["stream"]; !ok { + payload["stream"] = true + } + + if messagesRaw, ok := payload["messages"].([]any); !ok || len(messagesRaw) == 0 { + message := strings.TrimSpace(fieldString(payload, "message")) + if message == "" { + message = "你好" + } + payload["messages"] = []map[string]any{{"role": "user", "content": message}} + } + delete(payload, "message") + + if sampleID == "" { + model := strings.TrimSpace(fieldString(payload, "model")) + if model == "" { + model = "capture" + } + sampleID = rawsample.DefaultSampleID(model) + } + + return payload, sampleID, apiKey, nil +} + +func selectNewestCaptureEntry(before, after []devcapture.Entry) (devcapture.Entry, bool) { + beforeIDs := make(map[string]struct{}, len(before)) + for _, entry := range before { + beforeIDs[entry.ID] = struct{}{} + } + candidates := make([]devcapture.Entry, 0, len(after)) + for _, entry := range after { + if _, ok := beforeIDs[entry.ID]; ok { + continue + } + if strings.TrimSpace(entry.ResponseBody) == "" { + continue + } + candidates = append(candidates, entry) + } + if len(candidates) == 0 { + candidates = append(candidates, after...) + } + if len(candidates) == 0 { + return devcapture.Entry{}, false + } + best := candidates[0] + bestScore := len(best.ResponseBody) + for _, entry := range candidates[1:] { + score := len(entry.ResponseBody) + if entry.CreatedAt > best.CreatedAt || (entry.CreatedAt == best.CreatedAt && score > bestScore) { + best = entry + bestScore = score + } + } + return best, true +} + +func captureSummaryFromEntry(entry devcapture.Entry) rawsample.CaptureSummary { + return rawsample.CaptureSummary{ + Label: strings.TrimSpace(entry.Label), + URL: strings.TrimSpace(entry.URL), + StatusCode: entry.StatusCode, + ResponseBytes: len(entry.ResponseBody), + } +} + +func processedKindFromContentType(contentType string) string { + ct := strings.ToLower(strings.TrimSpace(contentType)) + switch { + case strings.Contains(ct, "text/event-stream"): + return "stream" + case strings.Contains(ct, "application/json"): + return "json" + default: + return "stream" + } +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + dst.Del(k) + for _, v := range vv { + dst.Add(k, v) + } + } +} + +func cloneMap(in map[string]any) map[string]any { + if len(in) == 0 { + return map[string]any{} + } + out := make(map[string]any, len(in)) + for k, v := range in { + out[k] = v + } + return out +} diff --git a/internal/admin/handler_raw_samples_test.go b/internal/admin/handler_raw_samples_test.go new file mode 100644 index 0000000..ebbc8d0 --- /dev/null +++ b/internal/admin/handler_raw_samples_test.go @@ -0,0 +1,95 @@ +package admin + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + + "ds2api/internal/devcapture" +) + +type stubOpenAIChatCaller struct{} + +func (stubOpenAIChatCaller) ChatCompletions(w http.ResponseWriter, _ *http.Request) { + store := devcapture.Global() + session := store.Start("deepseek_completion", "https://chat.deepseek.com/api/v0/chat/completion", "acct-test", map[string]any{"model": "deepseek-chat"}) + raw := io.NopCloser(strings.NewReader( + "data: {\"v\":\"hello [reference:1]\"}\n\n" + + "data: {\"v\":\"FINISHED\",\"p\":\"response/status\"}\n\n", + )) + if session != nil { + raw = session.WrapBody(raw, http.StatusOK) + } + _, _ = io.ReadAll(raw) + _ = raw.Close() + + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, "data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n") +} + +func TestCaptureRawSampleWritesPersistentSample(t *testing.T) { + t.Setenv("DS2API_RAW_STREAM_SAMPLE_ROOT", t.TempDir()) + devcapture.Global().Clear() + defer devcapture.Global().Clear() + + h := &Handler{OpenAI: stubOpenAIChatCaller{}} + reqBody := `{ + "sample_id":"My Sample 01", + "api_key":"local-key", + "model":"deepseek-chat", + "message":"广州天气", + "stream":true + }` + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/admin/dev/raw-samples/capture", strings.NewReader(reqBody)) + h.captureRawSample(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if got := rec.Header().Get("X-Ds2-Sample-Id"); got != "my-sample-01" { + t.Fatalf("expected sample id header my-sample-01, got %q", got) + } + if got := rec.Header().Get("X-Ds2-Sample-Output"); got != filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-01", "openai.output.txt") { + t.Fatalf("unexpected sample output header: %q", got) + } + if !strings.Contains(rec.Body.String(), `"content":"hello"`) { + t.Fatalf("expected proxied openai output, got %s", rec.Body.String()) + } + + sampleDir := filepath.Join(os.Getenv("DS2API_RAW_STREAM_SAMPLE_ROOT"), "my-sample-01") + if _, err := os.Stat(sampleDir); err != nil { + t.Fatalf("sample dir missing: %v", err) + } + metaBytes, err := os.ReadFile(filepath.Join(sampleDir, "meta.json")) + if err != nil { + t.Fatalf("read meta: %v", err) + } + var meta map[string]any + if err := json.Unmarshal(metaBytes, &meta); err != nil { + t.Fatalf("decode meta: %v", err) + } + if meta["sample_id"] != "my-sample-01" { + t.Fatalf("unexpected meta sample_id: %#v", meta["sample_id"]) + } + capture, _ := meta["capture"].(map[string]any) + if capture == nil { + t.Fatalf("missing capture meta: %#v", meta) + } + if got := int(capture["response_bytes"].(float64)); got == 0 { + t.Fatalf("expected capture bytes to be recorded, got %#v", capture) + } + processed, _ := meta["processed"].(map[string]any) + if processed == nil { + t.Fatalf("missing processed meta: %#v", meta) + } + if processed["file"] != "openai.stream.sse" { + t.Fatalf("unexpected processed file: %#v", processed["file"]) + } +} diff --git a/internal/config/paths.go b/internal/config/paths.go index 23dfe54..18723a3 100644 --- a/internal/config/paths.go +++ b/internal/config/paths.go @@ -37,6 +37,10 @@ func WASMPath() string { return ResolvePath("DS2API_WASM_PATH", "sha3_wasm_bg.7b9ca65ddd.wasm") } +func RawStreamSampleRoot() string { + return ResolvePath("DS2API_RAW_STREAM_SAMPLE_ROOT", "tests/raw_stream_samples") +} + func StaticAdminDir() string { return ResolvePath("DS2API_STATIC_ADMIN_DIR", "static/admin") } diff --git a/internal/rawsample/rawsample.go b/internal/rawsample/rawsample.go new file mode 100644 index 0000000..7b3c414 --- /dev/null +++ b/internal/rawsample/rawsample.go @@ -0,0 +1,284 @@ +package rawsample + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/google/uuid" +) + +var referenceMarkerRe = regexp.MustCompile(`(?i)\[reference:\s*\d+\]`) + +type CaptureSummary struct { + Label string `json:"label,omitempty"` + URL string `json:"url,omitempty"` + StatusCode int `json:"status_code"` + ResponseBytes int `json:"response_bytes"` + ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"` + ReferenceMarkerCount int `json:"reference_marker_count,omitempty"` + ContainsFinishedToken bool `json:"contains_finished_token,omitempty"` + FinishedTokenCount int `json:"finished_token_count,omitempty"` +} + +type ProcessedSummary struct { + Kind string `json:"kind"` + File string `json:"file"` + TextFile string `json:"text_file,omitempty"` + StatusCode int `json:"status_code"` + ContentType string `json:"content_type,omitempty"` + ResponseBytes int `json:"response_bytes"` + ContainsReferenceMarkers bool `json:"contains_reference_markers,omitempty"` + ReferenceMarkerCount int `json:"reference_marker_count,omitempty"` + ContainsFinishedToken bool `json:"contains_finished_token,omitempty"` + FinishedTokenCount int `json:"finished_token_count,omitempty"` +} + +type Meta struct { + SampleID string `json:"sample_id"` + CapturedAtUTC string `json:"captured_at_utc"` + Source string `json:"source,omitempty"` + Request any `json:"request"` + Capture CaptureSummary `json:"capture"` + Processed ProcessedSummary `json:"processed"` +} + +type PersistOptions struct { + RootDir string + SampleID string + Source string + Request any + Capture CaptureSummary + UpstreamBody []byte + ProcessedBody []byte + ProcessedKind string + ProcessedFile string + ProcessedStatusCode int + ProcessedContentType string + ProcessedText string +} + +type SavedSample struct { + SampleID string + Dir string + MetaPath string + UpstreamPath string + ProcessedPath string + OutputPath string + Meta Meta +} + +func Persist(opts PersistOptions) (SavedSample, error) { + root := strings.TrimSpace(opts.RootDir) + if root == "" { + return SavedSample{}, errors.New("root dir is required") + } + if len(opts.UpstreamBody) == 0 { + return SavedSample{}, errors.New("upstream body is required") + } + if len(opts.ProcessedBody) == 0 { + return SavedSample{}, errors.New("processed body is required") + } + + if err := os.MkdirAll(root, 0o755); err != nil { + return SavedSample{}, fmt.Errorf("create root dir: %w", err) + } + + baseID := NormalizeSampleID(opts.SampleID) + if baseID == "" { + baseID = DefaultSampleID("capture") + } + sampleID, err := uniqueSampleID(root, baseID) + if err != nil { + return SavedSample{}, err + } + + tempID := ".tmp-" + sampleID + "-" + strings.ToLower(strings.ReplaceAll(uuid.NewString(), "-", "")) + tempDir := filepath.Join(root, tempID) + finalDir := filepath.Join(root, sampleID) + if err := os.MkdirAll(tempDir, 0o755); err != nil { + return SavedSample{}, fmt.Errorf("create temp dir: %w", err) + } + cleanup := func() { + _ = os.RemoveAll(tempDir) + } + + upstreamPath := filepath.Join(tempDir, "upstream.stream.sse") + if err := os.WriteFile(upstreamPath, opts.UpstreamBody, 0o644); err != nil { + cleanup() + return SavedSample{}, fmt.Errorf("write upstream stream: %w", err) + } + + processedKind := strings.TrimSpace(opts.ProcessedKind) + if processedKind == "" { + processedKind = inferProcessedKind(opts.ProcessedContentType) + } + processedFile := strings.TrimSpace(opts.ProcessedFile) + if processedFile == "" { + processedFile = defaultProcessedFile(processedKind, opts.ProcessedContentType) + } + if processedFile == "" { + cleanup() + return SavedSample{}, errors.New("processed file name is required") + } + processedPath := filepath.Join(tempDir, processedFile) + if err := os.WriteFile(processedPath, opts.ProcessedBody, 0o644); err != nil { + cleanup() + return SavedSample{}, fmt.Errorf("write processed output: %w", err) + } + + processedText := opts.ProcessedText + if processedText == "" { + processedText = extractProcessedVisibleText(opts.ProcessedBody, processedKind, opts.ProcessedContentType) + } + textPath := filepath.Join(tempDir, "openai.output.txt") + if err := os.WriteFile(textPath, []byte(processedText), 0o644); err != nil { + cleanup() + return SavedSample{}, fmt.Errorf("write processed text: %w", err) + } + + now := time.Now().UTC() + capture := opts.Capture + capture.ResponseBytes = len(opts.UpstreamBody) + capture.ContainsReferenceMarkers, capture.ReferenceMarkerCount, capture.ContainsFinishedToken, capture.FinishedTokenCount = analyzeBytes(opts.UpstreamBody) + + processed := ProcessedSummary{ + Kind: processedKind, + File: processedFile, + TextFile: "openai.output.txt", + StatusCode: opts.ProcessedStatusCode, + ContentType: strings.TrimSpace(opts.ProcessedContentType), + ResponseBytes: len(opts.ProcessedBody), + } + processed.ContainsReferenceMarkers, processed.ReferenceMarkerCount, processed.ContainsFinishedToken, processed.FinishedTokenCount = analyzeBytes(opts.ProcessedBody) + + meta := Meta{ + SampleID: sampleID, + CapturedAtUTC: now.Format(time.RFC3339), + Source: strings.TrimSpace(opts.Source), + Request: opts.Request, + Capture: capture, + Processed: processed, + } + metaBytes, err := json.MarshalIndent(meta, "", " ") + if err != nil { + cleanup() + return SavedSample{}, fmt.Errorf("marshal meta: %w", err) + } + metaPath := filepath.Join(tempDir, "meta.json") + if err := os.WriteFile(metaPath, append(metaBytes, '\n'), 0o644); err != nil { + cleanup() + return SavedSample{}, fmt.Errorf("write meta: %w", err) + } + + if err := os.Rename(tempDir, finalDir); err != nil { + cleanup() + return SavedSample{}, fmt.Errorf("promote sample dir: %w", err) + } + + return SavedSample{ + SampleID: sampleID, + Dir: finalDir, + MetaPath: filepath.Join(finalDir, "meta.json"), + UpstreamPath: filepath.Join(finalDir, "upstream.stream.sse"), + ProcessedPath: filepath.Join(finalDir, processedFile), + OutputPath: filepath.Join(finalDir, "openai.output.txt"), + Meta: meta, + }, nil +} + +func NormalizeSampleID(raw string) string { + raw = strings.TrimSpace(strings.ToLower(raw)) + if raw == "" { + return "" + } + var b strings.Builder + prevDash := false + for _, r := range raw { + switch { + case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '-', r == '_', r == '.': + b.WriteRune(r) + prevDash = false + default: + if !prevDash { + b.WriteRune('-') + prevDash = true + } + } + } + out := strings.Trim(b.String(), "-_.") + if out == "" { + return "" + } + return out +} + +func DefaultSampleID(prefix string) string { + prefix = NormalizeSampleID(prefix) + if prefix == "" { + prefix = "capture" + } + return fmt.Sprintf("%s-%s", prefix, time.Now().UTC().Format("20060102T150405Z")) +} + +func inferProcessedKind(contentType string) string { + ct := strings.ToLower(strings.TrimSpace(contentType)) + switch { + case strings.Contains(ct, "text/event-stream"): + return "stream" + case strings.Contains(ct, "application/json"): + return "json" + default: + return "stream" + } +} + +func defaultProcessedFile(kind, contentType string) string { + switch strings.ToLower(strings.TrimSpace(kind)) { + case "json": + return "openai.response.json" + case "stream": + return "openai.stream.sse" + } + switch inferProcessedKind(contentType) { + case "json": + return "openai.response.json" + default: + return "openai.stream.sse" + } +} + +func uniqueSampleID(root, base string) (string, error) { + if base == "" { + base = DefaultSampleID("capture") + } + candidate := base + for i := 2; ; i++ { + finalDir := filepath.Join(root, candidate) + if _, err := os.Stat(finalDir); err != nil { + if os.IsNotExist(err) { + return candidate, nil + } + return "", fmt.Errorf("stat sample dir: %w", err) + } + candidate = fmt.Sprintf("%s-%d", base, i) + } +} + +func analyzeBytes(raw []byte) (containsReferenceMarkers bool, referenceMarkerCount int, containsFinishedToken bool, finishedTokenCount int) { + if len(raw) == 0 { + return false, 0, false, 0 + } + text := string(raw) + referenceMarkerCount = len(referenceMarkerRe.FindAllStringIndex(text, -1)) + containsReferenceMarkers = referenceMarkerCount > 0 + upper := strings.ToUpper(text) + finishedTokenCount = strings.Count(upper, "FINISHED") + containsFinishedToken = finishedTokenCount > 0 + return +} diff --git a/internal/rawsample/rawsample_test.go b/internal/rawsample/rawsample_test.go new file mode 100644 index 0000000..12067b5 --- /dev/null +++ b/internal/rawsample/rawsample_test.go @@ -0,0 +1,94 @@ +package rawsample + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestNormalizeSampleID(t *testing.T) { + got := NormalizeSampleID(" Hello, World! ") + if got != "hello-world" { + t.Fatalf("expected hello-world, got %q", got) + } +} + +func TestPersistWritesSampleFilesAndMeta(t *testing.T) { + root := t.TempDir() + saved, err := Persist(PersistOptions{ + RootDir: root, + SampleID: "My Sample! 01", + Source: "unit-test", + Request: map[string]any{ + "model": "deepseek-chat", + "stream": true, + "messages": []any{ + map[string]any{"role": "user", "content": "广州天气"}, + }, + }, + Capture: CaptureSummary{ + Label: "deepseek_completion", + URL: "https://chat.deepseek.com/api/v0/chat/completion", + StatusCode: 200, + }, + UpstreamBody: []byte("data: {\"v\":\"hello [reference:1]\"}\n\n" + + "data: {\"v\":\"FINISHED\",\"p\":\"response/status\"}\n\n"), + ProcessedBody: []byte("data: {\"choices\":[{\"delta\":{\"content\":\"hello\"},\"index\":0}],\"created\":1,\"id\":\"id\",\"model\":\"m\",\"object\":\"chat.completion.chunk\"}\n\n"), + ProcessedStatusCode: 200, + ProcessedContentType: "text/event-stream", + }) + if err != nil { + t.Fatalf("Persist failed: %v", err) + } + + if saved.SampleID != "my-sample-01" { + t.Fatalf("expected normalized sample id, got %q", saved.SampleID) + } + if _, err := os.Stat(saved.Dir); err != nil { + t.Fatalf("sample dir missing: %v", err) + } + if _, err := os.Stat(saved.UpstreamPath); err != nil { + t.Fatalf("upstream file missing: %v", err) + } + if _, err := os.Stat(saved.ProcessedPath); err != nil { + t.Fatalf("processed file missing: %v", err) + } + if saved.OutputPath != filepath.Join(saved.Dir, "openai.output.txt") { + t.Fatalf("unexpected processed text path: %s", saved.OutputPath) + } + if _, err := os.Stat(saved.OutputPath); err != nil { + t.Fatalf("processed text file missing: %v", err) + } + + metaBytes, err := os.ReadFile(saved.MetaPath) + if err != nil { + t.Fatalf("read meta: %v", err) + } + var meta Meta + if err := json.Unmarshal(metaBytes, &meta); err != nil { + t.Fatalf("decode meta: %v", err) + } + if meta.SampleID != saved.SampleID { + t.Fatalf("expected meta sample id %q, got %q", saved.SampleID, meta.SampleID) + } + if meta.Capture.ReferenceMarkerCount != 1 { + t.Fatalf("expected one reference marker, got %+v", meta.Capture) + } + if meta.Capture.FinishedTokenCount != 1 { + t.Fatalf("expected one finished token, got %+v", meta.Capture) + } + if meta.Processed.File != "openai.stream.sse" { + t.Fatalf("expected stream processed file, got %+v", meta.Processed) + } + if meta.Processed.TextFile != "openai.output.txt" { + t.Fatalf("expected text file metadata, got %+v", meta.Processed) + } + if meta.Processed.ResponseBytes == 0 { + t.Fatalf("expected processed bytes to be recorded, got %+v", meta.Processed) + } + if !strings.HasSuffix(saved.ProcessedPath, filepath.Join(saved.SampleID, "openai.stream.sse")) { + t.Fatalf("unexpected processed path: %s", saved.ProcessedPath) + } +} diff --git a/internal/rawsample/visible_text.go b/internal/rawsample/visible_text.go new file mode 100644 index 0000000..4746590 --- /dev/null +++ b/internal/rawsample/visible_text.go @@ -0,0 +1,114 @@ +package rawsample + +import ( + "encoding/json" + "strings" +) + +func extractProcessedVisibleText(raw []byte, kind, contentType string) string { + if len(raw) == 0 { + return "" + } + switch strings.ToLower(strings.TrimSpace(kind)) { + case "json": + return parseOpenAIJSONText(string(raw)) + case "stream": + return parseOpenAIStreamText(string(raw)) + } + ct := strings.ToLower(strings.TrimSpace(contentType)) + if strings.Contains(ct, "application/json") { + return parseOpenAIJSONText(string(raw)) + } + return parseOpenAIStreamText(string(raw)) +} + +func parseOpenAIStreamText(raw string) string { + if strings.TrimSpace(raw) == "" { + return "" + } + var out strings.Builder + for _, block := range strings.Split(raw, "\n\n") { + if strings.TrimSpace(block) == "" { + continue + } + dataLines := make([]string, 0, 2) + for _, line := range strings.Split(block, "\n") { + if !strings.HasPrefix(line, "data:") { + continue + } + dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:"))) + } + if len(dataLines) == 0 { + continue + } + payload := strings.TrimSpace(strings.Join(dataLines, "\n")) + if payload == "" || payload == "[DONE]" || !strings.HasPrefix(payload, "{") { + continue + } + var decoded any + if err := json.Unmarshal([]byte(payload), &decoded); err != nil { + continue + } + out.WriteString(extractOpenAIVisibleTextValue(decoded)) + } + return out.String() +} + +func parseOpenAIJSONText(raw string) string { + if strings.TrimSpace(raw) == "" { + return "" + } + var decoded any + if err := json.Unmarshal([]byte(raw), &decoded); err != nil { + return "" + } + return extractOpenAIVisibleTextValue(decoded) +} + +func extractOpenAIVisibleTextValue(v any) string { + switch x := v.(type) { + case nil: + return "" + case string: + return x + case []any: + var out strings.Builder + for _, item := range x { + out.WriteString(extractOpenAIVisibleTextValue(item)) + } + return out.String() + case map[string]any: + var out strings.Builder + if s, ok := x["output_text"].(string); ok { + out.WriteString(s) + } + if arr, ok := x["output"].([]any); ok { + for _, item := range arr { + out.WriteString(extractOpenAIVisibleTextValue(item)) + } + } + if arr, ok := x["choices"].([]any); ok { + for _, item := range arr { + out.WriteString(extractOpenAIVisibleTextValue(item)) + } + } + if msg, ok := x["message"]; ok { + out.WriteString(extractOpenAIVisibleTextValue(msg)) + } + if delta, ok := x["delta"]; ok { + out.WriteString(extractOpenAIVisibleTextValue(delta)) + } + if content, ok := x["content"]; ok { + out.WriteString(extractOpenAIVisibleTextValue(content)) + } + if reasoning, ok := x["reasoning_content"]; ok { + out.WriteString(extractOpenAIVisibleTextValue(reasoning)) + } + if text, ok := x["text"]; ok { + out.WriteString(extractOpenAIVisibleTextValue(text)) + } + return out.String() + default: + return "" + } +} diff --git a/internal/server/router.go b/internal/server/router.go index a6c71f3..7557afb 100644 --- a/internal/server/router.go +++ b/internal/server/router.go @@ -46,7 +46,7 @@ func NewApp() *App { openaiHandler := &openai.Handler{Store: store, Auth: resolver, DS: dsClient} claudeHandler := &claude.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: openaiHandler} geminiHandler := &gemini.Handler{Store: store, Auth: resolver, DS: dsClient, OpenAI: openaiHandler} - adminHandler := &admin.Handler{Store: store, Pool: pool, DS: dsClient} + adminHandler := &admin.Handler{Store: store, Pool: pool, DS: dsClient, OpenAI: openaiHandler} webuiHandler := webui.NewHandler() r := chi.NewRouter() diff --git a/tests/raw_stream_samples/README.md b/tests/raw_stream_samples/README.md index e3911d7..7c45bf6 100644 --- a/tests/raw_stream_samples/README.md +++ b/tests/raw_stream_samples/README.md @@ -11,17 +11,38 @@ 默认回放工具会优先读取 [`manifest.json`](./manifest.json) 中的 `default_samples`,以稳定固定回放集。 +## 自动采集接口 + +本地启动服务后,可以直接调用专用接口自动落盘一份永久样本: + +```bash +POST /admin/dev/raw-samples/capture +``` + +这个接口会: + +- 接收一个普通的 OpenAI chat completions 请求体 +- 走项目内同一条处理链 +- 自动保存 `meta.json` +- 自动保存上游原始流 `upstream.stream.sse` +- 自动保存项目最终输出 `openai.stream.sse` 或 `openai.response.json` + +响应体仍然是项目最终输出,同时会额外带上 `X-Ds2-Sample-*` 头,方便脚本和手工检查。 + ## 目录规范 每个样本一个子目录: - `meta.json`:样本元信息(问题、模型、采集时间、备注) - `upstream.stream.sse`:完整原始 SSE 文本(`event:` / `data:` 行) +- `openai.stream.sse`:项目最终输出的 SSE 形式,或 +- `openai.response.json`:项目最终输出的非流式 JSON 形式 +- `openai.output.txt`:项目最终输出的纯文本摘要,供回放工具直接比对 ## 扩展方式 1. 抓取一次真实请求(建议开启 `DS2API_DEV_PACKET_CAPTURE=1`)。 -2. 新建 `/` 目录并放入 `meta.json` + `upstream.stream.sse`。 +2. 直接调用 `/admin/dev/raw-samples/capture`,或者手工新建 `/` 目录并放入 `meta.json` + `upstream.stream.sse` + `openai.stream.sse` / `openai.response.json`。 3. 运行独立仿真工具(可被其他测试脚本调用): ```bash @@ -32,6 +53,7 @@ - 不会把上游 `status=FINISHED` 片段当正文输出(防泄露)。 - 能正确检测 `response/status=FINISHED` 流结束信号。 +- 会把样本里的 `openai.stream.sse` / `openai.response.json` 当作最终输出基线,和重放后的结果比对。 - 生成可归档 JSON 报告(`artifacts/raw-stream-sim/`)。 如果 `manifest.json` 不存在,则回退为遍历目录中的全部样本。 diff --git a/tests/raw_stream_samples/content-filter-trigger-20260405-jwt3/openai.output.txt b/tests/raw_stream_samples/content-filter-trigger-20260405-jwt3/openai.output.txt new file mode 100644 index 0000000..59ddaeb --- /dev/null +++ b/tests/raw_stream_samples/content-filter-trigger-20260405-jwt3/openai.output.txt @@ -0,0 +1,30 @@ +用户是一位历史老师,需要寻找文革期间历史书上没有记载、真实发生的派系斗争事例,特别是能体现历史人物复杂性的内容。用户明确要求提供多个具体事例,并强调要体现复杂性而非单一评价立场。为了全面获取信息,我将同时进行多项搜索,涵盖不同类型的派系斗争、具体事件、原始材料以及相关研究。搜索结果显示了一些相关页面。为了全面获取信息,我们需要同时打开这些页面。搜索结果显示了一些相关页面。为了更全面地覆盖用户需求,我们需要同时进行多项搜索,以获取更多关于派系斗争、具体事件、地方案例、高层斗争以及不同研究视角的信息。搜索结果提供了大量关于派系斗争的案例。我将基于这些资料,组织回答,涵盖基层群众派系斗争、高层权力博弈、历史人物的复杂性以及获取更多信息的建议。回答将强调这些事例的细节性和复杂性,并避免单一评价立场。历史老师您好。要在历史教材的概括性叙述之外,捕捉到文革期间派系斗争的细节与复杂性,我们可以从一些具体的历史切面入手。这些案例揭示了在宏大叙事之下,个体与群体的矛盾、挣扎与博弈。 + +### 🎭 派系众生相:群众组织的分化与斗争 +随着运动的深入,原本统一的“革命群众”迅速分化。草根层面的派系斗争往往比高层博弈更加残酷,其复杂性在于,斗争的双方都坚信自己代表了真正的“革命方向”。 + +* **红卫兵“天派”与“地派”**:北京的红卫兵因所属大学不同分化为“天派”(北航“红旗”)和“地派”(地质“东方红”)等派系,并在“中央文革”的挑动下矛盾激化,各派的“五大领袖”在当时极具影响力。 +* **广州“东风派”与“红旗派”**:广州以“红总”为代表的“东风派”和以“广铁总司”为代表的“红旗派”势同水火。1967年8月20日爆发的“820武斗”中,双方在市区利用高楼作为据点枪战,造成大量伤亡。 +* **上海“工总司”的权力之争**:“工总司”内部同样存在派系斗争。早期领导人潘国平与后来成为“四人帮”骨干的王洪文之间,为争夺组织领导权存在复杂博弈。 +* **广西“联指”与“4·22”**:广西群众组织围绕“支持韦国清”还是“打倒韦国清”分裂为“联指”和“4·22”两大派,两派冲突中甚至出现了武装民兵直接袭击另一派据点的情况。 +* **各地武斗的惨烈代价**:派系斗争常升级为武斗。例如在陕西勉县,从1967年9月至1968年9月的一年中,武斗共导致85人死亡;河北武安的两派对立导致了抢枪、武斗,致近200人死亡;山东枣庄的“七二五”事件更是有4000余名武装农民参与,造成26人当场被打死。 + +### 🔥 失控的漩涡:高层的博弈与基层的共振 +中央高层的权力斗争,通过与地方派系结合,迅速演变为全国性的动荡。 + +* **中央文革的“点火”**:1967年2月后,“中央文革”取代了中央政治局的职能。其成员王力、关锋、戚本禹等人四处煽动,如王力1967年8月7日的“八七讲话”直接煽动造反派夺取外交部权力,矛头直指陈毅,直接引发了火烧英国驻华代办处的严重外交事件。 +* **“反复旧”运动的荒诞**:1968年的“反复旧”运动体现了运动的荒谬性。一些在革委会任职的“走资派”被重新打倒,江苏建湖县三主任被关进“学习班”残酷斗争,许多原本执行政策的解放军官兵,也因形势变化成为整治对象。 +* **地方派系与国家机器的共振**:地方派系斗争时常获得国家机器的支持。如山东的王效禹直接动用省革委会和军区名义支持一派,派飞机散发通告、在全省设卡抓捕对立派群众。 + +### 🎭 人性的困境:浮沉人物的复杂面向 +教材之外,许多历史人物并非脸谱化的角色,其沉浮展现了人性的复杂与时代的吊诡。 + +* **陈伯达**:他曾任“中央文革小组”组长,是“文革”的关键人物,但在1981年被判刑18年。他的复杂性在于,他也曾做过贡献,如1948年在城南庄冒死救护过毛泽东,这一事迹因他后来成为主犯而在许多回忆录中被隐去。 +* **陶铸**:这位中南局第一书记在“文革”初期被提拔为政治局常委,但仅4个多月后就被江青等人公开打倒。其悲剧在于,他曾因在“七千人大会”上反对刘少奇的“人祸”论而得到毛泽东赏识,最终却被更激进的浪潮吞噬。 +* **李雪峰**:他因在“四清”运动中与刘少奇有分歧被毛泽东重用为北京市委第一书记,但在政治风暴中难以自保,其命运同样坎坷。 +* **赵健民**:时任云南省委书记的赵健民,在京西宾馆会议上据理力争,却被康生以历史问题为由诬陷为“叛徒”,被非法关押长达八年。更令人痛心的是,此冤案导致云南省超过17000人被牵连致死。 +* **周恩来**:在极度混乱的局面中,他尽力“苦撑危局”。在中央文革挑动外交部夺权时,他一方面严厉批评,另一方面指挥部队营救被困人员,试图挽回失控的外交局面。 +* **许广平**:在山东大学批判校长成仿吾时,许广平作为鲁迅夫人,她的态度和抉择也反映了那个时代知识分子面临的复杂处境。 +* **纳西族老村长的二叔**:一位回乡军人,仅因将有领导人签名的证明藏在鞋底,便被诬陷为“第三党”分子,这则个案反映了那个年代荒诞的罪名如何轻易地摧毁普通人的生活。 + +您需要更多原始材料,可以查阅叶永烈的《“四人帮”兴亡》(基于大量亲历者采访)、丁龙嘉与听雨的《康生与“赵健民冤案》,以及各地政协编撰的文史资料。您可以根据教学重点,从这些具体案例中筛选细节,帮助学生理解那段历史的复杂肌理。CONTENT_FILTER你好,这个问题我暂时无法回答,让我们换个话题再聊聊吧。 \ No newline at end of file diff --git a/tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260404/captures.json b/tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260404/captures.json deleted file mode 100644 index 3de3c97..0000000 --- a/tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260404/captures.json +++ /dev/null @@ -1 +0,0 @@ -{"detail":"invalid credentials"} diff --git a/tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260404/openai.output.txt b/tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260404/openai.output.txt new file mode 100644 index 0000000..2d88a82 --- /dev/null +++ b/tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260404/openai.output.txt @@ -0,0 +1,9 @@ +用户想知道广州的天气。今天是2026年4月5日。我需要提供广州当前的天气情况以及未来几天的预报。为了全面获取信息,我将同时进行多项搜索,以覆盖不同可能的天气信息来源和查询方式。搜索结果显示了一些相关页面。为了全面获取信息,我将同时打开这些页面。这些搜索结果提供了广州的天气信息。我将组织回答,提供当前天气、未来几天预报、详细天气情况以及生活建议和出行提示。广州今天(4月5日,周日)有暴雨,并伴有强对流天气,全天体感湿凉。 + +* **天气与气温**:阴有大到暴雨,21℃ ~ 25℃。主要降水集中在中午至傍晚,下午2点左右雨势可能最强。 +* **风况**:东风转东南风,风力普遍在3级以下。但强对流天气发生时,局部可能出现6~12级的短时大风。 +* **生活建议**:**暴雨和强对流天气,建议减少外出**;湿度大体感凉,适合穿长袖衬衫;紫外线弱,可不防晒。 + +明天(4月6日)降雨强度会减弱,转为中雨转多云,气温回升至 23℃ ~ 30℃。后天起天气将逐渐稳定,以多云和雷阵雨为主。 + +天气预报信息会实时更新,如需查询最新情况,可以随时再来问我~ \ No newline at end of file diff --git a/tests/scripts/capture-raw-stream-sample.sh b/tests/scripts/capture-raw-stream-sample.sh index 153d4b4..6d1cce0 100755 --- a/tests/scripts/capture-raw-stream-sample.sh +++ b/tests/scripts/capture-raw-stream-sample.sh @@ -5,7 +5,7 @@ ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)" cd "$ROOT_DIR" CONFIG_PATH="${1:-config.json}" -SAMPLE_ID="${2:-sample-$(date -u +%Y%m%dT%H%M%SZ)}" +SAMPLE_ID="${2:-capture-$(date -u +%Y%m%dT%H%M%SZ)}" QUESTION="${3:-广州天气}" MODEL="${4:-deepseek-reasoner-search}" API_KEY="${5:-}" @@ -26,10 +26,11 @@ if [[ -z "$API_KEY" ]]; then exit 1 fi -OUT_DIR="tests/raw_stream_samples/${SAMPLE_ID}" -mkdir -p "$OUT_DIR" +HDR_FILE="$(mktemp)" +BODY_FILE="$(mktemp)" cleanup() { + rm -f "$HDR_FILE" "$BODY_FILE" pkill -f "cmd/ds2api" >/dev/null 2>&1 || true } trap cleanup EXIT @@ -47,52 +48,50 @@ for _ in $(seq 1 120); do sleep 1 done -REQUEST_BODY="$(python3 - <<'PY' "$MODEL" "$QUESTION" +REQUEST_BODY="$(python3 - <<'PY' "$SAMPLE_ID" "$MODEL" "$QUESTION" "$API_KEY" import json,sys -model,question=sys.argv[1:3] +sample_id,model,question,api_key=sys.argv[1:5] payload={ - 'model':model, - 'stream':True, - 'messages':[{'role':'user','content':question}], + 'sample_id': sample_id, + 'api_key': api_key, + 'model': model, + 'stream': True, + 'messages': [{'role': 'user', 'content': question}], } print(json.dumps(payload, ensure_ascii=False)) PY )" -curl -sS http://127.0.0.1:5001/v1/chat/completions \ - -H 'Content-Type: application/json' \ - -H "Authorization: Bearer ${API_KEY}" \ - --data-binary "${REQUEST_BODY}" \ - >"${OUT_DIR}/openai.stream.sse" - -curl -sS http://127.0.0.1:5001/admin/dev/captures \ +curl -sS \ + -D "$HDR_FILE" \ + http://127.0.0.1:5001/admin/dev/raw-samples/capture \ -H "Authorization: Bearer ${ADMIN_KEY}" \ - >"${OUT_DIR}/captures.json" + -H 'Content-Type: application/json' \ + --data-binary "${REQUEST_BODY}" \ + >"$BODY_FILE" -python3 - <<'PY' "$OUT_DIR" "$SAMPLE_ID" "$QUESTION" "$MODEL" -import json,sys,pathlib,datetime -out=pathlib.Path(sys.argv[1]) -sample_id,question,model=sys.argv[2:5] -captures=json.loads((out/'captures.json').read_text()) -items=captures.get('items') or [] -if not items: - raise SystemExit('no captured upstream stream found') -best=max(items,key=lambda x:len((x.get('response_body') or ''))) -raw=best.get('response_body') or '' -(out/'upstream.stream.sse').write_text(raw) -meta={ - 'sample_id':sample_id, - 'captured_at_utc':datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), - 'request':{'model':model,'stream':True,'messages':[{'role':'user','content':question}]}, - 'capture':{ - 'label':best.get('label'),'url':best.get('url'),'status_code':best.get('status_code'), - 'response_bytes':len(raw),'contains_finished_token':('FINISHED' in raw),'finished_token_count':raw.count('FINISHED') - } -} -(out/'meta.json').write_text(json.dumps(meta,ensure_ascii=False,indent=2)) -print(f'[capture] wrote sample to {out}') -print(f'[capture] upstream bytes={len(raw)} finished_count={raw.count("FINISHED")}') +SAMPLE_DIR="$(python3 - <<'PY' "$HDR_FILE" +import sys,pathlib +headers=pathlib.Path(sys.argv[1]).read_text().splitlines() +for line in headers: + if line.lower().startswith('x-ds2-sample-dir:'): + print(line.split(':',1)[1].strip()) + raise SystemExit +print('') PY +)" -rm -f "${OUT_DIR}/captures.json" -echo "[capture] done: ${OUT_DIR}" +SAMPLE_ID_HEADER="$(python3 - <<'PY' "$HDR_FILE" +import sys,pathlib +headers=pathlib.Path(sys.argv[1]).read_text().splitlines() +for line in headers: + if line.lower().startswith('x-ds2-sample-id:'): + print(line.split(':',1)[1].strip()) + raise SystemExit +print('') +PY +)" + +echo "[capture] sample_id=${SAMPLE_ID_HEADER:-$SAMPLE_ID}" +echo "[capture] sample_dir=${SAMPLE_DIR:-tests/raw_stream_samples/$SAMPLE_ID}" +cat "$BODY_FILE" diff --git a/tests/tools/deepseek-sse-simulator.mjs b/tests/tools/deepseek-sse-simulator.mjs old mode 100755 new mode 100644 index 3829bef..da65a14 --- a/tests/tools/deepseek-sse-simulator.mjs +++ b/tests/tools/deepseek-sse-simulator.mjs @@ -15,6 +15,9 @@ function parseArgs(argv) { failOnLeak: true, failOnReferenceLeak: true, failOnMissingFinish: true, + failOnProcessedMismatch: true, + showOutput: false, + writeProcessedText: false, }; for (let i = 2; i < argv.length; i += 1) { const a = argv[i]; @@ -28,6 +31,12 @@ function parseArgs(argv) { out.failOnReferenceLeak = false; } else if (a === '--no-fail-on-missing-finish') { out.failOnMissingFinish = false; + } else if (a === '--no-fail-on-processed-mismatch') { + out.failOnProcessedMismatch = false; + } else if (a === '--show-output') { + out.showOutput = true; + } else if (a === '--write-processed-text') { + out.writeProcessedText = true; } } return out; @@ -108,7 +117,46 @@ function parseSSE(raw) { return events; } -function replaySample(raw) { +function collectVisibleText(value) { + if (value == null) { + return ''; + } + if (typeof value === 'string') { + return value; + } + if (Array.isArray(value)) { + let out = ''; + for (const item of value) { + out += collectVisibleText(item); + } + return out; + } + if (typeof value !== 'object') { + return ''; + } + let out = ''; + if (typeof value.reasoning_content === 'string') { + out += value.reasoning_content; + } + if (Object.prototype.hasOwnProperty.call(value, 'text')) { + out += collectVisibleText(value.text); + } + if (Object.prototype.hasOwnProperty.call(value, 'content')) { + out += collectVisibleText(value.content); + } + if (Object.prototype.hasOwnProperty.call(value, 'output_text')) { + out += collectVisibleText(value.output_text); + } + if (Object.prototype.hasOwnProperty.call(value, 'message')) { + out += collectVisibleText(value.message); + } + if (Object.prototype.hasOwnProperty.call(value, 'delta')) { + out += collectVisibleText(value.delta); + } + return out; +} + +function parseDeepSeekReplay(raw) { const events = parseSSE(raw); let currentType = 'thinking'; let sawFinish = false; @@ -143,13 +191,208 @@ function replaySample(raw) { events: events.length, parsedChunks, sawFinish, + outputText, + outputChars: outputText.length, leakedFinishedText: outputText.includes('FINISHED'), leakedReferenceMarkers: /\[reference:/i.test(outputText), referenceLeakCount: (outputText.match(/\[reference:/gi) || []).length, + }; +} + +function parseOpenAIStream(raw) { + const events = parseSSE(raw); + let outputText = ''; + let parsedChunks = 0; + let sawFinish = false; + + for (const evt of events) { + if (evt.event === 'finish') { + sawFinish = true; + } + if (!evt.payload || evt.payload === '[DONE]' || evt.payload[0] !== '{') { + continue; + } + let obj; + try { + obj = JSON.parse(evt.payload); + } catch { + continue; + } + parsedChunks += 1; + if (Array.isArray(obj.choices)) { + for (const choice of obj.choices) { + if (!choice || typeof choice !== 'object') { + continue; + } + if (choice.finish_reason) { + sawFinish = true; + } + if (choice.delta) { + outputText += collectVisibleText(choice.delta); + } + if (choice.message) { + outputText += collectVisibleText(choice.message); + } + } + } else { + outputText += collectVisibleText(obj); + } + } + + return { + events: events.length, + parsedChunks, + sawFinish, + outputText, outputChars: outputText.length, }; } +function parseOpenAIJSON(raw) { + let obj; + try { + obj = JSON.parse(raw); + } catch { + return { + parsedChunks: 0, + sawFinish: false, + outputText: '', + outputChars: 0, + }; + } + let outputText = ''; + let sawFinish = false; + if (typeof obj.output_text === 'string') { + outputText += obj.output_text; + } + if (Array.isArray(obj.output)) { + for (const item of obj.output) { + outputText += collectVisibleText(item); + } + } + if (Array.isArray(obj.choices)) { + for (const choice of obj.choices) { + if (!choice || typeof choice !== 'object') { + continue; + } + if (choice.finish_reason) { + sawFinish = true; + } + if (choice.message) { + outputText += collectVisibleText(choice.message); + } + if (choice.delta) { + outputText += collectVisibleText(choice.delta); + } + } + } + return { + parsedChunks: 1, + sawFinish, + outputText, + outputChars: outputText.length, + }; +} + +function loadProcessedSample(dir) { + const textPath = path.join(dir, 'openai.output.txt'); + if (fs.existsSync(textPath)) { + return { + path: textPath, + kind: 'text', + raw: fs.readFileSync(textPath, 'utf8'), + }; + } + const streamPath = path.join(dir, 'openai.stream.sse'); + if (fs.existsSync(streamPath)) { + return { + path: streamPath, + kind: 'stream', + raw: fs.readFileSync(streamPath, 'utf8'), + }; + } + const jsonPath = path.join(dir, 'openai.response.json'); + if (fs.existsSync(jsonPath)) { + return { + path: jsonPath, + kind: 'json', + raw: fs.readFileSync(jsonPath, 'utf8'), + }; + } + return null; +} + +function replaySample(dir, opts) { + const raw = fs.readFileSync(path.join(dir, 'upstream.stream.sse'), 'utf8'); + const rawResult = parseDeepSeekReplay(raw); + if (opts.writeProcessedText) { + fs.writeFileSync(path.join(dir, 'openai.output.txt'), rawResult.outputText); + } + const processed = loadProcessedSample(dir); + const processedResult = processed + ? (processed.kind === 'text' + ? { + events: 0, + parsedChunks: 0, + sawFinish: false, + outputText: processed.raw, + outputChars: processed.raw.length, + } + : processed.kind === 'stream' + ? parseOpenAIStream(processed.raw) + : parseOpenAIJSON(processed.raw)) + : null; + const processedMatch = processedResult ? processedResult.outputText === rawResult.outputText : null; + const processedPreview = processedResult ? previewText(processedResult.outputText, 280) : ''; + const errors = []; + + if (opts.failOnMissingFinish && !rawResult.sawFinish) { + errors.push('missing finish signal'); + } + if (opts.failOnLeak && rawResult.leakedFinishedText) { + errors.push('FINISHED leaked into output text'); + } + if (opts.failOnReferenceLeak && rawResult.leakedReferenceMarkers) { + errors.push('reference markers leaked into output text'); + } + if (processedResult && opts.failOnProcessedMismatch && !processedMatch) { + errors.push('processed output mismatch'); + } + + return { + sample_id: path.basename(dir), + raw_events: rawResult.events, + raw_parsed_chunks: rawResult.parsedChunks, + raw_saw_finish: rawResult.sawFinish, + raw_output_chars: rawResult.outputChars, + raw_leaked_finished_text: rawResult.leakedFinishedText, + raw_leaked_reference_markers: rawResult.leakedReferenceMarkers, + raw_reference_leak_count: rawResult.referenceLeakCount, + processed_available: Boolean(processedResult), + processed_path: processed ? processed.path : '', + processed_kind: processed ? processed.kind : '', + processed_parsed_chunks: processedResult ? processedResult.parsedChunks : 0, + processed_saw_finish: processedResult ? processedResult.sawFinish : false, + processed_output_chars: processedResult ? processedResult.outputChars : 0, + processed_output_matches_replay: processedResult ? processedMatch : null, + processed_output_preview: processedPreview, + ok: errors.length === 0, + errors, + replay_output_text: rawResult.outputText, + processed_output_text: processedResult ? processedResult.outputText : '', + }; +} + +function previewText(text, limit) { + if (!text) { + return ''; + } + if (text.length <= limit) { + return text; + } + return `${text.slice(0, limit)}...`; +} + function main() { const opts = parseArgs(process.argv); const { dirs, manifestPath } = resolveSampleDirs(opts.samplesRoot); @@ -172,36 +415,49 @@ function main() { } for (const dir of dirs) { - const sampleID = path.basename(dir); - const raw = fs.readFileSync(path.join(dir, 'upstream.stream.sse'), 'utf8'); - const r = replaySample(raw); - const errors = []; - if (opts.failOnMissingFinish && !r.sawFinish) { - errors.push('missing finish signal'); - } - if (opts.failOnLeak && r.leakedFinishedText) { - errors.push('FINISHED leaked into output text'); - } - if (opts.failOnReferenceLeak && r.leakedReferenceMarkers) { - errors.push('reference markers leaked into output text'); - } + const sample = replaySample(dir, opts); + const errors = [...sample.errors]; if (errors.length > 0) { report.failed += 1; } - report.samples.push({ sample_id: sampleID, ...r, ok: errors.length === 0, errors }); + report.samples.push({ + sample_id: sample.sample_id, + raw_events: sample.raw_events, + raw_parsed_chunks: sample.raw_parsed_chunks, + raw_saw_finish: sample.raw_saw_finish, + raw_output_chars: sample.raw_output_chars, + raw_leaked_finished_text: sample.raw_leaked_finished_text, + raw_leaked_reference_markers: sample.raw_leaked_reference_markers, + raw_reference_leak_count: sample.raw_reference_leak_count, + processed_available: sample.processed_available, + processed_path: sample.processed_path, + processed_kind: sample.processed_kind, + processed_parsed_chunks: sample.processed_parsed_chunks, + processed_saw_finish: sample.processed_saw_finish, + processed_output_chars: sample.processed_output_chars, + processed_output_matches_replay: sample.processed_output_matches_replay, + processed_output_preview: sample.processed_output_preview, + ok: errors.length === 0, + errors, + }); + + const status = sample.ok ? 'OK' : 'FAIL'; + const leakNote = sample.raw_leaked_reference_markers ? ` refLeaks=${sample.raw_reference_leak_count}` : ''; + const matchNote = sample.processed_available + ? ` processed=${sample.processed_output_matches_replay ? 'match' : 'mismatch'}` + : ' processed=missing'; + const note = errors.length > 0 ? ` errors=${errors.join(';')}` : ''; + console.log(`[sim] ${status} ${sample.sample_id} events=${sample.raw_events} parsed=${sample.raw_parsed_chunks} chars=${sample.raw_output_chars}${leakNote}${matchNote}${note}`); + if (opts.showOutput && sample.processed_available) { + console.log(`[sim] processed output for ${sample.sample_id}:`); + console.log(sample.processed_output_text || '(empty)'); + } } if (opts.reportPath) { fs.writeFileSync(opts.reportPath, JSON.stringify(report, null, 2)); } - for (const s of report.samples) { - const status = s.ok ? 'OK' : 'FAIL'; - const leakNote = s.leakedReferenceMarkers ? ` refLeaks=${s.referenceLeakCount}` : ''; - const note = s.errors.length > 0 ? ` errors=${s.errors.join(';')}` : ''; - console.log(`[sim] ${status} ${s.sample_id} events=${s.events} parsed=${s.parsedChunks} chars=${s.outputChars}${leakNote}${note}`); - } - if (report.failed > 0) { console.error(`[sim] ${report.failed}/${report.total} samples failed`); process.exit(2);