mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-04 16:35:27 +08:00
feat: implement file readiness polling and add IsImage field to upload results
This commit is contained in:
188
internal/deepseek/client_file_status.go
Normal file
188
internal/deepseek/client_file_status.go
Normal file
@@ -0,0 +1,188 @@
|
||||
package deepseek
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/config"
|
||||
)
|
||||
|
||||
const (
|
||||
fileReadyPollAttempts = 60
|
||||
fileReadyPollInterval = time.Second
|
||||
fileReadyPollTimeout = 65 * time.Second
|
||||
)
|
||||
|
||||
var fileReadySleep = time.Sleep
|
||||
|
||||
func (c *Client) waitForUploadedFile(ctx context.Context, a *auth.RequestAuth, result *UploadFileResult) error {
|
||||
if result == nil || strings.TrimSpace(result.ID) == "" {
|
||||
return nil
|
||||
}
|
||||
if isReadyUploadFileStatus(result.Status) {
|
||||
return nil
|
||||
}
|
||||
|
||||
pollCtx, cancel := context.WithTimeout(ctx, fileReadyPollTimeout)
|
||||
defer cancel()
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < fileReadyPollAttempts; attempt++ {
|
||||
if err := pollCtx.Err(); err != nil {
|
||||
if lastErr != nil {
|
||||
return fmt.Errorf("waiting for file %s to become ready: %w", result.ID, lastErr)
|
||||
}
|
||||
return fmt.Errorf("waiting for file %s to become ready: %w", result.ID, err)
|
||||
}
|
||||
|
||||
fetched, err := c.fetchUploadedFile(pollCtx, a, result.ID)
|
||||
if err == nil && fetched != nil {
|
||||
mergeUploadFileResults(result, fetched)
|
||||
if isReadyUploadFileStatus(result.Status) {
|
||||
return nil
|
||||
}
|
||||
lastErr = fmt.Errorf("status=%s", strings.TrimSpace(result.Status))
|
||||
} else if err != nil {
|
||||
lastErr = err
|
||||
config.Logger.Debug("[upload_file] waiting for file readiness", "file_id", result.ID, "attempt", attempt+1, "error", err)
|
||||
}
|
||||
|
||||
if attempt < fileReadyPollAttempts-1 {
|
||||
fileReadySleep(fileReadyPollInterval)
|
||||
}
|
||||
}
|
||||
|
||||
if lastErr == nil {
|
||||
lastErr = fmt.Errorf("status=%s", strings.TrimSpace(result.Status))
|
||||
}
|
||||
return fmt.Errorf("file %s did not become ready: %w", result.ID, lastErr)
|
||||
}
|
||||
|
||||
func (c *Client) fetchUploadedFile(ctx context.Context, a *auth.RequestAuth, fileID string) (*UploadFileResult, error) {
|
||||
fileID = strings.TrimSpace(fileID)
|
||||
if fileID == "" {
|
||||
return nil, errors.New("file id is required")
|
||||
}
|
||||
clients := c.requestClientsForAuth(ctx, a)
|
||||
reqURL := DeepSeekFetchFilesURL + "?file_ids=" + url.QueryEscape(fileID)
|
||||
headers := c.authHeaders(a.DeepSeekToken)
|
||||
|
||||
resp, status, err := c.getJSONWithStatus(ctx, clients.regular, reqURL, headers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
code, bizCode, msg, bizMsg := extractResponseStatus(resp)
|
||||
if status != http.StatusOK || code != 0 || bizCode != 0 {
|
||||
if strings.TrimSpace(bizMsg) != "" {
|
||||
msg = bizMsg
|
||||
}
|
||||
if msg == "" {
|
||||
msg = http.StatusText(status)
|
||||
}
|
||||
return nil, fmt.Errorf("request failed: status=%d, code=%d, msg=%s", status, code, msg)
|
||||
}
|
||||
|
||||
result := extractFetchedUploadFileResult(resp, fileID)
|
||||
if result == nil || strings.TrimSpace(result.ID) == "" {
|
||||
return nil, errors.New("fetch files succeeded without matching file data")
|
||||
}
|
||||
result.Raw = resp
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func extractFetchedUploadFileResult(resp map[string]any, targetID string) *UploadFileResult {
|
||||
targetID = strings.TrimSpace(targetID)
|
||||
if resp == nil || targetID == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var walk func(any) *UploadFileResult
|
||||
walk = func(v any) *UploadFileResult {
|
||||
switch x := v.(type) {
|
||||
case map[string]any:
|
||||
if result := buildUploadFileResultFromMap(x, targetID); result != nil {
|
||||
return result
|
||||
}
|
||||
for _, nested := range x {
|
||||
if result := walk(nested); result != nil {
|
||||
return result
|
||||
}
|
||||
}
|
||||
case []any:
|
||||
for _, item := range x {
|
||||
if result := walk(item); result != nil {
|
||||
return result
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if result := walk(resp); result != nil {
|
||||
return result
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildUploadFileResultFromMap(m map[string]any, targetID string) *UploadFileResult {
|
||||
fileID := strings.TrimSpace(firstNonEmptyString(m, "id", "file_id"))
|
||||
if fileID == "" || !strings.EqualFold(fileID, targetID) {
|
||||
return nil
|
||||
}
|
||||
result := &UploadFileResult{
|
||||
ID: fileID,
|
||||
Filename: firstNonEmptyString(m, "name", "filename", "file_name"),
|
||||
Status: firstNonEmptyString(m, "status", "file_status"),
|
||||
Purpose: firstNonEmptyString(m, "purpose"),
|
||||
IsImage: firstBool(m, "is_image", "isImage"),
|
||||
Bytes: firstPositiveInt64(m, "bytes", "size", "file_size"),
|
||||
}
|
||||
if result.Status == "" {
|
||||
result.Status = "uploaded"
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func mergeUploadFileResults(dst, src *UploadFileResult) {
|
||||
if dst == nil || src == nil {
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(src.ID) != "" {
|
||||
dst.ID = strings.TrimSpace(src.ID)
|
||||
}
|
||||
if strings.TrimSpace(src.Filename) != "" {
|
||||
dst.Filename = strings.TrimSpace(src.Filename)
|
||||
}
|
||||
if src.Bytes > 0 {
|
||||
dst.Bytes = src.Bytes
|
||||
}
|
||||
if strings.TrimSpace(src.Status) != "" {
|
||||
dst.Status = strings.TrimSpace(src.Status)
|
||||
}
|
||||
if strings.TrimSpace(src.Purpose) != "" {
|
||||
dst.Purpose = strings.TrimSpace(src.Purpose)
|
||||
}
|
||||
dst.IsImage = src.IsImage
|
||||
if len(src.Raw) > 0 {
|
||||
dst.Raw = src.Raw
|
||||
}
|
||||
if src.RawHeaders != nil {
|
||||
dst.RawHeaders = src.RawHeaders.Clone()
|
||||
}
|
||||
}
|
||||
|
||||
func isReadyUploadFileStatus(status string) bool {
|
||||
switch strings.ToLower(strings.TrimSpace(status)) {
|
||||
case "processed", "ready", "done", "available", "success", "completed", "finished":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ type UploadFileResult struct {
|
||||
Bytes int64
|
||||
Status string
|
||||
Purpose string
|
||||
IsImage bool
|
||||
Raw map[string]any
|
||||
RawHeaders http.Header
|
||||
}
|
||||
@@ -119,6 +120,9 @@ func (c *Client) UploadFile(ctx context.Context, a *auth.RequestAuth, req Upload
|
||||
if result.ID == "" {
|
||||
return nil, errors.New("upload file succeeded without file id")
|
||||
}
|
||||
if err := c.waitForUploadedFile(ctx, a, result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
config.Logger.Warn("[upload_file] failed", "status", resp.StatusCode, "code", code, "biz_code", bizCode, "msg", msg, "biz_msg", bizMsg, "account", a.AccountID, "filename", filename)
|
||||
@@ -224,6 +228,9 @@ func extractUploadFileResult(resp map[string]any) *UploadFileResult {
|
||||
result.Status = status
|
||||
}
|
||||
}
|
||||
if !result.IsImage {
|
||||
result.IsImage = firstBool(m, "is_image", "isImage")
|
||||
}
|
||||
if result.Purpose == "" {
|
||||
result.Purpose = firstNonEmptyString(m, "purpose")
|
||||
}
|
||||
@@ -234,6 +241,21 @@ func extractUploadFileResult(resp map[string]any) *UploadFileResult {
|
||||
return result
|
||||
}
|
||||
|
||||
func firstBool(m map[string]any, keys ...string) bool {
|
||||
for _, key := range keys {
|
||||
switch v := m[key].(type) {
|
||||
case bool:
|
||||
return v
|
||||
case string:
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "true", "1", "yes", "y":
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func firstNonEmptyString(m map[string]any, keys ...string) string {
|
||||
for _, key := range keys {
|
||||
if v, _ := m[key].(string); strings.TrimSpace(v) != "" {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
powpkg "ds2api/pow"
|
||||
@@ -47,6 +48,7 @@ func TestExtractUploadFileResultSupportsNestedShapes(t *testing.T) {
|
||||
"file_size": 99,
|
||||
"status": "processed",
|
||||
"purpose": "assistants",
|
||||
"is_image": true,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -66,12 +68,15 @@ func TestExtractUploadFileResultSupportsNestedShapes(t *testing.T) {
|
||||
if got.Purpose != "assistants" {
|
||||
t.Fatalf("expected purpose assistants, got %#v", got)
|
||||
}
|
||||
if !got.IsImage {
|
||||
t.Fatalf("expected image flag true, got %#v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFileUsesUploadTargetPowAndMultipartHeaders(t *testing.T) {
|
||||
challengeHash := powpkg.DeepSeekHashV1([]byte(powpkg.BuildPrefix("salt", 1712345678) + "42"))
|
||||
powResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"challenge":{"algorithm":"DeepSeekHashV1","challenge":"` + hex.EncodeToString(challengeHash[:]) + `","salt":"salt","expire_at":1712345678,"difficulty":1000,"signature":"sig","target_path":"` + DeepSeekUploadTargetPath + `"}}}}`
|
||||
uploadResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"file":{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"uploaded","purpose":"assistants"}}}}`
|
||||
uploadResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"file":{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"processed","purpose":"assistants","is_image":false}}}}`
|
||||
var seenPow string
|
||||
var seenTargetPath string
|
||||
var seenContentType string
|
||||
@@ -141,3 +146,71 @@ func TestUploadFileUsesUploadTargetPowAndMultipartHeaders(t *testing.T) {
|
||||
t.Fatalf("expected file part in upload body: %q", seenBody)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFileWaitsForProcessedFetchFiles(t *testing.T) {
|
||||
oldSleep := fileReadySleep
|
||||
fileReadySleep = func(time.Duration) {}
|
||||
defer func() { fileReadySleep = oldSleep }()
|
||||
|
||||
challengeHash := powpkg.DeepSeekHashV1([]byte(powpkg.BuildPrefix("salt", 1712345678) + "42"))
|
||||
powResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"challenge":{"algorithm":"DeepSeekHashV1","challenge":"` + hex.EncodeToString(challengeHash[:]) + `","salt":"salt","expire_at":1712345678,"difficulty":1000,"signature":"sig","target_path":"` + DeepSeekUploadTargetPath + `"}}}}`
|
||||
uploadResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"file":{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"PENDING","purpose":"assistants","is_image":false}}}}`
|
||||
pendingFetchResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"files":[{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"PENDING","purpose":"assistants","is_image":false}]}}}`
|
||||
processedFetchResponse := `{"code":0,"msg":"ok","data":{"biz_code":0,"biz_data":{"files":[{"file_id":"file_789","filename":"demo.txt","bytes":5,"status":"processed","purpose":"assistants","is_image":true}]}}}`
|
||||
|
||||
var call int
|
||||
client := &Client{
|
||||
regular: doerFunc(func(req *http.Request) (*http.Response, error) {
|
||||
call++
|
||||
switch call {
|
||||
case 1:
|
||||
bodyBytes, _ := io.ReadAll(req.Body)
|
||||
if !strings.Contains(string(bodyBytes), `"target_path":"`+DeepSeekUploadTargetPath+`"`) {
|
||||
t.Fatalf("expected pow target path request, got %s", string(bodyBytes))
|
||||
}
|
||||
return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader(powResponse)), Request: req}, nil
|
||||
case 2:
|
||||
return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader(uploadResponse)), Request: req}, nil
|
||||
case 3, 4:
|
||||
if req.Method != http.MethodGet {
|
||||
t.Fatalf("expected GET fetch request, got %s", req.Method)
|
||||
}
|
||||
if req.URL.Path != "/api/v0/file/fetch_files" {
|
||||
t.Fatalf("expected fetch files path /api/v0/file/fetch_files, got %q", req.URL.Path)
|
||||
}
|
||||
if got := req.URL.Query().Get("file_ids"); got != "file_789" {
|
||||
t.Fatalf("expected file_ids=file_789, got %q", got)
|
||||
}
|
||||
respBody := pendingFetchResponse
|
||||
if call == 4 {
|
||||
respBody = processedFetchResponse
|
||||
}
|
||||
return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader(respBody)), Request: req}, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request count %d", call)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
fallback: &http.Client{Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { return nil, nil })},
|
||||
maxRetries: 1,
|
||||
}
|
||||
|
||||
result, err := client.UploadFile(context.Background(), &auth.RequestAuth{DeepSeekToken: "token", TriedAccounts: map[string]bool{}}, UploadFileRequest{
|
||||
Filename: "demo.txt",
|
||||
ContentType: "text/plain",
|
||||
Purpose: "assistants",
|
||||
Data: []byte("hello"),
|
||||
}, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("UploadFile error: %v", err)
|
||||
}
|
||||
if result.ID != "file_789" {
|
||||
t.Fatalf("expected uploaded file id file_789, got %#v", result)
|
||||
}
|
||||
if result.Status != "processed" {
|
||||
t.Fatalf("expected final status processed, got %#v", result.Status)
|
||||
}
|
||||
if call != 4 {
|
||||
t.Fatalf("expected 4 requests, got %d", call)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user