Merge branch 'Wei-Shaw:main' into main
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -40,6 +41,12 @@ const (
|
||||
antigravitySmartRetryMaxAttempts = 1 // 智能重试最大次数(仅重试 1 次,防止重复限流/长期等待)
|
||||
antigravityDefaultRateLimitDuration = 30 * time.Second // 默认限流时间(无 retryDelay 时使用)
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED 专用重试参数
|
||||
// 模型容量不足时,所有账号共享同一容量池,切换账号无意义
|
||||
// 使用固定 1s 间隔重试,最多重试 60 次
|
||||
antigravityModelCapacityRetryMaxAttempts = 60
|
||||
antigravityModelCapacityRetryWait = 1 * time.Second
|
||||
|
||||
// Google RPC 状态和类型常量
|
||||
googleRPCStatusResourceExhausted = "RESOURCE_EXHAUSTED"
|
||||
googleRPCStatusUnavailable = "UNAVAILABLE"
|
||||
@@ -60,6 +67,9 @@ const (
|
||||
// 单账号 503 退避重试:原地重试的总累计等待时间上限
|
||||
// 超过此上限将不再重试,直接返回 503
|
||||
antigravitySingleAccountSmartRetryTotalMaxWait = 30 * time.Second
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED 全局去重:重试全部失败后的 cooldown 时间
|
||||
antigravityModelCapacityCooldown = 10 * time.Second
|
||||
)
|
||||
|
||||
// antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写)
|
||||
@@ -68,8 +78,15 @@ var antigravityPassthroughErrorMessages = []string{
|
||||
"prompt is too long",
|
||||
}
|
||||
|
||||
// Global de-duplication state for MODEL_CAPACITY_EXHAUSTED: it keeps several
// concurrent requests from all running the capacity-exhausted retry loop for
// the same model at the same time.

// modelCapacityExhaustedMu guards modelCapacityExhaustedUntil.
var modelCapacityExhaustedMu sync.RWMutex

// modelCapacityExhaustedUntil maps a model name to the instant its cooldown
// ends.
var modelCapacityExhaustedUntil = map[string]time.Time{}
|
||||
|
||||
// Names of environment variables used by the Antigravity gateway.

// antigravityBillingModelEnv — judging by its value, selects billing with the
// mapped model; the code that reads it is not shown here (TODO confirm).
const antigravityBillingModelEnv = "GATEWAY_ANTIGRAVITY_BILL_WITH_MAPPED_MODEL"

// antigravityForwardBaseURLEnv names the variable that
// resolveAntigravityForwardBaseURL reads to choose the forwarding base URL.
const antigravityForwardBaseURLEnv = "GATEWAY_ANTIGRAVITY_FORWARD_BASE_URL"

// antigravityFallbackSecondsEnv — judging by its value, overrides the fallback
// cooldown in seconds; the code that reads it is not shown here (TODO confirm).
const antigravityFallbackSecondsEnv = "GATEWAY_ANTIGRAVITY_FALLBACK_COOLDOWN_SECONDS"
|
||||
|
||||
@@ -131,6 +148,20 @@ type antigravityRetryLoopResult struct {
|
||||
resp *http.Response
|
||||
}
|
||||
|
||||
// resolveAntigravityForwardBaseURL 解析转发用 base URL。
|
||||
// 默认使用 daily(ForwardBaseURLs 的首个地址);当环境变量为 prod 时使用第二个地址。
|
||||
func resolveAntigravityForwardBaseURL() string {
|
||||
baseURLs := antigravity.ForwardBaseURLs()
|
||||
if len(baseURLs) == 0 {
|
||||
return ""
|
||||
}
|
||||
mode := strings.ToLower(strings.TrimSpace(os.Getenv(antigravityForwardBaseURLEnv)))
|
||||
if mode == "prod" && len(baseURLs) > 1 {
|
||||
return baseURLs[1]
|
||||
}
|
||||
return baseURLs[0]
|
||||
}
|
||||
|
||||
// smartRetryAction 智能重试的处理结果
|
||||
type smartRetryAction int
|
||||
|
||||
@@ -158,7 +189,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
|
||||
// 判断是否触发智能重试
|
||||
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName := shouldTriggerAntigravitySmartRetry(p.account, respBody)
|
||||
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody)
|
||||
|
||||
// 情况1: retryDelay >= 阈值,限流模型并切换账号
|
||||
if shouldRateLimitModel {
|
||||
@@ -195,20 +226,48 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
}
|
||||
|
||||
// 情况2: retryDelay < 阈值,智能重试(最多 antigravitySmartRetryMaxAttempts 次)
|
||||
// 情况2: retryDelay < 阈值(或 MODEL_CAPACITY_EXHAUSTED),智能重试
|
||||
if shouldSmartRetry {
|
||||
var lastRetryResp *http.Response
|
||||
var lastRetryBody []byte
|
||||
|
||||
for attempt := 1; attempt <= antigravitySmartRetryMaxAttempts; attempt++ {
|
||||
log.Printf("%s status=%d oauth_smart_retry attempt=%d/%d delay=%v model=%s account=%d",
|
||||
p.prefix, resp.StatusCode, attempt, antigravitySmartRetryMaxAttempts, waitDuration, modelName, p.account.ID)
|
||||
// MODEL_CAPACITY_EXHAUSTED 使用独立的重试参数(60 次,固定 1s 间隔)
|
||||
maxAttempts := antigravitySmartRetryMaxAttempts
|
||||
if isModelCapacityExhausted {
|
||||
maxAttempts = antigravityModelCapacityRetryMaxAttempts
|
||||
waitDuration = antigravityModelCapacityRetryWait
|
||||
|
||||
// 全局去重:如果其他 goroutine 已在重试同一模型且尚在 cooldown 中,直接返回 503
|
||||
if modelName != "" {
|
||||
modelCapacityExhaustedMu.RLock()
|
||||
cooldownUntil, exists := modelCapacityExhaustedUntil[modelName]
|
||||
modelCapacityExhaustedMu.RUnlock()
|
||||
if exists && time.Now().Before(cooldownUntil) {
|
||||
log.Printf("%s status=%d model_capacity_exhausted_dedup model=%s account=%d cooldown_until=%v (skip retry)",
|
||||
p.prefix, resp.StatusCode, modelName, p.account.ID, cooldownUntil.Format("15:04:05"))
|
||||
return &smartRetryResult{
|
||||
action: smartRetryActionBreakWithResp,
|
||||
resp: &http.Response{
|
||||
StatusCode: resp.StatusCode,
|
||||
Header: resp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||
log.Printf("%s status=%d oauth_smart_retry attempt=%d/%d delay=%v model=%s account=%d",
|
||||
p.prefix, resp.StatusCode, attempt, maxAttempts, waitDuration, modelName, p.account.ID)
|
||||
|
||||
timer := time.NewTimer(waitDuration)
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
timer.Stop()
|
||||
log.Printf("%s status=context_canceled_during_smart_retry", p.prefix)
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()}
|
||||
case <-time.After(waitDuration):
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
// 智能重试:创建新请求
|
||||
@@ -228,13 +287,19 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
|
||||
retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency)
|
||||
if retryErr == nil && retryResp != nil && retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable {
|
||||
log.Printf("%s status=%d smart_retry_success attempt=%d/%d", p.prefix, retryResp.StatusCode, attempt, antigravitySmartRetryMaxAttempts)
|
||||
log.Printf("%s status=%d smart_retry_success attempt=%d/%d", p.prefix, retryResp.StatusCode, attempt, maxAttempts)
|
||||
// 重试成功,清除 MODEL_CAPACITY_EXHAUSTED cooldown
|
||||
if isModelCapacityExhausted && modelName != "" {
|
||||
modelCapacityExhaustedMu.Lock()
|
||||
delete(modelCapacityExhaustedUntil, modelName)
|
||||
modelCapacityExhaustedMu.Unlock()
|
||||
}
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp}
|
||||
}
|
||||
|
||||
// 网络错误时,继续重试
|
||||
if retryErr != nil || retryResp == nil {
|
||||
log.Printf("%s status=smart_retry_network_error attempt=%d/%d error=%v", p.prefix, attempt, antigravitySmartRetryMaxAttempts, retryErr)
|
||||
log.Printf("%s status=smart_retry_network_error attempt=%d/%d error=%v", p.prefix, attempt, maxAttempts, retryErr)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -244,13 +309,13 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
lastRetryResp = retryResp
|
||||
if retryResp != nil {
|
||||
lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 8<<10))
|
||||
_ = retryResp.Body.Close()
|
||||
}
|
||||
|
||||
// 解析新的重试信息,用于下次重试的等待时间
|
||||
if attempt < antigravitySmartRetryMaxAttempts && lastRetryBody != nil {
|
||||
newShouldRetry, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
// 解析新的重试信息,用于下次重试的等待时间(MODEL_CAPACITY_EXHAUSTED 使用固定循环,跳过)
|
||||
if !isModelCapacityExhausted && attempt < maxAttempts && lastRetryBody != nil {
|
||||
newShouldRetry, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
if newShouldRetry && newWaitDuration > 0 {
|
||||
waitDuration = newWaitDuration
|
||||
}
|
||||
@@ -267,6 +332,27 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
retryBody = respBody
|
||||
}
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED:模型容量不足,切换账号无意义
|
||||
// 直接返回上游错误响应,不设置模型限流,不切换账号
|
||||
if isModelCapacityExhausted {
|
||||
// 设置 cooldown,让后续请求快速失败,避免重复重试
|
||||
if modelName != "" {
|
||||
modelCapacityExhaustedMu.Lock()
|
||||
modelCapacityExhaustedUntil[modelName] = time.Now().Add(antigravityModelCapacityCooldown)
|
||||
modelCapacityExhaustedMu.Unlock()
|
||||
}
|
||||
log.Printf("%s status=%d smart_retry_exhausted_model_capacity attempts=%d model=%s account=%d body=%s (model capacity exhausted, not switching account)",
|
||||
p.prefix, resp.StatusCode, maxAttempts, modelName, p.account.ID, truncateForLog(retryBody, 200))
|
||||
return &smartRetryResult{
|
||||
action: smartRetryActionBreakWithResp,
|
||||
resp: &http.Response{
|
||||
StatusCode: resp.StatusCode,
|
||||
Header: resp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(retryBody)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// 单账号 503 退避重试模式:智能重试耗尽后不设限流、不切换账号,
|
||||
// 直接返回 503 让 Handler 层的单账号退避循环做最终处理。
|
||||
if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) {
|
||||
@@ -283,7 +369,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
|
||||
log.Printf("%s status=%d smart_retry_exhausted attempts=%d model=%s account=%d upstream_retry_delay=%v body=%s (switch account)",
|
||||
p.prefix, resp.StatusCode, antigravitySmartRetryMaxAttempts, modelName, p.account.ID, rateLimitDuration, truncateForLog(retryBody, 200))
|
||||
p.prefix, resp.StatusCode, maxAttempts, modelName, p.account.ID, rateLimitDuration, truncateForLog(retryBody, 200))
|
||||
|
||||
resetAt := time.Now().Add(rateLimitDuration)
|
||||
if p.accountRepo != nil && modelName != "" {
|
||||
@@ -367,11 +453,13 @@ func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace(
|
||||
log.Printf("%s status=%d single_account_503_retry attempt=%d/%d delay=%v total_waited=%v model=%s account=%d",
|
||||
p.prefix, resp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, waitDuration, totalWaited, modelName, p.account.ID)
|
||||
|
||||
timer := time.NewTimer(waitDuration)
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
timer.Stop()
|
||||
log.Printf("%s status=context_canceled_during_single_account_retry", p.prefix)
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()}
|
||||
case <-time.After(waitDuration):
|
||||
case <-timer.C:
|
||||
}
|
||||
totalWaited += waitDuration
|
||||
|
||||
@@ -405,12 +493,12 @@ func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace(
|
||||
_ = lastRetryResp.Body.Close()
|
||||
}
|
||||
lastRetryResp = retryResp
|
||||
lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 8<<10))
|
||||
_ = retryResp.Body.Close()
|
||||
|
||||
// 解析新的重试信息,更新下次等待时间
|
||||
if attempt < antigravitySingleAccountSmartRetryMaxAttempts && lastRetryBody != nil {
|
||||
_, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
_, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
if newWaitDuration > 0 {
|
||||
waitDuration = newWaitDuration
|
||||
if waitDuration > antigravitySingleAccountSmartRetryMaxWait {
|
||||
@@ -466,10 +554,11 @@ func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopP
|
||||
}
|
||||
}
|
||||
|
||||
availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
|
||||
if len(availableURLs) == 0 {
|
||||
availableURLs = antigravity.BaseURLs
|
||||
baseURL := resolveAntigravityForwardBaseURL()
|
||||
if baseURL == "" {
|
||||
return nil, errors.New("no antigravity forward base url configured")
|
||||
}
|
||||
availableURLs := []string{baseURL}
|
||||
|
||||
var resp *http.Response
|
||||
var usedBaseURL string
|
||||
@@ -907,11 +996,11 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// URL fallback 循环
|
||||
availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
|
||||
if len(availableURLs) == 0 {
|
||||
availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有
|
||||
baseURL := resolveAntigravityForwardBaseURL()
|
||||
if baseURL == "" {
|
||||
return nil, errors.New("no antigravity forward base url configured")
|
||||
}
|
||||
availableURLs := []string{baseURL}
|
||||
|
||||
var lastErr error
|
||||
for urlIdx, baseURL := range availableURLs {
|
||||
@@ -1376,7 +1465,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
break
|
||||
}
|
||||
|
||||
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 8<<10))
|
||||
_ = retryResp.Body.Close()
|
||||
if retryResp.StatusCode == http.StatusTooManyRequests {
|
||||
retryBaseURL := ""
|
||||
@@ -1457,6 +1546,27 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
|
||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession)
|
||||
|
||||
// 精确匹配服务端配置类 400 错误,触发同账号重试 + failover
|
||||
if resp.StatusCode == http.StatusBadRequest {
|
||||
msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
|
||||
if isGoogleProjectConfigError(msg) {
|
||||
upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
|
||||
upstreamDetail := s.getUpstreamErrorDetail(respBody)
|
||||
log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID)
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
})
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true}
|
||||
}
|
||||
}
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
@@ -1997,6 +2107,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
// Always record upstream context for Ops error logs, even when we will failover.
|
||||
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
|
||||
|
||||
// 精确匹配服务端配置类 400 错误,触发同账号重试 + failover
|
||||
if resp.StatusCode == http.StatusBadRequest && isGoogleProjectConfigError(strings.ToLower(upstreamMsg)) {
|
||||
log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID)
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: requestID,
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
})
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: unwrappedForOps, RetryableOnSameAccount: true}
|
||||
}
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
@@ -2092,6 +2218,44 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
|
||||
}
|
||||
}
|
||||
|
||||
// isGoogleProjectConfigError reports whether an already-extracted, already
// lower-cased error message is one of the known Google server-side
// configuration problems.
//
// Only known server-side errors are matched, so that client request errors are
// not retried pointlessly. The check applies to every platform backed by
// Google (Antigravity, Gemini). The caller must lower-case the message: the
// comparison is case-sensitive.
func isGoogleProjectConfigError(lowerMsg string) bool {
	// Intermittent Google bug (per the original note): the project ID is valid
	// but temporarily fails to be recognised.
	const knownServerSideMarker = "invalid project resource name"

	matched := strings.Contains(lowerMsg, knownServerSideMarker)
	return matched
}
|
||||
|
||||
// googleConfigErrorCooldown 服务端配置类 400 错误的临时封禁时长
|
||||
const googleConfigErrorCooldown = 1 * time.Minute
|
||||
|
||||
// tempUnscheduleGoogleConfigError 对服务端配置类 400 错误触发临时封禁,
|
||||
// 避免短时间内反复调度到同一个有问题的账号。
|
||||
func tempUnscheduleGoogleConfigError(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) {
|
||||
until := time.Now().Add(googleConfigErrorCooldown)
|
||||
reason := "400: invalid project resource name (auto temp-unschedule 1m)"
|
||||
if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil {
|
||||
log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err)
|
||||
} else {
|
||||
log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason)
|
||||
}
|
||||
}
|
||||
|
||||
// emptyResponseCooldown 空流式响应的临时封禁时长
|
||||
const emptyResponseCooldown = 1 * time.Minute
|
||||
|
||||
// tempUnscheduleEmptyResponse 对空流式响应触发临时封禁,
|
||||
// 避免短时间内反复调度到同一个返回空响应的账号。
|
||||
func tempUnscheduleEmptyResponse(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) {
|
||||
until := time.Now().Add(emptyResponseCooldown)
|
||||
reason := "empty stream response (auto temp-unschedule 1m)"
|
||||
if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil {
|
||||
log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err)
|
||||
} else {
|
||||
log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason)
|
||||
}
|
||||
}
|
||||
|
||||
// sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待
|
||||
// 返回 true 表示正常完成等待,false 表示 context 已取消
|
||||
func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
|
||||
@@ -2108,10 +2272,12 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
|
||||
sleepFor = 0
|
||||
}
|
||||
|
||||
timer := time.NewTimer(sleepFor)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return false
|
||||
case <-time.After(sleepFor):
|
||||
case <-timer.C:
|
||||
return true
|
||||
}
|
||||
}
|
||||
@@ -2156,8 +2322,9 @@ func antigravityFallbackCooldownSeconds() (time.Duration, bool) {
|
||||
|
||||
// antigravitySmartRetryInfo carries the information smart retry needs, as
// produced by parseAntigravitySmartRetryInfo.
//
// Each field is declared exactly once: the earlier text listed RetryDelay and
// ModelName twice (old and new alignment of the same lines), which is a
// duplicate-field compile error.
type antigravitySmartRetryInfo struct {
	RetryDelay               time.Duration // retry delay
	ModelName                string        // name of the rate-limited model (e.g. "claude-sonnet-4-5")
	IsModelCapacityExhausted bool          // whether the error is MODEL_CAPACITY_EXHAUSTED (model capacity shortage)
}
|
||||
|
||||
// parseAntigravitySmartRetryInfo 解析 Google RPC RetryInfo 和 ErrorInfo 信息
|
||||
@@ -2272,31 +2439,40 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo {
|
||||
}
|
||||
|
||||
return &antigravitySmartRetryInfo{
|
||||
RetryDelay: retryDelay,
|
||||
ModelName: modelName,
|
||||
RetryDelay: retryDelay,
|
||||
ModelName: modelName,
|
||||
IsModelCapacityExhausted: hasModelCapacityExhausted,
|
||||
}
|
||||
}
|
||||
|
||||
// shouldTriggerAntigravitySmartRetry 判断是否应该触发智能重试
|
||||
// 返回:
|
||||
// - shouldRetry: 是否应该智能重试(retryDelay < antigravityRateLimitThreshold)
|
||||
// - shouldRateLimitModel: 是否应该限流模型(retryDelay >= antigravityRateLimitThreshold)
|
||||
// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为 0)
|
||||
// - shouldRetry: 是否应该智能重试(retryDelay < antigravityRateLimitThreshold,或 MODEL_CAPACITY_EXHAUSTED)
|
||||
// - shouldRateLimitModel: 是否应该限流模型并切换账号(仅 RATE_LIMIT_EXCEEDED 且 retryDelay >= 阈值)
|
||||
// - waitDuration: 等待时间
|
||||
// - modelName: 限流的模型名称
|
||||
func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string) {
|
||||
// - isModelCapacityExhausted: 是否为模型容量不足(MODEL_CAPACITY_EXHAUSTED)
|
||||
func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string, isModelCapacityExhausted bool) {
|
||||
if account.Platform != PlatformAntigravity {
|
||||
return false, false, 0, ""
|
||||
return false, false, 0, "", false
|
||||
}
|
||||
|
||||
info := parseAntigravitySmartRetryInfo(respBody)
|
||||
if info == nil {
|
||||
return false, false, 0, ""
|
||||
return false, false, 0, "", false
|
||||
}
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED(模型容量不足):所有账号共享同一模型容量池
|
||||
// 切换账号无意义,使用固定 1s 间隔重试
|
||||
if info.IsModelCapacityExhausted {
|
||||
return true, false, antigravityModelCapacityRetryWait, info.ModelName, true
|
||||
}
|
||||
|
||||
// RATE_LIMIT_EXCEEDED(账号级限流):
|
||||
// retryDelay >= 阈值:直接限流模型,不重试
|
||||
// 注意:如果上游未提供 retryDelay,parseAntigravitySmartRetryInfo 已设置为默认 30s
|
||||
if info.RetryDelay >= antigravityRateLimitThreshold {
|
||||
return false, true, info.RetryDelay, info.ModelName
|
||||
return false, true, info.RetryDelay, info.ModelName, false
|
||||
}
|
||||
|
||||
// retryDelay < 阈值:智能重试
|
||||
@@ -2305,7 +2481,7 @@ func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shou
|
||||
waitDuration = antigravitySmartRetryMinWait
|
||||
}
|
||||
|
||||
return true, false, waitDuration, info.ModelName
|
||||
return true, false, waitDuration, info.ModelName, false
|
||||
}
|
||||
|
||||
// handleModelRateLimitParams 模型级限流处理参数
|
||||
@@ -2331,8 +2507,9 @@ type handleModelRateLimitResult struct {
|
||||
|
||||
// handleModelRateLimit 处理模型级限流(在原有逻辑之前调用)
|
||||
// 仅处理 429/503,解析模型名和 retryDelay
|
||||
// - retryDelay < antigravityRateLimitThreshold: 返回 ShouldRetry=true,由调用方等待后重试
|
||||
// - retryDelay >= antigravityRateLimitThreshold: 设置模型限流 + 清除粘性会话 + 返回 SwitchError
|
||||
// - MODEL_CAPACITY_EXHAUSTED: 返回 Handled=true(实际重试由 handleSmartRetry 处理)
|
||||
// - RATE_LIMIT_EXCEEDED + retryDelay < 阈值: 返回 ShouldRetry=true,由调用方等待后重试
|
||||
// - RATE_LIMIT_EXCEEDED + retryDelay >= 阈值: 设置模型限流 + 清除粘性会话 + 返回 SwitchError
|
||||
func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimitParams) *handleModelRateLimitResult {
|
||||
if p.statusCode != 429 && p.statusCode != 503 {
|
||||
return &handleModelRateLimitResult{Handled: false}
|
||||
@@ -2343,7 +2520,17 @@ func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimit
|
||||
return &handleModelRateLimitResult{Handled: false}
|
||||
}
|
||||
|
||||
// < antigravityRateLimitThreshold: 等待后重试
|
||||
// MODEL_CAPACITY_EXHAUSTED:模型容量不足,所有账号共享同一容量池
|
||||
// 切换账号无意义,不设置模型限流(实际重试由 handleSmartRetry 处理)
|
||||
if info.IsModelCapacityExhausted {
|
||||
log.Printf("%s status=%d model_capacity_exhausted model=%s (not switching account, retry handled by smart retry)",
|
||||
p.prefix, p.statusCode, info.ModelName)
|
||||
return &handleModelRateLimitResult{
|
||||
Handled: true,
|
||||
}
|
||||
}
|
||||
|
||||
// RATE_LIMIT_EXCEEDED: < antigravityRateLimitThreshold: 等待后重试
|
||||
if info.RetryDelay < antigravityRateLimitThreshold {
|
||||
log.Printf("%s status=%d model_rate_limit_wait model=%s wait=%v",
|
||||
p.prefix, p.statusCode, info.ModelName, info.RetryDelay)
|
||||
@@ -2354,7 +2541,7 @@ func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimit
|
||||
}
|
||||
}
|
||||
|
||||
// >= antigravityRateLimitThreshold: 设置限流 + 清除粘性会话 + 切换账号
|
||||
// RATE_LIMIT_EXCEEDED: >= antigravityRateLimitThreshold: 设置限流 + 清除粘性会话 + 切换账号
|
||||
s.setModelRateLimitAndClearSession(p, info)
|
||||
|
||||
return &handleModelRateLimitResult{
|
||||
@@ -2906,9 +3093,14 @@ returnResponse:
|
||||
// 选择最后一个有效响应
|
||||
finalResponse := pickGeminiCollectResult(last, lastWithParts)
|
||||
|
||||
// 处理空响应情况
|
||||
// 处理空响应情况 — 触发同账号重试 + failover 切换账号
|
||||
if last == nil && lastWithParts == nil {
|
||||
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
|
||||
log.Printf("[antigravity-Forward] warning: empty stream response (gemini non-stream), triggering failover")
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: http.StatusBadGateway,
|
||||
ResponseBody: []byte(`{"error":"empty stream response from upstream"}`),
|
||||
RetryableOnSameAccount: true,
|
||||
}
|
||||
}
|
||||
|
||||
// 如果收集到了图片 parts,需要合并到最终响应中
|
||||
@@ -3126,6 +3318,21 @@ func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, accou
|
||||
log.Printf("[antigravity-Forward] upstream_error status=%d body=%s", upstreamStatus, truncateForLog(body, maxBytes))
|
||||
}
|
||||
|
||||
// 检查错误透传规则
|
||||
if ptStatus, ptErrType, ptErrMsg, matched := applyErrorPassthroughRule(
|
||||
c, account.Platform, upstreamStatus, body,
|
||||
0, "", "",
|
||||
); matched {
|
||||
c.JSON(ptStatus, gin.H{
|
||||
"type": "error",
|
||||
"error": gin.H{"type": ptErrType, "message": ptErrMsg},
|
||||
})
|
||||
if upstreamMsg == "" {
|
||||
return fmt.Errorf("upstream error: %d", upstreamStatus)
|
||||
}
|
||||
return fmt.Errorf("upstream error: %d message=%s", upstreamStatus, upstreamMsg)
|
||||
}
|
||||
|
||||
var statusCode int
|
||||
var errType, errMsg string
|
||||
|
||||
@@ -3323,10 +3530,14 @@ returnResponse:
|
||||
// 选择最后一个有效响应
|
||||
finalResponse := pickGeminiCollectResult(last, lastWithParts)
|
||||
|
||||
// 处理空响应情况
|
||||
// 处理空响应情况 — 触发同账号重试 + failover 切换账号
|
||||
if last == nil && lastWithParts == nil {
|
||||
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
|
||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream")
|
||||
log.Printf("[antigravity-Forward] warning: empty stream response (claude non-stream), triggering failover")
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: http.StatusBadGateway,
|
||||
ResponseBody: []byte(`{"error":"empty stream response from upstream"}`),
|
||||
RetryableOnSameAccount: true,
|
||||
}
|
||||
}
|
||||
|
||||
// 将收集的所有 parts 合并到最终响应中
|
||||
|
||||
Reference in New Issue
Block a user