From a07174c19181cd3239a19f765cf8d128182a8df8 Mon Sep 17 00:00:00 2001 From: shaw Date: Sun, 18 Jan 2026 16:41:15 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E9=99=90=E5=88=B6=E5=8A=9F=E8=83=BD=E5=B9=B6=E5=9C=A8=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E8=B4=A6=E5=8F=B7=E6=97=B6=E6=94=AF=E6=8C=81=E9=85=8D?= =?UTF-8?q?=E9=A2=9D=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/service/gateway_service.go | 189 +++++++++++------- .../components/account/CreateAccountModal.vue | 179 ++++++++++++++++- 2 files changed, 292 insertions(+), 76 deletions(-) diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 5068767c..36011947 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -410,11 +410,9 @@ func (s *GatewayService) SelectAccountForModelWithExclusions(ctx context.Context } // SelectAccountWithLoadAwareness selects account with load-awareness and wait plan. -// metadataUserID: 原始 metadata.user_id 字段(用于提取会话 UUID 进行会话数量限制) +// metadataUserID: 已废弃参数,会话限制现在统一使用 sessionHash func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, metadataUserID string) (*AccountSelectionResult, error) { cfg := s.schedulingConfig() - // 提取会话 UUID(用于会话数量限制) - sessionUUID := extractSessionUUID(metadataUserID) var stickyAccountID int64 if sessionHash != "" && s.cache != nil { @@ -440,41 +438,63 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro } if s.concurrencyService == nil || !cfg.LoadBatchEnabled { - account, err := s.SelectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, excludedIDs) - if err != nil { - return nil, err + // 复制排除列表,用于会话限制拒绝时的重试 + localExcluded := make(map[int64]struct{}) + for k, v := range excludedIDs { + localExcluded[k] = v } - result, err := s.tryAcquireAccountSlot(ctx, account.ID, account.Concurrency) - if err == nil && result.Acquired { - return &AccountSelectionResult{ - Account: account, - Acquired: true, - ReleaseFunc: result.ReleaseFunc, - }, nil - } - if stickyAccountID > 0 && stickyAccountID == account.ID && s.concurrencyService != nil { - waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, account.ID) - if waitingCount < cfg.StickySessionMaxWaiting { + + for { + account, err := s.SelectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, localExcluded) + if err != nil { + return nil, err + } + + result, err := s.tryAcquireAccountSlot(ctx, account.ID, account.Concurrency) + if err == nil && result.Acquired { + // 获取槽位后检查会话限制(使用 sessionHash 作为会话标识符) + if !s.checkAndRegisterSession(ctx, account, sessionHash) { + result.ReleaseFunc() // 释放槽位 + localExcluded[account.ID] = struct{}{} // 排除此账号 + continue // 重新选择 + } return &AccountSelectionResult{ - Account: account, - WaitPlan: &AccountWaitPlan{ - AccountID: account.ID, - MaxConcurrency: account.Concurrency, - Timeout: cfg.StickySessionWaitTimeout, - MaxWaiting: cfg.StickySessionMaxWaiting, - }, + Account: account, + Acquired: true, + ReleaseFunc: result.ReleaseFunc, }, nil } + + // 对于等待计划的情况,也需要先检查会话限制 + if !s.checkAndRegisterSession(ctx, account, sessionHash) { + localExcluded[account.ID] = struct{}{} + continue + } + + if stickyAccountID > 0 && stickyAccountID == account.ID && s.concurrencyService != nil { + waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, account.ID) + if waitingCount < cfg.StickySessionMaxWaiting { + return &AccountSelectionResult{ + Account: account, + WaitPlan: &AccountWaitPlan{ + AccountID: account.ID, + MaxConcurrency: account.Concurrency, + Timeout: cfg.StickySessionWaitTimeout, + MaxWaiting: cfg.StickySessionMaxWaiting, + }, + }, nil + } + } + return &AccountSelectionResult{ + Account: account, + WaitPlan: &AccountWaitPlan{ + AccountID: account.ID, + MaxConcurrency: account.Concurrency, + Timeout: cfg.FallbackWaitTimeout, + MaxWaiting: cfg.FallbackMaxWaiting, + }, + }, nil } - return &AccountSelectionResult{ - Account: account, - WaitPlan: &AccountWaitPlan{ - AccountID: account.ID, - MaxConcurrency: account.Concurrency, - Timeout: cfg.FallbackWaitTimeout, - MaxWaiting: cfg.FallbackMaxWaiting, - }, - }, nil } platform, hasForcePlatform, err := s.resolvePlatform(ctx, groupID, group) @@ -590,7 +610,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro result, err := s.tryAcquireAccountSlot(ctx, stickyAccountID, stickyAccount.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, stickyAccount, sessionUUID) { + if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) { result.ReleaseFunc() // 释放槽位 // 继续到负载感知选择 } else { @@ -608,15 +628,20 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, stickyAccountID) if waitingCount < cfg.StickySessionMaxWaiting { - return &AccountSelectionResult{ - Account: stickyAccount, - WaitPlan: &AccountWaitPlan{ - AccountID: stickyAccountID, - MaxConcurrency: stickyAccount.Concurrency, - Timeout: cfg.StickySessionWaitTimeout, - MaxWaiting: cfg.StickySessionMaxWaiting, - }, - }, nil + // 会话数量限制检查(等待计划也需要占用会话配额) + if !s.checkAndRegisterSession(ctx, stickyAccount, sessionHash) { + // 会话限制已满,继续到负载感知选择 + } else { + return &AccountSelectionResult{ + Account: stickyAccount, + WaitPlan: &AccountWaitPlan{ + AccountID: stickyAccountID, + MaxConcurrency: stickyAccount.Concurrency, + Timeout: cfg.StickySessionWaitTimeout, + MaxWaiting: cfg.StickySessionMaxWaiting, + }, + }, nil + } } // 粘性账号槽位满且等待队列已满,继续使用负载感知选择 } @@ -677,7 +702,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro result, err := s.tryAcquireAccountSlot(ctx, item.account.ID, item.account.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, item.account, sessionUUID) { + if !s.checkAndRegisterSession(ctx, item.account, sessionHash) { result.ReleaseFunc() // 释放槽位,继续尝试下一个账号 continue } @@ -695,20 +720,26 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro } } - // 5. 所有路由账号槽位满,返回等待计划(选择负载最低的) - acc := routingAvailable[0].account - if s.debugModelRoutingEnabled() { - log.Printf("[ModelRoutingDebug] routed wait: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), acc.ID) + // 5. 所有路由账号槽位满,尝试返回等待计划(选择负载最低的) + // 遍历找到第一个满足会话限制的账号 + for _, item := range routingAvailable { + if !s.checkAndRegisterSession(ctx, item.account, sessionHash) { + continue // 会话限制已满,尝试下一个 + } + if s.debugModelRoutingEnabled() { + log.Printf("[ModelRoutingDebug] routed wait: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), item.account.ID) + } + return &AccountSelectionResult{ + Account: item.account, + WaitPlan: &AccountWaitPlan{ + AccountID: item.account.ID, + MaxConcurrency: item.account.Concurrency, + Timeout: cfg.StickySessionWaitTimeout, + MaxWaiting: cfg.StickySessionMaxWaiting, + }, + }, nil } - return &AccountSelectionResult{ - Account: acc, - WaitPlan: &AccountWaitPlan{ - AccountID: acc.ID, - MaxConcurrency: acc.Concurrency, - Timeout: cfg.StickySessionWaitTimeout, - MaxWaiting: cfg.StickySessionMaxWaiting, - }, - }, nil + // 所有路由账号会话限制都已满,继续到 Layer 2 回退 } // 路由列表中的账号都不可用(负载率 >= 100),继续到 Layer 2 回退 log.Printf("[ModelRouting] All routed accounts unavailable for model=%s, falling back to normal selection", requestedModel) @@ -728,7 +759,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, account, sessionUUID) { + if !s.checkAndRegisterSession(ctx, account, sessionHash) { result.ReleaseFunc() // 释放槽位,继续到 Layer 2 } else { _ = s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL) @@ -742,15 +773,20 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, accountID) if waitingCount < cfg.StickySessionMaxWaiting { - return &AccountSelectionResult{ - Account: account, - WaitPlan: &AccountWaitPlan{ - AccountID: accountID, - MaxConcurrency: account.Concurrency, - Timeout: cfg.StickySessionWaitTimeout, - MaxWaiting: cfg.StickySessionMaxWaiting, - }, - }, nil + // 会话数量限制检查(等待计划也需要占用会话配额) + if !s.checkAndRegisterSession(ctx, account, sessionHash) { + // 会话限制已满,继续到 Layer 2 + } else { + return &AccountSelectionResult{ + Account: account, + WaitPlan: &AccountWaitPlan{ + AccountID: accountID, + MaxConcurrency: account.Concurrency, + Timeout: cfg.StickySessionWaitTimeout, + MaxWaiting: cfg.StickySessionMaxWaiting, + }, + }, nil + } } } } @@ -799,7 +835,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro loadMap, err := s.concurrencyService.GetAccountsLoadBatch(ctx, accountLoads) if err != nil { - if result, ok := s.tryAcquireByLegacyOrder(ctx, candidates, groupID, sessionHash, preferOAuth, sessionUUID); ok { + if result, ok := s.tryAcquireByLegacyOrder(ctx, candidates, groupID, sessionHash, preferOAuth); ok { return result, nil } } else { @@ -849,7 +885,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro result, err := s.tryAcquireAccountSlot(ctx, item.account.ID, item.account.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, item.account, sessionUUID) { + if !s.checkAndRegisterSession(ctx, item.account, sessionHash) { result.ReleaseFunc() // 释放槽位,继续尝试下一个账号 continue } @@ -869,6 +905,10 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro // ============ Layer 3: 兜底排队 ============ sortAccountsByPriorityAndLastUsed(candidates, preferOAuth) for _, acc := range candidates { + // 会话数量限制检查(等待计划也需要占用会话配额) + if !s.checkAndRegisterSession(ctx, acc, sessionHash) { + continue // 会话限制已满,尝试下一个账号 + } return &AccountSelectionResult{ Account: acc, WaitPlan: &AccountWaitPlan{ @@ -882,7 +922,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro return nil, errors.New("no available accounts") } -func (s *GatewayService) tryAcquireByLegacyOrder(ctx context.Context, candidates []*Account, groupID *int64, sessionHash string, preferOAuth bool, sessionUUID string) (*AccountSelectionResult, bool) { +func (s *GatewayService) tryAcquireByLegacyOrder(ctx context.Context, candidates []*Account, groupID *int64, sessionHash string, preferOAuth bool) (*AccountSelectionResult, bool) { ordered := append([]*Account(nil), candidates...) sortAccountsByPriorityAndLastUsed(ordered, preferOAuth) @@ -890,7 +930,7 @@ func (s *GatewayService) tryAcquireByLegacyOrder(ctx context.Context, candidates result, err := s.tryAcquireAccountSlot(ctx, acc.ID, acc.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, acc, sessionUUID) { + if !s.checkAndRegisterSession(ctx, acc, sessionHash) { result.ReleaseFunc() // 释放槽位,继续尝试下一个账号 continue } @@ -1188,15 +1228,16 @@ checkSchedulability: // checkAndRegisterSession 检查并注册会话,用于会话数量限制 // 仅适用于 Anthropic OAuth/SetupToken 账号 +// sessionID: 会话标识符(使用粘性会话的 hash) // 返回 true 表示允许(在限制内或会话已存在),false 表示拒绝(超出限制且是新会话) -func (s *GatewayService) checkAndRegisterSession(ctx context.Context, account *Account, sessionUUID string) bool { +func (s *GatewayService) checkAndRegisterSession(ctx context.Context, account *Account, sessionID string) bool { // 只检查 Anthropic OAuth/SetupToken 账号 if !account.IsAnthropicOAuthOrSetupToken() { return true } maxSessions := account.GetMaxSessions() - if maxSessions <= 0 || sessionUUID == "" { + if maxSessions <= 0 || sessionID == "" { return true // 未启用会话限制或无会话ID } @@ -1206,7 +1247,7 @@ func (s *GatewayService) checkAndRegisterSession(ctx context.Context, account *A idleTimeout := time.Duration(account.GetSessionIdleTimeoutMinutes()) * time.Minute - allowed, err := s.sessionLimitCache.RegisterSession(ctx, account.ID, sessionUUID, maxSessions, idleTimeout) + allowed, err := s.sessionLimitCache.RegisterSession(ctx, account.ID, sessionID, maxSessions, idleTimeout) if err != nil { // 失败开放:缓存错误时允许通过 return true diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index c81de00e..176af368 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -1191,6 +1191,136 @@ + +
+
+

{{ t('admin.accounts.quotaControl.title') }}

+

+ {{ t('admin.accounts.quotaControl.hint') }} +

+
+ + +
+
+
+ +

+ {{ t('admin.accounts.quotaControl.windowCost.hint') }} +

+
+ +
+ +
+
+ +
+ $ + +
+

{{ t('admin.accounts.quotaControl.windowCost.limitHint') }}

+
+
+ +
+ $ + +
+

{{ t('admin.accounts.quotaControl.windowCost.stickyReserveHint') }}

+
+
+
+ + +
+
+
+ +

+ {{ t('admin.accounts.quotaControl.sessionLimit.hint') }} +

+
+ +
+ +
+
+ + +

{{ t('admin.accounts.quotaControl.sessionLimit.maxSessionsHint') }}

+
+
+ +
+ + {{ t('common.minutes') }} +
+

{{ t('admin.accounts.quotaControl.sessionLimit.idleTimeoutHint') }}

+
+
+
+
+
@@ -1763,6 +1893,14 @@ const geminiAIStudioOAuthEnabled = ref(false) const showAdvancedOAuth = ref(false) const showGeminiHelpDialog = ref(false) +// Quota control state (Anthropic OAuth/SetupToken only) +const windowCostEnabled = ref(false) +const windowCostLimit = ref(null) +const windowCostStickyReserve = ref(null) +const sessionLimitEnabled = ref(false) +const maxSessions = ref(null) +const sessionIdleTimeout = ref(null) + // Gemini tier selection (used as fallback when auto-detection is unavailable/fails) const geminiTierGoogleOne = ref<'google_one_free' | 'google_ai_pro' | 'google_ai_ultra'>('google_one_free') const geminiTierGcp = ref<'gcp_standard' | 'gcp_enterprise'>('gcp_standard') @@ -2140,6 +2278,13 @@ const resetForm = () => { customErrorCodeInput.value = null interceptWarmupRequests.value = false autoPauseOnExpired.value = true + // Reset quota control state + windowCostEnabled.value = false + windowCostLimit.value = null + windowCostStickyReserve.value = null + sessionLimitEnabled.value = false + maxSessions.value = null + sessionIdleTimeout.value = null tempUnschedEnabled.value = false tempUnschedRules.value = [] geminiOAuthType.value = 'code_assist' @@ -2407,7 +2552,22 @@ const handleAnthropicExchange = async (authCode: string) => { ...proxyConfig }) - const extra = oauth.buildExtraInfo(tokenInfo) + // Build extra with quota control settings + const baseExtra = oauth.buildExtraInfo(tokenInfo) || {} + const extra: Record = { ...baseExtra } + + // Add window cost limit settings + if (windowCostEnabled.value && windowCostLimit.value != null && windowCostLimit.value > 0) { + extra.window_cost_limit = windowCostLimit.value + extra.window_cost_sticky_reserve = windowCostStickyReserve.value ?? 10 + } + + // Add session limit settings + if (sessionLimitEnabled.value && maxSessions.value != null && maxSessions.value > 0) { + extra.max_sessions = maxSessions.value + extra.session_idle_timeout_minutes = sessionIdleTimeout.value ?? 5 + } + const credentials = { ...tokenInfo, ...(interceptWarmupRequests.value ? { intercept_warmup_requests: true } : {}) @@ -2475,7 +2635,22 @@ const handleCookieAuth = async (sessionKey: string) => { ...proxyConfig }) - const extra = oauth.buildExtraInfo(tokenInfo) + // Build extra with quota control settings + const baseExtra = oauth.buildExtraInfo(tokenInfo) || {} + const extra: Record = { ...baseExtra } + + // Add window cost limit settings + if (windowCostEnabled.value && windowCostLimit.value != null && windowCostLimit.value > 0) { + extra.window_cost_limit = windowCostLimit.value + extra.window_cost_sticky_reserve = windowCostStickyReserve.value ?? 10 + } + + // Add session limit settings + if (sessionLimitEnabled.value && maxSessions.value != null && maxSessions.value > 0) { + extra.max_sessions = maxSessions.value + extra.session_idle_timeout_minutes = sessionIdleTimeout.value ?? 5 + } + const accountName = keys.length > 1 ? `${form.name} #${i + 1}` : form.name // Merge interceptWarmupRequests into credentials