fix(backend): 改进 thinking/tool block 签名处理和重试策略

主要改动:
- request_transformer: thinking block 缺少签名时降级为文本而非丢弃,保留内容并在上层禁用 thinking mode
- antigravity_gateway_service: 新增两阶段降级策略,先处理 thinking blocks,如仍失败且涉及 tool 签名错误则进一步降级 tool blocks
- gateway_request: 新增 FilterSignatureSensitiveBlocksForRetry 函数,支持将 tool_use/tool_result 降级为文本
- gateway_request: 改进 FilterThinkingBlocksForRetry,禁用顶层 thinking 配置以避免结构约束冲突
- gateway_service: 实现保守的两阶段重试逻辑,优先保留内容,仅在必要时降级工具调用
- 新增 antigravity_gateway_service_test.go 测试签名块剥离逻辑
- 更新相关测试用例以验证降级行为

此修复解决了跨平台/账户切换时历史消息签名失效导致的请求失败问题。
This commit is contained in:
IanShaw027
2026-01-04 22:32:36 +08:00
parent 99308ab4fb
commit 87426e5dda
8 changed files with 815 additions and 110 deletions

View File

@@ -22,7 +22,7 @@ func TransformClaudeToGemini(claudeReq *ClaudeRequest, projectID, mappedModel st
allowDummyThought := strings.HasPrefix(mappedModel, "gemini-")
// 1. 构建 contents
contents, err := buildContents(claudeReq.Messages, toolIDToName, isThinkingEnabled, allowDummyThought)
contents, strippedThinking, err := buildContents(claudeReq.Messages, toolIDToName, isThinkingEnabled, allowDummyThought)
if err != nil {
return nil, fmt.Errorf("build contents: %w", err)
}
@@ -31,7 +31,15 @@ func TransformClaudeToGemini(claudeReq *ClaudeRequest, projectID, mappedModel st
systemInstruction := buildSystemInstruction(claudeReq.System, claudeReq.Model)
// 3. 构建 generationConfig
generationConfig := buildGenerationConfig(claudeReq)
reqForConfig := claudeReq
if strippedThinking {
// If we had to downgrade thinking blocks to plain text due to missing/invalid signatures,
// disable upstream thinking mode to avoid signature/structure validation errors.
reqCopy := *claudeReq
reqCopy.Thinking = nil
reqForConfig = &reqCopy
}
generationConfig := buildGenerationConfig(reqForConfig)
// 4. 构建 tools
tools := buildTools(claudeReq.Tools)
@@ -120,8 +128,9 @@ func buildSystemInstruction(system json.RawMessage, modelName string) *GeminiCon
}
// buildContents 构建 contents
func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isThinkingEnabled, allowDummyThought bool) ([]GeminiContent, error) {
func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isThinkingEnabled, allowDummyThought bool) ([]GeminiContent, bool, error) {
var contents []GeminiContent
strippedThinking := false
for i, msg := range messages {
role := msg.Role
@@ -129,9 +138,12 @@ func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isT
role = "model"
}
parts, err := buildParts(msg.Content, toolIDToName, allowDummyThought)
parts, strippedThisMsg, err := buildParts(msg.Content, toolIDToName, allowDummyThought)
if err != nil {
return nil, fmt.Errorf("build parts for message %d: %w", i, err)
return nil, false, fmt.Errorf("build parts for message %d: %w", i, err)
}
if strippedThisMsg {
strippedThinking = true
}
// 只有 Gemini 模型支持 dummy thinking block workaround
@@ -165,7 +177,7 @@ func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isT
})
}
return contents, nil
return contents, strippedThinking, nil
}
// dummyThoughtSignature 用于跳过 Gemini 3 thought_signature 验证
@@ -174,8 +186,9 @@ const dummyThoughtSignature = "skip_thought_signature_validator"
// buildParts 构建消息的 parts
// allowDummyThought: 只有 Gemini 模型支持 dummy thought signature
func buildParts(content json.RawMessage, toolIDToName map[string]string, allowDummyThought bool) ([]GeminiPart, error) {
func buildParts(content json.RawMessage, toolIDToName map[string]string, allowDummyThought bool) ([]GeminiPart, bool, error) {
var parts []GeminiPart
strippedThinking := false
// 尝试解析为字符串
var textContent string
@@ -183,13 +196,13 @@ func buildParts(content json.RawMessage, toolIDToName map[string]string, allowDu
if textContent != "(no content)" && strings.TrimSpace(textContent) != "" {
parts = append(parts, GeminiPart{Text: strings.TrimSpace(textContent)})
}
return parts, nil
return parts, false, nil
}
// 解析为内容块数组
var blocks []ContentBlock
if err := json.Unmarshal(content, &blocks); err != nil {
return nil, fmt.Errorf("parse content blocks: %w", err)
return nil, false, fmt.Errorf("parse content blocks: %w", err)
}
for _, block := range blocks {
@@ -208,8 +221,11 @@ func buildParts(content json.RawMessage, toolIDToName map[string]string, allowDu
if block.Signature != "" {
part.ThoughtSignature = block.Signature
} else if !allowDummyThought {
// Claude 模型需要有效 signature,跳过无 signature 的 thinking block
log.Printf("Warning: skipping thinking block without signature for Claude model")
// Claude 模型需要有效 signature;在缺失时降级为普通文本,并在上层禁用 thinking mode。
if strings.TrimSpace(block.Thinking) != "" {
parts = append(parts, GeminiPart{Text: block.Thinking})
}
strippedThinking = true
continue
} else {
// Gemini 模型使用 dummy signature
@@ -276,7 +292,7 @@ func buildParts(content json.RawMessage, toolIDToName map[string]string, allowDu
}
}
return parts, nil
return parts, strippedThinking, nil
}
// parseToolResultContent 解析 tool_result 的 content

View File

@@ -15,15 +15,15 @@ func TestBuildParts_ThinkingBlockWithoutSignature(t *testing.T) {
description string
}{
{
name: "Claude model - drop thinking without signature",
name: "Claude model - downgrade thinking to text without signature",
content: `[
{"type": "text", "text": "Hello"},
{"type": "thinking", "thinking": "Let me think...", "signature": ""},
{"type": "text", "text": "World"}
]`,
allowDummyThought: false,
expectedParts: 2, // thinking 内容被丢弃
description: "Claude模型应丢弃无signaturethinking block内容",
expectedParts: 3, // thinking 内容降级为普通 text part
description: "Claude模型缺少signature时应将thinking降级为text并在上层禁用thinking mode",
},
{
name: "Claude model - preserve thinking block with signature",
@@ -52,7 +52,7 @@ func TestBuildParts_ThinkingBlockWithoutSignature(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
toolIDToName := make(map[string]string)
parts, err := buildParts(json.RawMessage(tt.content), toolIDToName, tt.allowDummyThought)
parts, _, err := buildParts(json.RawMessage(tt.content), toolIDToName, tt.allowDummyThought)
if err != nil {
t.Fatalf("buildParts() error = %v", err)
@@ -71,6 +71,17 @@ func TestBuildParts_ThinkingBlockWithoutSignature(t *testing.T) {
t.Fatalf("expected thought part with signature sig_real_123, got thought=%v signature=%q",
parts[1].Thought, parts[1].ThoughtSignature)
}
case "Claude model - downgrade thinking to text without signature":
if len(parts) != 3 {
t.Fatalf("expected 3 parts, got %d", len(parts))
}
if parts[1].Thought {
t.Fatalf("expected downgraded text part, got thought=%v signature=%q",
parts[1].Thought, parts[1].ThoughtSignature)
}
if parts[1].Text != "Let me think..." {
t.Fatalf("expected downgraded text %q, got %q", "Let me think...", parts[1].Text)
}
case "Gemini model - use dummy signature":
if len(parts) != 3 {
t.Fatalf("expected 3 parts, got %d", len(parts))
@@ -91,7 +102,7 @@ func TestBuildParts_ToolUseSignatureHandling(t *testing.T) {
t.Run("Gemini uses dummy tool_use signature", func(t *testing.T) {
toolIDToName := make(map[string]string)
parts, err := buildParts(json.RawMessage(content), toolIDToName, true)
parts, _, err := buildParts(json.RawMessage(content), toolIDToName, true)
if err != nil {
t.Fatalf("buildParts() error = %v", err)
}
@@ -105,7 +116,7 @@ func TestBuildParts_ToolUseSignatureHandling(t *testing.T) {
t.Run("Claude model - preserve valid signature for tool_use", func(t *testing.T) {
toolIDToName := make(map[string]string)
parts, err := buildParts(json.RawMessage(content), toolIDToName, false)
parts, _, err := buildParts(json.RawMessage(content), toolIDToName, false)
if err != nil {
t.Fatalf("buildParts() error = %v", err)
}

View File

@@ -443,35 +443,70 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
// Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验,
// 当历史消息携带的 signature 不合法时会直接 400去除 thinking 后可继续完成请求。
if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) {
retryClaudeReq := claudeReq
retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...)
// Conservative two-stage fallback:
// 1) Disable top-level thinking + thinking->text
// 2) Only if still signature-related 400: also downgrade tool_use/tool_result to text.
stripped, stripErr := stripThinkingFromClaudeRequest(&retryClaudeReq)
if stripErr == nil && stripped {
log.Printf("Antigravity account %d: detected signature-related 400, retrying once without thinking blocks", account.ID)
retryStages := []struct {
name string
strip func(*antigravity.ClaudeRequest) (bool, error)
}{
{name: "thinking-only", strip: stripThinkingFromClaudeRequest},
{name: "thinking+tools", strip: stripSignatureSensitiveBlocksFromClaudeRequest},
}
for _, stage := range retryStages {
retryClaudeReq := claudeReq
retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...)
stripped, stripErr := stage.strip(&retryClaudeReq)
if stripErr != nil || !stripped {
continue
}
log.Printf("Antigravity account %d: detected signature-related 400, retrying once (%s)", account.ID, stage.name)
retryGeminiBody, txErr := antigravity.TransformClaudeToGemini(&retryClaudeReq, projectID, mappedModel)
if txErr == nil {
retryReq, buildErr := antigravity.NewAPIRequest(ctx, action, accessToken, retryGeminiBody)
if buildErr == nil {
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
if retryErr == nil {
// Retry success: continue normal success flow with the new response.
if retryResp.StatusCode < 400 {
_ = resp.Body.Close()
resp = retryResp
respBody = nil
} else {
// Retry still errored: replace error context with retry response.
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
respBody = retryBody
resp = retryResp
}
} else {
log.Printf("Antigravity account %d: signature retry request failed: %v", account.ID, retryErr)
}
if txErr != nil {
continue
}
retryReq, buildErr := antigravity.NewAPIRequest(ctx, action, accessToken, retryGeminiBody)
if buildErr != nil {
continue
}
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
if retryErr != nil {
log.Printf("Antigravity account %d: signature retry request failed (%s): %v", account.ID, stage.name, retryErr)
continue
}
if retryResp.StatusCode < 400 {
_ = resp.Body.Close()
resp = retryResp
respBody = nil
break
}
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
// If this stage fixed the signature issue, we stop; otherwise we may try the next stage.
if retryResp.StatusCode != http.StatusBadRequest || !isSignatureRelatedError(retryBody) {
respBody = retryBody
resp = &http.Response{
StatusCode: retryResp.StatusCode,
Header: retryResp.Header.Clone(),
Body: io.NopCloser(bytes.NewReader(retryBody)),
}
break
}
// Still signature-related; capture context and allow next stage.
respBody = retryBody
resp = &http.Response{
StatusCode: retryResp.StatusCode,
Header: retryResp.Header.Clone(),
Body: io.NopCloser(bytes.NewReader(retryBody)),
}
}
}
@@ -555,7 +590,7 @@ func extractAntigravityErrorMessage(body []byte) string {
// stripThinkingFromClaudeRequest converts thinking blocks to text blocks in a Claude Messages request.
// This preserves the thinking content while avoiding signature validation errors.
// Note: redacted_thinking blocks are removed because they cannot be converted to text.
// It also disables top-level `thinking` to prevent dummy-thought injection during retry.
// It also disables top-level `thinking` to avoid upstream structural constraints for thinking mode.
func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error) {
if req == nil {
return false, nil
@@ -585,6 +620,92 @@ func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error
continue
}
filtered := make([]map[string]any, 0, len(blocks))
modifiedAny := false
for _, block := range blocks {
t, _ := block["type"].(string)
switch t {
case "thinking":
thinkingText, _ := block["thinking"].(string)
if thinkingText != "" {
filtered = append(filtered, map[string]any{
"type": "text",
"text": thinkingText,
})
}
modifiedAny = true
case "redacted_thinking":
modifiedAny = true
case "":
if thinkingText, hasThinking := block["thinking"].(string); hasThinking {
if thinkingText != "" {
filtered = append(filtered, map[string]any{
"type": "text",
"text": thinkingText,
})
}
modifiedAny = true
} else {
filtered = append(filtered, block)
}
default:
filtered = append(filtered, block)
}
}
if !modifiedAny {
continue
}
if len(filtered) == 0 {
filtered = append(filtered, map[string]any{
"type": "text",
"text": "(content removed)",
})
}
newRaw, err := json.Marshal(filtered)
if err != nil {
return changed, err
}
req.Messages[i].Content = newRaw
changed = true
}
return changed, nil
}
// stripSignatureSensitiveBlocksFromClaudeRequest is a stronger retry degradation that additionally converts
// tool blocks to plain text. Use this only after a thinking-only retry still fails with signature errors.
func stripSignatureSensitiveBlocksFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error) {
if req == nil {
return false, nil
}
changed := false
if req.Thinking != nil {
req.Thinking = nil
changed = true
}
for i := range req.Messages {
raw := req.Messages[i].Content
if len(raw) == 0 {
continue
}
// If content is a string, nothing to strip.
var str string
if json.Unmarshal(raw, &str) == nil {
continue
}
// Otherwise treat as an array of blocks and convert signature-sensitive blocks to text.
var blocks []map[string]any
if err := json.Unmarshal(raw, &blocks); err != nil {
continue
}
filtered := make([]map[string]any, 0, len(blocks))
modifiedAny := false
for _, block := range blocks {
@@ -603,6 +724,49 @@ func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error
case "redacted_thinking":
// Remove redacted_thinking (cannot convert encrypted content)
modifiedAny = true
case "tool_use":
// Convert tool_use to text to avoid upstream signature/thought_signature validation errors.
// This is a retry-only degradation path, so we prioritise request validity over tool semantics.
name, _ := block["name"].(string)
id, _ := block["id"].(string)
input := block["input"]
inputJSON, _ := json.Marshal(input)
text := "(tool_use)"
if name != "" {
text += " name=" + name
}
if id != "" {
text += " id=" + id
}
if len(inputJSON) > 0 && string(inputJSON) != "null" {
text += " input=" + string(inputJSON)
}
filtered = append(filtered, map[string]any{
"type": "text",
"text": text,
})
modifiedAny = true
case "tool_result":
// Convert tool_result to text so it stays consistent when tool_use is downgraded.
toolUseID, _ := block["tool_use_id"].(string)
isError, _ := block["is_error"].(bool)
content := block["content"]
contentJSON, _ := json.Marshal(content)
text := "(tool_result)"
if toolUseID != "" {
text += " tool_use_id=" + toolUseID
}
if isError {
text += " is_error=true"
}
if len(contentJSON) > 0 && string(contentJSON) != "null" {
text += "\n" + string(contentJSON)
}
filtered = append(filtered, map[string]any{
"type": "text",
"text": text,
})
modifiedAny = true
case "":
// Handle untyped block with "thinking" field
if thinkingText, hasThinking := block["thinking"].(string); hasThinking {
@@ -625,6 +789,14 @@ func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error
continue
}
if len(filtered) == 0 {
// Keep request valid: upstream rejects empty content arrays.
filtered = append(filtered, map[string]any{
"type": "text",
"text": "(content removed)",
})
}
newRaw, err := json.Marshal(filtered)
if err != nil {
return changed, err
@@ -747,11 +919,18 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
break
}
defer func() { _ = resp.Body.Close() }()
defer func() {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
}()
// 处理错误响应
if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
// 尽早关闭原始响应体,释放连接;后续逻辑仍可能需要读取 body因此用内存副本重新包装。
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
// 模型兜底:模型不存在且开启 fallback 时,自动用 fallback 模型重试一次
if s.settingService != nil && s.settingService.IsModelFallbackEnabled(ctx) &&
@@ -760,15 +939,13 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
if fallbackModel != "" && fallbackModel != mappedModel {
log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name)
// 关闭原始响应释放连接respBody 已读取到内存)
_ = resp.Body.Close()
fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, body)
if err == nil {
fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped)
if err == nil {
fallbackResp, err := s.httpUpstream.Do(fallbackReq, proxyURL, account.ID, account.Concurrency)
if err == nil && fallbackResp.StatusCode < 400 {
_ = resp.Body.Close()
resp = fallbackResp
} else if fallbackResp != nil {
_ = fallbackResp.Body.Close()

View File

@@ -0,0 +1,83 @@
package service
import (
"encoding/json"
"testing"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/stretchr/testify/require"
)
func TestStripSignatureSensitiveBlocksFromClaudeRequest(t *testing.T) {
req := &antigravity.ClaudeRequest{
Model: "claude-sonnet-4-5",
Thinking: &antigravity.ThinkingConfig{
Type: "enabled",
BudgetTokens: 1024,
},
Messages: []antigravity.ClaudeMessage{
{
Role: "assistant",
Content: json.RawMessage(`[
{"type":"thinking","thinking":"secret plan","signature":""},
{"type":"tool_use","id":"t1","name":"Bash","input":{"command":"ls"}}
]`),
},
{
Role: "user",
Content: json.RawMessage(`[
{"type":"tool_result","tool_use_id":"t1","content":"ok","is_error":false},
{"type":"redacted_thinking","data":"..."}
]`),
},
},
}
changed, err := stripSignatureSensitiveBlocksFromClaudeRequest(req)
require.NoError(t, err)
require.True(t, changed)
require.Nil(t, req.Thinking)
require.Len(t, req.Messages, 2)
var blocks0 []map[string]any
require.NoError(t, json.Unmarshal(req.Messages[0].Content, &blocks0))
require.Len(t, blocks0, 2)
require.Equal(t, "text", blocks0[0]["type"])
require.Equal(t, "secret plan", blocks0[0]["text"])
require.Equal(t, "text", blocks0[1]["type"])
var blocks1 []map[string]any
require.NoError(t, json.Unmarshal(req.Messages[1].Content, &blocks1))
require.Len(t, blocks1, 1)
require.Equal(t, "text", blocks1[0]["type"])
require.NotEmpty(t, blocks1[0]["text"])
}
func TestStripThinkingFromClaudeRequest_DoesNotDowngradeTools(t *testing.T) {
req := &antigravity.ClaudeRequest{
Model: "claude-sonnet-4-5",
Thinking: &antigravity.ThinkingConfig{
Type: "enabled",
BudgetTokens: 1024,
},
Messages: []antigravity.ClaudeMessage{
{
Role: "assistant",
Content: json.RawMessage(`[{"type":"thinking","thinking":"secret plan"},{"type":"tool_use","id":"t1","name":"Bash","input":{"command":"ls"}}]`),
},
},
}
changed, err := stripThinkingFromClaudeRequest(req)
require.NoError(t, err)
require.True(t, changed)
require.Nil(t, req.Thinking)
var blocks []map[string]any
require.NoError(t, json.Unmarshal(req.Messages[0].Content, &blocks))
require.Len(t, blocks, 2)
require.Equal(t, "text", blocks[0]["type"])
require.Equal(t, "secret plan", blocks[0]["text"])
require.Equal(t, "tool_use", blocks[1]["type"])
}

View File

@@ -84,25 +84,28 @@ func FilterThinkingBlocks(body []byte) []byte {
return filterThinkingBlocksInternal(body, false)
}
// FilterThinkingBlocksForRetry removes thinking blocks from HISTORICAL messages for retry scenarios.
// This is used when upstream returns signature-related 400 errors.
// FilterThinkingBlocksForRetry strips thinking-related constructs for retry scenarios.
//
// Key insight:
// - User's thinking.type = "enabled" should be PRESERVED (user's intent)
// - Only HISTORICAL assistant messages have thinking blocks with signatures
// - These signatures may be invalid when switching accounts/platforms
// - New responses will generate fresh thinking blocks without signature issues
// Why:
// - Upstreams may reject historical `thinking`/`redacted_thinking` blocks due to invalid/missing signatures.
// - Anthropic extended thinking has a structural constraint: when top-level `thinking` is enabled and the
// final message is an assistant prefill, the assistant content must start with a thinking block.
// - If we remove thinking blocks but keep top-level `thinking` enabled, we can trigger:
// "Expected `thinking` or `redacted_thinking`, but found `text`"
//
// Strategy:
// - Keep thinking.type = "enabled" (preserve user intent)
// - Remove thinking/redacted_thinking blocks from historical assistant messages
// - Ensure no message has empty content after filtering
// Strategy (B: preserve content as text):
// - Disable top-level `thinking` (remove `thinking` field).
// - Convert `thinking` blocks to `text` blocks (preserve the thinking content).
// - Remove `redacted_thinking` blocks (cannot be converted to text).
// - Ensure no message ends up with empty content.
func FilterThinkingBlocksForRetry(body []byte) []byte {
// Fast path: check for presence of thinking-related keys in messages
// Fast path: check for presence of thinking-related keys in messages or top-level thinking config.
if !bytes.Contains(body, []byte(`"type":"thinking"`)) &&
!bytes.Contains(body, []byte(`"type": "thinking"`)) &&
!bytes.Contains(body, []byte(`"type":"redacted_thinking"`)) &&
!bytes.Contains(body, []byte(`"type": "redacted_thinking"`)) {
!bytes.Contains(body, []byte(`"type": "redacted_thinking"`)) &&
!bytes.Contains(body, []byte(`"thinking":`)) &&
!bytes.Contains(body, []byte(`"thinking" :`)) {
return body
}
@@ -111,15 +114,19 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
return body
}
// DO NOT modify thinking.type - preserve user's intent to use thinking mode
// The issue is with historical message signatures, not the thinking mode itself
modified := false
messages, ok := req["messages"].([]any)
if !ok {
return body
}
modified := false
// Disable top-level thinking mode for retry to avoid structural/signature constraints upstream.
if _, exists := req["thinking"]; exists {
delete(req, "thinking")
modified = true
}
newMessages := make([]any, 0, len(messages))
for _, msg := range messages {
@@ -149,13 +156,42 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
blockType, _ := blockMap["type"].(string)
// Remove thinking/redacted_thinking blocks from historical messages
// These have signatures that may be invalid across different accounts
if blockType == "thinking" || blockType == "redacted_thinking" {
// Convert thinking blocks to text (preserve content) and drop redacted_thinking.
switch blockType {
case "thinking":
modifiedThisMsg = true
thinkingText, _ := blockMap["thinking"].(string)
if thinkingText == "" {
continue
}
newContent = append(newContent, map[string]any{
"type": "text",
"text": thinkingText,
})
continue
case "redacted_thinking":
modifiedThisMsg = true
continue
}
// Handle blocks without type discriminator but with a "thinking" field.
if blockType == "" {
if rawThinking, hasThinking := blockMap["thinking"]; hasThinking {
modifiedThisMsg = true
switch v := rawThinking.(type) {
case string:
if v != "" {
newContent = append(newContent, map[string]any{"type": "text", "text": v})
}
default:
if b, err := json.Marshal(v); err == nil && len(b) > 0 {
newContent = append(newContent, map[string]any{"type": "text", "text": string(b)})
}
}
continue
}
}
newContent = append(newContent, block)
}
@@ -163,18 +199,15 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
modified = true
// Handle empty content after filtering
if len(newContent) == 0 {
// For assistant messages, skip entirely (remove from conversation)
// For user messages, add placeholder to avoid empty content error
if role == "user" {
newContent = append(newContent, map[string]any{
"type": "text",
"text": "(content removed)",
})
msgMap["content"] = newContent
newMessages = append(newMessages, msgMap)
// Always add a placeholder to avoid upstream "non-empty content" errors.
placeholder := "(content removed)"
if role == "assistant" {
placeholder = "(assistant content removed)"
}
// Skip assistant messages with empty content (don't append)
continue
newContent = append(newContent, map[string]any{
"type": "text",
"text": placeholder,
})
}
msgMap["content"] = newContent
}
@@ -183,6 +216,9 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
if modified {
req["messages"] = newMessages
} else {
// Avoid rewriting JSON when no changes are needed.
return body
}
newBody, err := json.Marshal(req)
@@ -192,6 +228,172 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
return newBody
}
// FilterSignatureSensitiveBlocksForRetry is a stronger retry filter for cases where upstream errors indicate
// signature/thought_signature validation issues involving tool blocks.
//
// This performs everything in FilterThinkingBlocksForRetry, plus:
// - Convert `tool_use` blocks to text (name/id/input) so we stop sending structured tool calls.
// - Convert `tool_result` blocks to text so we keep tool results visible without tool semantics.
//
// Use this only when needed: converting tool blocks to text changes model behaviour and can increase the
// risk of prompt injection (tool output becomes plain conversation text).
func FilterSignatureSensitiveBlocksForRetry(body []byte) []byte {
// Fast path: only run when we see likely relevant constructs.
if !bytes.Contains(body, []byte(`"type":"thinking"`)) &&
!bytes.Contains(body, []byte(`"type": "thinking"`)) &&
!bytes.Contains(body, []byte(`"type":"redacted_thinking"`)) &&
!bytes.Contains(body, []byte(`"type": "redacted_thinking"`)) &&
!bytes.Contains(body, []byte(`"type":"tool_use"`)) &&
!bytes.Contains(body, []byte(`"type": "tool_use"`)) &&
!bytes.Contains(body, []byte(`"type":"tool_result"`)) &&
!bytes.Contains(body, []byte(`"type": "tool_result"`)) &&
!bytes.Contains(body, []byte(`"thinking":`)) &&
!bytes.Contains(body, []byte(`"thinking" :`)) {
return body
}
var req map[string]any
if err := json.Unmarshal(body, &req); err != nil {
return body
}
modified := false
// Disable top-level thinking for retry to avoid structural/signature constraints upstream.
if _, exists := req["thinking"]; exists {
delete(req, "thinking")
modified = true
}
messages, ok := req["messages"].([]any)
if !ok {
return body
}
newMessages := make([]any, 0, len(messages))
for _, msg := range messages {
msgMap, ok := msg.(map[string]any)
if !ok {
newMessages = append(newMessages, msg)
continue
}
role, _ := msgMap["role"].(string)
content, ok := msgMap["content"].([]any)
if !ok {
newMessages = append(newMessages, msg)
continue
}
newContent := make([]any, 0, len(content))
modifiedThisMsg := false
for _, block := range content {
blockMap, ok := block.(map[string]any)
if !ok {
newContent = append(newContent, block)
continue
}
blockType, _ := blockMap["type"].(string)
switch blockType {
case "thinking":
modifiedThisMsg = true
thinkingText, _ := blockMap["thinking"].(string)
if thinkingText == "" {
continue
}
newContent = append(newContent, map[string]any{"type": "text", "text": thinkingText})
continue
case "redacted_thinking":
modifiedThisMsg = true
continue
case "tool_use":
modifiedThisMsg = true
name, _ := blockMap["name"].(string)
id, _ := blockMap["id"].(string)
input := blockMap["input"]
inputJSON, _ := json.Marshal(input)
text := "(tool_use)"
if name != "" {
text += " name=" + name
}
if id != "" {
text += " id=" + id
}
if len(inputJSON) > 0 && string(inputJSON) != "null" {
text += " input=" + string(inputJSON)
}
newContent = append(newContent, map[string]any{"type": "text", "text": text})
continue
case "tool_result":
modifiedThisMsg = true
toolUseID, _ := blockMap["tool_use_id"].(string)
isError, _ := blockMap["is_error"].(bool)
content := blockMap["content"]
contentJSON, _ := json.Marshal(content)
text := "(tool_result)"
if toolUseID != "" {
text += " tool_use_id=" + toolUseID
}
if isError {
text += " is_error=true"
}
if len(contentJSON) > 0 && string(contentJSON) != "null" {
text += "\n" + string(contentJSON)
}
newContent = append(newContent, map[string]any{"type": "text", "text": text})
continue
}
if blockType == "" {
if rawThinking, hasThinking := blockMap["thinking"]; hasThinking {
modifiedThisMsg = true
switch v := rawThinking.(type) {
case string:
if v != "" {
newContent = append(newContent, map[string]any{"type": "text", "text": v})
}
default:
if b, err := json.Marshal(v); err == nil && len(b) > 0 {
newContent = append(newContent, map[string]any{"type": "text", "text": string(b)})
}
}
continue
}
}
newContent = append(newContent, block)
}
if modifiedThisMsg {
modified = true
if len(newContent) == 0 {
placeholder := "(content removed)"
if role == "assistant" {
placeholder = "(assistant content removed)"
}
newContent = append(newContent, map[string]any{"type": "text", "text": placeholder})
}
msgMap["content"] = newContent
}
newMessages = append(newMessages, msgMap)
}
if !modified {
return body
}
req["messages"] = newMessages
newBody, err := json.Marshal(req)
if err != nil {
return body
}
return newBody
}
// filterThinkingBlocksInternal removes invalid thinking blocks from request
// Strategy:
// - When thinking.type != "enabled": Remove all thinking blocks

View File

@@ -151,3 +151,125 @@ func TestFilterThinkingBlocks(t *testing.T) {
})
}
}
func TestFilterThinkingBlocksForRetry_DisablesThinkingAndPreservesAsText(t *testing.T) {
input := []byte(`{
"model":"claude-3-5-sonnet-20241022",
"thinking":{"type":"enabled","budget_tokens":1024},
"messages":[
{"role":"user","content":[{"type":"text","text":"Hi"}]},
{"role":"assistant","content":[
{"type":"thinking","thinking":"Let me think...","signature":"bad_sig"},
{"type":"text","text":"Answer"}
]}
]
}`)
out := FilterThinkingBlocksForRetry(input)
var req map[string]any
require.NoError(t, json.Unmarshal(out, &req))
_, hasThinking := req["thinking"]
require.False(t, hasThinking)
msgs, ok := req["messages"].([]any)
require.True(t, ok)
require.Len(t, msgs, 2)
assistant := msgs[1].(map[string]any)
content := assistant["content"].([]any)
require.Len(t, content, 2)
first := content[0].(map[string]any)
require.Equal(t, "text", first["type"])
require.Equal(t, "Let me think...", first["text"])
}
func TestFilterThinkingBlocksForRetry_DisablesThinkingEvenWithoutThinkingBlocks(t *testing.T) {
input := []byte(`{
"model":"claude-3-5-sonnet-20241022",
"thinking":{"type":"enabled","budget_tokens":1024},
"messages":[
{"role":"user","content":[{"type":"text","text":"Hi"}]},
{"role":"assistant","content":[{"type":"text","text":"Prefill"}]}
]
}`)
out := FilterThinkingBlocksForRetry(input)
var req map[string]any
require.NoError(t, json.Unmarshal(out, &req))
_, hasThinking := req["thinking"]
require.False(t, hasThinking)
}
func TestFilterThinkingBlocksForRetry_RemovesRedactedThinkingAndKeepsValidContent(t *testing.T) {
input := []byte(`{
"thinking":{"type":"enabled","budget_tokens":1024},
"messages":[
{"role":"assistant","content":[
{"type":"redacted_thinking","data":"..."},
{"type":"text","text":"Visible"}
]}
]
}`)
out := FilterThinkingBlocksForRetry(input)
var req map[string]any
require.NoError(t, json.Unmarshal(out, &req))
_, hasThinking := req["thinking"]
require.False(t, hasThinking)
msgs := req["messages"].([]any)
content := msgs[0].(map[string]any)["content"].([]any)
require.Len(t, content, 1)
require.Equal(t, "text", content[0].(map[string]any)["type"])
require.Equal(t, "Visible", content[0].(map[string]any)["text"])
}
func TestFilterThinkingBlocksForRetry_EmptyContentGetsPlaceholder(t *testing.T) {
input := []byte(`{
"thinking":{"type":"enabled"},
"messages":[
{"role":"assistant","content":[{"type":"redacted_thinking","data":"..."}]}
]
}`)
out := FilterThinkingBlocksForRetry(input)
var req map[string]any
require.NoError(t, json.Unmarshal(out, &req))
msgs := req["messages"].([]any)
content := msgs[0].(map[string]any)["content"].([]any)
require.Len(t, content, 1)
require.Equal(t, "text", content[0].(map[string]any)["type"])
require.NotEmpty(t, content[0].(map[string]any)["text"])
}
func TestFilterSignatureSensitiveBlocksForRetry_DowngradesTools(t *testing.T) {
input := []byte(`{
"thinking":{"type":"enabled","budget_tokens":1024},
"messages":[
{"role":"assistant","content":[
{"type":"tool_use","id":"t1","name":"Bash","input":{"command":"ls"}},
{"type":"tool_result","tool_use_id":"t1","content":"ok","is_error":false}
]}
]
}`)
out := FilterSignatureSensitiveBlocksForRetry(input)
var req map[string]any
require.NoError(t, json.Unmarshal(out, &req))
_, hasThinking := req["thinking"]
require.False(t, hasThinking)
msgs := req["messages"].([]any)
content := msgs[0].(map[string]any)["content"].([]any)
require.Len(t, content, 2)
require.Equal(t, "text", content[0].(map[string]any)["type"])
require.Equal(t, "text", content[1].(map[string]any)["type"])
require.Contains(t, content[0].(map[string]any)["text"], "tool_use")
require.Contains(t, content[1].(map[string]any)["text"], "tool_result")
}

View File

@@ -1131,46 +1131,90 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
// 优先检测thinking block签名错误400并重试一次
if resp.StatusCode == 400 {
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
if readErr == nil {
_ = resp.Body.Close()
if readErr == nil {
_ = resp.Body.Close()
if s.isThinkingBlockSignatureError(respBody) {
// 避免在重试预算已耗尽时再发起额外请求
if time.Since(retryStart) >= maxRetryElapsed {
resp.Body = io.NopCloser(bytes.NewReader(respBody))
break
if s.isThinkingBlockSignatureError(respBody) {
looksLikeToolSignatureError := func(msg string) bool {
m := strings.ToLower(msg)
return strings.Contains(m, "tool_use") ||
strings.Contains(m, "tool_result") ||
strings.Contains(m, "functioncall") ||
strings.Contains(m, "function_call") ||
strings.Contains(m, "functionresponse") ||
strings.Contains(m, "function_response")
}
// 避免在重试预算已耗尽时再发起额外请求
if time.Since(retryStart) >= maxRetryElapsed {
resp.Body = io.NopCloser(bytes.NewReader(respBody))
break
}
log.Printf("Account %d: detected thinking block signature error, retrying with filtered thinking blocks", account.ID)
// 过滤thinking blocks并重试使用更激进的过滤
filteredBody := FilterThinkingBlocksForRetry(body)
retryReq, buildErr := s.buildUpstreamRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
if buildErr == nil {
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
if retryErr == nil {
// 使用重试后的响应,继续后续处理
if retryResp.StatusCode < 400 {
log.Printf("Account %d: signature error retry succeeded", account.ID)
} else {
log.Printf("Account %d: signature error retry returned status %d", account.ID, retryResp.StatusCode)
// Conservative two-stage fallback:
// 1) Disable thinking + thinking->text (preserve content)
// 2) Only if upstream still errors AND error message points to tool/function signature issues:
// also downgrade tool_use/tool_result blocks to text.
filteredBody := FilterThinkingBlocksForRetry(body)
retryReq, buildErr := s.buildUpstreamRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
if buildErr == nil {
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
if retryErr == nil {
if retryResp.StatusCode < 400 {
log.Printf("Account %d: signature error retry succeeded (thinking downgraded)", account.ID)
resp = retryResp
break
}
retryRespBody, retryReadErr := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
if retryReadErr == nil && retryResp.StatusCode == 400 && s.isThinkingBlockSignatureError(retryRespBody) {
msg2 := extractUpstreamErrorMessage(retryRespBody)
if looksLikeToolSignatureError(msg2) && time.Since(retryStart) < maxRetryElapsed {
log.Printf("Account %d: signature retry still failing and looks tool-related, retrying with tool blocks downgraded", account.ID)
filteredBody2 := FilterSignatureSensitiveBlocksForRetry(body)
retryReq2, buildErr2 := s.buildUpstreamRequest(ctx, c, account, filteredBody2, token, tokenType, reqModel)
if buildErr2 == nil {
retryResp2, retryErr2 := s.httpUpstream.Do(retryReq2, proxyURL, account.ID, account.Concurrency)
if retryErr2 == nil {
resp = retryResp2
break
}
if retryResp2 != nil && retryResp2.Body != nil {
_ = retryResp2.Body.Close()
}
log.Printf("Account %d: tool-downgrade signature retry failed: %v", account.ID, retryErr2)
} else {
log.Printf("Account %d: tool-downgrade signature retry build failed: %v", account.ID, buildErr2)
}
}
}
// Fall back to the original retry response context.
resp = &http.Response{
StatusCode: retryResp.StatusCode,
Header: retryResp.Header.Clone(),
Body: io.NopCloser(bytes.NewReader(retryRespBody)),
}
break
}
resp = retryResp
break
if retryResp != nil && retryResp.Body != nil {
_ = retryResp.Body.Close()
}
log.Printf("Account %d: signature error retry failed: %v", account.ID, retryErr)
} else {
log.Printf("Account %d: signature error retry build request failed: %v", account.ID, buildErr)
}
if retryResp != nil && retryResp.Body != nil {
_ = retryResp.Body.Close()
}
log.Printf("Account %d: signature error retry failed: %v", account.ID, retryErr)
} else {
log.Printf("Account %d: signature error retry build request failed: %v", account.ID, buildErr)
// Retry failed: restore original response body and continue handling.
resp.Body = io.NopCloser(bytes.NewReader(respBody))
break
}
// 重试失败,恢复原始响应体继续处理
// 不是thinking签名错误,恢复响应体
resp.Body = io.NopCloser(bytes.NewReader(respBody))
break
}
// 不是thinking签名错误恢复响应体
resp.Body = io.NopCloser(bytes.NewReader(respBody))
}
}
// 检查是否需要通用重试排除400因为400已经在上面特殊处理过了
@@ -2037,7 +2081,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
if resp.StatusCode == 400 && s.isThinkingBlockSignatureError(respBody) {
log.Printf("Account %d: detected thinking block signature error on count_tokens, retrying with filtered thinking blocks", account.ID)
filteredBody := FilterThinkingBlocks(body)
filteredBody := FilterThinkingBlocksForRetry(body)
retryReq, buildErr := s.buildCountTokensRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
if buildErr == nil {
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)

View File

@@ -359,6 +359,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
if err != nil {
return nil, s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", err.Error())
}
originalClaudeBody := body
proxyURL := ""
if account.ProxyID != nil && account.Proxy != nil {
@@ -479,6 +480,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
}
var resp *http.Response
signatureRetryStage := 0
for attempt := 1; attempt <= geminiMaxRetries; attempt++ {
upstreamReq, idHeader, err := buildReq(ctx)
if err != nil {
@@ -503,6 +505,46 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries: "+sanitizeUpstreamErrorMessage(err.Error()))
}
// Special-case: signature/thought_signature validation errors are not transient, but may be fixed by
// downgrading Claude thinking/tool history to plain text (conservative two-stage retry).
if resp.StatusCode == http.StatusBadRequest && signatureRetryStage < 2 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
if isGeminiSignatureRelatedError(respBody) {
var strippedClaudeBody []byte
stageName := ""
switch signatureRetryStage {
case 0:
// Stage 1: disable thinking + thinking->text
strippedClaudeBody = FilterThinkingBlocksForRetry(originalClaudeBody)
stageName = "thinking-only"
signatureRetryStage = 1
default:
// Stage 2: additionally downgrade tool_use/tool_result blocks to text
strippedClaudeBody = FilterSignatureSensitiveBlocksForRetry(originalClaudeBody)
stageName = "thinking+tools"
signatureRetryStage = 2
}
retryGeminiReq, txErr := convertClaudeMessagesToGeminiGenerateContent(strippedClaudeBody)
if txErr == nil {
log.Printf("Gemini account %d: detected signature-related 400, retrying with downgraded Claude blocks (%s)", account.ID, stageName)
geminiReq = retryGeminiReq
// Consume one retry budget attempt and continue with the updated request payload.
sleepGeminiBackoff(1)
continue
}
}
// Restore body for downstream error handling.
resp = &http.Response{
StatusCode: http.StatusBadRequest,
Header: resp.Header.Clone(),
Body: io.NopCloser(bytes.NewReader(respBody)),
}
break
}
if resp.StatusCode >= 400 && s.shouldRetryGeminiUpstreamError(account, resp.StatusCode) {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
@@ -600,6 +642,14 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
}, nil
}
func isGeminiSignatureRelatedError(respBody []byte) bool {
msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
if msg == "" {
msg = strings.ToLower(string(respBody))
}
return strings.Contains(msg, "thought_signature") || strings.Contains(msg, "signature")
}
func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte) (*ForwardResult, error) {
startTime := time.Now()