From f648b8e0268450ddf04dfcb1de15cb39825c7f52 Mon Sep 17 00:00:00 2001 From: QTom Date: Sat, 28 Feb 2026 01:25:50 +0800 Subject: [PATCH] feat: increment RPM counter before request forwarding --- backend/internal/handler/gateway_handler.go | 14 ++++++++++++++ backend/internal/service/gateway_service.go | 9 +++++++++ 2 files changed, 23 insertions(+) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 9262df7e..b4ac664b 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -366,6 +366,13 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 账号槽位/等待计数需要在超时或断开时安全回收 accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc) + // RPM 计数递增(调度成功后、Forward 前) + if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { + if h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID) != nil { + // 失败开放:不阻塞请求 + } + } + // 转发请求 - 根据账号平台分流 var result *service.ForwardResult requestCtx := c.Request.Context() @@ -549,6 +556,13 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 账号槽位/等待计数需要在超时或断开时安全回收 accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc) + // RPM 计数递增(调度成功后、Forward 前) + if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 { + if h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID) != nil { + // 失败开放:不阻塞请求 + } + } + // 转发请求 - 根据账号平台分流 var result *service.ForwardResult requestCtx := c.Request.Context() diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 2d043474..d5f1eb64 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -2241,6 +2241,15 @@ func (s *GatewayService) isAccountSchedulableForRPM(ctx context.Context, account return true } +// IncrementAccountRPM increments the RPM counter for the given account. +func (s *GatewayService) IncrementAccountRPM(ctx context.Context, accountID int64) error { + if s.rpmCache == nil { + return nil + } + _, err := s.rpmCache.IncrementRPM(ctx, accountID) + return err +} + // checkAndRegisterSession 检查并注册会话,用于会话数量限制 // 仅适用于 Anthropic OAuth/SetupToken 账号 // sessionID: 会话标识符(使用粘性会话的 hash)