Merge pull request #1668 from tyqy12/main

修复 OpenAI 账号限流回流误判:7d 窗口可用时不因 5h 窗口为 0 回写 429
This commit is contained in:
Wesley Liddick
2026-04-15 16:48:48 +08:00
committed by GitHub
7 changed files with 80 additions and 190 deletions

View File

@@ -515,22 +515,10 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
_ = s.accountRepo.UpdateExtra(ctx, account.ID, updates) _ = s.accountRepo.UpdateExtra(ctx, account.ID, updates)
mergeAccountExtra(account, updates) mergeAccountExtra(account, updates)
} }
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
if resetAt := codexRateLimitResetAtFromSnapshot(snapshot, time.Now()); resetAt != nil {
_ = s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt)
account.RateLimitResetAt = resetAt
}
}
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
if isOAuth && s.accountRepo != nil {
if resetAt := (&RateLimitService{}).calculateOpenAI429ResetTime(resp.Header); resetAt != nil {
_ = s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt)
account.RateLimitResetAt = resetAt
}
}
// 401 Unauthorized: 标记账号为永久错误 // 401 Unauthorized: 标记账号为永久错误
if resp.StatusCode == http.StatusUnauthorized && s.accountRepo != nil { if resp.StatusCode == http.StatusUnauthorized && s.accountRepo != nil {
errMsg := fmt.Sprintf("Authentication failed (401): %s", string(body)) errMsg := fmt.Sprintf("Authentication failed (401): %s", string(body))

View File

@@ -111,7 +111,7 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing.
require.Contains(t, recorder.Body.String(), "test_complete") require.Contains(t, recorder.Body.String(), "test_complete")
} }
func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T) { func TestAccountTestService_OpenAI429PersistsSnapshotWithoutRateLimit(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
ctx, _ := newTestContext() ctx, _ := newTestContext()
@@ -138,10 +138,7 @@ func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T)
require.Error(t, err) require.Error(t, err)
require.NotEmpty(t, repo.updatedExtra) require.NotEmpty(t, repo.updatedExtra)
require.Equal(t, 100.0, repo.updatedExtra["codex_5h_used_percent"]) require.Equal(t, 100.0, repo.updatedExtra["codex_5h_used_percent"])
require.Equal(t, int64(88), repo.rateLimitedID) require.Zero(t, repo.rateLimitedID)
require.NotNil(t, repo.rateLimitedAt) require.Nil(t, repo.rateLimitedAt)
require.NotNil(t, account.RateLimitResetAt) require.Nil(t, account.RateLimitResetAt)
if account.RateLimitResetAt != nil && repo.rateLimitedAt != nil {
require.WithinDuration(t, *repo.rateLimitedAt, *account.RateLimitResetAt, time.Second)
}
} }

View File

@@ -499,7 +499,6 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
if account == nil { if account == nil {
return usage, nil return usage, nil
} }
syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, account, now)
if progress := buildCodexUsageProgressFromExtra(account.Extra, "5h", now); progress != nil { if progress := buildCodexUsageProgressFromExtra(account.Extra, "5h", now); progress != nil {
usage.FiveHour = progress usage.FiveHour = progress
@@ -509,11 +508,8 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
} }
if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) { if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) {
if updates, resetAt, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && (len(updates) > 0 || resetAt != nil) { if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 {
mergeAccountExtra(account, updates) mergeAccountExtra(account, updates)
if resetAt != nil {
account.RateLimitResetAt = resetAt
}
if usage.UpdatedAt == nil { if usage.UpdatedAt == nil {
usage.UpdatedAt = &now usage.UpdatedAt = &now
} }
@@ -594,26 +590,26 @@ func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, no
return true return true
} }
func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, *time.Time, error) { func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, error) {
if account == nil || !account.IsOAuth() { if account == nil || !account.IsOAuth() {
return nil, nil, nil return nil, nil
} }
accessToken := account.GetOpenAIAccessToken() accessToken := account.GetOpenAIAccessToken()
if accessToken == "" { if accessToken == "" {
return nil, nil, fmt.Errorf("no access token available") return nil, fmt.Errorf("no access token available")
} }
modelID := openaipkg.DefaultTestModel modelID := openaipkg.DefaultTestModel
payload := createOpenAITestPayload(modelID, true) payload := createOpenAITestPayload(modelID, true)
payloadBytes, err := json.Marshal(payload) payloadBytes, err := json.Marshal(payload)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("marshal openai probe payload: %w", err) return nil, fmt.Errorf("marshal openai probe payload: %w", err)
} }
reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second) reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel() defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes)) req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes))
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("create openai probe request: %w", err) return nil, fmt.Errorf("create openai probe request: %w", err)
} }
req.Host = "chatgpt.com" req.Host = "chatgpt.com"
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
@@ -642,67 +638,51 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco
ResponseHeaderTimeout: 10 * time.Second, ResponseHeaderTimeout: 10 * time.Second,
}) })
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("build openai probe client: %w", err) return nil, fmt.Errorf("build openai probe client: %w", err)
} }
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("openai codex probe request failed: %w", err) return nil, fmt.Errorf("openai codex probe request failed: %w", err)
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = resp.Body.Close() }()
updates, resetAt, err := extractOpenAICodexProbeSnapshot(resp) updates, err := extractOpenAICodexProbeUpdates(resp)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
if len(updates) > 0 || resetAt != nil { if len(updates) > 0 {
s.persistOpenAICodexProbeSnapshot(account.ID, updates, resetAt) s.persistOpenAICodexProbeSnapshot(account.ID, updates)
return updates, resetAt, nil return updates, nil
} }
return nil, nil, nil return nil, nil
} }
func (s *AccountUsageService) persistOpenAICodexProbeSnapshot(accountID int64, updates map[string]any, resetAt *time.Time) { func (s *AccountUsageService) persistOpenAICodexProbeSnapshot(accountID int64, updates map[string]any) {
if s == nil || s.accountRepo == nil || accountID <= 0 { if s == nil || s.accountRepo == nil || accountID <= 0 {
return return
} }
if len(updates) == 0 && resetAt == nil { if len(updates) == 0 {
return return
} }
go func() { go func() {
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer updateCancel() defer updateCancel()
if len(updates) > 0 { _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
}
if resetAt != nil {
_ = s.accountRepo.SetRateLimited(updateCtx, accountID, *resetAt)
}
}() }()
} }
func extractOpenAICodexProbeSnapshot(resp *http.Response) (map[string]any, *time.Time, error) { func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) {
if resp == nil { if resp == nil {
return nil, nil, nil return nil, nil
} }
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
baseTime := time.Now() return buildCodexUsageExtraUpdates(snapshot, time.Now()), nil
updates := buildCodexUsageExtraUpdates(snapshot, baseTime)
resetAt := codexRateLimitResetAtFromSnapshot(snapshot, baseTime)
if len(updates) > 0 {
return updates, resetAt, nil
}
return nil, resetAt, nil
} }
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode)
} }
return nil, nil, nil return nil, nil
}
func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) {
updates, _, err := extractOpenAICodexProbeSnapshot(resp)
return updates, err
} }
func mergeAccountExtra(account *Account, updates map[string]any) { func mergeAccountExtra(account *Account, updates map[string]any) {

View File

@@ -92,30 +92,7 @@ func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T)
} }
} }
func TestExtractOpenAICodexProbeSnapshotAccepts429WithResetAt(t *testing.T) { func TestAccountUsageService_PersistOpenAICodexProbeSnapshotOnlyUpdatesExtra(t *testing.T) {
t.Parallel()
headers := make(http.Header)
headers.Set("x-codex-primary-used-percent", "100")
headers.Set("x-codex-primary-reset-after-seconds", "604800")
headers.Set("x-codex-primary-window-minutes", "10080")
headers.Set("x-codex-secondary-used-percent", "100")
headers.Set("x-codex-secondary-reset-after-seconds", "18000")
headers.Set("x-codex-secondary-window-minutes", "300")
updates, resetAt, err := extractOpenAICodexProbeSnapshot(&http.Response{StatusCode: http.StatusTooManyRequests, Header: headers})
if err != nil {
t.Fatalf("extractOpenAICodexProbeSnapshot() error = %v", err)
}
if len(updates) == 0 {
t.Fatal("expected codex probe updates from 429 headers")
}
if resetAt == nil {
t.Fatal("expected resetAt from exhausted codex headers")
}
}
func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *testing.T) {
t.Parallel() t.Parallel()
repo := &accountUsageCodexProbeRepo{ repo := &accountUsageCodexProbeRepo{
@@ -123,12 +100,10 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes
rateLimitCh: make(chan time.Time, 1), rateLimitCh: make(chan time.Time, 1),
} }
svc := &AccountUsageService{accountRepo: repo} svc := &AccountUsageService{accountRepo: repo}
resetAt := time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second)
svc.persistOpenAICodexProbeSnapshot(321, map[string]any{ svc.persistOpenAICodexProbeSnapshot(321, map[string]any{
"codex_7d_used_percent": 100.0, "codex_7d_used_percent": 100.0,
"codex_7d_reset_at": resetAt.Format(time.RFC3339), "codex_7d_reset_at": time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second).Format(time.RFC3339),
}, &resetAt) })
select { select {
case updates := <-repo.updateExtraCh: case updates := <-repo.updateExtraCh:
@@ -136,16 +111,49 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes
t.Fatalf("codex_7d_used_percent = %v, want 100", got) t.Fatalf("codex_7d_used_percent = %v, want 100", got)
} }
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("waiting for codex probe extra persistence timed out") t.Fatal("等待 codex 探测快照写入 extra 超时")
} }
select { select {
case got := <-repo.rateLimitCh: case got := <-repo.rateLimitCh:
if got.Before(resetAt.Add(-time.Second)) || got.After(resetAt.Add(time.Second)) { t.Fatalf("不应将探测快照写入运行时限流状态: %v", got)
t.Fatalf("rate limit resetAt = %v, want around %v", got, resetAt) case <-time.After(200 * time.Millisecond):
} }
case <-time.After(2 * time.Second): }
t.Fatal("waiting for codex probe rate limit persistence timed out")
func TestAccountUsageService_GetOpenAIUsage_DoesNotPromoteCodexExtraToRateLimit(t *testing.T) {
t.Parallel()
resetAt := time.Now().Add(6 * 24 * time.Hour).UTC().Truncate(time.Second)
repo := &accountUsageCodexProbeRepo{
rateLimitCh: make(chan time.Time, 1),
}
svc := &AccountUsageService{accountRepo: repo}
account := &Account{
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Extra: map[string]any{
"codex_5h_used_percent": 1.0,
"codex_5h_reset_at": time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second).Format(time.RFC3339),
"codex_7d_used_percent": 100.0,
"codex_7d_reset_at": resetAt.Format(time.RFC3339),
},
}
usage, err := svc.getOpenAIUsage(context.Background(), account)
if err != nil {
t.Fatalf("getOpenAIUsage() error = %v", err)
}
if usage.SevenDay == nil || usage.SevenDay.Utilization != 100.0 {
t.Fatalf("预期 7 天用量仍然可见,实际为 %#v", usage.SevenDay)
}
if account.RateLimitResetAt != nil {
t.Fatalf("不应让已耗尽的 codex extra 改写运行时限流状态: %v", account.RateLimitResetAt)
}
select {
case got := <-repo.rateLimitCh:
t.Fatalf("不应将已耗尽的 codex extra 持久化为运行时限流状态: %v", got)
case <-time.After(200 * time.Millisecond):
} }
} }

View File

@@ -1470,10 +1470,6 @@ func (s *adminServiceImpl) ListAccounts(ctx context.Context, page, pageSize int,
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
now := time.Now()
for i := range accounts {
syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, &accounts[i], now)
}
return accounts, result.Total, nil return accounts, result.Total, nil
} }

View File

@@ -1681,7 +1681,6 @@ func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Co
if err != nil || latest == nil { if err != nil || latest == nil {
return nil return nil
} }
syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, latest, time.Now())
if !latest.IsSchedulable() || !latest.IsOpenAI() { if !latest.IsSchedulable() || !latest.IsOpenAI() {
return nil return nil
} }
@@ -1704,7 +1703,6 @@ func (s *OpenAIGatewayService) getSchedulableAccount(ctx context.Context, accoun
if err != nil || account == nil { if err != nil || account == nil {
return account, err return account, err
} }
syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, account, time.Now())
return account, nil return account, nil
} }
@@ -4768,69 +4766,6 @@ func buildCodexUsageExtraUpdates(snapshot *OpenAICodexUsageSnapshot, fallbackNow
return updates return updates
} }
func codexUsagePercentExhausted(value *float64) bool {
return value != nil && *value >= 100-1e-9
}
func codexRateLimitResetAtFromSnapshot(snapshot *OpenAICodexUsageSnapshot, fallbackNow time.Time) *time.Time {
if snapshot == nil {
return nil
}
normalized := snapshot.Normalize()
if normalized == nil {
return nil
}
baseTime := codexSnapshotBaseTime(snapshot, fallbackNow)
if codexUsagePercentExhausted(normalized.Used7dPercent) && normalized.Reset7dSeconds != nil {
resetAt := baseTime.Add(time.Duration(*normalized.Reset7dSeconds) * time.Second)
return &resetAt
}
if codexUsagePercentExhausted(normalized.Used5hPercent) && normalized.Reset5hSeconds != nil {
resetAt := baseTime.Add(time.Duration(*normalized.Reset5hSeconds) * time.Second)
return &resetAt
}
return nil
}
func codexRateLimitResetAtFromExtra(extra map[string]any, now time.Time) *time.Time {
if len(extra) == 0 {
return nil
}
if progress := buildCodexUsageProgressFromExtra(extra, "7d", now); progress != nil && codexUsagePercentExhausted(&progress.Utilization) && progress.ResetsAt != nil && now.Before(*progress.ResetsAt) {
resetAt := progress.ResetsAt.UTC()
return &resetAt
}
if progress := buildCodexUsageProgressFromExtra(extra, "5h", now); progress != nil && codexUsagePercentExhausted(&progress.Utilization) && progress.ResetsAt != nil && now.Before(*progress.ResetsAt) {
resetAt := progress.ResetsAt.UTC()
return &resetAt
}
return nil
}
func applyOpenAICodexRateLimitFromExtra(account *Account, now time.Time) (*time.Time, bool) {
if account == nil || !account.IsOpenAI() {
return nil, false
}
resetAt := codexRateLimitResetAtFromExtra(account.Extra, now)
if resetAt == nil {
return nil, false
}
if account.RateLimitResetAt != nil && now.Before(*account.RateLimitResetAt) && !account.RateLimitResetAt.Before(*resetAt) {
return account.RateLimitResetAt, false
}
account.RateLimitResetAt = resetAt
return resetAt, true
}
func syncOpenAICodexRateLimitFromExtra(ctx context.Context, repo AccountRepository, account *Account, now time.Time) *time.Time {
resetAt, changed := applyOpenAICodexRateLimitFromExtra(account, now)
if !changed || resetAt == nil || repo == nil || account == nil || account.ID <= 0 {
return resetAt
}
_ = repo.SetRateLimited(ctx, account.ID, *resetAt)
return resetAt
}
// updateCodexUsageSnapshot saves the Codex usage snapshot to account's Extra field // updateCodexUsageSnapshot saves the Codex usage snapshot to account's Extra field
func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, accountID int64, snapshot *OpenAICodexUsageSnapshot) { func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, accountID int64, snapshot *OpenAICodexUsageSnapshot) {
if snapshot == nil { if snapshot == nil {
@@ -4842,24 +4777,17 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc
now := time.Now() now := time.Now()
updates := buildCodexUsageExtraUpdates(snapshot, now) updates := buildCodexUsageExtraUpdates(snapshot, now)
resetAt := codexRateLimitResetAtFromSnapshot(snapshot, now) if len(updates) == 0 {
if len(updates) == 0 && resetAt == nil {
return return
} }
shouldPersistUpdates := len(updates) > 0 && s.getCodexSnapshotThrottle().Allow(accountID, now) if !s.getCodexSnapshotThrottle().Allow(accountID, now) {
if !shouldPersistUpdates && resetAt == nil {
return return
} }
go func() { go func() {
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if shouldPersistUpdates { _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
}
if resetAt != nil {
_ = s.accountRepo.SetRateLimited(updateCtx, accountID, *resetAt)
}
}() }()
} }

View File

@@ -345,7 +345,7 @@ func TestOpenAIGatewayService_ProxyResponsesWebSocketFromClient_ErrorEventUsageL
} }
} }
func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRateLimit(t *testing.T) { func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotDoesNotSetRateLimit(t *testing.T) {
repo := &openAICodexSnapshotAsyncRepo{ repo := &openAICodexSnapshotAsyncRepo{
updateExtraCh: make(chan map[string]any, 1), updateExtraCh: make(chan map[string]any, 1),
rateLimitCh: make(chan time.Time, 1), rateLimitCh: make(chan time.Time, 1),
@@ -359,7 +359,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate
SecondaryResetAfterSeconds: ptrIntWS(1200), SecondaryResetAfterSeconds: ptrIntWS(1200),
SecondaryWindowMinutes: ptrIntWS(300), SecondaryWindowMinutes: ptrIntWS(300),
} }
before := time.Now()
svc.updateCodexUsageSnapshot(context.Background(), 601, snapshot) svc.updateCodexUsageSnapshot(context.Background(), 601, snapshot)
select { select {
@@ -371,9 +370,8 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate
select { select {
case resetAt := <-repo.rateLimitCh: case resetAt := <-repo.rateLimitCh:
require.WithinDuration(t, before.Add(time.Hour), resetAt, 2*time.Second) t.Fatalf("不应因仅写入快照而生成运行时限流时间: %v", resetAt)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("等待 codex 100% 自动切换限流超时")
} }
} }
@@ -401,7 +399,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN
select { select {
case resetAt := <-repo.rateLimitCh: case resetAt := <-repo.rateLimitCh:
t.Fatalf("unexpected rate limit reset at: %v", resetAt) t.Fatalf("不应写入运行时限流时间: %v", resetAt)
case <-time.After(200 * time.Millisecond): case <-time.After(200 * time.Millisecond):
} }
} }
@@ -409,7 +407,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN
func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *testing.T) { func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *testing.T) {
repo := &openAICodexSnapshotAsyncRepo{ repo := &openAICodexSnapshotAsyncRepo{
updateExtraCh: make(chan map[string]any, 2), updateExtraCh: make(chan map[string]any, 2),
rateLimitCh: make(chan time.Time, 2),
} }
svc := &OpenAIGatewayService{ svc := &OpenAIGatewayService{
accountRepo: repo, accountRepo: repo,
@@ -443,7 +440,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *t
func ptrFloat64WS(v float64) *float64 { return &v } func ptrFloat64WS(v float64) *float64 { return &v }
func ptrIntWS(v int) *int { return &v } func ptrIntWS(v int) *int { return &v }
func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSetsRateLimit(t *testing.T) { func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraDoesNotSetRateLimit(t *testing.T) {
resetAt := time.Now().Add(6 * 24 * time.Hour) resetAt := time.Now().Add(6 * 24 * time.Hour)
account := Account{ account := Account{
ID: 701, ID: 701,
@@ -463,17 +460,15 @@ func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSetsRateL
fresh, err := svc.getSchedulableAccount(context.Background(), account.ID) fresh, err := svc.getSchedulableAccount(context.Background(), account.ID)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, fresh) require.NotNil(t, fresh)
require.NotNil(t, fresh.RateLimitResetAt) require.Nil(t, fresh.RateLimitResetAt)
require.WithinDuration(t, resetAt.UTC(), *fresh.RateLimitResetAt, time.Second)
select { select {
case persisted := <-repo.rateLimitCh: case persisted := <-repo.rateLimitCh:
require.WithinDuration(t, resetAt.UTC(), persisted, time.Second) t.Fatalf("不应将已耗尽的 codex extra 提升为运行时限流状态: %v", persisted)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("等待旧快照补写限流状态超时")
} }
} }
func TestAdminService_ListAccounts_ExhaustedCodexExtraReturnsRateLimitedAccount(t *testing.T) { func TestAdminService_ListAccounts_ExhaustedCodexExtraDoesNotSetRateLimit(t *testing.T) {
resetAt := time.Now().Add(4 * 24 * time.Hour) resetAt := time.Now().Add(4 * 24 * time.Hour)
repo := &openAICodexExtraListRepo{ repo := &openAICodexExtraListRepo{
stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{{ stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{{
@@ -496,13 +491,11 @@ func TestAdminService_ListAccounts_ExhaustedCodexExtraReturnsRateLimitedAccount(
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(1), total) require.Equal(t, int64(1), total)
require.Len(t, accounts, 1) require.Len(t, accounts, 1)
require.NotNil(t, accounts[0].RateLimitResetAt) require.Nil(t, accounts[0].RateLimitResetAt)
require.WithinDuration(t, resetAt.UTC(), *accounts[0].RateLimitResetAt, time.Second)
select { select {
case persisted := <-repo.rateLimitCh: case persisted := <-repo.rateLimitCh:
require.WithinDuration(t, resetAt.UTC(), persisted, time.Second) t.Fatalf("不应在账号列表查询时将 codex extra 持久化为运行时限流状态: %v", persisted)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("等待列表补写限流状态超时")
} }
} }