From 34d6b0a6016a57e71275d6eb96c2427325873245 Mon Sep 17 00:00:00 2001 From: song Date: Fri, 16 Jan 2026 20:18:30 +0800 Subject: [PATCH] =?UTF-8?q?feat(gateway):=20=E8=B4=A6=E6=88=B7=E5=88=87?= =?UTF-8?q?=E6=8D=A2=E6=AC=A1=E6=95=B0=E5=92=8C=20Antigravity=20=E9=99=90?= =?UTF-8?q?=E6=B5=81=E6=97=B6=E9=97=B4=E5=8F=AF=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - gateway.max_account_switches: 账户切换最大次数,默认 10 - gateway.max_account_switches_gemini: Gemini 账户切换次数,默认 3 - gateway.antigravity_fallback_cooldown_minutes: Antigravity 429 fallback 限流时间,默认 5 分钟 - Antigravity 429 不再重试,直接标记账户限流 --- backend/internal/config/config.go | 11 ++++++ backend/internal/handler/gateway_handler.go | 16 +++++++-- .../internal/handler/gemini_v1beta_handler.go | 2 +- .../handler/openai_gateway_handler.go | 8 ++++- .../service/antigravity_gateway_service.go | 36 +++++-------------- 5 files changed, 41 insertions(+), 32 deletions(-) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 2cc11967..b2105bc6 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -228,6 +228,14 @@ type GatewayConfig struct { // 是否允许对部分 400 错误触发 failover(默认关闭以避免改变语义) FailoverOn400 bool `mapstructure:"failover_on_400"` + // 账户切换最大次数(遇到上游错误时切换到其他账户的次数上限) + MaxAccountSwitches int `mapstructure:"max_account_switches"` + // Gemini 账户切换最大次数(Gemini 平台单独配置,因 API 限制更严格) + MaxAccountSwitchesGemini int `mapstructure:"max_account_switches_gemini"` + + // Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用 + AntigravityFallbackCooldownMinutes int `mapstructure:"antigravity_fallback_cooldown_minutes"` + // Scheduling: 账号调度相关配置 Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"` } @@ -661,6 +669,9 @@ func setDefaults() { viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048) viper.SetDefault("gateway.inject_beta_for_apikey", false) viper.SetDefault("gateway.failover_on_400", false) + viper.SetDefault("gateway.max_account_switches", 10) + viper.SetDefault("gateway.max_account_switches_gemini", 3) + viper.SetDefault("gateway.antigravity_fallback_cooldown_minutes", 5) viper.SetDefault("gateway.max_body_size", int64(100*1024*1024)) viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy) // HTTP 上游连接池配置(针对 5000+ 并发用户优化) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 48a827f3..2cad9c40 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -30,6 +30,8 @@ type GatewayHandler struct { userService *service.UserService billingCacheService *service.BillingCacheService concurrencyHelper *ConcurrencyHelper + maxAccountSwitches int + maxAccountSwitchesGemini int } // NewGatewayHandler creates a new GatewayHandler @@ -43,8 +45,16 @@ func NewGatewayHandler( cfg *config.Config, ) *GatewayHandler { pingInterval := time.Duration(0) + maxAccountSwitches := 10 + maxAccountSwitchesGemini := 3 if cfg != nil { pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second + if cfg.Gateway.MaxAccountSwitches > 0 { + maxAccountSwitches = cfg.Gateway.MaxAccountSwitches + } + if cfg.Gateway.MaxAccountSwitchesGemini > 0 { + maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini + } } return &GatewayHandler{ gatewayService: gatewayService, @@ -53,6 +63,8 @@ func NewGatewayHandler( userService: userService, billingCacheService: billingCacheService, concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval), + maxAccountSwitches: maxAccountSwitches, + maxAccountSwitchesGemini: maxAccountSwitchesGemini, } } @@ -164,7 +176,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } if platform == service.PlatformGemini { - const maxAccountSwitches = 3 + maxAccountSwitches := h.maxAccountSwitchesGemini switchCount := 0 failedAccountIDs := make(map[int64]struct{}) lastFailoverStatus := 0 @@ -291,7 +303,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } } - const maxAccountSwitches = 10 + maxAccountSwitches := h.maxAccountSwitches switchCount := 0 failedAccountIDs := make(map[int64]struct{}) lastFailoverStatus := 0 diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index 0cbe44f2..9909fa90 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -212,7 +212,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { if sessionHash != "" { sessionKey = "gemini:" + sessionHash } - const maxAccountSwitches = 3 + maxAccountSwitches := h.maxAccountSwitchesGemini switchCount := 0 failedAccountIDs := make(map[int64]struct{}) lastFailoverStatus := 0 diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 70131417..334d1368 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -23,6 +23,7 @@ type OpenAIGatewayHandler struct { gatewayService *service.OpenAIGatewayService billingCacheService *service.BillingCacheService concurrencyHelper *ConcurrencyHelper + maxAccountSwitches int } // NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler @@ -33,13 +34,18 @@ func NewOpenAIGatewayHandler( cfg *config.Config, ) *OpenAIGatewayHandler { pingInterval := time.Duration(0) + maxAccountSwitches := 3 if cfg != nil { pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second + if cfg.Gateway.MaxAccountSwitches > 0 { + maxAccountSwitches = cfg.Gateway.MaxAccountSwitches + } } return &OpenAIGatewayHandler{ gatewayService: gatewayService, billingCacheService: billingCacheService, concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval), + maxAccountSwitches: maxAccountSwitches, } } @@ -147,7 +153,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { // Generate session hash (from header for OpenAI) sessionHash := h.gatewayService.GenerateSessionHash(c) - const maxAccountSwitches = 3 + maxAccountSwitches := h.maxAccountSwitches switchCount := 0 failedAccountIDs := make(map[int64]struct{}) lastFailoverStatus := 0 diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 347877ee..a0e845ee 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -587,19 +587,11 @@ urlFallbackLoop: return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries") } - // 429 重试3次后限流账户 + // 429 不重试,直接限流账户 if resp.StatusCode == http.StatusTooManyRequests { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() - if attempt < 3 { - log.Printf("%s status=429 retry=%d/3 body=%s", prefix, attempt, truncateForLog(respBody, 200)) - if !sleepAntigravityBackoffWithContext(ctx, attempt) { - return nil, ctx.Err() - } - continue - } - // 3次重试都失败,限流账户 s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200)) resp = &http.Response{ @@ -622,10 +614,6 @@ urlFallbackLoop: } continue } - // 所有重试都失败,标记限流状态 - if resp.StatusCode == 429 { - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) - } // 最后一次尝试也失败 resp = &http.Response{ StatusCode: resp.StatusCode, @@ -1145,19 +1133,11 @@ urlFallbackLoop: return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries") } - // 429 重试3次后限流账户 + // 429 不重试,直接限流账户 if resp.StatusCode == http.StatusTooManyRequests { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() - if attempt < 3 { - log.Printf("%s status=429 retry=%d/3 body=%s", prefix, attempt, truncateForLog(respBody, 200)) - if !sleepAntigravityBackoffWithContext(ctx, attempt) { - return nil, ctx.Err() - } - continue - } - // 3次重试都失败,限流账户 s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200)) resp = &http.Response{ @@ -1180,10 +1160,6 @@ urlFallbackLoop: } continue } - // 所有重试都失败,标记限流状态 - if resp.StatusCode == 429 { - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) - } resp = &http.Response{ StatusCode: resp.StatusCode, Header: resp.Header.Clone(), @@ -1356,8 +1332,12 @@ func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, pre if statusCode == 429 { resetAt := ParseGeminiRateLimitResetTime(body) if resetAt == nil { - // 解析失败:默认 5 分钟,直接限流整个账户 - defaultDur := 5 * time.Minute + // 解析失败:使用配置的 fallback 时间,直接限流整个账户 + fallbackMinutes := 5 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.AntigravityFallbackCooldownMinutes > 0 { + fallbackMinutes = s.settingService.cfg.Gateway.AntigravityFallbackCooldownMinutes + } + defaultDur := time.Duration(fallbackMinutes) * time.Minute ra := time.Now().Add(defaultDur) log.Printf("%s status=429 rate_limited account=%d reset_in=%v (fallback)", prefix, account.ID, defaultDur) if err := s.accountRepo.SetRateLimited(ctx, account.ID, ra); err != nil {