From 72e5876c64ee231d5b973546b1a85a262ffd0947 Mon Sep 17 00:00:00 2001 From: QTom Date: Tue, 31 Mar 2026 13:19:40 +0800 Subject: [PATCH] feat(gateway): Cache-Driven RPM Buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - buffer 公式从 baseRPM/5 改为 concurrency + maxSessions 保留 baseRPM/5 作为 floor 向后兼容 - 粘性路径 fallback 新增 [StickyCacheMiss] 结构化日志 reason: rpm_red / gate_check / session_limit / wait_queue_full / account_cleared - session_limit 路径跳过 wait queue 重试(RegisterSession 拒绝无副作用) - 典型配置 buffer 从 3 提升至 13,大幅减少高峰期 Prompt Cache Miss Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/internal/service/account.go | 33 +++++++++-- backend/internal/service/account_rpm_test.go | 55 ++++++++++++------ backend/internal/service/gateway_service.go | 61 ++++++++++++++------ 3 files changed, 110 insertions(+), 39 deletions(-) diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index a1449ffd..5ced20a7 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -1727,22 +1727,47 @@ func (a *Account) GetRPMStrategy() string { } // GetRPMStickyBuffer 获取 RPM 粘性缓冲数量 -// tiered 模式下的黄区大小,默认为 base_rpm 的 20%(至少 1) +// Cache-driven: buffer = concurrency + maxSessions(覆盖幽灵窗口 + 稳态会话需求) +// floor = baseRPM / 5(向后兼容 maxSessions=0 且 concurrency=0 场景) func (a *Account) GetRPMStickyBuffer() int { if a.Extra == nil { return 0 } + + // 手动 override 最高优先级 if v, ok := a.Extra["rpm_sticky_buffer"]; ok { val := parseExtraInt(v) if val > 0 { return val } } + base := a.GetBaseRPM() - buffer := base / 5 - if buffer < 1 && base > 0 { - buffer = 1 + if base <= 0 { + return 0 } + + // Cache-driven buffer = concurrency + maxSessions + conc := a.Concurrency + if conc < 0 { + conc = 0 + } + sess := a.GetMaxSessions() + if sess < 0 { + sess = 0 + } + + buffer := conc + sess + + // floor: 向后兼容 + floor := base / 5 + if floor < 1 { + floor = 1 + } + if buffer < floor { + buffer = floor + } + return buffer } diff --git a/backend/internal/service/account_rpm_test.go b/backend/internal/service/account_rpm_test.go index 9d91f3e0..40298263 100644 --- a/backend/internal/service/account_rpm_test.go +++ b/backend/internal/service/account_rpm_test.go @@ -90,28 +90,47 @@ func TestCheckRPMSchedulability(t *testing.T) { func TestGetRPMStickyBuffer(t *testing.T) { tests := []struct { - name string - extra map[string]any - expected int + name string + concurrency int + extra map[string]any + expected int }{ - {"nil extra", nil, 0}, - {"no keys", map[string]any{}, 0}, - {"base_rpm=0", map[string]any{"base_rpm": 0}, 0}, - {"base_rpm=1 min buffer 1", map[string]any{"base_rpm": 1}, 1}, - {"base_rpm=4 min buffer 1", map[string]any{"base_rpm": 4}, 1}, - {"base_rpm=5 buffer 1", map[string]any{"base_rpm": 5}, 1}, - {"base_rpm=10 buffer 2", map[string]any{"base_rpm": 10}, 2}, - {"base_rpm=15 buffer 3", map[string]any{"base_rpm": 15}, 3}, - {"base_rpm=100 buffer 20", map[string]any{"base_rpm": 100}, 20}, - {"custom buffer=5", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 5}, - {"custom buffer=0 fallback to default", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0}, 2}, - {"custom buffer negative fallback", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1}, 2}, - {"custom buffer with float", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7}, - {"json.Number base_rpm", map[string]any{"base_rpm": json.Number("10")}, 2}, + // 基础退化 + {"nil extra", 0, nil, 0}, + {"no keys", 0, map[string]any{}, 0}, + {"base_rpm=0", 0, map[string]any{"base_rpm": 0}, 0}, + + // 新公式: concurrency + maxSessions, floor = base/5 + {"conc=3 sess=10 → 13", 3, map[string]any{"base_rpm": 15, "max_sessions": 10}, 13}, + {"conc=2 sess=5 → 7", 2, map[string]any{"base_rpm": 10, "max_sessions": 5}, 7}, + {"conc=3 sess=15 → 18", 3, map[string]any{"base_rpm": 30, "max_sessions": 15}, 18}, + + // floor 生效 (conc+sess < base/5) + {"conc=0 sess=0 base=15 → floor 3", 0, map[string]any{"base_rpm": 15}, 3}, + {"conc=0 sess=0 base=10 → floor 2", 0, map[string]any{"base_rpm": 10}, 2}, + {"conc=0 sess=0 base=1 → floor 1", 0, map[string]any{"base_rpm": 1}, 1}, + {"conc=0 sess=0 base=4 → floor 1", 0, map[string]any{"base_rpm": 4}, 1}, + {"conc=1 sess=0 base=15 → floor 3", 1, map[string]any{"base_rpm": 15}, 3}, + + // 手动 override + {"custom buffer=5", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5, "max_sessions": 10}, 5}, + {"custom buffer=0 fallback", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0, "max_sessions": 10}, 13}, + {"custom buffer negative fallback", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1, "max_sessions": 10}, 13}, + {"custom buffer with float", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7}, + + // 负值 clamp + {"negative concurrency clamped", -5, map[string]any{"base_rpm": 15, "max_sessions": 10}, 10}, + {"negative maxSessions clamped", 3, map[string]any{"base_rpm": 15, "max_sessions": -5}, 3}, + + // 高并发低会话 + {"conc=10 sess=5 → 15", 10, map[string]any{"base_rpm": 10, "max_sessions": 5}, 15}, + + // json.Number + {"json.Number base_rpm", 3, map[string]any{"base_rpm": json.Number("10"), "max_sessions": json.Number("5")}, 8}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - a := &Account{Extra: tt.extra} + a := &Account{Concurrency: tt.concurrency, Extra: tt.extra} if got := a.GetRPMStickyBuffer(); got != tt.expected { t.Errorf("GetRPMStickyBuffer() = %d, want %d", got, tt.expected) } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index b54f463b..ec14ac62 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -1418,19 +1418,24 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if containsInt64(routingAccountIDs, stickyAccountID) && !isExcluded(stickyAccountID) { // 粘性账号在路由列表中,优先使用 if stickyAccount, ok := accountByID[stickyAccountID]; ok { - if s.isAccountSchedulableForSelection(stickyAccount) && + var stickyCacheMissReason string + + gatePass := s.isAccountSchedulableForSelection(stickyAccount) && s.isAccountAllowedForPlatform(stickyAccount, platform, useMixed) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, stickyAccount, requestedModel)) && s.isAccountSchedulableForModelSelection(ctx, stickyAccount, requestedModel) && s.isAccountSchedulableForQuota(stickyAccount) && - s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true) && + s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true) - s.isAccountSchedulableForRPM(ctx, stickyAccount, true) { // 粘性会话窗口费用+RPM 检查 + rpmPass := gatePass && s.isAccountSchedulableForRPM(ctx, stickyAccount, true) + + if rpmPass { // 粘性会话窗口费用+RPM 检查 result, err := s.tryAcquireAccountSlot(ctx, stickyAccountID, stickyAccount.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) { result.ReleaseFunc() // 释放槽位 + stickyCacheMissReason = "session_limit" // 继续到负载感知选择 } else { if s.debugModelRoutingEnabled() { @@ -1444,27 +1449,49 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro } } - waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID) - if waitingCount < cfg.StickySessionMaxWaiting { - // 会话数量限制检查(等待计划也需要占用会话配额) - if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) { - // 会话限制已满,继续到负载感知选择 + if stickyCacheMissReason == "" { + waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID) + if waitingCount < cfg.StickySessionMaxWaiting { + // 会话数量限制检查(等待计划也需要占用会话配额) + if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) { + stickyCacheMissReason = "session_limit" + // 会话限制已满,继续到负载感知选择 + } else { + return &AccountSelectionResult{ + Account: stickyAccount, + WaitPlan: &AccountWaitPlan{ + AccountID: stickyAccountID, + MaxConcurrency: stickyAccount.Concurrency, + Timeout: cfg.StickySessionWaitTimeout, + MaxWaiting: cfg.StickySessionMaxWaiting, + }, + }, nil + } } else { - return &AccountSelectionResult{ - Account: stickyAccount, - WaitPlan: &AccountWaitPlan{ - AccountID: stickyAccountID, - MaxConcurrency: stickyAccount.Concurrency, - Timeout: cfg.StickySessionWaitTimeout, - MaxWaiting: cfg.StickySessionMaxWaiting, - }, - }, nil + stickyCacheMissReason = "wait_queue_full" } } // 粘性账号槽位满且等待队列已满,继续使用负载感知选择 + } else if !gatePass { + stickyCacheMissReason = "gate_check" + } else { + stickyCacheMissReason = "rpm_red" + } + + // 记录粘性缓存未命中的结构化日志 + if stickyCacheMissReason != "" { + baseRPM := stickyAccount.GetBaseRPM() + var currentRPM int + if count, ok := rpmFromPrefetchContext(ctx, stickyAccount.ID); ok { + currentRPM = count + } + logger.LegacyPrintf("service.gateway", "[StickyCacheMiss] reason=%s account_id=%d session=%s current_rpm=%d base_rpm=%d", + stickyCacheMissReason, stickyAccountID, shortSessionHash(sessionHash), currentRPM, baseRPM) } } else { _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) + logger.LegacyPrintf("service.gateway", "[StickyCacheMiss] reason=account_cleared account_id=%d session=%s current_rpm=0 base_rpm=0", + stickyAccountID, shortSessionHash(sessionHash)) } } }