feat(gateway): 添加流超时处理机制
- 添加 StreamTimeoutSettings 配置结构体和系统设置 - 实现 TimeoutCounterCache Redis 计数器用于累计超时次数 - 在 RateLimitService 添加 HandleStreamTimeout 方法 - 在 gateway_service、openai_gateway_service、antigravity_gateway_service 中调用超时处理 - 添加后端 API 端点 GET/PUT /admin/settings/stream-timeout - 添加前端配置界面到系统设置页面 - 支持配置:启用开关、超时阈值、处理方式、暂停时长、触发阈值、阈值窗口 默认配置: - 启用:true - 超时阈值:60秒 - 处理方式:临时不可调度 - 暂停时长:5分钟 - 触发阈值:3次 - 阈值窗口:10分钟
This commit is contained in:
@@ -15,13 +15,15 @@ import (
|
||||
|
||||
// RateLimitService 处理限流和过载状态管理
|
||||
type RateLimitService struct {
|
||||
accountRepo AccountRepository
|
||||
usageRepo UsageLogRepository
|
||||
cfg *config.Config
|
||||
geminiQuotaService *GeminiQuotaService
|
||||
tempUnschedCache TempUnschedCache
|
||||
usageCacheMu sync.RWMutex
|
||||
usageCache map[int64]*geminiUsageCacheEntry
|
||||
accountRepo AccountRepository
|
||||
usageRepo UsageLogRepository
|
||||
cfg *config.Config
|
||||
geminiQuotaService *GeminiQuotaService
|
||||
tempUnschedCache TempUnschedCache
|
||||
timeoutCounterCache TimeoutCounterCache
|
||||
settingService *SettingService
|
||||
usageCacheMu sync.RWMutex
|
||||
usageCache map[int64]*geminiUsageCacheEntry
|
||||
}
|
||||
|
||||
type geminiUsageCacheEntry struct {
|
||||
@@ -44,6 +46,16 @@ func NewRateLimitService(accountRepo AccountRepository, usageRepo UsageLogReposi
|
||||
}
|
||||
}
|
||||
|
||||
// SetTimeoutCounterCache 设置超时计数器缓存(可选依赖)
|
||||
func (s *RateLimitService) SetTimeoutCounterCache(cache TimeoutCounterCache) {
|
||||
s.timeoutCounterCache = cache
|
||||
}
|
||||
|
||||
// SetSettingService 设置系统设置服务(可选依赖)
|
||||
func (s *RateLimitService) SetSettingService(settingService *SettingService) {
|
||||
s.settingService = settingService
|
||||
}
|
||||
|
||||
// HandleUpstreamError 处理上游错误响应,标记账号状态
|
||||
// 返回是否应该停止该账号的调度
|
||||
func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Account, statusCode int, headers http.Header, responseBody []byte) (shouldDisable bool) {
|
||||
@@ -555,3 +567,125 @@ func truncateTempUnschedMessage(body []byte, maxBytes int) string {
|
||||
}
|
||||
return strings.TrimSpace(string(body))
|
||||
}
|
||||
|
||||
// HandleStreamTimeout 处理流数据超时
|
||||
// 根据系统设置决定是否标记账户为临时不可调度或错误状态
|
||||
// 返回是否应该停止该账号的调度
|
||||
func (s *RateLimitService) HandleStreamTimeout(ctx context.Context, account *Account, model string) bool {
|
||||
if account == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// 获取系统设置
|
||||
if s.settingService == nil {
|
||||
log.Printf("[StreamTimeout] settingService not configured, skipping timeout handling for account %d", account.ID)
|
||||
return false
|
||||
}
|
||||
|
||||
settings, err := s.settingService.GetStreamTimeoutSettings(ctx)
|
||||
if err != nil {
|
||||
log.Printf("[StreamTimeout] Failed to get settings: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
if !settings.Enabled {
|
||||
return false
|
||||
}
|
||||
|
||||
if settings.Action == StreamTimeoutActionNone {
|
||||
return false
|
||||
}
|
||||
|
||||
// 增加超时计数
|
||||
var count int64 = 1
|
||||
if s.timeoutCounterCache != nil {
|
||||
count, err = s.timeoutCounterCache.IncrementTimeoutCount(ctx, account.ID, settings.ThresholdWindowMinutes)
|
||||
if err != nil {
|
||||
log.Printf("[StreamTimeout] Failed to increment timeout count for account %d: %v", account.ID, err)
|
||||
// 继续处理,使用 count=1
|
||||
count = 1
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[StreamTimeout] Account %d timeout count: %d/%d (window: %d min, model: %s)",
|
||||
account.ID, count, settings.ThresholdCount, settings.ThresholdWindowMinutes, model)
|
||||
|
||||
// 检查是否达到阈值
|
||||
if count < int64(settings.ThresholdCount) {
|
||||
return false
|
||||
}
|
||||
|
||||
// 达到阈值,执行相应操作
|
||||
switch settings.Action {
|
||||
case StreamTimeoutActionTempUnsched:
|
||||
return s.triggerStreamTimeoutTempUnsched(ctx, account, settings, model)
|
||||
case StreamTimeoutActionError:
|
||||
return s.triggerStreamTimeoutError(ctx, account, model)
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// triggerStreamTimeoutTempUnsched 触发流超时临时不可调度
|
||||
func (s *RateLimitService) triggerStreamTimeoutTempUnsched(ctx context.Context, account *Account, settings *StreamTimeoutSettings, model string) bool {
|
||||
now := time.Now()
|
||||
until := now.Add(time.Duration(settings.TempUnschedMinutes) * time.Minute)
|
||||
|
||||
state := &TempUnschedState{
|
||||
UntilUnix: until.Unix(),
|
||||
TriggeredAtUnix: now.Unix(),
|
||||
StatusCode: 0, // 超时没有状态码
|
||||
MatchedKeyword: "stream_timeout",
|
||||
RuleIndex: -1, // 表示系统级规则
|
||||
ErrorMessage: "Stream data interval timeout for model: " + model,
|
||||
}
|
||||
|
||||
reason := ""
|
||||
if raw, err := json.Marshal(state); err == nil {
|
||||
reason = string(raw)
|
||||
}
|
||||
if reason == "" {
|
||||
reason = state.ErrorMessage
|
||||
}
|
||||
|
||||
if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil {
|
||||
log.Printf("[StreamTimeout] SetTempUnschedulable failed for account %d: %v", account.ID, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if s.tempUnschedCache != nil {
|
||||
if err := s.tempUnschedCache.SetTempUnsched(ctx, account.ID, state); err != nil {
|
||||
log.Printf("[StreamTimeout] SetTempUnsched cache failed for account %d: %v", account.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 重置超时计数
|
||||
if s.timeoutCounterCache != nil {
|
||||
if err := s.timeoutCounterCache.ResetTimeoutCount(ctx, account.ID); err != nil {
|
||||
log.Printf("[StreamTimeout] ResetTimeoutCount failed for account %d: %v", account.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[StreamTimeout] Account %d marked as temp unschedulable until %v (model: %s)", account.ID, until, model)
|
||||
return true
|
||||
}
|
||||
|
||||
// triggerStreamTimeoutError 触发流超时错误状态
|
||||
func (s *RateLimitService) triggerStreamTimeoutError(ctx context.Context, account *Account, model string) bool {
|
||||
errorMsg := "Stream data interval timeout (repeated failures) for model: " + model
|
||||
|
||||
if err := s.accountRepo.SetError(ctx, account.ID, errorMsg); err != nil {
|
||||
log.Printf("[StreamTimeout] SetError failed for account %d: %v", account.ID, err)
|
||||
return false
|
||||
}
|
||||
|
||||
// 重置超时计数
|
||||
if s.timeoutCounterCache != nil {
|
||||
if err := s.timeoutCounterCache.ResetTimeoutCount(ctx, account.ID); err != nil {
|
||||
log.Printf("[StreamTimeout] ResetTimeoutCount failed for account %d: %v", account.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[StreamTimeout] Account %d marked as error (model: %s)", account.ID, model)
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user