feat(antigravity): 增强网关功能和 thinking 块处理
主要改进: - 优化 thinking blocks 过滤策略,支持 Auto 模式降级 - 将无效 thinking block 内容转为普通 text - 保留单个空白 text block,不过滤 - 重构配额刷新机制,统一与 Claude 一致 - 支持 cachedContentTokenCount 映射到 cache_read_input_tokens - Haiku 模型映射到 Sonnet - 添加 /antigravity/v1/models 端点支持 - countTokens 端点直接返回空值
This commit is contained in:
@@ -49,11 +49,11 @@ var antigravityPrefixMapping = []struct {
|
||||
{"gemini-3-pro-image", "gemini-3-pro-image"}, // gemini-3-pro-image-preview 等
|
||||
{"claude-3-5-sonnet", "claude-sonnet-4-5"}, // 旧版 claude-3-5-sonnet-xxx
|
||||
{"claude-sonnet-4-5", "claude-sonnet-4-5"}, // claude-sonnet-4-5-xxx
|
||||
{"claude-haiku-4-5", "gemini-3-flash"}, // claude-haiku-4-5-xxx
|
||||
{"claude-haiku-4-5", "claude-sonnet-4-5"}, // claude-haiku-4-5-xxx → sonnet
|
||||
{"claude-opus-4-5", "claude-opus-4-5-thinking"},
|
||||
{"claude-3-haiku", "gemini-3-flash"}, // 旧版 claude-3-haiku-xxx
|
||||
{"claude-3-haiku", "claude-sonnet-4-5"}, // 旧版 claude-3-haiku-xxx → sonnet
|
||||
{"claude-sonnet-4", "claude-sonnet-4-5"},
|
||||
{"claude-haiku-4", "gemini-3-flash"},
|
||||
{"claude-haiku-4", "claude-sonnet-4-5"}, // → sonnet
|
||||
{"claude-opus-4", "claude-opus-4-5-thinking"},
|
||||
{"gemini-3-pro", "gemini-3-pro-high"}, // gemini-3-pro, gemini-3-pro-preview 等
|
||||
}
|
||||
@@ -64,6 +64,7 @@ type AntigravityGatewayService struct {
|
||||
tokenProvider *AntigravityTokenProvider
|
||||
rateLimitService *RateLimitService
|
||||
httpUpstream HTTPUpstream
|
||||
settingService *SettingService
|
||||
}
|
||||
|
||||
func NewAntigravityGatewayService(
|
||||
@@ -72,12 +73,14 @@ func NewAntigravityGatewayService(
|
||||
tokenProvider *AntigravityTokenProvider,
|
||||
rateLimitService *RateLimitService,
|
||||
httpUpstream HTTPUpstream,
|
||||
settingService *SettingService,
|
||||
) *AntigravityGatewayService {
|
||||
return &AntigravityGatewayService{
|
||||
accountRepo: accountRepo,
|
||||
tokenProvider: tokenProvider,
|
||||
rateLimitService: rateLimitService,
|
||||
httpUpstream: httpUpstream,
|
||||
settingService: settingService,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -308,6 +311,7 @@ func (s *AntigravityGatewayService) unwrapV1InternalResponse(body []byte) ([]byt
|
||||
}
|
||||
|
||||
// isSignatureRelatedError 检测是否为 signature 相关的 400 错误
|
||||
// 注意:不包含 "thinking" 关键词,避免误判消息格式错误为 signature 错误
|
||||
func isSignatureRelatedError(statusCode int, body []byte) bool {
|
||||
if statusCode != 400 {
|
||||
return false
|
||||
@@ -318,7 +322,6 @@ func isSignatureRelatedError(statusCode int, body []byte) bool {
|
||||
"signature",
|
||||
"thought_signature",
|
||||
"thoughtsignature",
|
||||
"thinking",
|
||||
"invalid signature",
|
||||
"signature validation",
|
||||
}
|
||||
@@ -331,28 +334,60 @@ func isSignatureRelatedError(statusCode int, body []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// stripThinkingFromClaudeRequest 从 Claude 请求中移除所有 thinking 相关内容
|
||||
// isModelNotFoundError 检测是否为模型不存在的 404 错误
|
||||
func isModelNotFoundError(statusCode int, body []byte) bool {
|
||||
if statusCode != 404 {
|
||||
return false
|
||||
}
|
||||
|
||||
bodyStr := strings.ToLower(string(body))
|
||||
keywords := []string{
|
||||
"model not found",
|
||||
"model does not exist",
|
||||
"unknown model",
|
||||
"invalid model",
|
||||
}
|
||||
|
||||
for _, keyword := range keywords {
|
||||
if strings.Contains(bodyStr, keyword) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// stripThinkingFromClaudeRequest 从 Claude 请求中移除有问题的 thinking 块
|
||||
// 策略:只移除历史消息中带 dummy signature 的 thinking 块,保留本次 thinking 配置
|
||||
// 这样可以让本次对话仍然使用 thinking 功能,只是清理历史中可能导致问题的内容
|
||||
func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) *antigravity.ClaudeRequest {
|
||||
// 创建副本
|
||||
stripped := *req
|
||||
|
||||
// 移除 thinking 配置
|
||||
stripped.Thinking = nil
|
||||
// 保留 thinking 配置,让本次对话仍然可以使用 thinking
|
||||
// stripped.Thinking = nil // 不再移除
|
||||
|
||||
// 移除消息中的 thinking 块
|
||||
// 只移除消息中带 dummy signature 的 thinking 块
|
||||
if len(stripped.Messages) > 0 {
|
||||
newMessages := make([]antigravity.ClaudeMessage, 0, len(stripped.Messages))
|
||||
for _, msg := range stripped.Messages {
|
||||
newMsg := msg
|
||||
|
||||
// 如果 content 是数组,过滤 thinking 块
|
||||
// 如果 content 是数组,过滤有问题的 thinking 块
|
||||
var blocks []map[string]any
|
||||
if err := json.Unmarshal(msg.Content, &blocks); err == nil {
|
||||
filtered := make([]map[string]any, 0, len(blocks))
|
||||
for _, block := range blocks {
|
||||
// 跳过有 type="thinking" 的块
|
||||
// 跳过带 dummy signature 的 thinking 块
|
||||
if blockType, ok := block["type"].(string); ok && blockType == "thinking" {
|
||||
continue
|
||||
if sig, ok := block["signature"].(string); ok {
|
||||
// 移除 dummy signature 的 thinking 块
|
||||
if sig == "skip_thought_signature_validator" || sig == "" {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// 没有 signature 字段的 thinking 块也移除
|
||||
continue
|
||||
}
|
||||
}
|
||||
// 跳过没有 type 但有 thinking 字段的块(untyped thinking blocks)
|
||||
if _, hasType := block["type"]; !hasType {
|
||||
@@ -390,9 +425,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
|
||||
originalModel := claudeReq.Model
|
||||
mappedModel := s.getMappedModel(account, claudeReq.Model)
|
||||
if mappedModel != claudeReq.Model {
|
||||
log.Printf("Antigravity model mapping: %s -> %s (account: %s)", claudeReq.Model, mappedModel, account.Name)
|
||||
}
|
||||
|
||||
// 获取 access_token
|
||||
if s.tokenProvider == nil {
|
||||
@@ -418,15 +450,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
return nil, fmt.Errorf("transform request: %w", err)
|
||||
}
|
||||
|
||||
// 调试:记录转换后的请求体(仅记录前 2000 字符)
|
||||
if bodyJSON, err := json.Marshal(geminiBody); err == nil {
|
||||
truncated := string(bodyJSON)
|
||||
if len(truncated) > 2000 {
|
||||
truncated = truncated[:2000] + "..."
|
||||
}
|
||||
log.Printf("[Debug] Transformed Gemini request: %s", truncated)
|
||||
}
|
||||
|
||||
// 构建上游 action
|
||||
action := "generateContent"
|
||||
if claudeReq.Stream {
|
||||
@@ -495,7 +518,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
if err != nil {
|
||||
log.Printf("[Antigravity] Failed to transform stripped request: %v", err)
|
||||
// 降级失败,返回原始错误
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||
}
|
||||
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
|
||||
@@ -505,7 +528,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
retryReq, err := antigravity.NewAPIRequest(ctx, action, accessToken, strippedBody)
|
||||
if err != nil {
|
||||
log.Printf("[Antigravity] Failed to create retry request: %v", err)
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||
}
|
||||
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
|
||||
@@ -514,7 +537,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
retryResp, err := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
log.Printf("[Antigravity] Retry request failed: %v", err)
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||
}
|
||||
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
|
||||
@@ -531,7 +554,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
log.Printf("[Antigravity] Retry also failed with status %d: %s", retryResp.StatusCode, string(retryRespBody))
|
||||
s.handleUpstreamError(ctx, account, retryResp.StatusCode, retryResp.Header, retryRespBody)
|
||||
|
||||
if s.shouldFailoverUpstreamError(retryResp.StatusCode) {
|
||||
if s.shouldFailoverWithTempUnsched(ctx, account, retryResp.StatusCode, retryRespBody) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: retryResp.StatusCode}
|
||||
}
|
||||
return nil, s.writeMappedClaudeError(c, retryResp.StatusCode, retryRespBody)
|
||||
@@ -540,7 +563,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
|
||||
// 不是 signature 错误,或者已经没有 thinking 块,直接返回错误
|
||||
if resp.StatusCode >= 400 {
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||
}
|
||||
|
||||
@@ -594,8 +617,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
}
|
||||
|
||||
switch action {
|
||||
case "generateContent", "streamGenerateContent", "countTokens":
|
||||
case "generateContent", "streamGenerateContent":
|
||||
// ok
|
||||
case "countTokens":
|
||||
return nil, s.writeGoogleError(c, http.StatusNotImplemented, "countTokens is not supported")
|
||||
default:
|
||||
return nil, s.writeGoogleError(c, http.StatusNotFound, "Unsupported action: "+action)
|
||||
}
|
||||
@@ -650,18 +675,6 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
sleepAntigravityBackoff(attempt)
|
||||
continue
|
||||
}
|
||||
if action == "countTokens" {
|
||||
estimated := estimateGeminiCountTokens(body)
|
||||
c.JSON(http.StatusOK, map[string]any{"totalTokens": estimated})
|
||||
return &ForwardResult{
|
||||
RequestID: "",
|
||||
Usage: ClaudeUsage{},
|
||||
Model: originalModel,
|
||||
Stream: false,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: nil,
|
||||
}, nil
|
||||
}
|
||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
|
||||
}
|
||||
|
||||
@@ -678,18 +691,6 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
if resp.StatusCode == 429 {
|
||||
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
|
||||
}
|
||||
if action == "countTokens" {
|
||||
estimated := estimateGeminiCountTokens(body)
|
||||
c.JSON(http.StatusOK, map[string]any{"totalTokens": estimated})
|
||||
return &ForwardResult{
|
||||
RequestID: "",
|
||||
Usage: ClaudeUsage{},
|
||||
Model: originalModel,
|
||||
Stream: false,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: nil,
|
||||
}, nil
|
||||
}
|
||||
resp = &http.Response{
|
||||
StatusCode: resp.StatusCode,
|
||||
Header: resp.Header.Clone(),
|
||||
@@ -712,20 +713,42 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
|
||||
|
||||
if action == "countTokens" {
|
||||
estimated := estimateGeminiCountTokens(body)
|
||||
c.JSON(http.StatusOK, map[string]any{"totalTokens": estimated})
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: ClaudeUsage{},
|
||||
Model: originalModel,
|
||||
Stream: false,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: nil,
|
||||
}, nil
|
||||
// Check if model fallback is enabled and this is a model not found error
|
||||
if s.settingService != nil && s.settingService.IsModelFallbackEnabled(ctx) &&
|
||||
isModelNotFoundError(resp.StatusCode, respBody) {
|
||||
|
||||
fallbackModel := s.settingService.GetFallbackModel(ctx, PlatformAntigravity)
|
||||
|
||||
// Only retry if fallback model is different from current model
|
||||
if fallbackModel != "" && fallbackModel != mappedModel {
|
||||
log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)",
|
||||
mappedModel, fallbackModel, account.Name)
|
||||
|
||||
// Close original response
|
||||
_ = resp.Body.Close()
|
||||
|
||||
// Rebuild request with fallback model
|
||||
fallbackBody, err := s.wrapV1InternalRequest(projectID, fallbackModel, body)
|
||||
if err == nil {
|
||||
fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackBody)
|
||||
if err == nil {
|
||||
fallbackResp, err := s.httpUpstream.Do(fallbackReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err == nil && fallbackResp.StatusCode < 400 {
|
||||
log.Printf("[Antigravity] Fallback succeeded with %s (account: %s)", fallbackModel, account.Name)
|
||||
resp = fallbackResp
|
||||
originalModel = fallbackModel // Update for billing
|
||||
// Continue to normal response handling
|
||||
goto handleSuccess
|
||||
} else if fallbackResp != nil {
|
||||
_ = fallbackResp.Body.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Printf("[Antigravity] Fallback failed, returning original error")
|
||||
}
|
||||
}
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||
}
|
||||
|
||||
@@ -739,6 +762,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
handleSuccess:
|
||||
var usage *ClaudeUsage
|
||||
var firstTokenMs *int
|
||||
|
||||
@@ -789,6 +813,15 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AntigravityGatewayService) shouldFailoverWithTempUnsched(ctx context.Context, account *Account, statusCode int, body []byte) bool {
|
||||
if s.rateLimitService != nil {
|
||||
if s.rateLimitService.HandleTempUnschedulable(ctx, account, statusCode, body) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return s.shouldFailoverUpstreamError(statusCode)
|
||||
}
|
||||
|
||||
func sleepAntigravityBackoff(attempt int) {
|
||||
sleepGeminiBackoff(attempt) // 复用 Gemini 的退避逻辑
|
||||
}
|
||||
@@ -899,7 +932,10 @@ func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Cont
|
||||
}
|
||||
|
||||
// 解包 v1internal 响应
|
||||
unwrapped, _ := s.unwrapV1InternalResponse(respBody)
|
||||
unwrapped := respBody
|
||||
if inner, unwrapErr := s.unwrapV1InternalResponse(respBody); unwrapErr == nil && inner != nil {
|
||||
unwrapped = inner
|
||||
}
|
||||
|
||||
var parsed map[string]any
|
||||
if json.Unmarshal(unwrapped, &parsed) == nil {
|
||||
@@ -973,6 +1009,8 @@ func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int,
|
||||
statusStr = "RESOURCE_EXHAUSTED"
|
||||
case 500:
|
||||
statusStr = "INTERNAL"
|
||||
case 501:
|
||||
statusStr = "UNIMPLEMENTED"
|
||||
case 502, 503:
|
||||
statusStr = "UNAVAILABLE"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user