diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index ddb4343a..b9cd698a 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -757,6 +757,8 @@ func (s *AccountTestService) reconcileOpenAI429State(ctx context.Context, accoun return } + persistOpenAI429PlanType(ctx, s.accountRepo, account, body) + var resetAt *time.Time if calculated := calculateOpenAI429ResetTime(headers); calculated != nil { resetAt = calculated diff --git a/backend/internal/service/account_test_service_openai_test.go b/backend/internal/service/account_test_service_openai_test.go index 56204be3..c6e30ed6 100644 --- a/backend/internal/service/account_test_service_openai_test.go +++ b/backend/internal/service/account_test_service_openai_test.go @@ -61,12 +61,14 @@ func newTestContext() (*gin.Context, *httptest.ResponseRecorder) { type openAIAccountTestRepo struct { mockAccountRepoForGemini - updatedExtra map[string]any - rateLimitedID int64 - rateLimitedAt *time.Time - clearedErrorID int64 - setErrorID int64 - setErrorMsg string + updatedExtra map[string]any + bulkUpdatedIDs []int64 + bulkUpdatedPayload AccountBulkUpdate + rateLimitedID int64 + rateLimitedAt *time.Time + clearedErrorID int64 + setErrorID int64 + setErrorMsg string } func (r *openAIAccountTestRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error { @@ -74,6 +76,12 @@ func (r *openAIAccountTestRepo) UpdateExtra(_ context.Context, _ int64, updates return nil } +func (r *openAIAccountTestRepo) BulkUpdate(_ context.Context, ids []int64, updates AccountBulkUpdate) (int64, error) { + r.bulkUpdatedIDs = append([]int64(nil), ids...) + r.bulkUpdatedPayload = updates + return int64(len(ids)), nil +} + func (r *openAIAccountTestRepo) SetRateLimited(_ context.Context, id int64, resetAt time.Time) error { r.rateLimitedID = id r.rateLimitedAt = &resetAt @@ -216,6 +224,33 @@ func TestAccountTestService_OpenAI429BodyOnlyPersistsRateLimitAndClearsStaleErro require.Empty(t, repo.updatedExtra) } +func TestAccountTestService_OpenAI429SyncsObservedPlanType(t *testing.T) { + gin.SetMode(gin.TestMode) + ctx, _ := newTestContext() + + resp := newJSONResponse(http.StatusTooManyRequests, `{"error":{"type":"usage_limit_reached","message":"limit reached","plan_type":"free","resets_at":1777283883}}`) + + repo := &openAIAccountTestRepo{} + upstream := &queuedHTTPUpstream{responses: []*http.Response{resp}} + svc := &AccountTestService{accountRepo: repo, httpUpstream: upstream} + account := &Account{ + ID: 81, + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Status: StatusActive, + Concurrency: 1, + Credentials: map[string]any{"access_token": "test-token", "plan_type": "plus"}, + } + + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") + require.Error(t, err) + require.Equal(t, []int64{account.ID}, repo.bulkUpdatedIDs) + require.Equal(t, "free", repo.bulkUpdatedPayload.Credentials["plan_type"]) + require.Equal(t, "free", account.Credentials["plan_type"]) + require.Equal(t, account.ID, repo.rateLimitedID) + require.NotNil(t, account.RateLimitResetAt) +} + func TestAccountTestService_OpenAI429ActiveAccountDoesNotClearError(t *testing.T) { gin.SetMode(gin.TestMode) ctx, _ := newTestContext() diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index a53cb0e9..19c45a5a 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -824,6 +824,7 @@ func (s *RateLimitService) handleCustomErrorCode(ctx context.Context, account *A func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header, responseBody []byte) { // 1. OpenAI 平台:优先尝试解析 x-codex-* 响应头(用于 rate_limit_exceeded) if account.Platform == PlatformOpenAI { + persistOpenAI429PlanType(ctx, s.accountRepo, account, responseBody) s.persistOpenAICodexSnapshot(ctx, account, headers) if resetAt := s.calculateOpenAI429ResetTime(headers); resetAt != nil { if err := s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt); err != nil { @@ -1198,6 +1199,55 @@ func parseOpenAIRateLimitResetTime(body []byte) *int64 { return nil } +func parseOpenAIRateLimitPlanType(body []byte) string { + var parsed map[string]any + if err := json.Unmarshal(body, &parsed); err != nil { + return "" + } + + errObj, ok := parsed["error"].(map[string]any) + if !ok { + return "" + } + + errType, _ := errObj["type"].(string) + if errType != "usage_limit_reached" && errType != "rate_limit_exceeded" { + return "" + } + + planType, _ := errObj["plan_type"].(string) + return strings.ToLower(strings.TrimSpace(planType)) +} + +func persistOpenAI429PlanType(ctx context.Context, repo AccountRepository, account *Account, body []byte) { + if repo == nil || account == nil || account.Platform != PlatformOpenAI { + return + } + + planType := parseOpenAIRateLimitPlanType(body) + if planType == "" { + return + } + + current := strings.TrimSpace(account.GetCredential("plan_type")) + if strings.EqualFold(current, planType) { + return + } + + if _, err := repo.BulkUpdate(ctx, []int64{account.ID}, AccountBulkUpdate{ + Credentials: map[string]any{"plan_type": planType}, + }); err != nil { + slog.Warn("openai_429_plan_type_sync_failed", "account_id", account.ID, "plan_type", planType, "error", err) + return + } + + if account.Credentials == nil { + account.Credentials = make(map[string]any, 1) + } + account.Credentials["plan_type"] = planType + slog.Info("openai_429_plan_type_synced", "account_id", account.ID, "previous_plan_type", current, "plan_type", planType) +} + // handle529 处理529过载错误 // 根据配置决定是否暂停账号调度及冷却时长 func (s *RateLimitService) handle529(ctx context.Context, account *Account) { diff --git a/backend/internal/service/ratelimit_service_openai_test.go b/backend/internal/service/ratelimit_service_openai_test.go index 619bb773..aa5a070c 100644 --- a/backend/internal/service/ratelimit_service_openai_test.go +++ b/backend/internal/service/ratelimit_service_openai_test.go @@ -149,8 +149,10 @@ func TestCalculateOpenAI429ResetTime_ReversedWindowOrder(t *testing.T) { type openAI429SnapshotRepo struct { mockAccountRepoForGemini - rateLimitedID int64 - updatedExtra map[string]any + rateLimitedID int64 + updatedExtra map[string]any + bulkUpdatedIDs []int64 + bulkUpdatedPayload AccountBulkUpdate } func (r *openAI429SnapshotRepo) SetRateLimited(_ context.Context, id int64, _ time.Time) error { @@ -163,6 +165,12 @@ func (r *openAI429SnapshotRepo) UpdateExtra(_ context.Context, _ int64, updates return nil } +func (r *openAI429SnapshotRepo) BulkUpdate(_ context.Context, ids []int64, updates AccountBulkUpdate) (int64, error) { + r.bulkUpdatedIDs = append([]int64(nil), ids...) + r.bulkUpdatedPayload = updates + return int64(len(ids)), nil +} + func TestHandle429_OpenAIPersistsCodexSnapshotImmediately(t *testing.T) { repo := &openAI429SnapshotRepo{} svc := NewRateLimitService(repo, nil, nil, nil, nil) @@ -192,6 +200,25 @@ func TestHandle429_OpenAIPersistsCodexSnapshotImmediately(t *testing.T) { } } +func TestHandle429_OpenAISyncsObservedPlanType(t *testing.T) { + repo := &openAI429SnapshotRepo{} + svc := NewRateLimitService(repo, nil, nil, nil, nil) + account := &Account{ + ID: 124, + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Credentials: map[string]any{"plan_type": "plus"}, + } + body := []byte(`{"error":{"type":"usage_limit_reached","message":"limit reached","plan_type":"free","resets_at":1777283883}}`) + + svc.handle429(context.Background(), account, http.Header{}, body) + + require.Equal(t, []int64{account.ID}, repo.bulkUpdatedIDs) + require.Equal(t, "free", repo.bulkUpdatedPayload.Credentials["plan_type"]) + require.Equal(t, "free", account.Credentials["plan_type"]) + require.Equal(t, account.ID, repo.rateLimitedID) +} + func TestNormalizedCodexLimits(t *testing.T) { // Test the Normalize() method directly pUsed := 100.0