diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index 55865945..a5559b7d 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -515,22 +515,10 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account _ = s.accountRepo.UpdateExtra(ctx, account.ID, updates) mergeAccountExtra(account, updates) } - if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { - if resetAt := codexRateLimitResetAtFromSnapshot(snapshot, time.Now()); resetAt != nil { - _ = s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt) - account.RateLimitResetAt = resetAt - } - } } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - if isOAuth && s.accountRepo != nil { - if resetAt := (&RateLimitService{}).calculateOpenAI429ResetTime(resp.Header); resetAt != nil { - _ = s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt) - account.RateLimitResetAt = resetAt - } - } // 401 Unauthorized: 标记账号为永久错误 if resp.StatusCode == http.StatusUnauthorized && s.accountRepo != nil { errMsg := fmt.Sprintf("Authentication failed (401): %s", string(body)) diff --git a/backend/internal/service/account_test_service_openai_test.go b/backend/internal/service/account_test_service_openai_test.go index 5125db5b..82606979 100644 --- a/backend/internal/service/account_test_service_openai_test.go +++ b/backend/internal/service/account_test_service_openai_test.go @@ -111,7 +111,7 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing. require.Contains(t, recorder.Body.String(), "test_complete") } -func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T) { +func TestAccountTestService_OpenAI429PersistsSnapshotWithoutRateLimit(t *testing.T) { gin.SetMode(gin.TestMode) ctx, _ := newTestContext() @@ -138,10 +138,7 @@ func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T) require.Error(t, err) require.NotEmpty(t, repo.updatedExtra) require.Equal(t, 100.0, repo.updatedExtra["codex_5h_used_percent"]) - require.Equal(t, int64(88), repo.rateLimitedID) - require.NotNil(t, repo.rateLimitedAt) - require.NotNil(t, account.RateLimitResetAt) - if account.RateLimitResetAt != nil && repo.rateLimitedAt != nil { - require.WithinDuration(t, *repo.rateLimitedAt, *account.RateLimitResetAt, time.Second) - } + require.Zero(t, repo.rateLimitedID) + require.Nil(t, repo.rateLimitedAt) + require.Nil(t, account.RateLimitResetAt) } diff --git a/backend/internal/service/account_usage_service.go b/backend/internal/service/account_usage_service.go index 0e5741d8..8d5bcec8 100644 --- a/backend/internal/service/account_usage_service.go +++ b/backend/internal/service/account_usage_service.go @@ -499,7 +499,6 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou if account == nil { return usage, nil } - syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, account, now) if progress := buildCodexUsageProgressFromExtra(account.Extra, "5h", now); progress != nil { usage.FiveHour = progress @@ -509,11 +508,8 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou } if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) { - if updates, resetAt, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && (len(updates) > 0 || resetAt != nil) { + if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 { mergeAccountExtra(account, updates) - if resetAt != nil { - account.RateLimitResetAt = resetAt - } if usage.UpdatedAt == nil { usage.UpdatedAt = &now } @@ -594,26 +590,26 @@ func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, no return true } -func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, *time.Time, error) { +func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, error) { if account == nil || !account.IsOAuth() { - return nil, nil, nil + return nil, nil } accessToken := account.GetOpenAIAccessToken() if accessToken == "" { - return nil, nil, fmt.Errorf("no access token available") + return nil, fmt.Errorf("no access token available") } modelID := openaipkg.DefaultTestModel payload := createOpenAITestPayload(modelID, true) payloadBytes, err := json.Marshal(payload) if err != nil { - return nil, nil, fmt.Errorf("marshal openai probe payload: %w", err) + return nil, fmt.Errorf("marshal openai probe payload: %w", err) } reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes)) if err != nil { - return nil, nil, fmt.Errorf("create openai probe request: %w", err) + return nil, fmt.Errorf("create openai probe request: %w", err) } req.Host = "chatgpt.com" req.Header.Set("Content-Type", "application/json") @@ -642,67 +638,51 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco ResponseHeaderTimeout: 10 * time.Second, }) if err != nil { - return nil, nil, fmt.Errorf("build openai probe client: %w", err) + return nil, fmt.Errorf("build openai probe client: %w", err) } resp, err := client.Do(req) if err != nil { - return nil, nil, fmt.Errorf("openai codex probe request failed: %w", err) + return nil, fmt.Errorf("openai codex probe request failed: %w", err) } defer func() { _ = resp.Body.Close() }() - updates, resetAt, err := extractOpenAICodexProbeSnapshot(resp) + updates, err := extractOpenAICodexProbeUpdates(resp) if err != nil { - return nil, nil, err + return nil, err } - if len(updates) > 0 || resetAt != nil { - s.persistOpenAICodexProbeSnapshot(account.ID, updates, resetAt) - return updates, resetAt, nil + if len(updates) > 0 { + s.persistOpenAICodexProbeSnapshot(account.ID, updates) + return updates, nil } - return nil, nil, nil + return nil, nil } -func (s *AccountUsageService) persistOpenAICodexProbeSnapshot(accountID int64, updates map[string]any, resetAt *time.Time) { +func (s *AccountUsageService) persistOpenAICodexProbeSnapshot(accountID int64, updates map[string]any) { if s == nil || s.accountRepo == nil || accountID <= 0 { return } - if len(updates) == 0 && resetAt == nil { + if len(updates) == 0 { return } go func() { updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) defer updateCancel() - if len(updates) > 0 { - _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) - } - if resetAt != nil { - _ = s.accountRepo.SetRateLimited(updateCtx, accountID, *resetAt) - } + _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) }() } -func extractOpenAICodexProbeSnapshot(resp *http.Response) (map[string]any, *time.Time, error) { +func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) { if resp == nil { - return nil, nil, nil + return nil, nil } if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { - baseTime := time.Now() - updates := buildCodexUsageExtraUpdates(snapshot, baseTime) - resetAt := codexRateLimitResetAtFromSnapshot(snapshot, baseTime) - if len(updates) > 0 { - return updates, resetAt, nil - } - return nil, resetAt, nil + return buildCodexUsageExtraUpdates(snapshot, time.Now()), nil } if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) + return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) } - return nil, nil, nil -} - -func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) { - updates, _, err := extractOpenAICodexProbeSnapshot(resp) - return updates, err + return nil, nil } func mergeAccountExtra(account *Account, updates map[string]any) { diff --git a/backend/internal/service/account_usage_service_test.go b/backend/internal/service/account_usage_service_test.go index fe255225..28b49838 100644 --- a/backend/internal/service/account_usage_service_test.go +++ b/backend/internal/service/account_usage_service_test.go @@ -92,30 +92,7 @@ func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T) } } -func TestExtractOpenAICodexProbeSnapshotAccepts429WithResetAt(t *testing.T) { - t.Parallel() - - headers := make(http.Header) - headers.Set("x-codex-primary-used-percent", "100") - headers.Set("x-codex-primary-reset-after-seconds", "604800") - headers.Set("x-codex-primary-window-minutes", "10080") - headers.Set("x-codex-secondary-used-percent", "100") - headers.Set("x-codex-secondary-reset-after-seconds", "18000") - headers.Set("x-codex-secondary-window-minutes", "300") - - updates, resetAt, err := extractOpenAICodexProbeSnapshot(&http.Response{StatusCode: http.StatusTooManyRequests, Header: headers}) - if err != nil { - t.Fatalf("extractOpenAICodexProbeSnapshot() error = %v", err) - } - if len(updates) == 0 { - t.Fatal("expected codex probe updates from 429 headers") - } - if resetAt == nil { - t.Fatal("expected resetAt from exhausted codex headers") - } -} - -func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *testing.T) { +func TestAccountUsageService_PersistOpenAICodexProbeSnapshotOnlyUpdatesExtra(t *testing.T) { t.Parallel() repo := &accountUsageCodexProbeRepo{ @@ -123,12 +100,10 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes rateLimitCh: make(chan time.Time, 1), } svc := &AccountUsageService{accountRepo: repo} - resetAt := time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second) - svc.persistOpenAICodexProbeSnapshot(321, map[string]any{ "codex_7d_used_percent": 100.0, - "codex_7d_reset_at": resetAt.Format(time.RFC3339), - }, &resetAt) + "codex_7d_reset_at": time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second).Format(time.RFC3339), + }) select { case updates := <-repo.updateExtraCh: @@ -136,16 +111,49 @@ func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *tes t.Fatalf("codex_7d_used_percent = %v, want 100", got) } case <-time.After(2 * time.Second): - t.Fatal("waiting for codex probe extra persistence timed out") + t.Fatal("等待 codex 探测快照写入 extra 超时") } select { case got := <-repo.rateLimitCh: - if got.Before(resetAt.Add(-time.Second)) || got.After(resetAt.Add(time.Second)) { - t.Fatalf("rate limit resetAt = %v, want around %v", got, resetAt) - } - case <-time.After(2 * time.Second): - t.Fatal("waiting for codex probe rate limit persistence timed out") + t.Fatalf("不应将探测快照写入运行时限流状态: %v", got) + case <-time.After(200 * time.Millisecond): + } +} + +func TestAccountUsageService_GetOpenAIUsage_DoesNotPromoteCodexExtraToRateLimit(t *testing.T) { + t.Parallel() + + resetAt := time.Now().Add(6 * 24 * time.Hour).UTC().Truncate(time.Second) + repo := &accountUsageCodexProbeRepo{ + rateLimitCh: make(chan time.Time, 1), + } + svc := &AccountUsageService{accountRepo: repo} + account := &Account{ + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Extra: map[string]any{ + "codex_5h_used_percent": 1.0, + "codex_5h_reset_at": time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second).Format(time.RFC3339), + "codex_7d_used_percent": 100.0, + "codex_7d_reset_at": resetAt.Format(time.RFC3339), + }, + } + + usage, err := svc.getOpenAIUsage(context.Background(), account) + if err != nil { + t.Fatalf("getOpenAIUsage() error = %v", err) + } + if usage.SevenDay == nil || usage.SevenDay.Utilization != 100.0 { + t.Fatalf("预期 7 天用量仍然可见,实际为 %#v", usage.SevenDay) + } + if account.RateLimitResetAt != nil { + t.Fatalf("不应让已耗尽的 codex extra 改写运行时限流状态: %v", account.RateLimitResetAt) + } + select { + case got := <-repo.rateLimitCh: + t.Fatalf("不应将已耗尽的 codex extra 持久化为运行时限流状态: %v", got) + case <-time.After(200 * time.Millisecond): } } diff --git a/backend/internal/service/admin_service.go b/backend/internal/service/admin_service.go index 97b42c24..7c26a47c 100644 --- a/backend/internal/service/admin_service.go +++ b/backend/internal/service/admin_service.go @@ -1470,10 +1470,6 @@ func (s *adminServiceImpl) ListAccounts(ctx context.Context, page, pageSize int, if err != nil { return nil, 0, err } - now := time.Now() - for i := range accounts { - syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, &accounts[i], now) - } return accounts, result.Total, nil } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 6087b7b6..ef97daad 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1681,7 +1681,6 @@ func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Co if err != nil || latest == nil { return nil } - syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, latest, time.Now()) if !latest.IsSchedulable() || !latest.IsOpenAI() { return nil } @@ -1704,7 +1703,6 @@ func (s *OpenAIGatewayService) getSchedulableAccount(ctx context.Context, accoun if err != nil || account == nil { return account, err } - syncOpenAICodexRateLimitFromExtra(ctx, s.accountRepo, account, time.Now()) return account, nil } @@ -4768,69 +4766,6 @@ func buildCodexUsageExtraUpdates(snapshot *OpenAICodexUsageSnapshot, fallbackNow return updates } -func codexUsagePercentExhausted(value *float64) bool { - return value != nil && *value >= 100-1e-9 -} - -func codexRateLimitResetAtFromSnapshot(snapshot *OpenAICodexUsageSnapshot, fallbackNow time.Time) *time.Time { - if snapshot == nil { - return nil - } - normalized := snapshot.Normalize() - if normalized == nil { - return nil - } - baseTime := codexSnapshotBaseTime(snapshot, fallbackNow) - if codexUsagePercentExhausted(normalized.Used7dPercent) && normalized.Reset7dSeconds != nil { - resetAt := baseTime.Add(time.Duration(*normalized.Reset7dSeconds) * time.Second) - return &resetAt - } - if codexUsagePercentExhausted(normalized.Used5hPercent) && normalized.Reset5hSeconds != nil { - resetAt := baseTime.Add(time.Duration(*normalized.Reset5hSeconds) * time.Second) - return &resetAt - } - return nil -} - -func codexRateLimitResetAtFromExtra(extra map[string]any, now time.Time) *time.Time { - if len(extra) == 0 { - return nil - } - if progress := buildCodexUsageProgressFromExtra(extra, "7d", now); progress != nil && codexUsagePercentExhausted(&progress.Utilization) && progress.ResetsAt != nil && now.Before(*progress.ResetsAt) { - resetAt := progress.ResetsAt.UTC() - return &resetAt - } - if progress := buildCodexUsageProgressFromExtra(extra, "5h", now); progress != nil && codexUsagePercentExhausted(&progress.Utilization) && progress.ResetsAt != nil && now.Before(*progress.ResetsAt) { - resetAt := progress.ResetsAt.UTC() - return &resetAt - } - return nil -} - -func applyOpenAICodexRateLimitFromExtra(account *Account, now time.Time) (*time.Time, bool) { - if account == nil || !account.IsOpenAI() { - return nil, false - } - resetAt := codexRateLimitResetAtFromExtra(account.Extra, now) - if resetAt == nil { - return nil, false - } - if account.RateLimitResetAt != nil && now.Before(*account.RateLimitResetAt) && !account.RateLimitResetAt.Before(*resetAt) { - return account.RateLimitResetAt, false - } - account.RateLimitResetAt = resetAt - return resetAt, true -} - -func syncOpenAICodexRateLimitFromExtra(ctx context.Context, repo AccountRepository, account *Account, now time.Time) *time.Time { - resetAt, changed := applyOpenAICodexRateLimitFromExtra(account, now) - if !changed || resetAt == nil || repo == nil || account == nil || account.ID <= 0 { - return resetAt - } - _ = repo.SetRateLimited(ctx, account.ID, *resetAt) - return resetAt -} - // updateCodexUsageSnapshot saves the Codex usage snapshot to account's Extra field func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, accountID int64, snapshot *OpenAICodexUsageSnapshot) { if snapshot == nil { @@ -4842,24 +4777,17 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc now := time.Now() updates := buildCodexUsageExtraUpdates(snapshot, now) - resetAt := codexRateLimitResetAtFromSnapshot(snapshot, now) - if len(updates) == 0 && resetAt == nil { + if len(updates) == 0 { return } - shouldPersistUpdates := len(updates) > 0 && s.getCodexSnapshotThrottle().Allow(accountID, now) - if !shouldPersistUpdates && resetAt == nil { + if !s.getCodexSnapshotThrottle().Allow(accountID, now) { return } go func() { updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if shouldPersistUpdates { - _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) - } - if resetAt != nil { - _ = s.accountRepo.SetRateLimited(updateCtx, accountID, *resetAt) - } + _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) }() } diff --git a/backend/internal/service/openai_ws_ratelimit_signal_test.go b/backend/internal/service/openai_ws_ratelimit_signal_test.go index 6313d0c0..4ee85a3a 100644 --- a/backend/internal/service/openai_ws_ratelimit_signal_test.go +++ b/backend/internal/service/openai_ws_ratelimit_signal_test.go @@ -345,7 +345,7 @@ func TestOpenAIGatewayService_ProxyResponsesWebSocketFromClient_ErrorEventUsageL } } -func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRateLimit(t *testing.T) { +func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotDoesNotSetRateLimit(t *testing.T) { repo := &openAICodexSnapshotAsyncRepo{ updateExtraCh: make(chan map[string]any, 1), rateLimitCh: make(chan time.Time, 1), @@ -359,7 +359,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate SecondaryResetAfterSeconds: ptrIntWS(1200), SecondaryWindowMinutes: ptrIntWS(300), } - before := time.Now() svc.updateCodexUsageSnapshot(context.Background(), 601, snapshot) select { @@ -371,9 +370,8 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ExhaustedSnapshotSetsRate select { case resetAt := <-repo.rateLimitCh: - require.WithinDuration(t, before.Add(time.Hour), resetAt, 2*time.Second) + t.Fatalf("不应因仅写入快照而生成运行时限流时间: %v", resetAt) case <-time.After(2 * time.Second): - t.Fatal("等待 codex 100% 自动切换限流超时") } } @@ -401,7 +399,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN select { case resetAt := <-repo.rateLimitCh: - t.Fatalf("unexpected rate limit reset at: %v", resetAt) + t.Fatalf("不应写入运行时限流时间: %v", resetAt) case <-time.After(200 * time.Millisecond): } } @@ -409,7 +407,6 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_NonExhaustedSnapshotDoesN func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *testing.T) { repo := &openAICodexSnapshotAsyncRepo{ updateExtraCh: make(chan map[string]any, 2), - rateLimitCh: make(chan time.Time, 2), } svc := &OpenAIGatewayService{ accountRepo: repo, @@ -443,7 +440,7 @@ func TestOpenAIGatewayService_UpdateCodexUsageSnapshot_ThrottlesExtraWrites(t *t func ptrFloat64WS(v float64) *float64 { return &v } func ptrIntWS(v int) *int { return &v } -func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSetsRateLimit(t *testing.T) { +func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraDoesNotSetRateLimit(t *testing.T) { resetAt := time.Now().Add(6 * 24 * time.Hour) account := Account{ ID: 701, @@ -463,17 +460,15 @@ func TestOpenAIGatewayService_GetSchedulableAccount_ExhaustedCodexExtraSetsRateL fresh, err := svc.getSchedulableAccount(context.Background(), account.ID) require.NoError(t, err) require.NotNil(t, fresh) - require.NotNil(t, fresh.RateLimitResetAt) - require.WithinDuration(t, resetAt.UTC(), *fresh.RateLimitResetAt, time.Second) + require.Nil(t, fresh.RateLimitResetAt) select { case persisted := <-repo.rateLimitCh: - require.WithinDuration(t, resetAt.UTC(), persisted, time.Second) + t.Fatalf("不应将已耗尽的 codex extra 提升为运行时限流状态: %v", persisted) case <-time.After(2 * time.Second): - t.Fatal("等待旧快照补写限流状态超时") } } -func TestAdminService_ListAccounts_ExhaustedCodexExtraReturnsRateLimitedAccount(t *testing.T) { +func TestAdminService_ListAccounts_ExhaustedCodexExtraDoesNotSetRateLimit(t *testing.T) { resetAt := time.Now().Add(4 * 24 * time.Hour) repo := &openAICodexExtraListRepo{ stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{{ @@ -496,13 +491,11 @@ func TestAdminService_ListAccounts_ExhaustedCodexExtraReturnsRateLimitedAccount( require.NoError(t, err) require.Equal(t, int64(1), total) require.Len(t, accounts, 1) - require.NotNil(t, accounts[0].RateLimitResetAt) - require.WithinDuration(t, resetAt.UTC(), *accounts[0].RateLimitResetAt, time.Second) + require.Nil(t, accounts[0].RateLimitResetAt) select { case persisted := <-repo.rateLimitCh: - require.WithinDuration(t, resetAt.UTC(), persisted, time.Second) + t.Fatalf("不应在账号列表查询时将 codex extra 持久化为运行时限流状态: %v", persisted) case <-time.After(2 * time.Second): - t.Fatal("等待列表补写限流状态超时") } }