From b0d41823bd24bf99849dd1677e9a72b8a2d649d4 Mon Sep 17 00:00:00 2001 From: ianshaw Date: Sat, 3 Jan 2026 17:07:54 -0800 Subject: [PATCH] =?UTF-8?q?fix(thinking):=20=E4=BC=98=E5=8C=96=20thinking?= =?UTF-8?q?=20block=20=E7=AD=BE=E5=90=8D=E9=94=99=E8=AF=AF=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FilterThinkingBlocksForRetry: 将 thinking block 转换为 text block 而非直接删除 - stripThinkingFromClaudeRequest: Antigravity 网关同步采用转换策略 - 统一处理 thinking/redacted_thinking/无 type 字段的 thinking block - 保留 thinking 内容,避免上下文丢失 --- .../service/antigravity_gateway_service.go | 231 +++++++++++++++++- backend/internal/service/gateway_request.go | 132 ++++++++-- 2 files changed, 337 insertions(+), 26 deletions(-) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index e4843f1b..9b0dc65a 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -64,6 +64,7 @@ type AntigravityGatewayService struct { tokenProvider *AntigravityTokenProvider rateLimitService *RateLimitService httpUpstream HTTPUpstream + settingService *SettingService } func NewAntigravityGatewayService( @@ -72,12 +73,14 @@ func NewAntigravityGatewayService( tokenProvider *AntigravityTokenProvider, rateLimitService *RateLimitService, httpUpstream HTTPUpstream, + settingService *SettingService, ) *AntigravityGatewayService { return &AntigravityGatewayService{ accountRepo: accountRepo, tokenProvider: tokenProvider, rateLimitService: rateLimitService, httpUpstream: httpUpstream, + settingService: settingService, } } @@ -307,6 +310,22 @@ func (s *AntigravityGatewayService) unwrapV1InternalResponse(body []byte) ([]byt return body, nil } +// isModelNotFoundError 检测是否为模型不存在的 404 错误 +func isModelNotFoundError(statusCode int, body []byte) bool { + if statusCode != 404 { + return false + } + + bodyStr := strings.ToLower(string(body)) + keywords := []string{"model not found", "unknown model", "not found"} + for _, keyword := range keywords { + if strings.Contains(bodyStr, keyword) { + return true + } + } + return true // 404 without specific message also treated as model not found +} + // Forward 转发 Claude 协议请求(Claude → Gemini 转换) func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte) (*ForwardResult, error) { startTime := time.Now() @@ -397,16 +416,56 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, } defer func() { _ = resp.Body.Close() }() - // 处理错误响应 if resp.StatusCode >= 400 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) - s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) - if s.shouldFailoverUpstreamError(resp.StatusCode) { - return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} + // 优先检测 thinking block 的 signature 相关错误(400)并重试一次: + // Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验, + // 当历史消息携带的 signature 不合法时会直接 400;去除 thinking 后可继续完成请求。 + if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) { + retryClaudeReq := claudeReq + retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...) + + stripped, stripErr := stripThinkingFromClaudeRequest(&retryClaudeReq) + if stripErr == nil && stripped { + log.Printf("Antigravity account %d: detected signature-related 400, retrying once without thinking blocks", account.ID) + + retryGeminiBody, txErr := antigravity.TransformClaudeToGemini(&retryClaudeReq, projectID, mappedModel) + if txErr == nil { + retryReq, buildErr := antigravity.NewAPIRequest(ctx, action, accessToken, retryGeminiBody) + if buildErr == nil { + retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency) + if retryErr == nil { + // Retry success: continue normal success flow with the new response. + if retryResp.StatusCode < 400 { + _ = resp.Body.Close() + resp = retryResp + respBody = nil + } + + // Retry still errored: replace error context with retry response. + retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) + _ = retryResp.Body.Close() + respBody = retryBody + resp = retryResp + } else { + log.Printf("Antigravity account %d: signature retry request failed: %v", account.ID, retryErr) + } + } + } + } } - return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody) + // 处理错误响应(重试后仍失败或不触发重试) + if resp.StatusCode >= 400 { + s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + + if s.shouldFailoverUpstreamError(resp.StatusCode) { + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} + } + + return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody) + } } requestID := resp.Header.Get("x-request-id") @@ -440,6 +499,122 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, }, nil } +func isSignatureRelatedError(respBody []byte) bool { + msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) + if msg == "" { + // Fallback: best-effort scan of the raw payload. + msg = strings.ToLower(string(respBody)) + } + + // Keep this intentionally broad: different upstreams may use "signature" or "thought_signature". + return strings.Contains(msg, "thought_signature") || strings.Contains(msg, "signature") +} + +func extractAntigravityErrorMessage(body []byte) string { + var payload map[string]any + if err := json.Unmarshal(body, &payload); err != nil { + return "" + } + + // Google-style: {"error": {"message": "..."}} + if errObj, ok := payload["error"].(map[string]any); ok { + if msg, ok := errObj["message"].(string); ok && strings.TrimSpace(msg) != "" { + return msg + } + } + + // Fallback: top-level message + if msg, ok := payload["message"].(string); ok && strings.TrimSpace(msg) != "" { + return msg + } + + return "" +} + +// stripThinkingFromClaudeRequest converts thinking blocks to text blocks in a Claude Messages request. +// This preserves the thinking content while avoiding signature validation errors. +// Note: redacted_thinking blocks are removed because they cannot be converted to text. +// It also disables top-level `thinking` to prevent dummy-thought injection during retry. +func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error) { + if req == nil { + return false, nil + } + + changed := false + if req.Thinking != nil { + req.Thinking = nil + changed = true + } + + for i := range req.Messages { + raw := req.Messages[i].Content + if len(raw) == 0 { + continue + } + + // If content is a string, nothing to strip. + var str string + if json.Unmarshal(raw, &str) == nil { + continue + } + + // Otherwise treat as an array of blocks and convert thinking blocks to text. + var blocks []map[string]any + if err := json.Unmarshal(raw, &blocks); err != nil { + continue + } + + filtered := make([]map[string]any, 0, len(blocks)) + modifiedAny := false + for _, block := range blocks { + t, _ := block["type"].(string) + switch t { + case "thinking": + // Convert thinking to text, skip if empty + thinkingText, _ := block["thinking"].(string) + if thinkingText != "" { + filtered = append(filtered, map[string]any{ + "type": "text", + "text": thinkingText, + }) + } + modifiedAny = true + case "redacted_thinking": + // Remove redacted_thinking (cannot convert encrypted content) + modifiedAny = true + case "": + // Handle untyped block with "thinking" field + if thinkingText, hasThinking := block["thinking"].(string); hasThinking { + if thinkingText != "" { + filtered = append(filtered, map[string]any{ + "type": "text", + "text": thinkingText, + }) + } + modifiedAny = true + } else { + filtered = append(filtered, block) + } + default: + filtered = append(filtered, block) + } + } + + if !modifiedAny { + continue + } + + newRaw, err := json.Marshal(filtered) + if err != nil { + return changed, err + } + req.Messages[i].Content = newRaw + changed = true + } + + return changed, nil +} + // ForwardGemini 转发 Gemini 协议请求 func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte) (*ForwardResult, error) { startTime := time.Now() @@ -550,14 +725,40 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co } defer func() { _ = resp.Body.Close() }() - requestID := resp.Header.Get("x-request-id") - if requestID != "" { - c.Header("x-request-id", requestID) - } - // 处理错误响应 if resp.StatusCode >= 400 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + + // 模型兜底:模型不存在且开启 fallback 时,自动用 fallback 模型重试一次 + if s.settingService != nil && s.settingService.IsModelFallbackEnabled(ctx) && + isModelNotFoundError(resp.StatusCode, respBody) { + fallbackModel := s.settingService.GetFallbackModel(ctx, PlatformAntigravity) + if fallbackModel != "" && fallbackModel != mappedModel { + log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name) + + // 关闭原始响应,释放连接(respBody 已读取到内存) + _ = resp.Body.Close() + + fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, body) + if err == nil { + fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped) + if err == nil { + fallbackResp, err := s.httpUpstream.Do(fallbackReq, proxyURL, account.ID, account.Concurrency) + if err == nil && fallbackResp.StatusCode < 400 { + resp = fallbackResp + } else if fallbackResp != nil { + _ = fallbackResp.Body.Close() + } + } + } + } + } + + // fallback 成功:继续按正常响应处理 + if resp.StatusCode < 400 { + goto handleSuccess + } + s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) if s.shouldFailoverUpstreamError(resp.StatusCode) { @@ -565,6 +766,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co } // 解包并返回错误 + requestID := resp.Header.Get("x-request-id") + if requestID != "" { + c.Header("x-request-id", requestID) + } unwrapped, _ := s.unwrapV1InternalResponse(respBody) contentType := resp.Header.Get("Content-Type") if contentType == "" { @@ -574,6 +779,12 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode) } +handleSuccess: + requestID := resp.Header.Get("x-request-id") + if requestID != "" { + c.Header("x-request-id", requestID) + } + var usage *ClaudeUsage var firstTokenMs *int diff --git a/backend/internal/service/gateway_request.go b/backend/internal/service/gateway_request.go index 4cf40199..6b54e352 100644 --- a/backend/internal/service/gateway_request.go +++ b/backend/internal/service/gateway_request.go @@ -81,6 +81,115 @@ func ParseGatewayRequest(body []byte) (*ParsedRequest, error) { // - When thinking.type == "enabled": Only remove thinking blocks without valid signatures // (blocks with missing/empty/dummy signatures that would cause 400 errors) func FilterThinkingBlocks(body []byte) []byte { + return filterThinkingBlocksInternal(body, false) +} + +// FilterThinkingBlocksForRetry converts thinking blocks to text blocks for retry scenarios +// This preserves the thinking content while avoiding signature validation errors. +// Note: redacted_thinking blocks are removed because they cannot be converted to text. +func FilterThinkingBlocksForRetry(body []byte) []byte { + // Fast path: check for presence of thinking-related keys + if !bytes.Contains(body, []byte(`"type":"thinking"`)) && + !bytes.Contains(body, []byte(`"type": "thinking"`)) && + !bytes.Contains(body, []byte(`"type":"redacted_thinking"`)) && + !bytes.Contains(body, []byte(`"type": "redacted_thinking"`)) && + !bytes.Contains(body, []byte(`"thinking":`)) && + !bytes.Contains(body, []byte(`"thinking" :`)) { + return body + } + + var req map[string]any + if err := json.Unmarshal(body, &req); err != nil { + return body + } + + messages, ok := req["messages"].([]any) + if !ok { + return body + } + + modified := false + for _, msg := range messages { + msgMap, ok := msg.(map[string]any) + if !ok { + continue + } + + content, ok := msgMap["content"].([]any) + if !ok { + continue + } + + newContent := make([]any, 0, len(content)) + modifiedThisMsg := false + + for _, block := range content { + blockMap, ok := block.(map[string]any) + if !ok { + newContent = append(newContent, block) + continue + } + + blockType, _ := blockMap["type"].(string) + + // Case 1: Standard thinking block - convert to text + if blockType == "thinking" { + thinkingText, _ := blockMap["thinking"].(string) + if thinkingText != "" { + newContent = append(newContent, map[string]any{ + "type": "text", + "text": thinkingText, + }) + } + modifiedThisMsg = true + continue + } + + // Case 2: Redacted thinking block - remove (cannot convert encrypted content) + if blockType == "redacted_thinking" { + modifiedThisMsg = true + continue + } + + // Case 3: Untyped block with "thinking" field - convert to text + if blockType == "" { + if thinkingText, hasThinking := blockMap["thinking"].(string); hasThinking { + if thinkingText != "" { + newContent = append(newContent, map[string]any{ + "type": "text", + "text": thinkingText, + }) + } + modifiedThisMsg = true + continue + } + } + + newContent = append(newContent, block) + } + + if modifiedThisMsg { + msgMap["content"] = newContent + modified = true + } + } + + if !modified { + return body + } + + newBody, err := json.Marshal(req) + if err != nil { + return body + } + return newBody +} + +// filterThinkingBlocksInternal removes invalid thinking blocks from request +// Strategy: +// - When thinking.type != "enabled": Remove all thinking blocks +// - When thinking.type == "enabled": Only remove thinking blocks without valid signatures +func filterThinkingBlocksInternal(body []byte, _ bool) []byte { // Fast path: if body doesn't contain "thinking", skip parsing if !bytes.Contains(body, []byte(`"type":"thinking"`)) && !bytes.Contains(body, []byte(`"type": "thinking"`)) && @@ -93,7 +202,7 @@ func FilterThinkingBlocks(body []byte) []byte { var req map[string]any if err := json.Unmarshal(body, &req); err != nil { - return body // Return original on parse error + return body } // Check if thinking is enabled @@ -106,7 +215,7 @@ func FilterThinkingBlocks(body []byte) []byte { messages, ok := req["messages"].([]any) if !ok { - return body // No messages array + return body } filtered := false @@ -122,7 +231,6 @@ func FilterThinkingBlocks(body []byte) []byte { continue } - // Filter thinking blocks from content array newContent := make([]any, 0, len(content)) filteredThisMessage := false @@ -135,30 +243,24 @@ func FilterThinkingBlocks(body []byte) []byte { blockType, _ := blockMap["type"].(string) - // Handle thinking/redacted_thinking blocks if blockType == "thinking" || blockType == "redacted_thinking" { // When thinking is enabled and this is an assistant message, - // only keep thinking blocks with valid (non-empty, non-dummy) signatures + // only keep thinking blocks with valid signatures if thinkingEnabled && role == "assistant" { signature, _ := blockMap["signature"].(string) - // Keep blocks with valid signatures, remove those without if signature != "" && signature != "skip_thought_signature_validator" { newContent = append(newContent, block) continue } } - filtered = true filteredThisMessage = true continue } - // Some clients send the "thinking" object without a "type" discriminator. - // We intentionally do not drop other typed blocks (e.g. tool_use) that might - // legitimately contain a "thinking" key inside their payload. + // Handle blocks without type discriminator but with "thinking" key if blockType == "" { - if thinkingContent, hasThinking := blockMap["thinking"]; hasThinking { - _ = thinkingContent + if _, hasThinking := blockMap["thinking"]; hasThinking { filtered = true filteredThisMessage = true continue @@ -174,14 +276,12 @@ func FilterThinkingBlocks(body []byte) []byte { } if !filtered { - return body // No changes needed + return body } - // Re-serialize newBody, err := json.Marshal(req) if err != nil { - return body // Return original on marshal error + return body } - return newBody }