mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-07 01:45:27 +08:00
feat: implement local dev packet capture functionality with admin endpoints and configurable limits for debugging.
This commit is contained in:
26
README.MD
26
README.MD
@@ -283,6 +283,9 @@ cp opencode.json.example opencode.json
|
||||
| `DS2API_ACCOUNT_QUEUE_SIZE` | 同上(兼容旧名) | — |
|
||||
| `DS2API_VERCEL_INTERNAL_SECRET` | Vercel 混合流式内部鉴权密钥 | 回退用 `DS2API_ADMIN_KEY` |
|
||||
| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | 流式 lease 过期秒数 | `900` |
|
||||
| `DS2API_DEV_PACKET_CAPTURE` | 本地开发抓包开关(记录最近会话请求/响应体) | 本地非 Vercel 默认开启 |
|
||||
| `DS2API_DEV_PACKET_CAPTURE_LIMIT` | 本地抓包保留条数(超出自动淘汰) | `5` |
|
||||
| `DS2API_DEV_PACKET_CAPTURE_MAX_BODY_BYTES` | 单条响应体最大记录字节数 | `2097152` |
|
||||
| `VERCEL_TOKEN` | Vercel 同步 token | — |
|
||||
| `VERCEL_PROJECT_ID` | Vercel 项目 ID | — |
|
||||
| `VERCEL_TEAM_ID` | Vercel 团队 ID | — |
|
||||
@@ -321,6 +324,29 @@ cp opencode.json.example opencode.json
|
||||
3. 已确认的 toolcall JSON 片段不会泄漏到 `delta.content`
|
||||
4. 前文/后文自然语言保持顺序透传,支持混合文本与增量参数输出
|
||||
|
||||
## 本地开发抓包工具
|
||||
|
||||
用于定位「responses 思考流/工具调用」等问题。开启后会自动记录最近 N 条 DeepSeek 对话上游请求体与响应体(默认 5 条,超出自动淘汰)。
|
||||
|
||||
启用示例:
|
||||
|
||||
```bash
|
||||
DS2API_DEV_PACKET_CAPTURE=true \
|
||||
DS2API_DEV_PACKET_CAPTURE_LIMIT=5 \
|
||||
go run ./cmd/ds2api
|
||||
```
|
||||
|
||||
查询/清空(需 Admin JWT):
|
||||
|
||||
- `GET /admin/dev/captures`:查看抓包列表(最新在前)
|
||||
- `DELETE /admin/dev/captures`:清空抓包
|
||||
|
||||
返回字段包含:
|
||||
|
||||
- `request_body`:发送给 DeepSeek 的完整请求体
|
||||
- `response_body`:上游返回的原始流式内容拼接文本
|
||||
- `response_truncated`:是否触发单条大小截断
|
||||
|
||||
## 项目结构
|
||||
|
||||
```text
|
||||
|
||||
26
README.en.md
26
README.en.md
@@ -283,6 +283,9 @@ cp opencode.json.example opencode.json
|
||||
| `DS2API_ACCOUNT_QUEUE_SIZE` | Alias (legacy compat) | — |
|
||||
| `DS2API_VERCEL_INTERNAL_SECRET` | Vercel hybrid streaming internal auth | Falls back to `DS2API_ADMIN_KEY` |
|
||||
| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Stream lease TTL seconds | `900` |
|
||||
| `DS2API_DEV_PACKET_CAPTURE` | Local dev packet capture switch (record recent request/response bodies) | Enabled by default on non-Vercel local runtime |
|
||||
| `DS2API_DEV_PACKET_CAPTURE_LIMIT` | Number of captured sessions to retain (auto-evict overflow) | `5` |
|
||||
| `DS2API_DEV_PACKET_CAPTURE_MAX_BODY_BYTES` | Max recorded bytes per captured response body | `2097152` |
|
||||
| `VERCEL_TOKEN` | Vercel sync token | — |
|
||||
| `VERCEL_PROJECT_ID` | Vercel project ID | — |
|
||||
| `VERCEL_TEAM_ID` | Vercel team ID | — |
|
||||
@@ -321,6 +324,29 @@ When `tools` is present in the request, DS2API performs anti-leak handling:
|
||||
3. Confirmed toolcall JSON fragments are never leaked into `delta.content`
|
||||
4. Natural language before/after toolcalls keeps original order, with incremental argument output supported
|
||||
|
||||
## Local Dev Packet Capture
|
||||
|
||||
This is for debugging issues such as Responses reasoning streaming and tool-call handoff. When enabled, DS2API stores the latest N DeepSeek conversation payload pairs (request body + upstream response body), defaulting to 5 entries with auto-eviction.
|
||||
|
||||
Enable example:
|
||||
|
||||
```bash
|
||||
DS2API_DEV_PACKET_CAPTURE=true \
|
||||
DS2API_DEV_PACKET_CAPTURE_LIMIT=5 \
|
||||
go run ./cmd/ds2api
|
||||
```
|
||||
|
||||
Inspect/clear (Admin JWT required):
|
||||
|
||||
- `GET /admin/dev/captures`: list captured items (newest first)
|
||||
- `DELETE /admin/dev/captures`: clear captured items
|
||||
|
||||
Response fields include:
|
||||
|
||||
- `request_body`: full payload sent to DeepSeek
|
||||
- `response_body`: concatenated raw upstream stream body text
|
||||
- `response_truncated`: whether body-size truncation happened
|
||||
|
||||
## Project Structure
|
||||
|
||||
```text
|
||||
|
||||
@@ -36,5 +36,7 @@ func RegisterRoutes(r chi.Router, h *Handler) {
|
||||
pr.Post("/vercel/sync", h.syncVercel)
|
||||
pr.Get("/vercel/status", h.vercelStatus)
|
||||
pr.Get("/export", h.exportConfig)
|
||||
pr.Get("/dev/captures", h.getDevCaptures)
|
||||
pr.Delete("/dev/captures", h.clearDevCaptures)
|
||||
})
|
||||
}
|
||||
|
||||
26
internal/admin/handler_dev_capture.go
Normal file
26
internal/admin/handler_dev_capture.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"ds2api/internal/devcapture"
|
||||
)
|
||||
|
||||
func (h *Handler) getDevCaptures(w http.ResponseWriter, _ *http.Request) {
|
||||
store := devcapture.Global()
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"enabled": store.Enabled(),
|
||||
"limit": store.Limit(),
|
||||
"max_body_bytes": store.MaxBodyBytes(),
|
||||
"items": store.Snapshot(),
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) clearDevCaptures(w http.ResponseWriter, _ *http.Request) {
|
||||
store := devcapture.Global()
|
||||
store.Clear()
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"success": true,
|
||||
"detail": "capture logs cleared",
|
||||
})
|
||||
}
|
||||
45
internal/admin/handler_dev_capture_test.go
Normal file
45
internal/admin/handler_dev_capture_test.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetDevCapturesShape(t *testing.T) {
|
||||
h := &Handler{}
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "/admin/dev/captures", nil)
|
||||
h.getDevCaptures(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
var out map[string]any
|
||||
if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
|
||||
t.Fatalf("decode failed: %v", err)
|
||||
}
|
||||
if _, ok := out["enabled"]; !ok {
|
||||
t.Fatalf("expected enabled field, got %#v", out)
|
||||
}
|
||||
if _, ok := out["items"]; !ok {
|
||||
t.Fatalf("expected items field, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClearDevCapturesShape(t *testing.T) {
|
||||
h := &Handler{}
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodDelete, "/admin/dev/captures", nil)
|
||||
h.clearDevCaptures(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
var out map[string]any
|
||||
if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
|
||||
t.Fatalf("decode failed: %v", err)
|
||||
}
|
||||
if out["success"] != true {
|
||||
t.Fatalf("expected success=true, got %#v", out)
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/config"
|
||||
trans "ds2api/internal/deepseek/transport"
|
||||
"ds2api/internal/devcapture"
|
||||
"ds2api/internal/util"
|
||||
|
||||
"github.com/andybalholm/brotli"
|
||||
@@ -27,6 +28,7 @@ var intFrom = util.IntFrom
|
||||
type Client struct {
|
||||
Store *config.Store
|
||||
Auth *auth.Resolver
|
||||
capture *devcapture.Store
|
||||
regular trans.Doer
|
||||
stream trans.Doer
|
||||
fallback *http.Client
|
||||
@@ -39,6 +41,7 @@ func NewClient(store *config.Store, resolver *auth.Resolver) *Client {
|
||||
return &Client{
|
||||
Store: store,
|
||||
Auth: resolver,
|
||||
capture: devcapture.Global(),
|
||||
regular: trans.New(60 * time.Second),
|
||||
stream: trans.New(0),
|
||||
fallback: &http.Client{Timeout: 60 * time.Second},
|
||||
@@ -179,6 +182,7 @@ func (c *Client) CallCompletion(ctx context.Context, a *auth.RequestAuth, payloa
|
||||
}
|
||||
headers := c.authHeaders(a.DeepSeekToken)
|
||||
headers["x-ds-pow-response"] = powResp
|
||||
captureSession := c.capture.Start("deepseek_completion", DeepSeekCompletionURL, a.AccountID, payload)
|
||||
attempts := 0
|
||||
for attempts < maxAttempts {
|
||||
resp, err := c.streamPost(ctx, DeepSeekCompletionURL, headers, payload)
|
||||
@@ -188,8 +192,14 @@ func (c *Client) CallCompletion(ctx context.Context, a *auth.RequestAuth, payloa
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
if captureSession != nil {
|
||||
resp.Body = captureSession.WrapBody(resp.Body, resp.StatusCode)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
if captureSession != nil {
|
||||
resp.Body = captureSession.WrapBody(resp.Body, resp.StatusCode)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
attempts++
|
||||
time.Sleep(time.Second)
|
||||
|
||||
259
internal/devcapture/store.go
Normal file
259
internal/devcapture/store.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package devcapture
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultLimit = 5
|
||||
defaultMaxBodyBytes = 2 * 1024 * 1024
|
||||
maxLimit = 50
|
||||
)
|
||||
|
||||
type Entry struct {
|
||||
ID string `json:"id"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
Label string `json:"label"`
|
||||
URL string `json:"url"`
|
||||
AccountID string `json:"account_id,omitempty"`
|
||||
StatusCode int `json:"status_code"`
|
||||
RequestBody string `json:"request_body"`
|
||||
ResponseBody string `json:"response_body"`
|
||||
ResponseTruncated bool `json:"response_truncated"`
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
mu sync.Mutex
|
||||
enabled bool
|
||||
limit int
|
||||
maxBodyBytes int
|
||||
items []Entry
|
||||
}
|
||||
|
||||
type Session struct {
|
||||
store *Store
|
||||
id string
|
||||
createdAt int64
|
||||
label string
|
||||
url string
|
||||
accountID string
|
||||
requestRaw string
|
||||
}
|
||||
|
||||
type captureBody struct {
|
||||
rc io.ReadCloser
|
||||
s *Session
|
||||
statusCode int
|
||||
buf strings.Builder
|
||||
truncated bool
|
||||
finalized bool
|
||||
}
|
||||
|
||||
var (
|
||||
globalOnce sync.Once
|
||||
globalInst *Store
|
||||
)
|
||||
|
||||
func Global() *Store {
|
||||
globalOnce.Do(func() {
|
||||
globalInst = NewFromEnv()
|
||||
})
|
||||
return globalInst
|
||||
}
|
||||
|
||||
func NewFromEnv() *Store {
|
||||
enabled := !isVercelRuntime()
|
||||
if raw, ok := os.LookupEnv("DS2API_DEV_PACKET_CAPTURE"); ok {
|
||||
enabled = parseBool(raw)
|
||||
}
|
||||
limit := parseIntWithDefault(os.Getenv("DS2API_DEV_PACKET_CAPTURE_LIMIT"), defaultLimit)
|
||||
if limit < 1 {
|
||||
limit = defaultLimit
|
||||
}
|
||||
if limit > maxLimit {
|
||||
limit = maxLimit
|
||||
}
|
||||
maxBodyBytes := parseIntWithDefault(os.Getenv("DS2API_DEV_PACKET_CAPTURE_MAX_BODY_BYTES"), defaultMaxBodyBytes)
|
||||
if maxBodyBytes < 1024 {
|
||||
maxBodyBytes = defaultMaxBodyBytes
|
||||
}
|
||||
return &Store{
|
||||
enabled: enabled,
|
||||
limit: limit,
|
||||
maxBodyBytes: maxBodyBytes,
|
||||
items: make([]Entry, 0, limit),
|
||||
}
|
||||
}
|
||||
|
||||
func isVercelRuntime() bool {
|
||||
return strings.TrimSpace(os.Getenv("VERCEL")) != "" || strings.TrimSpace(os.Getenv("NOW_REGION")) != ""
|
||||
}
|
||||
|
||||
func (s *Store) Enabled() bool {
|
||||
if s == nil {
|
||||
return false
|
||||
}
|
||||
return s.enabled
|
||||
}
|
||||
|
||||
func (s *Store) Limit() int {
|
||||
if s == nil {
|
||||
return defaultLimit
|
||||
}
|
||||
return s.limit
|
||||
}
|
||||
|
||||
func (s *Store) MaxBodyBytes() int {
|
||||
if s == nil {
|
||||
return defaultMaxBodyBytes
|
||||
}
|
||||
return s.maxBodyBytes
|
||||
}
|
||||
|
||||
func (s *Store) Snapshot() []Entry {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
out := make([]Entry, len(s.items))
|
||||
copy(out, s.items)
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *Store) Clear() {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.items = s.items[:0]
|
||||
}
|
||||
|
||||
func (s *Store) Start(label, url, accountID string, requestPayload any) *Session {
|
||||
if s == nil || !s.enabled {
|
||||
return nil
|
||||
}
|
||||
return &Session{
|
||||
store: s,
|
||||
id: "cap_" + strings.ReplaceAll(uuid.NewString(), "-", ""),
|
||||
createdAt: time.Now().Unix(),
|
||||
label: strings.TrimSpace(label),
|
||||
url: strings.TrimSpace(url),
|
||||
accountID: strings.TrimSpace(accountID),
|
||||
requestRaw: marshalPayload(requestPayload),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) WrapBody(rc io.ReadCloser, statusCode int) io.ReadCloser {
|
||||
if s == nil || rc == nil {
|
||||
return rc
|
||||
}
|
||||
return &captureBody{
|
||||
rc: rc,
|
||||
s: s,
|
||||
statusCode: statusCode,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *captureBody) Read(p []byte) (int, error) {
|
||||
n, err := c.rc.Read(p)
|
||||
if n > 0 {
|
||||
c.append(string(p[:n]))
|
||||
}
|
||||
if err == io.EOF {
|
||||
c.finalize()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *captureBody) Close() error {
|
||||
err := c.rc.Close()
|
||||
c.finalize()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *captureBody) append(chunk string) {
|
||||
if chunk == "" || c.s == nil || c.s.store == nil {
|
||||
return
|
||||
}
|
||||
maxLen := c.s.store.maxBodyBytes
|
||||
current := c.buf.Len()
|
||||
if current >= maxLen {
|
||||
c.truncated = true
|
||||
return
|
||||
}
|
||||
remain := maxLen - current
|
||||
if len(chunk) > remain {
|
||||
c.buf.WriteString(chunk[:remain])
|
||||
c.truncated = true
|
||||
return
|
||||
}
|
||||
c.buf.WriteString(chunk)
|
||||
}
|
||||
|
||||
func (c *captureBody) finalize() {
|
||||
if c.finalized || c.s == nil || c.s.store == nil {
|
||||
return
|
||||
}
|
||||
c.finalized = true
|
||||
entry := Entry{
|
||||
ID: c.s.id,
|
||||
CreatedAt: c.s.createdAt,
|
||||
Label: c.s.label,
|
||||
URL: c.s.url,
|
||||
AccountID: c.s.accountID,
|
||||
StatusCode: c.statusCode,
|
||||
RequestBody: c.s.requestRaw,
|
||||
ResponseBody: c.buf.String(),
|
||||
ResponseTruncated: c.truncated,
|
||||
}
|
||||
c.s.store.push(entry)
|
||||
}
|
||||
|
||||
func (s *Store) push(entry Entry) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.items = append([]Entry{entry}, s.items...)
|
||||
if len(s.items) > s.limit {
|
||||
s.items = s.items[:s.limit]
|
||||
}
|
||||
}
|
||||
|
||||
func marshalPayload(v any) string {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("%v", v)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func parseBool(v string) bool {
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "1", "true", "yes", "on":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func parseIntWithDefault(raw string, d int) int {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" {
|
||||
return d
|
||||
}
|
||||
n, err := strconv.Atoi(raw)
|
||||
if err != nil {
|
||||
return d
|
||||
}
|
||||
return n
|
||||
}
|
||||
55
internal/devcapture/store_test.go
Normal file
55
internal/devcapture/store_test.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package devcapture
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStorePushKeepsNewestWithinLimit(t *testing.T) {
|
||||
s := &Store{enabled: true, limit: 2, maxBodyBytes: 1024}
|
||||
for i := 0; i < 3; i++ {
|
||||
session := s.Start("test", "http://x", "", map[string]any{"seq": i})
|
||||
if session == nil {
|
||||
t.Fatal("expected session")
|
||||
}
|
||||
rc := session.WrapBody(io.NopCloser(strings.NewReader("ok")), 200)
|
||||
_, _ = io.ReadAll(rc)
|
||||
_ = rc.Close()
|
||||
}
|
||||
items := s.Snapshot()
|
||||
if len(items) != 2 {
|
||||
t.Fatalf("expected 2 items, got %d", len(items))
|
||||
}
|
||||
if !strings.Contains(items[0].RequestBody, `"seq":2`) {
|
||||
t.Fatalf("expected newest first, got %#v", items[0].RequestBody)
|
||||
}
|
||||
if !strings.Contains(items[1].RequestBody, `"seq":1`) {
|
||||
t.Fatalf("expected second newest, got %#v", items[1].RequestBody)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrapBodyTruncatesByLimit(t *testing.T) {
|
||||
s := &Store{enabled: true, limit: 5, maxBodyBytes: 4}
|
||||
session := s.Start("test", "http://x", "acc1", map[string]any{"x": 1})
|
||||
if session == nil {
|
||||
t.Fatal("expected session")
|
||||
}
|
||||
rc := session.WrapBody(io.NopCloser(strings.NewReader("abcdef")), 200)
|
||||
_, _ = io.ReadAll(rc)
|
||||
_ = rc.Close()
|
||||
|
||||
items := s.Snapshot()
|
||||
if len(items) != 1 {
|
||||
t.Fatalf("expected 1 item, got %d", len(items))
|
||||
}
|
||||
if items[0].ResponseBody != "abcd" {
|
||||
t.Fatalf("expected truncated body, got %q", items[0].ResponseBody)
|
||||
}
|
||||
if !items[0].ResponseTruncated {
|
||||
t.Fatal("expected truncated flag true")
|
||||
}
|
||||
if items[0].AccountID != "acc1" {
|
||||
t.Fatalf("expected account id, got %q", items[0].AccountID)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user