fix(antigravity): 修复频繁出现429错误的问题

This commit is contained in:
shaw
2026-01-08 20:05:55 +08:00
parent 3fb43b91bf
commit b015a3bd8a
2 changed files with 375 additions and 46 deletions

View File

@@ -149,6 +149,11 @@ func defaultIdentityPatch(_ string) string {
return antigravityIdentity return antigravityIdentity
} }
// GetDefaultIdentityPatch returns the default Antigravity identity prompt.
// It is the exported accessor for the antigravityIdentity value, so that
// callers outside this package can embed the same prompt text.
func GetDefaultIdentityPatch() string {
return antigravityIdentity
}
// buildSystemInstruction 构建 systemInstruction // buildSystemInstruction 构建 systemInstruction
func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions) *GeminiContent { func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions) *GeminiContent {
var parts []GeminiPart var parts []GeminiPart

View File

@@ -187,10 +187,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
return nil, err return nil, err
} }
// DEBUG: 打印请求 header 和 body // 调试日志:Test 请求信息
log.Printf("[DEBUG] Antigravity TestConnection - URL: %s", req.URL.String()) log.Printf("[antigravity-Test] account=%s request_size=%d url=%s", account.Name, len(requestBody), req.URL.String())
log.Printf("[DEBUG] Antigravity TestConnection - Headers: %v", req.Header)
log.Printf("[DEBUG] Antigravity TestConnection - Body: %s", string(requestBody))
// 代理 URL // 代理 URL
proxyURL := "" proxyURL := ""
@@ -235,6 +233,12 @@ func (s *AntigravityGatewayService) buildGeminiTestRequest(projectID, model stri
}, },
}, },
}, },
// Antigravity 上游要求必须包含身份提示词
"systemInstruction": map[string]any{
"parts": []map[string]any{
{"text": antigravity.GetDefaultIdentityPatch()},
},
},
} }
payloadBytes, _ := json.Marshal(payload) payloadBytes, _ := json.Marshal(payload)
return s.wrapV1InternalRequest(projectID, model, payloadBytes) return s.wrapV1InternalRequest(projectID, model, payloadBytes)
@@ -333,6 +337,53 @@ func extractTextFromSSEResponse(respBody []byte) string {
return strings.Join(texts, "") return strings.Join(texts, "")
} }
// injectIdentityPatchToGeminiRequest ensures that a Gemini-format request body
// carries the default Antigravity identity prompt in systemInstruction.parts.
//
// Behavior:
//   - If any existing systemInstruction part has a "text" string containing
//     "You are Antigravity", body is returned unchanged (the same slice).
//   - Otherwise the identity prompt is inserted as the first part. The
//     systemInstruction object, or its parts array, is created when it is
//     missing or not of the expected JSON type.
//
// An error is returned when body is not valid JSON, when it is the JSON
// literal null instead of an object, or when the patched request cannot be
// re-encoded. Because the request is decoded into generic maps and marshalled
// again, key order and whitespace of the result may differ from the input.
func injectIdentityPatchToGeminiRequest(body []byte) ([]byte, error) {
	const identityMarker = "You are Antigravity"

	var request map[string]any
	if err := json.Unmarshal(body, &request); err != nil {
		return nil, fmt.Errorf("解析 Gemini 请求失败: %w", err)
	}
	// json.Unmarshal accepts a literal `null` and leaves the map nil without
	// reporting an error; writing into that nil map below would panic.
	if request == nil {
		return nil, fmt.Errorf("解析 Gemini 请求失败: request body is null, expected a JSON object")
	}

	newPart := map[string]any{"text": antigravity.GetDefaultIdentityPatch()}

	sysInst, ok := request["systemInstruction"].(map[string]any)
	if !ok {
		// No usable systemInstruction: create one holding only the identity prompt.
		request["systemInstruction"] = map[string]any{
			"parts": []any{newPart},
		}
		return json.Marshal(request)
	}

	// parts stays nil when the field is absent or not an array; the loop is
	// then skipped and the append below yields a one-element slice.
	parts, _ := sysInst["parts"].([]any)
	for _, part := range parts {
		partMap, ok := part.(map[string]any)
		if !ok {
			continue
		}
		if text, ok := partMap["text"].(string); ok && strings.Contains(text, identityMarker) {
			// Identity prompt already present: hand back the original bytes untouched.
			return body, nil
		}
	}

	// Prepend so the identity prompt precedes any caller-supplied instructions.
	sysInst["parts"] = append([]any{newPart}, parts...)
	return json.Marshal(request)
}
// wrapV1InternalRequest 包装请求为 v1internal 格式 // wrapV1InternalRequest 包装请求为 v1internal 格式
func (s *AntigravityGatewayService) wrapV1InternalRequest(projectID, model string, originalBody []byte) ([]byte, error) { func (s *AntigravityGatewayService) wrapV1InternalRequest(projectID, model string, originalBody []byte) ([]byte, error) {
var request any var request any
@@ -418,17 +469,20 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
proxyURL = account.Proxy.URL() proxyURL = account.Proxy.URL()
} }
// 获取转换选项
// Antigravity 上游要求必须包含身份提示词,否则会返回 429
transformOpts := s.getClaudeTransformOptions(ctx)
transformOpts.EnableIdentityPatch = true // 强制启用Antigravity 上游必需
// 转换 Claude 请求为 Gemini 格式 // 转换 Claude 请求为 Gemini 格式
geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, s.getClaudeTransformOptions(ctx)) geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, transformOpts)
if err != nil { if err != nil {
return nil, fmt.Errorf("transform request: %w", err) return nil, fmt.Errorf("transform request: %w", err)
} }
// 构建上游 action,NewAPIRequest 会自动处理 ?alt=sse 和 Accept Header // Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
action := "generateContent" // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
if claudeReq.Stream { action := "streamGenerateContent"
action = "streamGenerateContent"
}
// 重试循环 // 重试循环
var resp *http.Response var resp *http.Response
@@ -465,7 +519,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
_ = resp.Body.Close() _ = resp.Body.Close()
if attempt < antigravityMaxRetries { if attempt < antigravityMaxRetries {
log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries) log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
if !sleepAntigravityBackoffWithContext(ctx, attempt) { if !sleepAntigravityBackoffWithContext(ctx, attempt) {
log.Printf("%s status=context_canceled_during_backoff", prefix) log.Printf("%s status=context_canceled_during_backoff", prefix)
return nil, ctx.Err() return nil, ctx.Err()
@@ -584,6 +638,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
var usage *ClaudeUsage var usage *ClaudeUsage
var firstTokenMs *int var firstTokenMs *int
if claudeReq.Stream { if claudeReq.Stream {
// 客户端要求流式,直接透传转换
streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel) streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel)
if err != nil { if err != nil {
log.Printf("%s status=stream_error error=%v", prefix, err) log.Printf("%s status=stream_error error=%v", prefix, err)
@@ -592,10 +647,14 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
usage = streamRes.usage usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs firstTokenMs = streamRes.firstTokenMs
} else { } else {
usage, err = s.handleClaudeNonStreamingResponse(c, resp, originalModel) // 客户端要求非流式,收集流式响应后转换返回
streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel)
if err != nil { if err != nil {
log.Printf("%s status=stream_collect_error error=%v", prefix, err)
return nil, err return nil, err
} }
usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs
} }
return &ForwardResult{ return &ForwardResult{
@@ -928,18 +987,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
proxyURL = account.Proxy.URL() proxyURL = account.Proxy.URL()
} }
// 包装请求 // Antigravity 上游要求必须包含身份提示词,注入到请求
wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, body) injectedBody, err := injectIdentityPatchToGeminiRequest(body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 构建上游 action,NewAPIRequest 会自动处理 ?alt=sse 和 Accept Header // 包装请求
upstreamAction := action wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, injectedBody)
if action == "generateContent" && stream { if err != nil {
upstreamAction = "streamGenerateContent" return nil, err
} }
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回
upstreamAction := "streamGenerateContent"
// 重试循环 // 重试循环
var resp *http.Response var resp *http.Response
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ { for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
@@ -1016,7 +1079,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
if fallbackModel != "" && fallbackModel != mappedModel { if fallbackModel != "" && fallbackModel != mappedModel {
log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name) log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name)
fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, body) fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, injectedBody)
if err == nil { if err == nil {
fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped) fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped)
if err == nil { if err == nil {
@@ -1066,7 +1129,8 @@ handleSuccess:
var usage *ClaudeUsage var usage *ClaudeUsage
var firstTokenMs *int var firstTokenMs *int
if stream || upstreamAction == "streamGenerateContent" { if stream {
// 客户端要求流式,直接透传
streamRes, err := s.handleGeminiStreamingResponse(c, resp, startTime) streamRes, err := s.handleGeminiStreamingResponse(c, resp, startTime)
if err != nil { if err != nil {
log.Printf("%s status=stream_error error=%v", prefix, err) log.Printf("%s status=stream_error error=%v", prefix, err)
@@ -1075,11 +1139,14 @@ handleSuccess:
usage = streamRes.usage usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs firstTokenMs = streamRes.firstTokenMs
} else { } else {
usageResp, err := s.handleGeminiNonStreamingResponse(c, resp) // 客户端要求非流式,收集流式响应后返回
streamRes, err := s.handleGeminiStreamToNonStreaming(c, resp, startTime)
if err != nil { if err != nil {
log.Printf("%s status=stream_collect_error error=%v", prefix, err)
return nil, err return nil, err
} }
usage = usageResp usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs
} }
if usage == nil { if usage == nil {
@@ -1126,9 +1193,9 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
// sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待 // sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待
// 返回 true 表示正常完成等待,false 表示 context 已取消 // 返回 true 表示正常完成等待,false 表示 context 已取消
func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
delay := geminiRetryBaseDelay * time.Duration(1<<uint(attempt-1)) delay := antigravityRetryBaseDelay * time.Duration(1<<uint(attempt-1))
if delay > geminiRetryMaxDelay { if delay > antigravityRetryMaxDelay {
delay = geminiRetryMaxDelay delay = antigravityRetryMaxDelay
} }
// +/- 20% jitter // +/- 20% jitter
@@ -1340,25 +1407,150 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
} }
} }
func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Context, resp *http.Response) (*ClaudeUsage, error) { // handleGeminiStreamToNonStreaming 读取上游流式响应,合并为非流式响应返回给客户端
respBody, err := io.ReadAll(resp.Body) // Gemini 流式响应中每个 chunk 都包含累积的完整文本,只需保留最后一个有效响应
if err != nil { func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time) (*antigravityStreamResult, error) {
return nil, err scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.settingService.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 64*1024), maxLineSize)
usage := &ClaudeUsage{}
var firstTokenMs *int
var last map[string]any
var lastWithParts map[string]any
type scanEvent struct {
line string
err error
} }
// 解包 v1internal 响应 // 独立 goroutine 读取上游,避免读取阻塞影响超时处理
unwrapped, _ := s.unwrapV1InternalResponse(respBody) events := make(chan scanEvent, 16)
done := make(chan struct{})
var parsed map[string]any sendEvent := func(ev scanEvent) bool {
if json.Unmarshal(unwrapped, &parsed) == nil { select {
if u := extractGeminiUsage(parsed); u != nil { case events <- ev:
c.Data(resp.StatusCode, "application/json", unwrapped) return true
return u, nil case <-done:
return false
} }
} }
c.Data(resp.StatusCode, "application/json", unwrapped) var lastReadAt int64
return &ClaudeUsage{}, nil atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
go func() {
defer close(events)
for scanner.Scan() {
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
if !sendEvent(scanEvent{line: scanner.Text()}) {
return
}
}
if err := scanner.Err(); err != nil {
_ = sendEvent(scanEvent{err: err})
}
}()
defer close(done)
// 上游数据间隔超时保护(防止上游挂起长期占用连接)
streamInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 {
streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
}
var intervalTicker *time.Ticker
if streamInterval > 0 {
intervalTicker = time.NewTicker(streamInterval)
defer intervalTicker.Stop()
}
var intervalCh <-chan time.Time
if intervalTicker != nil {
intervalCh = intervalTicker.C
}
for {
select {
case ev, ok := <-events:
if !ok {
// 流结束,返回收集的响应
goto returnResponse
}
if ev.err != nil {
if errors.Is(ev.err, bufio.ErrTooLong) {
log.Printf("SSE line too long (antigravity non-stream): max_size=%d error=%v", maxLineSize, ev.err)
}
return nil, ev.err
}
line := ev.line
trimmed := strings.TrimRight(line, "\r\n")
if !strings.HasPrefix(trimmed, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
if payload == "" || payload == "[DONE]" {
continue
}
// 解包 v1internal 响应
inner, parseErr := s.unwrapV1InternalResponse([]byte(payload))
if parseErr != nil {
continue
}
var parsed map[string]any
if err := json.Unmarshal(inner, &parsed); err != nil {
continue
}
// 记录首 token 时间
if firstTokenMs == nil {
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
last = parsed
// 提取 usage
if u := extractGeminiUsage(parsed); u != nil {
usage = u
}
// 保留最后一个有 parts 的响应
if parts := extractGeminiParts(parsed); len(parts) > 0 {
lastWithParts = parsed
}
case <-intervalCh:
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
if time.Since(lastRead) < streamInterval {
continue
}
log.Printf("Stream data interval timeout (antigravity non-stream)")
return nil, fmt.Errorf("stream data interval timeout")
}
}
returnResponse:
// 选择最后一个有效响应
finalResponse := pickGeminiCollectResult(last, lastWithParts)
// 处理空响应情况
if last == nil && lastWithParts == nil {
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
}
respBody, err := json.Marshal(finalResponse)
if err != nil {
return nil, fmt.Errorf("failed to marshal response: %w", err)
}
c.Data(http.StatusOK, "application/json", respBody)
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
} }
func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int, errType, message string) error { func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int, errType, message string) error {
@@ -1435,17 +1627,148 @@ func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int,
return fmt.Errorf("%s", message) return fmt.Errorf("%s", message)
} }
// handleClaudeNonStreamingResponse 处理 Claude 非流式响应(Gemini → Claude 转换) // handleClaudeStreamToNonStreaming 收集上游流式响应,转换为 Claude 非流式格式返回
func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Context, resp *http.Response, originalModel string) (*ClaudeUsage, error) { // 用于处理客户端非流式请求但上游只支持流式的情况
body, err := io.ReadAll(io.LimitReader(resp.Body, 8<<20)) func (s *AntigravityGatewayService) handleClaudeStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string) (*antigravityStreamResult, error) {
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.settingService.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 64*1024), maxLineSize)
var firstTokenMs *int
var last map[string]any
var lastWithParts map[string]any
type scanEvent struct {
line string
err error
}
// 独立 goroutine 读取上游,避免读取阻塞影响超时处理
events := make(chan scanEvent, 16)
done := make(chan struct{})
sendEvent := func(ev scanEvent) bool {
select {
case events <- ev:
return true
case <-done:
return false
}
}
var lastReadAt int64
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
go func() {
defer close(events)
for scanner.Scan() {
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
if !sendEvent(scanEvent{line: scanner.Text()}) {
return
}
}
if err := scanner.Err(); err != nil {
_ = sendEvent(scanEvent{err: err})
}
}()
defer close(done)
// 上游数据间隔超时保护(防止上游挂起长期占用连接)
streamInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 {
streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
}
var intervalTicker *time.Ticker
if streamInterval > 0 {
intervalTicker = time.NewTicker(streamInterval)
defer intervalTicker.Stop()
}
var intervalCh <-chan time.Time
if intervalTicker != nil {
intervalCh = intervalTicker.C
}
for {
select {
case ev, ok := <-events:
if !ok {
// 流结束,转换并返回响应
goto returnResponse
}
if ev.err != nil {
if errors.Is(ev.err, bufio.ErrTooLong) {
log.Printf("SSE line too long (antigravity claude non-stream): max_size=%d error=%v", maxLineSize, ev.err)
}
return nil, ev.err
}
line := ev.line
trimmed := strings.TrimRight(line, "\r\n")
if !strings.HasPrefix(trimmed, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
if payload == "" || payload == "[DONE]" {
continue
}
// 解包 v1internal 响应
inner, parseErr := s.unwrapV1InternalResponse([]byte(payload))
if parseErr != nil {
continue
}
var parsed map[string]any
if err := json.Unmarshal(inner, &parsed); err != nil {
continue
}
// 记录首 token 时间
if firstTokenMs == nil {
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
last = parsed
// 保留最后一个有 parts 的响应
if parts := extractGeminiParts(parsed); len(parts) > 0 {
lastWithParts = parsed
}
case <-intervalCh:
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
if time.Since(lastRead) < streamInterval {
continue
}
log.Printf("Stream data interval timeout (antigravity claude non-stream)")
return nil, fmt.Errorf("stream data interval timeout")
}
}
returnResponse:
// 选择最后一个有效响应
finalResponse := pickGeminiCollectResult(last, lastWithParts)
// 处理空响应情况
if last == nil && lastWithParts == nil {
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream")
}
// 序列化为 JSON(Gemini 格式)
geminiBody, err := json.Marshal(finalResponse)
if err != nil { if err != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to read upstream response") return nil, fmt.Errorf("failed to marshal gemini response: %w", err)
} }
// 转换 Gemini 响应为 Claude 格式 // 转换 Gemini 响应为 Claude 格式
claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(body, originalModel) claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(geminiBody, originalModel)
if err != nil { if err != nil {
log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(body)) log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(geminiBody))
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response") return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response")
} }
@@ -1458,7 +1781,8 @@ func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Cont
CacheCreationInputTokens: agUsage.CacheCreationInputTokens, CacheCreationInputTokens: agUsage.CacheCreationInputTokens,
CacheReadInputTokens: agUsage.CacheReadInputTokens, CacheReadInputTokens: agUsage.CacheReadInputTokens,
} }
return usage, nil
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
} }
// handleClaudeStreamingResponse 处理 Claude 流式响应(Gemini SSE → Claude SSE 转换) // handleClaudeStreamingResponse 处理 Claude 流式响应(Gemini SSE → Claude SSE 转换)