- Extract duplicated failover logic from gateway_handler.go (3 places) and gemini_v1beta_handler.go into shared failover_loop.go - Introduce FailoverState with HandleFailoverError and HandleSelectionExhausted - Move helper functions (needForceCacheBilling, sleepWithContext) into failover_loop.go - Add comprehensive unit tests (32+ test cases) - Delete redundant gateway_handler_single_account_retry_test.go
161 lines
5.5 KiB
Go
161 lines
5.5 KiB
Go
package handler
|
||
|
||
import (
|
||
"context"
|
||
"log"
|
||
"net/http"
|
||
"time"
|
||
|
||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||
)
|
||
|
||
// TempUnscheduler 用于 HandleFailoverError 中同账号重试耗尽后的临时封禁。
|
||
// GatewayService 隐式实现此接口。
|
||
type TempUnscheduler interface {
|
||
TempUnscheduleRetryableError(ctx context.Context, accountID int64, failoverErr *service.UpstreamFailoverError)
|
||
}
|
||
|
||
// FailoverAction tells the caller what to do next after a failover error has
// been processed.
type FailoverAction int

// The possible outcomes of failover handling. Values are assigned by iota in
// declaration order, starting at zero.
const (
	// FailoverContinue means the loop should go on: either retry the same
	// account or switch to another one. Callers handle both with a plain
	// `continue`.
	FailoverContinue FailoverAction = iota

	// FailoverExhausted means the switch budget is used up; the caller should
	// send an error response.
	FailoverExhausted

	// FailoverCanceled means the context was canceled; the caller should
	// return immediately.
	FailoverCanceled
)
|
||
|
||
// Tunables for the handler-level failover loop.
const (
	// maxSameAccountRetries caps how many times a single account is retried
	// for errors flagged RetryableOnSameAccount.
	maxSameAccountRetries = 2

	// sameAccountRetryDelay is the pause between two retries on the same
	// account.
	sameAccountRetryDelay = 500 * time.Millisecond

	// singleAccountBackoffDelay is the fixed back-off used before re-selecting
	// an account in the single-account-group 503 case.
	//
	// Per the original author's note, the service layer already performs
	// ample in-place retries in SingleAccountRetry mode (up to 3 attempts,
	// 30s total wait), so the handler layer only needs a short pause before
	// re-entering the service layer.
	singleAccountBackoffDelay = 2 * time.Second
)
|
||
|
||
// FailoverState 跨循环迭代共享的 failover 状态
|
||
type FailoverState struct {
|
||
SwitchCount int
|
||
MaxSwitches int
|
||
FailedAccountIDs map[int64]struct{}
|
||
SameAccountRetryCount map[int64]int
|
||
LastFailoverErr *service.UpstreamFailoverError
|
||
ForceCacheBilling bool
|
||
hasBoundSession bool
|
||
}
|
||
|
||
// NewFailoverState 创建 failover 状态
|
||
func NewFailoverState(maxSwitches int, hasBoundSession bool) *FailoverState {
|
||
return &FailoverState{
|
||
MaxSwitches: maxSwitches,
|
||
FailedAccountIDs: make(map[int64]struct{}),
|
||
SameAccountRetryCount: make(map[int64]int),
|
||
hasBoundSession: hasBoundSession,
|
||
}
|
||
}
|
||
|
||
// HandleFailoverError 处理 UpstreamFailoverError,返回下一步动作。
|
||
// 包含:缓存计费判断、同账号重试、临时封禁、切换计数、Antigravity 延时。
|
||
func (s *FailoverState) HandleFailoverError(
|
||
ctx context.Context,
|
||
gatewayService TempUnscheduler,
|
||
accountID int64,
|
||
platform string,
|
||
failoverErr *service.UpstreamFailoverError,
|
||
) FailoverAction {
|
||
s.LastFailoverErr = failoverErr
|
||
|
||
// 缓存计费判断
|
||
if needForceCacheBilling(s.hasBoundSession, failoverErr) {
|
||
s.ForceCacheBilling = true
|
||
}
|
||
|
||
// 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试
|
||
if failoverErr.RetryableOnSameAccount && s.SameAccountRetryCount[accountID] < maxSameAccountRetries {
|
||
s.SameAccountRetryCount[accountID]++
|
||
log.Printf("Account %d: retryable error %d, same-account retry %d/%d",
|
||
accountID, failoverErr.StatusCode, s.SameAccountRetryCount[accountID], maxSameAccountRetries)
|
||
if !sleepWithContext(ctx, sameAccountRetryDelay) {
|
||
return FailoverCanceled
|
||
}
|
||
return FailoverContinue
|
||
}
|
||
|
||
// 同账号重试用尽,执行临时封禁
|
||
if failoverErr.RetryableOnSameAccount {
|
||
gatewayService.TempUnscheduleRetryableError(ctx, accountID, failoverErr)
|
||
}
|
||
|
||
// 加入失败列表
|
||
s.FailedAccountIDs[accountID] = struct{}{}
|
||
|
||
// 检查是否耗尽
|
||
if s.SwitchCount >= s.MaxSwitches {
|
||
return FailoverExhausted
|
||
}
|
||
|
||
// 递增切换计数
|
||
s.SwitchCount++
|
||
log.Printf("Account %d: upstream error %d, switching account %d/%d",
|
||
accountID, failoverErr.StatusCode, s.SwitchCount, s.MaxSwitches)
|
||
|
||
// Antigravity 平台换号线性递增延时
|
||
if platform == service.PlatformAntigravity {
|
||
delay := time.Duration(s.SwitchCount-1) * time.Second
|
||
if !sleepWithContext(ctx, delay) {
|
||
return FailoverCanceled
|
||
}
|
||
}
|
||
|
||
return FailoverContinue
|
||
}
|
||
|
||
// HandleSelectionExhausted 处理选号失败(所有候选账号都在排除列表中)时的退避重试决策。
|
||
// 针对 Antigravity 单账号分组的 503 (MODEL_CAPACITY_EXHAUSTED) 场景:
|
||
// 清除排除列表、等待退避后重新选号。
|
||
//
|
||
// 返回 FailoverContinue 时,调用方应设置 SingleAccountRetry context 并 continue。
|
||
// 返回 FailoverExhausted 时,调用方应返回错误响应。
|
||
// 返回 FailoverCanceled 时,调用方应直接 return。
|
||
func (s *FailoverState) HandleSelectionExhausted(ctx context.Context) FailoverAction {
|
||
if s.LastFailoverErr != nil &&
|
||
s.LastFailoverErr.StatusCode == http.StatusServiceUnavailable &&
|
||
s.SwitchCount <= s.MaxSwitches {
|
||
|
||
log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)",
|
||
singleAccountBackoffDelay, s.SwitchCount)
|
||
if !sleepWithContext(ctx, singleAccountBackoffDelay) {
|
||
return FailoverCanceled
|
||
}
|
||
log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d",
|
||
s.SwitchCount, s.MaxSwitches)
|
||
s.FailedAccountIDs = make(map[int64]struct{})
|
||
return FailoverContinue
|
||
}
|
||
return FailoverExhausted
|
||
}
|
||
|
||
// needForceCacheBilling 判断 failover 时是否需要强制缓存计费。
|
||
// 粘性会话切换账号、或上游明确标记时,将 input_tokens 转为 cache_read 计费。
|
||
func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFailoverError) bool {
|
||
return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling)
|
||
}
|
||
|
||
// sleepWithContext waits for d or until ctx is done, whichever happens first.
//
// It returns true when the full duration elapsed and false when ctx was
// canceled (or its deadline expired) during the wait. A non-positive d
// returns true immediately without consulting ctx.
func sleepWithContext(ctx context.Context, d time.Duration) bool {
	if d <= 0 {
		return true
	}

	// An explicit timer (instead of time.After) can be stopped as soon as ctx
	// is done, so a canceled wait does not leave a pending timer behind for
	// the remainder of d.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}
|