fix(gateway): 优化 thinking block 重试逻辑
- 保留用户的 thinking.type=enabled 设置(不再禁用)
- 只移除历史消息中的 thinking/redacted_thinking blocks
- 处理过滤后空消息:跳过 assistant 消息,user 消息添加占位符
- 增强错误检测:覆盖 signature、Expected thinking、empty content 错误
- 添加重试成功/失败日志便于排查
This commit is contained in:
@@ -84,17 +84,25 @@ func FilterThinkingBlocks(body []byte) []byte {
|
|||||||
return filterThinkingBlocksInternal(body, false)
|
return filterThinkingBlocksInternal(body, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterThinkingBlocksForRetry converts thinking blocks to text blocks for retry scenarios
|
// FilterThinkingBlocksForRetry removes thinking blocks from HISTORICAL messages for retry scenarios.
|
||||||
// This preserves the thinking content while avoiding signature validation errors.
|
// This is used when upstream returns signature-related 400 errors.
|
||||||
// Note: redacted_thinking blocks are removed because they cannot be converted to text.
|
//
|
||||||
|
// Key insight:
|
||||||
|
// - User's thinking.type = "enabled" should be PRESERVED (user's intent)
|
||||||
|
// - Only HISTORICAL assistant messages have thinking blocks with signatures
|
||||||
|
// - These signatures may be invalid when switching accounts/platforms
|
||||||
|
// - New responses will generate fresh thinking blocks without signature issues
|
||||||
|
//
|
||||||
|
// Strategy:
|
||||||
|
// - Keep thinking.type = "enabled" (preserve user intent)
|
||||||
|
// - Remove thinking/redacted_thinking blocks from historical assistant messages
|
||||||
|
// - Ensure no message has empty content after filtering
|
||||||
func FilterThinkingBlocksForRetry(body []byte) []byte {
|
func FilterThinkingBlocksForRetry(body []byte) []byte {
|
||||||
// Fast path: check for presence of thinking-related keys
|
// Fast path: check for presence of thinking-related keys in messages
|
||||||
if !bytes.Contains(body, []byte(`"type":"thinking"`)) &&
|
if !bytes.Contains(body, []byte(`"type":"thinking"`)) &&
|
||||||
!bytes.Contains(body, []byte(`"type": "thinking"`)) &&
|
!bytes.Contains(body, []byte(`"type": "thinking"`)) &&
|
||||||
!bytes.Contains(body, []byte(`"type":"redacted_thinking"`)) &&
|
!bytes.Contains(body, []byte(`"type":"redacted_thinking"`)) &&
|
||||||
!bytes.Contains(body, []byte(`"type": "redacted_thinking"`)) &&
|
!bytes.Contains(body, []byte(`"type": "redacted_thinking"`)) {
|
||||||
!bytes.Contains(body, []byte(`"thinking":`)) &&
|
|
||||||
!bytes.Contains(body, []byte(`"thinking" :`)) {
|
|
||||||
return body
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -103,20 +111,29 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
|
|||||||
return body
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DO NOT modify thinking.type - preserve user's intent to use thinking mode
|
||||||
|
// The issue is with historical message signatures, not the thinking mode itself
|
||||||
|
|
||||||
messages, ok := req["messages"].([]any)
|
messages, ok := req["messages"].([]any)
|
||||||
if !ok {
|
if !ok {
|
||||||
return body
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
modified := false
|
modified := false
|
||||||
|
newMessages := make([]any, 0, len(messages))
|
||||||
|
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
msgMap, ok := msg.(map[string]any)
|
msgMap, ok := msg.(map[string]any)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
newMessages = append(newMessages, msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
role, _ := msgMap["role"].(string)
|
||||||
content, ok := msgMap["content"].([]any)
|
content, ok := msgMap["content"].([]any)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
// String content or other format - keep as is
|
||||||
|
newMessages = append(newMessages, msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,50 +149,40 @@ func FilterThinkingBlocksForRetry(body []byte) []byte {
|
|||||||
|
|
||||||
blockType, _ := blockMap["type"].(string)
|
blockType, _ := blockMap["type"].(string)
|
||||||
|
|
||||||
// Case 1: Standard thinking block - convert to text
|
// Remove thinking/redacted_thinking blocks from historical messages
|
||||||
if blockType == "thinking" {
|
// These have signatures that may be invalid across different accounts
|
||||||
thinkingText, _ := blockMap["thinking"].(string)
|
if blockType == "thinking" || blockType == "redacted_thinking" {
|
||||||
if thinkingText != "" {
|
|
||||||
newContent = append(newContent, map[string]any{
|
|
||||||
"type": "text",
|
|
||||||
"text": thinkingText,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
modifiedThisMsg = true
|
modifiedThisMsg = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Case 2: Redacted thinking block - remove (cannot convert encrypted content)
|
|
||||||
if blockType == "redacted_thinking" {
|
|
||||||
modifiedThisMsg = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Case 3: Untyped block with "thinking" field - convert to text
|
|
||||||
if blockType == "" {
|
|
||||||
if thinkingText, hasThinking := blockMap["thinking"].(string); hasThinking {
|
|
||||||
if thinkingText != "" {
|
|
||||||
newContent = append(newContent, map[string]any{
|
|
||||||
"type": "text",
|
|
||||||
"text": thinkingText,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
modifiedThisMsg = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
newContent = append(newContent, block)
|
newContent = append(newContent, block)
|
||||||
}
|
}
|
||||||
|
|
||||||
if modifiedThisMsg {
|
if modifiedThisMsg {
|
||||||
msgMap["content"] = newContent
|
|
||||||
modified = true
|
modified = true
|
||||||
|
// Handle empty content after filtering
|
||||||
|
if len(newContent) == 0 {
|
||||||
|
// For assistant messages, skip entirely (remove from conversation)
|
||||||
|
// For user messages, add placeholder to avoid empty content error
|
||||||
|
if role == "user" {
|
||||||
|
newContent = append(newContent, map[string]any{
|
||||||
|
"type": "text",
|
||||||
|
"text": "(content removed)",
|
||||||
|
})
|
||||||
|
msgMap["content"] = newContent
|
||||||
|
newMessages = append(newMessages, msgMap)
|
||||||
|
}
|
||||||
|
// Skip assistant messages with empty content (don't append)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
msgMap["content"] = newContent
|
||||||
}
|
}
|
||||||
|
newMessages = append(newMessages, msgMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !modified {
|
if modified {
|
||||||
return body
|
req["messages"] = newMessages
|
||||||
}
|
}
|
||||||
|
|
||||||
newBody, err := json.Marshal(req)
|
newBody, err := json.Marshal(req)
|
||||||
|
|||||||
@@ -1029,9 +1029,17 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|||||||
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||||
if retryErr == nil {
|
if retryErr == nil {
|
||||||
// 使用重试后的响应,继续后续处理
|
// 使用重试后的响应,继续后续处理
|
||||||
|
if retryResp.StatusCode < 400 {
|
||||||
|
log.Printf("Account %d: signature error retry succeeded", account.ID)
|
||||||
|
} else {
|
||||||
|
log.Printf("Account %d: signature error retry returned status %d", account.ID, retryResp.StatusCode)
|
||||||
|
}
|
||||||
resp = retryResp
|
resp = retryResp
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
log.Printf("Account %d: signature error retry failed: %v", account.ID, retryErr)
|
||||||
|
} else {
|
||||||
|
log.Printf("Account %d: signature error retry build request failed: %v", account.ID, buildErr)
|
||||||
}
|
}
|
||||||
// 重试失败,恢复原始响应体继续处理
|
// 重试失败,恢复原始响应体继续处理
|
||||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||||
@@ -1295,7 +1303,7 @@ func truncateForLog(b []byte, maxBytes int) string {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// isThinkingBlockSignatureError 检测是否是thinking block签名错误
|
// isThinkingBlockSignatureError 检测是否是thinking block相关错误
|
||||||
// 这类错误可以通过过滤thinking blocks并重试来解决
|
// 这类错误可以通过过滤thinking blocks并重试来解决
|
||||||
func (s *GatewayService) isThinkingBlockSignatureError(respBody []byte) bool {
|
func (s *GatewayService) isThinkingBlockSignatureError(respBody []byte) bool {
|
||||||
msg := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody)))
|
msg := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody)))
|
||||||
@@ -1303,9 +1311,27 @@ func (s *GatewayService) isThinkingBlockSignatureError(respBody []byte) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Log for debugging
|
||||||
|
log.Printf("[SignatureCheck] Checking error message: %s", msg)
|
||||||
|
|
||||||
// 检测signature相关的错误(更宽松的匹配)
|
// 检测signature相关的错误(更宽松的匹配)
|
||||||
// 例如: "Invalid `signature` in `thinking` block", "***.signature" 等
|
// 例如: "Invalid `signature` in `thinking` block", "***.signature" 等
|
||||||
if strings.Contains(msg, "signature") {
|
if strings.Contains(msg, "signature") {
|
||||||
|
log.Printf("[SignatureCheck] Detected signature error")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检测 thinking block 顺序/类型错误
|
||||||
|
// 例如: "Expected `thinking` or `redacted_thinking`, but found `text`"
|
||||||
|
if strings.Contains(msg, "expected") && (strings.Contains(msg, "thinking") || strings.Contains(msg, "redacted_thinking")) {
|
||||||
|
log.Printf("[SignatureCheck] Detected thinking block type error")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检测空消息内容错误(可能是过滤 thinking blocks 后导致的)
|
||||||
|
// 例如: "all messages must have non-empty content"
|
||||||
|
if strings.Contains(msg, "non-empty content") || strings.Contains(msg, "empty content") {
|
||||||
|
log.Printf("[SignatureCheck] Detected empty content error")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user