From ffca8be5974ee86fd631661b5b957d4d01f0b87d Mon Sep 17 00:00:00 2001 From: CJACK Date: Mon, 13 Apr 2026 02:55:45 +0800 Subject: [PATCH] feat: implement file readiness polling and add IsImage field to upload results --- internal/deepseek/client_file_status.go | 188 ++++++++++++++++++++++++ internal/deepseek/client_upload.go | 22 +++ internal/deepseek/client_upload_test.go | 75 +++++++++- 3 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 internal/deepseek/client_file_status.go diff --git a/internal/deepseek/client_file_status.go b/internal/deepseek/client_file_status.go new file mode 100644 index 0000000..ba50ab8 --- /dev/null +++ b/internal/deepseek/client_file_status.go @@ -0,0 +1,188 @@ +package deepseek + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "ds2api/internal/auth" + "ds2api/internal/config" +) + +const ( + fileReadyPollAttempts = 60 + fileReadyPollInterval = time.Second + fileReadyPollTimeout = 65 * time.Second +) + +var fileReadySleep = time.Sleep + +func (c *Client) waitForUploadedFile(ctx context.Context, a *auth.RequestAuth, result *UploadFileResult) error { + if result == nil || strings.TrimSpace(result.ID) == "" { + return nil + } + if isReadyUploadFileStatus(result.Status) { + return nil + } + + pollCtx, cancel := context.WithTimeout(ctx, fileReadyPollTimeout) + defer cancel() + + var lastErr error + for attempt := 0; attempt < fileReadyPollAttempts; attempt++ { + if err := pollCtx.Err(); err != nil { + if lastErr != nil { + return fmt.Errorf("waiting for file %s to become ready: %w", result.ID, lastErr) + } + return fmt.Errorf("waiting for file %s to become ready: %w", result.ID, err) + } + + fetched, err := c.fetchUploadedFile(pollCtx, a, result.ID) + if err == nil && fetched != nil { + mergeUploadFileResults(result, fetched) + if isReadyUploadFileStatus(result.Status) { + return nil + } + lastErr = fmt.Errorf("status=%s", strings.TrimSpace(result.Status)) + } else if err != nil { + lastErr = err + config.Logger.Debug("[upload_file] waiting for file readiness", "file_id", result.ID, "attempt", attempt+1, "error", err) + } + + if attempt < fileReadyPollAttempts-1 { + fileReadySleep(fileReadyPollInterval) + } + } + + if lastErr == nil { + lastErr = fmt.Errorf("status=%s", strings.TrimSpace(result.Status)) + } + return fmt.Errorf("file %s did not become ready: %w", result.ID, lastErr) +} + +func (c *Client) fetchUploadedFile(ctx context.Context, a *auth.RequestAuth, fileID string) (*UploadFileResult, error) { + fileID = strings.TrimSpace(fileID) + if fileID == "" { + return nil, errors.New("file id is required") + } + clients := c.requestClientsForAuth(ctx, a) + reqURL := DeepSeekFetchFilesURL + "?file_ids=" + url.QueryEscape(fileID) + headers := c.authHeaders(a.DeepSeekToken) + + resp, status, err := c.getJSONWithStatus(ctx, clients.regular, reqURL, headers) + if err != nil { + return nil, err + } + + code, bizCode, msg, bizMsg := extractResponseStatus(resp) + if status != http.StatusOK || code != 0 || bizCode != 0 { + if strings.TrimSpace(bizMsg) != "" { + msg = bizMsg + } + if msg == "" { + msg = http.StatusText(status) + } + return nil, fmt.Errorf("request failed: status=%d, code=%d, msg=%s", status, code, msg) + } + + result := extractFetchedUploadFileResult(resp, fileID) + if result == nil || strings.TrimSpace(result.ID) == "" { + return nil, errors.New("fetch files succeeded without matching file data") + } + result.Raw = resp + return result, nil +} + +func extractFetchedUploadFileResult(resp map[string]any, targetID string) *UploadFileResult { + targetID = strings.TrimSpace(targetID) + if resp == nil || targetID == "" { + return nil + } + + var walk func(any) *UploadFileResult + walk = func(v any) *UploadFileResult { + switch x := v.(type) { + case map[string]any: + if result := buildUploadFileResultFromMap(x, targetID); result != nil { + return result + } + for _, nested := range x { + if result := walk(nested); result != nil { + return result + } + } + case []any: + for _, item := range x { + if result := walk(item); result != nil { + return result + } + } + } + return nil + } + + if result := walk(resp); result != nil { + return result + } + return nil +} + +func buildUploadFileResultFromMap(m map[string]any, targetID string) *UploadFileResult { + fileID := strings.TrimSpace(firstNonEmptyString(m, "id", "file_id")) + if fileID == "" || !strings.EqualFold(fileID, targetID) { + return nil + } + result := &UploadFileResult{ + ID: fileID, + Filename: firstNonEmptyString(m, "name", "filename", "file_name"), + Status: firstNonEmptyString(m, "status", "file_status"), + Purpose: firstNonEmptyString(m, "purpose"), + IsImage: firstBool(m, "is_image", "isImage"), + Bytes: firstPositiveInt64(m, "bytes", "size", "file_size"), + } + if result.Status == "" { + result.Status = "uploaded" + } + return result +} + +func mergeUploadFileResults(dst, src *UploadFileResult) { + if dst == nil || src == nil { + return + } + if strings.TrimSpace(src.ID) != "" { + dst.ID = strings.TrimSpace(src.ID) + } + if strings.TrimSpace(src.Filename) != "" { + dst.Filename = strings.TrimSpace(src.Filename) + } + if src.Bytes > 0 { + dst.Bytes = src.Bytes + } + if strings.TrimSpace(src.Status) != "" { + dst.Status = strings.TrimSpace(src.Status) + } + if strings.TrimSpace(src.Purpose) != "" { + dst.Purpose = strings.TrimSpace(src.Purpose) + } + dst.IsImage = src.IsImage + if len(src.Raw) > 0 { + dst.Raw = src.Raw + } + if src.RawHeaders != nil { + dst.RawHeaders = src.RawHeaders.Clone() + } +} + +func isReadyUploadFileStatus(status string) bool { + switch strings.ToLower(strings.TrimSpace(status)) { + case "processed", "ready", "done", "available", "success", "completed", "finished": + return true + default: + return false + } +} diff --git a/internal/deepseek/client_upload.go b/internal/deepseek/client_upload.go index 16272e5..573a804 100644 --- a/internal/deepseek/client_upload.go +++ b/internal/deepseek/client_upload.go @@ -31,6 +31,7 @@ type UploadFileResult struct { Bytes int64 Status string Purpose string + IsImage bool Raw map[string]any RawHeaders http.Header } @@ -119,6 +120,9 @@ func (c *Client) UploadFile(ctx context.Context, a *auth.RequestAuth, req Upload if result.ID == "" { return nil, errors.New("upload file succeeded without file id") } + if err := c.waitForUploadedFile(ctx, a, result); err != nil { + return nil, err + } return result, nil } config.Logger.Warn("[upload_file] failed", "status", resp.StatusCode, "code", code, "biz_code", bizCode, "msg", msg, "biz_msg", bizMsg, "account", a.AccountID, "filename", filename) @@ -224,6 +228,9 @@ func extractUploadFileResult(resp map[string]any) *UploadFileResult { result.Status = status } } + if !result.IsImage { + result.IsImage = firstBool(m, "is_image", "isImage") + } if result.Purpose == "" { result.Purpose = firstNonEmptyString(m, "purpose") } @@ -234,6 +241,21 @@ func extractUploadFileResult(resp map[string]any) *UploadFileResult { return result } +func firstBool(m map[string]any, keys ...string) bool { + for _, key := range keys { + switch v := m[key].(type) { + case bool: + return v + case string: + switch strings.ToLower(strings.TrimSpace(v)) { + case "true", "1", "yes", "y": + return true + } + } + } + return false +} + func firstNonEmptyString(m map[string]any, keys ...string) string { for _, key := range keys { if v, _ := m[key].(string); strings.TrimSpace(v) != "" { diff --git a/internal/deepseek/client_upload_test.go b/internal/deepseek/client_upload_test.go index 6532f8d..7a41073 100644 --- a/internal/deepseek/client_upload_test.go +++ b/internal/deepseek/client_upload_test.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "testing" + "time" "ds2api/internal/auth" powpkg "ds2api/pow" @@ -47,6 +48,7 @@ func TestExtractUploadFileResultSupportsNestedShapes(t *testing.T) { "file_size": 99, "status": "processed", "purpose": "assistants", + "is_image": true, }, }, }, @@ -66,12 +68,15 @@ func TestExtractUploadFileResultSupportsNestedShapes(t *testing.T) { if got.Purpose != "assistants" { t.Fatalf("expected purpose assistants, got %#v", got) } + if !got.IsImage { + t.Fatalf("expected image flag true, got %#v", got) + } } func TestUploadFileUsesUploadTargetPowAndMultipartHeaders(t *testing.T) { challengeHash := powpkg.DeepSeekHashV1([]byte(powpkg.BuildPrefix("salt", 1712345678) + "42")) powResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"challenge":{"algorithm":"DeepSeekHashV1","challenge":"` + hex.EncodeToString(challengeHash[:]) + `","salt":"salt","expire_at":1712345678,"difficulty":1000,"signature":"sig","target_path":"` + DeepSeekUploadTargetPath + `"}}}}` - uploadResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"file":{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"uploaded","purpose":"assistants"}}}}` + uploadResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"file":{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"processed","purpose":"assistants","is_image":false}}}}` var seenPow string var seenTargetPath string var seenContentType string @@ -141,3 +146,71 @@ func TestUploadFileUsesUploadTargetPowAndMultipartHeaders(t *testing.T) { t.Fatalf("expected file part in upload body: %q", seenBody) } } + +func TestUploadFileWaitsForProcessedFetchFiles(t *testing.T) { + oldSleep := fileReadySleep + fileReadySleep = func(time.Duration) {} + defer func() { fileReadySleep = oldSleep }() + + challengeHash := powpkg.DeepSeekHashV1([]byte(powpkg.BuildPrefix("salt", 1712345678) + "42")) + powResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"challenge":{"algorithm":"DeepSeekHashV1","challenge":"` + hex.EncodeToString(challengeHash[:]) + `","salt":"salt","expire_at":1712345678,"difficulty":1000,"signature":"sig","target_path":"` + DeepSeekUploadTargetPath + `"}}}}` + uploadResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"file":{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"PENDING","purpose":"assistants","is_image":false}}}}` + pendingFetchResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"files":[{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"PENDING","purpose":"assistants","is_image":false}]}}}` + processedFetchResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"files":[{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"processed","purpose":"assistants","is_image":true}]}}}` + + var call int + client := &Client{ + regular: doerFunc(func(req *http.Request) (*http.Response, error) { + call++ + switch call { + case 1: + bodyBytes, _ := io.ReadAll(req.Body) + if !strings.Contains(string(bodyBytes), `"target_path":"`+DeepSeekUploadTargetPath+`"`) { + t.Fatalf("expected pow target path request, got %s", string(bodyBytes)) + } + return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader(powResponse)), Request: req}, nil + case 2: + return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader(uploadResponse)), Request: req}, nil + case 3, 4: + if req.Method != http.MethodGet { + t.Fatalf("expected GET fetch request, got %s", req.Method) + } + if req.URL.Path != "/api/v0/file/fetch_files" { + t.Fatalf("expected fetch files path /api/v0/file/fetch_files, got %q", req.URL.Path) + } + if got := req.URL.Query().Get("file_ids"); got != "file_789" { + t.Fatalf("expected file_ids=file_789, got %q", got) + } + respBody := pendingFetchResponse + if call == 4 { + respBody = processedFetchResponse + } + return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader(respBody)), Request: req}, nil + default: + t.Fatalf("unexpected request count %d", call) + return nil, nil + } + }), + fallback: &http.Client{Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return nil, nil })}, + maxRetries: 1, + } + + result, err := client.UploadFile(context.Background(), &auth.RequestAuth{DeepSeekToken: "token", TriedAccounts: map[string]bool{}}, UploadFileRequest{ + Filename: "demo.txt", + ContentType: "text/plain", + Purpose: "assistants", + Data: []byte("hello"), + }, 1) + if err != nil { + t.Fatalf("UploadFile error: %v", err) + } + if result.ID != "file_789" { + t.Fatalf("expected uploaded file id file_789, got %#v", result) + } + if result.Status != "processed" { + t.Fatalf("expected final status processed, got %#v", result.Status) + } + if call != 4 { + t.Fatalf("expected 4 requests, got %d", call) + } +}