diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go
index 835db94d..8bb139a6 100644
--- a/backend/internal/config/config.go
+++ b/backend/internal/config/config.go
@@ -257,6 +257,14 @@ type GatewayConfig struct {
// 是否允许对部分 400 错误触发 failover(默认关闭以避免改变语义)
FailoverOn400 bool `mapstructure:"failover_on_400"`
+ // 账户切换最大次数(遇到上游错误时切换到其他账户的次数上限)
+ MaxAccountSwitches int `mapstructure:"max_account_switches"`
+ // Gemini 账户切换最大次数(Gemini 平台单独配置,因 API 限制更严格)
+ MaxAccountSwitchesGemini int `mapstructure:"max_account_switches_gemini"`
+
+ // Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用
+ AntigravityFallbackCooldownMinutes int `mapstructure:"antigravity_fallback_cooldown_minutes"`
+
// Scheduling: 账号调度相关配置
Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"`
@@ -298,6 +306,9 @@ type GatewaySchedulingConfig struct {
FallbackWaitTimeout time.Duration `mapstructure:"fallback_wait_timeout"`
FallbackMaxWaiting int `mapstructure:"fallback_max_waiting"`
+ // 兜底层账户选择策略: "last_used"(按最后使用时间排序,默认) 或 "random"(随机)
+ FallbackSelectionMode string `mapstructure:"fallback_selection_mode"`
+
// 负载计算
LoadBatchEnabled bool `mapstructure:"load_batch_enabled"`
@@ -786,6 +797,9 @@ func setDefaults() {
viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048)
viper.SetDefault("gateway.inject_beta_for_apikey", false)
viper.SetDefault("gateway.failover_on_400", false)
+ viper.SetDefault("gateway.max_account_switches", 10)
+ viper.SetDefault("gateway.max_account_switches_gemini", 3)
+ viper.SetDefault("gateway.antigravity_fallback_cooldown_minutes", 1)
viper.SetDefault("gateway.max_body_size", int64(100*1024*1024))
viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy)
// HTTP 上游连接池配置(针对 5000+ 并发用户优化)
@@ -798,11 +812,12 @@ func setDefaults() {
viper.SetDefault("gateway.concurrency_slot_ttl_minutes", 30) // 并发槽位过期时间(支持超长请求)
viper.SetDefault("gateway.stream_data_interval_timeout", 180)
viper.SetDefault("gateway.stream_keepalive_interval", 10)
- viper.SetDefault("gateway.max_line_size", 10*1024*1024)
+ viper.SetDefault("gateway.max_line_size", 40*1024*1024)
viper.SetDefault("gateway.scheduling.sticky_session_max_waiting", 3)
viper.SetDefault("gateway.scheduling.sticky_session_wait_timeout", 120*time.Second)
viper.SetDefault("gateway.scheduling.fallback_wait_timeout", 30*time.Second)
viper.SetDefault("gateway.scheduling.fallback_max_waiting", 100)
+ viper.SetDefault("gateway.scheduling.fallback_selection_mode", "last_used")
viper.SetDefault("gateway.scheduling.load_batch_enabled", true)
viper.SetDefault("gateway.scheduling.slot_cleanup_interval", 30*time.Second)
viper.SetDefault("gateway.scheduling.db_fallback_enabled", true)
diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go
index c81bcc29..45d80272 100644
--- a/backend/internal/handler/admin/account_handler.go
+++ b/backend/internal/handler/admin/account_handler.go
@@ -541,6 +541,36 @@ func (h *AccountHandler) Refresh(c *gin.Context) {
newCredentials[k] = v
}
}
+
+ // 如果 project_id 获取失败,先更新凭证,再标记账户为 error
+ if tokenInfo.ProjectIDMissing {
+ // 先更新凭证
+ _, updateErr := h.adminService.UpdateAccount(c.Request.Context(), accountID, &service.UpdateAccountInput{
+ Credentials: newCredentials,
+ })
+ if updateErr != nil {
+ response.InternalError(c, "Failed to update credentials: "+updateErr.Error())
+ return
+ }
+ // 标记账户为 error
+ if setErr := h.adminService.SetAccountError(c.Request.Context(), accountID, "missing_project_id: 账户缺少project id,可能无法使用Antigravity"); setErr != nil {
+ response.InternalError(c, "Failed to set account error: "+setErr.Error())
+ return
+ }
+ response.Success(c, gin.H{
+ "message": "Token refreshed but project_id is missing, account marked as error",
+ "warning": "missing_project_id",
+ })
+ return
+ }
+
+ // 成功获取到 project_id,如果之前是 missing_project_id 错误则清除
+ if account.Status == service.StatusError && strings.Contains(account.ErrorMessage, "missing_project_id:") {
+ if _, clearErr := h.adminService.ClearAccountError(c.Request.Context(), accountID); clearErr != nil {
+ response.InternalError(c, "Failed to clear account error: "+clearErr.Error())
+ return
+ }
+ }
} else {
// Use Anthropic/Claude OAuth service to refresh token
tokenInfo, err := h.oauthService.RefreshAccountToken(c.Request.Context(), account)
diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go
index 8c32be21..6c8d9ebe 100644
--- a/backend/internal/handler/gateway_handler.go
+++ b/backend/internal/handler/gateway_handler.go
@@ -31,6 +31,8 @@ type GatewayHandler struct {
userService *service.UserService
billingCacheService *service.BillingCacheService
concurrencyHelper *ConcurrencyHelper
+ maxAccountSwitches int
+ maxAccountSwitchesGemini int
}
// NewGatewayHandler creates a new GatewayHandler
@@ -44,8 +46,16 @@ func NewGatewayHandler(
cfg *config.Config,
) *GatewayHandler {
pingInterval := time.Duration(0)
+ maxAccountSwitches := 10
+ maxAccountSwitchesGemini := 3
if cfg != nil {
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
+ if cfg.Gateway.MaxAccountSwitches > 0 {
+ maxAccountSwitches = cfg.Gateway.MaxAccountSwitches
+ }
+ if cfg.Gateway.MaxAccountSwitchesGemini > 0 {
+ maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini
+ }
}
return &GatewayHandler{
gatewayService: gatewayService,
@@ -54,6 +64,8 @@ func NewGatewayHandler(
userService: userService,
billingCacheService: billingCacheService,
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
+ maxAccountSwitches: maxAccountSwitches,
+ maxAccountSwitchesGemini: maxAccountSwitchesGemini,
}
}
@@ -179,7 +191,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
if platform == service.PlatformGemini {
- const maxAccountSwitches = 3
+ maxAccountSwitches := h.maxAccountSwitchesGemini
switchCount := 0
failedAccountIDs := make(map[int64]struct{})
lastFailoverStatus := 0
@@ -313,7 +325,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
}
- const maxAccountSwitches = 10
+ maxAccountSwitches := h.maxAccountSwitches
switchCount := 0
failedAccountIDs := make(map[int64]struct{})
lastFailoverStatus := 0
diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go
index ec943e61..c7646b38 100644
--- a/backend/internal/handler/gemini_v1beta_handler.go
+++ b/backend/internal/handler/gemini_v1beta_handler.go
@@ -220,7 +220,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
if sessionHash != "" {
sessionKey = "gemini:" + sessionHash
}
- const maxAccountSwitches = 3
+ maxAccountSwitches := h.maxAccountSwitchesGemini
switchCount := 0
failedAccountIDs := make(map[int64]struct{})
lastFailoverStatus := 0
diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go
index 68e67656..4c9dd8b9 100644
--- a/backend/internal/handler/openai_gateway_handler.go
+++ b/backend/internal/handler/openai_gateway_handler.go
@@ -25,6 +25,7 @@ type OpenAIGatewayHandler struct {
gatewayService *service.OpenAIGatewayService
billingCacheService *service.BillingCacheService
concurrencyHelper *ConcurrencyHelper
+ maxAccountSwitches int
}
// NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler
@@ -35,13 +36,18 @@ func NewOpenAIGatewayHandler(
cfg *config.Config,
) *OpenAIGatewayHandler {
pingInterval := time.Duration(0)
+ maxAccountSwitches := 3
if cfg != nil {
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
+ if cfg.Gateway.MaxAccountSwitches > 0 {
+ maxAccountSwitches = cfg.Gateway.MaxAccountSwitches
+ }
}
return &OpenAIGatewayHandler{
gatewayService: gatewayService,
billingCacheService: billingCacheService,
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval),
+ maxAccountSwitches: maxAccountSwitches,
}
}
@@ -189,7 +195,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
// Generate session hash (header first; fallback to prompt_cache_key)
sessionHash := h.gatewayService.GenerateSessionHash(c, reqBody)
- const maxAccountSwitches = 3
+ maxAccountSwitches := h.maxAccountSwitches
switchCount := 0
failedAccountIDs := make(map[int64]struct{})
lastFailoverStatus := 0
diff --git a/backend/internal/pkg/antigravity/client.go b/backend/internal/pkg/antigravity/client.go
index 1248be95..a6279b11 100644
--- a/backend/internal/pkg/antigravity/client.go
+++ b/backend/internal/pkg/antigravity/client.go
@@ -16,15 +16,6 @@ import (
"time"
)
-// resolveHost 从 URL 解析 host
-func resolveHost(urlStr string) string {
- parsed, err := url.Parse(urlStr)
- if err != nil {
- return ""
- }
- return parsed.Host
-}
-
// NewAPIRequestWithURL 使用指定的 base URL 创建 Antigravity API 请求(v1internal 端点)
func NewAPIRequestWithURL(ctx context.Context, baseURL, action, accessToken string, body []byte) (*http.Request, error) {
// 构建 URL,流式请求添加 ?alt=sse 参数
@@ -39,23 +30,11 @@ func NewAPIRequestWithURL(ctx context.Context, baseURL, action, accessToken stri
return nil, err
}
- // 基础 Headers
+ // 基础 Headers(与 Antigravity-Manager 保持一致,只设置这 3 个)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+accessToken)
req.Header.Set("User-Agent", UserAgent)
- // Accept Header 根据请求类型设置
- if isStream {
- req.Header.Set("Accept", "text/event-stream")
- } else {
- req.Header.Set("Accept", "application/json")
- }
-
- // 显式设置 Host Header
- if host := resolveHost(apiURL); host != "" {
- req.Host = host
- }
-
return req, nil
}
@@ -195,12 +174,15 @@ func isConnectionError(err error) bool {
}
// shouldFallbackToNextURL 判断是否应切换到下一个 URL
-// 仅连接错误和 HTTP 429 触发 URL 降级
+// 与 Antigravity-Manager 保持一致:连接错误、429、408、404、5xx 触发 URL 降级
func shouldFallbackToNextURL(err error, statusCode int) bool {
if isConnectionError(err) {
return true
}
- return statusCode == http.StatusTooManyRequests
+ return statusCode == http.StatusTooManyRequests ||
+ statusCode == http.StatusRequestTimeout ||
+ statusCode == http.StatusNotFound ||
+ statusCode >= 500
}
// ExchangeCode 用 authorization code 交换 token
@@ -321,11 +303,8 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
return nil, nil, fmt.Errorf("序列化请求失败: %w", err)
}
- // 获取可用的 URL 列表
- availableURLs := DefaultURLAvailability.GetAvailableURLs()
- if len(availableURLs) == 0 {
- availableURLs = BaseURLs // 所有 URL 都不可用时,重试所有
- }
+ // 固定顺序:prod -> daily
+ availableURLs := BaseURLs
var lastErr error
for urlIdx, baseURL := range availableURLs {
@@ -343,7 +322,6 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
if err != nil {
lastErr = fmt.Errorf("loadCodeAssist 请求失败: %w", err)
if shouldFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
- DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("[antigravity] loadCodeAssist URL fallback: %s -> %s", baseURL, availableURLs[urlIdx+1])
continue
}
@@ -358,7 +336,6 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
// 检查是否需要 URL 降级
if shouldFallbackToNextURL(nil, resp.StatusCode) && urlIdx < len(availableURLs)-1 {
- DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("[antigravity] loadCodeAssist URL fallback (HTTP %d): %s -> %s", resp.StatusCode, baseURL, availableURLs[urlIdx+1])
continue
}
@@ -376,6 +353,8 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC
var rawResp map[string]any
_ = json.Unmarshal(respBodyBytes, &rawResp)
+ // 标记成功的 URL,下次优先使用
+ DefaultURLAvailability.MarkSuccess(baseURL)
return &loadResp, rawResp, nil
}
@@ -412,11 +391,8 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
return nil, nil, fmt.Errorf("序列化请求失败: %w", err)
}
- // 获取可用的 URL 列表
- availableURLs := DefaultURLAvailability.GetAvailableURLs()
- if len(availableURLs) == 0 {
- availableURLs = BaseURLs // 所有 URL 都不可用时,重试所有
- }
+ // 固定顺序:prod -> daily
+ availableURLs := BaseURLs
var lastErr error
for urlIdx, baseURL := range availableURLs {
@@ -434,7 +410,6 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
if err != nil {
lastErr = fmt.Errorf("fetchAvailableModels 请求失败: %w", err)
if shouldFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
- DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("[antigravity] fetchAvailableModels URL fallback: %s -> %s", baseURL, availableURLs[urlIdx+1])
continue
}
@@ -449,7 +424,6 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
// 检查是否需要 URL 降级
if shouldFallbackToNextURL(nil, resp.StatusCode) && urlIdx < len(availableURLs)-1 {
- DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("[antigravity] fetchAvailableModels URL fallback (HTTP %d): %s -> %s", resp.StatusCode, baseURL, availableURLs[urlIdx+1])
continue
}
@@ -467,6 +441,8 @@ func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectI
var rawResp map[string]any
_ = json.Unmarshal(respBodyBytes, &rawResp)
+ // 标记成功的 URL,下次优先使用
+ DefaultURLAvailability.MarkSuccess(baseURL)
return &modelsResp, rawResp, nil
}
diff --git a/backend/internal/pkg/antigravity/gemini_types.go b/backend/internal/pkg/antigravity/gemini_types.go
index f688332f..c1cc998c 100644
--- a/backend/internal/pkg/antigravity/gemini_types.go
+++ b/backend/internal/pkg/antigravity/gemini_types.go
@@ -143,9 +143,10 @@ type GeminiResponse struct {
// GeminiCandidate Gemini 候选响应
type GeminiCandidate struct {
- Content *GeminiContent `json:"content,omitempty"`
- FinishReason string `json:"finishReason,omitempty"`
- Index int `json:"index,omitempty"`
+ Content *GeminiContent `json:"content,omitempty"`
+ FinishReason string `json:"finishReason,omitempty"`
+ Index int `json:"index,omitempty"`
+ GroundingMetadata *GeminiGroundingMetadata `json:"groundingMetadata,omitempty"`
}
// GeminiUsageMetadata Gemini 用量元数据
@@ -156,6 +157,23 @@ type GeminiUsageMetadata struct {
TotalTokenCount int `json:"totalTokenCount,omitempty"`
}
+// GeminiGroundingMetadata Gemini grounding 元数据(Web Search)
+type GeminiGroundingMetadata struct {
+ WebSearchQueries []string `json:"webSearchQueries,omitempty"`
+ GroundingChunks []GeminiGroundingChunk `json:"groundingChunks,omitempty"`
+}
+
+// GeminiGroundingChunk Gemini grounding chunk
+type GeminiGroundingChunk struct {
+ Web *GeminiGroundingWeb `json:"web,omitempty"`
+}
+
+// GeminiGroundingWeb Gemini grounding web 信息
+type GeminiGroundingWeb struct {
+ Title string `json:"title,omitempty"`
+ URI string `json:"uri,omitempty"`
+}
+
// DefaultSafetySettings 默认安全设置(关闭所有过滤)
var DefaultSafetySettings = []GeminiSafetySetting{
{Category: "HARM_CATEGORY_HARASSMENT", Threshold: "OFF"},
diff --git a/backend/internal/pkg/antigravity/oauth.go b/backend/internal/pkg/antigravity/oauth.go
index 736c45df..ee2a6c1a 100644
--- a/backend/internal/pkg/antigravity/oauth.go
+++ b/backend/internal/pkg/antigravity/oauth.go
@@ -32,8 +32,8 @@ const (
"https://www.googleapis.com/auth/cclog " +
"https://www.googleapis.com/auth/experimentsandconfigs"
- // User-Agent(模拟官方客户端)
- UserAgent = "antigravity/1.104.0 darwin/arm64"
+ // User-Agent(与 Antigravity-Manager 保持一致)
+ UserAgent = "antigravity/1.11.9 windows/amd64"
// Session 过期时间
SessionTTL = 30 * time.Minute
@@ -42,22 +42,21 @@ const (
URLAvailabilityTTL = 5 * time.Minute
)
-// BaseURLs 定义 Antigravity API 端点,按优先级排序
-// fallback 顺序: sandbox → daily → prod
+// BaseURLs 定义 Antigravity API 端点(与 Antigravity-Manager 保持一致)
var BaseURLs = []string{
- "https://daily-cloudcode-pa.sandbox.googleapis.com", // sandbox
- "https://daily-cloudcode-pa.googleapis.com", // daily
- "https://cloudcode-pa.googleapis.com", // prod
+ "https://cloudcode-pa.googleapis.com", // prod (优先)
+ "https://daily-cloudcode-pa.sandbox.googleapis.com", // daily sandbox (备用)
}
// BaseURL 默认 URL(保持向后兼容)
var BaseURL = BaseURLs[0]
-// URLAvailability 管理 URL 可用性状态(带 TTL 自动恢复)
+// URLAvailability 管理 URL 可用性状态(带 TTL 自动恢复和动态优先级)
type URLAvailability struct {
mu sync.RWMutex
unavailable map[string]time.Time // URL -> 恢复时间
ttl time.Duration
+ lastSuccess string // 最近成功请求的 URL,优先使用
}
// DefaultURLAvailability 全局 URL 可用性管理器
@@ -78,6 +77,15 @@ func (u *URLAvailability) MarkUnavailable(url string) {
u.unavailable[url] = time.Now().Add(u.ttl)
}
+// MarkSuccess 标记 URL 请求成功,将其设为优先使用
+func (u *URLAvailability) MarkSuccess(url string) {
+ u.mu.Lock()
+ defer u.mu.Unlock()
+ u.lastSuccess = url
+ // 成功后清除该 URL 的不可用标记
+ delete(u.unavailable, url)
+}
+
// IsAvailable 检查 URL 是否可用
func (u *URLAvailability) IsAvailable(url string) bool {
u.mu.RLock()
@@ -89,14 +97,29 @@ func (u *URLAvailability) IsAvailable(url string) bool {
return time.Now().After(expiry)
}
-// GetAvailableURLs 返回可用的 URL 列表(保持优先级顺序)
+// GetAvailableURLs 返回可用的 URL 列表
+// 最近成功的 URL 优先,其他按默认顺序
func (u *URLAvailability) GetAvailableURLs() []string {
u.mu.RLock()
defer u.mu.RUnlock()
now := time.Now()
result := make([]string, 0, len(BaseURLs))
+
+ // 如果有最近成功的 URL 且可用,放在最前面
+ if u.lastSuccess != "" {
+ expiry, exists := u.unavailable[u.lastSuccess]
+ if !exists || now.After(expiry) {
+ result = append(result, u.lastSuccess)
+ }
+ }
+
+ // 添加其他可用的 URL(按默认顺序)
for _, url := range BaseURLs {
+ // 跳过已添加的 lastSuccess
+ if url == u.lastSuccess {
+ continue
+ }
expiry, exists := u.unavailable[url]
if !exists || now.After(expiry) {
result = append(result, url)
@@ -240,24 +263,3 @@ func BuildAuthorizationURL(state, codeChallenge string) string {
return fmt.Sprintf("%s?%s", AuthorizeURL, params.Encode())
}
-
-// GenerateMockProjectID 生成随机 project_id(当 API 不返回时使用)
-// 格式:{形容词}-{名词}-{5位随机字符}
-func GenerateMockProjectID() string {
- adjectives := []string{"useful", "bright", "swift", "calm", "bold"}
- nouns := []string{"fuze", "wave", "spark", "flow", "core"}
-
- randBytes, _ := GenerateRandomBytes(7)
-
- adj := adjectives[int(randBytes[0])%len(adjectives)]
- noun := nouns[int(randBytes[1])%len(nouns)]
-
- // 生成 5 位随机字符(a-z0-9)
- const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
- suffix := make([]byte, 5)
- for i := 0; i < 5; i++ {
- suffix[i] = charset[int(randBytes[i+2])%len(charset)]
- }
-
- return fmt.Sprintf("%s-%s-%s", adj, noun, string(suffix))
-}
diff --git a/backend/internal/pkg/antigravity/request_transformer.go b/backend/internal/pkg/antigravity/request_transformer.go
index a8474576..637a4ea8 100644
--- a/backend/internal/pkg/antigravity/request_transformer.go
+++ b/backend/internal/pkg/antigravity/request_transformer.go
@@ -54,6 +54,9 @@ func DefaultTransformOptions() TransformOptions {
}
}
+// webSearchFallbackModel web_search 请求使用的降级模型
+const webSearchFallbackModel = "gemini-2.5-flash"
+
// TransformClaudeToGemini 将 Claude 请求转换为 v1internal Gemini 格式
func TransformClaudeToGemini(claudeReq *ClaudeRequest, projectID, mappedModel string) ([]byte, error) {
return TransformClaudeToGeminiWithOptions(claudeReq, projectID, mappedModel, DefaultTransformOptions())
@@ -64,12 +67,23 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
// 用于存储 tool_use id -> name 映射
toolIDToName := make(map[string]string)
+ // 检测是否有 web_search 工具
+ hasWebSearchTool := hasWebSearchTool(claudeReq.Tools)
+ requestType := "agent"
+ targetModel := mappedModel
+ if hasWebSearchTool {
+ requestType = "web_search"
+ if targetModel != webSearchFallbackModel {
+ targetModel = webSearchFallbackModel
+ }
+ }
+
// 检测是否启用 thinking
isThinkingEnabled := claudeReq.Thinking != nil && claudeReq.Thinking.Type == "enabled"
// 只有 Gemini 模型支持 dummy thought workaround
// Claude 模型通过 Vertex/Google API 需要有效的 thought signatures
- allowDummyThought := strings.HasPrefix(mappedModel, "gemini-")
+ allowDummyThought := strings.HasPrefix(targetModel, "gemini-")
// 1. 构建 contents
contents, strippedThinking, err := buildContents(claudeReq.Messages, toolIDToName, isThinkingEnabled, allowDummyThought)
@@ -78,7 +92,7 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
}
// 2. 构建 systemInstruction
- systemInstruction := buildSystemInstruction(claudeReq.System, claudeReq.Model, opts)
+ systemInstruction := buildSystemInstruction(claudeReq.System, claudeReq.Model, opts, claudeReq.Tools)
// 3. 构建 generationConfig
reqForConfig := claudeReq
@@ -89,6 +103,11 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
reqCopy.Thinking = nil
reqForConfig = &reqCopy
}
+ if targetModel != "" && targetModel != reqForConfig.Model {
+ reqCopy := *reqForConfig
+ reqCopy.Model = targetModel
+ reqForConfig = &reqCopy
+ }
generationConfig := buildGenerationConfig(reqForConfig)
// 4. 构建 tools
@@ -127,8 +146,8 @@ func TransformClaudeToGeminiWithOptions(claudeReq *ClaudeRequest, projectID, map
Project: projectID,
RequestID: "agent-" + uuid.New().String(),
UserAgent: "antigravity", // 固定值,与官方客户端一致
- RequestType: "agent",
- Model: mappedModel,
+ RequestType: requestType,
+ Model: targetModel,
Request: innerRequest,
}
@@ -154,8 +173,40 @@ func GetDefaultIdentityPatch() string {
return antigravityIdentity
}
-// buildSystemInstruction 构建 systemInstruction
-func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions) *GeminiContent {
+// mcpXMLProtocol MCP XML 工具调用协议(与 Antigravity-Manager 保持一致)
+const mcpXMLProtocol = `
+==== MCP XML 工具调用协议 (Workaround) ====
+当你需要调用名称以 ` + "`mcp__`" + ` 开头的 MCP 工具时:
+1) 优先尝试 XML 格式调用:输出 ` + "`{\"arg\":\"value\"}`" + `。
+2) 必须直接输出 XML 块,无需 markdown 包装,内容为 JSON 格式的入参。
+3) 这种方式具有更高的连通性和容错性,适用于大型结果返回场景。
+===========================================`
+
+// hasMCPTools 检测是否有 mcp__ 前缀的工具
+func hasMCPTools(tools []ClaudeTool) bool {
+ for _, tool := range tools {
+ if strings.HasPrefix(tool.Name, "mcp__") {
+ return true
+ }
+ }
+ return false
+}
+
+// filterOpenCodePrompt 过滤 OpenCode 默认提示词,只保留用户自定义指令
+func filterOpenCodePrompt(text string) string {
+ if !strings.Contains(text, "You are an interactive CLI tool") {
+ return text
+ }
+ // 提取 "Instructions from:" 及之后的部分
+ if idx := strings.Index(text, "Instructions from:"); idx >= 0 {
+ return text[idx:]
+ }
+ // 如果没有自定义指令,返回空
+ return ""
+}
+
+// buildSystemInstruction 构建 systemInstruction(与 Antigravity-Manager 保持一致)
+func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions, tools []ClaudeTool) *GeminiContent {
var parts []GeminiPart
// 先解析用户的 system prompt,检测是否已包含 Antigravity identity
@@ -167,10 +218,14 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans
var sysStr string
if err := json.Unmarshal(system, &sysStr); err == nil {
if strings.TrimSpace(sysStr) != "" {
- userSystemParts = append(userSystemParts, GeminiPart{Text: sysStr})
if strings.Contains(sysStr, "You are Antigravity") {
userHasAntigravityIdentity = true
}
+ // 过滤 OpenCode 默认提示词
+ filtered := filterOpenCodePrompt(sysStr)
+ if filtered != "" {
+ userSystemParts = append(userSystemParts, GeminiPart{Text: filtered})
+ }
}
} else {
// 尝试解析为数组
@@ -178,10 +233,14 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans
if err := json.Unmarshal(system, &sysBlocks); err == nil {
for _, block := range sysBlocks {
if block.Type == "text" && strings.TrimSpace(block.Text) != "" {
- userSystemParts = append(userSystemParts, GeminiPart{Text: block.Text})
if strings.Contains(block.Text, "You are Antigravity") {
userHasAntigravityIdentity = true
}
+ // 过滤 OpenCode 默认提示词
+ filtered := filterOpenCodePrompt(block.Text)
+ if filtered != "" {
+ userSystemParts = append(userSystemParts, GeminiPart{Text: filtered})
+ }
}
}
}
@@ -200,6 +259,16 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans
// 添加用户的 system prompt
parts = append(parts, userSystemParts...)
+ // 检测是否有 MCP 工具,如有则注入 XML 调用协议
+ if hasMCPTools(tools) {
+ parts = append(parts, GeminiPart{Text: mcpXMLProtocol})
+ }
+
+ // 如果用户没有提供 Antigravity 身份,添加结束标记
+ if !userHasAntigravityIdentity {
+ parts = append(parts, GeminiPart{Text: "\n--- [SYSTEM_PROMPT_END] ---"})
+ }
+
if len(parts) == 0 {
return nil
}
@@ -429,6 +498,11 @@ func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig {
StopSequences: DefaultStopSequences,
}
+ // 如果请求中指定了 MaxTokens,使用请求值
+ if req.MaxTokens > 0 {
+ config.MaxOutputTokens = req.MaxTokens
+ }
+
// Thinking 配置
if req.Thinking != nil && req.Thinking.Type == "enabled" {
config.ThinkingConfig = &GeminiThinkingConfig{
@@ -458,37 +532,43 @@ func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig {
return config
}
+func hasWebSearchTool(tools []ClaudeTool) bool {
+ for _, tool := range tools {
+ if isWebSearchTool(tool) {
+ return true
+ }
+ }
+ return false
+}
+
+func isWebSearchTool(tool ClaudeTool) bool {
+ if strings.HasPrefix(tool.Type, "web_search") || tool.Type == "google_search" {
+ return true
+ }
+
+ name := strings.TrimSpace(tool.Name)
+ switch name {
+ case "web_search", "google_search", "web_search_20250305":
+ return true
+ default:
+ return false
+ }
+}
+
// buildTools 构建 tools
func buildTools(tools []ClaudeTool) []GeminiToolDeclaration {
if len(tools) == 0 {
return nil
}
- // 检查是否有 web_search 工具
- hasWebSearch := false
- for _, tool := range tools {
- if tool.Name == "web_search" {
- hasWebSearch = true
- break
- }
- }
-
- if hasWebSearch {
- // Web Search 工具映射
- return []GeminiToolDeclaration{{
- GoogleSearch: &GeminiGoogleSearch{
- EnhancedContent: &GeminiEnhancedContent{
- ImageSearch: &GeminiImageSearch{
- MaxResultCount: 5,
- },
- },
- },
- }}
- }
+ hasWebSearch := hasWebSearchTool(tools)
// 普通工具
var funcDecls []GeminiFunctionDecl
for _, tool := range tools {
+ if isWebSearchTool(tool) {
+ continue
+ }
// 跳过无效工具名称
if strings.TrimSpace(tool.Name) == "" {
log.Printf("Warning: skipping tool with empty name")
@@ -531,7 +611,20 @@ func buildTools(tools []ClaudeTool) []GeminiToolDeclaration {
}
if len(funcDecls) == 0 {
- return nil
+ if !hasWebSearch {
+ return nil
+ }
+
+ // Web Search 工具映射
+ return []GeminiToolDeclaration{{
+ GoogleSearch: &GeminiGoogleSearch{
+ EnhancedContent: &GeminiEnhancedContent{
+ ImageSearch: &GeminiImageSearch{
+ MaxResultCount: 5,
+ },
+ },
+ },
+ }}
}
return []GeminiToolDeclaration{{
diff --git a/backend/internal/pkg/antigravity/response_transformer.go b/backend/internal/pkg/antigravity/response_transformer.go
index cd7f5f80..04424c03 100644
--- a/backend/internal/pkg/antigravity/response_transformer.go
+++ b/backend/internal/pkg/antigravity/response_transformer.go
@@ -3,6 +3,7 @@ package antigravity
import (
"encoding/json"
"fmt"
+ "strings"
)
// TransformGeminiToClaude 将 Gemini 响应转换为 Claude 格式(非流式)
@@ -63,6 +64,12 @@ func (p *NonStreamingProcessor) Process(geminiResp *GeminiResponse, responseID,
p.processPart(&part)
}
+ if len(geminiResp.Candidates) > 0 {
+ if grounding := geminiResp.Candidates[0].GroundingMetadata; grounding != nil {
+ p.processGrounding(grounding)
+ }
+ }
+
// 刷新剩余内容
p.flushThinking()
p.flushText()
@@ -190,6 +197,18 @@ func (p *NonStreamingProcessor) processPart(part *GeminiPart) {
}
}
+func (p *NonStreamingProcessor) processGrounding(grounding *GeminiGroundingMetadata) {
+ groundingText := buildGroundingText(grounding)
+ if groundingText == "" {
+ return
+ }
+
+ p.flushThinking()
+ p.flushText()
+ p.textBuilder += groundingText
+ p.flushText()
+}
+
// flushText 刷新 text builder
func (p *NonStreamingProcessor) flushText() {
if p.textBuilder == "" {
@@ -262,6 +281,44 @@ func (p *NonStreamingProcessor) buildResponse(geminiResp *GeminiResponse, respon
}
}
+func buildGroundingText(grounding *GeminiGroundingMetadata) string {
+ if grounding == nil {
+ return ""
+ }
+
+ var builder strings.Builder
+
+ if len(grounding.WebSearchQueries) > 0 {
+ _, _ = builder.WriteString("\n\n---\nWeb search queries: ")
+ _, _ = builder.WriteString(strings.Join(grounding.WebSearchQueries, ", "))
+ }
+
+ if len(grounding.GroundingChunks) > 0 {
+ var links []string
+ for i, chunk := range grounding.GroundingChunks {
+ if chunk.Web == nil {
+ continue
+ }
+ title := strings.TrimSpace(chunk.Web.Title)
+ if title == "" {
+ title = "Source"
+ }
+ uri := strings.TrimSpace(chunk.Web.URI)
+ if uri == "" {
+ uri = "#"
+ }
+ links = append(links, fmt.Sprintf("[%d] [%s](%s)", i+1, title, uri))
+ }
+
+ if len(links) > 0 {
+ _, _ = builder.WriteString("\n\nSources:\n")
+ _, _ = builder.WriteString(strings.Join(links, "\n"))
+ }
+ }
+
+ return builder.String()
+}
+
// generateRandomID 生成随机 ID
func generateRandomID() string {
const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
diff --git a/backend/internal/pkg/antigravity/stream_transformer.go b/backend/internal/pkg/antigravity/stream_transformer.go
index 9fe68a11..da0c6f97 100644
--- a/backend/internal/pkg/antigravity/stream_transformer.go
+++ b/backend/internal/pkg/antigravity/stream_transformer.go
@@ -27,6 +27,8 @@ type StreamingProcessor struct {
pendingSignature string
trailingSignature string
originalModel string
+ webSearchQueries []string
+ groundingChunks []GeminiGroundingChunk
// 累计 usage
inputTokens int
@@ -93,6 +95,10 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte {
}
}
+ if len(geminiResp.Candidates) > 0 {
+ p.captureGrounding(geminiResp.Candidates[0].GroundingMetadata)
+ }
+
// 检查是否结束
if len(geminiResp.Candidates) > 0 {
finishReason := geminiResp.Candidates[0].FinishReason
@@ -200,6 +206,20 @@ func (p *StreamingProcessor) processPart(part *GeminiPart) []byte {
return result.Bytes()
}
+func (p *StreamingProcessor) captureGrounding(grounding *GeminiGroundingMetadata) {
+ if grounding == nil {
+ return
+ }
+
+ if len(grounding.WebSearchQueries) > 0 && len(p.webSearchQueries) == 0 {
+ p.webSearchQueries = append([]string(nil), grounding.WebSearchQueries...)
+ }
+
+ if len(grounding.GroundingChunks) > 0 && len(p.groundingChunks) == 0 {
+ p.groundingChunks = append([]GeminiGroundingChunk(nil), grounding.GroundingChunks...)
+ }
+}
+
// processThinking 处理 thinking
func (p *StreamingProcessor) processThinking(text, signature string) []byte {
var result bytes.Buffer
@@ -417,6 +437,23 @@ func (p *StreamingProcessor) emitFinish(finishReason string) []byte {
p.trailingSignature = ""
}
+ if len(p.webSearchQueries) > 0 || len(p.groundingChunks) > 0 {
+ groundingText := buildGroundingText(&GeminiGroundingMetadata{
+ WebSearchQueries: p.webSearchQueries,
+ GroundingChunks: p.groundingChunks,
+ })
+ if groundingText != "" {
+ _, _ = result.Write(p.startBlock(BlockTypeText, map[string]any{
+ "type": "text",
+ "text": "",
+ }))
+ _, _ = result.Write(p.emitDelta("text_delta", map[string]any{
+ "text": groundingText,
+ }))
+ _, _ = result.Write(p.endBlock())
+ }
+ }
+
// 确定 stop_reason
stopReason := "end_turn"
if p.usedTool {
diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go
index 6f4f0e81..c2673ad3 100644
--- a/backend/internal/repository/account_repo.go
+++ b/backend/internal/repository/account_repo.go
@@ -543,6 +543,15 @@ func (r *accountRepository) SetError(ctx context.Context, id int64, errorMsg str
return nil
}
+func (r *accountRepository) ClearError(ctx context.Context, id int64) error {
+ _, err := r.client.Account.Update().
+ Where(dbaccount.IDEQ(id)).
+ SetStatus(service.StatusActive).
+ SetErrorMessage("").
+ Save(ctx)
+ return err
+}
+
func (r *accountRepository) AddToGroup(ctx context.Context, accountID, groupID int64, priority int) error {
_, err := r.client.AccountGroup.Create().
SetAccountID(accountID).
diff --git a/backend/internal/server/api_contract_test.go b/backend/internal/server/api_contract_test.go
index 13fee7ed..055c010d 100644
--- a/backend/internal/server/api_contract_test.go
+++ b/backend/internal/server/api_contract_test.go
@@ -744,6 +744,10 @@ func (s *stubAccountRepo) SetError(ctx context.Context, id int64, errorMsg strin
return errors.New("not implemented")
}
+func (s *stubAccountRepo) ClearError(ctx context.Context, id int64) error {
+ return errors.New("not implemented")
+}
+
func (s *stubAccountRepo) SetSchedulable(ctx context.Context, id int64, schedulable bool) error {
return errors.New("not implemented")
}
diff --git a/backend/internal/service/account_service.go b/backend/internal/service/account_service.go
index ede5b12f..90365d2f 100644
--- a/backend/internal/service/account_service.go
+++ b/backend/internal/service/account_service.go
@@ -37,6 +37,7 @@ type AccountRepository interface {
UpdateLastUsed(ctx context.Context, id int64) error
BatchUpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error
SetError(ctx context.Context, id int64, errorMsg string) error
+ ClearError(ctx context.Context, id int64) error
SetSchedulable(ctx context.Context, id int64, schedulable bool) error
AutoPauseExpiredAccounts(ctx context.Context, now time.Time) (int64, error)
BindGroups(ctx context.Context, accountID int64, groupIDs []int64) error
diff --git a/backend/internal/service/account_service_delete_test.go b/backend/internal/service/account_service_delete_test.go
index 36af719c..e5eabfc6 100644
--- a/backend/internal/service/account_service_delete_test.go
+++ b/backend/internal/service/account_service_delete_test.go
@@ -99,6 +99,10 @@ func (s *accountRepoStub) SetError(ctx context.Context, id int64, errorMsg strin
panic("unexpected SetError call")
}
+func (s *accountRepoStub) ClearError(ctx context.Context, id int64) error {
+ panic("unexpected ClearError call")
+}
+
func (s *accountRepoStub) SetSchedulable(ctx context.Context, id int64, schedulable bool) error {
panic("unexpected SetSchedulable call")
}
diff --git a/backend/internal/service/admin_service.go b/backend/internal/service/admin_service.go
index c0694e4e..0afa0716 100644
--- a/backend/internal/service/admin_service.go
+++ b/backend/internal/service/admin_service.go
@@ -42,6 +42,7 @@ type AdminService interface {
DeleteAccount(ctx context.Context, id int64) error
RefreshAccountCredentials(ctx context.Context, id int64) (*Account, error)
ClearAccountError(ctx context.Context, id int64) (*Account, error)
+ SetAccountError(ctx context.Context, id int64, errorMsg string) error
SetAccountSchedulable(ctx context.Context, id int64, schedulable bool) (*Account, error)
BulkUpdateAccounts(ctx context.Context, input *BulkUpdateAccountsInput) (*BulkUpdateAccountsResult, error)
@@ -1101,6 +1102,10 @@ func (s *adminServiceImpl) ClearAccountError(ctx context.Context, id int64) (*Ac
return account, nil
}
+func (s *adminServiceImpl) SetAccountError(ctx context.Context, id int64, errorMsg string) error {
+ return s.accountRepo.SetError(ctx, id, errorMsg)
+}
+
func (s *adminServiceImpl) SetAccountSchedulable(ctx context.Context, id int64, schedulable bool) (*Account, error) {
if err := s.accountRepo.SetSchedulable(ctx, id, schedulable); err != nil {
return nil, err
diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go
index 7f3e97a2..043f338d 100644
--- a/backend/internal/service/antigravity_gateway_service.go
+++ b/backend/internal/service/antigravity_gateway_service.go
@@ -12,6 +12,7 @@ import (
mathrand "math/rand"
"net"
"net/http"
+ "os"
"strings"
"sync/atomic"
"time"
@@ -28,6 +29,207 @@ const (
antigravityRetryMaxDelay = 16 * time.Second
)
+const antigravityScopeRateLimitEnv = "GATEWAY_ANTIGRAVITY_429_SCOPE_LIMIT"
+
+// antigravityRetryLoopParams 重试循环的参数
+type antigravityRetryLoopParams struct {
+ ctx context.Context
+ prefix string
+ account *Account
+ proxyURL string
+ accessToken string
+ action string
+ body []byte
+ quotaScope AntigravityQuotaScope
+ c *gin.Context
+ httpUpstream HTTPUpstream
+ settingService *SettingService
+ handleError func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope)
+}
+
+// antigravityRetryLoopResult 重试循环的结果
+type antigravityRetryLoopResult struct {
+ resp *http.Response
+}
+
+// antigravityRetryLoop 执行带 URL fallback 的重试循环
+func antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
+ availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
+ if len(availableURLs) == 0 {
+ availableURLs = antigravity.BaseURLs
+ }
+
+ var resp *http.Response
+ var usedBaseURL string
+ logBody := p.settingService != nil && p.settingService.cfg != nil && p.settingService.cfg.Gateway.LogUpstreamErrorBody
+ maxBytes := 2048
+ if p.settingService != nil && p.settingService.cfg != nil && p.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
+ maxBytes = p.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
+ }
+ getUpstreamDetail := func(body []byte) string {
+ if !logBody {
+ return ""
+ }
+ return truncateString(string(body), maxBytes)
+ }
+
+urlFallbackLoop:
+ for urlIdx, baseURL := range availableURLs {
+ usedBaseURL = baseURL
+ for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
+ select {
+ case <-p.ctx.Done():
+ log.Printf("%s status=context_canceled error=%v", p.prefix, p.ctx.Err())
+ return nil, p.ctx.Err()
+ default:
+ }
+
+ upstreamReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body)
+ if err != nil {
+ return nil, err
+ }
+
+ // Capture upstream request body for ops retry of this attempt.
+ if p.c != nil && len(p.body) > 0 {
+ p.c.Set(OpsUpstreamRequestBodyKey, string(p.body))
+ }
+
+ resp, err = p.httpUpstream.Do(upstreamReq, p.proxyURL, p.account.ID, p.account.Concurrency)
+ if err != nil {
+ safeErr := sanitizeUpstreamErrorMessage(err.Error())
+ appendOpsUpstreamError(p.c, OpsUpstreamErrorEvent{
+ Platform: p.account.Platform,
+ AccountID: p.account.ID,
+ AccountName: p.account.Name,
+ UpstreamStatusCode: 0,
+ Kind: "request_error",
+ Message: safeErr,
+ })
+ if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
+ log.Printf("%s URL fallback (connection error): %s -> %s", p.prefix, baseURL, availableURLs[urlIdx+1])
+ continue urlFallbackLoop
+ }
+ if attempt < antigravityMaxRetries {
+ log.Printf("%s status=request_failed retry=%d/%d error=%v", p.prefix, attempt, antigravityMaxRetries, err)
+ if !sleepAntigravityBackoffWithContext(p.ctx, attempt) {
+ log.Printf("%s status=context_canceled_during_backoff", p.prefix)
+ return nil, p.ctx.Err()
+ }
+ continue
+ }
+ log.Printf("%s status=request_failed retries_exhausted error=%v", p.prefix, err)
+ setOpsUpstreamError(p.c, 0, safeErr, "")
+ return nil, fmt.Errorf("upstream request failed after retries: %w", err)
+ }
+
+ // 429 限流处理:区分 URL 级别限流和账户配额限流
+ if resp.StatusCode == http.StatusTooManyRequests {
+ respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
+ _ = resp.Body.Close()
+
+ // "Resource has been exhausted" 是 URL 级别限流,切换 URL
+ if isURLLevelRateLimit(respBody) && urlIdx < len(availableURLs)-1 {
+ log.Printf("%s URL fallback (429): %s -> %s", p.prefix, baseURL, availableURLs[urlIdx+1])
+ continue urlFallbackLoop
+ }
+
+ // 账户/模型配额限流,重试 3 次(指数退避)
+ if attempt < antigravityMaxRetries {
+ upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
+ upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
+ appendOpsUpstreamError(p.c, OpsUpstreamErrorEvent{
+ Platform: p.account.Platform,
+ AccountID: p.account.ID,
+ AccountName: p.account.Name,
+ UpstreamStatusCode: resp.StatusCode,
+ UpstreamRequestID: resp.Header.Get("x-request-id"),
+ Kind: "retry",
+ Message: upstreamMsg,
+ Detail: getUpstreamDetail(respBody),
+ })
+ log.Printf("%s status=429 retry=%d/%d body=%s", p.prefix, attempt, antigravityMaxRetries, truncateForLog(respBody, 200))
+ if !sleepAntigravityBackoffWithContext(p.ctx, attempt) {
+ log.Printf("%s status=context_canceled_during_backoff", p.prefix)
+ return nil, p.ctx.Err()
+ }
+ continue
+ }
+
+ // 重试用尽,标记账户限流
+ p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.quotaScope)
+ log.Printf("%s status=429 rate_limited base_url=%s body=%s", p.prefix, baseURL, truncateForLog(respBody, 200))
+ resp = &http.Response{
+ StatusCode: resp.StatusCode,
+ Header: resp.Header.Clone(),
+ Body: io.NopCloser(bytes.NewReader(respBody)),
+ }
+ break urlFallbackLoop
+ }
+
+ // 其他可重试错误
+ if resp.StatusCode >= 400 && shouldRetryAntigravityError(resp.StatusCode) {
+ respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
+ _ = resp.Body.Close()
+
+ if attempt < antigravityMaxRetries {
+ upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
+ upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
+ appendOpsUpstreamError(p.c, OpsUpstreamErrorEvent{
+ Platform: p.account.Platform,
+ AccountID: p.account.ID,
+ AccountName: p.account.Name,
+ UpstreamStatusCode: resp.StatusCode,
+ UpstreamRequestID: resp.Header.Get("x-request-id"),
+ Kind: "retry",
+ Message: upstreamMsg,
+ Detail: getUpstreamDetail(respBody),
+ })
+ log.Printf("%s status=%d retry=%d/%d body=%s", p.prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
+ if !sleepAntigravityBackoffWithContext(p.ctx, attempt) {
+ log.Printf("%s status=context_canceled_during_backoff", p.prefix)
+ return nil, p.ctx.Err()
+ }
+ continue
+ }
+ resp = &http.Response{
+ StatusCode: resp.StatusCode,
+ Header: resp.Header.Clone(),
+ Body: io.NopCloser(bytes.NewReader(respBody)),
+ }
+ break urlFallbackLoop
+ }
+
+ break urlFallbackLoop
+ }
+ }
+
+ if resp != nil && resp.StatusCode < 400 && usedBaseURL != "" {
+ antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL)
+ }
+
+ return &antigravityRetryLoopResult{resp: resp}, nil
+}
+
+// shouldRetryAntigravityError 判断是否应该重试
+func shouldRetryAntigravityError(statusCode int) bool {
+ switch statusCode {
+ case 429, 500, 502, 503, 504, 529:
+ return true
+ default:
+ return false
+ }
+}
+
+// isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试)
+// "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功
+// "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效
+func isURLLevelRateLimit(body []byte) bool {
+ // 快速检查:包含 "Resource has been exhausted" 且不包含 "capacity on this model"
+ bodyStr := string(body)
+ return strings.Contains(bodyStr, "Resource has been exhausted") &&
+ !strings.Contains(bodyStr, "capacity on this model")
+}
+
// isAntigravityConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
func isAntigravityConnectionError(err error) bool {
if err == nil {
@@ -238,7 +440,6 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
if err != nil {
lastErr = fmt.Errorf("请求失败: %w", err)
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
- antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("[antigravity-Test] URL fallback: %s -> %s", baseURL, availableURLs[urlIdx+1])
continue
}
@@ -254,7 +455,6 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
// 检查是否需要 URL 降级
if shouldAntigravityFallbackToNextURL(nil, resp.StatusCode) && urlIdx < len(availableURLs)-1 {
- antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
log.Printf("[antigravity-Test] URL fallback (HTTP %d): %s -> %s", resp.StatusCode, baseURL, availableURLs[urlIdx+1])
continue
}
@@ -266,6 +466,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
// 解析流式响应,提取文本
text := extractTextFromSSEResponse(respBody)
+ // 标记成功的 URL,下次优先使用
+ antigravity.DefaultURLAvailability.MarkSuccess(baseURL)
return &TestConnectionResult{
Text: text,
MappedModel: mappedModel,
@@ -276,13 +478,14 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
}
// buildGeminiTestRequest 构建 Gemini 格式测试请求
+// 使用最小 token 消耗:输入 "." + maxOutputTokens: 1
func (s *AntigravityGatewayService) buildGeminiTestRequest(projectID, model string) ([]byte, error) {
payload := map[string]any{
"contents": []map[string]any{
{
"role": "user",
"parts": []map[string]any{
- {"text": "hi"},
+ {"text": "."},
},
},
},
@@ -292,22 +495,26 @@ func (s *AntigravityGatewayService) buildGeminiTestRequest(projectID, model stri
{"text": antigravity.GetDefaultIdentityPatch()},
},
},
+ "generationConfig": map[string]any{
+ "maxOutputTokens": 1,
+ },
}
payloadBytes, _ := json.Marshal(payload)
return s.wrapV1InternalRequest(projectID, model, payloadBytes)
}
// buildClaudeTestRequest 构建 Claude 格式测试请求并转换为 Gemini 格式
+// 使用最小 token 消耗:输入 "." + MaxTokens: 1
func (s *AntigravityGatewayService) buildClaudeTestRequest(projectID, mappedModel string) ([]byte, error) {
claudeReq := &antigravity.ClaudeRequest{
Model: mappedModel,
Messages: []antigravity.ClaudeMessage{
{
Role: "user",
- Content: json.RawMessage(`"hi"`),
+ Content: json.RawMessage(`"."`),
},
},
- MaxTokens: 1024,
+ MaxTokens: 1,
Stream: false,
}
return antigravity.TransformClaudeToGemini(claudeReq, projectID, mappedModel)
@@ -523,9 +730,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
proxyURL = account.Proxy.URL()
}
- // Sanitize thinking blocks (clean cache_control and flatten history thinking)
- sanitizeThinkingBlocks(&claudeReq)
-
// 获取转换选项
// Antigravity 上游要求必须包含身份提示词,否则会返回 429
transformOpts := s.getClaudeTransformOptions(ctx)
@@ -537,150 +741,29 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
return nil, fmt.Errorf("transform request: %w", err)
}
- // Safety net: ensure no cache_control leaked into Gemini request
- geminiBody = cleanCacheControlFromGeminiJSON(geminiBody)
-
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
action := "streamGenerateContent"
- // URL fallback 循环
- availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
- if len(availableURLs) == 0 {
- availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有
- }
-
- // 重试循环
- var resp *http.Response
-urlFallbackLoop:
- for urlIdx, baseURL := range availableURLs {
- for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
- // 检查 context 是否已取消(客户端断开连接)
- select {
- case <-ctx.Done():
- log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err())
- return nil, ctx.Err()
- default:
- }
-
- upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, action, accessToken, geminiBody)
- // Capture upstream request body for ops retry of this attempt.
- if c != nil {
- c.Set(OpsUpstreamRequestBodyKey, string(geminiBody))
- }
- if err != nil {
- return nil, err
- }
-
- resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
- if err != nil {
- safeErr := sanitizeUpstreamErrorMessage(err.Error())
- appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
- Platform: account.Platform,
- AccountID: account.ID,
- AccountName: account.Name,
- UpstreamStatusCode: 0,
- Kind: "request_error",
- Message: safeErr,
- })
- // 检查是否应触发 URL 降级
- if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
- antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
- log.Printf("%s URL fallback (connection error): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1])
- continue urlFallbackLoop
- }
- if attempt < antigravityMaxRetries {
- log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
- if !sleepAntigravityBackoffWithContext(ctx, attempt) {
- log.Printf("%s status=context_canceled_during_backoff", prefix)
- return nil, ctx.Err()
- }
- continue
- }
- log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
- setOpsUpstreamError(c, 0, safeErr, "")
- return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
- }
-
- // 检查是否应触发 URL 降级(仅 429)
- if resp.StatusCode == http.StatusTooManyRequests && urlIdx < len(availableURLs)-1 {
- respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
- _ = resp.Body.Close()
- upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
- upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
- logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
- maxBytes := 2048
- if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
- maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
- }
- upstreamDetail := ""
- if logBody {
- upstreamDetail = truncateString(string(respBody), maxBytes)
- }
- appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
- Platform: account.Platform,
- AccountID: account.ID,
- AccountName: account.Name,
- UpstreamStatusCode: resp.StatusCode,
- UpstreamRequestID: resp.Header.Get("x-request-id"),
- Kind: "retry",
- Message: upstreamMsg,
- Detail: upstreamDetail,
- })
- antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
- log.Printf("%s URL fallback (HTTP 429): %s -> %s body=%s", prefix, baseURL, availableURLs[urlIdx+1], truncateForLog(respBody, 200))
- continue urlFallbackLoop
- }
-
- if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(resp.StatusCode) {
- respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
- _ = resp.Body.Close()
-
- if attempt < antigravityMaxRetries {
- upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
- upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
- logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
- maxBytes := 2048
- if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
- maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
- }
- upstreamDetail := ""
- if logBody {
- upstreamDetail = truncateString(string(respBody), maxBytes)
- }
- appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
- Platform: account.Platform,
- AccountID: account.ID,
- AccountName: account.Name,
- UpstreamStatusCode: resp.StatusCode,
- UpstreamRequestID: resp.Header.Get("x-request-id"),
- Kind: "retry",
- Message: upstreamMsg,
- Detail: upstreamDetail,
- })
- log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
- if !sleepAntigravityBackoffWithContext(ctx, attempt) {
- log.Printf("%s status=context_canceled_during_backoff", prefix)
- return nil, ctx.Err()
- }
- continue
- }
- // 所有重试都失败,标记限流状态
- if resp.StatusCode == 429 {
- s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
- }
- // 最后一次尝试也失败
- resp = &http.Response{
- StatusCode: resp.StatusCode,
- Header: resp.Header.Clone(),
- Body: io.NopCloser(bytes.NewReader(respBody)),
- }
- break urlFallbackLoop
- }
-
- break urlFallbackLoop
- }
+ // 执行带重试的请求
+ result, err := antigravityRetryLoop(antigravityRetryLoopParams{
+ ctx: ctx,
+ prefix: prefix,
+ account: account,
+ proxyURL: proxyURL,
+ accessToken: accessToken,
+ action: action,
+ body: geminiBody,
+ quotaScope: quotaScope,
+ c: c,
+ httpUpstream: s.httpUpstream,
+ settingService: s.settingService,
+ handleError: s.handleUpstreamError,
+ })
+ if err != nil {
+ return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
}
+ resp := result.resp
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 400 {
@@ -739,11 +822,20 @@ urlFallbackLoop:
if txErr != nil {
continue
}
- retryReq, buildErr := antigravity.NewAPIRequest(ctx, action, accessToken, retryGeminiBody)
- if buildErr != nil {
- continue
- }
- retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
+ retryResult, retryErr := antigravityRetryLoop(antigravityRetryLoopParams{
+ ctx: ctx,
+ prefix: prefix,
+ account: account,
+ proxyURL: proxyURL,
+ accessToken: accessToken,
+ action: action,
+ body: retryGeminiBody,
+ quotaScope: quotaScope,
+ c: c,
+ httpUpstream: s.httpUpstream,
+ settingService: s.settingService,
+ handleError: s.handleUpstreamError,
+ })
if retryErr != nil {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
@@ -757,6 +849,7 @@ urlFallbackLoop:
continue
}
+ retryResp := retryResult.resp
if retryResp.StatusCode < 400 {
_ = resp.Body.Close()
resp = retryResp
@@ -766,6 +859,13 @@ urlFallbackLoop:
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
+ if retryResp.StatusCode == http.StatusTooManyRequests {
+ retryBaseURL := ""
+ if retryResp.Request != nil && retryResp.Request.URL != nil {
+ retryBaseURL = retryResp.Request.URL.Scheme + "://" + retryResp.Request.URL.Host
+ }
+ log.Printf("%s status=429 rate_limited base_url=%s retry_stage=%s body=%s", prefix, retryBaseURL, stage.name, truncateForLog(retryBody, 200))
+ }
kind := "signature_retry"
if strings.TrimSpace(stage.name) != "" {
kind = "signature_retry_" + strings.ReplaceAll(stage.name, "+", "_")
@@ -920,143 +1020,6 @@ func extractAntigravityErrorMessage(body []byte) string {
return ""
}
-// cleanCacheControlFromGeminiJSON removes cache_control from Gemini JSON (emergency fix)
-// This should not be needed if transformation is correct, but serves as a safety net
-func cleanCacheControlFromGeminiJSON(body []byte) []byte {
- // Try a more robust approach: parse and clean
- var data map[string]any
- if err := json.Unmarshal(body, &data); err != nil {
- log.Printf("[Antigravity] Failed to parse Gemini JSON for cache_control cleaning: %v", err)
- return body
- }
-
- cleaned := removeCacheControlFromAny(data)
- if !cleaned {
- return body
- }
-
- if result, err := json.Marshal(data); err == nil {
- log.Printf("[Antigravity] Successfully cleaned cache_control from Gemini JSON")
- return result
- }
-
- return body
-}
-
-// removeCacheControlFromAny recursively removes cache_control fields
-func removeCacheControlFromAny(v any) bool {
- cleaned := false
-
- switch val := v.(type) {
- case map[string]any:
- for k, child := range val {
- if k == "cache_control" {
- delete(val, k)
- cleaned = true
- } else if removeCacheControlFromAny(child) {
- cleaned = true
- }
- }
- case []any:
- for _, item := range val {
- if removeCacheControlFromAny(item) {
- cleaned = true
- }
- }
- }
-
- return cleaned
-}
-
-// sanitizeThinkingBlocks cleans cache_control and flattens history thinking blocks
-// Thinking blocks do NOT support cache_control field (Anthropic API/Vertex AI requirement)
-// Additionally, history thinking blocks are flattened to text to avoid upstream validation errors
-func sanitizeThinkingBlocks(req *antigravity.ClaudeRequest) {
- if req == nil {
- return
- }
-
- log.Printf("[Antigravity] sanitizeThinkingBlocks: processing request with %d messages", len(req.Messages))
-
- // Clean system blocks
- if len(req.System) > 0 {
- var systemBlocks []map[string]any
- if err := json.Unmarshal(req.System, &systemBlocks); err == nil {
- for i := range systemBlocks {
- if blockType, _ := systemBlocks[i]["type"].(string); blockType == "thinking" || systemBlocks[i]["thinking"] != nil {
- if removeCacheControlFromAny(systemBlocks[i]) {
- log.Printf("[Antigravity] Deep cleaned cache_control from thinking block in system[%d]", i)
- }
- }
- }
- // Marshal back
- if cleaned, err := json.Marshal(systemBlocks); err == nil {
- req.System = cleaned
- }
- }
- }
-
- // Clean message content blocks and flatten history
- lastMsgIdx := len(req.Messages) - 1
- for msgIdx := range req.Messages {
- raw := req.Messages[msgIdx].Content
- if len(raw) == 0 {
- continue
- }
-
- // Try to parse as blocks array
- var blocks []map[string]any
- if err := json.Unmarshal(raw, &blocks); err != nil {
- continue
- }
-
- cleaned := false
- for blockIdx := range blocks {
- blockType, _ := blocks[blockIdx]["type"].(string)
-
- // Check for thinking blocks (typed or untyped)
- if blockType == "thinking" || blocks[blockIdx]["thinking"] != nil {
- // 1. Clean cache_control
- if removeCacheControlFromAny(blocks[blockIdx]) {
- log.Printf("[Antigravity] Deep cleaned cache_control from thinking block in messages[%d].content[%d]", msgIdx, blockIdx)
- cleaned = true
- }
-
- // 2. Flatten to text if it's a history message (not the last one)
- if msgIdx < lastMsgIdx {
- log.Printf("[Antigravity] Flattening history thinking block to text at messages[%d].content[%d]", msgIdx, blockIdx)
-
- // Extract thinking content
- var textContent string
- if t, ok := blocks[blockIdx]["thinking"].(string); ok {
- textContent = t
- } else {
- // Fallback for non-string content (marshal it)
- if b, err := json.Marshal(blocks[blockIdx]["thinking"]); err == nil {
- textContent = string(b)
- }
- }
-
- // Convert to text block
- blocks[blockIdx]["type"] = "text"
- blocks[blockIdx]["text"] = textContent
- delete(blocks[blockIdx], "thinking")
- delete(blocks[blockIdx], "signature")
- delete(blocks[blockIdx], "cache_control") // Ensure it's gone
- cleaned = true
- }
- }
- }
-
- // Marshal back if modified
- if cleaned {
- if marshaled, err := json.Marshal(blocks); err == nil {
- req.Messages[msgIdx].Content = marshaled
- }
- }
- }
-}
-
// stripThinkingFromClaudeRequest converts thinking blocks to text blocks in a Claude Messages request.
// This preserves the thinking content while avoiding signature validation errors.
// Note: redacted_thinking blocks are removed because they cannot be converted to text.
@@ -1352,138 +1315,25 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回
upstreamAction := "streamGenerateContent"
- // URL fallback 循环
- availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
- if len(availableURLs) == 0 {
- availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有
- }
-
- // 重试循环
- var resp *http.Response
-urlFallbackLoop:
- for urlIdx, baseURL := range availableURLs {
- for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
- // 检查 context 是否已取消(客户端断开连接)
- select {
- case <-ctx.Done():
- log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err())
- return nil, ctx.Err()
- default:
- }
-
- upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, upstreamAction, accessToken, wrappedBody)
- if err != nil {
- return nil, err
- }
-
- resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
- if err != nil {
- safeErr := sanitizeUpstreamErrorMessage(err.Error())
- appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
- Platform: account.Platform,
- AccountID: account.ID,
- AccountName: account.Name,
- UpstreamStatusCode: 0,
- Kind: "request_error",
- Message: safeErr,
- })
- // 检查是否应触发 URL 降级
- if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
- antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
- log.Printf("%s URL fallback (connection error): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1])
- continue urlFallbackLoop
- }
- if attempt < antigravityMaxRetries {
- log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
- if !sleepAntigravityBackoffWithContext(ctx, attempt) {
- log.Printf("%s status=context_canceled_during_backoff", prefix)
- return nil, ctx.Err()
- }
- continue
- }
- log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
- setOpsUpstreamError(c, 0, safeErr, "")
- return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
- }
-
- // 检查是否应触发 URL 降级(仅 429)
- if resp.StatusCode == http.StatusTooManyRequests && urlIdx < len(availableURLs)-1 {
- respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
- _ = resp.Body.Close()
- upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
- upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
- logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
- maxBytes := 2048
- if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
- maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
- }
- upstreamDetail := ""
- if logBody {
- upstreamDetail = truncateString(string(respBody), maxBytes)
- }
- appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
- Platform: account.Platform,
- AccountID: account.ID,
- AccountName: account.Name,
- UpstreamStatusCode: resp.StatusCode,
- UpstreamRequestID: resp.Header.Get("x-request-id"),
- Kind: "retry",
- Message: upstreamMsg,
- Detail: upstreamDetail,
- })
- antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
- log.Printf("%s URL fallback (HTTP 429): %s -> %s body=%s", prefix, baseURL, availableURLs[urlIdx+1], truncateForLog(respBody, 200))
- continue urlFallbackLoop
- }
-
- if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(resp.StatusCode) {
- respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
- _ = resp.Body.Close()
-
- if attempt < antigravityMaxRetries {
- upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
- upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
- logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody
- maxBytes := 2048
- if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 {
- maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
- }
- upstreamDetail := ""
- if logBody {
- upstreamDetail = truncateString(string(respBody), maxBytes)
- }
- appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
- Platform: account.Platform,
- AccountID: account.ID,
- AccountName: account.Name,
- UpstreamStatusCode: resp.StatusCode,
- UpstreamRequestID: resp.Header.Get("x-request-id"),
- Kind: "retry",
- Message: upstreamMsg,
- Detail: upstreamDetail,
- })
- log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
- if !sleepAntigravityBackoffWithContext(ctx, attempt) {
- log.Printf("%s status=context_canceled_during_backoff", prefix)
- return nil, ctx.Err()
- }
- continue
- }
- // 所有重试都失败,标记限流状态
- if resp.StatusCode == 429 {
- s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
- }
- resp = &http.Response{
- StatusCode: resp.StatusCode,
- Header: resp.Header.Clone(),
- Body: io.NopCloser(bytes.NewReader(respBody)),
- }
- break urlFallbackLoop
- }
-
- break urlFallbackLoop
- }
+ // 执行带重试的请求
+ result, err := antigravityRetryLoop(antigravityRetryLoopParams{
+ ctx: ctx,
+ prefix: prefix,
+ account: account,
+ proxyURL: proxyURL,
+ accessToken: accessToken,
+ action: upstreamAction,
+ body: wrappedBody,
+ quotaScope: quotaScope,
+ c: c,
+ httpUpstream: s.httpUpstream,
+ settingService: s.settingService,
+ handleError: s.handleUpstreamError,
+ })
+ if err != nil {
+ return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
}
+ resp := result.resp
defer func() {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
@@ -1525,8 +1375,6 @@ urlFallbackLoop:
goto handleSuccess
}
- s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
-
requestID := resp.Header.Get("x-request-id")
if requestID != "" {
c.Header("x-request-id", requestID)
@@ -1537,6 +1385,7 @@ urlFallbackLoop:
if unwrapErr != nil || len(unwrappedForOps) == 0 {
unwrappedForOps = respBody
}
+ s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(unwrappedForOps))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
@@ -1581,6 +1430,7 @@ urlFallbackLoop:
Message: upstreamMsg,
Detail: upstreamDetail,
})
+ log.Printf("[antigravity-Forward] upstream error status=%d body=%s", resp.StatusCode, truncateForLog(unwrappedForOps, 500))
c.Data(resp.StatusCode, contentType, unwrappedForOps)
return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode)
}
@@ -1637,15 +1487,6 @@ handleSuccess:
}, nil
}
-func (s *AntigravityGatewayService) shouldRetryUpstreamError(statusCode int) bool {
- switch statusCode {
- case 429, 500, 502, 503, 504, 529:
- return true
- default:
- return false
- }
-}
-
func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) bool {
switch statusCode {
case 401, 403, 429, 529:
@@ -1679,33 +1520,48 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
}
}
+func antigravityUseScopeRateLimit() bool {
+ v := strings.ToLower(strings.TrimSpace(os.Getenv(antigravityScopeRateLimitEnv)))
+ return v == "1" || v == "true" || v == "yes" || v == "on"
+}
+
func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope) {
// 429 使用 Gemini 格式解析(从 body 解析重置时间)
if statusCode == 429 {
+ useScopeLimit := antigravityUseScopeRateLimit() && quotaScope != ""
resetAt := ParseGeminiRateLimitResetTime(body)
if resetAt == nil {
- // 解析失败:Gemini 有重试时间用 5 分钟,Claude 没有用 1 分钟
- defaultDur := 1 * time.Minute
- if bytes.Contains(body, []byte("Please retry in")) || bytes.Contains(body, []byte("retryDelay")) {
- defaultDur = 5 * time.Minute
+ // 解析失败:使用配置的 fallback 时间,直接限流整个账户
+ fallbackMinutes := 5
+ if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.AntigravityFallbackCooldownMinutes > 0 {
+ fallbackMinutes = s.settingService.cfg.Gateway.AntigravityFallbackCooldownMinutes
}
+ defaultDur := time.Duration(fallbackMinutes) * time.Minute
ra := time.Now().Add(defaultDur)
- log.Printf("%s status=429 rate_limited scope=%s reset_in=%v (fallback)", prefix, quotaScope, defaultDur)
- if quotaScope == "" {
- return
- }
- if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, ra); err != nil {
- log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err)
+ if useScopeLimit {
+ log.Printf("%s status=429 rate_limited scope=%s reset_in=%v (fallback)", prefix, quotaScope, defaultDur)
+ if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, ra); err != nil {
+ log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err)
+ }
+ } else {
+ log.Printf("%s status=429 rate_limited account=%d reset_in=%v (fallback)", prefix, account.ID, defaultDur)
+ if err := s.accountRepo.SetRateLimited(ctx, account.ID, ra); err != nil {
+ log.Printf("%s status=429 rate_limit_set_failed account=%d error=%v", prefix, account.ID, err)
+ }
}
return
}
resetTime := time.Unix(*resetAt, 0)
- log.Printf("%s status=429 rate_limited scope=%s reset_at=%v reset_in=%v", prefix, quotaScope, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second))
- if quotaScope == "" {
- return
- }
- if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, resetTime); err != nil {
- log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err)
+ if useScopeLimit {
+ log.Printf("%s status=429 rate_limited scope=%s reset_at=%v reset_in=%v", prefix, quotaScope, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second))
+ if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, resetTime); err != nil {
+ log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err)
+ }
+ } else {
+ log.Printf("%s status=429 rate_limited account=%d reset_at=%v reset_in=%v", prefix, account.ID, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second))
+ if err := s.accountRepo.SetRateLimited(ctx, account.ID, resetTime); err != nil {
+ log.Printf("%s status=429 rate_limit_set_failed account=%d error=%v", prefix, account.ID, err)
+ }
}
return
}
@@ -1884,7 +1740,7 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
}
// handleGeminiStreamToNonStreaming 读取上游流式响应,合并为非流式响应返回给客户端
-// Gemini 流式响应中每个 chunk 都包含累积的完整文本,只需保留最后一个有效响应
+// Gemini 流式响应是增量的,需要累积所有 chunk 的内容
func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time) (*antigravityStreamResult, error) {
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
@@ -1897,6 +1753,8 @@ func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Cont
var firstTokenMs *int
var last map[string]any
var lastWithParts map[string]any
+ var collectedImageParts []map[string]any // 收集所有包含图片的 parts
+ var collectedTextParts []string // 收集所有文本片段
type scanEvent struct {
line string
@@ -1999,6 +1857,16 @@ func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Cont
// 保留最后一个有 parts 的响应
if parts := extractGeminiParts(parsed); len(parts) > 0 {
lastWithParts = parsed
+ // 收集包含图片和文本的 parts
+ for _, part := range parts {
+ if inlineData, ok := part["inlineData"].(map[string]any); ok {
+ collectedImageParts = append(collectedImageParts, part)
+ _ = inlineData // 避免 unused 警告
+ }
+ if text, ok := part["text"].(string); ok && text != "" {
+ collectedTextParts = append(collectedTextParts, text)
+ }
+ }
}
case <-intervalCh:
@@ -2020,6 +1888,16 @@ returnResponse:
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
}
+ // 如果收集到了图片 parts,需要合并到最终响应中
+ if len(collectedImageParts) > 0 {
+ finalResponse = mergeImagePartsToResponse(finalResponse, collectedImageParts)
+ }
+
+ // 如果收集到了文本,需要合并到最终响应中
+ if len(collectedTextParts) > 0 {
+ finalResponse = mergeTextPartsToResponse(finalResponse, collectedTextParts)
+ }
+
respBody, err := json.Marshal(finalResponse)
if err != nil {
return nil, fmt.Errorf("failed to marshal response: %w", err)
@@ -2029,6 +1907,115 @@ returnResponse:
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
}
+// getOrCreateGeminiParts 获取 Gemini 响应的 parts 结构,返回深拷贝和更新回调
+func getOrCreateGeminiParts(response map[string]any) (result map[string]any, existingParts []any, setParts func([]any)) {
+ // 深拷贝 response
+ result = make(map[string]any)
+ for k, v := range response {
+ result[k] = v
+ }
+
+ // 获取或创建 candidates
+ candidates, ok := result["candidates"].([]any)
+ if !ok || len(candidates) == 0 {
+ candidates = []any{map[string]any{}}
+ }
+
+ // 获取第一个 candidate
+ candidate, ok := candidates[0].(map[string]any)
+ if !ok {
+ candidate = make(map[string]any)
+ candidates[0] = candidate
+ }
+
+ // 获取或创建 content
+ content, ok := candidate["content"].(map[string]any)
+ if !ok {
+ content = map[string]any{"role": "model"}
+ candidate["content"] = content
+ }
+
+ // 获取现有 parts
+ existingParts, ok = content["parts"].([]any)
+ if !ok {
+ existingParts = []any{}
+ }
+
+ // 返回更新回调
+ setParts = func(newParts []any) {
+ content["parts"] = newParts
+ result["candidates"] = candidates
+ }
+
+ return result, existingParts, setParts
+}
+
+// mergeImagePartsToResponse 将收集到的图片 parts 合并到 Gemini 响应中
+func mergeImagePartsToResponse(response map[string]any, imageParts []map[string]any) map[string]any {
+ if len(imageParts) == 0 {
+ return response
+ }
+
+ result, existingParts, setParts := getOrCreateGeminiParts(response)
+
+ // 检查现有 parts 中是否已经有图片
+ for _, p := range existingParts {
+ if pm, ok := p.(map[string]any); ok {
+ if _, hasInline := pm["inlineData"]; hasInline {
+ return result // 已有图片,不重复添加
+ }
+ }
+ }
+
+ // 添加收集到的图片 parts
+ for _, imgPart := range imageParts {
+ existingParts = append(existingParts, imgPart)
+ }
+ setParts(existingParts)
+ return result
+}
+
+// mergeTextPartsToResponse 将收集到的文本合并到 Gemini 响应中
+func mergeTextPartsToResponse(response map[string]any, textParts []string) map[string]any {
+ if len(textParts) == 0 {
+ return response
+ }
+
+ mergedText := strings.Join(textParts, "")
+ result, existingParts, setParts := getOrCreateGeminiParts(response)
+
+ // 查找并更新第一个 text part,或创建新的
+ newParts := make([]any, 0, len(existingParts)+1)
+ textUpdated := false
+
+ for _, p := range existingParts {
+ pm, ok := p.(map[string]any)
+ if !ok {
+ newParts = append(newParts, p)
+ continue
+ }
+ if _, hasText := pm["text"]; hasText && !textUpdated {
+ // 用累积的文本替换
+ newPart := make(map[string]any)
+ for k, v := range pm {
+ newPart[k] = v
+ }
+ newPart["text"] = mergedText
+ newParts = append(newParts, newPart)
+ textUpdated = true
+ } else {
+ newParts = append(newParts, pm)
+ }
+ }
+
+ if !textUpdated {
+ newParts = append([]any{map[string]any{"text": mergedText}}, newParts...)
+ }
+
+ setParts(newParts)
+ return result
+}
+
func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int, errType, message string) error {
c.JSON(status, gin.H{
"type": "error",
diff --git a/backend/internal/service/antigravity_oauth_service.go b/backend/internal/service/antigravity_oauth_service.go
index ecf0a553..52293cd5 100644
--- a/backend/internal/service/antigravity_oauth_service.go
+++ b/backend/internal/service/antigravity_oauth_service.go
@@ -82,13 +82,14 @@ type AntigravityExchangeCodeInput struct {
// AntigravityTokenInfo token 信息
type AntigravityTokenInfo struct {
- AccessToken string `json:"access_token"`
- RefreshToken string `json:"refresh_token"`
- ExpiresIn int64 `json:"expires_in"`
- ExpiresAt int64 `json:"expires_at"`
- TokenType string `json:"token_type"`
- Email string `json:"email,omitempty"`
- ProjectID string `json:"project_id,omitempty"`
+ AccessToken string `json:"access_token"`
+ RefreshToken string `json:"refresh_token"`
+ ExpiresIn int64 `json:"expires_in"`
+ ExpiresAt int64 `json:"expires_at"`
+ TokenType string `json:"token_type"`
+ Email string `json:"email,omitempty"`
+ ProjectID string `json:"project_id,omitempty"`
+ ProjectIDMissing bool `json:"-"` // LoadCodeAssist 未返回 project_id
}
// ExchangeCode 用 authorization code 交换 token
@@ -149,12 +150,6 @@ func (s *AntigravityOAuthService) ExchangeCode(ctx context.Context, input *Antig
result.ProjectID = loadResp.CloudAICompanionProject
}
- // 兜底:随机生成 project_id
- if result.ProjectID == "" {
- result.ProjectID = antigravity.GenerateMockProjectID()
- fmt.Printf("[AntigravityOAuth] 使用随机生成的 project_id: %s\n", result.ProjectID)
- }
-
return result, nil
}
@@ -236,16 +231,24 @@ func (s *AntigravityOAuthService) RefreshAccountToken(ctx context.Context, accou
return nil, err
}
- // 保留原有的 project_id 和 email
- existingProjectID := strings.TrimSpace(account.GetCredential("project_id"))
- if existingProjectID != "" {
- tokenInfo.ProjectID = existingProjectID
- }
+ // 保留原有的 email
existingEmail := strings.TrimSpace(account.GetCredential("email"))
if existingEmail != "" {
tokenInfo.Email = existingEmail
}
+ // 每次刷新都调用 LoadCodeAssist 获取 project_id
+ client := antigravity.NewClient(proxyURL)
+ loadResp, _, err := client.LoadCodeAssist(ctx, tokenInfo.AccessToken)
+ if err != nil || loadResp == nil || loadResp.CloudAICompanionProject == "" {
+ // LoadCodeAssist 失败或返回空,保留原有 project_id,标记缺失
+ existingProjectID := strings.TrimSpace(account.GetCredential("project_id"))
+ tokenInfo.ProjectID = existingProjectID
+ tokenInfo.ProjectIDMissing = true
+ } else {
+ tokenInfo.ProjectID = loadResp.CloudAICompanionProject
+ }
+
return tokenInfo, nil
}
diff --git a/backend/internal/service/antigravity_quota_fetcher.go b/backend/internal/service/antigravity_quota_fetcher.go
index c9024e33..07eb563d 100644
--- a/backend/internal/service/antigravity_quota_fetcher.go
+++ b/backend/internal/service/antigravity_quota_fetcher.go
@@ -31,11 +31,6 @@ func (f *AntigravityQuotaFetcher) FetchQuota(ctx context.Context, account *Accou
accessToken := account.GetCredential("access_token")
projectID := account.GetCredential("project_id")
- // 如果没有 project_id,生成一个随机的
- if projectID == "" {
- projectID = antigravity.GenerateMockProjectID()
- }
-
client := antigravity.NewClient(proxyURL)
// 调用 API 获取配额
diff --git a/backend/internal/service/antigravity_rate_limit_test.go b/backend/internal/service/antigravity_rate_limit_test.go
new file mode 100644
index 00000000..bf02364b
--- /dev/null
+++ b/backend/internal/service/antigravity_rate_limit_test.go
@@ -0,0 +1,186 @@
+//go:build unit
+
+package service
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
+ "github.com/stretchr/testify/require"
+)
+
+type stubAntigravityUpstream struct {
+ firstBase string
+ secondBase string
+ calls []string
+}
+
+func (s *stubAntigravityUpstream) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
+ url := req.URL.String()
+ s.calls = append(s.calls, url)
+ if strings.HasPrefix(url, s.firstBase) {
+ return &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ Header: http.Header{},
+ Body: io.NopCloser(strings.NewReader(`{"error":{"message":"Resource has been exhausted"}}`)),
+ }, nil
+ }
+ return &http.Response{
+ StatusCode: http.StatusOK,
+ Header: http.Header{},
+ Body: io.NopCloser(strings.NewReader("ok")),
+ }, nil
+}
+
+type scopeLimitCall struct {
+ accountID int64
+ scope AntigravityQuotaScope
+ resetAt time.Time
+}
+
+type rateLimitCall struct {
+ accountID int64
+ resetAt time.Time
+}
+
+type stubAntigravityAccountRepo struct {
+ AccountRepository
+ scopeCalls []scopeLimitCall
+ rateCalls []rateLimitCall
+}
+
+func (s *stubAntigravityAccountRepo) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error {
+ s.scopeCalls = append(s.scopeCalls, scopeLimitCall{accountID: id, scope: scope, resetAt: resetAt})
+ return nil
+}
+
+func (s *stubAntigravityAccountRepo) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
+ s.rateCalls = append(s.rateCalls, rateLimitCall{accountID: id, resetAt: resetAt})
+ return nil
+}
+
+func TestAntigravityRetryLoop_URLFallback_UsesLatestSuccess(t *testing.T) {
+ oldBaseURLs := append([]string(nil), antigravity.BaseURLs...)
+ oldAvailability := antigravity.DefaultURLAvailability
+ defer func() {
+ antigravity.BaseURLs = oldBaseURLs
+ antigravity.DefaultURLAvailability = oldAvailability
+ }()
+
+ base1 := "https://ag-1.test"
+ base2 := "https://ag-2.test"
+ antigravity.BaseURLs = []string{base1, base2}
+ antigravity.DefaultURLAvailability = antigravity.NewURLAvailability(time.Minute)
+
+ upstream := &stubAntigravityUpstream{firstBase: base1, secondBase: base2}
+ account := &Account{
+ ID: 1,
+ Name: "acc-1",
+ Platform: PlatformAntigravity,
+ Schedulable: true,
+ Status: StatusActive,
+ Concurrency: 1,
+ }
+
+ var handleErrorCalled bool
+ result, err := antigravityRetryLoop(antigravityRetryLoopParams{
+ prefix: "[test]",
+ ctx: context.Background(),
+ account: account,
+ proxyURL: "",
+ accessToken: "token",
+ action: "generateContent",
+ body: []byte(`{"input":"test"}`),
+ quotaScope: AntigravityQuotaScopeClaude,
+ httpUpstream: upstream,
+ handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope) {
+ handleErrorCalled = true
+ },
+ })
+
+ require.NoError(t, err)
+ require.NotNil(t, result)
+ require.NotNil(t, result.resp)
+ defer func() { _ = result.resp.Body.Close() }()
+ require.Equal(t, http.StatusOK, result.resp.StatusCode)
+ require.False(t, handleErrorCalled)
+ require.Len(t, upstream.calls, 2)
+ require.True(t, strings.HasPrefix(upstream.calls[0], base1))
+ require.True(t, strings.HasPrefix(upstream.calls[1], base2))
+
+ available := antigravity.DefaultURLAvailability.GetAvailableURLs()
+ require.NotEmpty(t, available)
+ require.Equal(t, base2, available[0])
+}
+
+func TestAntigravityHandleUpstreamError_UsesScopeLimitWhenEnabled(t *testing.T) {
+ t.Setenv(antigravityScopeRateLimitEnv, "true")
+ repo := &stubAntigravityAccountRepo{}
+ svc := &AntigravityGatewayService{accountRepo: repo}
+ account := &Account{ID: 9, Name: "acc-9", Platform: PlatformAntigravity}
+
+ body := buildGeminiRateLimitBody("3s")
+ svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, AntigravityQuotaScopeClaude)
+
+ require.Len(t, repo.scopeCalls, 1)
+ require.Empty(t, repo.rateCalls)
+ call := repo.scopeCalls[0]
+ require.Equal(t, account.ID, call.accountID)
+ require.Equal(t, AntigravityQuotaScopeClaude, call.scope)
+ require.WithinDuration(t, time.Now().Add(3*time.Second), call.resetAt, 2*time.Second)
+}
+
+func TestAntigravityHandleUpstreamError_UsesAccountLimitWhenScopeDisabled(t *testing.T) {
+ t.Setenv(antigravityScopeRateLimitEnv, "false")
+ repo := &stubAntigravityAccountRepo{}
+ svc := &AntigravityGatewayService{accountRepo: repo}
+ account := &Account{ID: 10, Name: "acc-10", Platform: PlatformAntigravity}
+
+ body := buildGeminiRateLimitBody("2s")
+ svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, AntigravityQuotaScopeClaude)
+
+ require.Len(t, repo.rateCalls, 1)
+ require.Empty(t, repo.scopeCalls)
+ call := repo.rateCalls[0]
+ require.Equal(t, account.ID, call.accountID)
+ require.WithinDuration(t, time.Now().Add(2*time.Second), call.resetAt, 2*time.Second)
+}
+
+func TestAccountIsSchedulableForModel_AntigravityRateLimits(t *testing.T) {
+ now := time.Now()
+ future := now.Add(10 * time.Minute)
+
+ account := &Account{
+ ID: 1,
+ Name: "acc",
+ Platform: PlatformAntigravity,
+ Status: StatusActive,
+ Schedulable: true,
+ }
+
+ account.RateLimitResetAt = &future
+ require.False(t, account.IsSchedulableForModel("claude-sonnet-4-5"))
+ require.False(t, account.IsSchedulableForModel("gemini-3-flash"))
+
+ account.RateLimitResetAt = nil
+ account.Extra = map[string]any{
+ antigravityQuotaScopesKey: map[string]any{
+ "claude": map[string]any{
+ "rate_limit_reset_at": future.Format(time.RFC3339),
+ },
+ },
+ }
+
+ require.False(t, account.IsSchedulableForModel("claude-sonnet-4-5"))
+ require.True(t, account.IsSchedulableForModel("gemini-3-flash"))
+}
+
+func buildGeminiRateLimitBody(delay string) []byte {
+ return []byte(fmt.Sprintf(`{"error":{"message":"too many requests","details":[{"metadata":{"quotaResetDelay":%q}}]}}`, delay))
+}
diff --git a/backend/internal/service/antigravity_token_refresher.go b/backend/internal/service/antigravity_token_refresher.go
index 9dd4463f..a07c86e6 100644
--- a/backend/internal/service/antigravity_token_refresher.go
+++ b/backend/internal/service/antigravity_token_refresher.go
@@ -61,5 +61,10 @@ func (r *AntigravityTokenRefresher) Refresh(ctx context.Context, account *Accoun
}
}
+ // 如果 project_id 获取失败,返回 credentials 但同时返回错误让账户被标记
+ if tokenInfo.ProjectIDMissing {
+ return newCredentials, fmt.Errorf("missing_project_id: 账户缺少project id,可能无法使用Antigravity")
+ }
+
return newCredentials, nil
}
diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go
index f543ef1a..4d17d5e1 100644
--- a/backend/internal/service/gateway_multiplatform_test.go
+++ b/backend/internal/service/gateway_multiplatform_test.go
@@ -105,6 +105,9 @@ func (m *mockAccountRepoForPlatform) BatchUpdateLastUsed(ctx context.Context, up
func (m *mockAccountRepoForPlatform) SetError(ctx context.Context, id int64, errorMsg string) error {
return nil
}
+func (m *mockAccountRepoForPlatform) ClearError(ctx context.Context, id int64) error {
+ return nil
+}
func (m *mockAccountRepoForPlatform) SetSchedulable(ctx context.Context, id int64, schedulable bool) error {
return nil
}
diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go
index 01d7cbf3..8dfddec7 100644
--- a/backend/internal/service/gateway_service.go
+++ b/backend/internal/service/gateway_service.go
@@ -11,6 +11,7 @@ import (
"fmt"
"io"
"log"
+ mathrand "math/rand"
"net/http"
"os"
"regexp"
@@ -918,7 +919,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
}
// ============ Layer 3: 兜底排队 ============
- sortAccountsByPriorityAndLastUsed(candidates, preferOAuth)
+ s.sortCandidatesForFallback(candidates, preferOAuth, cfg.FallbackSelectionMode)
for _, acc := range candidates {
// 会话数量限制检查(等待计划也需要占用会话配额)
if !s.checkAndRegisterSession(ctx, acc, sessionHash) {
@@ -1318,6 +1319,56 @@ func sortAccountsByPriorityAndLastUsed(accounts []*Account, preferOAuth bool) {
})
}
+// sortCandidatesForFallback 根据配置选择排序策略
+// mode: "last_used"(按最后使用时间) 或 "random"(随机)
+func (s *GatewayService) sortCandidatesForFallback(accounts []*Account, preferOAuth bool, mode string) {
+ if mode == "random" {
+ // 先按优先级排序,然后在同优先级内随机打乱
+ sortAccountsByPriorityOnly(accounts, preferOAuth)
+ shuffleWithinPriority(accounts)
+ } else {
+ // 默认按最后使用时间排序
+ sortAccountsByPriorityAndLastUsed(accounts, preferOAuth)
+ }
+}
+
+// sortAccountsByPriorityOnly 仅按优先级排序
+func sortAccountsByPriorityOnly(accounts []*Account, preferOAuth bool) {
+ sort.SliceStable(accounts, func(i, j int) bool {
+ a, b := accounts[i], accounts[j]
+ if a.Priority != b.Priority {
+ return a.Priority < b.Priority
+ }
+ if preferOAuth && a.Type != b.Type {
+ return a.Type == AccountTypeOAuth
+ }
+ return false
+ })
+}
+
+// shuffleWithinPriority 在同优先级内随机打乱顺序
+func shuffleWithinPriority(accounts []*Account) {
+ if len(accounts) <= 1 {
+ return
+ }
+ r := mathrand.New(mathrand.NewSource(time.Now().UnixNano()))
+ start := 0
+ for start < len(accounts) {
+ priority := accounts[start].Priority
+ end := start + 1
+ for end < len(accounts) && accounts[end].Priority == priority {
+ end++
+ }
+ // 对 [start, end) 范围内的账户随机打乱
+ if end-start > 1 {
+ r.Shuffle(end-start, func(i, j int) {
+ accounts[start+i], accounts[start+j] = accounts[start+j], accounts[start+i]
+ })
+ }
+ start = end
+ }
+}
+
// selectAccountForModelWithPlatform 选择单平台账户(完全隔离)
func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, platform string) (*Account, error) {
preferOAuth := platform == PlatformGemini
diff --git a/backend/internal/service/gemini_multiplatform_test.go b/backend/internal/service/gemini_multiplatform_test.go
index f2ea5859..262a05d9 100644
--- a/backend/internal/service/gemini_multiplatform_test.go
+++ b/backend/internal/service/gemini_multiplatform_test.go
@@ -88,6 +88,9 @@ func (m *mockAccountRepoForGemini) BatchUpdateLastUsed(ctx context.Context, upda
func (m *mockAccountRepoForGemini) SetError(ctx context.Context, id int64, errorMsg string) error {
return nil
}
+func (m *mockAccountRepoForGemini) ClearError(ctx context.Context, id int64) error {
+ return nil
+}
func (m *mockAccountRepoForGemini) SetSchedulable(ctx context.Context, id int64, schedulable bool) error {
return nil
}
diff --git a/backend/internal/service/token_refresh_service.go b/backend/internal/service/token_refresh_service.go
index 26cfd97d..02e7d445 100644
--- a/backend/internal/service/token_refresh_service.go
+++ b/backend/internal/service/token_refresh_service.go
@@ -166,11 +166,25 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
for attempt := 1; attempt <= s.cfg.MaxRetries; attempt++ {
newCredentials, err := refresher.Refresh(ctx, account)
- if err == nil {
- // 刷新成功,更新账号credentials
+
+ // 如果有新凭证,先更新(即使有错误也要保存 token)
+ if newCredentials != nil {
account.Credentials = newCredentials
- if err := s.accountRepo.Update(ctx, account); err != nil {
- return fmt.Errorf("failed to save credentials: %w", err)
+ if saveErr := s.accountRepo.Update(ctx, account); saveErr != nil {
+ return fmt.Errorf("failed to save credentials: %w", saveErr)
+ }
+ }
+
+ if err == nil {
+ // Antigravity 账户:如果之前是因为缺少 project_id 而标记为 error,现在成功获取到了,清除错误状态
+ if account.Platform == PlatformAntigravity &&
+ account.Status == StatusError &&
+ strings.Contains(account.ErrorMessage, "missing_project_id:") {
+ if clearErr := s.accountRepo.ClearError(ctx, account.ID); clearErr != nil {
+ log.Printf("[TokenRefresh] Failed to clear error status for account %d: %v", account.ID, clearErr)
+ } else {
+ log.Printf("[TokenRefresh] Account %d: cleared missing_project_id error", account.ID)
+ }
}
// 对所有 OAuth 账号调用缓存失效(InvalidateToken 内部根据平台判断是否需要处理)
if s.cacheInvalidator != nil && account.Type == AccountTypeOAuth {
@@ -230,6 +244,7 @@ func isNonRetryableRefreshError(err error) bool {
"invalid_client", // 客户端配置错误
"unauthorized_client", // 客户端未授权
"access_denied", // 访问被拒绝
+ "missing_project_id", // 缺少 project_id
}
for _, needle := range nonRetryable {
if strings.Contains(msg, needle) {
diff --git a/frontend/src/components/layout/AppHeader.vue b/frontend/src/components/layout/AppHeader.vue
index fd8742c3..9d2b40fb 100644
--- a/frontend/src/components/layout/AppHeader.vue
+++ b/frontend/src/components/layout/AppHeader.vue
@@ -21,8 +21,20 @@
-
+
+
+
+
+ {{ t('nav.docs') }}
+
+
@@ -211,6 +223,7 @@ const user = computed(() => authStore.user)
const dropdownOpen = ref(false)
const dropdownRef = ref
(null)
const contactInfo = computed(() => appStore.contactInfo)
+const docUrl = computed(() => appStore.docUrl)
// 只在标准模式的管理员下显示新手引导按钮
const showOnboardingButton = computed(() => {
diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts
index fe95f6b9..726b7712 100644
--- a/frontend/src/i18n/locales/en.ts
+++ b/frontend/src/i18n/locales/en.ts
@@ -196,7 +196,8 @@ export default {
expand: 'Expand',
logout: 'Logout',
github: 'GitHub',
- mySubscriptions: 'My Subscriptions'
+ mySubscriptions: 'My Subscriptions',
+ docs: 'Docs'
},
// Auth
diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts
index ef5ebaff..02d21482 100644
--- a/frontend/src/i18n/locales/zh.ts
+++ b/frontend/src/i18n/locales/zh.ts
@@ -193,7 +193,8 @@ export default {
expand: '展开',
logout: '退出登录',
github: 'GitHub',
- mySubscriptions: '我的订阅'
+ mySubscriptions: '我的订阅',
+ docs: '文档'
},
// Auth