diff --git a/backend/internal/repository/usage_billing_repo.go b/backend/internal/repository/usage_billing_repo.go index 2b6edad3..62f48b58 100644 --- a/backend/internal/repository/usage_billing_repo.go +++ b/backend/internal/repository/usage_billing_repo.go @@ -290,7 +290,6 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI if err != nil { return nil, err } - defer func() { _ = rows.Close() }() var state service.AccountQuotaState if rows.Next() { @@ -299,18 +298,36 @@ func incrementUsageBillingAccountQuota(ctx context.Context, tx *sql.Tx, accountI &state.DailyUsed, &state.DailyLimit, &state.WeeklyUsed, &state.WeeklyLimit, ); err != nil { + _ = rows.Close() return nil, err } } else { if err := rows.Err(); err != nil { + _ = rows.Close() return nil, err } + _ = rows.Close() return nil, service.ErrAccountNotFound } if err := rows.Err(); err != nil { + _ = rows.Close() return nil, err } - if state.TotalLimit > 0 && state.TotalUsed >= state.TotalLimit && (state.TotalUsed-amount) < state.TotalLimit { + // 必须在执行下一条 SQL 前显式关闭 rows:pq 驱动在同一连接上 + // 不允许前一条查询的结果集未耗尽时启动新查询,否则会返回 + // "unexpected Parse response" 错误。 + if err := rows.Close(); err != nil { + return nil, err + } + // 任意维度额度在本次递增中从"未超"跨越到"已超"时,必须刷新调度快照, + // 否则 Redis 中缓存的 Account 仍显示旧的 used 值,后续请求会继续选中本账号, + // 最终观察到 daily_used / weekly_used 大幅超过配置的 limit。 + // 对于日/周额度,即使本次触发了周期重置(pre=0、post=amount), + // 判定式 (post-amount) < limit 同样成立,逻辑与总额度保持一致。 + crossedTotal := state.TotalLimit > 0 && state.TotalUsed >= state.TotalLimit && (state.TotalUsed-amount) < state.TotalLimit + crossedDaily := state.DailyLimit > 0 && state.DailyUsed >= state.DailyLimit && (state.DailyUsed-amount) < state.DailyLimit + crossedWeekly := state.WeeklyLimit > 0 && state.WeeklyUsed >= state.WeeklyLimit && (state.WeeklyUsed-amount) < state.WeeklyLimit + if crossedTotal || crossedDaily || crossedWeekly { if err := enqueueSchedulerOutbox(ctx, tx, service.SchedulerOutboxEventAccountChanged, &accountID, nil, nil); err != nil { logger.LegacyPrintf("repository.usage_billing", "[SchedulerOutbox] enqueue quota exceeded failed: account=%d err=%v", accountID, err) return nil, err diff --git a/backend/internal/repository/usage_billing_repo_integration_test.go b/backend/internal/repository/usage_billing_repo_integration_test.go index eda34cc9..e8d4d327 100644 --- a/backend/internal/repository/usage_billing_repo_integration_test.go +++ b/backend/internal/repository/usage_billing_repo_integration_test.go @@ -199,6 +199,94 @@ func TestUsageBillingRepositoryApply_UpdatesAccountQuota(t *testing.T) { require.InDelta(t, 3.5, quotaUsed, 0.000001) } +func TestUsageBillingRepositoryApply_EnqueuesSchedulerOutboxOnQuotaCrossing(t *testing.T) { + ctx := context.Background() + client := testEntClient(t) + repo := NewUsageBillingRepository(client, integrationDB) + + newFixture := func(t *testing.T, extra map[string]any) (int64, int64) { + t.Helper() + user := mustCreateUser(t, client, &service.User{ + Email: fmt.Sprintf("usage-billing-outbox-user-%d-%s@example.com", time.Now().UnixNano(), uuid.NewString()), + PasswordHash: "hash", + }) + apiKey := mustCreateApiKey(t, client, &service.APIKey{ + UserID: user.ID, + Key: "sk-usage-billing-outbox-" + uuid.NewString(), + Name: "billing-outbox", + }) + account := mustCreateAccount(t, client, &service.Account{ + Name: "usage-billing-outbox-" + uuid.NewString(), + Type: service.AccountTypeAPIKey, + Extra: extra, + }) + return apiKey.ID, account.ID + } + + outboxCountFor := func(t *testing.T, accountID int64) int { + t.Helper() + var count int + require.NoError(t, integrationDB.QueryRowContext(ctx, + "SELECT COUNT(*) FROM scheduler_outbox WHERE event_type = $1 AND account_id = $2", + service.SchedulerOutboxEventAccountChanged, accountID, + ).Scan(&count)) + return count + } + + t.Run("daily_first_crossing_enqueues", func(t *testing.T) { + apiKeyID, accountID := newFixture(t, map[string]any{ + "quota_daily_limit": 10.0, + }) + // 第一次低于日限额:不应入队 outbox + _, err := repo.Apply(ctx, &service.UsageBillingCommand{ + RequestID: uuid.NewString(), + APIKeyID: apiKeyID, + AccountID: accountID, + AccountType: service.AccountTypeAPIKey, + AccountQuotaCost: 4, + }) + require.NoError(t, err) + require.Equal(t, 0, outboxCountFor(t, accountID), "below limit should not enqueue") + + // 第二次跨越日限额:应入队一次 outbox + _, err = repo.Apply(ctx, &service.UsageBillingCommand{ + RequestID: uuid.NewString(), + APIKeyID: apiKeyID, + AccountID: accountID, + AccountType: service.AccountTypeAPIKey, + AccountQuotaCost: 8, + }) + require.NoError(t, err) + require.Equal(t, 1, outboxCountFor(t, accountID), "crossing daily limit should enqueue once") + + // 再次递增(已超):不应重复入队 + _, err = repo.Apply(ctx, &service.UsageBillingCommand{ + RequestID: uuid.NewString(), + APIKeyID: apiKeyID, + AccountID: accountID, + AccountType: service.AccountTypeAPIKey, + AccountQuotaCost: 2, + }) + require.NoError(t, err) + require.Equal(t, 1, outboxCountFor(t, accountID), "subsequent increments beyond limit should not re-enqueue") + }) + + t.Run("weekly_first_crossing_enqueues", func(t *testing.T) { + apiKeyID, accountID := newFixture(t, map[string]any{ + "quota_weekly_limit": 10.0, + }) + _, err := repo.Apply(ctx, &service.UsageBillingCommand{ + RequestID: uuid.NewString(), + APIKeyID: apiKeyID, + AccountID: accountID, + AccountType: service.AccountTypeAPIKey, + AccountQuotaCost: 15, // 单次即跨越 + }) + require.NoError(t, err) + require.Equal(t, 1, outboxCountFor(t, accountID), "single-shot crossing weekly limit should enqueue once") + }) +} + func TestDashboardAggregationRepositoryCleanupUsageBillingDedup_BatchDeletesOldRows(t *testing.T) { ctx := context.Background() repo := newDashboardAggregationRepositoryWithSQL(integrationDB)