chore(test): resolve merge conflict for ws usage window pr

This commit is contained in:
神乐
2026-03-06 21:16:21 +08:00
18 changed files with 3288 additions and 62 deletions

View File

@@ -5429,6 +5429,11 @@ func extractUpstreamErrorMessage(body []byte) string {
return m
}
// ChatGPT 内部 API 风格:{"detail":"..."}
if d := gjson.GetBytes(body, "detail").String(); strings.TrimSpace(d) != "" {
return d
}
// 兜底:尝试顶层 message
return gjson.GetBytes(body, "message").String()
}

View File

@@ -77,7 +77,7 @@ type codexTransformResult struct {
PromptCacheKey string
}
func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool) codexTransformResult {
func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool, isCompact bool) codexTransformResult {
result := codexTransformResult{}
// 工具续链需求会影响存储策略与 input 过滤逻辑。
needsToolContinuation := NeedsToolContinuation(reqBody)
@@ -95,15 +95,26 @@ func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool) codexTran
result.NormalizedModel = normalizedModel
}
// OAuth 走 ChatGPT internal API 时store 必须为 false显式 true 也会强制覆盖。
// 避免上游返回 "Store must be set to false"。
if v, ok := reqBody["store"].(bool); !ok || v {
reqBody["store"] = false
result.Modified = true
}
if v, ok := reqBody["stream"].(bool); !ok || !v {
reqBody["stream"] = true
result.Modified = true
if isCompact {
if _, ok := reqBody["store"]; ok {
delete(reqBody, "store")
result.Modified = true
}
if _, ok := reqBody["stream"]; ok {
delete(reqBody, "stream")
result.Modified = true
}
} else {
// OAuth 走 ChatGPT internal API 时store 必须为 false显式 true 也会强制覆盖。
// 避免上游返回 "Store must be set to false"。
if v, ok := reqBody["store"].(bool); !ok || v {
reqBody["store"] = false
result.Modified = true
}
if v, ok := reqBody["stream"].(bool); !ok || !v {
reqBody["stream"] = true
result.Modified = true
}
}
// Strip parameters unsupported by codex models via the Responses API.

View File

@@ -18,7 +18,7 @@ func TestApplyCodexOAuthTransform_ToolContinuationPreservesInput(t *testing.T) {
"tool_choice": "auto",
}
applyCodexOAuthTransform(reqBody, false)
applyCodexOAuthTransform(reqBody, false, false)
// 未显式设置 store=true默认为 false。
store, ok := reqBody["store"].(bool)
@@ -53,7 +53,7 @@ func TestApplyCodexOAuthTransform_ExplicitStoreFalsePreserved(t *testing.T) {
"tool_choice": "auto",
}
applyCodexOAuthTransform(reqBody, false)
applyCodexOAuthTransform(reqBody, false, false)
store, ok := reqBody["store"].(bool)
require.True(t, ok)
@@ -72,13 +72,29 @@ func TestApplyCodexOAuthTransform_ExplicitStoreTrueForcedFalse(t *testing.T) {
"tool_choice": "auto",
}
applyCodexOAuthTransform(reqBody, false)
applyCodexOAuthTransform(reqBody, false, false)
store, ok := reqBody["store"].(bool)
require.True(t, ok)
require.False(t, store)
}
func TestApplyCodexOAuthTransform_CompactForcesNonStreaming(t *testing.T) {
reqBody := map[string]any{
"model": "gpt-5.1-codex",
"store": true,
"stream": true,
}
result := applyCodexOAuthTransform(reqBody, true, true)
_, hasStore := reqBody["store"]
require.False(t, hasStore)
_, hasStream := reqBody["stream"]
require.False(t, hasStream)
require.True(t, result.Modified)
}
func TestApplyCodexOAuthTransform_NonContinuationDefaultsStoreFalseAndStripsIDs(t *testing.T) {
// 非续链场景:未设置 store 时默认 false并移除 input 中的 id。
@@ -89,7 +105,7 @@ func TestApplyCodexOAuthTransform_NonContinuationDefaultsStoreFalseAndStripsIDs(
},
}
applyCodexOAuthTransform(reqBody, false)
applyCodexOAuthTransform(reqBody, false, false)
store, ok := reqBody["store"].(bool)
require.True(t, ok)
@@ -138,7 +154,7 @@ func TestApplyCodexOAuthTransform_NormalizeCodexTools_PreservesResponsesFunction
},
}
applyCodexOAuthTransform(reqBody, false)
applyCodexOAuthTransform(reqBody, false, false)
tools, ok := reqBody["tools"].([]any)
require.True(t, ok)
@@ -158,7 +174,7 @@ func TestApplyCodexOAuthTransform_EmptyInput(t *testing.T) {
"input": []any{},
}
applyCodexOAuthTransform(reqBody, false)
applyCodexOAuthTransform(reqBody, false, false)
input, ok := reqBody["input"].([]any)
require.True(t, ok)
@@ -193,7 +209,7 @@ func TestApplyCodexOAuthTransform_CodexCLI_PreservesExistingInstructions(t *test
"instructions": "existing instructions",
}
result := applyCodexOAuthTransform(reqBody, true) // isCodexCLI=true
result := applyCodexOAuthTransform(reqBody, true, false) // isCodexCLI=true
instructions, ok := reqBody["instructions"].(string)
require.True(t, ok)
@@ -210,7 +226,7 @@ func TestApplyCodexOAuthTransform_CodexCLI_SuppliesDefaultWhenEmpty(t *testing.T
// 没有 instructions 字段
}
result := applyCodexOAuthTransform(reqBody, true) // isCodexCLI=true
result := applyCodexOAuthTransform(reqBody, true, false) // isCodexCLI=true
instructions, ok := reqBody["instructions"].(string)
require.True(t, ok)
@@ -226,7 +242,7 @@ func TestApplyCodexOAuthTransform_NonCodexCLI_OverridesInstructions(t *testing.T
"instructions": "old instructions",
}
result := applyCodexOAuthTransform(reqBody, false) // isCodexCLI=false
result := applyCodexOAuthTransform(reqBody, false, false) // isCodexCLI=false
instructions, ok := reqBody["instructions"].(string)
require.True(t, ok)

View File

@@ -0,0 +1,378 @@
package service
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
// ForwardAsAnthropic accepts an Anthropic Messages request body, converts it
// to OpenAI Responses API format, forwards to the OpenAI upstream, and converts
// the response back to Anthropic Messages format. This enables Claude Code
// clients to access OpenAI models through the standard /v1/messages endpoint.
func (s *OpenAIGatewayService) ForwardAsAnthropic(
ctx context.Context,
c *gin.Context,
account *Account,
body []byte,
promptCacheKey string,
) (*OpenAIForwardResult, error) {
startTime := time.Now()
// 1. Parse Anthropic request
var anthropicReq apicompat.AnthropicRequest
if err := json.Unmarshal(body, &anthropicReq); err != nil {
return nil, fmt.Errorf("parse anthropic request: %w", err)
}
originalModel := anthropicReq.Model
isStream := anthropicReq.Stream
// 2. Convert Anthropic → Responses
responsesReq, err := apicompat.AnthropicToResponses(&anthropicReq)
if err != nil {
return nil, fmt.Errorf("convert anthropic to responses: %w", err)
}
// 3. Model mapping
mappedModel := account.GetMappedModel(originalModel)
responsesReq.Model = mappedModel
logger.L().Info("openai messages: model mapping applied",
zap.Int64("account_id", account.ID),
zap.String("original_model", originalModel),
zap.String("mapped_model", mappedModel),
zap.Bool("stream", isStream),
)
// 4. Marshal Responses request body, then apply OAuth codex transform
responsesBody, err := json.Marshal(responsesReq)
if err != nil {
return nil, fmt.Errorf("marshal responses request: %w", err)
}
if account.Type == AccountTypeOAuth {
var reqBody map[string]any
if err := json.Unmarshal(responsesBody, &reqBody); err != nil {
return nil, fmt.Errorf("unmarshal for codex transform: %w", err)
}
applyCodexOAuthTransform(reqBody, false)
// OAuth codex transform forces stream=true upstream, so always use
// the streaming response handler regardless of what the client asked.
isStream = true
responsesBody, err = json.Marshal(reqBody)
if err != nil {
return nil, fmt.Errorf("remarshal after codex transform: %w", err)
}
}
// 5. Get access token
token, _, err := s.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("get access token: %w", err)
}
// 6. Build upstream request
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, responsesBody, token, isStream, promptCacheKey, false)
if err != nil {
return nil, fmt.Errorf("build upstream request: %w", err)
}
// 7. Send request
proxyURL := ""
if account.Proxy != nil {
proxyURL = account.Proxy.URL()
}
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
writeAnthropicError(c, http.StatusBadGateway, "api_error", "Upstream request failed")
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
defer func() { _ = resp.Body.Close() }()
// 8. Handle error response with failover
if resp.StatusCode >= 400 {
if s.shouldFailoverUpstreamError(resp.StatusCode) {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
if s.rateLimitService != nil {
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
}
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody}
}
// Non-failover error: return Anthropic-formatted error to client
return s.handleAnthropicErrorResponse(resp, c)
}
// 9. Handle normal response
if isStream {
return s.handleAnthropicStreamingResponse(resp, c, originalModel, startTime)
}
return s.handleAnthropicNonStreamingResponse(resp, c, originalModel, startTime)
}
// handleAnthropicErrorResponse reads an upstream error and returns it in
// Anthropic error format.
func (s *OpenAIGatewayService) handleAnthropicErrorResponse(
resp *http.Response,
c *gin.Context,
) (*OpenAIForwardResult, error) {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
if upstreamMsg == "" {
upstreamMsg = fmt.Sprintf("Upstream error: %d", resp.StatusCode)
}
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
// Record upstream error details for ops logging
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
errType := "api_error"
switch {
case resp.StatusCode == 400:
errType = "invalid_request_error"
case resp.StatusCode == 404:
errType = "not_found_error"
case resp.StatusCode == 429:
errType = "rate_limit_error"
case resp.StatusCode >= 500:
errType = "api_error"
}
writeAnthropicError(c, resp.StatusCode, errType, upstreamMsg)
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
}
// handleAnthropicNonStreamingResponse reads a Responses API JSON response,
// converts it to Anthropic Messages format, and writes it to the client.
func (s *OpenAIGatewayService) handleAnthropicNonStreamingResponse(
resp *http.Response,
c *gin.Context,
originalModel string,
startTime time.Time,
) (*OpenAIForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read upstream response: %w", err)
}
var responsesResp apicompat.ResponsesResponse
if err := json.Unmarshal(respBody, &responsesResp); err != nil {
return nil, fmt.Errorf("parse responses response: %w", err)
}
anthropicResp := apicompat.ResponsesToAnthropic(&responsesResp, originalModel)
var usage OpenAIUsage
if responsesResp.Usage != nil {
usage = OpenAIUsage{
InputTokens: responsesResp.Usage.InputTokens,
OutputTokens: responsesResp.Usage.OutputTokens,
}
if responsesResp.Usage.InputTokensDetails != nil {
usage.CacheReadInputTokens = responsesResp.Usage.InputTokensDetails.CachedTokens
}
}
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.JSON(http.StatusOK, anthropicResp)
return &OpenAIForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
Stream: false,
Duration: time.Since(startTime),
}, nil
}
// handleAnthropicStreamingResponse reads Responses SSE events from upstream,
// converts each to Anthropic SSE events, and writes them to the client.
func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
resp *http.Response,
c *gin.Context,
originalModel string,
startTime time.Time,
) (*OpenAIForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.WriteHeader(http.StatusOK)
state := apicompat.NewResponsesEventToAnthropicState()
state.Model = originalModel
var usage OpenAIUsage
var firstTokenMs *int
firstChunk := true
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" {
continue
}
payload := line[6:]
if firstChunk {
firstChunk = false
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
// Parse the Responses SSE event
var event apicompat.ResponsesStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
logger.L().Warn("openai messages stream: failed to parse event",
zap.Error(err),
zap.String("request_id", requestID),
)
continue
}
// Extract usage from completion events
if (event.Type == "response.completed" || event.Type == "response.incomplete") &&
event.Response != nil && event.Response.Usage != nil {
usage = OpenAIUsage{
InputTokens: event.Response.Usage.InputTokens,
OutputTokens: event.Response.Usage.OutputTokens,
}
if event.Response.Usage.InputTokensDetails != nil {
usage.CacheReadInputTokens = event.Response.Usage.InputTokensDetails.CachedTokens
}
}
// Convert to Anthropic events
events := apicompat.ResponsesEventToAnthropicEvents(&event, state)
for _, evt := range events {
sse, err := apicompat.ResponsesAnthropicEventToSSE(evt)
if err != nil {
logger.L().Warn("openai messages stream: failed to marshal event",
zap.Error(err),
zap.String("request_id", requestID),
)
continue
}
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
// Client disconnected — return collected usage
logger.L().Info("openai messages stream: client disconnected",
zap.String("request_id", requestID),
)
return &OpenAIForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
}, nil
}
}
if len(events) > 0 {
c.Writer.Flush()
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("openai messages stream: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
// Ensure the Anthropic stream is properly terminated
if finalEvents := apicompat.FinalizeResponsesAnthropicStream(state); len(finalEvents) > 0 {
for _, evt := range finalEvents {
sse, err := apicompat.ResponsesAnthropicEventToSSE(evt)
if err != nil {
continue
}
fmt.Fprint(c.Writer, sse) //nolint:errcheck
}
c.Writer.Flush()
}
return &OpenAIForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
}, nil
}
// writeAnthropicError writes an error response in Anthropic Messages API format.
func writeAnthropicError(c *gin.Context, statusCode int, errType, message string) {
c.JSON(statusCode, gin.H{
"type": "error",
"error": gin.H{
"type": errType,
"message": message,
},
})
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/Wei-Shaw/sub2api/internal/util/urlvalidator"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"go.uber.org/zap"
@@ -49,6 +50,8 @@ const (
openAIWSRetryBackoffInitialDefault = 120 * time.Millisecond
openAIWSRetryBackoffMaxDefault = 2 * time.Second
openAIWSRetryJitterRatioDefault = 0.2
openAICompactSessionSeedKey = "openai_compact_session_seed"
codexCLIVersion = "0.104.0"
)
// OpenAI allowed headers whitelist (for non-passthrough).
@@ -898,6 +901,22 @@ func isOpenAIInstructionsRequiredError(upstreamStatusCode int, upstreamMsg strin
return false
}
// ExtractSessionID extracts the raw session ID from headers or body without hashing.
// Used by ForwardAsAnthropic to pass as prompt_cache_key for upstream cache.
func (s *OpenAIGatewayService) ExtractSessionID(c *gin.Context, body []byte) string {
if c == nil {
return ""
}
sessionID := strings.TrimSpace(c.GetHeader("session_id"))
if sessionID == "" {
sessionID = strings.TrimSpace(c.GetHeader("conversation_id"))
}
if sessionID == "" && len(body) > 0 {
sessionID = strings.TrimSpace(gjson.GetBytes(body, "prompt_cache_key").String())
}
return sessionID
}
// GenerateSessionHash generates a sticky-session hash for OpenAI requests.
//
// Priority:
@@ -1615,7 +1634,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
}
if account.Type == AccountTypeOAuth {
codexResult := applyCodexOAuthTransform(reqBody, isCodexCLI)
codexResult := applyCodexOAuthTransform(reqBody, isCodexCLI, isOpenAIResponsesCompactPath(c))
if codexResult.Modified {
bodyModified = true
disablePatch()
@@ -2047,14 +2066,14 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
return nil, fmt.Errorf("openai passthrough rejected before upstream: %s", rejectReason)
}
normalizedBody, normalized, err := normalizeOpenAIPassthroughOAuthBody(body)
normalizedBody, normalized, err := normalizeOpenAIPassthroughOAuthBody(body, isOpenAIResponsesCompactPath(c))
if err != nil {
return nil, err
}
if normalized {
body = normalizedBody
reqStream = true
}
reqStream = gjson.GetBytes(body, "stream").Bool()
}
logger.LegacyPrintf("service.openai_gateway",
@@ -2219,6 +2238,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
targetURL = buildOpenAIResponsesURL(validatedURL)
}
}
targetURL = appendOpenAIResponsesRequestPathSuffix(targetURL, openAIResponsesRequestPathSuffix(c))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(body))
if err != nil {
@@ -2252,7 +2272,15 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
if chatgptAccountID := account.GetChatGPTAccountID(); chatgptAccountID != "" {
req.Header.Set("chatgpt-account-id", chatgptAccountID)
}
if req.Header.Get("accept") == "" {
if isOpenAIResponsesCompactPath(c) {
req.Header.Set("accept", "application/json")
if req.Header.Get("version") == "" {
req.Header.Set("version", codexCLIVersion)
}
if req.Header.Get("session_id") == "" {
req.Header.Set("session_id", resolveOpenAICompactSessionID(c))
}
} else if req.Header.Get("accept") == "" {
req.Header.Set("accept", "text/event-stream")
}
if req.Header.Get("OpenAI-Beta") == "" {
@@ -2599,6 +2627,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
default:
targetURL = openaiPlatformAPIURL
}
targetURL = appendOpenAIResponsesRequestPathSuffix(targetURL, openAIResponsesRequestPathSuffix(c))
req, err := http.NewRequestWithContext(ctx, "POST", targetURL, bytes.NewReader(body))
if err != nil {
@@ -2635,7 +2664,17 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
} else {
req.Header.Set("originator", "opencode")
}
req.Header.Set("accept", "text/event-stream")
if isOpenAIResponsesCompactPath(c) {
req.Header.Set("accept", "application/json")
if req.Header.Get("version") == "" {
req.Header.Set("version", codexCLIVersion)
}
if req.Header.Get("session_id") == "" {
req.Header.Set("session_id", resolveOpenAICompactSessionID(c))
}
} else {
req.Header.Set("accept", "text/event-stream")
}
if promptCacheKey != "" {
req.Header.Set("conversation_id", promptCacheKey)
req.Header.Set("session_id", promptCacheKey)
@@ -3426,6 +3465,95 @@ func buildOpenAIResponsesURL(base string) string {
return normalized + "/v1/responses"
}
func IsOpenAIResponsesCompactPathForTest(c *gin.Context) bool {
return isOpenAIResponsesCompactPath(c)
}
func OpenAICompactSessionSeedKeyForTest() string {
return openAICompactSessionSeedKey
}
func NormalizeOpenAICompactRequestBodyForTest(body []byte) ([]byte, bool, error) {
return normalizeOpenAICompactRequestBody(body)
}
func isOpenAIResponsesCompactPath(c *gin.Context) bool {
suffix := strings.TrimSpace(openAIResponsesRequestPathSuffix(c))
return suffix == "/compact" || strings.HasPrefix(suffix, "/compact/")
}
func normalizeOpenAICompactRequestBody(body []byte) ([]byte, bool, error) {
if len(body) == 0 {
return body, false, nil
}
normalized := []byte(`{}`)
for _, field := range []string{"model", "input", "instructions", "previous_response_id"} {
value := gjson.GetBytes(body, field)
if !value.Exists() {
continue
}
next, err := sjson.SetRawBytes(normalized, field, []byte(value.Raw))
if err != nil {
return body, false, fmt.Errorf("normalize compact body %s: %w", field, err)
}
normalized = next
}
if bytes.Equal(bytes.TrimSpace(body), bytes.TrimSpace(normalized)) {
return body, false, nil
}
return normalized, true, nil
}
func resolveOpenAICompactSessionID(c *gin.Context) string {
if c != nil {
if sessionID := strings.TrimSpace(c.GetHeader("session_id")); sessionID != "" {
return sessionID
}
if conversationID := strings.TrimSpace(c.GetHeader("conversation_id")); conversationID != "" {
return conversationID
}
if seed, ok := c.Get(openAICompactSessionSeedKey); ok {
if seedStr, ok := seed.(string); ok && strings.TrimSpace(seedStr) != "" {
return strings.TrimSpace(seedStr)
}
}
}
return uuid.NewString()
}
func openAIResponsesRequestPathSuffix(c *gin.Context) string {
if c == nil || c.Request == nil || c.Request.URL == nil {
return ""
}
normalizedPath := strings.TrimRight(strings.TrimSpace(c.Request.URL.Path), "/")
if normalizedPath == "" {
return ""
}
idx := strings.LastIndex(normalizedPath, "/responses")
if idx < 0 {
return ""
}
suffix := normalizedPath[idx+len("/responses"):]
if suffix == "" || suffix == "/" {
return ""
}
if !strings.HasPrefix(suffix, "/") {
return ""
}
return suffix
}
func appendOpenAIResponsesRequestPathSuffix(baseURL, suffix string) string {
trimmedBase := strings.TrimRight(strings.TrimSpace(baseURL), "/")
trimmedSuffix := strings.TrimSpace(suffix)
if trimmedBase == "" || trimmedSuffix == "" {
return trimmedBase
}
return trimmedBase + trimmedSuffix
}
func (s *OpenAIGatewayService) replaceModelInResponseBody(body []byte, fromModel, toModel string) []byte {
// 使用 gjson/sjson 精确替换 model 字段,避免全量 JSON 反序列化
if m := gjson.GetBytes(body, "model"); m.Exists() && m.Str == fromModel {
@@ -3815,8 +3943,8 @@ func extractOpenAIRequestMetaFromBody(body []byte) (model string, stream bool, p
}
// normalizeOpenAIPassthroughOAuthBody 将透传 OAuth 请求体收敛为旧链路关键行为:
// 1) store=false 2) stream=true
func normalizeOpenAIPassthroughOAuthBody(body []byte) ([]byte, bool, error) {
// 1) store=false 2) 非 compact 保持 stream=truecompact 强制 stream=false
func normalizeOpenAIPassthroughOAuthBody(body []byte, compact bool) ([]byte, bool, error) {
if len(body) == 0 {
return body, false, nil
}
@@ -3824,22 +3952,40 @@ func normalizeOpenAIPassthroughOAuthBody(body []byte) ([]byte, bool, error) {
normalized := body
changed := false
if store := gjson.GetBytes(normalized, "store"); !store.Exists() || store.Type != gjson.False {
next, err := sjson.SetBytes(normalized, "store", false)
if err != nil {
return body, false, fmt.Errorf("normalize passthrough body store=false: %w", err)
if compact {
if store := gjson.GetBytes(normalized, "store"); store.Exists() {
next, err := sjson.DeleteBytes(normalized, "store")
if err != nil {
return body, false, fmt.Errorf("normalize passthrough body delete store: %w", err)
}
normalized = next
changed = true
}
normalized = next
changed = true
}
if stream := gjson.GetBytes(normalized, "stream"); !stream.Exists() || stream.Type != gjson.True {
next, err := sjson.SetBytes(normalized, "stream", true)
if err != nil {
return body, false, fmt.Errorf("normalize passthrough body stream=true: %w", err)
if stream := gjson.GetBytes(normalized, "stream"); stream.Exists() {
next, err := sjson.DeleteBytes(normalized, "stream")
if err != nil {
return body, false, fmt.Errorf("normalize passthrough body delete stream: %w", err)
}
normalized = next
changed = true
}
} else {
if store := gjson.GetBytes(normalized, "store"); !store.Exists() || store.Type != gjson.False {
next, err := sjson.SetBytes(normalized, "store", false)
if err != nil {
return body, false, fmt.Errorf("normalize passthrough body store=false: %w", err)
}
normalized = next
changed = true
}
if stream := gjson.GetBytes(normalized, "stream"); !stream.Exists() || stream.Type != gjson.True {
next, err := sjson.SetBytes(normalized, "stream", true)
if err != nil {
return body, false, fmt.Errorf("normalize passthrough body stream=true: %w", err)
}
normalized = next
changed = true
}
normalized = next
changed = true
}
return normalized, changed, nil

View File

@@ -1288,6 +1288,92 @@ func TestOpenAIUpdateCodexUsageSnapshotFromHeaders(t *testing.T) {
}
}
func TestOpenAIResponsesRequestPathSuffix(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
tests := []struct {
name string
path string
want string
}{
{name: "exact v1 responses", path: "/v1/responses", want: ""},
{name: "compact v1 responses", path: "/v1/responses/compact", want: "/compact"},
{name: "compact alias responses", path: "/responses/compact/", want: "/compact"},
{name: "nested suffix", path: "/openai/v1/responses/compact/detail", want: "/compact/detail"},
{name: "unrelated path", path: "/v1/chat/completions", want: ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.Request = httptest.NewRequest(http.MethodPost, tt.path, nil)
require.Equal(t, tt.want, openAIResponsesRequestPathSuffix(c))
})
}
}
func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPreservesCompactPath(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader([]byte(`{"model":"gpt-5"}`)))
svc := &OpenAIGatewayService{}
account := &Account{Type: AccountTypeOAuth}
req, err := svc.buildUpstreamRequestOpenAIPassthrough(c.Request.Context(), c, account, []byte(`{"model":"gpt-5"}`), "token")
require.NoError(t, err)
require.Equal(t, chatgptCodexURL+"/compact", req.URL.String())
require.Equal(t, "application/json", req.Header.Get("Accept"))
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
require.NotEmpty(t, req.Header.Get("Session_Id"))
}
func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader([]byte(`{"model":"gpt-5"}`)))
svc := &OpenAIGatewayService{}
account := &Account{
Type: AccountTypeOAuth,
Credentials: map[string]any{"chatgpt_account_id": "chatgpt-acc"},
}
req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, []byte(`{"model":"gpt-5"}`), "token", false, "", true)
require.NoError(t, err)
require.Equal(t, chatgptCodexURL+"/compact", req.URL.String())
require.Equal(t, "application/json", req.Header.Get("Accept"))
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
require.NotEmpty(t, req.Header.Get("Session_Id"))
}
func TestOpenAIBuildUpstreamRequestPreservesCompactPathForAPIKeyBaseURL(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/responses/compact", bytes.NewReader([]byte(`{"model":"gpt-5"}`)))
svc := &OpenAIGatewayService{cfg: &config.Config{
Security: config.SecurityConfig{
URLAllowlist: config.URLAllowlistConfig{Enabled: false},
},
}}
account := &Account{
Type: AccountTypeAPIKey,
Platform: PlatformOpenAI,
Credentials: map[string]any{"base_url": "https://example.com/v1"},
}
req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, []byte(`{"model":"gpt-5"}`), "token", false, "", false)
require.NoError(t, err)
require.Equal(t, "https://example.com/v1/responses/compact", req.URL.String())
}
// ==================== P1-08 修复model 替换性能优化测试 ====================
// ==================== P1-08 修复model 替换性能优化测试 =============
func TestReplaceModelInSSELine(t *testing.T) {
svc := &OpenAIGatewayService{}

View File

@@ -236,6 +236,60 @@ func TestOpenAIGatewayService_OAuthPassthrough_StreamKeepsToolNameAndBodyNormali
require.NotContains(t, body, "\"name\":\"edit\"")
}
func TestOpenAIGatewayService_OAuthPassthrough_CompactUsesJSONAndKeepsNonStreaming(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil))
c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0")
c.Request.Header.Set("Content-Type", "application/json")
originalBody := []byte(`{"model":"gpt-5.1-codex","stream":true,"store":true,"instructions":"local-test-instructions","input":[{"type":"text","text":"compact me"}]}`)
resp := &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact"}},
Body: io.NopCloser(strings.NewReader(`{"id":"cmp_123","usage":{"input_tokens":11,"output_tokens":22}}`)),
}
upstream := &httpUpstreamRecorder{resp: resp}
svc := &OpenAIGatewayService{
cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}},
httpUpstream: upstream,
}
account := &Account{
ID: 123,
Name: "acc",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"},
Extra: map[string]any{"openai_passthrough": true},
Status: StatusActive,
Schedulable: true,
RateMultiplier: f64p(1),
}
result, err := svc.Forward(context.Background(), c, account, originalBody)
require.NoError(t, err)
require.NotNil(t, result)
require.False(t, result.Stream)
require.False(t, gjson.GetBytes(upstream.lastBody, "store").Exists())
require.False(t, gjson.GetBytes(upstream.lastBody, "stream").Exists())
require.Equal(t, "gpt-5.1-codex", gjson.GetBytes(upstream.lastBody, "model").String())
require.Equal(t, "compact me", gjson.GetBytes(upstream.lastBody, "input.0.text").String())
require.Equal(t, "local-test-instructions", strings.TrimSpace(gjson.GetBytes(upstream.lastBody, "instructions").String()))
require.Equal(t, "application/json", upstream.lastReq.Header.Get("Accept"))
require.Equal(t, codexCLIVersion, upstream.lastReq.Header.Get("Version"))
require.NotEmpty(t, upstream.lastReq.Header.Get("Session_Id"))
require.Equal(t, "chatgpt.com", upstream.lastReq.Host)
require.Equal(t, "chatgpt-acc", upstream.lastReq.Header.Get("chatgpt-account-id"))
require.Contains(t, rec.Body.String(), `"id":"cmp_123"`)
}
func TestOpenAIGatewayService_OAuthPassthrough_CodexMissingInstructionsRejectedBeforeUpstream(t *testing.T) {
gin.SetMode(gin.TestMode)
logSink, restore := captureStructuredLog(t)