fix(流式): 以上游读取判定超时并调大事件缓冲

- 以读取时间戳判定流式间隔超时,避免下游阻塞误判
- antigravity 流式读取使用 MaxLineSize 配置
- 事件通道缓冲提升到 16

测试: go test ./...
This commit is contained in:
yangjianbo
2026-01-04 20:19:07 +08:00
parent 73ffb58518
commit 7489da49cb
3 changed files with 71 additions and 75 deletions

View File

@@ -15,6 +15,7 @@ import (
"regexp"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
@@ -1460,7 +1461,7 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
err error
}
// 独立 goroutine 读取上游,避免读取阻塞导致超时/keepalive无法处理
events := make(chan scanEvent, 1)
events := make(chan scanEvent, 16)
done := make(chan struct{})
sendEvent := func(ev scanEvent) bool {
select {
@@ -1470,9 +1471,12 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
return false
}
}
var lastReadAt int64
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
go func() {
defer close(events)
for scanner.Scan() {
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
if !sendEvent(scanEvent{line: scanner.Text()}) {
return
}
@@ -1487,11 +1491,15 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
if s.cfg != nil && s.cfg.Gateway.StreamDataIntervalTimeout > 0 {
streamInterval = time.Duration(s.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
}
// 仅监控上游数据间隔超时,避免上游挂起占用资源
var intervalTimer *time.Timer
// 仅监控上游数据间隔超时,避免下游写入阻塞导致误判
var intervalTicker *time.Ticker
if streamInterval > 0 {
intervalTimer = time.NewTimer(streamInterval)
defer intervalTimer.Stop()
intervalTicker = time.NewTicker(streamInterval)
defer intervalTicker.Stop()
}
var intervalCh <-chan time.Time
if intervalTicker != nil {
intervalCh = intervalTicker.C
}
// 仅发送一次错误事件,避免多次写入导致协议混乱(写失败时尽力通知客户端)
@@ -1523,9 +1531,6 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream read error: %w", ev.err)
}
line := ev.line
if intervalTimer != nil {
resetTimer(intervalTimer, streamInterval)
}
if line == "event: error" {
return nil, errors.New("have error in stream")
}
@@ -1561,12 +1566,11 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
flusher.Flush()
}
case <-func() <-chan time.Time {
if intervalTimer != nil {
return intervalTimer.C
case <-intervalCh:
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
if time.Since(lastRead) < streamInterval {
continue
}
return nil
}():
log.Printf("Stream data interval timeout: account=%d model=%s interval=%s", account.ID, originalModel, streamInterval)
sendErrorEvent("stream_timeout")
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
@@ -1576,16 +1580,6 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, nil
}
func resetTimer(timer *time.Timer, interval time.Duration) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(interval)
}
// replaceModelInSSELine 替换SSE数据行中的model字段
func (s *GatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
if !sseDataRe.MatchString(line) {