merge: 合并 main 分支解决冲突

This commit is contained in:
ianshaw
2026-01-03 06:44:23 -08:00
7 changed files with 82 additions and 456 deletions

View File

@@ -14,24 +14,15 @@ func TransformClaudeToGemini(claudeReq *ClaudeRequest, projectID, mappedModel st
// 用于存储 tool_use id -> name 映射 // 用于存储 tool_use id -> name 映射
toolIDToName := make(map[string]string) toolIDToName := make(map[string]string)
// 检测是否启用 thinking
isThinkingEnabled := claudeReq.Thinking != nil && claudeReq.Thinking.Type == "enabled"
// 只有 Gemini 模型支持 dummy thought workaround // 只有 Gemini 模型支持 dummy thought workaround
// Claude 模型通过 Vertex/Google API 需要有效的 thought signatures // Claude 模型通过 Vertex/Google API 需要有效的 thought signatures
allowDummyThought := strings.HasPrefix(mappedModel, "gemini-") allowDummyThought := strings.HasPrefix(mappedModel, "gemini-")
// 检测是否启用 thinking
requestedThinkingEnabled := claudeReq.Thinking != nil && claudeReq.Thinking.Type == "enabled"
// antigravity(v1internal) 下Gemini 与 Claude 的 “thinking” 都可能涉及 thoughtSignature 链路:
// - Gemini支持 dummy signature 跳过校验
// - Claude需要透传上游签名否则容易 400
isThinkingEnabled := requestedThinkingEnabled
thoughtSignatureMode := thoughtSignatureModePreserve
if allowDummyThought {
thoughtSignatureMode = thoughtSignatureModeDummy
}
// 1. 构建 contents // 1. 构建 contents
contents, err := buildContents(claudeReq.Messages, toolIDToName, isThinkingEnabled, thoughtSignatureMode) contents, err := buildContents(claudeReq.Messages, toolIDToName, isThinkingEnabled, allowDummyThought)
if err != nil { if err != nil {
return nil, fmt.Errorf("build contents: %w", err) return nil, fmt.Errorf("build contents: %w", err)
} }
@@ -129,7 +120,7 @@ func buildSystemInstruction(system json.RawMessage, modelName string) *GeminiCon
} }
// buildContents 构建 contents // buildContents 构建 contents
func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isThinkingEnabled bool, thoughtSignatureMode thoughtSignatureMode) ([]GeminiContent, error) { func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isThinkingEnabled, allowDummyThought bool) ([]GeminiContent, error) {
var contents []GeminiContent var contents []GeminiContent
for i, msg := range messages { for i, msg := range messages {
@@ -138,30 +129,23 @@ func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isT
role = "model" role = "model"
} }
parts, err := buildParts(msg.Content, toolIDToName, thoughtSignatureMode) parts, err := buildParts(msg.Content, toolIDToName, allowDummyThought)
if err != nil { if err != nil {
return nil, fmt.Errorf("build parts for message %d: %w", i, err) return nil, fmt.Errorf("build parts for message %d: %w", i, err)
} }
allowDummyThought := thoughtSignatureMode == thoughtSignatureModeDummy
// 只有 Gemini 模型支持 dummy thinking block workaround // 只有 Gemini 模型支持 dummy thinking block workaround
// 只对最后一条 assistant 消息添加Pre-fill 场景) // 只对最后一条 assistant 消息添加Pre-fill 场景)
// 历史 assistant 消息不能添加没有 signature 的 dummy thinking block // 历史 assistant 消息不能添加没有 signature 的 dummy thinking block
if allowDummyThought && role == "model" && isThinkingEnabled && i == len(messages)-1 { if allowDummyThought && role == "model" && isThinkingEnabled && i == len(messages)-1 {
hasThoughtPart := false hasThoughtPart := false
firstPartIsThought := false for _, p := range parts {
for idx, p := range parts {
if p.Thought { if p.Thought {
hasThoughtPart = true hasThoughtPart = true
if idx == 0 {
firstPartIsThought = true
}
break break
} }
} }
// 如果没有thinking part或者有thinking part但不在第一个位置都需要在开头添加dummy thinking block if !hasThoughtPart && len(parts) > 0 {
if len(parts) > 0 && (!hasThoughtPart || !firstPartIsThought) {
// 在开头添加 dummy thinking block // 在开头添加 dummy thinking block
parts = append([]GeminiPart{{ parts = append([]GeminiPart{{
Text: "Thinking...", Text: "Thinking...",
@@ -189,18 +173,8 @@ func buildContents(messages []ClaudeMessage, toolIDToName map[string]string, isT
const dummyThoughtSignature = "skip_thought_signature_validator" const dummyThoughtSignature = "skip_thought_signature_validator"
// buildParts 构建消息的 parts // buildParts 构建消息的 parts
type thoughtSignatureMode int // allowDummyThought: 只有 Gemini 模型支持 dummy thought signature
func buildParts(content json.RawMessage, toolIDToName map[string]string, allowDummyThought bool) ([]GeminiPart, error) {
const (
thoughtSignatureModePreserve thoughtSignatureMode = iota
thoughtSignatureModeDummy
)
// buildParts 构建消息的 parts
// thoughtSignatureMode:
// - dummy: 用 dummy signature 跳过 Gemini thoughtSignature 校验
// - preserve: 透传输入中的 signature主要用于 Claude via Vertex 的签名链路)
func buildParts(content json.RawMessage, toolIDToName map[string]string, thoughtSignatureMode thoughtSignatureMode) ([]GeminiPart, error) {
var parts []GeminiPart var parts []GeminiPart
// 尝试解析为字符串 // 尝试解析为字符串
@@ -226,40 +200,22 @@ func buildParts(content json.RawMessage, toolIDToName map[string]string, thought
} }
case "thinking": case "thinking":
signature := strings.TrimSpace(block.Signature) part := GeminiPart{
Text: block.Thinking,
if thoughtSignatureMode == thoughtSignatureModeDummy { Thought: true,
// Gemini 模型可以使用 dummy signature
parts = append(parts, GeminiPart{
Text: block.Thinking,
Thought: true,
ThoughtSignature: dummyThoughtSignature,
})
continue
} }
// 保留原有 signatureClaude 模型需要有效的 signature
// Claude via Vertex if block.Signature != "" {
// - signature 是上游返回的完整性令牌;本地不需要/无法验证,只能透传 part.ThoughtSignature = block.Signature
// - 缺失/无效 signature例如来自 Gemini 的 dummy signature会导致上游 400 } else if !allowDummyThought {
// - 为避免泄露 thinking 内容,缺失/无效 signature 的 thinking 直接丢弃 // Claude 模型需要有效 signature跳过无 signature 的 thinking block
if signature == "" || signature == dummyThoughtSignature { log.Printf("Warning: skipping thinking block without signature for Claude model")
continue continue
}
// 兼容:用 Claude 的 "thinking" 块承载两类东西
// 1) 真正的 thought 文本thinking != ""-> Gemini thought part
// 2) 仅承载 signature 的空 thinking 块thinking == ""-> Gemini signature-only part
if strings.TrimSpace(block.Thinking) == "" {
parts = append(parts, GeminiPart{
ThoughtSignature: signature,
})
} else { } else {
parts = append(parts, GeminiPart{ // Gemini 模型使用 dummy signature
Text: block.Thinking, part.ThoughtSignature = dummyThoughtSignature
Thought: true,
ThoughtSignature: signature,
})
} }
parts = append(parts, part)
case "image": case "image":
if block.Source != nil && block.Source.Type == "base64" { if block.Source != nil && block.Source.Type == "base64" {
@@ -284,15 +240,10 @@ func buildParts(content json.RawMessage, toolIDToName map[string]string, thought
ID: block.ID, ID: block.ID,
}, },
} }
switch thoughtSignatureMode { // 只有 Gemini 模型使用 dummy signature
case thoughtSignatureModeDummy: // Claude 模型不设置 signature避免验证问题
if allowDummyThought {
part.ThoughtSignature = dummyThoughtSignature part.ThoughtSignature = dummyThoughtSignature
case thoughtSignatureModePreserve:
// Claude via Vertex透传 tool_use 的 signature如果有
// 注意:跨模型混用时可能出现 dummy signature这里直接丢弃以避免 400。
if sig := strings.TrimSpace(block.Signature); sig != "" && sig != dummyThoughtSignature {
part.ThoughtSignature = sig
}
} }
parts = append(parts, part) parts = append(parts, part)
@@ -631,9 +582,11 @@ func cleanSchemaValue(value any) any {
continue continue
} }
// 递归清理所有值
result[k] = cleanSchemaValue(val) result[k] = cleanSchemaValue(val)
} }
return result return result
case []any: case []any:
// 递归处理数组中的每个元素 // 递归处理数组中的每个元素
cleaned := make([]any, 0, len(v)) cleaned := make([]any, 0, len(v))

View File

@@ -237,11 +237,7 @@ func (p *NonStreamingProcessor) buildResponse(geminiResp *GeminiResponse, respon
usage := ClaudeUsage{} usage := ClaudeUsage{}
if geminiResp.UsageMetadata != nil { if geminiResp.UsageMetadata != nil {
cached := geminiResp.UsageMetadata.CachedContentTokenCount cached := geminiResp.UsageMetadata.CachedContentTokenCount
prompt := geminiResp.UsageMetadata.PromptTokenCount usage.InputTokens = geminiResp.UsageMetadata.PromptTokenCount - cached
if cached > prompt {
cached = prompt
}
usage.InputTokens = prompt - cached
usage.OutputTokens = geminiResp.UsageMetadata.CandidatesTokenCount usage.OutputTokens = geminiResp.UsageMetadata.CandidatesTokenCount
usage.CacheReadInputTokens = cached usage.CacheReadInputTokens = cached
} }

View File

@@ -81,11 +81,7 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte {
// 但 Claude 的 input_tokens 不包含 cache_read_input_tokens需要减去 // 但 Claude 的 input_tokens 不包含 cache_read_input_tokens需要减去
if geminiResp.UsageMetadata != nil { if geminiResp.UsageMetadata != nil {
cached := geminiResp.UsageMetadata.CachedContentTokenCount cached := geminiResp.UsageMetadata.CachedContentTokenCount
prompt := geminiResp.UsageMetadata.PromptTokenCount p.inputTokens = geminiResp.UsageMetadata.PromptTokenCount - cached
if cached > prompt {
cached = prompt
}
p.inputTokens = prompt - cached
p.outputTokens = geminiResp.UsageMetadata.CandidatesTokenCount p.outputTokens = geminiResp.UsageMetadata.CandidatesTokenCount
p.cacheReadTokens = cached p.cacheReadTokens = cached
} }
@@ -134,11 +130,7 @@ func (p *StreamingProcessor) emitMessageStart(v1Resp *V1InternalResponse) []byte
usage := ClaudeUsage{} usage := ClaudeUsage{}
if v1Resp.Response.UsageMetadata != nil { if v1Resp.Response.UsageMetadata != nil {
cached := v1Resp.Response.UsageMetadata.CachedContentTokenCount cached := v1Resp.Response.UsageMetadata.CachedContentTokenCount
prompt := v1Resp.Response.UsageMetadata.PromptTokenCount usage.InputTokens = v1Resp.Response.UsageMetadata.PromptTokenCount - cached
if cached > prompt {
cached = prompt
}
usage.InputTokens = prompt - cached
usage.OutputTokens = v1Resp.Response.UsageMetadata.CandidatesTokenCount usage.OutputTokens = v1Resp.Response.UsageMetadata.CandidatesTokenCount
usage.CacheReadInputTokens = cached usage.CacheReadInputTokens = cached
} }

View File

@@ -4,11 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"strings"
"sync" "sync"
"time" "time"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats" "github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
) )
@@ -84,18 +82,14 @@ const (
// UsageCache 封装账户使用量相关的缓存 // UsageCache 封装账户使用量相关的缓存
type UsageCache struct { type UsageCache struct {
apiCache *sync.Map // accountID -> *apiUsageCache apiCache sync.Map // accountID -> *apiUsageCache
windowStatsCache *sync.Map // accountID -> *windowStatsCache windowStatsCache sync.Map // accountID -> *windowStatsCache
antigravityCache *sync.Map // accountID -> *antigravityUsageCache antigravityCache sync.Map // accountID -> *antigravityUsageCache
} }
// NewUsageCache 创建 UsageCache 实例 // NewUsageCache 创建 UsageCache 实例
func NewUsageCache() *UsageCache { func NewUsageCache() *UsageCache {
return &UsageCache{ return &UsageCache{}
apiCache: &sync.Map{},
antigravityCache: &sync.Map{},
windowStatsCache: &sync.Map{},
}
} }
// WindowStats 窗口期统计 // WindowStats 窗口期统计
@@ -159,7 +153,7 @@ type AccountUsageService struct {
usageLogRepo UsageLogRepository usageLogRepo UsageLogRepository
usageFetcher ClaudeUsageFetcher usageFetcher ClaudeUsageFetcher
geminiQuotaService *GeminiQuotaService geminiQuotaService *GeminiQuotaService
antigravityQuotaFetcher QuotaFetcher antigravityQuotaFetcher *AntigravityQuotaFetcher
cache *UsageCache cache *UsageCache
} }
@@ -172,33 +166,12 @@ func NewAccountUsageService(
antigravityQuotaFetcher *AntigravityQuotaFetcher, antigravityQuotaFetcher *AntigravityQuotaFetcher,
cache *UsageCache, cache *UsageCache,
) *AccountUsageService { ) *AccountUsageService {
if cache == nil {
cache = &UsageCache{
apiCache: &sync.Map{},
antigravityCache: &sync.Map{},
windowStatsCache: &sync.Map{},
}
}
if cache.apiCache == nil {
cache.apiCache = &sync.Map{}
}
if cache.antigravityCache == nil {
cache.antigravityCache = &sync.Map{}
}
if cache.windowStatsCache == nil {
cache.windowStatsCache = &sync.Map{}
}
var quotaFetcher QuotaFetcher
if antigravityQuotaFetcher != nil {
quotaFetcher = antigravityQuotaFetcher
}
return &AccountUsageService{ return &AccountUsageService{
accountRepo: accountRepo, accountRepo: accountRepo,
usageLogRepo: usageLogRepo, usageLogRepo: usageLogRepo,
usageFetcher: usageFetcher, usageFetcher: usageFetcher,
geminiQuotaService: geminiQuotaService, geminiQuotaService: geminiQuotaService,
antigravityQuotaFetcher: quotaFetcher, antigravityQuotaFetcher: antigravityQuotaFetcher,
cache: cache, cache: cache,
} }
} }
@@ -292,8 +265,8 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
totals := geminiAggregateUsage(stats) totals := geminiAggregateUsage(stats)
resetAt := geminiDailyResetTime(now) resetAt := geminiDailyResetTime(now)
usage.GeminiProDaily = buildGeminiUsageProgress(totals.ProRequests, quota.ProRPD, resetAt, totals.ProTokens, totals.ProCost) usage.GeminiProDaily = buildGeminiUsageProgress(totals.ProRequests, quota.ProRPD, resetAt, totals.ProTokens, totals.ProCost, now)
usage.GeminiFlashDaily = buildGeminiUsageProgress(totals.FlashRequests, quota.FlashRPD, resetAt, totals.FlashTokens, totals.FlashCost) usage.GeminiFlashDaily = buildGeminiUsageProgress(totals.FlashRequests, quota.FlashRPD, resetAt, totals.FlashTokens, totals.FlashCost, now)
return usage, nil return usage, nil
} }
@@ -305,41 +278,20 @@ func (s *AccountUsageService) getAntigravityUsage(ctx context.Context, account *
return &UsageInfo{UpdatedAt: &now}, nil return &UsageInfo{UpdatedAt: &now}, nil
} }
// Ensure project_id is stable for quota queries.
if strings.TrimSpace(account.GetCredential("project_id")) == "" {
projectID := antigravity.GenerateMockProjectID()
if account.Credentials == nil {
account.Credentials = map[string]any{}
}
account.Credentials["project_id"] = projectID
if s.accountRepo != nil {
_, err := s.accountRepo.BulkUpdate(ctx, []int64{account.ID}, AccountBulkUpdate{
Credentials: map[string]any{"project_id": projectID},
})
if err != nil {
log.Printf("Failed to persist antigravity project_id for account %d: %v", account.ID, err)
}
}
}
// 1. 检查缓存10 分钟) // 1. 检查缓存10 分钟)
if cached, ok := s.cache.antigravityCache.Load(account.ID); ok { if cached, ok := s.cache.antigravityCache.Load(account.ID); ok {
if cache, ok := cached.(*antigravityUsageCache); ok && time.Since(cache.timestamp) < apiCacheTTL { if cache, ok := cached.(*antigravityUsageCache); ok && time.Since(cache.timestamp) < apiCacheTTL {
// 重新计算 RemainingSeconds // 重新计算 RemainingSeconds
usage := cloneUsageInfo(cache.usageInfo) usage := cache.usageInfo
if usage.FiveHour != nil && usage.FiveHour.ResetsAt != nil { if usage.FiveHour != nil && usage.FiveHour.ResetsAt != nil {
usage.FiveHour.RemainingSeconds = remainingSecondsUntil(*usage.FiveHour.ResetsAt) usage.FiveHour.RemainingSeconds = int(time.Until(*usage.FiveHour.ResetsAt).Seconds())
} }
return usage, nil return usage, nil
} }
} }
// 2. 获取代理 URL // 2. 获取代理 URL
proxyURL, err := s.antigravityQuotaFetcher.GetProxyURL(ctx, account) proxyURL := s.antigravityQuotaFetcher.GetProxyURL(ctx, account)
if err != nil {
log.Printf("Failed to get proxy URL for account %d: %v", account.ID, err)
proxyURL = ""
}
// 3. 调用 API 获取额度 // 3. 调用 API 获取额度
result, err := s.antigravityQuotaFetcher.FetchQuota(ctx, account, proxyURL) result, err := s.antigravityQuotaFetcher.FetchQuota(ctx, account, proxyURL)
@@ -468,12 +420,12 @@ func (s *AccountUsageService) buildUsageInfo(resp *ClaudeUsageResponse, updatedA
// 5小时窗口 - 始终创建对象(即使 ResetsAt 为空) // 5小时窗口 - 始终创建对象(即使 ResetsAt 为空)
info.FiveHour = &UsageProgress{ info.FiveHour = &UsageProgress{
Utilization: clampFloat64(resp.FiveHour.Utilization, 0, 100), Utilization: resp.FiveHour.Utilization,
} }
if resp.FiveHour.ResetsAt != "" { if resp.FiveHour.ResetsAt != "" {
if fiveHourReset, err := parseTime(resp.FiveHour.ResetsAt); err == nil { if fiveHourReset, err := parseTime(resp.FiveHour.ResetsAt); err == nil {
info.FiveHour.ResetsAt = &fiveHourReset info.FiveHour.ResetsAt = &fiveHourReset
info.FiveHour.RemainingSeconds = remainingSecondsUntil(fiveHourReset) info.FiveHour.RemainingSeconds = int(time.Until(fiveHourReset).Seconds())
} else { } else {
log.Printf("Failed to parse FiveHour.ResetsAt: %s, error: %v", resp.FiveHour.ResetsAt, err) log.Printf("Failed to parse FiveHour.ResetsAt: %s, error: %v", resp.FiveHour.ResetsAt, err)
} }
@@ -483,14 +435,14 @@ func (s *AccountUsageService) buildUsageInfo(resp *ClaudeUsageResponse, updatedA
if resp.SevenDay.ResetsAt != "" { if resp.SevenDay.ResetsAt != "" {
if sevenDayReset, err := parseTime(resp.SevenDay.ResetsAt); err == nil { if sevenDayReset, err := parseTime(resp.SevenDay.ResetsAt); err == nil {
info.SevenDay = &UsageProgress{ info.SevenDay = &UsageProgress{
Utilization: clampFloat64(resp.SevenDay.Utilization, 0, 100), Utilization: resp.SevenDay.Utilization,
ResetsAt: &sevenDayReset, ResetsAt: &sevenDayReset,
RemainingSeconds: remainingSecondsUntil(sevenDayReset), RemainingSeconds: int(time.Until(sevenDayReset).Seconds()),
} }
} else { } else {
log.Printf("Failed to parse SevenDay.ResetsAt: %s, error: %v", resp.SevenDay.ResetsAt, err) log.Printf("Failed to parse SevenDay.ResetsAt: %s, error: %v", resp.SevenDay.ResetsAt, err)
info.SevenDay = &UsageProgress{ info.SevenDay = &UsageProgress{
Utilization: clampFloat64(resp.SevenDay.Utilization, 0, 100), Utilization: resp.SevenDay.Utilization,
} }
} }
} }
@@ -499,14 +451,14 @@ func (s *AccountUsageService) buildUsageInfo(resp *ClaudeUsageResponse, updatedA
if resp.SevenDaySonnet.ResetsAt != "" { if resp.SevenDaySonnet.ResetsAt != "" {
if sonnetReset, err := parseTime(resp.SevenDaySonnet.ResetsAt); err == nil { if sonnetReset, err := parseTime(resp.SevenDaySonnet.ResetsAt); err == nil {
info.SevenDaySonnet = &UsageProgress{ info.SevenDaySonnet = &UsageProgress{
Utilization: clampFloat64(resp.SevenDaySonnet.Utilization, 0, 100), Utilization: resp.SevenDaySonnet.Utilization,
ResetsAt: &sonnetReset, ResetsAt: &sonnetReset,
RemainingSeconds: remainingSecondsUntil(sonnetReset), RemainingSeconds: int(time.Until(sonnetReset).Seconds()),
} }
} else { } else {
log.Printf("Failed to parse SevenDaySonnet.ResetsAt: %s, error: %v", resp.SevenDaySonnet.ResetsAt, err) log.Printf("Failed to parse SevenDaySonnet.ResetsAt: %s, error: %v", resp.SevenDaySonnet.ResetsAt, err)
info.SevenDaySonnet = &UsageProgress{ info.SevenDaySonnet = &UsageProgress{
Utilization: clampFloat64(resp.SevenDaySonnet.Utilization, 0, 100), Utilization: resp.SevenDaySonnet.Utilization,
} }
} }
} }
@@ -520,7 +472,10 @@ func (s *AccountUsageService) estimateSetupTokenUsage(account *Account) *UsageIn
// 如果有session_window信息 // 如果有session_window信息
if account.SessionWindowEnd != nil { if account.SessionWindowEnd != nil {
remaining := remainingSecondsUntil(*account.SessionWindowEnd) remaining := int(time.Until(*account.SessionWindowEnd).Seconds())
if remaining < 0 {
remaining = 0
}
// 根据状态估算使用率 (百分比形式100 = 100%) // 根据状态估算使用率 (百分比形式100 = 100%)
var utilization float64 var utilization float64
@@ -532,7 +487,6 @@ func (s *AccountUsageService) estimateSetupTokenUsage(account *Account) *UsageIn
default: default:
utilization = 0.0 utilization = 0.0
} }
utilization = clampFloat64(utilization, 0, 100)
info.FiveHour = &UsageProgress{ info.FiveHour = &UsageProgress{
Utilization: utilization, Utilization: utilization,
@@ -551,12 +505,15 @@ func (s *AccountUsageService) estimateSetupTokenUsage(account *Account) *UsageIn
return info return info
} }
func buildGeminiUsageProgress(used, limit int64, resetAt time.Time, tokens int64, cost float64) *UsageProgress { func buildGeminiUsageProgress(used, limit int64, resetAt time.Time, tokens int64, cost float64, now time.Time) *UsageProgress {
if limit <= 0 { if limit <= 0 {
return nil return nil
} }
utilization := clampFloat64((float64(used)/float64(limit))*100, 0, 100) utilization := (float64(used) / float64(limit)) * 100
remainingSeconds := remainingSecondsUntil(resetAt) remainingSeconds := int(resetAt.Sub(now).Seconds())
if remainingSeconds < 0 {
remainingSeconds = 0
}
resetCopy := resetAt resetCopy := resetAt
return &UsageProgress{ return &UsageProgress{
Utilization: utilization, Utilization: utilization,
@@ -569,47 +526,3 @@ func buildGeminiUsageProgress(used, limit int64, resetAt time.Time, tokens int64
}, },
} }
} }
func cloneUsageInfo(src *UsageInfo) *UsageInfo {
if src == nil {
return nil
}
dst := *src
if src.UpdatedAt != nil {
t := *src.UpdatedAt
dst.UpdatedAt = &t
}
dst.FiveHour = cloneUsageProgress(src.FiveHour)
dst.SevenDay = cloneUsageProgress(src.SevenDay)
dst.SevenDaySonnet = cloneUsageProgress(src.SevenDaySonnet)
dst.GeminiProDaily = cloneUsageProgress(src.GeminiProDaily)
dst.GeminiFlashDaily = cloneUsageProgress(src.GeminiFlashDaily)
if src.AntigravityQuota != nil {
dst.AntigravityQuota = make(map[string]*AntigravityModelQuota, len(src.AntigravityQuota))
for k, v := range src.AntigravityQuota {
if v == nil {
dst.AntigravityQuota[k] = nil
continue
}
copyVal := *v
dst.AntigravityQuota[k] = &copyVal
}
}
return &dst
}
func cloneUsageProgress(src *UsageProgress) *UsageProgress {
if src == nil {
return nil
}
dst := *src
if src.ResetsAt != nil {
t := *src.ResetsAt
dst.ResetsAt = &t
}
if src.WindowStats != nil {
statsCopy := *src.WindowStats
dst.WindowStats = &statsCopy
}
return &dst
}

View File

@@ -64,7 +64,6 @@ type AntigravityGatewayService struct {
tokenProvider *AntigravityTokenProvider tokenProvider *AntigravityTokenProvider
rateLimitService *RateLimitService rateLimitService *RateLimitService
httpUpstream HTTPUpstream httpUpstream HTTPUpstream
settingService *SettingService
} }
func NewAntigravityGatewayService( func NewAntigravityGatewayService(
@@ -73,14 +72,12 @@ func NewAntigravityGatewayService(
tokenProvider *AntigravityTokenProvider, tokenProvider *AntigravityTokenProvider,
rateLimitService *RateLimitService, rateLimitService *RateLimitService,
httpUpstream HTTPUpstream, httpUpstream HTTPUpstream,
settingService *SettingService,
) *AntigravityGatewayService { ) *AntigravityGatewayService {
return &AntigravityGatewayService{ return &AntigravityGatewayService{
accountRepo: accountRepo, accountRepo: accountRepo,
tokenProvider: tokenProvider, tokenProvider: tokenProvider,
rateLimitService: rateLimitService, rateLimitService: rateLimitService,
httpUpstream: httpUpstream, httpUpstream: httpUpstream,
settingService: settingService,
} }
} }
@@ -310,106 +307,6 @@ func (s *AntigravityGatewayService) unwrapV1InternalResponse(body []byte) ([]byt
return body, nil return body, nil
} }
// isSignatureRelatedError 检测是否为 signature 相关的 400 错误
// 注意:不包含 "thinking" 关键词,避免误判消息格式错误为 signature 错误
func isSignatureRelatedError(statusCode int, body []byte) bool {
if statusCode != 400 {
return false
}
bodyStr := strings.ToLower(string(body))
keywords := []string{
"signature",
"thought_signature",
"thoughtsignature",
"invalid signature",
"signature validation",
}
for _, keyword := range keywords {
if strings.Contains(bodyStr, keyword) {
return true
}
}
return false
}
// isModelNotFoundError 检测是否为模型不存在的 404 错误
func isModelNotFoundError(statusCode int, body []byte) bool {
if statusCode != 404 {
return false
}
bodyStr := strings.ToLower(string(body))
keywords := []string{
"model not found",
"model does not exist",
"unknown model",
"invalid model",
}
for _, keyword := range keywords {
if strings.Contains(bodyStr, keyword) {
return true
}
}
return false
}
// stripThinkingFromClaudeRequest 从 Claude 请求中移除有问题的 thinking 块
// 策略:只移除历史消息中带 dummy signature 的 thinking 块,保留本次 thinking 配置
// 这样可以让本次对话仍然使用 thinking 功能,只是清理历史中可能导致问题的内容
func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) *antigravity.ClaudeRequest {
// 创建副本
stripped := *req
// 保留 thinking 配置,让本次对话仍然可以使用 thinking
// stripped.Thinking = nil // 不再移除
// 只移除消息中带 dummy signature 的 thinking 块
if len(stripped.Messages) > 0 {
newMessages := make([]antigravity.ClaudeMessage, 0, len(stripped.Messages))
for _, msg := range stripped.Messages {
newMsg := msg
// 如果 content 是数组,过滤有问题的 thinking 块
var blocks []map[string]any
if err := json.Unmarshal(msg.Content, &blocks); err == nil {
filtered := make([]map[string]any, 0, len(blocks))
for _, block := range blocks {
// 跳过带 dummy signature 的 thinking 块
if blockType, ok := block["type"].(string); ok && blockType == "thinking" {
if sig, ok := block["signature"].(string); ok {
// 移除 dummy signature 的 thinking 块
if sig == "skip_thought_signature_validator" || sig == "" {
continue
}
} else {
// 没有 signature 字段的 thinking 块也移除
continue
}
}
// 跳过没有 type 但有 thinking 字段的块untyped thinking blocks
if _, hasType := block["type"]; !hasType {
if _, hasThinking := block["thinking"]; hasThinking {
continue
}
}
filtered = append(filtered, block)
}
if newContent, err := json.Marshal(filtered); err == nil {
newMsg.Content = newContent
}
}
newMessages = append(newMessages, newMsg)
}
stripped.Messages = newMessages
}
return &stripped
}
// Forward 转发 Claude 协议请求Claude → Gemini 转换) // Forward 转发 Claude 协议请求Claude → Gemini 转换)
func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte) (*ForwardResult, error) { func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte) (*ForwardResult, error) {
startTime := time.Now() startTime := time.Now()
@@ -505,70 +402,11 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
// Auto 模式:检测 signature 错误并自动降级重试 if s.shouldFailoverUpstreamError(resp.StatusCode) {
if isSignatureRelatedError(resp.StatusCode, respBody) && claudeReq.Thinking != nil { return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
log.Printf("[Antigravity] Detected signature-related error, retrying without thinking blocks (account: %s, model: %s)", account.Name, mappedModel)
// 关闭原始响应释放连接respBody 已读取到内存)
_ = resp.Body.Close()
// 移除 thinking 块并重试一次
strippedReq := stripThinkingFromClaudeRequest(&claudeReq)
strippedBody, err := antigravity.TransformClaudeToGemini(strippedReq, projectID, mappedModel)
if err != nil {
log.Printf("[Antigravity] Failed to transform stripped request: %v", err)
// 降级失败,返回原始错误
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
}
// 发送降级请求
retryReq, err := antigravity.NewAPIRequest(ctx, action, accessToken, strippedBody)
if err != nil {
log.Printf("[Antigravity] Failed to create retry request: %v", err)
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
}
retryResp, err := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
log.Printf("[Antigravity] Retry request failed: %v", err)
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
}
// 如果重试成功,使用重试的响应(不要 return让后面的代码处理响应
if retryResp.StatusCode < 400 {
log.Printf("[Antigravity] Retry succeeded after stripping thinking blocks (account: %s, model: %s)", account.Name, mappedModel)
resp = retryResp
} else {
// 重试也失败,返回重试的错误
retryRespBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
_ = retryResp.Body.Close()
log.Printf("[Antigravity] Retry also failed with status %d: %s", retryResp.StatusCode, string(retryRespBody))
s.handleUpstreamError(ctx, account, retryResp.StatusCode, retryResp.Header, retryRespBody)
if s.shouldFailoverWithTempUnsched(ctx, account, retryResp.StatusCode, retryRespBody) {
return nil, &UpstreamFailoverError{StatusCode: retryResp.StatusCode}
}
return nil, s.writeMappedClaudeError(c, retryResp.StatusCode, retryRespBody)
}
} }
// 不是 signature 错误,或者已经没有 thinking 块,直接返回错误 return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
if resp.StatusCode >= 400 {
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody)
}
} }
requestID := resp.Header.Get("x-request-id") requestID := resp.Header.Get("x-request-id")
@@ -620,7 +458,16 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
case "generateContent", "streamGenerateContent": case "generateContent", "streamGenerateContent":
// ok // ok
case "countTokens": case "countTokens":
return nil, s.writeGoogleError(c, http.StatusNotImplemented, "countTokens is not supported") // 直接返回空值,不透传上游
c.JSON(http.StatusOK, map[string]any{"totalTokens": 0})
return &ForwardResult{
RequestID: "",
Usage: ClaudeUsage{},
Model: originalModel,
Stream: false,
Duration: time.Since(time.Now()),
FirstTokenMs: nil,
}, nil
default: default:
return nil, s.writeGoogleError(c, http.StatusNotFound, "Unsupported action: "+action) return nil, s.writeGoogleError(c, http.StatusNotFound, "Unsupported action: "+action)
} }
@@ -713,42 +560,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
// Check if model fallback is enabled and this is a model not found error if s.shouldFailoverUpstreamError(resp.StatusCode) {
if s.settingService != nil && s.settingService.IsModelFallbackEnabled(ctx) &&
isModelNotFoundError(resp.StatusCode, respBody) {
fallbackModel := s.settingService.GetFallbackModel(ctx, PlatformAntigravity)
// Only retry if fallback model is different from current model
if fallbackModel != "" && fallbackModel != mappedModel {
log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)",
mappedModel, fallbackModel, account.Name)
// Close original response
_ = resp.Body.Close()
// Rebuild request with fallback model
fallbackBody, err := s.wrapV1InternalRequest(projectID, fallbackModel, body)
if err == nil {
fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackBody)
if err == nil {
fallbackResp, err := s.httpUpstream.Do(fallbackReq, proxyURL, account.ID, account.Concurrency)
if err == nil && fallbackResp.StatusCode < 400 {
log.Printf("[Antigravity] Fallback succeeded with %s (account: %s)", fallbackModel, account.Name)
resp = fallbackResp
originalModel = fallbackModel // Update for billing
// Continue to normal response handling
goto handleSuccess
} else if fallbackResp != nil {
_ = fallbackResp.Body.Close()
}
}
}
log.Printf("[Antigravity] Fallback failed, returning original error")
}
}
if s.shouldFailoverWithTempUnsched(ctx, account, resp.StatusCode, respBody) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
} }
@@ -762,7 +574,6 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode) return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode)
} }
handleSuccess:
var usage *ClaudeUsage var usage *ClaudeUsage
var firstTokenMs *int var firstTokenMs *int
@@ -813,15 +624,6 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
} }
} }
func (s *AntigravityGatewayService) shouldFailoverWithTempUnsched(ctx context.Context, account *Account, statusCode int, body []byte) bool {
if s.rateLimitService != nil {
if s.rateLimitService.HandleTempUnschedulable(ctx, account, statusCode, body) {
return true
}
}
return s.shouldFailoverUpstreamError(statusCode)
}
func sleepAntigravityBackoff(attempt int) { func sleepAntigravityBackoff(attempt int) {
sleepGeminiBackoff(attempt) // 复用 Gemini 的退避逻辑 sleepGeminiBackoff(attempt) // 复用 Gemini 的退避逻辑
} }
@@ -932,10 +734,7 @@ func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Cont
} }
// 解包 v1internal 响应 // 解包 v1internal 响应
unwrapped := respBody unwrapped, _ := s.unwrapV1InternalResponse(respBody)
if inner, unwrapErr := s.unwrapV1InternalResponse(respBody); unwrapErr == nil && inner != nil {
unwrapped = inner
}
var parsed map[string]any var parsed map[string]any
if json.Unmarshal(unwrapped, &parsed) == nil { if json.Unmarshal(unwrapped, &parsed) == nil {
@@ -1009,8 +808,6 @@ func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int,
statusStr = "RESOURCE_EXHAUSTED" statusStr = "RESOURCE_EXHAUSTED"
case 500: case 500:
statusStr = "INTERNAL" statusStr = "INTERNAL"
case 501:
statusStr = "UNIMPLEMENTED"
case 502, 503: case 502, 503:
statusStr = "UNAVAILABLE" statusStr = "UNAVAILABLE"
} }

View File

@@ -2,7 +2,6 @@ package service
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
@@ -20,9 +19,6 @@ func NewAntigravityQuotaFetcher(proxyRepo ProxyRepository) *AntigravityQuotaFetc
// CanFetch 检查是否可以获取此账户的额度 // CanFetch 检查是否可以获取此账户的额度
func (f *AntigravityQuotaFetcher) CanFetch(account *Account) bool { func (f *AntigravityQuotaFetcher) CanFetch(account *Account) bool {
if f == nil || account == nil {
return false
}
if account.Platform != PlatformAntigravity { if account.Platform != PlatformAntigravity {
return false return false
} }
@@ -32,12 +28,6 @@ func (f *AntigravityQuotaFetcher) CanFetch(account *Account) bool {
// FetchQuota 获取 Antigravity 账户额度信息 // FetchQuota 获取 Antigravity 账户额度信息
func (f *AntigravityQuotaFetcher) FetchQuota(ctx context.Context, account *Account, proxyURL string) (*QuotaResult, error) { func (f *AntigravityQuotaFetcher) FetchQuota(ctx context.Context, account *Account, proxyURL string) (*QuotaResult, error) {
if f == nil {
return nil, fmt.Errorf("antigravity quota fetcher is nil")
}
if account == nil {
return nil, fmt.Errorf("account is nil")
}
accessToken := account.GetCredential("access_token") accessToken := account.GetCredential("access_token")
projectID := account.GetCredential("project_id") projectID := account.GetCredential("project_id")
@@ -71,10 +61,6 @@ func (f *AntigravityQuotaFetcher) buildUsageInfo(modelsResp *antigravity.FetchAv
AntigravityQuota: make(map[string]*AntigravityModelQuota), AntigravityQuota: make(map[string]*AntigravityModelQuota),
} }
if modelsResp == nil {
return info
}
// 遍历所有模型,填充 AntigravityQuota // 遍历所有模型,填充 AntigravityQuota
for modelName, modelInfo := range modelsResp.Models { for modelName, modelInfo := range modelsResp.Models {
if modelInfo.QuotaInfo == nil { if modelInfo.QuotaInfo == nil {
@@ -82,7 +68,7 @@ func (f *AntigravityQuotaFetcher) buildUsageInfo(modelsResp *antigravity.FetchAv
} }
// remainingFraction 是剩余比例 (0.0-1.0),转换为使用率百分比 // remainingFraction 是剩余比例 (0.0-1.0),转换为使用率百分比
utilization := clampInt(int((1.0-modelInfo.QuotaInfo.RemainingFraction)*100), 0, 100) utilization := int((1.0 - modelInfo.QuotaInfo.RemainingFraction) * 100)
info.AntigravityQuota[modelName] = &AntigravityModelQuota{ info.AntigravityQuota[modelName] = &AntigravityModelQuota{
Utilization: utilization, Utilization: utilization,
@@ -94,14 +80,14 @@ func (f *AntigravityQuotaFetcher) buildUsageInfo(modelsResp *antigravity.FetchAv
priorityModels := []string{"claude-sonnet-4-20250514", "claude-sonnet-4", "gemini-2.5-pro"} priorityModels := []string{"claude-sonnet-4-20250514", "claude-sonnet-4", "gemini-2.5-pro"}
for _, modelName := range priorityModels { for _, modelName := range priorityModels {
if modelInfo, ok := modelsResp.Models[modelName]; ok && modelInfo.QuotaInfo != nil { if modelInfo, ok := modelsResp.Models[modelName]; ok && modelInfo.QuotaInfo != nil {
utilization := clampFloat64((1.0-modelInfo.QuotaInfo.RemainingFraction)*100, 0, 100) utilization := (1.0 - modelInfo.QuotaInfo.RemainingFraction) * 100
progress := &UsageProgress{ progress := &UsageProgress{
Utilization: utilization, Utilization: utilization,
} }
if modelInfo.QuotaInfo.ResetTime != "" { if modelInfo.QuotaInfo.ResetTime != "" {
if resetTime, err := time.Parse(time.RFC3339, modelInfo.QuotaInfo.ResetTime); err == nil { if resetTime, err := time.Parse(time.RFC3339, modelInfo.QuotaInfo.ResetTime); err == nil {
progress.ResetsAt = &resetTime progress.ResetsAt = &resetTime
progress.RemainingSeconds = remainingSecondsUntil(resetTime) progress.RemainingSeconds = int(time.Until(resetTime).Seconds())
} }
} }
info.FiveHour = progress info.FiveHour = progress
@@ -113,22 +99,13 @@ func (f *AntigravityQuotaFetcher) buildUsageInfo(modelsResp *antigravity.FetchAv
} }
// GetProxyURL 获取账户的代理 URL // GetProxyURL 获取账户的代理 URL
func (f *AntigravityQuotaFetcher) GetProxyURL(ctx context.Context, account *Account) (string, error) { func (f *AntigravityQuotaFetcher) GetProxyURL(ctx context.Context, account *Account) string {
if f == nil {
return "", fmt.Errorf("antigravity quota fetcher is nil")
}
if account == nil {
return "", fmt.Errorf("account is nil")
}
if account.ProxyID == nil || f.proxyRepo == nil { if account.ProxyID == nil || f.proxyRepo == nil {
return "", nil return ""
} }
proxy, err := f.proxyRepo.GetByID(ctx, *account.ProxyID) proxy, err := f.proxyRepo.GetByID(ctx, *account.ProxyID)
if err != nil { if err != nil || proxy == nil {
return "", err return ""
} }
if proxy == nil { return proxy.URL()
return "", nil
}
return proxy.URL(), nil
} }

View File

@@ -8,8 +8,6 @@ import (
type QuotaFetcher interface { type QuotaFetcher interface {
// CanFetch 检查是否可以获取此账户的额度 // CanFetch 检查是否可以获取此账户的额度
CanFetch(account *Account) bool CanFetch(account *Account) bool
// GetProxyURL 获取账户的代理 URL如果没有代理则返回空字符串
GetProxyURL(ctx context.Context, account *Account) (string, error)
// FetchQuota 获取账户额度信息 // FetchQuota 获取账户额度信息
FetchQuota(ctx context.Context, account *Account, proxyURL string) (*QuotaResult, error) FetchQuota(ctx context.Context, account *Account, proxyURL string) (*QuotaResult, error)
} }