feat(antigravity): comprehensive enhancements - model mapping, rate limiting, scheduling & ops

Key changes:
- Upgrade model mapping: Opus 4.5 → Opus 4.6-thinking with precise matching
- Unified rate limiting: scope-level → model-level with Redis snapshot sync
- Load-balanced scheduling by call count with smart retry mechanism
- Force cache billing support
- Model identity injection in prompts with leak prevention
- Thinking mode auto-handling (max_tokens/budget_tokens fix)
- Frontend: whitelist mode toggle, model mapping validation, status indicators
- Gemini session fallback with Redis Trie O(L) matching
- Ops: enhanced concurrency monitoring, account availability, retry logic
- Migration scripts: 049-051 for model mapping unification
This commit is contained in:
erio
2026-02-07 12:31:10 +08:00
parent e617b45ba3
commit 5e98445b22
73 changed files with 8553 additions and 1926 deletions

View File

@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/domain"
"github.com/Wei-Shaw/sub2api/internal/handler/dto"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/geminicli"
@@ -1490,3 +1491,9 @@ func (h *AccountHandler) BatchRefreshTier(c *gin.Context) {
response.Success(c, results)
}
// GetAntigravityDefaultModelMapping 获取 Antigravity 平台的默认模型映射
// GET /api/v1/admin/accounts/antigravity/default-model-mapping
func (h *AccountHandler) GetAntigravityDefaultModelMapping(c *gin.Context) {
response.Success(c, domain.DefaultAntigravityModelMapping)
}

View File

@@ -63,6 +63,43 @@ func (h *OpsHandler) GetConcurrencyStats(c *gin.Context) {
response.Success(c, payload)
}
// GetUserConcurrencyStats returns real-time concurrency usage for all active users.
// GET /api/v1/admin/ops/user-concurrency
func (h *OpsHandler) GetUserConcurrencyStats(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
response.Success(c, gin.H{
"enabled": false,
"user": map[int64]*service.UserConcurrencyInfo{},
"timestamp": time.Now().UTC(),
})
return
}
users, collectedAt, err := h.opsService.GetUserConcurrencyStats(c.Request.Context())
if err != nil {
response.ErrorFrom(c, err)
return
}
payload := gin.H{
"enabled": true,
"user": users,
}
if collectedAt != nil {
payload["timestamp"] = collectedAt.UTC()
}
response.Success(c, payload)
}
// GetAccountAvailability returns account availability statistics.
// GET /api/v1/admin/ops/account-availability
//

View File

@@ -212,17 +212,6 @@ func AccountFromServiceShallow(a *service.Account) *Account {
}
}
if scopeLimits := a.GetAntigravityScopeRateLimits(); len(scopeLimits) > 0 {
out.ScopeRateLimits = make(map[string]ScopeRateLimitInfo, len(scopeLimits))
now := time.Now()
for scope, remainingSec := range scopeLimits {
out.ScopeRateLimits[scope] = ScopeRateLimitInfo{
ResetAt: now.Add(time.Duration(remainingSec) * time.Second),
RemainingSec: remainingSec,
}
}
}
return out
}

View File

@@ -121,6 +121,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request body")
return
}
// 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), ctxkey.ThinkingEnabled, parsedReq.ThinkingEnabled))
reqModel := parsedReq.Model
reqStream := parsedReq.Stream
@@ -135,11 +137,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// Track if we've started streaming (for error handling)
streamStarted := false
// 绑定错误透传服务,允许 service 层在非 failover 错误场景复用规则。
if h.errorPassthroughService != nil {
service.BindErrorPassthroughService(c, h.errorPassthroughService)
}
// 获取订阅信息可能为nil- 提前获取用于后续检查
subscription, _ := middleware2.GetSubscriptionFromContext(c)
@@ -205,11 +202,20 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
sessionKey = "gemini:" + sessionHash
}
// 查询粘性会话绑定的账号 ID
var sessionBoundAccountID int64
if sessionKey != "" {
sessionBoundAccountID, _ = h.gatewayService.GetCachedSessionAccountID(c.Request.Context(), apiKey.GroupID, sessionKey)
}
// 判断是否真的绑定了粘性会话:有 sessionKey 且已经绑定到某个账号
hasBoundSession := sessionKey != "" && sessionBoundAccountID > 0
if platform == service.PlatformGemini {
maxAccountSwitches := h.maxAccountSwitchesGemini
switchCount := 0
failedAccountIDs := make(map[int64]struct{})
var lastFailoverErr *service.UpstreamFailoverError
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
for {
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, failedAccountIDs, "") // Gemini 不使用会话限制
@@ -302,7 +308,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
requestCtx = context.WithValue(requestCtx, ctxkey.AccountSwitchCount, switchCount)
}
if account.Platform == service.PlatformAntigravity {
result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, reqModel, "generateContent", reqStream, body)
result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, reqModel, "generateContent", reqStream, body, hasBoundSession)
} else {
result, err = h.geminiCompatService.Forward(requestCtx, c, account, body)
}
@@ -314,6 +320,9 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if errors.As(err, &failoverErr) {
failedAccountIDs[account.ID] = struct{}{}
lastFailoverErr = failoverErr
if failoverErr.ForceCacheBilling {
forceCacheBilling = true
}
if switchCount >= maxAccountSwitches {
h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted)
return
@@ -332,22 +341,23 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
clientIP := ip.GetClientIP(c)
// 异步记录使用量subscription已在函数开头获取
go func(result *service.ForwardResult, usedAccount *service.Account, ua, clientIP string) {
go func(result *service.ForwardResult, usedAccount *service.Account, ua, clientIP string, fcb bool) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
Result: result,
APIKey: apiKey,
User: apiKey.User,
Account: usedAccount,
Subscription: subscription,
UserAgent: ua,
IPAddress: clientIP,
APIKeyService: h.apiKeyService,
Result: result,
APIKey: apiKey,
User: apiKey.User,
Account: usedAccount,
Subscription: subscription,
UserAgent: ua,
IPAddress: clientIP,
ForceCacheBilling: fcb,
APIKeyService: h.apiKeyService,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account, userAgent, clientIP, forceCacheBilling)
return
}
}
@@ -366,6 +376,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
failedAccountIDs := make(map[int64]struct{})
var lastFailoverErr *service.UpstreamFailoverError
retryWithFallback := false
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
for {
// 选择支持该模型的账号
@@ -457,7 +468,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
requestCtx = context.WithValue(requestCtx, ctxkey.AccountSwitchCount, switchCount)
}
if account.Platform == service.PlatformAntigravity {
result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body)
result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession)
} else {
result, err = h.gatewayService.Forward(requestCtx, c, account, parsedReq)
}
@@ -504,6 +515,9 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if errors.As(err, &failoverErr) {
failedAccountIDs[account.ID] = struct{}{}
lastFailoverErr = failoverErr
if failoverErr.ForceCacheBilling {
forceCacheBilling = true
}
if switchCount >= maxAccountSwitches {
h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted)
return
@@ -522,22 +536,23 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
clientIP := ip.GetClientIP(c)
// 异步记录使用量subscription已在函数开头获取
go func(result *service.ForwardResult, usedAccount *service.Account, ua, clientIP string) {
go func(result *service.ForwardResult, usedAccount *service.Account, ua, clientIP string, fcb bool) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
Result: result,
APIKey: currentAPIKey,
User: currentAPIKey.User,
Account: usedAccount,
Subscription: currentSubscription,
UserAgent: ua,
IPAddress: clientIP,
APIKeyService: h.apiKeyService,
Result: result,
APIKey: currentAPIKey,
User: currentAPIKey.User,
Account: usedAccount,
Subscription: currentSubscription,
UserAgent: ua,
IPAddress: clientIP,
ForceCacheBilling: fcb,
APIKeyService: h.apiKeyService,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account, userAgent, clientIP, forceCacheBilling)
return
}
if !retryWithFallback {
@@ -909,6 +924,8 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request body")
return
}
// 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), ctxkey.ThinkingEnabled, parsedReq.ThinkingEnabled))
// 验证 model 必填
if parsedReq.Model == "" {

View File

@@ -5,6 +5,7 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"io"
"log"
@@ -20,6 +21,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/google/uuid"
"github.com/gin-gonic/gin"
)
@@ -207,9 +209,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
// 1) user concurrency slot
streamStarted := false
if h.errorPassthroughService != nil {
service.BindErrorPassthroughService(c, h.errorPassthroughService)
}
userReleaseFunc, err := geminiConcurrency.AcquireUserSlotWithWait(c, authSubject.UserID, authSubject.Concurrency, stream, &streamStarted)
if err != nil {
googleError(c, http.StatusTooManyRequests, err.Error())
@@ -250,6 +249,70 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
if sessionKey != "" {
sessionBoundAccountID, _ = h.gatewayService.GetCachedSessionAccountID(c.Request.Context(), apiKey.GroupID, sessionKey)
}
// === Gemini 内容摘要会话 Fallback 逻辑 ===
// 当原有会话标识无效时sessionBoundAccountID == 0尝试基于内容摘要链匹配
var geminiDigestChain string
var geminiPrefixHash string
var geminiSessionUUID string
useDigestFallback := sessionBoundAccountID == 0
if useDigestFallback {
// 解析 Gemini 请求体
var geminiReq antigravity.GeminiRequest
if err := json.Unmarshal(body, &geminiReq); err == nil && len(geminiReq.Contents) > 0 {
// 生成摘要链
geminiDigestChain = service.BuildGeminiDigestChain(&geminiReq)
if geminiDigestChain != "" {
// 生成前缀 hash
userAgent := c.GetHeader("User-Agent")
clientIP := ip.GetClientIP(c)
platform := ""
if apiKey.Group != nil {
platform = apiKey.Group.Platform
}
geminiPrefixHash = service.GenerateGeminiPrefixHash(
authSubject.UserID,
apiKey.ID,
clientIP,
userAgent,
platform,
modelName,
)
// 查找会话
foundUUID, foundAccountID, found := h.gatewayService.FindGeminiSession(
c.Request.Context(),
derefGroupID(apiKey.GroupID),
geminiPrefixHash,
geminiDigestChain,
)
if found {
sessionBoundAccountID = foundAccountID
geminiSessionUUID = foundUUID
log.Printf("[Gemini] Digest fallback matched: uuid=%s, accountID=%d, chain=%s",
foundUUID[:8], foundAccountID, truncateDigestChain(geminiDigestChain))
// 关键:如果原 sessionKey 为空,使用 prefixHash + uuid 作为 sessionKey
// 这样 SelectAccountWithLoadAwareness 的粘性会话逻辑会优先使用匹配到的账号
if sessionKey == "" {
sessionKey = service.GenerateGeminiDigestSessionKey(geminiPrefixHash, foundUUID)
}
_ = h.gatewayService.BindStickySession(c.Request.Context(), apiKey.GroupID, sessionKey, foundAccountID)
} else {
// 生成新的会话 UUID
geminiSessionUUID = uuid.New().String()
// 为新会话也生成 sessionKey用于后续请求的粘性会话
if sessionKey == "" {
sessionKey = service.GenerateGeminiDigestSessionKey(geminiPrefixHash, geminiSessionUUID)
}
}
}
}
}
// 判断是否真的绑定了粘性会话:有 sessionKey 且已经绑定到某个账号
hasBoundSession := sessionKey != "" && sessionBoundAccountID > 0
isCLI := isGeminiCLIRequest(c, body)
cleanedForUnknownBinding := false
@@ -257,6 +320,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
switchCount := 0
failedAccountIDs := make(map[int64]struct{})
var lastFailoverErr *service.UpstreamFailoverError
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
for {
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, failedAccountIDs, "") // Gemini 不使用会话限制
@@ -344,7 +408,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
requestCtx = context.WithValue(requestCtx, ctxkey.AccountSwitchCount, switchCount)
}
if account.Platform == service.PlatformAntigravity {
result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, modelName, action, stream, body)
result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, modelName, action, stream, body, hasBoundSession)
} else {
result, err = h.geminiCompatService.ForwardNative(requestCtx, c, account, modelName, action, stream, body)
}
@@ -355,6 +419,9 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
failedAccountIDs[account.ID] = struct{}{}
if failoverErr.ForceCacheBilling {
forceCacheBilling = true
}
if switchCount >= maxAccountSwitches {
lastFailoverErr = failoverErr
h.handleGeminiFailoverExhausted(c, lastFailoverErr)
@@ -374,8 +441,22 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
userAgent := c.GetHeader("User-Agent")
clientIP := ip.GetClientIP(c)
// 保存 Gemini 内容摘要会话(用于 Fallback 匹配)
if useDigestFallback && geminiDigestChain != "" && geminiPrefixHash != "" {
if err := h.gatewayService.SaveGeminiSession(
c.Request.Context(),
derefGroupID(apiKey.GroupID),
geminiPrefixHash,
geminiDigestChain,
geminiSessionUUID,
account.ID,
); err != nil {
log.Printf("[Gemini] Failed to save digest session: %v", err)
}
}
// 6) record usage async (Gemini 使用长上下文双倍计费)
go func(result *service.ForwardResult, usedAccount *service.Account, ua, ip string) {
go func(result *service.ForwardResult, usedAccount *service.Account, ua, ip string, fcb bool) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@@ -389,11 +470,12 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
IPAddress: ip,
LongContextThreshold: 200000, // Gemini 200K 阈值
LongContextMultiplier: 2.0, // 超出部分双倍计费
ForceCacheBilling: fcb,
APIKeyService: h.apiKeyService,
}); err != nil {
log.Printf("Record usage failed: %v", err)
}
}(result, account, userAgent, clientIP)
}(result, account, userAgent, clientIP, forceCacheBilling)
return
}
}
@@ -556,3 +638,19 @@ func extractGeminiCLISessionHash(c *gin.Context, body []byte) string {
// 如果没有 privileged-user-id直接使用 tmp 目录哈希
return tmpDirHash
}
// truncateDigestChain 截断摘要链用于日志显示
func truncateDigestChain(chain string) string {
if len(chain) <= 50 {
return chain
}
return chain[:50] + "..."
}
// derefGroupID 安全解引用 *int64nil 返回 0
func derefGroupID(groupID *int64) int64 {
if groupID == nil {
return 0
}
return *groupID
}

View File

@@ -149,11 +149,6 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
// Track if we've started streaming (for error handling)
streamStarted := false
// 绑定错误透传服务,允许 service 层在非 failover 错误场景复用规则。
if h.errorPassthroughService != nil {
service.BindErrorPassthroughService(c, h.errorPassthroughService)
}
// Get subscription info (may be nil)
subscription, _ := middleware2.GetSubscriptionFromContext(c)