feat: add independent load_factor field for scheduling load calculation

This commit is contained in:
erio
2026-03-06 05:07:10 +08:00
parent ae5d9c8bfc
commit 0d6c1c7790
31 changed files with 596 additions and 49 deletions

View File

@@ -28,6 +28,7 @@ type Account struct {
// RateMultiplier 账号计费倍率(>=0允许 0 表示该账号计费为 0
// 使用指针用于兼容旧版本调度缓存Redis中缺字段的情况nil 表示按 1.0 处理。
RateMultiplier *float64
LoadFactor *int // 调度负载因子nil 表示使用 Concurrency
Status string
ErrorMessage string
LastUsedAt *time.Time
@@ -88,6 +89,19 @@ func (a *Account) BillingRateMultiplier() float64 {
return *a.RateMultiplier
}
func (a *Account) EffectiveLoadFactor() int {
if a == nil {
return 1
}
if a.LoadFactor != nil && *a.LoadFactor > 0 {
return *a.LoadFactor
}
if a.Concurrency > 0 {
return a.Concurrency
}
return 1
}
func (a *Account) IsSchedulable() bool {
if !a.IsActive() || !a.Schedulable {
return false

View File

@@ -0,0 +1,42 @@
package service
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEffectiveLoadFactor_NilAccount(t *testing.T) {
var a *Account
require.Equal(t, 1, a.EffectiveLoadFactor())
}
func TestEffectiveLoadFactor_NilLoadFactor_PositiveConcurrency(t *testing.T) {
a := &Account{Concurrency: 5}
require.Equal(t, 5, a.EffectiveLoadFactor())
}
func TestEffectiveLoadFactor_NilLoadFactor_ZeroConcurrency(t *testing.T) {
a := &Account{Concurrency: 0}
require.Equal(t, 1, a.EffectiveLoadFactor())
}
func TestEffectiveLoadFactor_PositiveLoadFactor(t *testing.T) {
a := &Account{Concurrency: 5, LoadFactor: intPtr(20)}
require.Equal(t, 20, a.EffectiveLoadFactor())
}
func TestEffectiveLoadFactor_ZeroLoadFactor_FallbackToConcurrency(t *testing.T) {
a := &Account{Concurrency: 5, LoadFactor: intPtr(0)}
require.Equal(t, 5, a.EffectiveLoadFactor())
}
func TestEffectiveLoadFactor_NegativeLoadFactor_FallbackToConcurrency(t *testing.T) {
a := &Account{Concurrency: 3, LoadFactor: intPtr(-1)}
require.Equal(t, 3, a.EffectiveLoadFactor())
}
func TestEffectiveLoadFactor_ZeroLoadFactor_ZeroConcurrency(t *testing.T) {
a := &Account{Concurrency: 0, LoadFactor: intPtr(0)}
require.Equal(t, 1, a.EffectiveLoadFactor())
}

View File

@@ -78,6 +78,7 @@ type AccountBulkUpdate struct {
Concurrency *int
Priority *int
RateMultiplier *float64
LoadFactor *int
Status *string
Schedulable *bool
Credentials map[string]any

View File

@@ -195,6 +195,7 @@ type CreateAccountInput struct {
Concurrency int
Priority int
RateMultiplier *float64 // 账号计费倍率(>=0允许 0
LoadFactor *int
GroupIDs []int64
ExpiresAt *int64
AutoPauseOnExpired *bool
@@ -215,6 +216,7 @@ type UpdateAccountInput struct {
Concurrency *int // 使用指针区分"未提供"和"设置为0"
Priority *int // 使用指针区分"未提供"和"设置为0"
RateMultiplier *float64 // 账号计费倍率(>=0允许 0
LoadFactor *int
Status string
GroupIDs *[]int64
ExpiresAt *int64
@@ -230,6 +232,7 @@ type BulkUpdateAccountsInput struct {
Concurrency *int
Priority *int
RateMultiplier *float64 // 账号计费倍率(>=0允许 0
LoadFactor *int
Status string
Schedulable *bool
GroupIDs *[]int64
@@ -1413,6 +1416,9 @@ func (s *adminServiceImpl) CreateAccount(ctx context.Context, input *CreateAccou
}
account.RateMultiplier = input.RateMultiplier
}
if input.LoadFactor != nil && *input.LoadFactor > 0 {
account.LoadFactor = input.LoadFactor
}
if err := s.accountRepo.Create(ctx, account); err != nil {
return nil, err
}
@@ -1483,6 +1489,13 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U
}
account.RateMultiplier = input.RateMultiplier
}
if input.LoadFactor != nil {
if *input.LoadFactor <= 0 {
account.LoadFactor = nil // 0 或负数表示清除
} else {
account.LoadFactor = input.LoadFactor
}
}
if input.Status != "" {
account.Status = input.Status
}
@@ -1616,6 +1629,9 @@ func (s *adminServiceImpl) BulkUpdateAccounts(ctx context.Context, input *BulkUp
if input.RateMultiplier != nil {
repoUpdates.RateMultiplier = input.RateMultiplier
}
if input.LoadFactor != nil {
repoUpdates.LoadFactor = input.LoadFactor
}
if input.Status != "" {
repoUpdates.Status = &input.Status
}

View File

@@ -1311,7 +1311,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
for _, acc := range routingCandidates {
routingLoads = append(routingLoads, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: acc.Concurrency,
MaxConcurrency: acc.EffectiveLoadFactor(),
})
}
routingLoadMap, _ := s.concurrencyService.GetAccountsLoadBatch(ctx, routingLoads)
@@ -1499,7 +1499,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
for _, acc := range candidates {
accountLoads = append(accountLoads, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: acc.Concurrency,
MaxConcurrency: acc.EffectiveLoadFactor(),
})
}

View File

@@ -590,7 +590,7 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance(
filtered = append(filtered, account)
loadReq = append(loadReq, AccountWithConcurrency{
ID: account.ID,
MaxConcurrency: account.Concurrency,
MaxConcurrency: account.EffectiveLoadFactor(),
})
}
if len(filtered) == 0 {

View File

@@ -1242,7 +1242,7 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex
for _, acc := range candidates {
accountLoads = append(accountLoads, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: acc.Concurrency,
MaxConcurrency: acc.EffectiveLoadFactor(),
})
}

View File

@@ -64,8 +64,9 @@ func (s *OpsService) getAccountsLoadMapBestEffort(ctx context.Context, accounts
if acc.ID <= 0 {
continue
}
if prev, ok := unique[acc.ID]; !ok || acc.Concurrency > prev {
unique[acc.ID] = acc.Concurrency
lf := acc.EffectiveLoadFactor()
if prev, ok := unique[acc.ID]; !ok || lf > prev {
unique[acc.ID] = lf
}
}

View File

@@ -389,13 +389,9 @@ func (c *OpsMetricsCollector) collectConcurrencyQueueDepth(parentCtx context.Con
if acc.ID <= 0 {
continue
}
maxConc := acc.Concurrency
if maxConc < 0 {
maxConc = 0
}
batch = append(batch, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: maxConc,
MaxConcurrency: acc.EffectiveLoadFactor(),
})
}
if len(batch) == 0 {