feat(gateway): Cache-Driven RPM Buffer
- buffer 公式从 baseRPM/5 改为 concurrency + maxSessions 保留 baseRPM/5 作为 floor 向后兼容 - 粘性路径 fallback 新增 [StickyCacheMiss] 结构化日志 reason: rpm_red / gate_check / session_limit / wait_queue_full / account_cleared - session_limit 路径跳过 wait queue 重试(RegisterSession 拒绝无副作用) - 典型配置 buffer 从 3 提升至 13,大幅减少高峰期 Prompt Cache Miss Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1727,22 +1727,47 @@ func (a *Account) GetRPMStrategy() string {
|
||||
}
|
||||
|
||||
// GetRPMStickyBuffer 获取 RPM 粘性缓冲数量
|
||||
// tiered 模式下的黄区大小,默认为 base_rpm 的 20%(至少 1)
|
||||
// Cache-driven: buffer = concurrency + maxSessions(覆盖幽灵窗口 + 稳态会话需求)
|
||||
// floor = baseRPM / 5(向后兼容 maxSessions=0 且 concurrency=0 场景)
|
||||
func (a *Account) GetRPMStickyBuffer() int {
|
||||
if a.Extra == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
// 手动 override 最高优先级
|
||||
if v, ok := a.Extra["rpm_sticky_buffer"]; ok {
|
||||
val := parseExtraInt(v)
|
||||
if val > 0 {
|
||||
return val
|
||||
}
|
||||
}
|
||||
|
||||
base := a.GetBaseRPM()
|
||||
buffer := base / 5
|
||||
if buffer < 1 && base > 0 {
|
||||
buffer = 1
|
||||
if base <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Cache-driven buffer = concurrency + maxSessions
|
||||
conc := a.Concurrency
|
||||
if conc < 0 {
|
||||
conc = 0
|
||||
}
|
||||
sess := a.GetMaxSessions()
|
||||
if sess < 0 {
|
||||
sess = 0
|
||||
}
|
||||
|
||||
buffer := conc + sess
|
||||
|
||||
// floor: 向后兼容
|
||||
floor := base / 5
|
||||
if floor < 1 {
|
||||
floor = 1
|
||||
}
|
||||
if buffer < floor {
|
||||
buffer = floor
|
||||
}
|
||||
|
||||
return buffer
|
||||
}
|
||||
|
||||
|
||||
@@ -90,28 +90,47 @@ func TestCheckRPMSchedulability(t *testing.T) {
|
||||
|
||||
func TestGetRPMStickyBuffer(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
extra map[string]any
|
||||
expected int
|
||||
name string
|
||||
concurrency int
|
||||
extra map[string]any
|
||||
expected int
|
||||
}{
|
||||
{"nil extra", nil, 0},
|
||||
{"no keys", map[string]any{}, 0},
|
||||
{"base_rpm=0", map[string]any{"base_rpm": 0}, 0},
|
||||
{"base_rpm=1 min buffer 1", map[string]any{"base_rpm": 1}, 1},
|
||||
{"base_rpm=4 min buffer 1", map[string]any{"base_rpm": 4}, 1},
|
||||
{"base_rpm=5 buffer 1", map[string]any{"base_rpm": 5}, 1},
|
||||
{"base_rpm=10 buffer 2", map[string]any{"base_rpm": 10}, 2},
|
||||
{"base_rpm=15 buffer 3", map[string]any{"base_rpm": 15}, 3},
|
||||
{"base_rpm=100 buffer 20", map[string]any{"base_rpm": 100}, 20},
|
||||
{"custom buffer=5", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5}, 5},
|
||||
{"custom buffer=0 fallback to default", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0}, 2},
|
||||
{"custom buffer negative fallback", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1}, 2},
|
||||
{"custom buffer with float", map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7},
|
||||
{"json.Number base_rpm", map[string]any{"base_rpm": json.Number("10")}, 2},
|
||||
// 基础退化
|
||||
{"nil extra", 0, nil, 0},
|
||||
{"no keys", 0, map[string]any{}, 0},
|
||||
{"base_rpm=0", 0, map[string]any{"base_rpm": 0}, 0},
|
||||
|
||||
// 新公式: concurrency + maxSessions, floor = base/5
|
||||
{"conc=3 sess=10 → 13", 3, map[string]any{"base_rpm": 15, "max_sessions": 10}, 13},
|
||||
{"conc=2 sess=5 → 7", 2, map[string]any{"base_rpm": 10, "max_sessions": 5}, 7},
|
||||
{"conc=3 sess=15 → 18", 3, map[string]any{"base_rpm": 30, "max_sessions": 15}, 18},
|
||||
|
||||
// floor 生效 (conc+sess < base/5)
|
||||
{"conc=0 sess=0 base=15 → floor 3", 0, map[string]any{"base_rpm": 15}, 3},
|
||||
{"conc=0 sess=0 base=10 → floor 2", 0, map[string]any{"base_rpm": 10}, 2},
|
||||
{"conc=0 sess=0 base=1 → floor 1", 0, map[string]any{"base_rpm": 1}, 1},
|
||||
{"conc=0 sess=0 base=4 → floor 1", 0, map[string]any{"base_rpm": 4}, 1},
|
||||
{"conc=1 sess=0 base=15 → floor 3", 1, map[string]any{"base_rpm": 15}, 3},
|
||||
|
||||
// 手动 override
|
||||
{"custom buffer=5", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 5, "max_sessions": 10}, 5},
|
||||
{"custom buffer=0 fallback", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": 0, "max_sessions": 10}, 13},
|
||||
{"custom buffer negative fallback", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": -1, "max_sessions": 10}, 13},
|
||||
{"custom buffer with float", 3, map[string]any{"base_rpm": 10, "rpm_sticky_buffer": float64(7)}, 7},
|
||||
|
||||
// 负值 clamp
|
||||
{"negative concurrency clamped", -5, map[string]any{"base_rpm": 15, "max_sessions": 10}, 10},
|
||||
{"negative maxSessions clamped", 3, map[string]any{"base_rpm": 15, "max_sessions": -5}, 3},
|
||||
|
||||
// 高并发低会话
|
||||
{"conc=10 sess=5 → 15", 10, map[string]any{"base_rpm": 10, "max_sessions": 5}, 15},
|
||||
|
||||
// json.Number
|
||||
{"json.Number base_rpm", 3, map[string]any{"base_rpm": json.Number("10"), "max_sessions": json.Number("5")}, 8},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
a := &Account{Extra: tt.extra}
|
||||
a := &Account{Concurrency: tt.concurrency, Extra: tt.extra}
|
||||
if got := a.GetRPMStickyBuffer(); got != tt.expected {
|
||||
t.Errorf("GetRPMStickyBuffer() = %d, want %d", got, tt.expected)
|
||||
}
|
||||
|
||||
@@ -1418,19 +1418,24 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
|
||||
if containsInt64(routingAccountIDs, stickyAccountID) && !isExcluded(stickyAccountID) {
|
||||
// 粘性账号在路由列表中,优先使用
|
||||
if stickyAccount, ok := accountByID[stickyAccountID]; ok {
|
||||
if s.isAccountSchedulableForSelection(stickyAccount) &&
|
||||
var stickyCacheMissReason string
|
||||
|
||||
gatePass := s.isAccountSchedulableForSelection(stickyAccount) &&
|
||||
s.isAccountAllowedForPlatform(stickyAccount, platform, useMixed) &&
|
||||
(requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, stickyAccount, requestedModel)) &&
|
||||
s.isAccountSchedulableForModelSelection(ctx, stickyAccount, requestedModel) &&
|
||||
s.isAccountSchedulableForQuota(stickyAccount) &&
|
||||
s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true) &&
|
||||
s.isAccountSchedulableForWindowCost(ctx, stickyAccount, true)
|
||||
|
||||
s.isAccountSchedulableForRPM(ctx, stickyAccount, true) { // 粘性会话窗口费用+RPM 检查
|
||||
rpmPass := gatePass && s.isAccountSchedulableForRPM(ctx, stickyAccount, true)
|
||||
|
||||
if rpmPass { // 粘性会话窗口费用+RPM 检查
|
||||
result, err := s.tryAcquireAccountSlot(ctx, stickyAccountID, stickyAccount.Concurrency)
|
||||
if err == nil && result.Acquired {
|
||||
// 会话数量限制检查
|
||||
if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) {
|
||||
result.ReleaseFunc() // 释放槽位
|
||||
stickyCacheMissReason = "session_limit"
|
||||
// 继续到负载感知选择
|
||||
} else {
|
||||
if s.debugModelRoutingEnabled() {
|
||||
@@ -1444,27 +1449,49 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
|
||||
}
|
||||
}
|
||||
|
||||
waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID)
|
||||
if waitingCount < cfg.StickySessionMaxWaiting {
|
||||
// 会话数量限制检查(等待计划也需要占用会话配额)
|
||||
if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) {
|
||||
// 会话限制已满,继续到负载感知选择
|
||||
if stickyCacheMissReason == "" {
|
||||
waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID)
|
||||
if waitingCount < cfg.StickySessionMaxWaiting {
|
||||
// 会话数量限制检查(等待计划也需要占用会话配额)
|
||||
if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) {
|
||||
stickyCacheMissReason = "session_limit"
|
||||
// 会话限制已满,继续到负载感知选择
|
||||
} else {
|
||||
return &AccountSelectionResult{
|
||||
Account: stickyAccount,
|
||||
WaitPlan: &AccountWaitPlan{
|
||||
AccountID: stickyAccountID,
|
||||
MaxConcurrency: stickyAccount.Concurrency,
|
||||
Timeout: cfg.StickySessionWaitTimeout,
|
||||
MaxWaiting: cfg.StickySessionMaxWaiting,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
} else {
|
||||
return &AccountSelectionResult{
|
||||
Account: stickyAccount,
|
||||
WaitPlan: &AccountWaitPlan{
|
||||
AccountID: stickyAccountID,
|
||||
MaxConcurrency: stickyAccount.Concurrency,
|
||||
Timeout: cfg.StickySessionWaitTimeout,
|
||||
MaxWaiting: cfg.StickySessionMaxWaiting,
|
||||
},
|
||||
}, nil
|
||||
stickyCacheMissReason = "wait_queue_full"
|
||||
}
|
||||
}
|
||||
// 粘性账号槽位满且等待队列已满,继续使用负载感知选择
|
||||
} else if !gatePass {
|
||||
stickyCacheMissReason = "gate_check"
|
||||
} else {
|
||||
stickyCacheMissReason = "rpm_red"
|
||||
}
|
||||
|
||||
// 记录粘性缓存未命中的结构化日志
|
||||
if stickyCacheMissReason != "" {
|
||||
baseRPM := stickyAccount.GetBaseRPM()
|
||||
var currentRPM int
|
||||
if count, ok := rpmFromPrefetchContext(ctx, stickyAccount.ID); ok {
|
||||
currentRPM = count
|
||||
}
|
||||
logger.LegacyPrintf("service.gateway", "[StickyCacheMiss] reason=%s account_id=%d session=%s current_rpm=%d base_rpm=%d",
|
||||
stickyCacheMissReason, stickyAccountID, shortSessionHash(sessionHash), currentRPM, baseRPM)
|
||||
}
|
||||
} else {
|
||||
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
|
||||
logger.LegacyPrintf("service.gateway", "[StickyCacheMiss] reason=account_cleared account_id=%d session=%s current_rpm=0 base_rpm=0",
|
||||
stickyAccountID, shortSessionHash(sessionHash))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user