Merge pull request #1836 from wucm667/fix/account-daily-weekly-quota-cache-invalidation
fix: 修复账户配额跨越时调度快照入队逻辑
This commit is contained in:
@@ -290,7 +290,6 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer func() { _ = rows.Close() }()
|
|
||||||
|
|
||||||
var state service.AccountQuotaState
|
var state service.AccountQuotaState
|
||||||
if rows.Next() {
|
if rows.Next() {
|
||||||
@@ -299,18 +298,36 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI
|
|||||||
&state.DailyUsed, &state.DailyLimit,
|
&state.DailyUsed, &state.DailyLimit,
|
||||||
&state.WeeklyUsed, &state.WeeklyLimit,
|
&state.WeeklyUsed, &state.WeeklyLimit,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
|
_ = rows.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
|
_ = rows.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
_ = rows.Close()
|
||||||
return nil, service.ErrAccountNotFound
|
return nil, service.ErrAccountNotFound
|
||||||
}
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
|
_ = rows.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if state.TotalLimit > 0 && state.TotalUsed >= state.TotalLimit && (state.TotalUsed-amount) < state.TotalLimit {
|
// 必须在执行下一条 SQL 前显式关闭 rows:pq 驱动在同一连接上
|
||||||
|
// 不允许前一条查询的结果集未耗尽时启动新查询,否则会返回
|
||||||
|
// "unexpected Parse response" 错误。
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// 任意维度额度在本次递增中从"未超"跨越到"已超"时,必须刷新调度快照,
|
||||||
|
// 否则 Redis 中缓存的 Account 仍显示旧的 used 值,后续请求会继续选中本账号,
|
||||||
|
// 最终观察到 daily_used / weekly_used 大幅超过配置的 limit。
|
||||||
|
// 对于日/周额度,即使本次触发了周期重置(pre=0、post=amount),
|
||||||
|
// 判定式 (post-amount) < limit 同样成立,逻辑与总额度保持一致。
|
||||||
|
crossedTotal := state.TotalLimit > 0 && state.TotalUsed >= state.TotalLimit && (state.TotalUsed-amount) < state.TotalLimit
|
||||||
|
crossedDaily := state.DailyLimit > 0 && state.DailyUsed >= state.DailyLimit && (state.DailyUsed-amount) < state.DailyLimit
|
||||||
|
crossedWeekly := state.WeeklyLimit > 0 && state.WeeklyUsed >= state.WeeklyLimit && (state.WeeklyUsed-amount) < state.WeeklyLimit
|
||||||
|
if crossedTotal || crossedDaily || crossedWeekly {
|
||||||
if err := enqueueSchedulerOutbox(ctx, tx, service.SchedulerOutboxEventAccountChanged, &accountID, nil, nil); err != nil {
|
if err := enqueueSchedulerOutbox(ctx, tx, service.SchedulerOutboxEventAccountChanged, &accountID, nil, nil); err != nil {
|
||||||
logger.LegacyPrintf("repository.usage_billing", "[SchedulerOutbox] enqueue quota exceeded failed: account=%d err=%v", accountID, err)
|
logger.LegacyPrintf("repository.usage_billing", "[SchedulerOutbox] enqueue quota exceeded failed: account=%d err=%v", accountID, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -199,6 +199,94 @@ func TestUsageBillingRepositoryApply_UpdatesAccountQuota(t *testing.T) {
|
|||||||
require.InDelta(t, 3.5, quotaUsed, 0.000001)
|
require.InDelta(t, 3.5, quotaUsed, 0.000001)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUsageBillingRepositoryApply_EnqueuesSchedulerOutboxOnQuotaCrossing(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
client := testEntClient(t)
|
||||||
|
repo := NewUsageBillingRepository(client, integrationDB)
|
||||||
|
|
||||||
|
newFixture := func(t *testing.T, extra map[string]any) (int64, int64) {
|
||||||
|
t.Helper()
|
||||||
|
user := mustCreateUser(t, client, &service.User{
|
||||||
|
Email: fmt.Sprintf("usage-billing-outbox-user-%d-%s@example.com", time.Now().UnixNano(), uuid.NewString()),
|
||||||
|
PasswordHash: "hash",
|
||||||
|
})
|
||||||
|
apiKey := mustCreateApiKey(t, client, &service.APIKey{
|
||||||
|
UserID: user.ID,
|
||||||
|
Key: "sk-usage-billing-outbox-" + uuid.NewString(),
|
||||||
|
Name: "billing-outbox",
|
||||||
|
})
|
||||||
|
account := mustCreateAccount(t, client, &service.Account{
|
||||||
|
Name: "usage-billing-outbox-" + uuid.NewString(),
|
||||||
|
Type: service.AccountTypeAPIKey,
|
||||||
|
Extra: extra,
|
||||||
|
})
|
||||||
|
return apiKey.ID, account.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
outboxCountFor := func(t *testing.T, accountID int64) int {
|
||||||
|
t.Helper()
|
||||||
|
var count int
|
||||||
|
require.NoError(t, integrationDB.QueryRowContext(ctx,
|
||||||
|
"SELECT COUNT(*) FROM scheduler_outbox WHERE event_type = $1 AND account_id = $2",
|
||||||
|
service.SchedulerOutboxEventAccountChanged, accountID,
|
||||||
|
).Scan(&count))
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("daily_first_crossing_enqueues", func(t *testing.T) {
|
||||||
|
apiKeyID, accountID := newFixture(t, map[string]any{
|
||||||
|
"quota_daily_limit": 10.0,
|
||||||
|
})
|
||||||
|
// 第一次低于日限额:不应入队 outbox
|
||||||
|
_, err := repo.Apply(ctx, &service.UsageBillingCommand{
|
||||||
|
RequestID: uuid.NewString(),
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
AccountType: service.AccountTypeAPIKey,
|
||||||
|
AccountQuotaCost: 4,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, outboxCountFor(t, accountID), "below limit should not enqueue")
|
||||||
|
|
||||||
|
// 第二次跨越日限额:应入队一次 outbox
|
||||||
|
_, err = repo.Apply(ctx, &service.UsageBillingCommand{
|
||||||
|
RequestID: uuid.NewString(),
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
AccountType: service.AccountTypeAPIKey,
|
||||||
|
AccountQuotaCost: 8,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, outboxCountFor(t, accountID), "crossing daily limit should enqueue once")
|
||||||
|
|
||||||
|
// 再次递增(已超):不应重复入队
|
||||||
|
_, err = repo.Apply(ctx, &service.UsageBillingCommand{
|
||||||
|
RequestID: uuid.NewString(),
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
AccountType: service.AccountTypeAPIKey,
|
||||||
|
AccountQuotaCost: 2,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, outboxCountFor(t, accountID), "subsequent increments beyond limit should not re-enqueue")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("weekly_first_crossing_enqueues", func(t *testing.T) {
|
||||||
|
apiKeyID, accountID := newFixture(t, map[string]any{
|
||||||
|
"quota_weekly_limit": 10.0,
|
||||||
|
})
|
||||||
|
_, err := repo.Apply(ctx, &service.UsageBillingCommand{
|
||||||
|
RequestID: uuid.NewString(),
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
AccountType: service.AccountTypeAPIKey,
|
||||||
|
AccountQuotaCost: 15, // 单次即跨越
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, outboxCountFor(t, accountID), "single-shot crossing weekly limit should enqueue once")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestDashboardAggregationRepositoryCleanupUsageBillingDedup_BatchDeletesOldRows(t *testing.T) {
|
func TestDashboardAggregationRepositoryCleanupUsageBillingDedup_BatchDeletesOldRows(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
repo := newDashboardAggregationRepositoryWithSQL(integrationDB)
|
repo := newDashboardAggregationRepositoryWithSQL(integrationDB)
|
||||||
|
|||||||
Reference in New Issue
Block a user