Files
sub2api/backend/internal/service/openai_oauth_passthrough_test.go
yangjianbo 9c910c2049 feat(openai): 支持自动透传开关并透传 User-Agent
- OpenAI OAuth/API Key 统一支持自动透传开关,编辑页可开关\n- 透传模式仅替换认证并保留计费/并发/审计,修复 API Key responses 端点拼接\n- Usage 页面显示原始 User-Agent 且不截断,补充回归测试与清单
2026-02-12 10:56:07 +08:00

462 lines
16 KiB
Go

package service
import (
"bytes"
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
// f64p returns a pointer to a freshly allocated float64 holding v. The tests
// in this file use it to fill pointer-typed fixture fields such as
// RateMultiplier.
func f64p(v float64) *float64 {
	p := new(float64)
	*p = v
	return p
}
// httpUpstreamRecorder is a test double for the HTTP upstream used by the
// gateway service under test. It remembers the most recent request (and its
// body) and answers with a canned response or error.
type httpUpstreamRecorder struct {
	// lastReq is the request passed to the most recent Do call.
	lastReq *http.Request
	// lastBody holds the bytes read from the most recent request that had a body.
	lastBody []byte
	// resp is returned by Do when err is nil.
	resp *http.Response
	// err, when non-nil, is returned by Do instead of resp.
	err error
}

// Do records req, captures its body when one is present, and then returns the
// configured error or response. proxyURL, accountID and accountConcurrency are
// ignored.
//
// When req is nil or has no body, lastBody keeps whatever an earlier call
// stored. After the body is captured it is replaced with a fresh reader over
// the same bytes so it can be read again.
func (u *httpUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
	u.lastReq = req

	if hasBody := req != nil && req.Body != nil; hasBody {
		// Read and close errors are deliberately discarded: this is a
		// best-effort capture inside a test double.
		payload, _ := io.ReadAll(req.Body)
		_ = req.Body.Close()
		u.lastBody = payload
		req.Body = io.NopCloser(bytes.NewReader(payload))
	}

	if u.err == nil {
		return u.resp, nil
	}
	return nil, u.err
}

// DoWithTLS behaves exactly like Do; enableTLSFingerprint is ignored.
func (u *httpUpstreamRecorder) DoWithTLS(req *http.Request, proxyURL string, accountID int64, accountConcurrency int, enableTLSFingerprint bool) (*http.Response, error) {
	return u.Do(req, proxyURL, accountID, accountConcurrency)
}
// TestOpenAIGatewayService_OAuthPassthrough_StreamKeepsToolNameAndBodyUnchanged
// forwards a streaming request through an OAuth account with
// openai_passthrough enabled and asserts that:
//  1. the body sent upstream is byte-identical to the inbound body,
//  2. only the credential is swapped: inbound auth, cookie, API-key,
//     Accept-Encoding and Proxy-Authorization headers are not forwarded,
//     while User-Agent and unrelated headers are,
//  3. the OAuth-specific Host and chatgpt-account-id values are set, and
//  4. the SSE written downstream still carries the original tool name.
func TestOpenAIGatewayService_OAuthPassthrough_StreamKeepsToolNameAndBodyUnchanged(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
	c.Request.Header.Set("Authorization", "Bearer inbound-should-not-forward")
	c.Request.Header.Set("Cookie", "secret=1")
	c.Request.Header.Set("X-Api-Key", "sk-inbound")
	c.Request.Header.Set("X-Goog-Api-Key", "goog-inbound")
	c.Request.Header.Set("Accept-Encoding", "gzip")
	c.Request.Header.Set("Proxy-Authorization", "Basic abc")
	c.Request.Header.Set("X-Test", "keep")

	originalBody := []byte(`{"model":"gpt-5.2","stream":true,"store":true,"input":[{"type":"text","text":"hi"}]}`)

	upstreamSSE := strings.Join([]string{
		`data: {"type":"response.output_item.added","item":{"type":"tool_call","tool_calls":[{"function":{"name":"apply_patch"}}]}}`,
		"",
		"data: [DONE]",
		"",
	}, "\n")
	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader(upstreamSSE)),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	// openAITokenProvider is intentionally left nil; per the original test
	// note, the gateway then reads the token from the account credentials.
	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	result, err := svc.Forward(context.Background(), c, account, originalBody)
	require.NoError(t, err)
	require.NotNil(t, result)
	require.True(t, result.Stream)

	// 1) upstream body is exactly unchanged
	require.Equal(t, originalBody, upstream.lastBody)

	// 2) only auth is replaced; inbound auth/cookie are not forwarded
	require.Equal(t, "Bearer oauth-token", upstream.lastReq.Header.Get("Authorization"))
	require.Equal(t, "codex_cli_rs/0.1.0", upstream.lastReq.Header.Get("User-Agent"))
	require.Empty(t, upstream.lastReq.Header.Get("Cookie"))
	require.Empty(t, upstream.lastReq.Header.Get("X-Api-Key"))
	require.Empty(t, upstream.lastReq.Header.Get("X-Goog-Api-Key"))
	require.Empty(t, upstream.lastReq.Header.Get("Accept-Encoding"))
	require.Empty(t, upstream.lastReq.Header.Get("Proxy-Authorization"))
	require.Equal(t, "keep", upstream.lastReq.Header.Get("X-Test"))

	// 3) required OAuth headers are present
	require.Equal(t, "chatgpt.com", upstream.lastReq.Host)
	require.Equal(t, "chatgpt-acc", upstream.lastReq.Header.Get("chatgpt-account-id"))

	// 4) downstream SSE keeps tool name (no toolCorrector)
	body := rec.Body.String()
	require.Contains(t, body, "apply_patch")
	require.NotContains(t, body, "\"name\":\"edit\"")
}
// TestOpenAIGatewayService_OAuthPassthrough_DisabledUsesLegacyTransform checks
// that an OAuth account with openai_passthrough set to false has its request
// body rewritten before it is sent upstream: the forwarded body differs from
// the input and contains "store":false and "stream":true.
func TestOpenAIGatewayService_OAuthPassthrough_DisabledUsesLegacyTransform(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")

	// store=true + stream=false should be forced to store=false + stream=true by applyCodexOAuthTransform (OAuth legacy path)
	inputBody := []byte(`{"model":"gpt-5.2","stream":false,"store":true,"input":[{"type":"text","text":"hi"}]}`)

	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader("data: [DONE]\n\n")),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": false},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	_, err := svc.Forward(context.Background(), c, account, inputBody)
	require.NoError(t, err)

	// legacy path rewrites request body (not byte-equal)
	require.NotEqual(t, inputBody, upstream.lastBody)
	require.Contains(t, string(upstream.lastBody), `"store":false`)
	require.Contains(t, string(upstream.lastBody), `"stream":true`)
}
// TestOpenAIGatewayService_OAuthPassthrough_ResponseHeadersAllowXCodex checks
// that, for an OAuth account with openai_passthrough enabled, the
// x-codex-primary-used-percent and x-codex-secondary-used-percent headers of
// the upstream response are present on the response written to the client.
func TestOpenAIGatewayService_OAuthPassthrough_ResponseHeadersAllowXCodex(t *testing.T) {
	gin.SetMode(gin.TestMode)

	recorder := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(recorder)
	ginCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	ginCtx.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")

	requestBody := []byte(`{"model":"gpt-5.2","stream":false,"input":[{"type":"text","text":"hi"}]}`)

	// Upstream response headers, applied in order through Header.Set.
	upstreamHeaders := http.Header{}
	for _, kv := range [][2]string{
		{"Content-Type", "application/json"},
		{"x-request-id", "rid"},
		{"x-codex-primary-used-percent", "12"},
		{"x-codex-secondary-used-percent", "34"},
		{"x-codex-primary-window-minutes", "300"},
		{"x-codex-secondary-window-minutes", "10080"},
		{"x-codex-primary-reset-after-seconds", "1"},
	} {
		upstreamHeaders.Set(kv[0], kv[1])
	}

	const upstreamJSON = `{"output":[],"usage":{"input_tokens":1,"output_tokens":1,"input_tokens_details":{"cached_tokens":0}}}`
	fakeUpstream := &httpUpstreamRecorder{
		resp: &http.Response{
			StatusCode: http.StatusOK,
			Header:     upstreamHeaders,
			Body:       io.NopCloser(strings.NewReader(upstreamJSON)),
		},
	}

	gateway := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: fakeUpstream,
	}

	oauthAccount := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	_, forwardErr := gateway.Forward(context.Background(), ginCtx, oauthAccount, requestBody)
	require.NoError(t, forwardErr)

	// Both usage-percent headers must reach the downstream response.
	for _, want := range [][2]string{
		{"x-codex-primary-used-percent", "12"},
		{"x-codex-secondary-used-percent", "34"},
	} {
		require.Equal(t, want[1], recorder.Header().Get(want[0]))
	}
}
// TestOpenAIGatewayService_OAuthPassthrough_UpstreamErrorIncludesPassthroughFlag
// makes the upstream answer 400 for an OAuth account with openai_passthrough
// enabled and checks that Forward returns an error and that the last
// OpsUpstreamErrorEvent stored in the gin context under OpsUpstreamErrorsKey
// has Passthrough set to true.
func TestOpenAIGatewayService_OAuthPassthrough_UpstreamErrorIncludesPassthroughFlag(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")

	originalBody := []byte(`{"model":"gpt-5.2","stream":false,"input":[{"type":"text","text":"hi"}]}`)

	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusBadRequest,
		Header:     http.Header{"Content-Type": []string{"application/json"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader(`{"error":{"message":"bad"}}`)),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	_, err := svc.Forward(context.Background(), c, account, originalBody)
	require.Error(t, err)

	// should append an upstream error event with passthrough=true
	v, ok := c.Get(OpsUpstreamErrorsKey)
	require.True(t, ok)
	arr, ok := v.([]*OpsUpstreamErrorEvent)
	require.True(t, ok)
	require.NotEmpty(t, arr)
	require.True(t, arr[len(arr)-1].Passthrough)
}
// TestOpenAIGatewayService_OAuthPassthrough_NonCodexUAStillPassthroughWhenEnabled
// sends a request with a non-Codex User-Agent ("curl/8.0") through an OAuth
// account with openai_passthrough enabled and checks that the body reaches the
// upstream unchanged and that the User-Agent is forwarded as-is.
func TestOpenAIGatewayService_OAuthPassthrough_NonCodexUAStillPassthroughWhenEnabled(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	// Non-Codex UA
	c.Request.Header.Set("User-Agent", "curl/8.0")

	inputBody := []byte(`{"model":"gpt-5.2","stream":false,"store":true,"input":[{"type":"text","text":"hi"}]}`)

	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader("data: [DONE]\n\n")),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	_, err := svc.Forward(context.Background(), c, account, inputBody)
	require.NoError(t, err)
	require.Equal(t, inputBody, upstream.lastBody)
	require.Equal(t, "curl/8.0", upstream.lastReq.Header.Get("User-Agent"))
}
// TestOpenAIGatewayService_OAuthPassthrough_StreamingSetsFirstTokenMs streams
// a short SSE response through an OAuth account with openai_passthrough
// enabled and checks that the returned result carries a non-nil,
// non-negative FirstTokenMs.
func TestOpenAIGatewayService_OAuthPassthrough_StreamingSetsFirstTokenMs(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")

	originalBody := []byte(`{"model":"gpt-5.2","stream":true,"input":[{"type":"text","text":"hi"}]}`)

	upstreamSSE := strings.Join([]string{
		`data: {"type":"response.output_text.delta","delta":"h"}`,
		"",
		"data: [DONE]",
		"",
	}, "\n")
	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader(upstreamSSE)),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	start := time.Now()
	result, err := svc.Forward(context.Background(), c, account, originalBody)
	require.NoError(t, err)
	// Guard the dereferences below so a nil result fails the test cleanly
	// instead of panicking.
	require.NotNil(t, result)
	// sanity: duration after start
	require.GreaterOrEqual(t, time.Since(start), time.Duration(0))
	require.NotNil(t, result.FirstTokenMs)
	require.GreaterOrEqual(t, *result.FirstTokenMs, 0)
}
// TestOpenAIGatewayService_OAuthPassthrough_StreamClientDisconnectStillCollectsUsage
// replaces the gin writer with a failingGinWriter (failAfter: 1) and streams
// an SSE response that includes a response.completed event with usage. It
// checks that Forward still succeeds and that the result reports
// FirstTokenMs and the input/output/cached token counts from that event.
func TestOpenAIGatewayService_OAuthPassthrough_StreamClientDisconnectStillCollectsUsage(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
	// The first write succeeds and later writes fail, simulating a client
	// that disconnects mid-stream.
	c.Writer = &failingGinWriter{ResponseWriter: c.Writer, failAfter: 1}

	originalBody := []byte(`{"model":"gpt-5.2","stream":true,"input":[{"type":"text","text":"hi"}]}`)

	upstreamSSE := strings.Join([]string{
		`data: {"type":"response.output_text.delta","delta":"h"}`,
		"",
		`data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"input_tokens_details":{"cached_tokens":3}}}}`,
		"",
		"data: [DONE]",
		"",
	}, "\n")
	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader(upstreamSSE)),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             123,
		Name:           "acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeOAuth,
		Concurrency:    1,
		Credentials:    map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	result, err := svc.Forward(context.Background(), c, account, originalBody)
	require.NoError(t, err)
	require.NotNil(t, result)
	require.True(t, result.Stream)
	require.NotNil(t, result.FirstTokenMs)
	require.Equal(t, 11, result.Usage.InputTokens)
	require.Equal(t, 7, result.Usage.OutputTokens)
	require.Equal(t, 3, result.Usage.CacheReadInputTokens)
}
// TestOpenAIGatewayService_APIKeyPassthrough_PreservesBodyAndUsesResponsesEndpoint
// forwards a request through an API-key account with openai_passthrough
// enabled and base_url "https://api.openai.com". It checks that the body is
// sent unchanged, the upstream URL is https://api.openai.com/v1/responses,
// Authorization carries the account's API key, and the inbound User-Agent and
// X-Test headers are forwarded.
func TestOpenAIGatewayService_APIKeyPassthrough_PreservesBodyAndUsesResponsesEndpoint(t *testing.T) {
	gin.SetMode(gin.TestMode)

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil))
	c.Request.Header.Set("User-Agent", "curl/8.0")
	c.Request.Header.Set("X-Test", "keep")

	originalBody := []byte(`{"model":"gpt-5.2","stream":false,"max_output_tokens":128,"input":[{"type":"text","text":"hi"}]}`)

	// Header keys in a map literal must already be canonical; Header.Get
	// canonicalizes its argument and would never find "x-request-id".
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"application/json"}, "X-Request-Id": []string{"rid"}},
		Body:       io.NopCloser(strings.NewReader(`{"output":[],"usage":{"input_tokens":1,"output_tokens":1,"input_tokens_details":{"cached_tokens":0}}}`)),
	}
	upstream := &httpUpstreamRecorder{resp: resp}

	svc := &OpenAIGatewayService{
		cfg:          &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
		httpUpstream: upstream,
	}

	account := &Account{
		ID:             456,
		Name:           "apikey-acc",
		Platform:       PlatformOpenAI,
		Type:           AccountTypeAPIKey,
		Concurrency:    1,
		Credentials:    map[string]any{"api_key": "sk-api-key", "base_url": "https://api.openai.com"},
		Extra:          map[string]any{"openai_passthrough": true},
		Status:         StatusActive,
		Schedulable:    true,
		RateMultiplier: f64p(1),
	}

	_, err := svc.Forward(context.Background(), c, account, originalBody)
	require.NoError(t, err)
	require.NotNil(t, upstream.lastReq)
	require.Equal(t, originalBody, upstream.lastBody)
	require.Equal(t, "https://api.openai.com/v1/responses", upstream.lastReq.URL.String())
	require.Equal(t, "Bearer sk-api-key", upstream.lastReq.Header.Get("Authorization"))
	require.Equal(t, "curl/8.0", upstream.lastReq.Header.Get("User-Agent"))
	require.Equal(t, "keep", upstream.lastReq.Header.Get("X-Test"))
}