fix(openai): fail over passthrough 429 and 529

This commit is contained in:
qingyuzhang
2026-03-30 22:29:26 +08:00
parent 1dfd974432
commit 6b646b6127
2 changed files with 206 additions and 42 deletions

View File

@@ -48,6 +48,22 @@ func (u *httpUpstreamRecorder) DoWithTLS(req *http.Request, proxyURL string, acc
return u.Do(req, proxyURL, accountID, accountConcurrency)
}
type openAIPassthroughFailoverRepo struct {
stubOpenAIAccountRepo
rateLimitCalls []time.Time
overloadCalls []time.Time
}
func (r *openAIPassthroughFailoverRepo) SetRateLimited(_ context.Context, _ int64, resetAt time.Time) error {
r.rateLimitCalls = append(r.rateLimitCalls, resetAt)
return nil
}
func (r *openAIPassthroughFailoverRepo) SetOverloaded(_ context.Context, _ int64, until time.Time) error {
r.overloadCalls = append(r.overloadCalls, until)
return nil
}
var structuredLogCaptureMu sync.Mutex
type inMemoryLogSink struct {
@@ -527,6 +543,8 @@ func TestOpenAIGatewayService_OAuthPassthrough_UpstreamErrorIncludesPassthroughF
_, err := svc.Forward(context.Background(), c, account, originalBody)
require.Error(t, err)
require.True(t, c.Writer.Written(), "非 429/529 的 passthrough 错误应继续原样写回客户端")
require.Equal(t, http.StatusBadRequest, rec.Code)
// should append an upstream error event with passthrough=true
v, ok := c.Get(OpsUpstreamErrorsKey)
@@ -535,55 +553,145 @@ func TestOpenAIGatewayService_OAuthPassthrough_UpstreamErrorIncludesPassthroughF
require.True(t, ok)
require.NotEmpty(t, arr)
require.True(t, arr[len(arr)-1].Passthrough)
require.Equal(t, "http_error", arr[len(arr)-1].Kind)
}
func TestOpenAIGatewayService_OAuthPassthrough_429PersistsRateLimit(t *testing.T) {
func TestOpenAIGatewayService_OpenAIPassthrough_429And529TriggerFailover(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
originalBody := []byte(`{"model":"gpt-5.2","stream":false,"instructions":"local-test-instructions","input":[{"type":"text","text":"hi"}]}`)
resetAt := time.Now().Add(7 * 24 * time.Hour).Unix()
resp := &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: http.Header{
"Content-Type": []string{"application/json"},
"x-request-id": []string{"rid-rate-limit"},
newAccount := func(accountType string) *Account {
account := &Account{
ID: 123,
Name: "acc",
Platform: PlatformOpenAI,
Type: accountType,
Concurrency: 1,
Extra: map[string]any{"openai_passthrough": true},
Status: StatusActive,
Schedulable: true,
RateMultiplier: f64p(1),
}
switch accountType {
case AccountTypeOAuth:
account.Credentials = map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"}
case AccountTypeAPIKey:
account.Credentials = map[string]any{"api_key": "sk-test"}
}
return account
}
testCases := []struct {
name string
accountType string
statusCode int
body string
assertRepo func(t *testing.T, repo *openAIPassthroughFailoverRepo, start time.Time)
}{
{
name: "oauth_429_rate_limit",
accountType: AccountTypeOAuth,
statusCode: http.StatusTooManyRequests,
body: func() string {
resetAt := time.Now().Add(7 * 24 * time.Hour).Unix()
return fmt.Sprintf(`{"error":{"message":"The usage limit has been reached","type":"usage_limit_reached","resets_at":%d}}`, resetAt)
}(),
assertRepo: func(t *testing.T, repo *openAIPassthroughFailoverRepo, _ time.Time) {
require.Len(t, repo.rateLimitCalls, 1)
require.Empty(t, repo.overloadCalls)
require.True(t, time.Until(repo.rateLimitCalls[0]) > 24*time.Hour)
},
},
{
name: "oauth_529_overload",
accountType: AccountTypeOAuth,
statusCode: 529,
body: `{"error":{"message":"server overloaded","type":"server_error"}}`,
assertRepo: func(t *testing.T, repo *openAIPassthroughFailoverRepo, start time.Time) {
require.Empty(t, repo.rateLimitCalls)
require.Len(t, repo.overloadCalls, 1)
require.WithinDuration(t, start.Add(10*time.Minute), repo.overloadCalls[0], 5*time.Second)
},
},
{
name: "apikey_429_rate_limit",
accountType: AccountTypeAPIKey,
statusCode: http.StatusTooManyRequests,
body: func() string {
resetAt := time.Now().Add(7 * 24 * time.Hour).Unix()
return fmt.Sprintf(`{"error":{"message":"The usage limit has been reached","type":"usage_limit_reached","resets_at":%d}}`, resetAt)
}(),
assertRepo: func(t *testing.T, repo *openAIPassthroughFailoverRepo, _ time.Time) {
require.Len(t, repo.rateLimitCalls, 1)
require.Empty(t, repo.overloadCalls)
require.True(t, time.Until(repo.rateLimitCalls[0]) > 24*time.Hour)
},
},
{
name: "apikey_529_overload",
accountType: AccountTypeAPIKey,
statusCode: 529,
body: `{"error":{"message":"server overloaded","type":"server_error"}}`,
assertRepo: func(t *testing.T, repo *openAIPassthroughFailoverRepo, start time.Time) {
require.Empty(t, repo.rateLimitCalls)
require.Len(t, repo.overloadCalls, 1)
require.WithinDuration(t, start.Add(10*time.Minute), repo.overloadCalls[0], 5*time.Second)
},
},
Body: io.NopCloser(strings.NewReader(fmt.Sprintf(`{"error":{"message":"The usage limit has been reached","type":"usage_limit_reached","resets_at":%d}}`, resetAt))),
}
upstream := &httpUpstreamRecorder{resp: resp}
repo := &openAIWSRateLimitSignalRepo{}
rateSvc := &RateLimitService{accountRepo: repo}
svc := &OpenAIGatewayService{
cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
httpUpstream: upstream,
rateLimitService: rateSvc,
}
account := &Account{
ID: 123,
Name: "acc",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
Extra: map[string]any{"openai_passthrough": true},
Status: StatusActive,
Schedulable: true,
RateMultiplier: f64p(1),
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
_, err := svc.Forward(context.Background(), c, account, originalBody)
require.Error(t, err)
require.Equal(t, http.StatusTooManyRequests, rec.Code)
require.Contains(t, rec.Body.String(), "usage_limit_reached")
require.Len(t, repo.rateLimitCalls, 1)
require.WithinDuration(t, time.Unix(resetAt, 0), repo.rateLimitCalls[0], 2*time.Second)
resp := &http.Response{
StatusCode: tc.statusCode,
Header: http.Header{
"Content-Type": []string{"application/json"},
"x-request-id": []string{"rid-failover"},
},
Body: io.NopCloser(strings.NewReader(tc.body)),
}
upstream := &httpUpstreamRecorder{resp: resp}
repo := &openAIPassthroughFailoverRepo{}
rateSvc := &RateLimitService{
accountRepo: repo,
cfg: &config.Config{
RateLimit: config.RateLimitConfig{OverloadCooldownMinutes: 10},
},
}
svc := &OpenAIGatewayService{
cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
httpUpstream: upstream,
rateLimitService: rateSvc,
}
account := newAccount(tc.accountType)
start := time.Now()
_, err := svc.Forward(context.Background(), c, account, originalBody)
require.Error(t, err)
var failoverErr *UpstreamFailoverError
require.ErrorAs(t, err, &failoverErr)
require.Equal(t, tc.statusCode, failoverErr.StatusCode)
require.False(t, c.Writer.Written(), "429/529 passthrough 应返回 failover 错误给上层换号,而不是直接向客户端写响应")
v, ok := c.Get(OpsUpstreamErrorsKey)
require.True(t, ok)
arr, ok := v.([]*OpsUpstreamErrorEvent)
require.True(t, ok)
require.NotEmpty(t, arr)
require.True(t, arr[len(arr)-1].Passthrough)
require.Equal(t, "failover", arr[len(arr)-1].Kind)
require.Equal(t, tc.statusCode, arr[len(arr)-1].UpstreamStatusCode)
tc.assertRepo(t, repo, start)
})
}
}
func TestOpenAIGatewayService_OAuthPassthrough_NonCodexUAFallbackToCodexUA(t *testing.T) {