fix: sync OpenAI plan type from usage limit errors
This commit is contained in:
@@ -757,6 +757,8 @@ func (s *AccountTestService) reconcileOpenAI429State(ctx context.Context, accoun
|
||||
return
|
||||
}
|
||||
|
||||
persistOpenAI429PlanType(ctx, s.accountRepo, account, body)
|
||||
|
||||
var resetAt *time.Time
|
||||
if calculated := calculateOpenAI429ResetTime(headers); calculated != nil {
|
||||
resetAt = calculated
|
||||
|
||||
@@ -61,12 +61,14 @@ func newTestContext() (*gin.Context, *httptest.ResponseRecorder) {
|
||||
|
||||
type openAIAccountTestRepo struct {
|
||||
mockAccountRepoForGemini
|
||||
updatedExtra map[string]any
|
||||
rateLimitedID int64
|
||||
rateLimitedAt *time.Time
|
||||
clearedErrorID int64
|
||||
setErrorID int64
|
||||
setErrorMsg string
|
||||
updatedExtra map[string]any
|
||||
bulkUpdatedIDs []int64
|
||||
bulkUpdatedPayload AccountBulkUpdate
|
||||
rateLimitedID int64
|
||||
rateLimitedAt *time.Time
|
||||
clearedErrorID int64
|
||||
setErrorID int64
|
||||
setErrorMsg string
|
||||
}
|
||||
|
||||
func (r *openAIAccountTestRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error {
|
||||
@@ -74,6 +76,12 @@ func (r *openAIAccountTestRepo) UpdateExtra(_ context.Context, _ int64, updates
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *openAIAccountTestRepo) BulkUpdate(_ context.Context, ids []int64, updates AccountBulkUpdate) (int64, error) {
|
||||
r.bulkUpdatedIDs = append([]int64(nil), ids...)
|
||||
r.bulkUpdatedPayload = updates
|
||||
return int64(len(ids)), nil
|
||||
}
|
||||
|
||||
func (r *openAIAccountTestRepo) SetRateLimited(_ context.Context, id int64, resetAt time.Time) error {
|
||||
r.rateLimitedID = id
|
||||
r.rateLimitedAt = &resetAt
|
||||
@@ -216,6 +224,33 @@ func TestAccountTestService_OpenAI429BodyOnlyPersistsRateLimitAndClearsStaleErro
|
||||
require.Empty(t, repo.updatedExtra)
|
||||
}
|
||||
|
||||
func TestAccountTestService_OpenAI429SyncsObservedPlanType(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
ctx, _ := newTestContext()
|
||||
|
||||
resp := newJSONResponse(http.StatusTooManyRequests, `{"error":{"type":"usage_limit_reached","message":"limit reached","plan_type":"free","resets_at":1777283883}}`)
|
||||
|
||||
repo := &openAIAccountTestRepo{}
|
||||
upstream := &queuedHTTPUpstream{responses: []*http.Response{resp}}
|
||||
svc := &AccountTestService{accountRepo: repo, httpUpstream: upstream}
|
||||
account := &Account{
|
||||
ID: 81,
|
||||
Platform: PlatformOpenAI,
|
||||
Type: AccountTypeOAuth,
|
||||
Status: StatusActive,
|
||||
Concurrency: 1,
|
||||
Credentials: map[string]any{"access_token": "test-token", "plan_type": "plus"},
|
||||
}
|
||||
|
||||
err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "")
|
||||
require.Error(t, err)
|
||||
require.Equal(t, []int64{account.ID}, repo.bulkUpdatedIDs)
|
||||
require.Equal(t, "free", repo.bulkUpdatedPayload.Credentials["plan_type"])
|
||||
require.Equal(t, "free", account.Credentials["plan_type"])
|
||||
require.Equal(t, account.ID, repo.rateLimitedID)
|
||||
require.NotNil(t, account.RateLimitResetAt)
|
||||
}
|
||||
|
||||
func TestAccountTestService_OpenAI429ActiveAccountDoesNotClearError(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
ctx, _ := newTestContext()
|
||||
|
||||
@@ -824,6 +824,7 @@ func (s *RateLimitService) handleCustomErrorCode(ctx context.Context, account *A
|
||||
func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header, responseBody []byte) {
|
||||
// 1. OpenAI 平台:优先尝试解析 x-codex-* 响应头(用于 rate_limit_exceeded)
|
||||
if account.Platform == PlatformOpenAI {
|
||||
persistOpenAI429PlanType(ctx, s.accountRepo, account, responseBody)
|
||||
s.persistOpenAICodexSnapshot(ctx, account, headers)
|
||||
if resetAt := s.calculateOpenAI429ResetTime(headers); resetAt != nil {
|
||||
if err := s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt); err != nil {
|
||||
@@ -1198,6 +1199,55 @@ func parseOpenAIRateLimitResetTime(body []byte) *int64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseOpenAIRateLimitPlanType(body []byte) string {
|
||||
var parsed map[string]any
|
||||
if err := json.Unmarshal(body, &parsed); err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
errObj, ok := parsed["error"].(map[string]any)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
errType, _ := errObj["type"].(string)
|
||||
if errType != "usage_limit_reached" && errType != "rate_limit_exceeded" {
|
||||
return ""
|
||||
}
|
||||
|
||||
planType, _ := errObj["plan_type"].(string)
|
||||
return strings.ToLower(strings.TrimSpace(planType))
|
||||
}
|
||||
|
||||
func persistOpenAI429PlanType(ctx context.Context, repo AccountRepository, account *Account, body []byte) {
|
||||
if repo == nil || account == nil || account.Platform != PlatformOpenAI {
|
||||
return
|
||||
}
|
||||
|
||||
planType := parseOpenAIRateLimitPlanType(body)
|
||||
if planType == "" {
|
||||
return
|
||||
}
|
||||
|
||||
current := strings.TrimSpace(account.GetCredential("plan_type"))
|
||||
if strings.EqualFold(current, planType) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := repo.BulkUpdate(ctx, []int64{account.ID}, AccountBulkUpdate{
|
||||
Credentials: map[string]any{"plan_type": planType},
|
||||
}); err != nil {
|
||||
slog.Warn("openai_429_plan_type_sync_failed", "account_id", account.ID, "plan_type", planType, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if account.Credentials == nil {
|
||||
account.Credentials = make(map[string]any, 1)
|
||||
}
|
||||
account.Credentials["plan_type"] = planType
|
||||
slog.Info("openai_429_plan_type_synced", "account_id", account.ID, "previous_plan_type", current, "plan_type", planType)
|
||||
}
|
||||
|
||||
// handle529 处理529过载错误
|
||||
// 根据配置决定是否暂停账号调度及冷却时长
|
||||
func (s *RateLimitService) handle529(ctx context.Context, account *Account) {
|
||||
|
||||
@@ -149,8 +149,10 @@ func TestCalculateOpenAI429ResetTime_ReversedWindowOrder(t *testing.T) {
|
||||
|
||||
type openAI429SnapshotRepo struct {
|
||||
mockAccountRepoForGemini
|
||||
rateLimitedID int64
|
||||
updatedExtra map[string]any
|
||||
rateLimitedID int64
|
||||
updatedExtra map[string]any
|
||||
bulkUpdatedIDs []int64
|
||||
bulkUpdatedPayload AccountBulkUpdate
|
||||
}
|
||||
|
||||
func (r *openAI429SnapshotRepo) SetRateLimited(_ context.Context, id int64, _ time.Time) error {
|
||||
@@ -163,6 +165,12 @@ func (r *openAI429SnapshotRepo) UpdateExtra(_ context.Context, _ int64, updates
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *openAI429SnapshotRepo) BulkUpdate(_ context.Context, ids []int64, updates AccountBulkUpdate) (int64, error) {
|
||||
r.bulkUpdatedIDs = append([]int64(nil), ids...)
|
||||
r.bulkUpdatedPayload = updates
|
||||
return int64(len(ids)), nil
|
||||
}
|
||||
|
||||
func TestHandle429_OpenAIPersistsCodexSnapshotImmediately(t *testing.T) {
|
||||
repo := &openAI429SnapshotRepo{}
|
||||
svc := NewRateLimitService(repo, nil, nil, nil, nil)
|
||||
@@ -192,6 +200,25 @@ func TestHandle429_OpenAIPersistsCodexSnapshotImmediately(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandle429_OpenAISyncsObservedPlanType(t *testing.T) {
|
||||
repo := &openAI429SnapshotRepo{}
|
||||
svc := NewRateLimitService(repo, nil, nil, nil, nil)
|
||||
account := &Account{
|
||||
ID: 124,
|
||||
Platform: PlatformOpenAI,
|
||||
Type: AccountTypeOAuth,
|
||||
Credentials: map[string]any{"plan_type": "plus"},
|
||||
}
|
||||
body := []byte(`{"error":{"type":"usage_limit_reached","message":"limit reached","plan_type":"free","resets_at":1777283883}}`)
|
||||
|
||||
svc.handle429(context.Background(), account, http.Header{}, body)
|
||||
|
||||
require.Equal(t, []int64{account.ID}, repo.bulkUpdatedIDs)
|
||||
require.Equal(t, "free", repo.bulkUpdatedPayload.Credentials["plan_type"])
|
||||
require.Equal(t, "free", account.Credentials["plan_type"])
|
||||
require.Equal(t, account.ID, repo.rateLimitedID)
|
||||
}
|
||||
|
||||
func TestNormalizedCodexLimits(t *testing.T) {
|
||||
// Test the Normalize() method directly
|
||||
pUsed := 100.0
|
||||
|
||||
Reference in New Issue
Block a user