Files
ds2api/internal/stream/engine.go
CJACK 0bfddf7943 1
2026-04-26 09:17:40 +08:00

147 lines
3.0 KiB
Go

package stream
import (
"context"
"io"
"time"
"ds2api/internal/sse"
)
type StopReason string
const (
StopReasonNone StopReason = ""
StopReasonContextCancelled StopReason = "context_cancelled"
StopReasonNoContentTimeout StopReason = "no_content_timeout"
StopReasonIdleTimeout StopReason = "idle_timeout"
StopReasonUpstreamCompleted StopReason = "upstream_completed"
StopReasonHandlerRequested StopReason = "handler_requested"
)
type ConsumeConfig struct {
Context context.Context
Body io.Reader
ThinkingEnabled bool
InitialType string
KeepAliveInterval time.Duration
IdleTimeout time.Duration
MaxKeepAliveNoInput int
}
type ParsedDecision struct {
Stop bool
StopReason StopReason
ContentSeen bool
}
type ConsumeHooks struct {
OnParsed func(parsed sse.LineResult) ParsedDecision
OnKeepAlive func()
OnFinalize func(reason StopReason, scannerErr error)
OnContextDone func()
}
func ConsumeSSE(cfg ConsumeConfig, hooks ConsumeHooks) {
if cfg.Context == nil {
cfg.Context = context.Background()
}
initialType := cfg.InitialType
if initialType == "" {
if cfg.ThinkingEnabled {
initialType = "thinking"
} else {
initialType = "text"
}
}
parsedLines, done := sse.StartParsedLinePump(cfg.Context, cfg.Body, cfg.ThinkingEnabled, initialType)
var ticker *time.Ticker
if cfg.KeepAliveInterval > 0 {
ticker = time.NewTicker(cfg.KeepAliveInterval)
defer ticker.Stop()
}
hasContent := false
lastContent := time.Now()
keepaliveCount := 0
finalize := func(reason StopReason, scannerErr error) {
if hooks.OnFinalize != nil {
hooks.OnFinalize(reason, scannerErr)
}
}
contextDone := func() bool {
if cfg.Context.Err() == nil {
return false
}
if hooks.OnContextDone != nil {
hooks.OnContextDone()
}
return true
}
for {
if contextDone() {
return
}
select {
case <-cfg.Context.Done():
if contextDone() {
return
}
return
case <-tickCh(ticker):
if contextDone() {
return
}
if !hasContent {
keepaliveCount++
if cfg.MaxKeepAliveNoInput > 0 && keepaliveCount >= cfg.MaxKeepAliveNoInput {
finalize(StopReasonNoContentTimeout, nil)
return
}
}
if hasContent && cfg.IdleTimeout > 0 && time.Since(lastContent) > cfg.IdleTimeout {
finalize(StopReasonIdleTimeout, nil)
return
}
if hooks.OnKeepAlive != nil {
hooks.OnKeepAlive()
}
case parsed, ok := <-parsedLines:
if contextDone() {
return
}
if !ok {
finalize(StopReasonUpstreamCompleted, <-done)
return
}
if hooks.OnParsed == nil {
continue
}
decision := hooks.OnParsed(parsed)
if decision.ContentSeen {
hasContent = true
lastContent = time.Now()
keepaliveCount = 0
}
if decision.Stop {
reason := decision.StopReason
if reason == StopReasonNone {
reason = StopReasonHandlerRequested
}
finalize(reason, nil)
return
}
}
}
}
func tickCh(ticker *time.Ticker) <-chan time.Time {
if ticker == nil {
return nil
}
return ticker.C
}