feat(gateway): 账户切换次数和 Antigravity 限流时间可配置
- gateway.max_account_switches: 账户切换最大次数,默认 10
- gateway.max_account_switches_gemini: Gemini 账户切换最大次数,默认 3
- gateway.antigravity_fallback_cooldown_minutes: Antigravity 429 fallback 限流时间,默认 5 分钟
- Antigravity 429 不再重试,直接标记账户限流
This commit is contained in:
@@ -228,6 +228,14 @@ type GatewayConfig struct {
|
|||||||
// 是否允许对部分 400 错误触发 failover(默认关闭以避免改变语义)
|
// 是否允许对部分 400 错误触发 failover(默认关闭以避免改变语义)
|
||||||
FailoverOn400 bool `mapstructure:"failover_on_400"`
|
FailoverOn400 bool `mapstructure:"failover_on_400"`
|
||||||
|
|
||||||
|
// 账户切换最大次数(遇到上游错误时切换到其他账户的次数上限)
|
||||||
|
MaxAccountSwitches int `mapstructure:"max_account_switches"`
|
||||||
|
// Gemini 账户切换最大次数(Gemini 平台单独配置,因 API 限制更严格)
|
||||||
|
MaxAccountSwitchesGemini int `mapstructure:"max_account_switches_gemini"`
|
||||||
|
|
||||||
|
// Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用
|
||||||
|
AntigravityFallbackCooldownMinutes int `mapstructure:"antigravity_fallback_cooldown_minutes"`
|
||||||
|
|
||||||
// Scheduling: 账号调度相关配置
|
// Scheduling: 账号调度相关配置
|
||||||
Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"`
|
Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"`
|
||||||
}
|
}
|
||||||
@@ -661,6 +669,9 @@ func setDefaults() {
|
|||||||
viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048)
|
viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048)
|
||||||
viper.SetDefault("gateway.inject_beta_for_apikey", false)
|
viper.SetDefault("gateway.inject_beta_for_apikey", false)
|
||||||
viper.SetDefault("gateway.failover_on_400", false)
|
viper.SetDefault("gateway.failover_on_400", false)
|
||||||
|
viper.SetDefault("gateway.max_account_switches", 10)
|
||||||
|
viper.SetDefault("gateway.max_account_switches_gemini", 3)
|
||||||
|
viper.SetDefault("gateway.antigravity_fallback_cooldown_minutes", 5)
|
||||||
viper.SetDefault("gateway.max_body_size", int64(100*1024*1024))
|
viper.SetDefault("gateway.max_body_size", int64(100*1024*1024))
|
||||||
viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy)
|
viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy)
|
||||||
// HTTP 上游连接池配置(针对 5000+ 并发用户优化)
|
// HTTP 上游连接池配置(针对 5000+ 并发用户优化)
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ type GatewayHandler struct {
|
|||||||
userService *service.UserService
|
userService *service.UserService
|
||||||
billingCacheService *service.BillingCacheService
|
billingCacheService *service.BillingCacheService
|
||||||
concurrencyHelper *ConcurrencyHelper
|
concurrencyHelper *ConcurrencyHelper
|
||||||
|
maxAccountSwitches int
|
||||||
|
maxAccountSwitchesGemini int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGatewayHandler creates a new GatewayHandler
|
// NewGatewayHandler creates a new GatewayHandler
|
||||||
@@ -43,8 +45,16 @@ func NewGatewayHandler(
|
|||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
) *GatewayHandler {
|
) *GatewayHandler {
|
||||||
pingInterval := time.Duration(0)
|
pingInterval := time.Duration(0)
|
||||||
|
maxAccountSwitches := 10
|
||||||
|
maxAccountSwitchesGemini := 3
|
||||||
if cfg != nil {
|
if cfg != nil {
|
||||||
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
|
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
|
||||||
|
if cfg.Gateway.MaxAccountSwitches > 0 {
|
||||||
|
maxAccountSwitches = cfg.Gateway.MaxAccountSwitches
|
||||||
|
}
|
||||||
|
if cfg.Gateway.MaxAccountSwitchesGemini > 0 {
|
||||||
|
maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return &GatewayHandler{
|
return &GatewayHandler{
|
||||||
gatewayService: gatewayService,
|
gatewayService: gatewayService,
|
||||||
@@ -53,6 +63,8 @@ func NewGatewayHandler(
|
|||||||
userService: userService,
|
userService: userService,
|
||||||
billingCacheService: billingCacheService,
|
billingCacheService: billingCacheService,
|
||||||
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
|
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
|
||||||
|
maxAccountSwitches: maxAccountSwitches,
|
||||||
|
maxAccountSwitchesGemini: maxAccountSwitchesGemini,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,7 +176,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if platform == service.PlatformGemini {
|
if platform == service.PlatformGemini {
|
||||||
const maxAccountSwitches = 3
|
maxAccountSwitches := h.maxAccountSwitchesGemini
|
||||||
switchCount := 0
|
switchCount := 0
|
||||||
failedAccountIDs := make(map[int64]struct{})
|
failedAccountIDs := make(map[int64]struct{})
|
||||||
lastFailoverStatus := 0
|
lastFailoverStatus := 0
|
||||||
@@ -291,7 +303,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const maxAccountSwitches = 10
|
maxAccountSwitches := h.maxAccountSwitches
|
||||||
switchCount := 0
|
switchCount := 0
|
||||||
failedAccountIDs := make(map[int64]struct{})
|
failedAccountIDs := make(map[int64]struct{})
|
||||||
lastFailoverStatus := 0
|
lastFailoverStatus := 0
|
||||||
|
|||||||
@@ -212,7 +212,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
|||||||
if sessionHash != "" {
|
if sessionHash != "" {
|
||||||
sessionKey = "gemini:" + sessionHash
|
sessionKey = "gemini:" + sessionHash
|
||||||
}
|
}
|
||||||
const maxAccountSwitches = 3
|
maxAccountSwitches := h.maxAccountSwitchesGemini
|
||||||
switchCount := 0
|
switchCount := 0
|
||||||
failedAccountIDs := make(map[int64]struct{})
|
failedAccountIDs := make(map[int64]struct{})
|
||||||
lastFailoverStatus := 0
|
lastFailoverStatus := 0
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ type OpenAIGatewayHandler struct {
|
|||||||
gatewayService *service.OpenAIGatewayService
|
gatewayService *service.OpenAIGatewayService
|
||||||
billingCacheService *service.BillingCacheService
|
billingCacheService *service.BillingCacheService
|
||||||
concurrencyHelper *ConcurrencyHelper
|
concurrencyHelper *ConcurrencyHelper
|
||||||
|
maxAccountSwitches int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler
|
// NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler
|
||||||
@@ -33,13 +34,18 @@ func NewOpenAIGatewayHandler(
|
|||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
) *OpenAIGatewayHandler {
|
) *OpenAIGatewayHandler {
|
||||||
pingInterval := time.Duration(0)
|
pingInterval := time.Duration(0)
|
||||||
|
maxAccountSwitches := 3
|
||||||
if cfg != nil {
|
if cfg != nil {
|
||||||
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
|
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
|
||||||
|
if cfg.Gateway.MaxAccountSwitches > 0 {
|
||||||
|
maxAccountSwitches = cfg.Gateway.MaxAccountSwitches
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return &OpenAIGatewayHandler{
|
return &OpenAIGatewayHandler{
|
||||||
gatewayService: gatewayService,
|
gatewayService: gatewayService,
|
||||||
billingCacheService: billingCacheService,
|
billingCacheService: billingCacheService,
|
||||||
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval),
|
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval),
|
||||||
|
maxAccountSwitches: maxAccountSwitches,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,7 +153,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
|||||||
// Generate session hash (from header for OpenAI)
|
// Generate session hash (from header for OpenAI)
|
||||||
sessionHash := h.gatewayService.GenerateSessionHash(c)
|
sessionHash := h.gatewayService.GenerateSessionHash(c)
|
||||||
|
|
||||||
const maxAccountSwitches = 3
|
maxAccountSwitches := h.maxAccountSwitches
|
||||||
switchCount := 0
|
switchCount := 0
|
||||||
failedAccountIDs := make(map[int64]struct{})
|
failedAccountIDs := make(map[int64]struct{})
|
||||||
lastFailoverStatus := 0
|
lastFailoverStatus := 0
|
||||||
|
|||||||
@@ -587,19 +587,11 @@ urlFallbackLoop:
|
|||||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
|
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 429 重试3次后限流账户
|
// 429 不重试,直接限流账户
|
||||||
if resp.StatusCode == http.StatusTooManyRequests {
|
if resp.StatusCode == http.StatusTooManyRequests {
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
if attempt < 3 {
|
|
||||||
log.Printf("%s status=429 retry=%d/3 body=%s", prefix, attempt, truncateForLog(respBody, 200))
|
|
||||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// 3次重试都失败,限流账户
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
||||||
log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200))
|
log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200))
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
@@ -622,10 +614,6 @@ urlFallbackLoop:
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// 所有重试都失败,标记限流状态
|
|
||||||
if resp.StatusCode == 429 {
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
|
||||||
}
|
|
||||||
// 最后一次尝试也失败
|
// 最后一次尝试也失败
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
StatusCode: resp.StatusCode,
|
StatusCode: resp.StatusCode,
|
||||||
@@ -1145,19 +1133,11 @@ urlFallbackLoop:
|
|||||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
|
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 429 重试3次后限流账户
|
// 429 不重试,直接限流账户
|
||||||
if resp.StatusCode == http.StatusTooManyRequests {
|
if resp.StatusCode == http.StatusTooManyRequests {
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
if attempt < 3 {
|
|
||||||
log.Printf("%s status=429 retry=%d/3 body=%s", prefix, attempt, truncateForLog(respBody, 200))
|
|
||||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// 3次重试都失败,限流账户
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
||||||
log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200))
|
log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200))
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
@@ -1180,10 +1160,6 @@ urlFallbackLoop:
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// 所有重试都失败,标记限流状态
|
|
||||||
if resp.StatusCode == 429 {
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
|
||||||
}
|
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
StatusCode: resp.StatusCode,
|
StatusCode: resp.StatusCode,
|
||||||
Header: resp.Header.Clone(),
|
Header: resp.Header.Clone(),
|
||||||
@@ -1356,8 +1332,12 @@ func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, pre
|
|||||||
if statusCode == 429 {
|
if statusCode == 429 {
|
||||||
resetAt := ParseGeminiRateLimitResetTime(body)
|
resetAt := ParseGeminiRateLimitResetTime(body)
|
||||||
if resetAt == nil {
|
if resetAt == nil {
|
||||||
// 解析失败:默认 5 分钟,直接限流整个账户
|
// 解析失败:使用配置的 fallback 时间,直接限流整个账户
|
||||||
defaultDur := 5 * time.Minute
|
fallbackMinutes := 5
|
||||||
|
if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.AntigravityFallbackCooldownMinutes > 0 {
|
||||||
|
fallbackMinutes = s.settingService.cfg.Gateway.AntigravityFallbackCooldownMinutes
|
||||||
|
}
|
||||||
|
defaultDur := time.Duration(fallbackMinutes) * time.Minute
|
||||||
ra := time.Now().Add(defaultDur)
|
ra := time.Now().Add(defaultDur)
|
||||||
log.Printf("%s status=429 rate_limited account=%d reset_in=%v (fallback)", prefix, account.ID, defaultDur)
|
log.Printf("%s status=429 rate_limited account=%d reset_in=%v (fallback)", prefix, account.ID, defaultDur)
|
||||||
if err := s.accountRepo.SetRateLimited(ctx, account.ID, ra); err != nil {
|
if err := s.accountRepo.SetRateLimited(ctx, account.ID, ra); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user