From 2a0758bdfe7ca5c8db188a631c2ee83bb0f5e3f3 Mon Sep 17 00:00:00 2001 From: ianshaw Date: Sun, 11 Jan 2026 21:54:52 -0800 Subject: [PATCH 1/4] =?UTF-8?q?feat(gateway):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=B5=81=E8=B6=85=E6=97=B6=E5=A4=84=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加 StreamTimeoutSettings 配置结构体和系统设置 - 实现 TimeoutCounterCache Redis 计数器用于累计超时次数 - 在 RateLimitService 添加 HandleStreamTimeout 方法 - 在 gateway_service、openai_gateway_service、antigravity_gateway_service 中调用超时处理 - 添加后端 API 端点 GET/PUT /admin/settings/stream-timeout - 添加前端配置界面到系统设置页面 - 支持配置:启用开关、超时阈值、处理方式、暂停时长、触发阈值、阈值窗口 默认配置: - 启用:true - 超时阈值:60秒 - 处理方式:临时不可调度 - 暂停时长:5分钟 - 触发阈值:3次 - 阈值窗口:10分钟 --- backend/cmd/server/wire_gen.go | 3 +- .../internal/handler/admin/setting_handler.go | 69 ++++++ backend/internal/handler/dto/settings.go | 10 + .../repository/timeout_counter_cache.go | 80 +++++++ backend/internal/repository/wire.go | 1 + backend/internal/server/routes/admin.go | 3 + .../service/antigravity_gateway_service.go | 2 + backend/internal/service/domain_constants.go | 7 + backend/internal/service/gateway_service.go | 4 + .../service/openai_gateway_service.go | 4 + backend/internal/service/ratelimit_service.go | 148 ++++++++++++- backend/internal/service/setting_service.go | 97 +++++++++ backend/internal/service/settings_view.go | 35 +++ backend/internal/service/temp_unsched.go | 14 ++ backend/internal/service/wire.go | 18 +- frontend/src/api/admin/settings.ts | 40 +++- frontend/src/i18n/locales/en.ts | 21 ++ frontend/src/i18n/locales/zh.ts | 21 ++ frontend/src/views/admin/SettingsView.vue | 203 ++++++++++++++++++ 19 files changed, 770 insertions(+), 10 deletions(-) create mode 100644 backend/internal/repository/timeout_counter_cache.go diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index e66e0e05..52018dc0 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -97,7 +97,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { antigravityOAuthService := service.NewAntigravityOAuthService(proxyRepository) geminiQuotaService := service.NewGeminiQuotaService(configConfig, settingRepository) tempUnschedCache := repository.NewTempUnschedCache(redisClient) - rateLimitService := service.NewRateLimitService(accountRepository, usageLogRepository, configConfig, geminiQuotaService, tempUnschedCache) + timeoutCounterCache := repository.NewTimeoutCounterCache(redisClient) + rateLimitService := service.ProvideRateLimitService(accountRepository, usageLogRepository, configConfig, geminiQuotaService, tempUnschedCache, timeoutCounterCache, settingService) claudeUsageFetcher := repository.NewClaudeUsageFetcher() antigravityQuotaFetcher := service.NewAntigravityQuotaFetcher(proxyRepository) usageCache := service.NewUsageCache() diff --git a/backend/internal/handler/admin/setting_handler.go b/backend/internal/handler/admin/setting_handler.go index 2f9785ee..1071879b 100644 --- a/backend/internal/handler/admin/setting_handler.go +++ b/backend/internal/handler/admin/setting_handler.go @@ -654,3 +654,72 @@ func (h *SettingHandler) DeleteAdminAPIKey(c *gin.Context) { response.Success(c, gin.H{"message": "Admin API key deleted"}) } + +// GetStreamTimeoutSettings 获取流超时处理配置 +// GET /api/v1/admin/settings/stream-timeout +func (h *SettingHandler) GetStreamTimeoutSettings(c *gin.Context) { + settings, err := h.settingService.GetStreamTimeoutSettings(c.Request.Context()) + if err != nil { + response.ErrorFrom(c, err) + return + } + + response.Success(c, dto.StreamTimeoutSettings{ + Enabled: settings.Enabled, + TimeoutSeconds: settings.TimeoutSeconds, + Action: settings.Action, + TempUnschedMinutes: settings.TempUnschedMinutes, + ThresholdCount: settings.ThresholdCount, + ThresholdWindowMinutes: settings.ThresholdWindowMinutes, + }) +} + +// UpdateStreamTimeoutSettingsRequest 更新流超时配置请求 +type UpdateStreamTimeoutSettingsRequest struct { + Enabled bool `json:"enabled"` + TimeoutSeconds int `json:"timeout_seconds"` + Action string `json:"action"` + TempUnschedMinutes int `json:"temp_unsched_minutes"` + ThresholdCount int `json:"threshold_count"` + ThresholdWindowMinutes int `json:"threshold_window_minutes"` +} + +// UpdateStreamTimeoutSettings 更新流超时处理配置 +// PUT /api/v1/admin/settings/stream-timeout +func (h *SettingHandler) UpdateStreamTimeoutSettings(c *gin.Context) { + var req UpdateStreamTimeoutSettingsRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request: "+err.Error()) + return + } + + settings := &service.StreamTimeoutSettings{ + Enabled: req.Enabled, + TimeoutSeconds: req.TimeoutSeconds, + Action: req.Action, + TempUnschedMinutes: req.TempUnschedMinutes, + ThresholdCount: req.ThresholdCount, + ThresholdWindowMinutes: req.ThresholdWindowMinutes, + } + + if err := h.settingService.SetStreamTimeoutSettings(c.Request.Context(), settings); err != nil { + response.BadRequest(c, err.Error()) + return + } + + // 重新获取设置返回 + updatedSettings, err := h.settingService.GetStreamTimeoutSettings(c.Request.Context()) + if err != nil { + response.ErrorFrom(c, err) + return + } + + response.Success(c, dto.StreamTimeoutSettings{ + Enabled: updatedSettings.Enabled, + TimeoutSeconds: updatedSettings.TimeoutSeconds, + Action: updatedSettings.Action, + TempUnschedMinutes: updatedSettings.TempUnschedMinutes, + ThresholdCount: updatedSettings.ThresholdCount, + ThresholdWindowMinutes: updatedSettings.ThresholdWindowMinutes, + }) +} diff --git a/backend/internal/handler/dto/settings.go b/backend/internal/handler/dto/settings.go index d95fb121..bb498914 100644 --- a/backend/internal/handler/dto/settings.go +++ b/backend/internal/handler/dto/settings.go @@ -66,3 +66,13 @@ type PublicSettings struct { LinuxDoOAuthEnabled bool `json:"linuxdo_oauth_enabled"` Version string `json:"version"` } + +// StreamTimeoutSettings 流超时处理配置 DTO +type StreamTimeoutSettings struct { + Enabled bool `json:"enabled"` + TimeoutSeconds int `json:"timeout_seconds"` + Action string `json:"action"` + TempUnschedMinutes int `json:"temp_unsched_minutes"` + ThresholdCount int `json:"threshold_count"` + ThresholdWindowMinutes int `json:"threshold_window_minutes"` +} diff --git a/backend/internal/repository/timeout_counter_cache.go b/backend/internal/repository/timeout_counter_cache.go new file mode 100644 index 00000000..64cde22a --- /dev/null +++ b/backend/internal/repository/timeout_counter_cache.go @@ -0,0 +1,80 @@ +package repository + +import ( + "context" + "fmt" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/redis/go-redis/v9" +) + +const timeoutCounterPrefix = "timeout_count:account:" + +// timeoutCounterIncrScript 使用 Lua 脚本原子性地增加计数并返回当前值 +// 如果 key 不存在,则创建并设置过期时间 +var timeoutCounterIncrScript = redis.NewScript(` + local key = KEYS[1] + local ttl = tonumber(ARGV[1]) + + local count = redis.call('INCR', key) + if count == 1 then + redis.call('EXPIRE', key, ttl) + end + + return count +`) + +type timeoutCounterCache struct { + rdb *redis.Client +} + +// NewTimeoutCounterCache 创建超时计数器缓存实例 +func NewTimeoutCounterCache(rdb *redis.Client) service.TimeoutCounterCache { + return &timeoutCounterCache{rdb: rdb} +} + +// IncrementTimeoutCount 增加账户的超时计数,返回当前计数值 +// windowMinutes 是计数窗口时间(分钟),超过此时间计数器会自动重置 +func (c *timeoutCounterCache) IncrementTimeoutCount(ctx context.Context, accountID int64, windowMinutes int) (int64, error) { + key := fmt.Sprintf("%s%d", timeoutCounterPrefix, accountID) + + ttlSeconds := windowMinutes * 60 + if ttlSeconds < 60 { + ttlSeconds = 60 // 最小1分钟 + } + + result, err := timeoutCounterIncrScript.Run(ctx, c.rdb, []string{key}, ttlSeconds).Int64() + if err != nil { + return 0, fmt.Errorf("increment timeout count: %w", err) + } + + return result, nil +} + +// GetTimeoutCount 获取账户当前的超时计数 +func (c *timeoutCounterCache) GetTimeoutCount(ctx context.Context, accountID int64) (int64, error) { + key := fmt.Sprintf("%s%d", timeoutCounterPrefix, accountID) + + val, err := c.rdb.Get(ctx, key).Int64() + if err == redis.Nil { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("get timeout count: %w", err) + } + + return val, nil +} + +// ResetTimeoutCount 重置账户的超时计数 +func (c *timeoutCounterCache) ResetTimeoutCount(ctx context.Context, accountID int64) error { + key := fmt.Sprintf("%s%d", timeoutCounterPrefix, accountID) + return c.rdb.Del(ctx, key).Err() +} + +// GetTimeoutCountTTL 获取计数器剩余过期时间 +func (c *timeoutCounterCache) GetTimeoutCountTTL(ctx context.Context, accountID int64) (time.Duration, error) { + key := fmt.Sprintf("%s%d", timeoutCounterPrefix, accountID) + return c.rdb.TTL(ctx, key).Result() +} diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index e1c6c3d4..4bd18b9a 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -59,6 +59,7 @@ var ProviderSet = wire.NewSet( NewBillingCache, NewAPIKeyCache, NewTempUnschedCache, + NewTimeoutCounterCache, ProvideConcurrencyCache, NewDashboardCache, NewEmailCache, diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index a2f1b8c7..111e4578 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -283,6 +283,9 @@ func registerSettingsRoutes(admin *gin.RouterGroup, h *handler.Handlers) { adminSettings.GET("/admin-api-key", h.Admin.Setting.GetAdminAPIKey) adminSettings.POST("/admin-api-key/regenerate", h.Admin.Setting.RegenerateAdminAPIKey) adminSettings.DELETE("/admin-api-key", h.Admin.Setting.DeleteAdminAPIKey) + // 流超时处理配置 + adminSettings.GET("/stream-timeout", h.Admin.Setting.GetStreamTimeoutSettings) + adminSettings.PUT("/stream-timeout", h.Admin.Setting.UpdateStreamTimeoutSettings) } } diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 4dd4d303..3dab7b7f 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -1717,6 +1717,7 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context continue } log.Printf("Stream data interval timeout (antigravity)") + // 注意:此函数没有 account 上下文,无法调用 HandleStreamTimeout sendErrorEvent("stream_timeout") return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") } @@ -2271,6 +2272,7 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context continue } log.Printf("Stream data interval timeout (antigravity)") + // 注意:此函数没有 account 上下文,无法调用 HandleStreamTimeout sendErrorEvent("stream_timeout") return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") } diff --git a/backend/internal/service/domain_constants.go b/backend/internal/service/domain_constants.go index 398d9fbd..49bb86a7 100644 --- a/backend/internal/service/domain_constants.go +++ b/backend/internal/service/domain_constants.go @@ -146,6 +146,13 @@ const ( // SettingKeyOpsAdvancedSettings stores JSON config for ops advanced settings (data retention, aggregation). SettingKeyOpsAdvancedSettings = "ops_advanced_settings" + + // ========================= + // Stream Timeout Handling + // ========================= + + // SettingKeyStreamTimeoutSettings stores JSON config for stream timeout handling. + SettingKeyStreamTimeoutSettings = "stream_timeout_settings" ) // AdminAPIKeyPrefix is the prefix for admin API keys (distinct from user "sk-" keys). diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index b48af7b0..164dd1e2 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2340,6 +2340,10 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http return &streamingResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: true}, nil } log.Printf("Stream data interval timeout: account=%d model=%s interval=%s", account.ID, originalModel, streamInterval) + // 处理流超时,可能标记账户为临时不可调度或错误状态 + if s.rateLimitService != nil { + s.rateLimitService.HandleStreamTimeout(ctx, account, originalModel) + } sendErrorEvent("stream_timeout") return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index b3ee469a..5f423b5e 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1042,6 +1042,10 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp continue } log.Printf("Stream data interval timeout: account=%d model=%s interval=%s", account.ID, originalModel, streamInterval) + // 处理流超时,可能标记账户为临时不可调度或错误状态 + if s.rateLimitService != nil { + s.rateLimitService.HandleStreamTimeout(ctx, account, originalModel) + } sendErrorEvent("stream_timeout") return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index d570b92e..e557d479 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -15,13 +15,15 @@ import ( // RateLimitService 处理限流和过载状态管理 type RateLimitService struct { - accountRepo AccountRepository - usageRepo UsageLogRepository - cfg *config.Config - geminiQuotaService *GeminiQuotaService - tempUnschedCache TempUnschedCache - usageCacheMu sync.RWMutex - usageCache map[int64]*geminiUsageCacheEntry + accountRepo AccountRepository + usageRepo UsageLogRepository + cfg *config.Config + geminiQuotaService *GeminiQuotaService + tempUnschedCache TempUnschedCache + timeoutCounterCache TimeoutCounterCache + settingService *SettingService + usageCacheMu sync.RWMutex + usageCache map[int64]*geminiUsageCacheEntry } type geminiUsageCacheEntry struct { @@ -44,6 +46,16 @@ func NewRateLimitService(accountRepo AccountRepository, usageRepo UsageLogReposi } } +// SetTimeoutCounterCache 设置超时计数器缓存(可选依赖) +func (s *RateLimitService) SetTimeoutCounterCache(cache TimeoutCounterCache) { + s.timeoutCounterCache = cache +} + +// SetSettingService 设置系统设置服务(可选依赖) +func (s *RateLimitService) SetSettingService(settingService *SettingService) { + s.settingService = settingService +} + // HandleUpstreamError 处理上游错误响应,标记账号状态 // 返回是否应该停止该账号的调度 func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Account, statusCode int, headers http.Header, responseBody []byte) (shouldDisable bool) { @@ -555,3 +567,125 @@ func truncateTempUnschedMessage(body []byte, maxBytes int) string { } return strings.TrimSpace(string(body)) } + +// HandleStreamTimeout 处理流数据超时 +// 根据系统设置决定是否标记账户为临时不可调度或错误状态 +// 返回是否应该停止该账号的调度 +func (s *RateLimitService) HandleStreamTimeout(ctx context.Context, account *Account, model string) bool { + if account == nil { + return false + } + + // 获取系统设置 + if s.settingService == nil { + log.Printf("[StreamTimeout] settingService not configured, skipping timeout handling for account %d", account.ID) + return false + } + + settings, err := s.settingService.GetStreamTimeoutSettings(ctx) + if err != nil { + log.Printf("[StreamTimeout] Failed to get settings: %v", err) + return false + } + + if !settings.Enabled { + return false + } + + if settings.Action == StreamTimeoutActionNone { + return false + } + + // 增加超时计数 + var count int64 = 1 + if s.timeoutCounterCache != nil { + count, err = s.timeoutCounterCache.IncrementTimeoutCount(ctx, account.ID, settings.ThresholdWindowMinutes) + if err != nil { + log.Printf("[StreamTimeout] Failed to increment timeout count for account %d: %v", account.ID, err) + // 继续处理,使用 count=1 + count = 1 + } + } + + log.Printf("[StreamTimeout] Account %d timeout count: %d/%d (window: %d min, model: %s)", + account.ID, count, settings.ThresholdCount, settings.ThresholdWindowMinutes, model) + + // 检查是否达到阈值 + if count < int64(settings.ThresholdCount) { + return false + } + + // 达到阈值,执行相应操作 + switch settings.Action { + case StreamTimeoutActionTempUnsched: + return s.triggerStreamTimeoutTempUnsched(ctx, account, settings, model) + case StreamTimeoutActionError: + return s.triggerStreamTimeoutError(ctx, account, model) + default: + return false + } +} + +// triggerStreamTimeoutTempUnsched 触发流超时临时不可调度 +func (s *RateLimitService) triggerStreamTimeoutTempUnsched(ctx context.Context, account *Account, settings *StreamTimeoutSettings, model string) bool { + now := time.Now() + until := now.Add(time.Duration(settings.TempUnschedMinutes) * time.Minute) + + state := &TempUnschedState{ + UntilUnix: until.Unix(), + TriggeredAtUnix: now.Unix(), + StatusCode: 0, // 超时没有状态码 + MatchedKeyword: "stream_timeout", + RuleIndex: -1, // 表示系统级规则 + ErrorMessage: "Stream data interval timeout for model: " + model, + } + + reason := "" + if raw, err := json.Marshal(state); err == nil { + reason = string(raw) + } + if reason == "" { + reason = state.ErrorMessage + } + + if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil { + log.Printf("[StreamTimeout] SetTempUnschedulable failed for account %d: %v", account.ID, err) + return false + } + + if s.tempUnschedCache != nil { + if err := s.tempUnschedCache.SetTempUnsched(ctx, account.ID, state); err != nil { + log.Printf("[StreamTimeout] SetTempUnsched cache failed for account %d: %v", account.ID, err) + } + } + + // 重置超时计数 + if s.timeoutCounterCache != nil { + if err := s.timeoutCounterCache.ResetTimeoutCount(ctx, account.ID); err != nil { + log.Printf("[StreamTimeout] ResetTimeoutCount failed for account %d: %v", account.ID, err) + } + } + + log.Printf("[StreamTimeout] Account %d marked as temp unschedulable until %v (model: %s)", account.ID, until, model) + return true +} + +// triggerStreamTimeoutError 触发流超时错误状态 +func (s *RateLimitService) triggerStreamTimeoutError(ctx context.Context, account *Account, model string) bool { + errorMsg := "Stream data interval timeout (repeated failures) for model: " + model + + if err := s.accountRepo.SetError(ctx, account.ID, errorMsg); err != nil { + log.Printf("[StreamTimeout] SetError failed for account %d: %v", account.ID, err) + return false + } + + // 重置超时计数 + if s.timeoutCounterCache != nil { + if err := s.timeoutCounterCache.ResetTimeoutCount(ctx, account.ID); err != nil { + log.Printf("[StreamTimeout] ResetTimeoutCount failed for account %d: %v", account.ID, err) + } + } + + log.Printf("[StreamTimeout] Account %d marked as error (model: %s)", account.ID, model) + return true +} diff --git a/backend/internal/service/setting_service.go b/backend/internal/service/setting_service.go index 863d8a57..4707a854 100644 --- a/backend/internal/service/setting_service.go +++ b/backend/internal/service/setting_service.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "encoding/json" "errors" "fmt" "strconv" @@ -675,3 +676,99 @@ func (s *SettingService) GetLinuxDoConnectOAuthConfig(ctx context.Context) (conf return effective, nil } + +// GetStreamTimeoutSettings 获取流超时处理配置 +func (s *SettingService) GetStreamTimeoutSettings(ctx context.Context) (*StreamTimeoutSettings, error) { + value, err := s.settingRepo.GetValue(ctx, SettingKeyStreamTimeoutSettings) + if err != nil { + if errors.Is(err, ErrSettingNotFound) { + return DefaultStreamTimeoutSettings(), nil + } + return nil, fmt.Errorf("get stream timeout settings: %w", err) + } + if value == "" { + return DefaultStreamTimeoutSettings(), nil + } + + var settings StreamTimeoutSettings + if err := json.Unmarshal([]byte(value), &settings); err != nil { + return DefaultStreamTimeoutSettings(), nil + } + + // 验证并修正配置值 + if settings.TimeoutSeconds < 0 { + settings.TimeoutSeconds = 0 + } + if settings.TimeoutSeconds > 0 && settings.TimeoutSeconds < 30 { + settings.TimeoutSeconds = 30 + } + if settings.TimeoutSeconds > 300 { + settings.TimeoutSeconds = 300 + } + if settings.TempUnschedMinutes < 1 { + settings.TempUnschedMinutes = 1 + } + if settings.TempUnschedMinutes > 60 { + settings.TempUnschedMinutes = 60 + } + if settings.ThresholdCount < 1 { + settings.ThresholdCount = 1 + } + if settings.ThresholdCount > 10 { + settings.ThresholdCount = 10 + } + if settings.ThresholdWindowMinutes < 1 { + settings.ThresholdWindowMinutes = 1 + } + if settings.ThresholdWindowMinutes > 60 { + settings.ThresholdWindowMinutes = 60 + } + + // 验证 action + switch settings.Action { + case StreamTimeoutActionTempUnsched, StreamTimeoutActionError, StreamTimeoutActionNone: + // valid + default: + settings.Action = StreamTimeoutActionTempUnsched + } + + return &settings, nil +} + +// SetStreamTimeoutSettings 设置流超时处理配置 +func (s *SettingService) SetStreamTimeoutSettings(ctx context.Context, settings *StreamTimeoutSettings) error { + if settings == nil { + return fmt.Errorf("settings cannot be nil") + } + + // 验证配置值 + if settings.TimeoutSeconds < 0 { + return fmt.Errorf("timeout_seconds must be non-negative") + } + if settings.TimeoutSeconds > 0 && (settings.TimeoutSeconds < 30 || settings.TimeoutSeconds > 300) { + return fmt.Errorf("timeout_seconds must be 0 or between 30-300") + } + if settings.TempUnschedMinutes < 1 || settings.TempUnschedMinutes > 60 { + return fmt.Errorf("temp_unsched_minutes must be between 1-60") + } + if settings.ThresholdCount < 1 || settings.ThresholdCount > 10 { + return fmt.Errorf("threshold_count must be between 1-10") + } + if settings.ThresholdWindowMinutes < 1 || settings.ThresholdWindowMinutes > 60 { + return fmt.Errorf("threshold_window_minutes must be between 1-60") + } + + switch settings.Action { + case StreamTimeoutActionTempUnsched, StreamTimeoutActionError, StreamTimeoutActionNone: + // valid + default: + return fmt.Errorf("invalid action: %s", settings.Action) + } + + data, err := json.Marshal(settings) + if err != nil { + return fmt.Errorf("marshal stream timeout settings: %w", err) + } + + return s.settingRepo.Set(ctx, SettingKeyStreamTimeoutSettings, string(data)) +} diff --git a/backend/internal/service/settings_view.go b/backend/internal/service/settings_view.go index e20a230a..ce5387d8 100644 --- a/backend/internal/service/settings_view.go +++ b/backend/internal/service/settings_view.go @@ -69,3 +69,38 @@ type PublicSettings struct { LinuxDoOAuthEnabled bool Version string } + +// StreamTimeoutSettings 流超时处理配置 +type StreamTimeoutSettings struct { + // Enabled 是否启用流超时处理 + Enabled bool `json:"enabled"` + // TimeoutSeconds 流数据间隔超时阈值(秒),0表示禁用 + TimeoutSeconds int `json:"timeout_seconds"` + // Action 超时后的处理方式: "temp_unsched" | "error" | "none" + Action string `json:"action"` + // TempUnschedMinutes 临时不可调度持续时间(分钟) + TempUnschedMinutes int `json:"temp_unsched_minutes"` + // ThresholdCount 触发阈值次数(累计多少次超时才触发) + ThresholdCount int `json:"threshold_count"` + // ThresholdWindowMinutes 阈值窗口时间(分钟) + ThresholdWindowMinutes int `json:"threshold_window_minutes"` +} + +// StreamTimeoutAction 流超时处理方式常量 +const ( + StreamTimeoutActionTempUnsched = "temp_unsched" // 临时不可调度 + StreamTimeoutActionError = "error" // 标记为错误状态 + StreamTimeoutActionNone = "none" // 不处理 +) + +// DefaultStreamTimeoutSettings 返回默认的流超时配置 +func DefaultStreamTimeoutSettings() *StreamTimeoutSettings { + return &StreamTimeoutSettings{ + Enabled: true, + TimeoutSeconds: 60, + Action: StreamTimeoutActionTempUnsched, + TempUnschedMinutes: 5, + ThresholdCount: 3, + ThresholdWindowMinutes: 10, + } +} diff --git a/backend/internal/service/temp_unsched.go b/backend/internal/service/temp_unsched.go index fcb5025e..3871b72b 100644 --- a/backend/internal/service/temp_unsched.go +++ b/backend/internal/service/temp_unsched.go @@ -2,6 +2,7 @@ package service import ( "context" + "time" ) // TempUnschedState 临时不可调度状态 @@ -20,3 +21,16 @@ type TempUnschedCache interface { GetTempUnsched(ctx context.Context, accountID int64) (*TempUnschedState, error) DeleteTempUnsched(ctx context.Context, accountID int64) error } + +// TimeoutCounterCache 超时计数器缓存接口 +type TimeoutCounterCache interface { + // IncrementTimeoutCount 增加账户的超时计数,返回当前计数值 + // windowMinutes 是计数窗口时间(分钟),超过此时间计数器会自动重置 + IncrementTimeoutCount(ctx context.Context, accountID int64, windowMinutes int) (int64, error) + // GetTimeoutCount 获取账户当前的超时计数 + GetTimeoutCount(ctx context.Context, accountID int64) (int64, error) + // ResetTimeoutCount 重置账户的超时计数 + ResetTimeoutCount(ctx context.Context, accountID int64) error + // GetTimeoutCountTTL 获取计数器剩余过期时间 + GetTimeoutCountTTL(ctx context.Context, accountID int64) (time.Duration, error) +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index f2cb9c44..40e3b166 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -86,6 +86,22 @@ func ProvideConcurrencyService(cache ConcurrencyCache, accountRepo AccountReposi return svc } +// ProvideRateLimitService creates RateLimitService with optional dependencies. +func ProvideRateLimitService( + accountRepo AccountRepository, + usageRepo UsageLogRepository, + cfg *config.Config, + geminiQuotaService *GeminiQuotaService, + tempUnschedCache TempUnschedCache, + timeoutCounterCache TimeoutCounterCache, + settingService *SettingService, +) *RateLimitService { + svc := NewRateLimitService(accountRepo, usageRepo, cfg, geminiQuotaService, tempUnschedCache) + svc.SetTimeoutCounterCache(timeoutCounterCache) + svc.SetSettingService(settingService) + return svc +} + // ProvideOpsMetricsCollector creates and starts OpsMetricsCollector. func ProvideOpsMetricsCollector( opsRepo OpsRepository, @@ -186,7 +202,7 @@ var ProviderSet = wire.NewSet( NewGeminiMessagesCompatService, NewAntigravityTokenProvider, NewAntigravityGatewayService, - NewRateLimitService, + ProvideRateLimitService, NewAccountUsageService, NewAccountTestService, NewSettingService, diff --git a/frontend/src/api/admin/settings.ts b/frontend/src/api/admin/settings.ts index 913c9652..eb046741 100644 --- a/frontend/src/api/admin/settings.ts +++ b/frontend/src/api/admin/settings.ts @@ -201,6 +201,42 @@ export async function deleteAdminApiKey(): Promise<{ message: string }> { return data } +/** + * Stream timeout settings interface + */ +export interface StreamTimeoutSettings { + enabled: boolean + timeout_seconds: number + action: 'temp_unsched' | 'error' | 'none' + temp_unsched_minutes: number + threshold_count: number + threshold_window_minutes: number +} + +/** + * Get stream timeout settings + * @returns Stream timeout settings + */ +export async function getStreamTimeoutSettings(): Promise { + const { data } = await apiClient.get('/admin/settings/stream-timeout') + return data +} + +/** + * Update stream timeout settings + * @param settings - Stream timeout settings to update + * @returns Updated settings + */ +export async function updateStreamTimeoutSettings( + settings: StreamTimeoutSettings +): Promise { + const { data } = await apiClient.put( + '/admin/settings/stream-timeout', + settings + ) + return data +} + export const settingsAPI = { getSettings, updateSettings, @@ -208,7 +244,9 @@ export const settingsAPI = { sendTestEmail, getAdminApiKey, regenerateAdminApiKey, - deleteAdminApiKey + deleteAdminApiKey, + getStreamTimeoutSettings, + updateStreamTimeoutSettings } export default settingsAPI diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index f0e7db55..23e9a1fb 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -2512,6 +2512,27 @@ export default { securityWarning: 'Warning: This key provides full admin access. Keep it secure.', usage: 'Usage: Add to request header - x-api-key: ' }, + streamTimeout: { + title: 'Stream Timeout Handling', + description: 'Configure account handling strategy when upstream response times out', + enabled: 'Enable Stream Timeout Handling', + enabledHint: 'Automatically handle problematic accounts when upstream times out', + timeoutSeconds: 'Timeout Threshold (seconds)', + timeoutSecondsHint: 'Stream data interval exceeding this time is considered timeout (30-300s)', + action: 'Action', + actionTempUnsched: 'Temporarily Unschedulable', + actionError: 'Mark as Error', + actionNone: 'No Action', + actionHint: 'Action to take on the account after timeout', + tempUnschedMinutes: 'Pause Duration (minutes)', + tempUnschedMinutesHint: 'Duration of temporary unschedulable state (1-60 minutes)', + thresholdCount: 'Trigger Threshold (count)', + thresholdCountHint: 'Number of timeouts before triggering action (1-10)', + thresholdWindowMinutes: 'Threshold Window (minutes)', + thresholdWindowMinutesHint: 'Time window for counting timeouts (1-60 minutes)', + saved: 'Stream timeout settings saved', + saveFailed: 'Failed to save stream timeout settings' + }, saveSettings: 'Save Settings', saving: 'Saving...', settingsSaved: 'Settings saved successfully', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index ecdcb13f..fb77e834 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -2696,6 +2696,27 @@ export default { securityWarning: '警告:此密钥拥有完整的管理员权限,请妥善保管。', usage: '使用方法:在请求头中添加 x-api-key: ' }, + streamTimeout: { + title: '流超时处理', + description: '配置上游响应超时时的账户处理策略,避免问题账户持续被选中', + enabled: '启用流超时处理', + enabledHint: '当上游响应超时时,自动处理问题账户', + timeoutSeconds: '超时阈值(秒)', + timeoutSecondsHint: '流数据间隔超过此时间视为超时(30-300秒)', + action: '处理方式', + actionTempUnsched: '临时不可调度', + actionError: '标记为错误状态', + actionNone: '不处理', + actionHint: '超时后对账户执行的操作', + tempUnschedMinutes: '暂停时长(分钟)', + tempUnschedMinutesHint: '临时不可调度的持续时间(1-60分钟)', + thresholdCount: '触发阈值(次数)', + thresholdCountHint: '累计超时多少次后触发处理(1-10次)', + thresholdWindowMinutes: '阈值窗口(分钟)', + thresholdWindowMinutesHint: '超时计数的时间窗口(1-60分钟)', + saved: '流超时设置保存成功', + saveFailed: '保存流超时设置失败' + }, saveSettings: '保存设置', saving: '保存中...', settingsSaved: '设置保存成功', diff --git a/frontend/src/views/admin/SettingsView.vue b/frontend/src/views/admin/SettingsView.vue index 57b18d0d..890ba28c 100644 --- a/frontend/src/views/admin/SettingsView.vue +++ b/frontend/src/views/admin/SettingsView.vue @@ -147,6 +147,161 @@ + +
+
+

+ {{ t('admin.settings.streamTimeout.title') }} +

+

+ {{ t('admin.settings.streamTimeout.description') }} +

+
+
+ +
+
+ {{ t('common.loading') }} +
+ + +
+
+
@@ -840,6 +995,18 @@ const adminApiKeyMasked = ref('') const adminApiKeyOperating = ref(false) const newAdminApiKey = ref('') +// Stream Timeout 状态 +const streamTimeoutLoading = ref(true) +const streamTimeoutSaving = ref(false) +const streamTimeoutForm = reactive({ + enabled: true, + timeout_seconds: 60, + action: 'temp_unsched' as 'temp_unsched' | 'error' | 'none', + temp_unsched_minutes: 5, + threshold_count: 3, + threshold_window_minutes: 10 +}) + type SettingsForm = SystemSettings & { smtp_password: string turnstile_secret_key: string @@ -1129,8 +1296,44 @@ function copyNewKey() { }) } +// Stream Timeout 方法 +async function loadStreamTimeoutSettings() { + streamTimeoutLoading.value = true + try { + const settings = await adminAPI.settings.getStreamTimeoutSettings() + Object.assign(streamTimeoutForm, settings) + } catch (error: any) { + console.error('Failed to load stream timeout settings:', error) + } finally { + streamTimeoutLoading.value = false + } +} + +async function saveStreamTimeoutSettings() { + streamTimeoutSaving.value = true + try { + const updated = await adminAPI.settings.updateStreamTimeoutSettings({ + enabled: streamTimeoutForm.enabled, + timeout_seconds: streamTimeoutForm.timeout_seconds, + action: streamTimeoutForm.action, + temp_unsched_minutes: streamTimeoutForm.temp_unsched_minutes, + threshold_count: streamTimeoutForm.threshold_count, + threshold_window_minutes: streamTimeoutForm.threshold_window_minutes + }) + Object.assign(streamTimeoutForm, updated) + appStore.showSuccess(t('admin.settings.streamTimeout.saved')) + } catch (error: any) { + appStore.showError( + t('admin.settings.streamTimeout.saveFailed') + ': ' + (error.message || t('common.unknownError')) + ) + } finally { + streamTimeoutSaving.value = false + } +} + onMounted(() => { loadSettings() loadAdminApiKey() + loadStreamTimeoutSettings() }) From 53e730f8d5399a8172a03a525e4b831de2923627 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=B5=B7?= <7836246@qq.com> Date: Mon, 12 Jan 2026 14:03:56 +0800 Subject: [PATCH 2/4] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=89=A3=E6=AC=BE?= =?UTF-8?q?=E6=97=B6=E6=B5=AE=E7=82=B9=E6=95=B0=E7=B2=BE=E5=BA=A6=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E7=9A=84=E4=BD=99=E9=A2=9D=E4=B8=8D=E8=B6=B3=E8=AF=AF?= =?UTF-8?q?=E5=88=A4=E5=92=8C=20-0.00=20=E6=98=BE=E7=A4=BA=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/components/admin/user/UserBalanceModal.vue | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/frontend/src/components/admin/user/UserBalanceModal.vue b/frontend/src/components/admin/user/UserBalanceModal.vue index 1918577a..61d4785e 100644 --- a/frontend/src/components/admin/user/UserBalanceModal.vue +++ b/frontend/src/components/admin/user/UserBalanceModal.vue @@ -35,14 +35,22 @@ const emit = defineEmits(['close', 'success']); const { t } = useI18n(); const a const submitting = ref(false); const form = reactive({ amount: 0, notes: '' }) watch(() => props.show, (v) => { if(v) { form.amount = 0; form.notes = '' } }) -const calculateNewBalance = () => (props.user ? (props.operation === 'add' ? props.user.balance + form.amount : props.user.balance - form.amount) : 0) +const calculateNewBalance = () => { + if (!props.user) return 0 + const result = props.operation === 'add' ? props.user.balance + form.amount : props.user.balance - form.amount + // 避免浮点数精度问题导致的 -0.00 显示 + return result === 0 || Object.is(result, -0) ? 0 : result +} const handleBalanceSubmit = async () => { if (!props.user) return if (!form.amount || form.amount <= 0) { appStore.showError(t('admin.users.amountRequired')) return } - if (props.operation === 'subtract' && form.amount > props.user.balance) { + // 使用小数点后两位精度比较,避免浮点数精度问题 + const amount = Math.round(form.amount * 100) / 100 + const balance = Math.round(props.user.balance * 100) / 100 + if (props.operation === 'subtract' && amount > balance) { appStore.showError(t('admin.users.insufficientBalance')) return } From 0c52809591bd9256e3d502fde9652dd258a5ff2b Mon Sep 17 00:00:00 2001 From: ianshaw Date: Sun, 11 Jan 2026 22:09:35 -0800 Subject: [PATCH 3/4] =?UTF-8?q?refactor(settings):=20=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E6=B5=81=E8=B6=85=E6=97=B6=E9=85=8D=E7=BD=AE=EF=BC=8C=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=E5=86=97=E4=BD=99=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除 TimeoutSeconds 字段,超时判定由网关配置控制 - 默认禁用流超时处理功能 --- backend/internal/handler/admin/setting_handler.go | 4 ---- backend/internal/handler/dto/settings.go | 1 - backend/internal/service/setting_service.go | 15 --------------- backend/internal/service/settings_view.go | 7 ++----- frontend/src/api/admin/settings.ts | 1 - 5 files changed, 2 insertions(+), 26 deletions(-) diff --git a/backend/internal/handler/admin/setting_handler.go b/backend/internal/handler/admin/setting_handler.go index 1071879b..6666ce4e 100644 --- a/backend/internal/handler/admin/setting_handler.go +++ b/backend/internal/handler/admin/setting_handler.go @@ -666,7 +666,6 @@ func (h *SettingHandler) GetStreamTimeoutSettings(c *gin.Context) { response.Success(c, dto.StreamTimeoutSettings{ Enabled: settings.Enabled, - TimeoutSeconds: settings.TimeoutSeconds, Action: settings.Action, TempUnschedMinutes: settings.TempUnschedMinutes, ThresholdCount: settings.ThresholdCount, @@ -677,7 +676,6 @@ func (h *SettingHandler) GetStreamTimeoutSettings(c *gin.Context) { // UpdateStreamTimeoutSettingsRequest 更新流超时配置请求 type UpdateStreamTimeoutSettingsRequest struct { Enabled bool `json:"enabled"` - TimeoutSeconds int `json:"timeout_seconds"` Action string `json:"action"` TempUnschedMinutes int `json:"temp_unsched_minutes"` ThresholdCount int `json:"threshold_count"` @@ -695,7 +693,6 @@ func (h *SettingHandler) UpdateStreamTimeoutSettings(c *gin.Context) { settings := &service.StreamTimeoutSettings{ Enabled: req.Enabled, - TimeoutSeconds: req.TimeoutSeconds, Action: req.Action, TempUnschedMinutes: req.TempUnschedMinutes, ThresholdCount: req.ThresholdCount, @@ -716,7 +713,6 @@ func (h *SettingHandler) UpdateStreamTimeoutSettings(c *gin.Context) { response.Success(c, dto.StreamTimeoutSettings{ Enabled: updatedSettings.Enabled, - TimeoutSeconds: updatedSettings.TimeoutSeconds, Action: updatedSettings.Action, TempUnschedMinutes: updatedSettings.TempUnschedMinutes, ThresholdCount: updatedSettings.ThresholdCount, diff --git a/backend/internal/handler/dto/settings.go b/backend/internal/handler/dto/settings.go index bb498914..81206def 100644 --- a/backend/internal/handler/dto/settings.go +++ b/backend/internal/handler/dto/settings.go @@ -70,7 +70,6 @@ type PublicSettings struct { // StreamTimeoutSettings 流超时处理配置 DTO type StreamTimeoutSettings struct { Enabled bool `json:"enabled"` - TimeoutSeconds int `json:"timeout_seconds"` Action string `json:"action"` TempUnschedMinutes int `json:"temp_unsched_minutes"` ThresholdCount int `json:"threshold_count"` diff --git a/backend/internal/service/setting_service.go b/backend/internal/service/setting_service.go index 4707a854..0a7426f8 100644 --- a/backend/internal/service/setting_service.go +++ b/backend/internal/service/setting_service.go @@ -696,15 +696,6 @@ func (s *SettingService) GetStreamTimeoutSettings(ctx context.Context) (*StreamT } // 验证并修正配置值 - if settings.TimeoutSeconds < 0 { - settings.TimeoutSeconds = 0 - } - if settings.TimeoutSeconds > 0 && settings.TimeoutSeconds < 30 { - settings.TimeoutSeconds = 30 - } - if settings.TimeoutSeconds > 300 { - settings.TimeoutSeconds = 300 - } if settings.TempUnschedMinutes < 1 { settings.TempUnschedMinutes = 1 } @@ -742,12 +733,6 @@ func (s *SettingService) SetStreamTimeoutSettings(ctx context.Context, settings } // 验证配置值 - if settings.TimeoutSeconds < 0 { - return fmt.Errorf("timeout_seconds must be non-negative") - } - if settings.TimeoutSeconds > 0 && (settings.TimeoutSeconds < 30 || settings.TimeoutSeconds > 300) { - return fmt.Errorf("timeout_seconds must be 0 or between 30-300") - } if settings.TempUnschedMinutes < 1 || settings.TempUnschedMinutes > 60 { return fmt.Errorf("temp_unsched_minutes must be between 1-60") } diff --git a/backend/internal/service/settings_view.go b/backend/internal/service/settings_view.go index ce5387d8..e4ee2826 100644 --- a/backend/internal/service/settings_view.go +++ b/backend/internal/service/settings_view.go @@ -70,12 +70,10 @@ type PublicSettings struct { Version string } -// StreamTimeoutSettings 流超时处理配置 +// StreamTimeoutSettings 流超时处理配置(仅控制超时后的处理方式,超时判定由网关配置控制) type StreamTimeoutSettings struct { // Enabled 是否启用流超时处理 Enabled bool `json:"enabled"` - // TimeoutSeconds 流数据间隔超时阈值(秒),0表示禁用 - TimeoutSeconds int `json:"timeout_seconds"` // Action 超时后的处理方式: "temp_unsched" | "error" | "none" Action string `json:"action"` // TempUnschedMinutes 临时不可调度持续时间(分钟) @@ -96,8 +94,7 @@ const ( // DefaultStreamTimeoutSettings 返回默认的流超时配置 func DefaultStreamTimeoutSettings() *StreamTimeoutSettings { return &StreamTimeoutSettings{ - Enabled: true, - TimeoutSeconds: 60, + Enabled: false, Action: StreamTimeoutActionTempUnsched, TempUnschedMinutes: 5, ThresholdCount: 3, diff --git a/frontend/src/api/admin/settings.ts b/frontend/src/api/admin/settings.ts index eb046741..fc72be8d 100644 --- a/frontend/src/api/admin/settings.ts +++ b/frontend/src/api/admin/settings.ts @@ -206,7 +206,6 @@ export async function deleteAdminApiKey(): Promise<{ message: string }> { */ export interface StreamTimeoutSettings { enabled: boolean - timeout_seconds: number action: 'temp_unsched' | 'error' | 'none' temp_unsched_minutes: number threshold_count: number From 8c1958c9ad272069dc99492749e168d34c681311 Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Mon, 12 Jan 2026 15:13:39 +0800 Subject: [PATCH 4/4] =?UTF-8?q?fix(=E8=B0=83=E5=BA=A6):=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=B5=81=E8=B6=85=E6=97=B6=E9=85=8D=E7=BD=AE=E5=B9=B6?= =?UTF-8?q?=E8=A1=A5=E5=9B=9E=E6=94=BE=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 删除前端未支持的 timeout_seconds 字段,避免类型检查失败 新增调度 outbox 回放集成测试 调整调度默认等待超时断言 测试: make test --- backend/internal/config/config_test.go | 4 +- ...eduler_snapshot_outbox_integration_test.go | 81 +++++++++++++++++++ frontend/src/views/admin/SettingsView.vue | 19 ----- 3 files changed, 83 insertions(+), 21 deletions(-) create mode 100644 backend/internal/repository/scheduler_snapshot_outbox_integration_test.go diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 1ba6d053..4637989e 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -39,8 +39,8 @@ func TestLoadDefaultSchedulingConfig(t *testing.T) { if cfg.Gateway.Scheduling.StickySessionMaxWaiting != 3 { t.Fatalf("StickySessionMaxWaiting = %d, want 3", cfg.Gateway.Scheduling.StickySessionMaxWaiting) } - if cfg.Gateway.Scheduling.StickySessionWaitTimeout != 45*time.Second { - t.Fatalf("StickySessionWaitTimeout = %v, want 45s", cfg.Gateway.Scheduling.StickySessionWaitTimeout) + if cfg.Gateway.Scheduling.StickySessionWaitTimeout != 120*time.Second { + t.Fatalf("StickySessionWaitTimeout = %v, want 120s", cfg.Gateway.Scheduling.StickySessionWaitTimeout) } if cfg.Gateway.Scheduling.FallbackWaitTimeout != 30*time.Second { t.Fatalf("FallbackWaitTimeout = %v, want 30s", cfg.Gateway.Scheduling.FallbackWaitTimeout) diff --git a/backend/internal/repository/scheduler_snapshot_outbox_integration_test.go b/backend/internal/repository/scheduler_snapshot_outbox_integration_test.go new file mode 100644 index 00000000..e82d663f --- /dev/null +++ b/backend/internal/repository/scheduler_snapshot_outbox_integration_test.go @@ -0,0 +1,81 @@ +//go:build integration + +package repository + +import ( + "context" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/stretchr/testify/require" +) + +func TestSchedulerSnapshotOutboxReplay(t *testing.T) { + ctx := context.Background() + rdb := testRedis(t) + client := testEntClient(t) + + _, _ = integrationDB.ExecContext(ctx, "TRUNCATE scheduler_outbox") + + accountRepo := newAccountRepositoryWithSQL(client, integrationDB) + outboxRepo := NewSchedulerOutboxRepository(integrationDB) + cache := NewSchedulerCache(rdb) + + cfg := &config.Config{ + RunMode: config.RunModeStandard, + Gateway: config.GatewayConfig{ + Scheduling: config.GatewaySchedulingConfig{ + OutboxPollIntervalSeconds: 1, + FullRebuildIntervalSeconds: 0, + DbFallbackEnabled: true, + }, + }, + } + + account := &service.Account{ + Name: "outbox-replay-" + time.Now().Format("150405.000000"), + Platform: service.PlatformOpenAI, + Type: service.AccountTypeAPIKey, + Status: service.StatusActive, + Schedulable: true, + Concurrency: 3, + Priority: 1, + Credentials: map[string]any{}, + Extra: map[string]any{}, + } + require.NoError(t, accountRepo.Create(ctx, account)) + + svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, cfg) + svc.Start() + t.Cleanup(svc.Stop) + + bucket := service.SchedulerBucket{GroupID: 0, Platform: service.PlatformOpenAI, Mode: service.SchedulerModeSingle} + require.Eventually(t, func() bool { + accounts, hit, err := cache.GetSnapshot(ctx, bucket) + if err != nil || !hit { + return false + } + for _, acc := range accounts { + if acc.ID == account.ID { + return true + } + } + return false + }, 5*time.Second, 100*time.Millisecond) + + require.NoError(t, accountRepo.UpdateLastUsed(ctx, account.ID)) + updated, err := accountRepo.GetByID(ctx, account.ID) + require.NoError(t, err) + require.NotNil(t, updated.LastUsedAt) + expectedUnix := updated.LastUsedAt.Unix() + + require.Eventually(t, func() bool { + cached, err := cache.GetAccount(ctx, account.ID) + if err != nil || cached == nil || cached.LastUsedAt == nil { + return false + } + return cached.LastUsedAt.Unix() == expectedUnix + }, 5*time.Second, 100*time.Millisecond) +} diff --git a/frontend/src/views/admin/SettingsView.vue b/frontend/src/views/admin/SettingsView.vue index 890ba28c..d46c3329 100644 --- a/frontend/src/views/admin/SettingsView.vue +++ b/frontend/src/views/admin/SettingsView.vue @@ -183,23 +183,6 @@ v-if="streamTimeoutForm.enabled" class="space-y-4 border-t border-gray-100 pt-4 dark:border-dark-700" > - -
- - -

- {{ t('admin.settings.streamTimeout.timeoutSecondsHint') }} -

-
-