@@ -20,6 +20,7 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
@@ -46,6 +47,23 @@ const (
googleRPCTypeErrorInfo = "type.googleapis.com/google.rpc.ErrorInfo"
googleRPCReasonModelCapacityExhausted = "MODEL_CAPACITY_EXHAUSTED"
googleRPCReasonRateLimitExceeded = "RATE_LIMIT_EXCEEDED"
// 单账号 503 退避重试:预检查中等待模型限流过期的最大时间
// 超过此值的限流将直接切换账号(避免请求等待过久)
antigravitySingleAccountMaxWait = 30 * time . Second
// 单账号 503 退避重试: Service 层原地重试的最大次数
// 在 handleSmartRetry 中,对于 shouldRateLimitModel( 长延迟 ≥ 7s) 的情况,
// 多账号模式下会设限流+切换账号;但单账号模式下改为原地等待+重试。
antigravitySingleAccountSmartRetryMaxAttempts = 3
// 单账号 503 退避重试:原地重试时单次最大等待时间
// 防止上游返回过长的 retryDelay 导致请求卡住太久
antigravitySingleAccountSmartRetryMaxWait = 15 * time . Second
// 单账号 503 退避重试:原地重试的总累计等待时间上限
// 超过此上限将不再重试,直接返回 503
antigravitySingleAccountSmartRetryTotalMaxWait = 30 * time . Second
)
// antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写)
@@ -148,6 +166,13 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
// 情况1: retryDelay >= 阈值,限流模型并切换账号
if shouldRateLimitModel {
// 单账号 503 退避重试模式:不设限流、不切换账号,改为原地等待+重试
// 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。
// 多账号场景下切换账号是最优选择,但单账号场景下设限流毫无意义(只会导致双重等待)。
if resp . StatusCode == http . StatusServiceUnavailable && isSingleAccountRetry ( p . ctx ) {
return s . handleSingleAccountRetryInPlace ( p , resp , respBody , baseURL , waitDuration , modelName )
}
rateLimitDuration := waitDuration
if rateLimitDuration <= 0 {
rateLimitDuration = antigravityDefaultRateLimitDuration
@@ -236,7 +261,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
}
}
// 所有重试都失败,限流当前模型并切换账号
// 所有重试都失败
rateLimitDuration := waitDuration
if rateLimitDuration <= 0 {
rateLimitDuration = antigravityDefaultRateLimitDuration
@@ -245,6 +270,22 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
if retryBody == nil {
retryBody = respBody
}
// 单账号 503 退避重试模式:智能重试耗尽后不设限流、不切换账号,
// 直接返回 503 让 Handler 层的单账号退避循环做最终处理。
if resp . StatusCode == http . StatusServiceUnavailable && isSingleAccountRetry ( p . ctx ) {
log . Printf ( "%s status=%d smart_retry_exhausted_single_account attempts=%d model=%s account=%d body=%s (return 503 directly)" ,
p . prefix , resp . StatusCode , antigravitySmartRetryMaxAttempts , modelName , p . account . ID , truncateForLog ( retryBody , 200 ) )
return & smartRetryResult {
action : smartRetryActionBreakWithResp ,
resp : & http . Response {
StatusCode : resp . StatusCode ,
Header : resp . Header . Clone ( ) ,
Body : io . NopCloser ( bytes . NewReader ( retryBody ) ) ,
} ,
}
}
log . Printf ( "%s status=%d smart_retry_exhausted attempts=%d model=%s account=%d upstream_retry_delay=%v body=%s (switch account)" ,
p . prefix , resp . StatusCode , antigravitySmartRetryMaxAttempts , modelName , p . account . ID , rateLimitDuration , truncateForLog ( retryBody , 200 ) )
@@ -279,17 +320,152 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
return & smartRetryResult { action : smartRetryActionContinue }
}
// handleSingleAccountRetryInPlace 单账号 503 退避重试的原地重试逻辑。
//
// 在多账号场景下,收到 503 + 长 retryDelay( ≥ 7s) 时会设置模型限流 + 切换账号;
// 但在单账号场景下,设限流毫无意义(因为切换回来的还是同一个账号,还要等限流过期)。
// 此方法改为在 Service 层原地等待 + 重试,避免双重等待问题:
//
// 旧流程: Service 设限流 → Handler 退避等待 → Service 等限流过期 → 再请求(总耗时 = 退避 + 限流)
// 新流程: Service 直接等 retryDelay → 重试 → 成功/再等 → 重试...(总耗时 ≈ 实际 retryDelay × 重试次数)
//
// 约束:
// - 单次等待不超过 antigravitySingleAccountSmartRetryMaxWait
// - 总累计等待不超过 antigravitySingleAccountSmartRetryTotalMaxWait
// - 最多重试 antigravitySingleAccountSmartRetryMaxAttempts 次
func ( s * AntigravityGatewayService ) handleSingleAccountRetryInPlace (
p antigravityRetryLoopParams ,
resp * http . Response ,
respBody [ ] byte ,
baseURL string ,
waitDuration time . Duration ,
modelName string ,
) * smartRetryResult {
// 限制单次等待时间
if waitDuration > antigravitySingleAccountSmartRetryMaxWait {
waitDuration = antigravitySingleAccountSmartRetryMaxWait
}
if waitDuration < antigravitySmartRetryMinWait {
waitDuration = antigravitySmartRetryMinWait
}
log . Printf ( "%s status=%d single_account_503_retry_in_place model=%s account=%d upstream_retry_delay=%v (retrying in-place instead of rate-limiting)" ,
p . prefix , resp . StatusCode , modelName , p . account . ID , waitDuration )
var lastRetryResp * http . Response
var lastRetryBody [ ] byte
totalWaited := time . Duration ( 0 )
for attempt := 1 ; attempt <= antigravitySingleAccountSmartRetryMaxAttempts ; attempt ++ {
// 检查累计等待是否超限
if totalWaited + waitDuration > antigravitySingleAccountSmartRetryTotalMaxWait {
remaining := antigravitySingleAccountSmartRetryTotalMaxWait - totalWaited
if remaining <= 0 {
log . Printf ( "%s single_account_503_retry: total_wait_exceeded total=%v max=%v, giving up" ,
p . prefix , totalWaited , antigravitySingleAccountSmartRetryTotalMaxWait )
break
}
waitDuration = remaining
}
log . Printf ( "%s status=%d single_account_503_retry attempt=%d/%d delay=%v total_waited=%v model=%s account=%d" ,
p . prefix , resp . StatusCode , attempt , antigravitySingleAccountSmartRetryMaxAttempts , waitDuration , totalWaited , modelName , p . account . ID )
select {
case <- p . ctx . Done ( ) :
log . Printf ( "%s status=context_canceled_during_single_account_retry" , p . prefix )
return & smartRetryResult { action : smartRetryActionBreakWithResp , err : p . ctx . Err ( ) }
case <- time . After ( waitDuration ) :
}
totalWaited += waitDuration
// 创建新请求
retryReq , err := antigravity . NewAPIRequestWithURL ( p . ctx , baseURL , p . action , p . accessToken , p . body )
if err != nil {
log . Printf ( "%s single_account_503_retry: request_build_failed error=%v" , p . prefix , err )
break
}
retryResp , retryErr := p . httpUpstream . Do ( retryReq , p . proxyURL , p . account . ID , p . account . Concurrency )
if retryErr == nil && retryResp != nil && retryResp . StatusCode != http . StatusTooManyRequests && retryResp . StatusCode != http . StatusServiceUnavailable {
log . Printf ( "%s status=%d single_account_503_retry_success attempt=%d/%d total_waited=%v" ,
p . prefix , retryResp . StatusCode , attempt , antigravitySingleAccountSmartRetryMaxAttempts , totalWaited )
// 关闭之前的响应
if lastRetryResp != nil {
_ = lastRetryResp . Body . Close ( )
}
return & smartRetryResult { action : smartRetryActionBreakWithResp , resp : retryResp }
}
// 网络错误时继续重试
if retryErr != nil || retryResp == nil {
log . Printf ( "%s single_account_503_retry: network_error attempt=%d/%d error=%v" ,
p . prefix , attempt , antigravitySingleAccountSmartRetryMaxAttempts , retryErr )
continue
}
// 关闭之前的响应
if lastRetryResp != nil {
_ = lastRetryResp . Body . Close ( )
}
lastRetryResp = retryResp
lastRetryBody , _ = io . ReadAll ( io . LimitReader ( retryResp . Body , 2 << 20 ) )
_ = retryResp . Body . Close ( )
// 解析新的重试信息,更新下次等待时间
if attempt < antigravitySingleAccountSmartRetryMaxAttempts && lastRetryBody != nil {
_ , _ , newWaitDuration , _ := shouldTriggerAntigravitySmartRetry ( p . account , lastRetryBody )
if newWaitDuration > 0 {
waitDuration = newWaitDuration
if waitDuration > antigravitySingleAccountSmartRetryMaxWait {
waitDuration = antigravitySingleAccountSmartRetryMaxWait
}
if waitDuration < antigravitySmartRetryMinWait {
waitDuration = antigravitySmartRetryMinWait
}
}
}
}
// 所有重试都失败,不设限流,直接返回 503
// Handler 层的单账号退避循环会做最终处理
retryBody := lastRetryBody
if retryBody == nil {
retryBody = respBody
}
log . Printf ( "%s status=%d single_account_503_retry_exhausted attempts=%d total_waited=%v model=%s account=%d body=%s (return 503 directly)" ,
p . prefix , resp . StatusCode , antigravitySingleAccountSmartRetryMaxAttempts , totalWaited , modelName , p . account . ID , truncateForLog ( retryBody , 200 ) )
return & smartRetryResult {
action : smartRetryActionBreakWithResp ,
resp : & http . Response {
StatusCode : resp . StatusCode ,
Header : resp . Header . Clone ( ) ,
Body : io . NopCloser ( bytes . NewReader ( retryBody ) ) ,
} ,
}
}
// antigravityRetryLoop 执行带 URL fallback 的重试循环
func ( s * AntigravityGatewayService ) antigravityRetryLoop ( p antigravityRetryLoopParams ) ( * antigravityRetryLoopResult , error ) {
// 预检查:如果账号已限流,直接返回切换信号
if p . requestedModel != "" {
if remaining := p . account . GetRateLimitRemainingTimeWithContext ( p . ctx , p . requestedModel ) ; remaining > 0 {
log . Printf ( "%s pre_check: rate_limit_switch remaining=%v model=%s account=%d" ,
p . prefix , remaining . Truncate ( time . Millisecond ) , p . requestedModel , p . account . ID )
return nil , & AntigravityAccountSwitchError {
OriginalAccountID : p . account . ID ,
RateLimitedModel : p . requestedModel ,
IsStickySession : p . isStickySession ,
// 单账号 503 退避重试模式:跳过限流预检查,直接发请求。
// 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。
// 如果上游确实还不可用, handleSmartRetry → handleSingleAccountRetryInPlace
// 会在 Service 层原地等待+重试,不需要在预检查这里等。
if isSingleAccountRetry ( p . ctx ) {
log . Printf ( "%s pre_check: single_account_retry skipping rate_limit remaining=%v model=%s account=%d (will retry in-place if 503)" ,
p . prefix , remaining . Truncate ( time . Millisecond ) , p . requestedModel , p . account . ID )
} else {
log . Printf ( "%s pre_check: rate_limit_switch remaining=%v model=%s account=%d" ,
p . prefix , remaining . Truncate ( time . Millisecond ) , p . requestedModel , p . account . ID )
return nil , & AntigravityAccountSwitchError {
OriginalAccountID : p . account . ID ,
RateLimitedModel : p . requestedModel ,
IsStickySession : p . isStickySession ,
}
}
}
}
@@ -1943,6 +2119,12 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
}
}
// isSingleAccountRetry 检查 context 中是否设置了单账号退避重试标记
func isSingleAccountRetry ( ctx context . Context ) bool {
v , _ := ctx . Value ( ctxkey . SingleAccountRetry ) . ( bool )
return v
}
// setModelRateLimitByModelName 使用官方模型 ID 设置模型级限流
// 直接使用上游返回的模型 ID( 如 claude-sonnet-4-5) 作为限流 key
// 返回是否已成功设置(若模型名为空或 repo 为 nil 将返回 false)