diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 3c44aa72..cc2e8ccd 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -138,7 +138,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, httpUpstream, configConfig) crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig) sessionLimitCache := repository.ProvideSessionLimitCache(redisClient, configConfig) - accountHandler := admin.NewAccountHandler(adminService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, rateLimitService, accountUsageService, accountTestService, concurrencyService, crsSyncService, sessionLimitCache, compositeTokenCacheInvalidator) + rpmCache := repository.NewRPMCache(redisClient) + accountHandler := admin.NewAccountHandler(adminService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, rateLimitService, accountUsageService, accountTestService, concurrencyService, crsSyncService, sessionLimitCache, rpmCache, compositeTokenCacheInvalidator) adminAnnouncementHandler := admin.NewAnnouncementHandler(announcementService) dataManagementService := service.NewDataManagementService() dataManagementHandler := admin.NewDataManagementHandler(dataManagementService) @@ -160,7 +161,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { deferredService := service.ProvideDeferredService(accountRepository, timingWheelService) claudeTokenProvider := service.NewClaudeTokenProvider(accountRepository, geminiTokenCache, oAuthService) digestSessionStore := service.NewDigestSessionStore() - gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, digestSessionStore) + gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore) openAITokenProvider := service.NewOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService) openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index 9b732f9c..6158fa01 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -53,6 +53,7 @@ type AccountHandler struct { concurrencyService *service.ConcurrencyService crsSyncService *service.CRSSyncService sessionLimitCache service.SessionLimitCache + rpmCache service.RPMCache tokenCacheInvalidator service.TokenCacheInvalidator } @@ -69,6 +70,7 @@ func NewAccountHandler( concurrencyService *service.ConcurrencyService, crsSyncService *service.CRSSyncService, sessionLimitCache service.SessionLimitCache, + rpmCache service.RPMCache, tokenCacheInvalidator service.TokenCacheInvalidator, ) *AccountHandler { return &AccountHandler{ @@ -83,6 +85,7 @@ func NewAccountHandler( concurrencyService: concurrencyService, crsSyncService: crsSyncService, sessionLimitCache: sessionLimitCache, + rpmCache: rpmCache, tokenCacheInvalidator: tokenCacheInvalidator, } } @@ -154,6 +157,7 @@ type AccountWithConcurrency struct { // 以下字段仅对 Anthropic OAuth/SetupToken 账号有效,且仅在启用相应功能时返回 CurrentWindowCost *float64 `json:"current_window_cost,omitempty"` // 当前窗口费用 ActiveSessions *int `json:"active_sessions,omitempty"` // 当前活跃会话数 + CurrentRPM *int `json:"current_rpm,omitempty"` // 当前分钟 RPM 计数 } func (h *AccountHandler) buildAccountResponseWithRuntime(ctx context.Context, account *service.Account) AccountWithConcurrency { @@ -189,6 +193,12 @@ func (h *AccountHandler) buildAccountResponseWithRuntime(ctx context.Context, ac } } } + + if h.rpmCache != nil && account.GetBaseRPM() > 0 { + if rpm, err := h.rpmCache.GetRPM(ctx, account.ID); err == nil { + item.CurrentRPM = &rpm + } + } } return item diff --git a/backend/internal/repository/rpm_cache.go b/backend/internal/repository/rpm_cache.go index 6ec7f739..332c30c9 100644 --- a/backend/internal/repository/rpm_cache.go +++ b/backend/internal/repository/rpm_cache.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/Wei-Shaw/sub2api/internal/service" "github.com/redis/go-redis/v9" ) @@ -36,7 +37,7 @@ type RPMCacheImpl struct { rdb *redis.Client } -func NewRPMCache(rdb *redis.Client) *RPMCacheImpl { +func NewRPMCache(rdb *redis.Client) service.RPMCache { return &RPMCacheImpl{rdb: rdb} } diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index eb8ce3fb..2344035c 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -79,6 +79,7 @@ var ProviderSet = wire.NewSet( NewTimeoutCounterCache, ProvideConcurrencyCache, ProvideSessionLimitCache, + NewRPMCache, NewDashboardCache, NewEmailCache, NewIdentityCache, diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 0ba9e093..6da71cf4 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -520,6 +520,7 @@ type GatewayService struct { concurrencyService *ConcurrencyService claudeTokenProvider *ClaudeTokenProvider sessionLimitCache SessionLimitCache // 会话数量限制缓存(仅 Anthropic OAuth/SetupToken) + rpmCache RPMCache // RPM 计数缓存(仅 Anthropic OAuth/SetupToken) userGroupRateCache *gocache.Cache userGroupRateSF singleflight.Group modelsListCache *gocache.Cache @@ -549,6 +550,7 @@ func NewGatewayService( deferredService *DeferredService, claudeTokenProvider *ClaudeTokenProvider, sessionLimitCache SessionLimitCache, + rpmCache RPMCache, digestStore *DigestSessionStore, ) *GatewayService { userGroupRateTTL := resolveUserGroupRateCacheTTL(cfg) @@ -574,6 +576,7 @@ func NewGatewayService( deferredService: deferredService, claudeTokenProvider: claudeTokenProvider, sessionLimitCache: sessionLimitCache, + rpmCache: rpmCache, userGroupRateCache: gocache.New(userGroupRateTTL, time.Minute), modelsListCache: gocache.New(modelsListTTL, time.Minute), modelsListCacheTTL: modelsListTTL,