From 0bb3e4a98cfadab74f3fe66df7474e29edd53f49 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:14:09 +0800 Subject: [PATCH 01/19] feat: add RPM getter methods and schedulability check to Account model --- backend/internal/service/account.go | 71 +++++++++++++++++++ backend/internal/service/account_rpm_test.go | 73 ++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 backend/internal/service/account_rpm_test.go diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index 1864eb54..fe6f432c 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -1137,6 +1137,77 @@ func (a *Account) GetSessionIdleTimeoutMinutes() int { return 5 } +// GetBaseRPM 获取基础 RPM 限制 +// 返回 0 表示未启用 +func (a *Account) GetBaseRPM() int { + if a.Extra == nil { + return 0 + } + if v, ok := a.Extra["base_rpm"]; ok { + return parseExtraInt(v) + } + return 0 +} + +// GetRPMStrategy 获取 RPM 策略 +// "tiered" = 三区模型(默认), "sticky_exempt" = 粘性豁免 +func (a *Account) GetRPMStrategy() string { + if a.Extra == nil { + return "tiered" + } + if v, ok := a.Extra["rpm_strategy"]; ok { + if s, ok := v.(string); ok && s == "sticky_exempt" { + return "sticky_exempt" + } + } + return "tiered" +} + +// GetRPMStickyBuffer 获取 RPM 粘性缓冲数量 +// tiered 模式下的黄区大小,默认为 base_rpm 的 20%(至少 1) +func (a *Account) GetRPMStickyBuffer() int { + if a.Extra == nil { + return 0 + } + if v, ok := a.Extra["rpm_sticky_buffer"]; ok { + val := parseExtraInt(v) + if val > 0 { + return val + } + } + base := a.GetBaseRPM() + buffer := base / 5 + if buffer < 1 && base > 0 { + buffer = 1 + } + return buffer +} + +// CheckRPMSchedulability 根据当前 RPM 计数检查调度状态 +// 复用 WindowCostSchedulability 三态:Schedulable / StickyOnly / NotSchedulable +func (a *Account) CheckRPMSchedulability(currentRPM int) WindowCostSchedulability { + baseRPM := a.GetBaseRPM() + if baseRPM <= 0 { + return WindowCostSchedulable + } + + if currentRPM < baseRPM { + return WindowCostSchedulable + } + + strategy := a.GetRPMStrategy() + if strategy == "sticky_exempt" { + return WindowCostStickyOnly // 粘性豁免无红区 + } + + // tiered: 黄区 + 红区 + buffer := a.GetRPMStickyBuffer() + if currentRPM < baseRPM+buffer { + return WindowCostStickyOnly + } + return WindowCostNotSchedulable +} + // CheckWindowCostSchedulability 根据当前窗口费用检查调度状态 // - 费用 < 阈值: WindowCostSchedulable(可正常调度) // - 费用 >= 阈值 且 < 阈值+预留: WindowCostStickyOnly(仅粘性会话) diff --git a/backend/internal/service/account_rpm_test.go b/backend/internal/service/account_rpm_test.go new file mode 100644 index 00000000..01797763 --- /dev/null +++ b/backend/internal/service/account_rpm_test.go @@ -0,0 +1,73 @@ +package service + +import "testing" + +func TestGetBaseRPM(t *testing.T) { + tests := []struct { + name string + extra map[string]any + expected int + }{ + {"nil extra", nil, 0}, + {"no key", map[string]any{}, 0}, + {"zero", map[string]any{"base_rpm": 0}, 0}, + {"int value", map[string]any{"base_rpm": 15}, 15}, + {"float value", map[string]any{"base_rpm": 15.0}, 15}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &Account{Extra: tt.extra} + if got := a.GetBaseRPM(); got != tt.expected { + t.Errorf("GetBaseRPM() = %d, want %d", got, tt.expected) + } + }) + } +} + +func TestGetRPMStrategy(t *testing.T) { + tests := []struct { + name string + extra map[string]any + expected string + }{ + {"nil extra", nil, "tiered"}, + {"no key", map[string]any{}, "tiered"}, + {"tiered", map[string]any{"rpm_strategy": "tiered"}, "tiered"}, + {"sticky_exempt", map[string]any{"rpm_strategy": "sticky_exempt"}, "sticky_exempt"}, + {"invalid", map[string]any{"rpm_strategy": "foobar"}, "tiered"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &Account{Extra: tt.extra} + if got := a.GetRPMStrategy(); got != tt.expected { + t.Errorf("GetRPMStrategy() = %q, want %q", got, tt.expected) + } + }) + } +} + +func TestCheckRPMSchedulability(t *testing.T) { + tests := []struct { + name string + extra map[string]any + currentRPM int + expected WindowCostSchedulability + }{ + {"disabled", map[string]any{}, 100, WindowCostSchedulable}, + {"green zone", map[string]any{"base_rpm": 15}, 10, WindowCostSchedulable}, + {"yellow zone tiered", map[string]any{"base_rpm": 15}, 15, WindowCostStickyOnly}, + {"red zone tiered", map[string]any{"base_rpm": 15}, 18, WindowCostNotSchedulable}, + {"sticky_exempt at limit", map[string]any{"base_rpm": 15, "rpm_strategy": "sticky_exempt"}, 15, WindowCostStickyOnly}, + {"sticky_exempt over limit", map[string]any{"base_rpm": 15, "rpm_strategy": "sticky_exempt"}, 100, WindowCostStickyOnly}, + {"custom buffer", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 14, WindowCostStickyOnly}, + {"custom buffer red", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 15, WindowCostNotSchedulable}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &Account{Extra: tt.extra} + if got := a.CheckRPMSchedulability(tt.currentRPM); got != tt.expected { + t.Errorf("CheckRPMSchedulability(%d) = %d, want %d", tt.currentRPM, got, tt.expected) + } + }) + } +} From 777be05348b1ca695adf453e8996f7ea6001d13a Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:14:55 +0800 Subject: [PATCH 02/19] feat: add RPMCache interface and Redis implementation with Lua scripts --- backend/internal/repository/rpm_cache.go | 88 ++++++++++++++++++++++++ backend/internal/service/rpm_cache.go | 17 +++++ 2 files changed, 105 insertions(+) create mode 100644 backend/internal/repository/rpm_cache.go create mode 100644 backend/internal/service/rpm_cache.go diff --git a/backend/internal/repository/rpm_cache.go b/backend/internal/repository/rpm_cache.go new file mode 100644 index 00000000..6ec7f739 --- /dev/null +++ b/backend/internal/repository/rpm_cache.go @@ -0,0 +1,88 @@ +package repository + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9" +) + +const rpmKeyPrefix = "rpm:" + +// Lua scripts use Redis TIME for server-side minute key calculation +var rpmIncrScript = redis.NewScript(` +local timeResult = redis.call('TIME') +local minuteKey = math.floor(tonumber(timeResult[1]) / 60) +local key = ARGV[1] .. ':' .. minuteKey +local count = redis.call('INCR', key) +if count == 1 then + redis.call('EXPIRE', key, 120) +end +return count +`) + +var rpmGetScript = redis.NewScript(` +local timeResult = redis.call('TIME') +local minuteKey = math.floor(tonumber(timeResult[1]) / 60) +local key = ARGV[1] .. ':' .. minuteKey +local count = redis.call('GET', key) +if count == false then + return 0 +end +return tonumber(count) +`) + +type RPMCacheImpl struct { + rdb *redis.Client +} + +func NewRPMCache(rdb *redis.Client) *RPMCacheImpl { + return &RPMCacheImpl{rdb: rdb} +} + +func rpmKeyBase(accountID int64) string { + return fmt.Sprintf("%s%d", rpmKeyPrefix, accountID) +} + +func (c *RPMCacheImpl) IncrementRPM(ctx context.Context, accountID int64) (int, error) { + result, err := rpmIncrScript.Run(ctx, c.rdb, nil, rpmKeyBase(accountID)).Int() + if err != nil { + return 0, fmt.Errorf("rpm increment: %w", err) + } + return result, nil +} + +func (c *RPMCacheImpl) GetRPM(ctx context.Context, accountID int64) (int, error) { + result, err := rpmGetScript.Run(ctx, c.rdb, nil, rpmKeyBase(accountID)).Int() + if err != nil { + return 0, fmt.Errorf("rpm get: %w", err) + } + return result, nil +} + +func (c *RPMCacheImpl) GetRPMBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) { + if len(accountIDs) == 0 { + return map[int64]int{}, nil + } + + pipe := c.rdb.Pipeline() + cmds := make(map[int64]*redis.Cmd, len(accountIDs)) + for _, id := range accountIDs { + cmds[id] = rpmGetScript.Run(ctx, pipe, nil, rpmKeyBase(id)) + } + + _, err := pipe.Exec(ctx) + if err != nil && err != redis.Nil { + return nil, fmt.Errorf("rpm batch get: %w", err) + } + + result := make(map[int64]int, len(accountIDs)) + for id, cmd := range cmds { + if val, err := cmd.Int(); err == nil { + result[id] = val + } else { + result[id] = 0 + } + } + return result, nil +} diff --git a/backend/internal/service/rpm_cache.go b/backend/internal/service/rpm_cache.go new file mode 100644 index 00000000..07036219 --- /dev/null +++ b/backend/internal/service/rpm_cache.go @@ -0,0 +1,17 @@ +package service + +import "context" + +// RPMCache RPM 计数器缓存接口 +// 用于 Anthropic OAuth/SetupToken 账号的每分钟请求数限制 +type RPMCache interface { + // IncrementRPM 原子递增并返回当前分钟的计数 + // 使用 Redis 服务器时间确定 minute key,避免多实例时钟偏差 + IncrementRPM(ctx context.Context, accountID int64) (count int, err error) + + // GetRPM 获取当前分钟的 RPM 计数 + GetRPM(ctx context.Context, accountID int64) (count int, err error) + + // GetRPMBatch 批量获取多个账号的 RPM 计数(使用 Pipeline) + GetRPMBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) +} From c1c31ed9b23736bb5fa2404e7edd6736a3acdaa8 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:17:19 +0800 Subject: [PATCH 03/19] feat: wire RPMCache into GatewayService and AccountHandler --- backend/cmd/server/wire_gen.go | 5 +++-- backend/internal/handler/admin/account_handler.go | 10 ++++++++++ backend/internal/repository/rpm_cache.go | 3 ++- backend/internal/repository/wire.go | 1 + backend/internal/service/gateway_service.go | 3 +++ 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 3c44aa72..cc2e8ccd 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -138,7 +138,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, httpUpstream, configConfig) crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig) sessionLimitCache := repository.ProvideSessionLimitCache(redisClient, configConfig) - accountHandler := admin.NewAccountHandler(adminService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, rateLimitService, accountUsageService, accountTestService, concurrencyService, crsSyncService, sessionLimitCache, compositeTokenCacheInvalidator) + rpmCache := repository.NewRPMCache(redisClient) + accountHandler := admin.NewAccountHandler(adminService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, rateLimitService, accountUsageService, accountTestService, concurrencyService, crsSyncService, sessionLimitCache, rpmCache, compositeTokenCacheInvalidator) adminAnnouncementHandler := admin.NewAnnouncementHandler(announcementService) dataManagementService := service.NewDataManagementService() dataManagementHandler := admin.NewDataManagementHandler(dataManagementService) @@ -160,7 +161,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { deferredService := service.ProvideDeferredService(accountRepository, timingWheelService) claudeTokenProvider := service.NewClaudeTokenProvider(accountRepository, geminiTokenCache, oAuthService) digestSessionStore := service.NewDigestSessionStore() - gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, digestSessionStore) + gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore) openAITokenProvider := service.NewOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService) openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index 9b732f9c..6158fa01 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -53,6 +53,7 @@ type AccountHandler struct { concurrencyService *service.ConcurrencyService crsSyncService *service.CRSSyncService sessionLimitCache service.SessionLimitCache + rpmCache service.RPMCache tokenCacheInvalidator service.TokenCacheInvalidator } @@ -69,6 +70,7 @@ func NewAccountHandler( concurrencyService *service.ConcurrencyService, crsSyncService *service.CRSSyncService, sessionLimitCache service.SessionLimitCache, + rpmCache service.RPMCache, tokenCacheInvalidator service.TokenCacheInvalidator, ) *AccountHandler { return &AccountHandler{ @@ -83,6 +85,7 @@ func NewAccountHandler( concurrencyService: concurrencyService, crsSyncService: crsSyncService, sessionLimitCache: sessionLimitCache, + rpmCache: rpmCache, tokenCacheInvalidator: tokenCacheInvalidator, } } @@ -154,6 +157,7 @@ type AccountWithConcurrency struct { // 以下字段仅对 Anthropic OAuth/SetupToken 账号有效,且仅在启用相应功能时返回 CurrentWindowCost *float64 `json:"current_window_cost,omitempty"` // 当前窗口费用 ActiveSessions *int `json:"active_sessions,omitempty"` // 当前活跃会话数 + CurrentRPM *int `json:"current_rpm,omitempty"` // 当前分钟 RPM 计数 } func (h *AccountHandler) buildAccountResponseWithRuntime(ctx context.Context, account *service.Account) AccountWithConcurrency { @@ -189,6 +193,12 @@ func (h *AccountHandler) buildAccountResponseWithRuntime(ctx context.Context, ac } } } + + if h.rpmCache != nil && account.GetBaseRPM() > 0 { + if rpm, err := h.rpmCache.GetRPM(ctx, account.ID); err == nil { + item.CurrentRPM = &rpm + } + } } return item diff --git a/backend/internal/repository/rpm_cache.go b/backend/internal/repository/rpm_cache.go index 6ec7f739..332c30c9 100644 --- a/backend/internal/repository/rpm_cache.go +++ b/backend/internal/repository/rpm_cache.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/Wei-Shaw/sub2api/internal/service" "github.com/redis/go-redis/v9" ) @@ -36,7 +37,7 @@ type RPMCacheImpl struct { rdb *redis.Client } -func NewRPMCache(rdb *redis.Client) *RPMCacheImpl { +func NewRPMCache(rdb *redis.Client) service.RPMCache { return &RPMCacheImpl{rdb: rdb} } diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index eb8ce3fb..2344035c 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -79,6 +79,7 @@ var ProviderSet = wire.NewSet( NewTimeoutCounterCache, ProvideConcurrencyCache, ProvideSessionLimitCache, + NewRPMCache, NewDashboardCache, NewEmailCache, NewIdentityCache, diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 0ba9e093..6da71cf4 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -520,6 +520,7 @@ type GatewayService struct { concurrencyService *ConcurrencyService claudeTokenProvider *ClaudeTokenProvider sessionLimitCache SessionLimitCache // 会话数量限制缓存(仅 Anthropic OAuth/SetupToken) + rpmCache RPMCache // RPM 计数缓存(仅 Anthropic OAuth/SetupToken) userGroupRateCache *gocache.Cache userGroupRateSF singleflight.Group modelsListCache *gocache.Cache @@ -549,6 +550,7 @@ func NewGatewayService( deferredService *DeferredService, claudeTokenProvider *ClaudeTokenProvider, sessionLimitCache SessionLimitCache, + rpmCache RPMCache, digestStore *DigestSessionStore, ) *GatewayService { userGroupRateTTL := resolveUserGroupRateCacheTTL(cfg) @@ -574,6 +576,7 @@ func NewGatewayService( deferredService: deferredService, claudeTokenProvider: claudeTokenProvider, sessionLimitCache: sessionLimitCache, + rpmCache: rpmCache, userGroupRateCache: gocache.New(userGroupRateTTL, time.Minute), modelsListCache: gocache.New(modelsListTTL, time.Minute), modelsListCacheTTL: modelsListTTL, From 678c3ae132db374d5b39c07a487d77313227f93e Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:23:57 +0800 Subject: [PATCH 04/19] feat: integrate RPM scheduling checks into account selection flow --- backend/internal/service/gateway_service.go | 107 ++++++++++++++++++-- 1 file changed, 101 insertions(+), 6 deletions(-) diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 6da71cf4..2d043474 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -1157,6 +1157,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro return nil, errors.New("no available accounts") } ctx = s.withWindowCostPrefetch(ctx, accounts) + ctx = s.withRPMPrefetch(ctx, accounts) isExcluded := func(accountID int64) bool { if excludedIDs == nil { @@ -1232,6 +1233,10 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro filteredWindowCost++ continue } + // RPM 检查(非粘性会话路径) + if !s.isAccountSchedulableForRPM(ctx, account, false) { + continue + } routingCandidates = append(routingCandidates, account) } @@ -1255,7 +1260,9 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro s.isAccountAllowedForPlatform(stickyAccount, platform, useMixed) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, stickyAccount, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, stickyAccount, requestedModel) && - s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true) { // 粘性会话窗口费用检查 + s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true) && + + s.isAccountSchedulableForRPM(ctx, stickyAccount, true) { // 粘性会话窗口费用+RPM 检查 result, err := s.tryAcquireAccountSlot(ctx, stickyAccountID, stickyAccount.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 @@ -1409,7 +1416,9 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro s.isAccountAllowedForPlatform(account, platform, useMixed) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && - s.isAccountSchedulableForWindowCost(ctx, account, true) { // 粘性会话窗口费用检查 + s.isAccountSchedulableForWindowCost(ctx, account, true) && + + s.isAccountSchedulableForRPM(ctx, account, true) { // 粘性会话窗口费用+RPM 检查 result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 @@ -1475,6 +1484,10 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if !s.isAccountSchedulableForWindowCost(ctx, acc, false) { continue } + // RPM 检查(非粘性会话路径) + if !s.isAccountSchedulableForRPM(ctx, acc, false) { + continue + } candidates = append(candidates, acc) } @@ -2158,6 +2171,76 @@ checkSchedulability: return true } +// rpmPrefetchContextKey is the context key for prefetched RPM counts. +type rpmPrefetchContextKeyType struct{} + +var rpmPrefetchContextKey = rpmPrefetchContextKeyType{} + +func rpmFromPrefetchContext(ctx context.Context, accountID int64) (int, bool) { + if v, ok := ctx.Value(rpmPrefetchContextKey).(map[int64]int); ok { + count, found := v[accountID] + return count, found + } + return 0, false +} + +// withRPMPrefetch 批量预取所有候选账号的 RPM 计数 +func (s *GatewayService) withRPMPrefetch(ctx context.Context, accounts []Account) context.Context { + if s.rpmCache == nil { + return ctx + } + + var ids []int64 + for i := range accounts { + if accounts[i].IsAnthropicOAuthOrSetupToken() && accounts[i].GetBaseRPM() > 0 { + ids = append(ids, accounts[i].ID) + } + } + if len(ids) == 0 { + return ctx + } + + counts, err := s.rpmCache.GetRPMBatch(ctx, ids) + if err != nil { + return ctx // 失败开放 + } + return context.WithValue(ctx, rpmPrefetchContextKey, counts) +} + +// isAccountSchedulableForRPM 检查账号是否可根据 RPM 进行调度 +// 仅适用于 Anthropic OAuth/SetupToken 账号 +func (s *GatewayService) isAccountSchedulableForRPM(ctx context.Context, account *Account, isSticky bool) bool { + if !account.IsAnthropicOAuthOrSetupToken() { + return true + } + baseRPM := account.GetBaseRPM() + if baseRPM <= 0 { + return true + } + + // 尝试从预取缓存获取 + var currentRPM int + if count, ok := rpmFromPrefetchContext(ctx, account.ID); ok { + currentRPM = count + } else if s.rpmCache != nil { + if count, err := s.rpmCache.GetRPM(ctx, account.ID); err == nil { + currentRPM = count + } + // 失败开放:GetRPM 错误时允许调度 + } + + schedulability := account.CheckRPMSchedulability(currentRPM) + switch schedulability { + case WindowCostSchedulable: + return true + case WindowCostStickyOnly: + return isSticky + case WindowCostNotSchedulable: + return false + } + return true +} + // checkAndRegisterSession 检查并注册会话,用于会话数量限制 // 仅适用于 Anthropic OAuth/SetupToken 账号 // sessionID: 会话标识符(使用粘性会话的 hash) @@ -2492,7 +2575,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) { + if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { if s.debugModelRoutingEnabled() { logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] legacy routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID) } @@ -2542,6 +2625,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForRPM(ctx, acc, false) { + continue + } if selected == nil { selected = acc continue @@ -2592,7 +2678,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) { + if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { return account, nil } } @@ -2631,6 +2717,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForRPM(ctx, acc, false) { + continue + } if selected == nil { selected = acc continue @@ -2700,7 +2789,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) { + if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { if s.debugModelRoutingEnabled() { logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] legacy mixed routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID) @@ -2752,6 +2841,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForRPM(ctx, acc, false) { + continue + } if selected == nil { selected = acc continue @@ -2802,7 +2894,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) { + if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { return account, nil } @@ -2843,6 +2935,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForRPM(ctx, acc, false) { + continue + } if selected == nil { selected = acc continue From f648b8e0268450ddf04dfcb1de15cb39825c7f52 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:25:50 +0800 Subject: [PATCH 05/19] feat: increment RPM counter before request forwarding --- backend/internal/handler/gateway_handler.go | 14 ++++++++++++++ backend/internal/service/gateway_service.go | 9 +++++++++ 2 files changed, 23 insertions(+) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 9262df7e..b4ac664b 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -366,6 +366,13 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 账号槽位/等待计数需要在超时或断开时安全回收 accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc) + // RPM 计数递增(调度成功后、Forward 前) + if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { + if h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID) != nil { + // 失败开放:不阻塞请求 + } + } + // 转发请求 - 根据账号平台分流 var result *service.ForwardResult requestCtx := c.Request.Context() @@ -549,6 +556,13 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 账号槽位/等待计数需要在超时或断开时安全回收 accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc) + // RPM 计数递增(调度成功后、Forward 前) + if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { + if h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID) != nil { + // 失败开放:不阻塞请求 + } + } + // 转发请求 - 根据账号平台分流 var result *service.ForwardResult requestCtx := c.Request.Context() diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 2d043474..d5f1eb64 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2241,6 +2241,15 @@ func (s *GatewayService) isAccountSchedulableForRPM(ctx context.Context, account return true } +// IncrementAccountRPM increments the RPM counter for the given account. +func (s *GatewayService) IncrementAccountRPM(ctx context.Context, accountID int64) error { + if s.rpmCache == nil { + return nil + } + _, err := s.rpmCache.IncrementRPM(ctx, accountID) + return err +} + // checkAndRegisterSession 检查并注册会话,用于会话数量限制 // 仅适用于 Anthropic OAuth/SetupToken 账号 // sessionID: 会话标识符(使用粘性会话的 hash) From 37fa980565cd0198b196ddbf39184629154e3590 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:26:45 +0800 Subject: [PATCH 06/19] feat: flatten RPM config fields in Account DTO --- backend/internal/handler/dto/mappers.go | 7 +++++++ backend/internal/handler/dto/types.go | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/backend/internal/handler/dto/mappers.go b/backend/internal/handler/dto/mappers.go index 49c74522..d811c7be 100644 --- a/backend/internal/handler/dto/mappers.go +++ b/backend/internal/handler/dto/mappers.go @@ -209,6 +209,13 @@ func AccountFromServiceShallow(a *service.Account) *Account { if idleTimeout := a.GetSessionIdleTimeoutMinutes(); idleTimeout > 0 { out.SessionIdleTimeoutMin = &idleTimeout } + if rpm := a.GetBaseRPM(); rpm > 0 { + out.BaseRPM = &rpm + strategy := a.GetRPMStrategy() + out.RPMStrategy = &strategy + buffer := a.GetRPMStickyBuffer() + out.RPMStickyBuffer = &buffer + } // TLS指纹伪装开关 if a.IsTLSFingerprintEnabled() { enabled := true diff --git a/backend/internal/handler/dto/types.go b/backend/internal/handler/dto/types.go index 73243397..c575c232 100644 --- a/backend/internal/handler/dto/types.go +++ b/backend/internal/handler/dto/types.go @@ -153,6 +153,12 @@ type Account struct { MaxSessions *int `json:"max_sessions,omitempty"` SessionIdleTimeoutMin *int `json:"session_idle_timeout_minutes,omitempty"` + // RPM 限制(仅 Anthropic OAuth/SetupToken 账号有效) + // 从 extra 字段提取,方便前端显示和编辑 + BaseRPM *int `json:"base_rpm,omitempty"` + RPMStrategy *string `json:"rpm_strategy,omitempty"` + RPMStickyBuffer *int `json:"rpm_sticky_buffer,omitempty"` + // TLS指纹伪装(仅 Anthropic OAuth/SetupToken 账号有效) // 从 extra 字段提取,方便前端显示和编辑 EnableTLSFingerprint *bool `json:"enable_tls_fingerprint,omitempty"` From 953c5036bfcb40922c9769d3c776f3e2d33b0e46 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:29:45 +0800 Subject: [PATCH 07/19] feat: add RPM types and i18n translations --- frontend/src/i18n/locales/en.ts | 18 +++++++++++++++++- frontend/src/i18n/locales/zh.ts | 18 +++++++++++++++++- frontend/src/types/index.ts | 6 ++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index a4087237..f730a04f 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -1613,7 +1613,12 @@ export default { sessions: { full: 'Active sessions full, new sessions must wait (idle timeout: {idle} min)', normal: 'Active sessions normal (idle timeout: {idle} min)' - } + }, + rpm: { + full: 'RPM limit reached', + warning: 'RPM approaching limit', + normal: 'RPM normal', + }, }, tempUnschedulable: { title: 'Temp Unschedulable', @@ -1828,6 +1833,17 @@ export default { idleTimeoutPlaceholder: '5', idleTimeoutHint: 'Sessions will be released after idle timeout' }, + rpmLimit: { + label: 'RPM Limit', + hint: 'Limit requests per minute to protect upstream accounts', + baseRpm: 'Base RPM', + baseRpmPlaceholder: '15', + baseRpmHint: 'Max requests per minute, 0 or empty means no limit', + strategy: 'RPM Strategy', + strategyTiered: 'Tiered Model', + strategyStickyExempt: 'Sticky Exempt', + strategyHint: 'Tiered: gradually restrict when exceeded; Sticky Exempt: existing sessions unrestricted', + }, tlsFingerprint: { label: 'TLS Fingerprint Simulation', hint: 'Simulate Node.js/Claude Code client TLS fingerprint' diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index e338457d..b2e72fd8 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -1664,7 +1664,12 @@ export default { sessions: { full: '活跃会话已满,新会话需等待(空闲超时:{idle}分钟)', normal: '活跃会话正常(空闲超时:{idle}分钟)' - } + }, + rpm: { + full: '已达 RPM 上限', + warning: 'RPM 接近上限', + normal: 'RPM 正常', + }, }, clearRateLimit: '清除速率限制', testConnection: '测试连接', @@ -1971,6 +1976,17 @@ export default { idleTimeoutPlaceholder: '5', idleTimeoutHint: '会话空闲超时后自动释放' }, + rpmLimit: { + label: 'RPM 限制', + hint: '限制每分钟请求数量,保护上游账号', + baseRpm: '基础 RPM', + baseRpmPlaceholder: '15', + baseRpmHint: '每分钟最大请求数,0 或留空表示不限制', + strategy: 'RPM 策略', + strategyTiered: '三区模型', + strategyStickyExempt: '粘性豁免', + strategyHint: '三区模型: 超限后逐步限制; 粘性豁免: 已有会话不受限', + }, tlsFingerprint: { label: 'TLS 指纹模拟', hint: '模拟 Node.js/Claude Code 客户端的 TLS 指纹' diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index a8497985..1db50165 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -661,6 +661,11 @@ export interface Account { max_sessions?: number | null session_idle_timeout_minutes?: number | null + // RPM 限制(仅 Anthropic OAuth/SetupToken 账号有效) + base_rpm?: number | null + rpm_strategy?: string | null + rpm_sticky_buffer?: number | null + // TLS指纹伪装(仅 Anthropic OAuth/SetupToken 账号有效) enable_tls_fingerprint?: boolean | null @@ -675,6 +680,7 @@ export interface Account { // 运行时状态(仅当启用对应限制时返回) current_window_cost?: number | null // 当前窗口费用 active_sessions?: number | null // 当前活跃会话数 + current_rpm?: number | null // 当前分钟 RPM 计数 } // Account Usage types From e1c9016d90e561a4e72fb1fe0fa34c2768586d26 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:31:15 +0800 Subject: [PATCH 08/19] feat: add RPM config to EditAccountModal --- .../components/account/EditAccountModal.vue | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index efc34f39..3b6c0789 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -946,6 +946,56 @@ + +
+
+
+ +

+ {{ t('admin.accounts.quotaControl.rpmLimit.hint') }} +

+
+ +
+ +
+
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.baseRpmHint') }}

+
+
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.strategyHint') }}

+
+
+
+
@@ -1251,6 +1301,9 @@ const windowCostStickyReserve = ref(null) const sessionLimitEnabled = ref(false) const maxSessions = ref(null) const sessionIdleTimeout = ref(null) +const rpmLimitEnabled = ref(false) +const baseRpm = ref(null) +const rpmStrategy = ref('tiered') const tlsFingerprintEnabled = ref(false) const sessionIdMaskingEnabled = ref(false) const cacheTTLOverrideEnabled = ref(false) @@ -1710,6 +1763,9 @@ function loadQuotaControlSettings(account: Account) { sessionLimitEnabled.value = false maxSessions.value = null sessionIdleTimeout.value = null + rpmLimitEnabled.value = false + baseRpm.value = null + rpmStrategy.value = 'tiered' tlsFingerprintEnabled.value = false sessionIdMaskingEnabled.value = false cacheTTLOverrideEnabled.value = false @@ -1733,6 +1789,13 @@ function loadQuotaControlSettings(account: Account) { sessionIdleTimeout.value = account.session_idle_timeout_minutes ?? 5 } + // RPM limit + if (account.base_rpm != null && account.base_rpm > 0) { + rpmLimitEnabled.value = true + baseRpm.value = account.base_rpm + rpmStrategy.value = account.rpm_strategy || 'tiered' + } + // Load TLS fingerprint setting if (account.enable_tls_fingerprint === true) { tlsFingerprintEnabled.value = true @@ -2043,6 +2106,16 @@ const handleSubmit = async () => { delete newExtra.session_idle_timeout_minutes } + // RPM limit settings + if (rpmLimitEnabled.value && baseRpm.value != null && baseRpm.value > 0) { + newExtra.base_rpm = baseRpm.value + newExtra.rpm_strategy = rpmStrategy.value + } else { + delete newExtra.base_rpm + delete newExtra.rpm_strategy + delete newExtra.rpm_sticky_buffer + } + // TLS fingerprint setting if (tlsFingerprintEnabled.value) { newExtra.enable_tls_fingerprint = true From 856c95538659b6ff93a4a18063d7afbc2389db4b Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:33:01 +0800 Subject: [PATCH 09/19] feat: add RPM config to CreateAccountModal --- .../components/account/CreateAccountModal.vue | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index a0ad4eb4..92ca189e 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -1536,6 +1536,56 @@
+ +
+
+
+ +

+ {{ t('admin.accounts.quotaControl.rpmLimit.hint') }} +

+
+ +
+ +
+
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.baseRpmHint') }}

+
+
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.strategyHint') }}

+
+
+
+
@@ -2393,6 +2443,9 @@ const windowCostStickyReserve = ref(null) const sessionLimitEnabled = ref(false) const maxSessions = ref(null) const sessionIdleTimeout = ref(null) +const rpmLimitEnabled = ref(false) +const baseRpm = ref(null) +const rpmStrategy = ref('tiered') const tlsFingerprintEnabled = ref(false) const sessionIdMaskingEnabled = ref(false) const cacheTTLOverrideEnabled = ref(false) @@ -3017,6 +3070,9 @@ const resetForm = () => { sessionLimitEnabled.value = false maxSessions.value = null sessionIdleTimeout.value = null + rpmLimitEnabled.value = false + baseRpm.value = null + rpmStrategy.value = 'tiered' tlsFingerprintEnabled.value = false sessionIdMaskingEnabled.value = false cacheTTLOverrideEnabled.value = false @@ -3926,6 +3982,12 @@ const handleAnthropicExchange = async (authCode: string) => { extra.session_idle_timeout_minutes = sessionIdleTimeout.value ?? 5 } + // Add RPM limit settings + if (rpmLimitEnabled.value && baseRpm.value != null && baseRpm.value > 0) { + extra.base_rpm = baseRpm.value + extra.rpm_strategy = rpmStrategy.value + } + // Add TLS fingerprint settings if (tlsFingerprintEnabled.value) { extra.enable_tls_fingerprint = true @@ -4024,6 +4086,12 @@ const handleCookieAuth = async (sessionKey: string) => { extra.session_idle_timeout_minutes = sessionIdleTimeout.value ?? 5 } + // Add RPM limit settings + if (rpmLimitEnabled.value && baseRpm.value != null && baseRpm.value > 0) { + extra.base_rpm = baseRpm.value + extra.rpm_strategy = rpmStrategy.value + } + // Add TLS fingerprint settings if (tlsFingerprintEnabled.value) { extra.enable_tls_fingerprint = true From 28ca7df2978576230d968dac733b003dbb338f0e Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:34:33 +0800 Subject: [PATCH 10/19] feat: add RPM display to AccountCapacityCell --- .../account/AccountCapacityCell.vue | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/frontend/src/components/account/AccountCapacityCell.vue b/frontend/src/components/account/AccountCapacityCell.vue index ae338aca..849bacb3 100644 --- a/frontend/src/components/account/AccountCapacityCell.vue +++ b/frontend/src/components/account/AccountCapacityCell.vue @@ -52,6 +52,24 @@ {{ account.max_sessions }}
+ + +
+ + + + + {{ currentRPM }} + / + {{ account.base_rpm }} + +
@@ -191,6 +209,50 @@ const sessionLimitTooltip = computed(() => { return t('admin.accounts.capacity.sessions.normal', { idle }) }) +// 是否显示 RPM 限制 +const showRpmLimit = computed(() => { + return ( + isAnthropicOAuthOrSetupToken.value && + props.account.base_rpm !== undefined && + props.account.base_rpm !== null && + props.account.base_rpm > 0 + ) +}) + +// 当前 RPM 计数 +const currentRPM = computed(() => props.account.current_rpm ?? 0) + +// RPM 状态样式 +const rpmClass = computed(() => { + if (!showRpmLimit.value) return '' + + const current = currentRPM.value + const base = props.account.base_rpm ?? 0 + if (base <= 0) return 'bg-gray-100 text-gray-600 dark:bg-gray-700 dark:text-gray-400' + + const strategy = props.account.rpm_strategy || 'tiered' + if (strategy === 'tiered') { + const buffer = props.account.rpm_sticky_buffer ?? Math.max(1, Math.floor(base / 5)) + if (current >= base + buffer) return 'bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-400' + if (current >= base) return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' + } else { + if (current >= base) return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' + } + if (current >= base * 0.8) return 'bg-yellow-100 text-yellow-700 dark:bg-yellow-900/30 dark:text-yellow-400' + return 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/30 dark:text-emerald-400' +}) + +// RPM 提示文字 +const rpmTooltip = computed(() => { + if (!showRpmLimit.value) return '' + + const current = currentRPM.value + const base = props.account.base_rpm ?? 0 + if (current >= base) return t('admin.accounts.capacity.rpm.full') + if (current >= base * 0.8) return t('admin.accounts.capacity.rpm.warning') + return t('admin.accounts.capacity.rpm.normal') +}) + // 格式化费用显示 const formatCost = (value: number | null | undefined) => { if (value === null || value === undefined) return '0' From 607237571f9b67d639e3847c9b54979f1689bdd5 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 10:16:34 +0800 Subject: [PATCH 11/19] fix: address code review issues for RPM limiting feature - Use TxPipeline (MULTI/EXEC) instead of Pipeline for atomic INCR+EXPIRE - Filter negative values in GetBaseRPM(), update test expectation - Add RPM batch query (GetRPMBatch) to account List API - Add warn logs for RPM increment failures in gateway handler - Reset enableRpmLimit on BulkEditAccountModal close - Use union type 'tiered' | 'sticky_exempt' for rpmStrategy refs - Add design decision comments for rdb.Time() RTT trade-off --- .../internal/handler/admin/account_handler.go | 24 ++- backend/internal/handler/gateway_handler.go | 8 +- backend/internal/repository/rpm_cache.go | 122 ++++++++++----- backend/internal/service/account.go | 7 +- backend/internal/service/account_rpm_test.go | 42 +++++ backend/internal/service/gateway_service.go | 6 + .../account/AccountCapacityCell.vue | 71 ++++++--- .../account/BulkEditAccountModal.vue | 143 +++++++++++++++++- .../components/account/CreateAccountModal.vue | 64 +++++++- .../components/account/EditAccountModal.vue | 66 +++++++- frontend/src/i18n/locales/en.ts | 12 ++ frontend/src/i18n/locales/zh.ts | 12 ++ frontend/src/views/admin/AccountsView.vue | 12 +- 13 files changed, 509 insertions(+), 80 deletions(-) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index 6158fa01..c41f37c1 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -241,9 +241,10 @@ func (h *AccountHandler) List(c *gin.Context) { concurrencyCounts = make(map[int64]int) } - // 识别需要查询窗口费用和会话数的账号(Anthropic OAuth/SetupToken 且启用了相应功能) + // 识别需要查询窗口费用、会话数和 RPM 的账号(Anthropic OAuth/SetupToken 且启用了相应功能) windowCostAccountIDs := make([]int64, 0) sessionLimitAccountIDs := make([]int64, 0) + rpmAccountIDs := make([]int64, 0) sessionIdleTimeouts := make(map[int64]time.Duration) // 各账号的会话空闲超时配置 for i := range accounts { acc := &accounts[i] @@ -255,12 +256,24 @@ func (h *AccountHandler) List(c *gin.Context) { sessionLimitAccountIDs = append(sessionLimitAccountIDs, acc.ID) sessionIdleTimeouts[acc.ID] = time.Duration(acc.GetSessionIdleTimeoutMinutes()) * time.Minute } + if acc.GetBaseRPM() > 0 { + rpmAccountIDs = append(rpmAccountIDs, acc.ID) + } } } - // 并行获取窗口费用和活跃会话数 + // 并行获取窗口费用、活跃会话数和 RPM 计数 var windowCosts map[int64]float64 var activeSessions map[int64]int + var rpmCounts map[int64]int + + // 获取 RPM 计数(批量查询) + if len(rpmAccountIDs) > 0 && h.rpmCache != nil { + rpmCounts, _ = h.rpmCache.GetRPMBatch(c.Request.Context(), rpmAccountIDs) + if rpmCounts == nil { + rpmCounts = make(map[int64]int) + } + } // 获取活跃会话数(批量查询,传入各账号的 idleTimeout 配置) if len(sessionLimitAccountIDs) > 0 && h.sessionLimitCache != nil { @@ -321,6 +334,13 @@ func (h *AccountHandler) List(c *gin.Context) { } } + // 添加 RPM 计数(仅当启用时) + if rpmCounts != nil { + if rpm, ok := rpmCounts[acc.ID]; ok { + item.CurrentRPM = &rpm + } + } + result[i] = item } diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index b4ac664b..b68a46fa 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -368,8 +368,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // RPM 计数递增(调度成功后、Forward 前) if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { - if h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID) != nil { - // 失败开放:不阻塞请求 + if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil { + reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err)) } } @@ -558,8 +558,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // RPM 计数递增(调度成功后、Forward 前) if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { - if h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID) != nil { - // 失败开放:不阻塞请求 + if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil { + reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err)) } } diff --git a/backend/internal/repository/rpm_cache.go b/backend/internal/repository/rpm_cache.go index 332c30c9..80954f52 100644 --- a/backend/internal/repository/rpm_cache.go +++ b/backend/internal/repository/rpm_cache.go @@ -2,78 +2,130 @@ package repository import ( "context" + "errors" "fmt" + "strconv" + "time" "github.com/Wei-Shaw/sub2api/internal/service" "github.com/redis/go-redis/v9" ) -const rpmKeyPrefix = "rpm:" +// RPM 计数器缓存常量定义 +// +// 设计说明: +// 使用 Redis 简单计数器跟踪每个账号每分钟的请求数: +// - Key: rpm:{accountID}:{minuteTimestamp} +// - Value: 当前分钟内的请求计数 +// - TTL: 120 秒(覆盖当前分钟 + 一定冗余) +// +// 使用 TxPipeline(MULTI/EXEC)执行 INCR + EXPIRE,保证原子性且兼容 Redis Cluster。 +// 通过 rdb.Time() 获取服务端时间,避免多实例时钟不同步。 +// +// 设计决策: +// - TxPipeline vs Pipeline:Pipeline 仅合并发送但不保证原子,TxPipeline 使用 MULTI/EXEC 事务保证原子执行。 +// - rdb.Time() 单独调用:Pipeline/TxPipeline 中无法引用前一命令的结果,因此 TIME 必须单独调用(2 RTT)。 +// Lua 脚本可以做到 1 RTT,但在 Redis Cluster 中动态拼接 key 存在 CROSSSLOT 风险,选择安全性优先。 +const ( + // RPM 计数器键前缀 + // 格式: rpm:{accountID}:{minuteTimestamp} + rpmKeyPrefix = "rpm:" -// Lua scripts use Redis TIME for server-side minute key calculation -var rpmIncrScript = redis.NewScript(` -local timeResult = redis.call('TIME') -local minuteKey = math.floor(tonumber(timeResult[1]) / 60) -local key = ARGV[1] .. ':' .. minuteKey -local count = redis.call('INCR', key) -if count == 1 then - redis.call('EXPIRE', key, 120) -end -return count -`) - -var rpmGetScript = redis.NewScript(` -local timeResult = redis.call('TIME') -local minuteKey = math.floor(tonumber(timeResult[1]) / 60) -local key = ARGV[1] .. ':' .. minuteKey -local count = redis.call('GET', key) -if count == false then - return 0 -end -return tonumber(count) -`) + // RPM 计数器 TTL(120 秒,覆盖当前分钟窗口 + 冗余) + rpmKeyTTL = 120 * time.Second +) +// RPMCacheImpl RPM 计数器缓存 Redis 实现 type RPMCacheImpl struct { rdb *redis.Client } +// NewRPMCache 创建 RPM 计数器缓存 func NewRPMCache(rdb *redis.Client) service.RPMCache { return &RPMCacheImpl{rdb: rdb} } -func rpmKeyBase(accountID int64) string { - return fmt.Sprintf("%s%d", rpmKeyPrefix, accountID) +// currentMinuteKey 获取当前分钟的完整 Redis key +// 使用 rdb.Time() 获取 Redis 服务端时间,避免多实例时钟偏差 +func (c *RPMCacheImpl) currentMinuteKey(ctx context.Context, accountID int64) (string, error) { + serverTime, err := c.rdb.Time(ctx).Result() + if err != nil { + return "", fmt.Errorf("redis TIME: %w", err) + } + minuteTS := serverTime.Unix() / 60 + return fmt.Sprintf("%s%d:%d", rpmKeyPrefix, accountID, minuteTS), nil } +// currentMinuteSuffix 获取当前分钟时间戳后缀(供批量操作使用) +// 使用 rdb.Time() 获取 Redis 服务端时间 +func (c *RPMCacheImpl) currentMinuteSuffix(ctx context.Context) (string, error) { + serverTime, err := c.rdb.Time(ctx).Result() + if err != nil { + return "", fmt.Errorf("redis TIME: %w", err) + } + minuteTS := serverTime.Unix() / 60 + return strconv.FormatInt(minuteTS, 10), nil +} + +// IncrementRPM 原子递增并返回当前分钟的计数 +// 使用 TxPipeline (MULTI/EXEC) 执行 INCR + EXPIRE,保证原子性且兼容 Redis Cluster func (c *RPMCacheImpl) IncrementRPM(ctx context.Context, accountID int64) (int, error) { - result, err := rpmIncrScript.Run(ctx, c.rdb, nil, rpmKeyBase(accountID)).Int() + key, err := c.currentMinuteKey(ctx, accountID) if err != nil { return 0, fmt.Errorf("rpm increment: %w", err) } - return result, nil + + // 使用 TxPipeline (MULTI/EXEC) 保证 INCR + EXPIRE 原子执行 + // EXPIRE 幂等,每次都设置不影响正确性 + pipe := c.rdb.TxPipeline() + incrCmd := pipe.Incr(ctx, key) + pipe.Expire(ctx, key, rpmKeyTTL) + + if _, err := pipe.Exec(ctx); err != nil { + return 0, fmt.Errorf("rpm increment: %w", err) + } + + return int(incrCmd.Val()), nil } +// GetRPM 获取当前分钟的 RPM 计数 func (c *RPMCacheImpl) GetRPM(ctx context.Context, accountID int64) (int, error) { - result, err := rpmGetScript.Run(ctx, c.rdb, nil, rpmKeyBase(accountID)).Int() + key, err := c.currentMinuteKey(ctx, accountID) if err != nil { return 0, fmt.Errorf("rpm get: %w", err) } - return result, nil + + val, err := c.rdb.Get(ctx, key).Int() + if errors.Is(err, redis.Nil) { + return 0, nil // 当前分钟无记录 + } + if err != nil { + return 0, fmt.Errorf("rpm get: %w", err) + } + return val, nil } +// GetRPMBatch 批量获取多个账号的 RPM 计数(使用 Pipeline) func (c *RPMCacheImpl) GetRPMBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) { if len(accountIDs) == 0 { return map[int64]int{}, nil } - pipe := c.rdb.Pipeline() - cmds := make(map[int64]*redis.Cmd, len(accountIDs)) - for _, id := range accountIDs { - cmds[id] = rpmGetScript.Run(ctx, pipe, nil, rpmKeyBase(id)) + // 获取当前分钟后缀 + minuteSuffix, err := c.currentMinuteSuffix(ctx) + if err != nil { + return nil, fmt.Errorf("rpm batch get: %w", err) } - _, err := pipe.Exec(ctx) - if err != nil && err != redis.Nil { + // 使用 Pipeline 批量 GET + pipe := c.rdb.Pipeline() + cmds := make(map[int64]*redis.StringCmd, len(accountIDs)) + for _, id := range accountIDs { + key := fmt.Sprintf("%s%d:%s", rpmKeyPrefix, id, minuteSuffix) + cmds[id] = pipe.Get(ctx, key) + } + + if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) { return nil, fmt.Errorf("rpm batch get: %w", err) } diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index fe6f432c..7f33c61d 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -1138,13 +1138,16 @@ func (a *Account) GetSessionIdleTimeoutMinutes() int { } // GetBaseRPM 获取基础 RPM 限制 -// 返回 0 表示未启用 +// 返回 0 表示未启用(负数视为无效配置,按 0 处理) func (a *Account) GetBaseRPM() int { if a.Extra == nil { return 0 } if v, ok := a.Extra["base_rpm"]; ok { - return parseExtraInt(v) + val := parseExtraInt(v) + if val > 0 { + return val + } } return 0 } diff --git a/backend/internal/service/account_rpm_test.go b/backend/internal/service/account_rpm_test.go index 01797763..b08b54a2 100644 --- a/backend/internal/service/account_rpm_test.go +++ b/backend/internal/service/account_rpm_test.go @@ -13,6 +13,9 @@ func TestGetBaseRPM(t *testing.T) { {"zero", map[string]any{"base_rpm": 0}, 0}, {"int value", map[string]any{"base_rpm": 15}, 15}, {"float value", map[string]any{"base_rpm": 15.0}, 15}, + {"string value", map[string]any{"base_rpm": "15"}, 15}, + {"negative value", map[string]any{"base_rpm": -5}, 0}, + {"int64 value", map[string]any{"base_rpm": int64(20)}, 20}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -35,6 +38,8 @@ func TestGetRPMStrategy(t *testing.T) { {"tiered", map[string]any{"rpm_strategy": "tiered"}, "tiered"}, {"sticky_exempt", map[string]any{"rpm_strategy": "sticky_exempt"}, "sticky_exempt"}, {"invalid", map[string]any{"rpm_strategy": "foobar"}, "tiered"}, + {"empty string fallback", map[string]any{"rpm_strategy": ""}, "tiered"}, + {"numeric value fallback", map[string]any{"rpm_strategy": 123}, "tiered"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -61,6 +66,13 @@ func TestCheckRPMSchedulability(t *testing.T) { {"sticky_exempt over limit", map[string]any{"base_rpm": 15, "rpm_strategy": "sticky_exempt"}, 100, WindowCostStickyOnly}, {"custom buffer", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 14, WindowCostStickyOnly}, {"custom buffer red", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 15, WindowCostNotSchedulable}, + {"base_rpm=1 green", map[string]any{"base_rpm": 1}, 0, WindowCostSchedulable}, + {"base_rpm=1 yellow (at limit)", map[string]any{"base_rpm": 1}, 1, WindowCostStickyOnly}, + {"base_rpm=1 red (at limit+buffer)", map[string]any{"base_rpm": 1}, 2, WindowCostNotSchedulable}, + {"negative currentRPM", map[string]any{"base_rpm": 15}, -1, WindowCostSchedulable}, + {"base_rpm negative disabled", map[string]any{"base_rpm": -5}, 10, WindowCostSchedulable}, + {"very high currentRPM", map[string]any{"base_rpm": 10}, 9999, WindowCostNotSchedulable}, + {"sticky_exempt very high currentRPM", map[string]any{"base_rpm": 10, "rpm_strategy": "sticky_exempt"}, 9999, WindowCostStickyOnly}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -71,3 +83,33 @@ func TestCheckRPMSchedulability(t *testing.T) { }) } } + +func TestGetRPMStickyBuffer(t *testing.T) { + tests := []struct { + name string + extra map[string]any + expected int + }{ + {"nil extra", nil, 0}, + {"no keys", map[string]any{}, 0}, + {"base_rpm=0", map[string]any{"base_rpm": 0}, 0}, + {"base_rpm=1 min buffer 1", map[string]any{"base_rpm": 1}, 1}, + {"base_rpm=4 min buffer 1", map[string]any{"base_rpm": 4}, 1}, + {"base_rpm=5 buffer 1", map[string]any{"base_rpm": 5}, 1}, + {"base_rpm=10 buffer 2", map[string]any{"base_rpm": 10}, 2}, + {"base_rpm=15 buffer 3", map[string]any{"base_rpm": 15}, 3}, + {"base_rpm=100 buffer 20", map[string]any{"base_rpm": 100}, 20}, + {"custom buffer=5", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 5}, + {"custom buffer=0 fallback to default", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0}, 2}, + {"custom buffer negative fallback", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1}, 2}, + {"custom buffer with float", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &Account{Extra: tt.extra} + if got := a.GetRPMStickyBuffer(); got != tt.expected { + t.Errorf("GetRPMStickyBuffer() = %d, want %d", got, tt.expected) + } + }) + } +} diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index d5f1eb64..631d77cc 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2708,6 +2708,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, } } + // 批量预取 RPM 计数,避免逐个账号查询(N+1) + ctx = s.withRPMPrefetch(ctx, accounts) + // 3. 按优先级+最久未用选择(考虑模型支持) var selected *Account for i := range accounts { @@ -2922,6 +2925,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g } } + // 批量预取 RPM 计数,避免逐个账号查询(N+1) + ctx = s.withRPMPrefetch(ctx, accounts) + // 3. 按优先级+最久未用选择(考虑模型支持和混合调度) var selected *Account for i := range accounts { diff --git a/frontend/src/components/account/AccountCapacityCell.vue b/frontend/src/components/account/AccountCapacityCell.vue index 849bacb3..2a4babf2 100644 --- a/frontend/src/components/account/AccountCapacityCell.vue +++ b/frontend/src/components/account/AccountCapacityCell.vue @@ -68,6 +68,7 @@ {{ currentRPM }} / {{ account.base_rpm }} + {{ rpmStrategyTag }} @@ -143,19 +144,15 @@ const windowCostClass = computed(() => { const limit = props.account.window_cost_limit || 0 const reserve = props.account.window_cost_sticky_reserve || 10 - // >= 阈值+预留: 完全不可调度 (红色) if (current >= limit + reserve) { return 'bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-400' } - // >= 阈值: 仅粘性会话 (橙色) if (current >= limit) { return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' } - // >= 80% 阈值: 警告 (黄色) if (current >= limit * 0.8) { return 'bg-yellow-100 text-yellow-700 dark:bg-yellow-900/30 dark:text-yellow-400' } - // 正常 (绿色) return 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/30 dark:text-emerald-400' }) @@ -183,15 +180,12 @@ const sessionLimitClass = computed(() => { const current = activeSessions.value const max = props.account.max_sessions || 0 - // >= 最大: 完全占满 (红色) if (current >= max) { return 'bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-400' } - // >= 80%: 警告 (黄色) if (current >= max * 0.8) { return 'bg-yellow-100 text-yellow-700 dark:bg-yellow-900/30 dark:text-yellow-400' } - // 正常 (绿色) return 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/30 dark:text-emerald-400' }) @@ -222,35 +216,74 @@ const showRpmLimit = computed(() => { // 当前 RPM 计数 const currentRPM = computed(() => props.account.current_rpm ?? 0) +// RPM 策略 +const rpmStrategy = computed(() => props.account.rpm_strategy || 'tiered') + +// RPM 策略标签 +const rpmStrategyTag = computed(() => { + return rpmStrategy.value === 'sticky_exempt' ? '[S]' : '[T]' +}) + +// RPM buffer 计算(与后端一致:base <= 0 时 buffer 为 0) +const rpmBuffer = computed(() => { + const base = props.account.base_rpm || 0 + return props.account.rpm_sticky_buffer ?? (base > 0 ? Math.max(1, Math.floor(base / 5)) : 0) +}) + // RPM 状态样式 const rpmClass = computed(() => { if (!showRpmLimit.value) return '' const current = currentRPM.value const base = props.account.base_rpm ?? 0 - if (base <= 0) return 'bg-gray-100 text-gray-600 dark:bg-gray-700 dark:text-gray-400' + const buffer = rpmBuffer.value - const strategy = props.account.rpm_strategy || 'tiered' - if (strategy === 'tiered') { - const buffer = props.account.rpm_sticky_buffer ?? Math.max(1, Math.floor(base / 5)) - if (current >= base + buffer) return 'bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-400' - if (current >= base) return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' + if (rpmStrategy.value === 'tiered') { + if (current >= base + buffer) { + return 'bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-400' + } + if (current >= base) { + return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' + } } else { - if (current >= base) return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' + if (current >= base) { + return 'bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-400' + } + } + if (current >= base * 0.8) { + return 'bg-yellow-100 text-yellow-700 dark:bg-yellow-900/30 dark:text-yellow-400' } - if (current >= base * 0.8) return 'bg-yellow-100 text-yellow-700 dark:bg-yellow-900/30 dark:text-yellow-400' return 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/30 dark:text-emerald-400' }) -// RPM 提示文字 +// RPM 提示文字(增强版:显示策略、区域、缓冲区) const rpmTooltip = computed(() => { if (!showRpmLimit.value) return '' const current = currentRPM.value const base = props.account.base_rpm ?? 0 - if (current >= base) return t('admin.accounts.capacity.rpm.full') - if (current >= base * 0.8) return t('admin.accounts.capacity.rpm.warning') - return t('admin.accounts.capacity.rpm.normal') + const buffer = rpmBuffer.value + + if (rpmStrategy.value === 'tiered') { + if (current >= base + buffer) { + return t('admin.accounts.capacity.rpm.tieredBlocked', { buffer }) + } + if (current >= base) { + return t('admin.accounts.capacity.rpm.tieredStickyOnly', { buffer }) + } + if (current >= base * 0.8) { + return t('admin.accounts.capacity.rpm.tieredWarning') + } + return t('admin.accounts.capacity.rpm.tieredNormal') + } else { + if (current >= base) { + return t('admin.accounts.capacity.rpm.stickyExemptOver') + } + if (current >= base * 0.8) { + return t('admin.accounts.capacity.rpm.stickyExemptWarning') + } + return t('admin.accounts.capacity.rpm.stickyExemptNormal') + } }) // 格式化费用显示 diff --git a/frontend/src/components/account/BulkEditAccountModal.vue b/frontend/src/components/account/BulkEditAccountModal.vue index 4a19e35e..e5181741 100644 --- a/frontend/src/components/account/BulkEditAccountModal.vue +++ b/frontend/src/components/account/BulkEditAccountModal.vue @@ -585,6 +585,111 @@ + +
+
+ + +
+ +
+
+ {{ t('admin.accounts.quotaControl.rpmLimit.hint') }} + +
+ +
+
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.baseRpmHint') }}

+
+ +
+ +
+ + +
+
+ +
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.stickyBufferHint') }}

+
+
+
+
+
@@ -658,7 +763,7 @@ import { ref, watch, computed } from 'vue' import { useI18n } from 'vue-i18n' import { useAppStore } from '@/stores/app' import { adminAPI } from '@/api/admin' -import type { Proxy as ProxyConfig, AdminGroup, AccountPlatform } from '@/types' +import type { Proxy as ProxyConfig, AdminGroup, AccountPlatform, AccountType } from '@/types' import BaseDialog from '@/components/common/BaseDialog.vue' import Select from '@/components/common/Select.vue' import ProxySelector from '@/components/common/ProxySelector.vue' @@ -670,6 +775,7 @@ interface Props { show: boolean accountIds: number[] selectedPlatforms: AccountPlatform[] + selectedTypes: AccountType[] proxies: ProxyConfig[] groups: AdminGroup[] } @@ -686,6 +792,15 @@ const appStore = useAppStore() // Platform awareness const isMixedPlatform = computed(() => props.selectedPlatforms.length > 1) +// 是否全部为 Anthropic OAuth/SetupToken(RPM 配置仅在此条件下显示) +const allAnthropicOAuthOrSetupToken = computed(() => { + return ( + props.selectedPlatforms.length === 1 && + props.selectedPlatforms[0] === 'anthropic' && + props.selectedTypes.every(t => t === 'oauth' || t === 'setup-token') + ) +}) + const platformModelPrefix: Record = { anthropic: ['claude-'], antigravity: ['claude-', 'gemini-', 'gpt-oss-', 'tab_'], @@ -725,6 +840,7 @@ const enablePriority = ref(false) const enableRateMultiplier = ref(false) const enableStatus = ref(false) const enableGroups = ref(false) +const enableRpmLimit = ref(false) // State - field values const submitting = ref(false) @@ -741,6 +857,10 @@ const priority = ref(1) const rateMultiplier = ref(1) const status = ref<'active' | 'inactive'>('active') const groupIds = ref([]) +const rpmLimitEnabled = ref(false) +const bulkBaseRpm = ref(null) +const bulkRpmStrategy = ref<'tiered' | 'sticky_exempt'>('tiered') +const bulkRpmStickyBuffer = ref(null) // All models list (combined Anthropic + OpenAI + Gemini) const allModels = [ @@ -1094,6 +1214,22 @@ const buildUpdatePayload = (): Record | null => { updates.credentials = credentials } + // RPM limit settings (写入 extra 字段) + if (enableRpmLimit.value) { + const extra: Record = {} + if (rpmLimitEnabled.value && bulkBaseRpm.value != null && bulkBaseRpm.value > 0) { + extra.base_rpm = bulkBaseRpm.value + extra.rpm_strategy = bulkRpmStrategy.value + if (bulkRpmStickyBuffer.value != null && bulkRpmStickyBuffer.value > 0) { + extra.rpm_sticky_buffer = bulkRpmStickyBuffer.value + } + } else { + // 关闭 RPM 限制 - 设置 base_rpm 为 0 + extra.base_rpm = 0 + } + updates.extra = extra + } + return Object.keys(updates).length > 0 ? updates : null } @@ -1173,6 +1309,7 @@ watch( enableRateMultiplier.value = false enableStatus.value = false enableGroups.value = false + enableRpmLimit.value = false // Reset all values baseUrl.value = '' @@ -1188,6 +1325,10 @@ watch( rateMultiplier.value = 1 status.value = 'active' groupIds.value = [] + rpmLimitEnabled.value = false + bulkBaseRpm.value = null + bulkRpmStrategy.value = 'tiered' + bulkRpmStickyBuffer.value = null } } ) diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index 92ca189e..97a6fbce 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -1562,26 +1562,68 @@
-
+

{{ t('admin.accounts.quotaControl.rpmLimit.baseRpmHint') }}

+
- -

{{ t('admin.accounts.quotaControl.rpmLimit.strategyHint') }}

+
+ + +
+
+ +
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.stickyBufferHint') }}

@@ -2445,7 +2487,8 @@ const maxSessions = ref(null) const sessionIdleTimeout = ref(null) const rpmLimitEnabled = ref(false) const baseRpm = ref(null) -const rpmStrategy = ref('tiered') +const rpmStrategy = ref<'tiered' | 'sticky_exempt'>('tiered') +const rpmStickyBuffer = ref(null) const tlsFingerprintEnabled = ref(false) const sessionIdMaskingEnabled = ref(false) const cacheTTLOverrideEnabled = ref(false) @@ -3073,6 +3116,7 @@ const resetForm = () => { rpmLimitEnabled.value = false baseRpm.value = null rpmStrategy.value = 'tiered' + rpmStickyBuffer.value = null tlsFingerprintEnabled.value = false sessionIdMaskingEnabled.value = false cacheTTLOverrideEnabled.value = false @@ -3986,6 +4030,9 @@ const handleAnthropicExchange = async (authCode: string) => { if (rpmLimitEnabled.value && baseRpm.value != null && baseRpm.value > 0) { extra.base_rpm = baseRpm.value extra.rpm_strategy = rpmStrategy.value + if (rpmStickyBuffer.value != null && rpmStickyBuffer.value > 0) { + extra.rpm_sticky_buffer = rpmStickyBuffer.value + } } // Add TLS fingerprint settings @@ -4090,6 +4137,9 @@ const handleCookieAuth = async (sessionKey: string) => { if (rpmLimitEnabled.value && baseRpm.value != null && baseRpm.value > 0) { extra.base_rpm = baseRpm.value extra.rpm_strategy = rpmStrategy.value + if (rpmStickyBuffer.value != null && rpmStickyBuffer.value > 0) { + extra.rpm_sticky_buffer = rpmStickyBuffer.value + } } // Add TLS fingerprint settings diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 3b6c0789..792bf580 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -972,26 +972,68 @@
-
+

{{ t('admin.accounts.quotaControl.rpmLimit.baseRpmHint') }}

+
- -

{{ t('admin.accounts.quotaControl.rpmLimit.strategyHint') }}

+
+ + +
+
+ +
+ + +

{{ t('admin.accounts.quotaControl.rpmLimit.stickyBufferHint') }}

@@ -1303,7 +1345,8 @@ const maxSessions = ref(null) const sessionIdleTimeout = ref(null) const rpmLimitEnabled = ref(false) const baseRpm = ref(null) -const rpmStrategy = ref('tiered') +const rpmStrategy = ref<'tiered' | 'sticky_exempt'>('tiered') +const rpmStickyBuffer = ref(null) const tlsFingerprintEnabled = ref(false) const sessionIdMaskingEnabled = ref(false) const cacheTTLOverrideEnabled = ref(false) @@ -1766,6 +1809,7 @@ function loadQuotaControlSettings(account: Account) { rpmLimitEnabled.value = false baseRpm.value = null rpmStrategy.value = 'tiered' + rpmStickyBuffer.value = null tlsFingerprintEnabled.value = false sessionIdMaskingEnabled.value = false cacheTTLOverrideEnabled.value = false @@ -1793,7 +1837,8 @@ function loadQuotaControlSettings(account: Account) { if (account.base_rpm != null && account.base_rpm > 0) { rpmLimitEnabled.value = true baseRpm.value = account.base_rpm - rpmStrategy.value = account.rpm_strategy || 'tiered' + rpmStrategy.value = (account.rpm_strategy as 'tiered' | 'sticky_exempt') || 'tiered' + rpmStickyBuffer.value = account.rpm_sticky_buffer ?? null } // Load TLS fingerprint setting @@ -2110,6 +2155,11 @@ const handleSubmit = async () => { if (rpmLimitEnabled.value && baseRpm.value != null && baseRpm.value > 0) { newExtra.base_rpm = baseRpm.value newExtra.rpm_strategy = rpmStrategy.value + if (rpmStickyBuffer.value != null && rpmStickyBuffer.value > 0) { + newExtra.rpm_sticky_buffer = rpmStickyBuffer.value + } else { + delete newExtra.rpm_sticky_buffer + } } else { delete newExtra.base_rpm delete newExtra.rpm_strategy diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index f730a04f..d2e1d138 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -1618,6 +1618,13 @@ export default { full: 'RPM limit reached', warning: 'RPM approaching limit', normal: 'RPM normal', + tieredNormal: 'RPM limit (Tiered) - Normal', + tieredWarning: 'RPM limit (Tiered) - Approaching limit', + tieredStickyOnly: 'RPM limit (Tiered) - Sticky only | Buffer: {buffer}', + tieredBlocked: 'RPM limit (Tiered) - Blocked | Buffer: {buffer}', + stickyExemptNormal: 'RPM limit (Sticky Exempt) - Normal', + stickyExemptWarning: 'RPM limit (Sticky Exempt) - Approaching limit', + stickyExemptOver: 'RPM limit (Sticky Exempt) - Over limit, sticky only' }, }, tempUnschedulable: { @@ -1842,7 +1849,12 @@ export default { strategy: 'RPM Strategy', strategyTiered: 'Tiered Model', strategyStickyExempt: 'Sticky Exempt', + strategyTieredHint: 'Green → Yellow → Sticky only → Blocked, progressive throttling', + strategyStickyExemptHint: 'Only sticky sessions allowed when over limit', strategyHint: 'Tiered: gradually restrict when exceeded; Sticky Exempt: existing sessions unrestricted', + stickyBuffer: 'Sticky Buffer', + stickyBufferPlaceholder: 'Default: 20% of base RPM', + stickyBufferHint: 'Extra requests allowed for sticky sessions after exceeding base RPM. Leave empty to use default (20% of base RPM, min 1)' }, tlsFingerprint: { label: 'TLS Fingerprint Simulation', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index b2e72fd8..b28ba89f 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -1669,6 +1669,13 @@ export default { full: '已达 RPM 上限', warning: 'RPM 接近上限', normal: 'RPM 正常', + tieredNormal: 'RPM 限制 (三区模型) - 正常', + tieredWarning: 'RPM 限制 (三区模型) - 接近阈值', + tieredStickyOnly: 'RPM 限制 (三区模型) - 仅粘性会话 | 缓冲区: {buffer}', + tieredBlocked: 'RPM 限制 (三区模型) - 已阻塞 | 缓冲区: {buffer}', + stickyExemptNormal: 'RPM 限制 (粘性豁免) - 正常', + stickyExemptWarning: 'RPM 限制 (粘性豁免) - 接近阈值', + stickyExemptOver: 'RPM 限制 (粘性豁免) - 超限,仅粘性会话' }, }, clearRateLimit: '清除速率限制', @@ -1985,7 +1992,12 @@ export default { strategy: 'RPM 策略', strategyTiered: '三区模型', strategyStickyExempt: '粘性豁免', + strategyTieredHint: '绿区→黄区→仅粘性→阻塞,逐步限流', + strategyStickyExemptHint: '超限后仅允许粘性会话', strategyHint: '三区模型: 超限后逐步限制; 粘性豁免: 已有会话不受限', + stickyBuffer: '粘性缓冲区', + stickyBufferPlaceholder: '默认: base RPM 的 20%', + stickyBufferHint: '超过 base RPM 后,粘性会话额外允许的请求数。为空则使用默认值(base RPM 的 20%,最小为 1)' }, tlsFingerprint: { label: 'TLS 指纹模拟', diff --git a/frontend/src/views/admin/AccountsView.vue b/frontend/src/views/admin/AccountsView.vue index 09e8649d..defcd434 100644 --- a/frontend/src/views/admin/AccountsView.vue +++ b/frontend/src/views/admin/AccountsView.vue @@ -263,7 +263,7 @@ - + @@ -307,7 +307,7 @@ import PlatformTypeBadge from '@/components/common/PlatformTypeBadge.vue' import Icon from '@/components/icons/Icon.vue' import ErrorPassthroughRulesModal from '@/components/admin/ErrorPassthroughRulesModal.vue' import { formatDateTime, formatRelativeTime } from '@/utils/format' -import type { Account, AccountPlatform, Proxy, AdminGroup, WindowStats } from '@/types' +import type { Account, AccountPlatform, AccountType, Proxy, AdminGroup, WindowStats } from '@/types' const { t } = useI18n() const appStore = useAppStore() @@ -324,6 +324,14 @@ const selPlatforms = computed(() => { ) return [...platforms] }) +const selTypes = computed(() => { + const types = new Set( + accounts.value + .filter(a => selIds.value.includes(a.id)) + .map(a => a.type) + ) + return [...types] +}) const showCreate = ref(false) const showEdit = ref(false) const showSync = ref(false) From ff9683b0fcf99cee2073e80e6a58baeb784a7477 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 10:17:25 +0800 Subject: [PATCH 12/19] fix: move RPM prefetch before routing segment in legacy/mixed paths Ensures isAccountSchedulableForRPM calls within the routing segment hit the prefetch cache instead of querying Redis individually. --- backend/internal/service/gateway_service.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 631d77cc..54c3a4d1 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2607,6 +2607,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, } accountsLoaded = true + // 提前预取 RPM 计数,确保 routing 段内的 isAccountSchedulableForRPM 调用能命中缓存 + ctx = s.withRPMPrefetch(ctx, accounts) + routingSet := make(map[int64]struct{}, len(routingAccountIDs)) for _, id := range routingAccountIDs { if id > 0 { @@ -2822,6 +2825,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g } accountsLoaded = true + // 提前预取 RPM 计数,确保 routing 段内的 isAccountSchedulableForRPM 调用能命中缓存 + ctx = s.withRPMPrefetch(ctx, accounts) + routingSet := make(map[int64]struct{}, len(routingAccountIDs)) for _, id := range routingAccountIDs { if id > 0 { From 4b72aa33f3485404eab7423cf90f5d3d4197a9e5 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 10:17:59 +0800 Subject: [PATCH 13/19] fix: add enableRpmLimit to hasAnyFieldEnabled check in BulkEditModal Without this, submitting a bulk edit with only RPM changes would be rejected as "no fields selected". --- frontend/src/components/account/BulkEditAccountModal.vue | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frontend/src/components/account/BulkEditAccountModal.vue b/frontend/src/components/account/BulkEditAccountModal.vue index e5181741..e583b981 100644 --- a/frontend/src/components/account/BulkEditAccountModal.vue +++ b/frontend/src/components/account/BulkEditAccountModal.vue @@ -1253,7 +1253,8 @@ const handleSubmit = async () => { enablePriority.value || enableRateMultiplier.value || enableStatus.value || - enableGroups.value + enableGroups.value || + enableRpmLimit.value if (!hasAnyFieldEnabled) { appStore.showError(t('admin.accounts.bulkEdit.noFieldsSelected')) From e63c83955adf1c8e4972de181e0d2d86cb133243 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 10:35:33 +0800 Subject: [PATCH 14/19] fix: address deep code review issues for RPM limiting - Move IncrementRPM after Forward success to prevent phantom RPM consumption during account switch retries - Add base_rpm input sanitization (clamp to 0-10000) in Create/Update - Add WindowCost scheduling checks to legacy path sticky sessions (4 check sites + 4 prefetch sites), fixing pre-existing gap - Clean up rpm_strategy/rpm_sticky_buffer when disabling RPM in BulkEditModal (JSONB merge cannot delete keys, use empty values) - Add json.Number test cases to TestGetBaseRPM/TestGetRPMStickyBuffer - Document TOCTOU race as accepted soft-limit design trade-off --- .../internal/handler/admin/account_handler.go | 45 +++++++++++++++++++ backend/internal/handler/gateway_handler.go | 34 +++++++------- backend/internal/service/account_rpm_test.go | 7 ++- backend/internal/service/gateway_service.go | 27 ++++++----- .../account/BulkEditAccountModal.vue | 6 ++- 5 files changed, 92 insertions(+), 27 deletions(-) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index c41f37c1..382d62c1 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -483,6 +483,8 @@ func (h *AccountHandler) Create(c *gin.Context) { response.BadRequest(c, "rate_multiplier must be >= 0") return } + // base_rpm 输入校验:负值归零,超过 10000 截断 + sanitizeExtraBaseRPM(req.Extra) // 确定是否跳过混合渠道检查 skipCheck := req.ConfirmMixedChannelRisk != nil && *req.ConfirmMixedChannelRisk @@ -552,6 +554,8 @@ func (h *AccountHandler) Update(c *gin.Context) { response.BadRequest(c, "rate_multiplier must be >= 0") return } + // base_rpm 输入校验:负值归零,超过 10000 截断 + sanitizeExtraBaseRPM(req.Extra) // 确定是否跳过混合渠道检查 skipCheck := req.ConfirmMixedChannelRisk != nil && *req.ConfirmMixedChannelRisk @@ -1736,3 +1740,44 @@ func (h *AccountHandler) BatchRefreshTier(c *gin.Context) { func (h *AccountHandler) GetAntigravityDefaultModelMapping(c *gin.Context) { response.Success(c, domain.DefaultAntigravityModelMapping) } + +// sanitizeExtraBaseRPM 对 extra map 中的 base_rpm 值进行范围校验和归一化。 +// 负值归零,超过 10000 截断为 10000。extra 为 nil 或不含 base_rpm 时无操作。 +func sanitizeExtraBaseRPM(extra map[string]any) { + if extra == nil { + return + } + raw, ok := extra["base_rpm"] + if !ok { + return + } + v := parseExtraIntForValidation(raw) + if v < 0 { + v = 0 + } else if v > 10000 { + v = 10000 + } + extra["base_rpm"] = v +} + +// parseExtraIntForValidation 从 extra 字段的 any 值解析为 int,用于输入校验。 +// 支持 int, int64, float64, json.Number, string 类型。 +func parseExtraIntForValidation(value any) int { + switch v := value.(type) { + case int: + return v + case int64: + return int(v) + case float64: + return int(v) + case json.Number: + if i, err := v.Int64(); err == nil { + return int(i) + } + case string: + if i, err := strconv.Atoi(strings.TrimSpace(v)); err == nil { + return i + } + } + return 0 +} diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index b68a46fa..3cc52839 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -366,13 +366,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 账号槽位/等待计数需要在超时或断开时安全回收 accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc) - // RPM 计数递增(调度成功后、Forward 前) - if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { - if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil { - reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err)) - } - } - // 转发请求 - 根据账号平台分流 var result *service.ForwardResult requestCtx := c.Request.Context() @@ -410,6 +403,15 @@ func (h *GatewayHandler) Messages(c *gin.Context) { return } + // RPM 计数递增(Forward 成功后) + // 注意:TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。 + // 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。 + if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { + if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil { + reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err)) + } + } + // 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context) userAgent := c.GetHeader("User-Agent") clientIP := ip.GetClientIP(c) @@ -556,13 +558,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 账号槽位/等待计数需要在超时或断开时安全回收 accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc) - // RPM 计数递增(调度成功后、Forward 前) - if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { - if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil { - reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err)) - } - } - // 转发请求 - 根据账号平台分流 var result *service.ForwardResult requestCtx := c.Request.Context() @@ -609,7 +604,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, status, code, message, streamStarted) return } - // 兜底重试按“直接请求兜底分组”处理:清除强制平台,允许按分组平台调度 + // 兜底重试按"直接请求兜底分组"处理:清除强制平台,允许按分组平台调度 ctx := context.WithValue(c.Request.Context(), ctxkey.ForcePlatform, "") c.Request = c.Request.WithContext(ctx) currentAPIKey = fallbackAPIKey @@ -643,6 +638,15 @@ func (h *GatewayHandler) Messages(c *gin.Context) { return } + // RPM 计数递增(Forward 成功后) + // 注意:TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。 + // 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。 + if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { + if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil { + reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err)) + } + } + // 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context) userAgent := c.GetHeader("User-Agent") clientIP := ip.GetClientIP(c) diff --git a/backend/internal/service/account_rpm_test.go b/backend/internal/service/account_rpm_test.go index b08b54a2..9d91f3e0 100644 --- a/backend/internal/service/account_rpm_test.go +++ b/backend/internal/service/account_rpm_test.go @@ -1,6 +1,9 @@ package service -import "testing" +import ( + "encoding/json" + "testing" +) func TestGetBaseRPM(t *testing.T) { tests := []struct { @@ -16,6 +19,7 @@ func TestGetBaseRPM(t *testing.T) { {"string value", map[string]any{"base_rpm": "15"}, 15}, {"negative value", map[string]any{"base_rpm": -5}, 0}, {"int64 value", map[string]any{"base_rpm": int64(20)}, 20}, + {"json.Number value", map[string]any{"base_rpm": json.Number("25")}, 25}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -103,6 +107,7 @@ func TestGetRPMStickyBuffer(t *testing.T) { {"custom buffer=0 fallback to default", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0}, 2}, {"custom buffer negative fallback", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1}, 2}, {"custom buffer with float", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7}, + {"json.Number base_rpm", map[string]any{"base_rpm": json.Number("10")}, 2}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 54c3a4d1..04e37f68 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2242,6 +2242,9 @@ func (s *GatewayService) isAccountSchedulableForRPM(ctx context.Context, account } // IncrementAccountRPM increments the RPM counter for the given account. +// 已知 TOCTOU 竞态:调度时读取 RPM 计数与此处递增之间存在时间窗口, +// 高并发下可能短暂超出 RPM 限制。这是与 WindowCost 一致的 soft-limit +// 设计权衡——可接受的少量超额优于加锁带来的延迟和复杂度。 func (s *GatewayService) IncrementAccountRPM(ctx context.Context, accountID int64) error { if s.rpmCache == nil { return nil @@ -2444,7 +2447,7 @@ func sameAccountWithLoadGroup(a, b accountWithLoad) bool { // shuffleWithinPriorityAndLastUsed 对排序后的 []*Account 切片,按 (Priority, LastUsedAt) 分组后组内随机打乱。 // // 注意:当 preferOAuth=true 时,需要保证 OAuth 账号在同组内仍然优先,否则会把排序时的偏好打散掉。 -// 因此这里采用“组内分区 + 分区内 shuffle”的方式: +// 因此这里采用"组内分区 + 分区内 shuffle"的方式: // - 先把同组账号按 (OAuth / 非 OAuth) 拆成两段,保持 OAuth 段在前; // - 再分别在各段内随机打散,避免热点。 func shuffleWithinPriorityAndLastUsed(accounts []*Account, preferOAuth bool) { @@ -2584,7 +2587,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { + if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForWindowCost(ctx, account, true) && s.isAccountSchedulableForRPM(ctx, account, true) { if s.debugModelRoutingEnabled() { logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] legacy routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID) } @@ -2607,7 +2610,8 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, } accountsLoaded = true - // 提前预取 RPM 计数,确保 routing 段内的 isAccountSchedulableForRPM 调用能命中缓存 + // 提前预取窗口费用+RPM 计数,确保 routing 段内的调度检查调用能命中缓存 + ctx = s.withWindowCostPrefetch(ctx, accounts) ctx = s.withRPMPrefetch(ctx, accounts) routingSet := make(map[int64]struct{}, len(routingAccountIDs)) @@ -2690,7 +2694,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { + if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForWindowCost(ctx, account, true) && s.isAccountSchedulableForRPM(ctx, account, true) { return account, nil } } @@ -2711,7 +2715,8 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, } } - // 批量预取 RPM 计数,避免逐个账号查询(N+1) + // 批量预取窗口费用+RPM 计数,避免逐个账号查询(N+1) + ctx = s.withWindowCostPrefetch(ctx, accounts) ctx = s.withRPMPrefetch(ctx, accounts) // 3. 按优先级+最久未用选择(考虑模型支持) @@ -2804,7 +2809,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { + if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForWindowCost(ctx, account, true) && s.isAccountSchedulableForRPM(ctx, account, true) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { if s.debugModelRoutingEnabled() { logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] legacy mixed routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID) @@ -2825,7 +2830,8 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g } accountsLoaded = true - // 提前预取 RPM 计数,确保 routing 段内的 isAccountSchedulableForRPM 调用能命中缓存 + // 提前预取窗口费用+RPM 计数,确保 routing 段内的调度检查调用能命中缓存 + ctx = s.withWindowCostPrefetch(ctx, accounts) ctx = s.withRPMPrefetch(ctx, accounts) routingSet := make(map[int64]struct{}, len(routingAccountIDs)) @@ -2912,7 +2918,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if clearSticky { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForRPM(ctx, account, true) { + if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && s.isAccountSchedulableForWindowCost(ctx, account, true) && s.isAccountSchedulableForRPM(ctx, account, true) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { return account, nil } @@ -2931,7 +2937,8 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g } } - // 批量预取 RPM 计数,避免逐个账号查询(N+1) + // 批量预取窗口费用+RPM 计数,避免逐个账号查询(N+1) + ctx = s.withWindowCostPrefetch(ctx, accounts) ctx = s.withRPMPrefetch(ctx, accounts) // 3. 按优先级+最久未用选择(考虑模型支持和混合调度) @@ -5304,7 +5311,7 @@ func (s *GatewayService) isThinkingBlockSignatureError(respBody []byte) bool { } func (s *GatewayService) shouldFailoverOn400(respBody []byte) bool { - // 只对“可能是兼容性差异导致”的 400 允许切换,避免无意义重试。 + // 只对"可能是兼容性差异导致"的 400 允许切换,避免无意义重试。 // 默认保守:无法识别则不切换。 msg := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) if msg == "" { diff --git a/frontend/src/components/account/BulkEditAccountModal.vue b/frontend/src/components/account/BulkEditAccountModal.vue index e583b981..ae16ff1a 100644 --- a/frontend/src/components/account/BulkEditAccountModal.vue +++ b/frontend/src/components/account/BulkEditAccountModal.vue @@ -1224,8 +1224,12 @@ const buildUpdatePayload = (): Record | null => { extra.rpm_sticky_buffer = bulkRpmStickyBuffer.value } } else { - // 关闭 RPM 限制 - 设置 base_rpm 为 0 + // 关闭 RPM 限制 - 设置 base_rpm 为 0,并用空值覆盖关联字段 + // 后端使用 JSONB || merge 语义,不会删除已有 key, + // 所以必须显式发送空值来重置(后端读取时会 fallback 到默认值) extra.base_rpm = 0 + extra.rpm_strategy = '' + extra.rpm_sticky_buffer = 0 } updates.extra = extra } From 2491e9b5ad632f483ad5781d1b705616b0bad046 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 10:46:34 +0800 Subject: [PATCH 15/19] fix: round-3 review fixes for RPM limiting - Add sanitizeExtraBaseRPM to BulkUpdate handler (was missing) - Add WindowCost scheduling checks to legacy non-sticky selection paths (4 sites), matching existing sticky + load-aware coverage - Export ParseExtraInt from service package, remove duplicate parseExtraIntForValidation from admin handler --- .../internal/handler/admin/account_handler.go | 25 +++---------------- backend/internal/service/account.go | 6 +++++ backend/internal/service/gateway_service.go | 12 +++++++++ 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index 382d62c1..dadecbc0 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -1082,6 +1082,8 @@ func (h *AccountHandler) BulkUpdate(c *gin.Context) { response.BadRequest(c, "rate_multiplier must be >= 0") return } + // base_rpm 输入校验:负值归零,超过 10000 截断 + sanitizeExtraBaseRPM(req.Extra) // 确定是否跳过混合渠道检查 skipCheck := req.ConfirmMixedChannelRisk != nil && *req.ConfirmMixedChannelRisk @@ -1751,7 +1753,7 @@ func sanitizeExtraBaseRPM(extra map[string]any) { if !ok { return } - v := parseExtraIntForValidation(raw) + v := service.ParseExtraInt(raw) if v < 0 { v = 0 } else if v > 10000 { @@ -1760,24 +1762,3 @@ func sanitizeExtraBaseRPM(extra map[string]any) { extra["base_rpm"] = v } -// parseExtraIntForValidation 从 extra 字段的 any 值解析为 int,用于输入校验。 -// 支持 int, int64, float64, json.Number, string 类型。 -func parseExtraIntForValidation(value any) int { - switch v := value.(type) { - case int: - return v - case int64: - return int(v) - case float64: - return int(v) - case json.Number: - if i, err := v.Int64(); err == nil { - return int(i) - } - case string: - if i, err := strconv.Atoi(strings.TrimSpace(v)); err == nil { - return i - } - } - return 0 -} diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index 7f33c61d..c76c817e 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -1274,6 +1274,12 @@ func parseExtraFloat64(value any) float64 { } // parseExtraInt 从 extra 字段解析 int 值 +// ParseExtraInt 从 extra 字段的 any 值解析为 int。 +// 支持 int, int64, float64, json.Number, string 类型,无法解析时返回 0。 +func ParseExtraInt(value any) int { + return parseExtraInt(value) +} + func parseExtraInt(value any) int { switch v := value.(type) { case int: diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 04e37f68..3323f868 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2641,6 +2641,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForWindowCost(ctx, acc, false) { + continue + } if !s.isAccountSchedulableForRPM(ctx, acc, false) { continue } @@ -2737,6 +2740,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForWindowCost(ctx, acc, false) { + continue + } if !s.isAccountSchedulableForRPM(ctx, acc, false) { continue } @@ -2865,6 +2871,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForWindowCost(ctx, acc, false) { + continue + } if !s.isAccountSchedulableForRPM(ctx, acc, false) { continue } @@ -2963,6 +2972,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if !s.isAccountSchedulableForModelSelection(ctx, acc, requestedModel) { continue } + if !s.isAccountSchedulableForWindowCost(ctx, acc, false) { + continue + } if !s.isAccountSchedulableForRPM(ctx, acc, false) { continue } From cd09adc3cc94f1dcc16dd0787bb58def38bd349e Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 11:03:21 +0800 Subject: [PATCH 16/19] fix: add sanitizeExtraBaseRPM to BatchCreate handler Ensures base_rpm validation (clamp 0-10000) is consistent across all four account mutation paths: Create, Update, BulkUpdate, BatchCreate. --- backend/internal/handler/admin/account_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index dadecbc0..a735d5c0 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -938,6 +938,9 @@ func (h *AccountHandler) BatchCreate(c *gin.Context) { continue } + // base_rpm 输入校验:负值归零,超过 10000 截断 + sanitizeExtraBaseRPM(item.Extra) + skipCheck := item.ConfirmMixedChannelRisk != nil && *item.ConfirmMixedChannelRisk account, err := h.adminService.CreateAccount(ctx, &service.CreateAccountInput{ From e135435ce2a91a6941c4f91d1d6fb412bee51f89 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 12:29:31 +0800 Subject: [PATCH 17/19] fix: sync test constructor calls with new rpmCache parameter Add missing nil argument for rpmCache to NewAccountHandler (5 sites) and NewGatewayService (2 sites) after RPM feature expanded their signatures. --- backend/internal/handler/admin/account_data_handler_test.go | 1 + .../handler/admin/account_handler_mixed_channel_test.go | 2 +- .../internal/handler/admin/account_handler_passthrough_test.go | 1 + .../internal/handler/admin/batch_update_credentials_test.go | 2 +- .../handler/gateway_handler_warmup_intercept_unit_test.go | 1 + backend/internal/handler/sora_gateway_handler_test.go | 3 ++- backend/internal/server/api_contract_test.go | 2 +- 7 files changed, 8 insertions(+), 4 deletions(-) diff --git a/backend/internal/handler/admin/account_data_handler_test.go b/backend/internal/handler/admin/account_data_handler_test.go index c8b04c2a..285033a1 100644 --- a/backend/internal/handler/admin/account_data_handler_test.go +++ b/backend/internal/handler/admin/account_data_handler_test.go @@ -64,6 +64,7 @@ func setupAccountDataRouter() (*gin.Engine, *stubAdminService) { nil, nil, nil, + nil, ) router.GET("/api/v1/admin/accounts/data", h.ExportData) diff --git a/backend/internal/handler/admin/account_handler_mixed_channel_test.go b/backend/internal/handler/admin/account_handler_mixed_channel_test.go index ad004844..61b99e03 100644 --- a/backend/internal/handler/admin/account_handler_mixed_channel_test.go +++ b/backend/internal/handler/admin/account_handler_mixed_channel_test.go @@ -15,7 +15,7 @@ import ( func setupAccountMixedChannelRouter(adminSvc *stubAdminService) *gin.Engine { gin.SetMode(gin.TestMode) router := gin.New() - accountHandler := NewAccountHandler(adminSvc, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + accountHandler := NewAccountHandler(adminSvc, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) router.POST("/api/v1/admin/accounts/check-mixed-channel", accountHandler.CheckMixedChannel) router.POST("/api/v1/admin/accounts", accountHandler.Create) router.PUT("/api/v1/admin/accounts/:id", accountHandler.Update) diff --git a/backend/internal/handler/admin/account_handler_passthrough_test.go b/backend/internal/handler/admin/account_handler_passthrough_test.go index d09cccd6..d86501c0 100644 --- a/backend/internal/handler/admin/account_handler_passthrough_test.go +++ b/backend/internal/handler/admin/account_handler_passthrough_test.go @@ -28,6 +28,7 @@ func TestAccountHandler_Create_AnthropicAPIKeyPassthroughExtraForwarded(t *testi nil, nil, nil, + nil, ) router := gin.New() diff --git a/backend/internal/handler/admin/batch_update_credentials_test.go b/backend/internal/handler/admin/batch_update_credentials_test.go index c8185735..0b1b6691 100644 --- a/backend/internal/handler/admin/batch_update_credentials_test.go +++ b/backend/internal/handler/admin/batch_update_credentials_test.go @@ -36,7 +36,7 @@ func (f *failingAdminService) UpdateAccount(ctx context.Context, id int64, input func setupAccountHandlerWithService(adminSvc service.AdminService) (*gin.Engine, *AccountHandler) { gin.SetMode(gin.TestMode) router := gin.New() - handler := NewAccountHandler(adminSvc, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + handler := NewAccountHandler(adminSvc, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) router.POST("/api/v1/admin/accounts/batch-update-credentials", handler.BatchUpdateCredentials) return router, handler } diff --git a/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go b/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go index 76141521..2afa6440 100644 --- a/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go +++ b/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go @@ -153,6 +153,7 @@ func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*servi nil, // deferredService nil, // claudeTokenProvider nil, // sessionLimitCache + nil, // rpmCache nil, // digestStore ) diff --git a/backend/internal/handler/sora_gateway_handler_test.go b/backend/internal/handler/sora_gateway_handler_test.go index 01c684ca..68a04084 100644 --- a/backend/internal/handler/sora_gateway_handler_test.go +++ b/backend/internal/handler/sora_gateway_handler_test.go @@ -426,7 +426,8 @@ func TestSoraGatewayHandler_ChatCompletions(t *testing.T) { deferredService, nil, testutil.StubSessionLimitCache{}, - nil, + nil, // rpmCache + nil, // digestStore ) soraClient := &stubSoraClient{imageURLs: []string{"https://example.com/a.png"}} diff --git a/backend/internal/server/api_contract_test.go b/backend/internal/server/api_contract_test.go index c98086e0..5372690b 100644 --- a/backend/internal/server/api_contract_test.go +++ b/backend/internal/server/api_contract_test.go @@ -624,7 +624,7 @@ func newContractDeps(t *testing.T) *contractDeps { apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService) usageHandler := handler.NewUsageHandler(usageService, apiKeyService) adminSettingHandler := adminhandler.NewSettingHandler(settingService, nil, nil, nil, nil) - adminAccountHandler := adminhandler.NewAccountHandler(adminService, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + adminAccountHandler := adminhandler.NewAccountHandler(adminService, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) jwtAuth := func(c *gin.Context) { c.Set(string(middleware.ContextKeyUser), middleware.AuthSubject{ From 115d06edf081dae055ab77b4438607c0724685f3 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 14:28:16 +0800 Subject: [PATCH 18/19] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20gofmt=20?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/handler/admin/account_handler.go | 1 - backend/internal/repository/rpm_cache.go | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index a735d5c0..f6082e09 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -1764,4 +1764,3 @@ func sanitizeExtraBaseRPM(extra map[string]any) { } extra["base_rpm"] = v } - diff --git a/backend/internal/repository/rpm_cache.go b/backend/internal/repository/rpm_cache.go index 80954f52..4d73ec4b 100644 --- a/backend/internal/repository/rpm_cache.go +++ b/backend/internal/repository/rpm_cache.go @@ -23,9 +23,9 @@ import ( // 通过 rdb.Time() 获取服务端时间,避免多实例时钟不同步。 // // 设计决策: -// - TxPipeline vs Pipeline:Pipeline 仅合并发送但不保证原子,TxPipeline 使用 MULTI/EXEC 事务保证原子执行。 -// - rdb.Time() 单独调用:Pipeline/TxPipeline 中无法引用前一命令的结果,因此 TIME 必须单独调用(2 RTT)。 -// Lua 脚本可以做到 1 RTT,但在 Redis Cluster 中动态拼接 key 存在 CROSSSLOT 风险,选择安全性优先。 +// - TxPipeline vs Pipeline:Pipeline 仅合并发送但不保证原子,TxPipeline 使用 MULTI/EXEC 事务保证原子执行。 +// - rdb.Time() 单独调用:Pipeline/TxPipeline 中无法引用前一命令的结果,因此 TIME 必须单独调用(2 RTT)。 +// Lua 脚本可以做到 1 RTT,但在 Redis Cluster 中动态拼接 key 存在 CROSSSLOT 风险,选择安全性优先。 const ( // RPM 计数器键前缀 // 格式: rpm:{accountID}:{minuteTimestamp} From 212cbbd3a2b4ac06dab333f7e1c5b2af40cb499b Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 21:30:59 +0800 Subject: [PATCH 19/19] fix: add missing rpmCache nil arg in sora_client_handler_test --- backend/internal/handler/sora_client_handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/internal/handler/sora_client_handler_test.go b/backend/internal/handler/sora_client_handler_test.go index a639e504..0b80a9cc 100644 --- a/backend/internal/handler/sora_client_handler_test.go +++ b/backend/internal/handler/sora_client_handler_test.go @@ -2181,7 +2181,7 @@ func (s *stubSoraClientForHandler) GetVideoTask(_ context.Context, _ *service.Ac func newMinimalGatewayService(accountRepo service.AccountRepository) *service.GatewayService { return service.NewGatewayService( accountRepo, nil, nil, nil, nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, + nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, ) }