merge: sync upstream main (antigravity single-account 503 retry)
合并上游新增的 Antigravity 单账号 503 退避重试机制, 解决与本地 MODEL_CAPACITY_EXHAUSTED 逻辑的冲突,两者共存。
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
@@ -52,6 +53,19 @@ const (
|
||||
googleRPCTypeErrorInfo = "type.googleapis.com/google.rpc.ErrorInfo"
|
||||
googleRPCReasonModelCapacityExhausted = "MODEL_CAPACITY_EXHAUSTED"
|
||||
googleRPCReasonRateLimitExceeded = "RATE_LIMIT_EXCEEDED"
|
||||
|
||||
// 单账号 503 退避重试:Service 层原地重试的最大次数
|
||||
// 在 handleSmartRetry 中,对于 shouldRateLimitModel(长延迟 ≥ 7s)的情况,
|
||||
// 多账号模式下会设限流+切换账号;但单账号模式下改为原地等待+重试。
|
||||
antigravitySingleAccountSmartRetryMaxAttempts = 3
|
||||
|
||||
// 单账号 503 退避重试:原地重试时单次最大等待时间
|
||||
// 防止上游返回过长的 retryDelay 导致请求卡住太久
|
||||
antigravitySingleAccountSmartRetryMaxWait = 15 * time.Second
|
||||
|
||||
// 单账号 503 退避重试:原地重试的总累计等待时间上限
|
||||
// 超过此上限将不再重试,直接返回 503
|
||||
antigravitySingleAccountSmartRetryTotalMaxWait = 30 * time.Second
|
||||
)
|
||||
|
||||
// antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写)
|
||||
@@ -154,6 +168,13 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
|
||||
// 情况1: retryDelay >= 阈值,限流模型并切换账号
|
||||
if shouldRateLimitModel {
|
||||
// 单账号 503 退避重试模式:不设限流、不切换账号,改为原地等待+重试
|
||||
// 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。
|
||||
// 多账号场景下切换账号是最优选择,但单账号场景下设限流毫无意义(只会导致双重等待)。
|
||||
if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) {
|
||||
return s.handleSingleAccountRetryInPlace(p, resp, respBody, baseURL, waitDuration, modelName)
|
||||
}
|
||||
|
||||
rateLimitDuration := waitDuration
|
||||
if rateLimitDuration <= 0 {
|
||||
rateLimitDuration = antigravityDefaultRateLimitDuration
|
||||
@@ -250,6 +271,10 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
|
||||
// 所有重试都失败
|
||||
rateLimitDuration := waitDuration
|
||||
if rateLimitDuration <= 0 {
|
||||
rateLimitDuration = antigravityDefaultRateLimitDuration
|
||||
}
|
||||
retryBody := lastRetryBody
|
||||
if retryBody == nil {
|
||||
retryBody = respBody
|
||||
@@ -270,11 +295,21 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
}
|
||||
|
||||
// RATE_LIMIT_EXCEEDED:账号级限流,限流当前模型并切换账号
|
||||
rateLimitDuration := waitDuration
|
||||
if rateLimitDuration <= 0 {
|
||||
rateLimitDuration = antigravityDefaultRateLimitDuration
|
||||
// 单账号 503 退避重试模式:智能重试耗尽后不设限流、不切换账号,
|
||||
// 直接返回 503 让 Handler 层的单账号退避循环做最终处理。
|
||||
if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) {
|
||||
log.Printf("%s status=%d smart_retry_exhausted_single_account attempts=%d model=%s account=%d body=%s (return 503 directly)",
|
||||
p.prefix, resp.StatusCode, antigravitySmartRetryMaxAttempts, modelName, p.account.ID, truncateForLog(retryBody, 200))
|
||||
return &smartRetryResult{
|
||||
action: smartRetryActionBreakWithResp,
|
||||
resp: &http.Response{
|
||||
StatusCode: resp.StatusCode,
|
||||
Header: resp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(retryBody)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("%s status=%d smart_retry_exhausted attempts=%d model=%s account=%d upstream_retry_delay=%v body=%s (switch account)",
|
||||
p.prefix, resp.StatusCode, maxAttempts, modelName, p.account.ID, rateLimitDuration, truncateForLog(retryBody, 200))
|
||||
|
||||
@@ -309,17 +344,152 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
return &smartRetryResult{action: smartRetryActionContinue}
|
||||
}
|
||||
|
||||
// handleSingleAccountRetryInPlace 单账号 503 退避重试的原地重试逻辑。
|
||||
//
|
||||
// 在多账号场景下,收到 503 + 长 retryDelay(≥ 7s)时会设置模型限流 + 切换账号;
|
||||
// 但在单账号场景下,设限流毫无意义(因为切换回来的还是同一个账号,还要等限流过期)。
|
||||
// 此方法改为在 Service 层原地等待 + 重试,避免双重等待问题:
|
||||
//
|
||||
// 旧流程:Service 设限流 → Handler 退避等待 → Service 等限流过期 → 再请求(总耗时 = 退避 + 限流)
|
||||
// 新流程:Service 直接等 retryDelay → 重试 → 成功/再等 → 重试...(总耗时 ≈ 实际 retryDelay × 重试次数)
|
||||
//
|
||||
// 约束:
|
||||
// - 单次等待不超过 antigravitySingleAccountSmartRetryMaxWait
|
||||
// - 总累计等待不超过 antigravitySingleAccountSmartRetryTotalMaxWait
|
||||
// - 最多重试 antigravitySingleAccountSmartRetryMaxAttempts 次
|
||||
func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace(
|
||||
p antigravityRetryLoopParams,
|
||||
resp *http.Response,
|
||||
respBody []byte,
|
||||
baseURL string,
|
||||
waitDuration time.Duration,
|
||||
modelName string,
|
||||
) *smartRetryResult {
|
||||
// 限制单次等待时间
|
||||
if waitDuration > antigravitySingleAccountSmartRetryMaxWait {
|
||||
waitDuration = antigravitySingleAccountSmartRetryMaxWait
|
||||
}
|
||||
if waitDuration < antigravitySmartRetryMinWait {
|
||||
waitDuration = antigravitySmartRetryMinWait
|
||||
}
|
||||
|
||||
log.Printf("%s status=%d single_account_503_retry_in_place model=%s account=%d upstream_retry_delay=%v (retrying in-place instead of rate-limiting)",
|
||||
p.prefix, resp.StatusCode, modelName, p.account.ID, waitDuration)
|
||||
|
||||
var lastRetryResp *http.Response
|
||||
var lastRetryBody []byte
|
||||
totalWaited := time.Duration(0)
|
||||
|
||||
for attempt := 1; attempt <= antigravitySingleAccountSmartRetryMaxAttempts; attempt++ {
|
||||
// 检查累计等待是否超限
|
||||
if totalWaited+waitDuration > antigravitySingleAccountSmartRetryTotalMaxWait {
|
||||
remaining := antigravitySingleAccountSmartRetryTotalMaxWait - totalWaited
|
||||
if remaining <= 0 {
|
||||
log.Printf("%s single_account_503_retry: total_wait_exceeded total=%v max=%v, giving up",
|
||||
p.prefix, totalWaited, antigravitySingleAccountSmartRetryTotalMaxWait)
|
||||
break
|
||||
}
|
||||
waitDuration = remaining
|
||||
}
|
||||
|
||||
log.Printf("%s status=%d single_account_503_retry attempt=%d/%d delay=%v total_waited=%v model=%s account=%d",
|
||||
p.prefix, resp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, waitDuration, totalWaited, modelName, p.account.ID)
|
||||
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
log.Printf("%s status=context_canceled_during_single_account_retry", p.prefix)
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()}
|
||||
case <-time.After(waitDuration):
|
||||
}
|
||||
totalWaited += waitDuration
|
||||
|
||||
// 创建新请求
|
||||
retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body)
|
||||
if err != nil {
|
||||
log.Printf("%s single_account_503_retry: request_build_failed error=%v", p.prefix, err)
|
||||
break
|
||||
}
|
||||
|
||||
retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency)
|
||||
if retryErr == nil && retryResp != nil && retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable {
|
||||
log.Printf("%s status=%d single_account_503_retry_success attempt=%d/%d total_waited=%v",
|
||||
p.prefix, retryResp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, totalWaited)
|
||||
// 关闭之前的响应
|
||||
if lastRetryResp != nil {
|
||||
_ = lastRetryResp.Body.Close()
|
||||
}
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp}
|
||||
}
|
||||
|
||||
// 网络错误时继续重试
|
||||
if retryErr != nil || retryResp == nil {
|
||||
log.Printf("%s single_account_503_retry: network_error attempt=%d/%d error=%v",
|
||||
p.prefix, attempt, antigravitySingleAccountSmartRetryMaxAttempts, retryErr)
|
||||
continue
|
||||
}
|
||||
|
||||
// 关闭之前的响应
|
||||
if lastRetryResp != nil {
|
||||
_ = lastRetryResp.Body.Close()
|
||||
}
|
||||
lastRetryResp = retryResp
|
||||
lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
_ = retryResp.Body.Close()
|
||||
|
||||
// 解析新的重试信息,更新下次等待时间
|
||||
if attempt < antigravitySingleAccountSmartRetryMaxAttempts && lastRetryBody != nil {
|
||||
_, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
if newWaitDuration > 0 {
|
||||
waitDuration = newWaitDuration
|
||||
if waitDuration > antigravitySingleAccountSmartRetryMaxWait {
|
||||
waitDuration = antigravitySingleAccountSmartRetryMaxWait
|
||||
}
|
||||
if waitDuration < antigravitySmartRetryMinWait {
|
||||
waitDuration = antigravitySmartRetryMinWait
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 所有重试都失败,不设限流,直接返回 503
|
||||
// Handler 层的单账号退避循环会做最终处理
|
||||
retryBody := lastRetryBody
|
||||
if retryBody == nil {
|
||||
retryBody = respBody
|
||||
}
|
||||
log.Printf("%s status=%d single_account_503_retry_exhausted attempts=%d total_waited=%v model=%s account=%d body=%s (return 503 directly)",
|
||||
p.prefix, resp.StatusCode, antigravitySingleAccountSmartRetryMaxAttempts, totalWaited, modelName, p.account.ID, truncateForLog(retryBody, 200))
|
||||
|
||||
return &smartRetryResult{
|
||||
action: smartRetryActionBreakWithResp,
|
||||
resp: &http.Response{
|
||||
StatusCode: resp.StatusCode,
|
||||
Header: resp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(retryBody)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// antigravityRetryLoop 执行带 URL fallback 的重试循环
|
||||
func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
|
||||
// 预检查:如果账号已限流,直接返回切换信号
|
||||
if p.requestedModel != "" {
|
||||
if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 {
|
||||
log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d",
|
||||
p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID)
|
||||
return nil, &AntigravityAccountSwitchError{
|
||||
OriginalAccountID: p.account.ID,
|
||||
RateLimitedModel: p.requestedModel,
|
||||
IsStickySession: p.isStickySession,
|
||||
// 单账号 503 退避重试模式:跳过限流预检查,直接发请求。
|
||||
// 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。
|
||||
// 如果上游确实还不可用,handleSmartRetry → handleSingleAccountRetryInPlace
|
||||
// 会在 Service 层原地等待+重试,不需要在预检查这里等。
|
||||
if isSingleAccountRetry(p.ctx) {
|
||||
log.Printf("%s pre_check: single_account_retry skipping rate_limit remaining=%v model=%s account=%d (will retry in-place if 503)",
|
||||
p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID)
|
||||
} else {
|
||||
log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d",
|
||||
p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID)
|
||||
return nil, &AntigravityAccountSwitchError{
|
||||
OriginalAccountID: p.account.ID,
|
||||
RateLimitedModel: p.requestedModel,
|
||||
IsStickySession: p.isStickySession,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2049,6 +2219,12 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
|
||||
}
|
||||
}
|
||||
|
||||
// isSingleAccountRetry 检查 context 中是否设置了单账号退避重试标记
|
||||
func isSingleAccountRetry(ctx context.Context) bool {
|
||||
v, _ := ctx.Value(ctxkey.SingleAccountRetry).(bool)
|
||||
return v
|
||||
}
|
||||
|
||||
// setModelRateLimitByModelName 使用官方模型 ID 设置模型级限流
|
||||
// 直接使用上游返回的模型 ID(如 claude-sonnet-4-5)作为限流 key
|
||||
// 返回是否已成功设置(若模型名为空或 repo 为 nil 将返回 false)
|
||||
|
||||
@@ -0,0 +1,902 @@
|
||||
//go:build unit
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 辅助函数:构造带 SingleAccountRetry 标记的 context
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func ctxWithSingleAccountRetry() context.Context {
|
||||
return context.WithValue(context.Background(), ctxkey.SingleAccountRetry, true)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 1. isSingleAccountRetry 测试
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestIsSingleAccountRetry_True(t *testing.T) {
|
||||
ctx := context.WithValue(context.Background(), ctxkey.SingleAccountRetry, true)
|
||||
require.True(t, isSingleAccountRetry(ctx))
|
||||
}
|
||||
|
||||
func TestIsSingleAccountRetry_False_NoValue(t *testing.T) {
|
||||
require.False(t, isSingleAccountRetry(context.Background()))
|
||||
}
|
||||
|
||||
func TestIsSingleAccountRetry_False_ExplicitFalse(t *testing.T) {
|
||||
ctx := context.WithValue(context.Background(), ctxkey.SingleAccountRetry, false)
|
||||
require.False(t, isSingleAccountRetry(ctx))
|
||||
}
|
||||
|
||||
func TestIsSingleAccountRetry_False_WrongType(t *testing.T) {
|
||||
ctx := context.WithValue(context.Background(), ctxkey.SingleAccountRetry, "true")
|
||||
require.False(t, isSingleAccountRetry(ctx))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 2. 常量验证
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestSingleAccountRetryConstants(t *testing.T) {
|
||||
require.Equal(t, 3, antigravitySingleAccountSmartRetryMaxAttempts,
|
||||
"单账号原地重试最多 3 次")
|
||||
require.Equal(t, 15*time.Second, antigravitySingleAccountSmartRetryMaxWait,
|
||||
"单次最大等待 15s")
|
||||
require.Equal(t, 30*time.Second, antigravitySingleAccountSmartRetryTotalMaxWait,
|
||||
"总累计等待不超过 30s")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 3. handleSmartRetry + 503 + SingleAccountRetry → 走 handleSingleAccountRetryInPlace
|
||||
// (而非设模型限流 + 切换账号)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// TestHandleSmartRetry_503_LongDelay_SingleAccountRetry_RetryInPlace
|
||||
// 核心场景:503 + retryDelay >= 7s + SingleAccountRetry 标记
|
||||
// → 不设模型限流、不切换账号,改为原地重试
|
||||
func TestHandleSmartRetry_503_LongDelay_SingleAccountRetry_RetryInPlace(t *testing.T) {
|
||||
// 原地重试成功
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{successResp},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 1,
|
||||
Name: "acc-single",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
// 503 + 39s >= 7s 阈值 + MODEL_CAPACITY_EXHAUSTED
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "39s"}
|
||||
],
|
||||
"message": "No capacity available for model gemini-3-pro-high on the server"
|
||||
}
|
||||
}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(), // 关键:设置单账号标记
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
// 关键断言:返回 resp(原地重试成功),而非 switchError(切换账号)
|
||||
require.NotNil(t, result.resp, "should return successful response from in-place retry")
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
require.Nil(t, result.switchError, "should NOT return switchError in single account mode")
|
||||
require.Nil(t, result.err)
|
||||
|
||||
// 验证未设模型限流(单账号模式不应设限流)
|
||||
require.Len(t, repo.modelRateLimitCalls, 0,
|
||||
"should NOT set model rate limit in single account retry mode")
|
||||
|
||||
// 验证确实调用了 upstream(原地重试)
|
||||
require.GreaterOrEqual(t, len(upstream.calls), 1, "should have made at least one retry call")
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_503_LongDelay_NoSingleAccountRetry_StillSwitches
|
||||
// 对照组:503 + retryDelay >= 7s + 无 SingleAccountRetry 标记
|
||||
// → 照常设模型限流 + 切换账号
|
||||
func TestHandleSmartRetry_503_LongDelay_NoSingleAccountRetry_StillSwitches(t *testing.T) {
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 2,
|
||||
Name: "acc-multi",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
}
|
||||
|
||||
// 503 + 39s >= 7s 阈值
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "39s"}
|
||||
]
|
||||
}
|
||||
}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: context.Background(), // 关键:无单账号标记
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
// 对照:多账号模式返回 switchError
|
||||
require.NotNil(t, result.switchError, "multi-account mode should return switchError for 503")
|
||||
require.Nil(t, result.resp, "should not return resp when switchError is set")
|
||||
|
||||
// 对照:多账号模式应设模型限流
|
||||
require.Len(t, repo.modelRateLimitCalls, 1,
|
||||
"multi-account mode SHOULD set model rate limit")
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_429_LongDelay_SingleAccountRetry_StillSwitches
|
||||
// 边界情况:429(非 503)+ SingleAccountRetry 标记
|
||||
// → 单账号原地重试仅针对 503,429 依然走切换账号逻辑
|
||||
func TestHandleSmartRetry_429_LongDelay_SingleAccountRetry_StillSwitches(t *testing.T) {
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 3,
|
||||
Name: "acc-429",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
}
|
||||
|
||||
// 429 + 15s >= 7s 阈值
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"status": "RESOURCE_EXHAUSTED",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "15s"}
|
||||
]
|
||||
}
|
||||
}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusTooManyRequests, // 429,不是 503
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(), // 有单账号标记
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
// 429 即使有单账号标记,也应走切换账号
|
||||
require.NotNil(t, result.switchError, "429 should still return switchError even with SingleAccountRetry")
|
||||
require.Len(t, repo.modelRateLimitCalls, 1,
|
||||
"429 should still set model rate limit even with SingleAccountRetry")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 4. handleSmartRetry + 503 + 短延迟 + SingleAccountRetry → 智能重试耗尽后不设限流
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit
|
||||
// 503 + retryDelay < 7s + SingleAccountRetry → 智能重试耗尽后直接返回 503,不设限流
|
||||
func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testing.T) {
|
||||
// 智能重试也返回 503
|
||||
failRespBody := `{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
|
||||
]
|
||||
}
|
||||
}`
|
||||
failResp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(failRespBody)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{failResp},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 4,
|
||||
Name: "acc-short-503",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
}
|
||||
|
||||
// 0.1s < 7s 阈值
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
|
||||
]
|
||||
}
|
||||
}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
// 关键断言:单账号 503 模式下,智能重试耗尽后直接返回 503 响应,不切换
|
||||
require.NotNil(t, result.resp, "should return 503 response directly for single account mode")
|
||||
require.Equal(t, http.StatusServiceUnavailable, result.resp.StatusCode)
|
||||
require.Nil(t, result.switchError, "should NOT switch account in single account mode")
|
||||
|
||||
// 关键断言:不设模型限流
|
||||
require.Len(t, repo.modelRateLimitCalls, 0,
|
||||
"should NOT set model rate limit for 503 in single account mode")
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_503_ShortDelay_NoSingleAccountRetry_SetsRateLimit
|
||||
// 对照组:503 + retryDelay < 7s + 无 SingleAccountRetry → 智能重试耗尽后照常设限流
|
||||
func TestHandleSmartRetry_503_ShortDelay_NoSingleAccountRetry_SetsRateLimit(t *testing.T) {
|
||||
failRespBody := `{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
|
||||
]
|
||||
}
|
||||
}`
|
||||
failResp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(failRespBody)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{failResp},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 5,
|
||||
Name: "acc-multi-503",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
}
|
||||
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
|
||||
]
|
||||
}
|
||||
}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: context.Background(), // 无单账号标记
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
// 对照:多账号模式应返回 switchError
|
||||
require.NotNil(t, result.switchError, "multi-account mode should return switchError for 503")
|
||||
// 对照:多账号模式应设模型限流
|
||||
require.Len(t, repo.modelRateLimitCalls, 1,
|
||||
"multi-account mode should set model rate limit")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 5. handleSingleAccountRetryInPlace 直接测试
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// TestHandleSingleAccountRetryInPlace_Success 原地重试成功
|
||||
func TestHandleSingleAccountRetryInPlace_Success(t *testing.T) {
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{successResp},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
account := &Account{
|
||||
ID: 10,
|
||||
Name: "acc-inplace-ok",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro")
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.NotNil(t, result.resp, "should return successful response")
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
require.Nil(t, result.switchError, "should not switch account on success")
|
||||
require.Nil(t, result.err)
|
||||
}
|
||||
|
||||
// TestHandleSingleAccountRetryInPlace_AllRetriesFail 所有重试都失败,返回 503(不设限流)
|
||||
func TestHandleSingleAccountRetryInPlace_AllRetriesFail(t *testing.T) {
|
||||
// 构造 3 个 503 响应(对应 3 次原地重试)
|
||||
var responses []*http.Response
|
||||
var errors []error
|
||||
for i := 0; i < antigravitySingleAccountSmartRetryMaxAttempts; i++ {
|
||||
responses = append(responses, &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
|
||||
]
|
||||
}
|
||||
}`)),
|
||||
})
|
||||
errors = append(errors, nil)
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: responses,
|
||||
errors: errors,
|
||||
}
|
||||
|
||||
account := &Account{
|
||||
ID: 11,
|
||||
Name: "acc-inplace-fail",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
origBody := []byte(`{"error":{"code":503,"status":"UNAVAILABLE"}}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{"X-Test": {"original"}},
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSingleAccountRetryInPlace(params, resp, origBody, "https://ag-1.test", 1*time.Second, "gemini-3-pro")
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
// 关键:返回 503 resp,不返回 switchError
|
||||
require.NotNil(t, result.resp, "should return 503 response directly")
|
||||
require.Equal(t, http.StatusServiceUnavailable, result.resp.StatusCode)
|
||||
require.Nil(t, result.switchError, "should NOT return switchError - let Handler handle it")
|
||||
require.Nil(t, result.err)
|
||||
|
||||
// 验证确实重试了指定次数
|
||||
require.Len(t, upstream.calls, antigravitySingleAccountSmartRetryMaxAttempts,
|
||||
"should have made exactly maxAttempts retry calls")
|
||||
}
|
||||
|
||||
// TestHandleSingleAccountRetryInPlace_WaitDurationClamped 等待时间被限制在 [min, max] 范围
|
||||
func TestHandleSingleAccountRetryInPlace_WaitDurationClamped(t *testing.T) {
|
||||
// 用短延迟的成功响应,只验证不 panic
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{successResp},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
account := &Account{
|
||||
ID: 12,
|
||||
Name: "acc-clamp",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
|
||||
// 等待时间过大应被 clamp 到 antigravitySingleAccountSmartRetryMaxWait
|
||||
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 999*time.Second, "gemini-3-pro")
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.NotNil(t, result.resp)
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
}
|
||||
|
||||
// TestHandleSingleAccountRetryInPlace_ContextCanceled context 取消时立即返回
|
||||
func TestHandleSingleAccountRetryInPlace_ContextCanceled(t *testing.T) {
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{nil},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
account := &Account{
|
||||
ID: 13,
|
||||
Name: "acc-cancel",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx = context.WithValue(ctx, ctxkey.SingleAccountRetry, true)
|
||||
cancel() // 立即取消
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctx,
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro")
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.Error(t, result.err, "should return context error")
|
||||
// 不应调用 upstream(因为在等待阶段就被取消了)
|
||||
require.Len(t, upstream.calls, 0, "should not call upstream when context is canceled")
|
||||
}
|
||||
|
||||
// TestHandleSingleAccountRetryInPlace_NetworkError_ContinuesRetry 网络错误时继续重试
|
||||
func TestHandleSingleAccountRetryInPlace_NetworkError_ContinuesRetry(t *testing.T) {
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
// 第1次网络错误(nil resp),第2次成功
|
||||
responses: []*http.Response{nil, successResp},
|
||||
errors: []error{nil, nil},
|
||||
}
|
||||
|
||||
account := &Account{
|
||||
ID: 14,
|
||||
Name: "acc-net-retry",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro")
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.NotNil(t, result.resp, "should return successful response after network error recovery")
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
require.Len(t, upstream.calls, 2, "first call fails (network error), second succeeds")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 6. antigravityRetryLoop 预检查:单账号模式跳过限流
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// TestAntigravityRetryLoop_PreCheck_SingleAccountRetry_SkipsRateLimit
|
||||
// 预检查中,如果有 SingleAccountRetry 标记,即使账号已限流也跳过直接发请求
|
||||
func TestAntigravityRetryLoop_PreCheck_SingleAccountRetry_SkipsRateLimit(t *testing.T) {
|
||||
// 创建一个已设模型限流的账号
|
||||
upstream := &recordingOKUpstream{}
|
||||
account := &Account{
|
||||
ID: 20,
|
||||
Name: "acc-rate-limited",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Schedulable: true,
|
||||
Status: StatusActive,
|
||||
Concurrency: 1,
|
||||
Extra: map[string]any{
|
||||
modelRateLimitsKey: map[string]any{
|
||||
"claude-sonnet-4-5": map[string]any{
|
||||
"rate_limit_reset_at": time.Now().Add(30 * time.Second).Format(time.RFC3339),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
requestedModel: "claude-sonnet-4-5",
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
require.NoError(t, err, "should not return error")
|
||||
require.NotNil(t, result, "should return result")
|
||||
require.NotNil(t, result.resp, "should have response")
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
// 关键:尽管限流了,有 SingleAccountRetry 标记时仍然到达了 upstream
|
||||
require.Equal(t, 1, upstream.calls, "should have reached upstream despite rate limit")
|
||||
}
|
||||
|
||||
// TestAntigravityRetryLoop_PreCheck_NoSingleAccountRetry_SwitchesOnRateLimit
|
||||
// 对照组:无 SingleAccountRetry + 已限流 → 预检查返回 switchError
|
||||
func TestAntigravityRetryLoop_PreCheck_NoSingleAccountRetry_SwitchesOnRateLimit(t *testing.T) {
|
||||
upstream := &recordingOKUpstream{}
|
||||
account := &Account{
|
||||
ID: 21,
|
||||
Name: "acc-rate-limited-multi",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Schedulable: true,
|
||||
Status: StatusActive,
|
||||
Concurrency: 1,
|
||||
Extra: map[string]any{
|
||||
modelRateLimitsKey: map[string]any{
|
||||
"claude-sonnet-4-5": map[string]any{
|
||||
"rate_limit_reset_at": time.Now().Add(30 * time.Second).Format(time.RFC3339),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{
|
||||
ctx: context.Background(), // 无单账号标记
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
requestedModel: "claude-sonnet-4-5",
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
require.Nil(t, result, "should not return result on rate limit switch")
|
||||
require.NotNil(t, err, "should return error")
|
||||
|
||||
var switchErr *AntigravityAccountSwitchError
|
||||
require.ErrorAs(t, err, &switchErr, "should return AntigravityAccountSwitchError")
|
||||
require.Equal(t, account.ID, switchErr.OriginalAccountID)
|
||||
require.Equal(t, "claude-sonnet-4-5", switchErr.RateLimitedModel)
|
||||
|
||||
// upstream 不应被调用(预检查就短路了)
|
||||
require.Equal(t, 0, upstream.calls, "upstream should NOT be called when pre-check blocks")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 7. 端到端集成场景测试
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// TestHandleSmartRetry_503_SingleAccount_RetryInPlace_ThenSuccess_E2E
|
||||
// 端到端场景:503 + 单账号 + 原地重试第2次成功
|
||||
func TestHandleSmartRetry_503_SingleAccount_RetryInPlace_ThenSuccess_E2E(t *testing.T) {
|
||||
// 第1次原地重试仍返回 503,第2次成功
|
||||
fail503Body := `{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
|
||||
]
|
||||
}
|
||||
}`
|
||||
resp503 := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(fail503Body)),
|
||||
}
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)),
|
||||
}
|
||||
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{resp503, successResp},
|
||||
errors: []error{nil, nil},
|
||||
}
|
||||
|
||||
account := &Account{
|
||||
ID: 30,
|
||||
Name: "acc-e2e",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro")
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.NotNil(t, result.resp, "should return successful response after 2nd attempt")
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
require.Nil(t, result.switchError)
|
||||
require.Len(t, upstream.calls, 2, "first 503, second OK")
|
||||
}
|
||||
|
||||
// TestAntigravityRetryLoop_503_SingleAccount_InPlaceRetryUsed_E2E
|
||||
// 通过 antigravityRetryLoop → handleSmartRetry → handleSingleAccountRetryInPlace 完整链路
|
||||
func TestAntigravityRetryLoop_503_SingleAccount_InPlaceRetryUsed_E2E(t *testing.T) {
|
||||
// 初始请求返回 503 + 长延迟
|
||||
initial503Body := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "10s"}
|
||||
],
|
||||
"message": "No capacity available"
|
||||
}
|
||||
}`)
|
||||
initial503Resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(initial503Body)),
|
||||
}
|
||||
|
||||
// 原地重试成功
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)),
|
||||
}
|
||||
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
// 第1次调用(retryLoop 主循环)返回 503
|
||||
// 第2次调用(handleSingleAccountRetryInPlace 原地重试)返回 200
|
||||
responses: []*http.Response{initial503Resp, successResp},
|
||||
errors: []error{nil, nil},
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 31,
|
||||
Name: "acc-e2e-loop",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
Schedulable: true,
|
||||
Status: StatusActive,
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{
|
||||
ctx: ctxWithSingleAccountRetry(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
require.NoError(t, err, "should not return error on successful retry")
|
||||
require.NotNil(t, result, "should return result")
|
||||
require.NotNil(t, result.resp, "should return response")
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode)
|
||||
|
||||
// 验证未设模型限流
|
||||
require.Len(t, repo.modelRateLimitCalls, 0,
|
||||
"should NOT set model rate limit in single account retry mode")
|
||||
}
|
||||
@@ -243,6 +243,12 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// systemBlockFilterPrefixes 需要从 system 中过滤的文本前缀列表
|
||||
// OAuth/SetupToken 账号转发时,匹配这些前缀的 system 元素会被移除
|
||||
var systemBlockFilterPrefixes = []string{
|
||||
"x-anthropic-billing-header",
|
||||
}
|
||||
|
||||
// ErrClaudeCodeOnly 表示分组仅允许 Claude Code 客户端访问
|
||||
var ErrClaudeCodeOnly = errors.New("this group only allows Claude Code clients")
|
||||
|
||||
@@ -1698,6 +1704,17 @@ func (s *GatewayService) listSchedulableAccounts(ctx context.Context, groupID *i
|
||||
return accounts, useMixed, nil
|
||||
}
|
||||
|
||||
// IsSingleAntigravityAccountGroup 检查指定分组是否只有一个 antigravity 平台的可调度账号。
|
||||
// 用于 Handler 层在首次请求时提前设置 SingleAccountRetry context,
|
||||
// 避免单账号分组收到 503 时错误地设置模型限流标记导致后续请求连续快速失败。
|
||||
func (s *GatewayService) IsSingleAntigravityAccountGroup(ctx context.Context, groupID *int64) bool {
|
||||
accounts, _, err := s.listSchedulableAccounts(ctx, groupID, PlatformAntigravity, true)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return len(accounts) == 1
|
||||
}
|
||||
|
||||
func (s *GatewayService) isAccountAllowedForPlatform(account *Account, platform string, useMixed bool) bool {
|
||||
if account == nil {
|
||||
return false
|
||||
@@ -2688,6 +2705,60 @@ func hasClaudeCodePrefix(text string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// matchesFilterPrefix 检查文本是否匹配任一过滤前缀
|
||||
func matchesFilterPrefix(text string) bool {
|
||||
for _, prefix := range systemBlockFilterPrefixes {
|
||||
if strings.HasPrefix(text, prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// filterSystemBlocksByPrefix 从 body 的 system 中移除文本匹配 systemBlockFilterPrefixes 前缀的元素
|
||||
// 直接从 body 解析 system,不依赖外部传入的 parsed.System(因为前置步骤可能已修改 body 中的 system)
|
||||
func filterSystemBlocksByPrefix(body []byte) []byte {
|
||||
sys := gjson.GetBytes(body, "system")
|
||||
if !sys.Exists() {
|
||||
return body
|
||||
}
|
||||
|
||||
switch {
|
||||
case sys.Type == gjson.String:
|
||||
if matchesFilterPrefix(sys.Str) {
|
||||
result, err := sjson.DeleteBytes(body, "system")
|
||||
if err != nil {
|
||||
return body
|
||||
}
|
||||
return result
|
||||
}
|
||||
case sys.IsArray():
|
||||
var parsed []any
|
||||
if err := json.Unmarshal([]byte(sys.Raw), &parsed); err != nil {
|
||||
return body
|
||||
}
|
||||
filtered := make([]any, 0, len(parsed))
|
||||
changed := false
|
||||
for _, item := range parsed {
|
||||
if m, ok := item.(map[string]any); ok {
|
||||
if text, ok := m["text"].(string); ok && matchesFilterPrefix(text) {
|
||||
changed = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
filtered = append(filtered, item)
|
||||
}
|
||||
if changed {
|
||||
result, err := sjson.SetBytes(body, "system", filtered)
|
||||
if err != nil {
|
||||
return body
|
||||
}
|
||||
return result
|
||||
}
|
||||
}
|
||||
return body
|
||||
}
|
||||
|
||||
// injectClaudeCodePrompt 在 system 开头注入 Claude Code 提示词
|
||||
// 处理 null、字符串、数组三种格式
|
||||
func injectClaudeCodePrompt(body []byte, system any) []byte {
|
||||
@@ -2967,6 +3038,12 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
body, reqModel = normalizeClaudeOAuthRequestBody(body, reqModel, normalizeOpts)
|
||||
}
|
||||
|
||||
// OAuth/SetupToken 账号:移除黑名单前缀匹配的 system 元素(如客户端注入的计费元数据)
|
||||
// 放在 inject/normalize 之后,确保不会被覆盖
|
||||
if account.IsOAuth() {
|
||||
body = filterSystemBlocksByPrefix(body)
|
||||
}
|
||||
|
||||
// 强制执行 cache_control 块数量限制(最多 4 个)
|
||||
body = enforceCacheControlLimit(body)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user