From 0584305e5a8cc894e1787babf6154c07e34d415e Mon Sep 17 00:00:00 2001 From: lyen1688 Date: Tue, 5 May 2026 19:32:41 +0800 Subject: [PATCH] feat: improve OpenAI messages compatibility for Claude Code --- .../handler/openai_gateway_handler.go | 30 +- .../handler/openai_gateway_handler_test.go | 18 + .../pkg/apicompat/anthropic_responses_test.go | 205 +++- .../pkg/apicompat/anthropic_to_responses.go | 80 +- .../pkg/apicompat/responses_to_anthropic.go | 52 +- backend/internal/pkg/apicompat/types.go | 39 +- .../service/openai_codex_transform.go | 72 +- .../service/openai_codex_transform_test.go | 33 + .../service/openai_compat_model_test.go | 927 +++++++++++++++++- .../service/openai_compat_prompt_cache_key.go | 106 +- .../openai_compat_prompt_cache_key_test.go | 58 ++ .../service/openai_gateway_messages.go | 171 +++- .../service/openai_gateway_service.go | 51 +- .../service/openai_gateway_service_test.go | 23 + .../service/openai_messages_bridge.go | 57 ++ .../service/openai_messages_continuation.go | 277 ++++++ .../service/openai_messages_digest_session.go | 135 +++ .../service/openai_messages_replay_guard.go | 90 ++ .../openai_messages_replay_guard_test.go | 58 ++ .../service/openai_messages_todo_guard.go | 121 +++ .../service/openai_oauth_passthrough_test.go | 58 +- 21 files changed, 2525 insertions(+), 136 deletions(-) create mode 100644 backend/internal/service/openai_messages_bridge.go create mode 100644 backend/internal/service/openai_messages_continuation.go create mode 100644 backend/internal/service/openai_messages_digest_session.go create mode 100644 backend/internal/service/openai_messages_replay_guard.go create mode 100644 backend/internal/service/openai_messages_replay_guard_test.go create mode 100644 backend/internal/service/openai_messages_todo_guard.go diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 5966c163..3997a0ee 100644 --- 
a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -632,21 +632,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { sessionHash := h.gatewayService.GenerateSessionHash(c, body) promptCacheKey := h.gatewayService.ExtractSessionID(c, body) - - // Anthropic 格式的请求在 metadata.user_id 中携带 session 标识, - // 而非 OpenAI 的 session_id/conversation_id headers。 - // 从中派生 sessionHash(sticky session)和 promptCacheKey(upstream cache)。 - if sessionHash == "" || promptCacheKey == "" { - if userID := strings.TrimSpace(gjson.GetBytes(body, "metadata.user_id").String()); userID != "" { - seed := reqModel + "-" + userID - if promptCacheKey == "" { - promptCacheKey = service.GenerateSessionUUID(seed) - } - if sessionHash == "" { - sessionHash = service.DeriveSessionHashFromSeed(seed) - } - } - } + sessionHash, promptCacheKey = resolveOpenAIMessagesMetadataSession(sessionHash, promptCacheKey, reqModel, body) maxAccountSwitches := h.maxAccountSwitches switchCount := 0 @@ -830,6 +816,20 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { } } +func resolveOpenAIMessagesMetadataSession(sessionHash, promptCacheKey, reqModel string, body []byte) (string, string) { + // Anthropic metadata.user_id 只作为账号粘性信号。上游 GPT/Codex 缓存键 + // 交给 ForwardAsAnthropic 从 cache_control 或完整消息 digest 派生,避免 + // 固定 metadata key 压住后续 turn 的缓存滚动。 + if sessionHash != "" { + return sessionHash, promptCacheKey + } + if userID := strings.TrimSpace(gjson.GetBytes(body, "metadata.user_id").String()); userID != "" { + seed := reqModel + "-" + userID + sessionHash = service.DeriveSessionHashFromSeed(seed) + } + return sessionHash, promptCacheKey +} + // anthropicErrorResponse writes an error in Anthropic Messages API format. 
func (h *OpenAIGatewayHandler) anthropicErrorResponse(c *gin.Context, status int, errType, message string) { c.JSON(status, gin.H{ diff --git a/backend/internal/handler/openai_gateway_handler_test.go b/backend/internal/handler/openai_gateway_handler_test.go index 2744e0cc..c560350e 100644 --- a/backend/internal/handler/openai_gateway_handler_test.go +++ b/backend/internal/handler/openai_gateway_handler_test.go @@ -92,6 +92,24 @@ func TestOpenAIHandleStreamingAwareError_JSONEscaping(t *testing.T) { } } +func TestResolveOpenAIMessagesMetadataSession_DoesNotDerivePromptCacheKey(t *testing.T) { + body := []byte(`{"model":"claude-sonnet-4-5","metadata":{"user_id":"claude-code-session"},"messages":[{"role":"user","content":"hello"}]}`) + + sessionHash, promptCacheKey := resolveOpenAIMessagesMetadataSession("", "", "claude-sonnet-4-5", body) + + require.NotEmpty(t, sessionHash) + require.Empty(t, promptCacheKey) +} + +func TestResolveOpenAIMessagesMetadataSession_PreservesExplicitPromptCacheKey(t *testing.T) { + body := []byte(`{"metadata":{"user_id":"claude-code-session"}}`) + + sessionHash, promptCacheKey := resolveOpenAIMessagesMetadataSession("", "explicit-cache", "claude-sonnet-4-5", body) + + require.NotEmpty(t, sessionHash) + require.Equal(t, "explicit-cache", promptCacheKey) +} + func TestOpenAIHandleStreamingAwareError_NonStreaming(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() diff --git a/backend/internal/pkg/apicompat/anthropic_responses_test.go b/backend/internal/pkg/apicompat/anthropic_responses_test.go index edde85d3..aa36ef0b 100644 --- a/backend/internal/pkg/apicompat/anthropic_responses_test.go +++ b/backend/internal/pkg/apicompat/anthropic_responses_test.go @@ -32,7 +32,13 @@ func TestAnthropicToResponses_BasicText(t *testing.T) { var items []ResponsesInputItem require.NoError(t, json.Unmarshal(resp.Input, &items)) require.Len(t, items, 1) + assert.Equal(t, "message", items[0].Type) assert.Equal(t, "user", items[0].Role) + var 
parts []ResponsesContentPart + require.NoError(t, json.Unmarshal(items[0].Content, &parts)) + require.Len(t, parts, 1) + assert.Equal(t, "input_text", parts[0].Type) + assert.Equal(t, "Hello", parts[0].Text) } func TestAnthropicToResponses_SystemPrompt(t *testing.T) { @@ -49,7 +55,12 @@ func TestAnthropicToResponses_SystemPrompt(t *testing.T) { var items []ResponsesInputItem require.NoError(t, json.Unmarshal(resp.Input, &items)) require.Len(t, items, 2) - assert.Equal(t, "system", items[0].Role) + assert.Equal(t, "developer", items[0].Role) + var parts []ResponsesContentPart + require.NoError(t, json.Unmarshal(items[0].Content, &parts)) + require.Len(t, parts, 1) + assert.Equal(t, "input_text", parts[0].Type) + assert.Equal(t, "You are helpful.", parts[0].Text) }) t.Run("array", func(t *testing.T) { @@ -65,11 +76,33 @@ func TestAnthropicToResponses_SystemPrompt(t *testing.T) { var items []ResponsesInputItem require.NoError(t, json.Unmarshal(resp.Input, &items)) require.Len(t, items, 2) - assert.Equal(t, "system", items[0].Role) - // System text should be joined with double newline. 
- var text string - require.NoError(t, json.Unmarshal(items[0].Content, &text)) - assert.Equal(t, "Part 1\n\nPart 2", text) + assert.Equal(t, "developer", items[0].Role) + var parts []ResponsesContentPart + require.NoError(t, json.Unmarshal(items[0].Content, &parts)) + require.Len(t, parts, 2) + assert.Equal(t, "input_text", parts[0].Type) + assert.Equal(t, "Part 1", parts[0].Text) + assert.Equal(t, "input_text", parts[1].Type) + assert.Equal(t, "Part 2", parts[1].Text) + }) + + t.Run("billing header skipped", func(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 100, + System: json.RawMessage(`[{"type":"text","text":"x-anthropic-billing-header: cc_version=1;"},{"type":"text","text":"Project prompt"}]`), + Messages: []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hi"`)}}, + } + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + + var items []ResponsesInputItem + require.NoError(t, json.Unmarshal(resp.Input, &items)) + require.Len(t, items, 2) + var parts []ResponsesContentPart + require.NoError(t, json.Unmarshal(items[0].Content, &parts)) + require.Len(t, parts, 1) + assert.Equal(t, "Project prompt", parts[0].Text) }) } @@ -94,6 +127,8 @@ func TestAnthropicToResponses_ToolUse(t *testing.T) { require.Len(t, resp.Tools, 1) assert.Equal(t, "function", resp.Tools[0].Type) assert.Equal(t, "get_weather", resp.Tools[0].Name) + require.NotNil(t, resp.Tools[0].Strict) + assert.False(t, *resp.Tools[0].Strict) // Check input items var items []ResponsesInputItem @@ -104,10 +139,10 @@ func TestAnthropicToResponses_ToolUse(t *testing.T) { assert.Equal(t, "user", items[0].Role) assert.Equal(t, "assistant", items[1].Role) assert.Equal(t, "function_call", items[2].Type) - assert.Equal(t, "fc_call_1", items[2].CallID) + assert.Equal(t, "call_1", items[2].CallID) assert.Empty(t, items[2].ID) assert.Equal(t, "function_call_output", items[3].Type) - assert.Equal(t, "fc_call_1", items[3].CallID) + assert.Equal(t, "call_1", 
items[3].CallID) assert.Equal(t, "Sunny, 72°F", items[3].Output) } @@ -261,6 +296,34 @@ func TestResponsesToAnthropic_ToolUse(t *testing.T) { assert.JSONEq(t, `{"city":"NYC"}`, string(anth.Content[1].Input)) } +func TestResponsesToAnthropic_ToolUseStopReasonDoesNotDependOnLastBlock(t *testing.T) { + resp := &ResponsesResponse{ + ID: "resp_tool_then_text", + Model: "gpt-5.5", + Status: "completed", + Output: []ResponsesOutput{ + { + Type: "function_call", + CallID: "call_todo", + Name: "TodoWrite", + Arguments: `{"todos":[{"content":"review changes","status":"in_progress"}]}`, + }, + { + Type: "message", + Content: []ResponsesContentPart{ + {Type: "output_text", Text: "Task list updated."}, + }, + }, + }, + } + + anth := ResponsesToAnthropic(resp, "claude-opus-4-6") + assert.Equal(t, "tool_use", anth.StopReason) + require.Len(t, anth.Content, 2) + assert.Equal(t, "tool_use", anth.Content[0].Type) + assert.Equal(t, "text", anth.Content[1].Type) +} + func TestResponsesToAnthropic_ReadToolDropsEmptyPages(t *testing.T) { resp := &ResponsesResponse{ ID: "resp_read", @@ -553,6 +616,81 @@ func TestStreamingToolCall(t *testing.T) { assert.Equal(t, "tool_use", events[0].Delta.StopReason) } +func TestStreamingToolCallStopReasonSurvivesLaterText(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_tool_then_text", Model: "gpt-5.5"}, + }, state) + + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "function_call", CallID: "call_todo", Name: "TodoWrite"}, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_start", events[0].Type) + + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.done", + OutputIndex: 0, + Arguments: `{"todos":[{"content":"review 
changes","status":"in_progress","activeForm":"reviewing changes"}]}`, + }, state) + require.Len(t, events, 2) + assert.Equal(t, "content_block_delta", events[0].Type) + assert.Equal(t, "content_block_stop", events[1].Type) + + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_text.delta", + OutputIndex: 1, + Delta: "I will continue after the task list updates.", + }, state) + require.Len(t, events, 2) + assert.Equal(t, "content_block_start", events[0].Type) + assert.Equal(t, "content_block_delta", events[1].Type) + + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ + Status: "completed", + Usage: &ResponsesUsage{InputTokens: 20, OutputTokens: 10}, + }, + }, state) + require.Len(t, events, 3) + assert.Equal(t, "content_block_stop", events[0].Type) + assert.Equal(t, "tool_use", events[1].Delta.StopReason) + assert.Equal(t, "message_stop", events[2].Type) +} + +func TestStreamingToolCallDoneWithoutDeltaEmitsArguments(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_bash", Model: "gpt-5.5"}, + }, state) + + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "function_call", CallID: "call_bash", Name: "Bash"}, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_start", events[0].Type) + + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.done", + OutputIndex: 0, + Arguments: `{"command":"git -C \"/mnt/d/nodejs/other/edmt\" status --short --ignored"}`, + }, state) + require.Len(t, events, 2) + assert.Equal(t, "content_block_delta", events[0].Type) + assert.Equal(t, "input_json_delta", events[0].Delta.Type) + assert.JSONEq(t, `{"command":"git 
-C \"/mnt/d/nodejs/other/edmt\" status --short --ignored"}`, events[0].Delta.PartialJSON) + assert.Equal(t, "content_block_stop", events[1].Type) +} + func TestStreamingReadToolDropsEmptyPages(t *testing.T) { state := NewResponsesEventToAnthropicState() @@ -692,6 +830,27 @@ func TestFinalizeStream_AbnormalTermination(t *testing.T) { assert.Equal(t, "message_stop", events[2].Type) } +func TestFinalizeStream_ToolCallAbnormalTerminationKeepsToolUseStopReason(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_tool_interrupted", Model: "gpt-5.5"}, + }, state) + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "function_call", CallID: "call_todo", Name: "TodoWrite"}, + }, state) + + events := FinalizeResponsesAnthropicStream(state) + require.Len(t, events, 3) + assert.Equal(t, "content_block_stop", events[0].Type) + assert.Equal(t, "message_delta", events[1].Type) + assert.Equal(t, "tool_use", events[1].Delta.StopReason) + assert.Equal(t, "message_stop", events[2].Type) +} + func TestStreamingEmptyResponse(t *testing.T) { state := NewResponsesEventToAnthropicState() @@ -827,8 +986,8 @@ func TestAnthropicToResponses_ThinkingEnabled(t *testing.T) { resp, err := AnthropicToResponses(req) require.NoError(t, err) require.NotNil(t, resp.Reasoning) - // thinking.type is ignored for effort; default high applies. - assert.Equal(t, "high", resp.Reasoning.Effort) + // thinking.type is ignored for effort; Codex bridge default medium applies. 
+ assert.Equal(t, "medium", resp.Reasoning.Effort) assert.Equal(t, "auto", resp.Reasoning.Summary) assert.Contains(t, resp.Include, "reasoning.encrypted_content") assert.NotContains(t, resp.Include, "reasoning.summary") @@ -845,8 +1004,8 @@ func TestAnthropicToResponses_ThinkingAdaptive(t *testing.T) { resp, err := AnthropicToResponses(req) require.NoError(t, err) require.NotNil(t, resp.Reasoning) - // thinking.type is ignored for effort; default high applies. - assert.Equal(t, "high", resp.Reasoning.Effort) + // thinking.type is ignored for effort; Codex bridge default medium applies. + assert.Equal(t, "medium", resp.Reasoning.Effort) assert.Equal(t, "auto", resp.Reasoning.Summary) assert.NotContains(t, resp.Include, "reasoning.summary") } @@ -861,9 +1020,9 @@ func TestAnthropicToResponses_ThinkingDisabled(t *testing.T) { resp, err := AnthropicToResponses(req) require.NoError(t, err) - // Default effort applies (high → high) even when thinking is disabled. + // Default effort applies (medium) even when thinking is disabled. require.NotNil(t, resp.Reasoning) - assert.Equal(t, "high", resp.Reasoning.Effort) + assert.Equal(t, "medium", resp.Reasoning.Effort) } func TestAnthropicToResponses_NoThinking(t *testing.T) { @@ -875,9 +1034,9 @@ func TestAnthropicToResponses_NoThinking(t *testing.T) { resp, err := AnthropicToResponses(req) require.NoError(t, err) - // Default effort applies (high → high) when no thinking/output_config is set. + // Default effort applies (medium) when no thinking/output_config is set. 
require.NotNil(t, resp.Reasoning) - assert.Equal(t, "high", resp.Reasoning.Effort) + assert.Equal(t, "medium", resp.Reasoning.Effort) } // --------------------------------------------------------------------------- @@ -885,7 +1044,7 @@ func TestAnthropicToResponses_NoThinking(t *testing.T) { // --------------------------------------------------------------------------- func TestAnthropicToResponses_OutputConfigOverridesDefault(t *testing.T) { - // Default is high, but output_config.effort="low" overrides. low→low after mapping. + // Default is medium, but output_config.effort="low" overrides. low→low after mapping. req := &AnthropicRequest{ Model: "gpt-5.2", MaxTokens: 1024, @@ -919,7 +1078,7 @@ func TestAnthropicToResponses_OutputConfigWithoutThinking(t *testing.T) { } func TestAnthropicToResponses_OutputConfigHigh(t *testing.T) { - // output_config.effort="high" → mapped to "high" (1:1, both sides' default). + // output_config.effort="high" → mapped to "high" (1:1). req := &AnthropicRequest{ Model: "gpt-5.2", MaxTokens: 1024, @@ -951,7 +1110,7 @@ func TestAnthropicToResponses_OutputConfigMax(t *testing.T) { } func TestAnthropicToResponses_NoOutputConfig(t *testing.T) { - // No output_config → default high regardless of thinking.type. + // No output_config → default medium regardless of thinking.type. req := &AnthropicRequest{ Model: "gpt-5.2", MaxTokens: 1024, @@ -962,11 +1121,11 @@ func TestAnthropicToResponses_NoOutputConfig(t *testing.T) { resp, err := AnthropicToResponses(req) require.NoError(t, err) require.NotNil(t, resp.Reasoning) - assert.Equal(t, "high", resp.Reasoning.Effort) + assert.Equal(t, "medium", resp.Reasoning.Effort) } func TestAnthropicToResponses_OutputConfigWithoutEffort(t *testing.T) { - // output_config present but effort empty (e.g. only format set) → default high. + // output_config present but effort empty (e.g. only format set) → default medium. 
req := &AnthropicRequest{ Model: "gpt-5.2", MaxTokens: 1024, @@ -977,7 +1136,7 @@ func TestAnthropicToResponses_OutputConfigWithoutEffort(t *testing.T) { resp, err := AnthropicToResponses(req) require.NoError(t, err) require.NotNil(t, resp.Reasoning) - assert.Equal(t, "high", resp.Reasoning.Effort) + assert.Equal(t, "medium", resp.Reasoning.Effort) } // --------------------------------------------------------------------------- @@ -1149,7 +1308,7 @@ func TestAnthropicToResponses_ToolResultWithImage(t *testing.T) { // function_call_output should have text-only output (no image). assert.Equal(t, "function_call_output", items[2].Type) - assert.Equal(t, "fc_toolu_1", items[2].CallID) + assert.Equal(t, "toolu_1", items[2].CallID) assert.Equal(t, "(empty)", items[2].Output) // Image should be in a separate user message. diff --git a/backend/internal/pkg/apicompat/anthropic_to_responses.go b/backend/internal/pkg/apicompat/anthropic_to_responses.go index 268f9f22..5f04004d 100644 --- a/backend/internal/pkg/apicompat/anthropic_to_responses.go +++ b/backend/internal/pkg/apicompat/anthropic_to_responses.go @@ -32,6 +32,9 @@ func AnthropicToResponses(req *AnthropicRequest) (*ResponsesRequest, error) { storeFalse := false out.Store = &storeFalse + parallelToolCalls := true + out.ParallelToolCalls = &parallelToolCalls + out.Text = &ResponsesText{Verbosity: "medium"} if req.MaxTokens > 0 { v := req.MaxTokens @@ -46,10 +49,10 @@ func AnthropicToResponses(req *AnthropicRequest) (*ResponsesRequest, error) { } // Determine reasoning effort: only output_config.effort controls the - // level; thinking.type is ignored. Default is high when unset (both - // Anthropic and OpenAI default to high). + // level; thinking.type is ignored. Default follows Codex CLI / airgate's + // Anthropic bridge shape, which uses medium when unset. // Anthropic levels map 1:1 to OpenAI: low→low, medium→medium, high→high, max→xhigh. 
- effort := "high" // default → both sides' default + effort := "medium" if req.OutputConfig != nil && req.OutputConfig.Effort != "" { effort = req.OutputConfig.Effort } @@ -108,16 +111,19 @@ func convertAnthropicToolChoiceToResponses(raw json.RawMessage) (json.RawMessage func convertAnthropicToResponsesInput(system json.RawMessage, msgs []AnthropicMessage) ([]ResponsesInputItem, error) { var out []ResponsesInputItem - // System prompt → system role input item. + // System prompt → developer role input item. ChatGPT Codex SSE behaves like + // Codex CLI here: keeping Anthropic system text in input preserves the + // conversation/cache shape better than moving it into instructions. if len(system) > 0 { - sysText, err := parseAnthropicSystemPrompt(system) + sysParts, err := parseAnthropicSystemContentParts(system) if err != nil { return nil, err } - if sysText != "" { - content, _ := json.Marshal(sysText) + if len(sysParts) > 0 { + content, _ := json.Marshal(sysParts) out = append(out, ResponsesInputItem{ - Role: "system", + Type: "message", + Role: "developer", Content: content, }) } @@ -133,24 +139,32 @@ func convertAnthropicToResponsesInput(system json.RawMessage, msgs []AnthropicMe return out, nil } -// parseAnthropicSystemPrompt handles the Anthropic system field which can be -// a plain string or an array of text blocks. -func parseAnthropicSystemPrompt(raw json.RawMessage) (string, error) { +// parseAnthropicSystemContentParts handles the Anthropic system field which can +// be a plain string or an array of text blocks. Claude Code may include an +// x-anthropic-billing-header block; airgate drops it before sending to Codex. 
+func parseAnthropicSystemContentParts(raw json.RawMessage) ([]ResponsesContentPart, error) { var s string if err := json.Unmarshal(raw, &s); err == nil { - return s, nil + if isAnthropicBillingHeaderText(s) || s == "" { + return nil, nil + } + return []ResponsesContentPart{{Type: "input_text", Text: s}}, nil } var blocks []AnthropicContentBlock if err := json.Unmarshal(raw, &blocks); err != nil { - return "", err + return nil, err } - var parts []string + var parts []ResponsesContentPart for _, b := range blocks { - if b.Type == "text" && b.Text != "" { - parts = append(parts, b.Text) + if b.Type == "text" && b.Text != "" && !isAnthropicBillingHeaderText(b.Text) { + parts = append(parts, ResponsesContentPart{Type: "input_text", Text: b.Text}) } } - return strings.Join(parts, "\n\n"), nil + return parts, nil +} + +func isAnthropicBillingHeaderText(text string) bool { + return strings.HasPrefix(text, "x-anthropic-billing-header: ") } // anthropicMsgToResponsesItems converts a single Anthropic message into one @@ -173,8 +187,12 @@ func anthropicUserToResponses(raw json.RawMessage) ([]ResponsesInputItem, error) // Try plain string. 
var s string if err := json.Unmarshal(raw, &s); err == nil { - content, _ := json.Marshal(s) - return []ResponsesInputItem{{Role: "user", Content: content}}, nil + parts := []ResponsesContentPart{{Type: "input_text", Text: s}} + partsJSON, err := json.Marshal(parts) + if err != nil { + return nil, err + } + return []ResponsesInputItem{{Type: "message", Role: "user", Content: partsJSON}}, nil } var blocks []AnthropicContentBlock @@ -223,7 +241,7 @@ func anthropicUserToResponses(raw json.RawMessage) ([]ResponsesInputItem, error) if err != nil { return nil, err } - out = append(out, ResponsesInputItem{Role: "user", Content: content}) + out = append(out, ResponsesInputItem{Type: "message", Role: "user", Content: content}) } return out, nil @@ -242,7 +260,7 @@ func anthropicAssistantToResponses(raw json.RawMessage) ([]ResponsesInputItem, e if err != nil { return nil, err } - return []ResponsesInputItem{{Role: "assistant", Content: partsJSON}}, nil + return []ResponsesInputItem{{Type: "message", Role: "assistant", Content: partsJSON}}, nil } var blocks []AnthropicContentBlock @@ -260,7 +278,7 @@ func anthropicAssistantToResponses(raw json.RawMessage) ([]ResponsesInputItem, e if err != nil { return nil, err } - items = append(items, ResponsesInputItem{Role: "assistant", Content: partsJSON}) + items = append(items, ResponsesInputItem{Type: "message", Role: "assistant", Content: partsJSON}) } // tool_use → function_call items. @@ -284,17 +302,14 @@ func anthropicAssistantToResponses(raw json.RawMessage) ([]ResponsesInputItem, e return items, nil } -// toResponsesCallID converts an Anthropic tool ID (toolu_xxx / call_xxx) to a -// Responses API function_call ID that starts with "fc_". +// toResponsesCallID preserves Anthropic tool IDs as Responses call_id values. +// Claude Code sends tool_result.tool_use_id back verbatim, and ChatGPT Codex +// continuation expects that call_id to match the original tool_use id. 
func toResponsesCallID(id string) string { - if strings.HasPrefix(id, "fc_") { - return id - } - return "fc_" + id + return id } -// fromResponsesCallID reverses toResponsesCallID, stripping the "fc_" prefix -// that was added during request conversion. +// fromResponsesCallID reverses old prefixed IDs while preserving current IDs. func fromResponsesCallID(id string) string { if after, ok := strings.CutPrefix(id, "fc_"); ok { // Only strip if the remainder doesn't look like it was already "fc_" prefixed. @@ -412,11 +427,16 @@ func convertAnthropicToolsToResponses(tools []AnthropicTool) []ResponsesTool { Name: t.Name, Description: t.Description, Parameters: normalizeToolParameters(t.InputSchema), + Strict: boolPtr(false), }) } return out } +func boolPtr(v bool) *bool { + return &v +} + // normalizeToolParameters ensures the tool parameter schema is valid for // OpenAI's Responses API, which requires "properties" on object schemas. // diff --git a/backend/internal/pkg/apicompat/responses_to_anthropic.go b/backend/internal/pkg/apicompat/responses_to_anthropic.go index b76f384d..d7ef0145 100644 --- a/backend/internal/pkg/apicompat/responses_to_anthropic.go +++ b/backend/internal/pkg/apicompat/responses_to_anthropic.go @@ -120,7 +120,7 @@ func responsesStatusToAnthropicStopReason(status string, details *ResponsesIncom } return "end_turn" case "completed": - if len(blocks) > 0 && blocks[len(blocks)-1].Type == "tool_use" { + if containsAnthropicToolUseBlock(blocks) { return "tool_use" } return "end_turn" @@ -129,6 +129,15 @@ func responsesStatusToAnthropicStopReason(status string, details *ResponsesIncom } } +func containsAnthropicToolUseBlock(blocks []AnthropicContentBlock) bool { + for _, block := range blocks { + if block.Type == "tool_use" { + return true + } + } + return false +} + func sanitizeAnthropicToolUseInput(name string, raw string) json.RawMessage { if name != "Read" || raw == "" { return json.RawMessage(raw) @@ -161,11 +170,13 @@ type 
ResponsesEventToAnthropicState struct { MessageStartSent bool MessageStopSent bool - ContentBlockIndex int - ContentBlockOpen bool - CurrentBlockType string // "text" | "thinking" | "tool_use" - CurrentToolName string - CurrentToolArgs string + ContentBlockIndex int + ContentBlockOpen bool + CurrentBlockType string // "text" | "thinking" | "tool_use" + CurrentToolName string + CurrentToolArgs string + CurrentToolHadDelta bool + HasToolCall bool // OutputIndexToBlockIdx maps Responses output_index → Anthropic content block index. OutputIndexToBlockIdx map[int]int @@ -231,11 +242,16 @@ func FinalizeResponsesAnthropicStream(state *ResponsesEventToAnthropicState) []A var events []AnthropicStreamEvent events = append(events, closeCurrentBlock(state)...) + stopReason := "end_turn" + if state.HasToolCall { + stopReason = "tool_use" + } + events = append(events, AnthropicStreamEvent{ Type: "message_delta", Delta: &AnthropicDelta{ - StopReason: "end_turn", + StopReason: stopReason, }, Usage: &AnthropicUsage{ InputTokens: state.InputTokens, @@ -306,6 +322,8 @@ func resToAnthHandleOutputItemAdded(evt *ResponsesStreamEvent, state *ResponsesE state.CurrentBlockType = "tool_use" state.CurrentToolName = evt.Item.Name state.CurrentToolArgs = "" + state.CurrentToolHadDelta = false + state.HasToolCall = true events = append(events, AnthropicStreamEvent{ Type: "content_block_start", @@ -390,6 +408,9 @@ func resToAnthHandleFuncArgsDelta(evt *ResponsesStreamEvent, state *ResponsesEve state.CurrentToolArgs += evt.Delta return nil } + if state.CurrentBlockType == "tool_use" { + state.CurrentToolHadDelta = true + } blockIdx, ok := state.OutputIndexToBlockIdx[evt.OutputIndex] if !ok { @@ -407,7 +428,7 @@ func resToAnthHandleFuncArgsDelta(evt *ResponsesStreamEvent, state *ResponsesEve } func resToAnthHandleFuncArgsDone(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent { - if state.CurrentBlockType != "tool_use" || state.CurrentToolName != "Read" { + if 
state.CurrentBlockType != "tool_use" { return resToAnthHandleBlockDone(state) } @@ -415,10 +436,16 @@ func resToAnthHandleFuncArgsDone(evt *ResponsesStreamEvent, state *ResponsesEven if raw == "" { raw = state.CurrentToolArgs } - sanitized := sanitizeAnthropicToolUseInput(state.CurrentToolName, raw) - if len(sanitized) == 0 { + if raw == "" || state.CurrentToolHadDelta { return closeCurrentBlock(state) } + if state.CurrentToolName == "Read" { + sanitized := sanitizeAnthropicToolUseInput(state.CurrentToolName, raw) + if len(sanitized) == 0 { + return closeCurrentBlock(state) + } + raw = string(sanitized) + } idx := state.ContentBlockIndex events := []AnthropicStreamEvent{{ @@ -426,7 +453,7 @@ func resToAnthHandleFuncArgsDone(evt *ResponsesStreamEvent, state *ResponsesEven Index: &idx, Delta: &AnthropicDelta{ Type: "input_json_delta", - PartialJSON: string(sanitized), + PartialJSON: raw, }, }} events = append(events, closeCurrentBlock(state)...) @@ -553,7 +580,7 @@ func resToAnthHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo stopReason = "max_tokens" } case "completed": - if state.ContentBlockIndex > 0 && state.CurrentBlockType == "tool_use" { + if state.HasToolCall { stopReason = "tool_use" } } @@ -586,6 +613,7 @@ func closeCurrentBlock(state *ResponsesEventToAnthropicState) []AnthropicStreamE state.ContentBlockIndex++ state.CurrentToolName = "" state.CurrentToolArgs = "" + state.CurrentToolHadDelta = false return []AnthropicStreamEvent{{ Type: "content_block_stop", Index: &idx, diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index 0ff2cf49..f9cd5a1c 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -53,6 +53,8 @@ type AnthropicMessage struct { type AnthropicContentBlock struct { Type string `json:"type"` + CacheControl *AnthropicCacheControl `json:"cache_control,omitempty"` + // type=text Text string `json:"text,omitempty"` @@ -165,19 +167,23 @@ 
type AnthropicDelta struct { // ResponsesRequest is the request body for POST /v1/responses. type ResponsesRequest struct { - Model string `json:"model"` - Instructions string `json:"instructions,omitempty"` - Input json.RawMessage `json:"input"` // string or []ResponsesInputItem - MaxOutputTokens *int `json:"max_output_tokens,omitempty"` - Temperature *float64 `json:"temperature,omitempty"` - TopP *float64 `json:"top_p,omitempty"` - Stream bool `json:"stream,omitempty"` - Tools []ResponsesTool `json:"tools,omitempty"` - Include []string `json:"include,omitempty"` - Store *bool `json:"store,omitempty"` - Reasoning *ResponsesReasoning `json:"reasoning,omitempty"` - ToolChoice json.RawMessage `json:"tool_choice,omitempty"` - ServiceTier string `json:"service_tier,omitempty"` + Model string `json:"model"` + Instructions string `json:"instructions,omitempty"` + Input json.RawMessage `json:"input"` // string or []ResponsesInputItem + MaxOutputTokens *int `json:"max_output_tokens,omitempty"` + Temperature *float64 `json:"temperature,omitempty"` + TopP *float64 `json:"top_p,omitempty"` + Stream bool `json:"stream,omitempty"` + Tools []ResponsesTool `json:"tools,omitempty"` + Include []string `json:"include,omitempty"` + Store *bool `json:"store,omitempty"` + ParallelToolCalls *bool `json:"parallel_tool_calls,omitempty"` + Reasoning *ResponsesReasoning `json:"reasoning,omitempty"` + Text *ResponsesText `json:"text,omitempty"` + ToolChoice json.RawMessage `json:"tool_choice,omitempty"` + ServiceTier string `json:"service_tier,omitempty"` + PromptCacheKey string `json:"prompt_cache_key,omitempty"` + PreviousResponseID string `json:"previous_response_id,omitempty"` } // ResponsesReasoning configures reasoning effort in the Responses API. @@ -186,13 +192,18 @@ type ResponsesReasoning struct { Summary string `json:"summary,omitempty"` // "auto" | "concise" | "detailed" } +// ResponsesText configures text output options in the Responses API. 
+type ResponsesText struct { + Verbosity string `json:"verbosity,omitempty"` // "low" | "medium" | "high" +} + // ResponsesInputItem is one item in the Responses API input array. // The Type field determines which other fields are populated. type ResponsesInputItem struct { // Common Type string `json:"type,omitempty"` // "" for role-based messages - // Role-based messages (system/user/assistant) + // Role-based messages (developer/system/user/assistant) Role string `json:"role,omitempty"` Content json.RawMessage `json:"content,omitempty"` // string or []ResponsesContentPart diff --git a/backend/internal/service/openai_codex_transform.go b/backend/internal/service/openai_codex_transform.go index f96bf81f..a3b69dee 100644 --- a/backend/internal/service/openai_codex_transform.go +++ b/backend/internal/service/openai_codex_transform.go @@ -69,6 +69,13 @@ type codexTransformResult struct { PromptCacheKey string } +type codexOAuthTransformOptions struct { + IsCodexCLI bool + IsCompact bool + SkipDefaultInstructions bool + PreserveToolCallIDs bool +} + const ( codexImageGenerationBridgeMarker = "" codexImageGenerationBridgeText = codexImageGenerationBridgeMarker + "\nWhen the user asks for raster image generation or editing, use the OpenAI Responses native `image_generation` tool attached to this request. The local Codex client may not expose an `image_gen` namespace, but that does not mean image generation is unavailable. Do not ask the user to switch to CLI fallback solely because `image_gen` is absent.\n" @@ -94,6 +101,13 @@ var openAICodexOAuthUnsupportedFields = append([]string{ }, openAIChatGPTInternalUnsupportedFields...) 
func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool, isCompact bool) codexTransformResult { + return applyCodexOAuthTransformWithOptions(reqBody, codexOAuthTransformOptions{ + IsCodexCLI: isCodexCLI, + IsCompact: isCompact, + }) +} + +func applyCodexOAuthTransformWithOptions(reqBody map[string]any, opts codexOAuthTransformOptions) codexTransformResult { result := codexTransformResult{} // 工具续链需求会影响存储策略与 input 过滤逻辑。 needsToolContinuation := NeedsToolContinuation(reqBody) @@ -111,7 +125,7 @@ func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool, isCompact result.NormalizedModel = normalizedModel } - if isCompact { + if opts.IsCompact { if _, ok := reqBody["store"]; ok { delete(reqBody, "store") result.Modified = true @@ -183,6 +197,10 @@ func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool, isCompact if v, ok := reqBody["prompt_cache_key"].(string); ok { result.PromptCacheKey = strings.TrimSpace(v) + if isOpenAICompatMessagesBridgeRequestBody(reqBody) { + delete(reqBody, "prompt_cache_key") + result.Modified = true + } } // 提取 input 中 role:"system" 消息至 instructions(OAuth 上游不支持 system role)。 @@ -191,7 +209,7 @@ func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool, isCompact } // instructions 处理逻辑:根据是否是 Codex CLI 分别调用不同方法 - if applyInstructions(reqBody, isCodexCLI) { + if !opts.SkipDefaultInstructions && applyInstructions(reqBody, opts.IsCodexCLI) { result.Modified = true } if isCodexSparkModel(normalizedModel) && applyCodexSparkImageUnsupportedInstructions(reqBody) { @@ -208,7 +226,10 @@ func applyCodexOAuthTransform(reqBody map[string]any, isCodexCLI bool, isCompact input = normalizedInput result.Modified = true } - input = filterCodexInput(input, needsToolContinuation) + input = filterCodexInputWithOptions(input, codexInputFilterOptions{ + PreserveReferences: needsToolContinuation, + PreserveCallIDs: opts.PreserveToolCallIDs, + }) reqBody["input"] = input result.Modified = true } else if inputStr, 
ok := reqBody["input"].(string); ok { @@ -853,7 +874,7 @@ func getNormalizedCodexModel(modelID string) string { } // extractTextFromContent extracts plain text from a content value that is either -// a Go string or a []any of content-part maps with type:"text". +// a Go string or a []any of text-like content-part maps. func extractTextFromContent(content any) string { switch v := content.(type) { case string: @@ -865,7 +886,8 @@ func extractTextFromContent(content any) string { if !ok { continue } - if t, _ := m["type"].(string); t == "text" { + switch t, _ := m["type"].(string); t { + case "text", "input_text", "output_text": if text, ok := m["text"].(string); ok { parts = append(parts, text) } @@ -919,6 +941,28 @@ func extractSystemMessagesFromInput(reqBody map[string]any) bool { return true } +func extractPromptLikeInstructionsFromInput(reqBody map[string]any) string { + input, ok := reqBody["input"].([]any) + if !ok || len(input) == 0 { + return "" + } + var texts []string + for _, item := range input { + m, ok := item.(map[string]any) + if !ok { + continue + } + role, _ := m["role"].(string) + switch role { + case "developer", "system": + if text := strings.TrimSpace(extractTextFromContent(m["content"])); text != "" { + texts = append(texts, text) + } + } + } + return strings.Join(texts, "\n\n") +} + // applyInstructions 处理 instructions 字段:仅在 instructions 为空时填充默认值。 func applyInstructions(reqBody map[string]any, isCodexCLI bool) bool { if !isInstructionsEmpty(reqBody) { @@ -945,9 +989,20 @@ func isInstructionsEmpty(reqBody map[string]any) bool { return strings.TrimSpace(str) == "" } +type codexInputFilterOptions struct { + PreserveReferences bool + PreserveCallIDs bool +} + // filterCodexInput 按需过滤 item_reference 与 id。 // preserveReferences 为 true 时保持引用与 id,以满足续链请求对上下文的依赖。 func filterCodexInput(input []any, preserveReferences bool) []any { + return filterCodexInputWithOptions(input, codexInputFilterOptions{ + PreserveReferences: preserveReferences, + }) +} + 
+func filterCodexInputWithOptions(input []any, opts codexInputFilterOptions) []any { filtered := make([]any, 0, len(input)) for _, item := range input { m, ok := item.(map[string]any) @@ -968,6 +1023,9 @@ func filterCodexInput(input []any, preserveReferences bool) []any { // 仅修正真正的 tool/function call 标识,避免误改普通 message/reasoning id; // 若 item_reference 指向 legacy call_* 标识,则仅修正该引用本身。 fixCallIDPrefix := func(id string) string { + if opts.PreserveCallIDs { + return id + } if id == "" || strings.HasPrefix(id, "fc") { return id } @@ -978,7 +1036,7 @@ func filterCodexInput(input []any, preserveReferences bool) []any { } if typ == "item_reference" { - if !preserveReferences { + if !opts.PreserveReferences { continue } newItem := make(map[string]any, len(m)) @@ -1046,7 +1104,7 @@ func filterCodexInput(input []any, preserveReferences bool) []any { } } - if !preserveReferences { + if !opts.PreserveReferences { ensureCopy() delete(newItem, "id") } diff --git a/backend/internal/service/openai_codex_transform_test.go b/backend/internal/service/openai_codex_transform_test.go index 3add4779..9c72760a 100644 --- a/backend/internal/service/openai_codex_transform_test.go +++ b/backend/internal/service/openai_codex_transform_test.go @@ -44,6 +44,39 @@ func TestApplyCodexOAuthTransform_ToolContinuationPreservesInput(t *testing.T) { require.Equal(t, "fc1", second["call_id"]) } +func TestApplyCodexOAuthTransform_MessagesBridgePromptCacheKeyIsHeaderOnly(t *testing.T) { + reqBody := map[string]any{ + "model": "gpt-5.5", + "prompt_cache_key": "anthropic-metadata-session-1", + "input": []any{ + map[string]any{ + "type": "message", + "role": "developer", + "content": []any{ + map[string]any{ + "type": "input_text", + "text": openAICompatClaudeCodeTodoGuardMarker, + }, + }, + }, + map[string]any{ + "type": "message", + "role": "user", + "content": "hello", + }, + }, + } + + result := applyCodexOAuthTransformWithOptions(reqBody, codexOAuthTransformOptions{ + SkipDefaultInstructions: true, + 
PreserveToolCallIDs: true, + }) + + require.Equal(t, "anthropic-metadata-session-1", result.PromptCacheKey) + require.True(t, result.Modified) + require.NotContains(t, reqBody, "prompt_cache_key") +} + func TestApplyCodexOAuthTransform_ToolContinuationPreservesNativeMessageAndReasoningIDs(t *testing.T) { reqBody := map[string]any{ "model": "gpt-5.2", diff --git a/backend/internal/service/openai_compat_model_test.go b/backend/internal/service/openai_compat_model_test.go index 840784bf..a897e219 100644 --- a/backend/internal/service/openai_compat_model_test.go +++ b/backend/internal/service/openai_compat_model_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net/http" "net/http/httptest" @@ -145,7 +146,10 @@ func TestForwardAsAnthropic_NormalizesRoutingAndEffortForGpt54XHigh(t *testing.T Body: io.NopCloser(strings.NewReader(upstreamBody)), }} - svc := &OpenAIGatewayService{httpUpstream: upstream} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } account := &Account{ ID: 1, Name: "openai-oauth", @@ -179,6 +183,927 @@ func TestForwardAsAnthropic_NormalizesRoutingAndEffortForGpt54XHigh(t *testing.T t.Logf("response body: %s", rec.Body.String()) } +func TestForwardAsAnthropic_InjectsPromptCacheKeyForAPIKeyMessagesDispatch(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"metadata":{"user_id":"claude-session-1"},"messages":[{"role":"user","content":"hello"}],"stream":false}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: 
{"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.3-codex","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":5,"output_tokens":2,"total_tokens":7,"input_tokens_details":{"cached_tokens":3}}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_cache_key"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "stable-cache-key", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "stable-cache-key", gjson.GetBytes(upstream.lastBody, "prompt_cache_key").String()) + require.Equal(t, "gpt-5.3-codex", gjson.GetBytes(upstream.lastBody, "model").String()) + require.Equal(t, 3, result.Usage.CacheReadInputTokens) +} + +func TestForwardAsAnthropic_AutoDerivesPromptCacheKeyWhenMessagesDispatchHasNoSessionID(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"system":"You are helpful.","messages":[{"role":"user","content":"open repo"}],"stream":false}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + 
c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.3-codex","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":5,"output_tokens":2,"total_tokens":7,"input_tokens_details":{"cached_tokens":3}}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_auto_cache_key"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, result) + cacheKey := gjson.GetBytes(upstream.lastBody, "prompt_cache_key").String() + require.NotEmpty(t, cacheKey) + require.True(t, strings.HasPrefix(cacheKey, "anthropic-digest-")) + require.Equal(t, generateSessionUUID(isolateOpenAISessionID(0, cacheKey)), upstream.lastReq.Header.Get("session_id")) +} + +func TestForwardAsAnthropic_DoesNotAutoDerivePromptCacheKeyForNonCodexModel(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":false}`) + c.Request = 
httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-4o","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":5,"output_tokens":2,"total_tokens":7}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_no_cache_key"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-4o") + require.NoError(t, err) + require.NotNil(t, result) + require.False(t, gjson.GetBytes(upstream.lastBody, "prompt_cache_key").Exists()) + require.Empty(t, upstream.lastReq.Header.Get("session_id")) +} + +func TestForwardAsAnthropic_TrimsFullReplayOnlyForCodexCompatModels(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + messages := make([]string, 0, openAICompatAnthropicReplayMaxTailMessages+3) + for i := 0; i < openAICompatAnthropicReplayMaxTailMessages+3; i++ { + messages = append(messages, `{"role":"user","content":"message-`+fmt.Sprintf("%02d", i)+`"}`) + } + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[` + strings.Join(messages, ",") 
+ `],"stream":false}`) + + run := func(t *testing.T, mappedModel string) []byte { + t.Helper() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"` + mappedModel + `","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":5,"output_tokens":2,"total_tokens":7}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_trim"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", mappedModel) + require.NoError(t, err) + require.NotNil(t, result) + return upstream.lastBody + } + + codexBody := run(t, "gpt-5.3-codex") + require.Equal(t, int64(openAICompatAnthropicReplayMaxTailMessages+1), gjson.GetBytes(codexBody, "input.#").Int()) + require.Equal(t, "developer", gjson.GetBytes(codexBody, "input.0.role").String()) + require.Contains(t, gjson.GetBytes(codexBody, "input.0.content.0.text").String(), "") + require.Equal(t, "message-03", gjson.GetBytes(codexBody, 
"input.1.content.0.text").String()) + require.Equal(t, "message-14", gjson.GetBytes(codexBody, "input.12.content.0.text").String()) + + nonCompatBody := run(t, "gpt-4o") + require.Equal(t, int64(openAICompatAnthropicReplayMaxTailMessages+3), gjson.GetBytes(nonCompatBody, "input.#").Int()) + require.Equal(t, "message-00", gjson.GetBytes(nonCompatBody, "input.0.content.0.text").String()) +} + +func TestForwardAsAnthropic_OAuthCompatKeepsFullReplayForCacheGrowth(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + messages := make([]string, 0, openAICompatAnthropicReplayMaxTailMessages+3) + for i := 0; i < openAICompatAnthropicReplayMaxTailMessages+3; i++ { + messages = append(messages, `{"role":"user","content":"message-`+fmt.Sprintf("%02d", i)+`"}`) + } + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[` + strings.Join(messages, ",") + `],"stream":false}`) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{resp: openAICompatSSECompletedResponse("resp_oauth_trim", "gpt-5.4")} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, int64(openAICompatAnthropicReplayMaxTailMessages+4), gjson.GetBytes(upstream.lastBody, "input.#").Int()) + require.Equal(t, "developer", gjson.GetBytes(upstream.lastBody, 
"input.0.role").String()) + require.Contains(t, gjson.GetBytes(upstream.lastBody, "input.0.content.0.text").String(), "") + require.Equal(t, "message-00", gjson.GetBytes(upstream.lastBody, "input.1.content.0.text").String()) + require.Equal(t, "message-14", gjson.GetBytes(upstream.lastBody, "input.15.content.0.text").String()) + require.False(t, gjson.GetBytes(upstream.lastBody, "prompt_cache_key").Exists()) +} + +func TestForwardAsAnthropic_AttachesPreviousResponseIDForCompatContinuation(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + firstBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"}],"stream":false}`) + upstream.resp = openAICompatSSECompletedResponse("resp_first", "gpt-5.3-codex") + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "stable-cache-key", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, firstResult) + require.Equal(t, "resp_first", firstResult.ResponseID) + require.False(t, gjson.GetBytes(upstream.lastBody, "previous_response_id").Exists()) + + secondBody := 
[]byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + upstream.resp = openAICompatSSECompletedResponse("resp_second", "gpt-5.3-codex") + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "stable-cache-key", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, secondResult) + require.Equal(t, "resp_second", secondResult.ResponseID) + require.Equal(t, "resp_first", gjson.GetBytes(upstream.lastBody, "previous_response_id").String()) + require.Equal(t, int64(2), gjson.GetBytes(upstream.lastBody, "input.#").Int()) + require.Equal(t, "developer", gjson.GetBytes(upstream.lastBody, "input.0.role").String()) + require.Contains(t, gjson.GetBytes(upstream.lastBody, "input.0.content.0.text").String(), "") + require.Equal(t, "second", gjson.GetBytes(upstream.lastBody, "input.1.content.0.text").String()) +} + +func TestForwardAsAnthropic_ReplaysWithoutContinuationWhenPreviousResponseMissing(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + svc.bindOpenAICompatSessionResponseID(context.Background(), nil, account, "stable-cache-key", "resp_missing") + secondBody := 
[]byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + upstream.responses = []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid_prev_missing"}}, + Body: io.NopCloser(strings.NewReader(`{"error":{"code":"previous_response_not_found","message":"previous response not found"}}`)), + }, + openAICompatSSECompletedResponse("resp_replayed", "gpt-5.3-codex"), + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + c.Request.Header.Set("Content-Type", "application/json") + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, secondBody, "stable-cache-key", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "resp_replayed", result.ResponseID) + require.Len(t, upstream.requests, 2) + require.Equal(t, "resp_missing", gjson.GetBytes(upstream.bodies[0], "previous_response_id").String()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) + require.Equal(t, int64(4), gjson.GetBytes(upstream.bodies[1], "input.#").Int()) + require.Equal(t, "developer", gjson.GetBytes(upstream.bodies[1], "input.0.role").String()) + require.Contains(t, gjson.GetBytes(upstream.bodies[1], "input.0.content.0.text").String(), "") + require.Equal(t, "first", gjson.GetBytes(upstream.bodies[1], "input.1.content.0.text").String()) + require.Equal(t, "second", gjson.GetBytes(upstream.bodies[1], "input.3.content.0.text").String()) +} + +func TestForwardAsAnthropic_DisablesAPIKeyContinuationWhenUpstreamRequiresWebSocketV2(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{} + svc := &OpenAIGatewayService{ + httpUpstream: 
upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + svc.bindOpenAICompatSessionResponseID(context.Background(), nil, account, "stable-cache-key", "resp_http_unsupported") + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + upstream.responses = []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid_prev_http_unsupported"}}, + Body: io.NopCloser(strings.NewReader(`{"error":{"message":"previous_response_id is only supported on Responses WebSocket v2","type":"invalid_request_error"}}`)), + }, + openAICompatSSECompletedResponse("resp_replayed", "gpt-5.5"), + openAICompatSSECompletedResponse("resp_later", "gpt-5.5"), + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "stable-cache-key", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "resp_replayed", result.ResponseID) + require.Len(t, upstream.requests, 2) + require.Equal(t, "resp_http_unsupported", gjson.GetBytes(upstream.bodies[0], "previous_response_id").String()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) + + laterRec := httptest.NewRecorder() + laterCtx, _ := gin.CreateTestContext(laterRec) + laterCtx.Request = 
httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + laterCtx.Request.Header.Set("Content-Type", "application/json") + + laterResult, err := svc.ForwardAsAnthropic(context.Background(), laterCtx, account, body, "stable-cache-key", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, laterResult) + require.Equal(t, "resp_later", laterResult.ResponseID) + require.Len(t, upstream.requests, 3) + require.False(t, gjson.GetBytes(upstream.bodies[2], "previous_response_id").Exists()) +} + +func TestForwardAsAnthropic_APIKeyMetadataSessionSurvivesChangingCacheControlAnchorAfterContinuationDisabled(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + metadata := `{"user_id":"{\"device_id\":\"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\",\"account_uuid\":\"\",\"session_id\":\"aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa\"}"}` + firstBody := []byte(`{"model":"claude-haiku-4-5-20251001","max_tokens":16,"metadata":` + metadata + `,"system":[{"type":"text","text":"project docs","cache_control":{"type":"ephemeral"}}],"messages":[{"role":"user","content":"first"}],"stream":false}`) + messages := make([]string, 0, openAICompatAnthropicReplayMaxTailMessages+4) + messages = append(messages, `{"role":"user","content":[{"type":"text","text":"rewritten context","cache_control":{"type":"ephemeral"}}]}`) + for i := 1; i < openAICompatAnthropicReplayMaxTailMessages+4; i++ { + messages = append(messages, `{"role":"user","content":"message-`+fmt.Sprintf("%02d", i)+`"}`) + } + secondBody := []byte(`{"model":"claude-haiku-4-5-20251001","max_tokens":16,"metadata":` + metadata + `,"messages":[` + strings.Join(messages, ",") + `],"stream":false}`) + + upstream := &httpUpstreamRecorder{responses: []*http.Response{ + openAICompatSSECompletedResponse("resp_first", "gpt-5.4-mini"), + openAICompatSSECompletedResponse("resp_second", "gpt-5.4-mini"), + }} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: 
config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "", "gpt-5.4-mini") + require.NoError(t, err) + require.NotNil(t, firstResult) + firstKey := gjson.GetBytes(upstream.bodies[0], "prompt_cache_key").String() + require.NotEmpty(t, firstKey) + require.True(t, strings.HasPrefix(firstKey, "anthropic-metadata-")) + + svc.disableOpenAICompatSessionContinuation(context.Background(), nil, account, firstKey) + + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "", "gpt-5.4-mini") + require.NoError(t, err) + require.NotNil(t, secondResult) + require.Len(t, upstream.requests, 2) + require.Equal(t, firstKey, gjson.GetBytes(upstream.bodies[1], "prompt_cache_key").String()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) + require.Equal(t, int64(openAICompatAnthropicReplayMaxTailMessages+5), gjson.GetBytes(upstream.bodies[1], "input.#").Int()) + require.Equal(t, "developer", gjson.GetBytes(upstream.bodies[1], "input.0.role").String()) + require.Contains(t, gjson.GetBytes(upstream.bodies[1], 
"input.0.content.0.text").String(), "") + require.Equal(t, "rewritten context", gjson.GetBytes(upstream.bodies[1], "input.1.content.0.text").String()) + require.Equal(t, "message-15", gjson.GetBytes(upstream.bodies[1], "input.16.content.0.text").String()) +} + +func TestForwardAsAnthropic_DoesNotAttachPreviousResponseIDForOAuthCompat(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{resp: openAICompatSSECompletedResponse("resp_oauth_next", "gpt-5.4")} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + svc.bindOpenAICompatSessionResponseID(context.Background(), nil, account, "stable-cache-key", "resp_oauth_prev") + + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "stable-cache-key", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, result) + require.False(t, gjson.GetBytes(upstream.lastBody, "previous_response_id").Exists()) +} + +func TestForwardAsAnthropic_ReusesOAuthCodexTurnState(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + firstResp := openAICompatSSECompletedResponse("resp_oauth_first", "gpt-5.4") + firstResp.Header.Set("x-codex-turn-state", "turn_state_first") + upstream := 
&httpUpstreamRecorder{responses: []*http.Response{ + firstResp, + openAICompatSSECompletedResponse("resp_oauth_second", "gpt-5.4"), + }} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + firstBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"}],"stream":false}`) + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "stable-cache-key", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, firstResult) + require.Empty(t, upstream.requests[0].Header.Get("x-codex-turn-state")) + require.Empty(t, upstream.requests[0].Header.Get("OpenAI-Beta")) + require.Empty(t, upstream.requests[0].Header.Get("originator")) + + secondBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "stable-cache-key", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, secondResult) + 
require.Equal(t, "turn_state_first", upstream.requests[1].Header.Get("x-codex-turn-state")) + require.Equal(t, generateSessionUUID(isolateOpenAISessionID(0, "stable-cache-key")), upstream.requests[1].Header.Get("session_id")) + require.Empty(t, upstream.requests[1].Header.Get("conversation_id")) + require.Empty(t, upstream.requests[1].Header.Get("OpenAI-Beta")) + require.Empty(t, upstream.requests[1].Header.Get("originator")) + require.False(t, gjson.GetBytes(upstream.bodies[1], "prompt_cache_key").Exists()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) +} + +func TestForwardAsAnthropic_OAuthDigestFallbackReusesTurnStateWithoutExplicitKey(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + firstResp := openAICompatSSECompletedResponse("resp_oauth_digest_first", "gpt-5.4") + firstResp.Header.Set("x-codex-turn-state", "turn_state_digest_first") + upstream := &httpUpstreamRecorder{responses: []*http.Response{ + firstResp, + openAICompatSSECompletedResponse("resp_oauth_digest_second", "gpt-5.4"), + }} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + firstBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"}],"stream":false}`) + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "", "gpt-5.4") + 
require.NoError(t, err) + require.NotNil(t, firstResult) + firstSessionID := upstream.requests[0].Header.Get("session_id") + require.NotEmpty(t, firstSessionID) + require.Empty(t, upstream.requests[0].Header.Get("x-codex-turn-state")) + require.False(t, gjson.GetBytes(upstream.bodies[0], "prompt_cache_key").Exists()) + + secondBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, secondResult) + require.Equal(t, firstSessionID, upstream.requests[1].Header.Get("session_id")) + require.Equal(t, "turn_state_digest_first", upstream.requests[1].Header.Get("x-codex-turn-state")) + require.Empty(t, upstream.requests[1].Header.Get("conversation_id")) + require.False(t, gjson.GetBytes(upstream.bodies[1], "prompt_cache_key").Exists()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) +} + +func TestForwardAsAnthropic_OAuthMetadataSessionSurvivesDigestPrefixRewrite(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + firstResp := openAICompatSSECompletedResponse("resp_oauth_metadata_first", "gpt-5.5") + firstResp.Header.Set("x-codex-turn-state", "turn_state_metadata_first") + upstream := &httpUpstreamRecorder{responses: []*http.Response{ + firstResp, + openAICompatSSECompletedResponse("resp_oauth_metadata_second", "gpt-5.5"), + }} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: 
config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + metadata := `{"user_id":"{\"device_id\":\"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\",\"account_uuid\":\"\",\"session_id\":\"aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa\"}"}` + + firstBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"metadata":` + metadata + `,"messages":[{"role":"user","content":"first plan"}],"stream":false}`) + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, firstResult) + firstSessionID := upstream.requests[0].Header.Get("session_id") + require.NotEmpty(t, firstSessionID) + require.Empty(t, upstream.requests[0].Header.Get("x-codex-turn-state")) + require.False(t, gjson.GetBytes(upstream.bodies[0], "prompt_cache_key").Exists()) + + secondBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"metadata":` + metadata + `,"messages":[{"role":"user","content":"rewritten plan"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "", "gpt-5.5") + require.NoError(t, err) + 
require.NotNil(t, secondResult) + require.Equal(t, firstSessionID, upstream.requests[1].Header.Get("session_id")) + require.Equal(t, "turn_state_metadata_first", upstream.requests[1].Header.Get("x-codex-turn-state")) + require.Empty(t, upstream.requests[1].Header.Get("conversation_id")) + require.False(t, gjson.GetBytes(upstream.bodies[1], "prompt_cache_key").Exists()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) +} + +func TestForwardAsAnthropic_OAuthMetadataSessionSurvivesChangingCacheControlAnchor(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + firstResp := openAICompatSSECompletedResponse("resp_oauth_cache_anchor_first", "gpt-5.5") + firstResp.Header.Set("x-codex-turn-state", "turn_state_cache_anchor_first") + upstream := &httpUpstreamRecorder{responses: []*http.Response{ + firstResp, + openAICompatSSECompletedResponse("resp_oauth_cache_anchor_second", "gpt-5.5"), + }} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + metadata := `{"user_id":"{\"device_id\":\"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\",\"account_uuid\":\"\",\"session_id\":\"bbbbbbbb-bbbb-4bbb-8bbb-bbbbbbbbbbbb\"}"}` + + firstBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"metadata":` + metadata + `,"system":[{"type":"text","text":"anchor one","cache_control":{"type":"ephemeral"}}],"messages":[{"role":"user","content":"first"}],"stream":false}`) + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + 
firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, firstResult) + firstSessionID := upstream.requests[0].Header.Get("session_id") + require.NotEmpty(t, firstSessionID) + require.Empty(t, upstream.requests[0].Header.Get("x-codex-turn-state")) + require.False(t, gjson.GetBytes(upstream.bodies[0], "prompt_cache_key").Exists()) + + secondBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"metadata":` + metadata + `,"system":[{"type":"text","text":"anchor two","cache_control":{"type":"ephemeral"}}],"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, secondResult) + require.Equal(t, firstSessionID, upstream.requests[1].Header.Get("session_id")) + require.Equal(t, "turn_state_cache_anchor_first", upstream.requests[1].Header.Get("x-codex-turn-state")) + require.Empty(t, upstream.requests[1].Header.Get("conversation_id")) + require.False(t, gjson.GetBytes(upstream.bodies[1], "prompt_cache_key").Exists()) + require.False(t, gjson.GetBytes(upstream.bodies[1], "previous_response_id").Exists()) +} + +func TestForwardAsAnthropic_OAuthKeepsSystemAsDeveloperInput(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{resp: openAICompatSSECompletedResponse("resp_oauth_system", "gpt-5.4")} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: 
&config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"system":[{"type":"text","text":"project instructions","cache_control":{"type":"ephemeral"}}],"messages":[{"role":"user","content":"first"}],"stream":false}`) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "developer", gjson.GetBytes(upstream.lastBody, "input.0.role").String()) + require.Equal(t, "input_text", gjson.GetBytes(upstream.lastBody, "input.0.content.0.type").String()) + require.Equal(t, "project instructions", gjson.GetBytes(upstream.lastBody, "input.0.content.0.text").String()) + instructions := gjson.GetBytes(upstream.lastBody, "instructions") + require.True(t, instructions.Exists()) + require.Empty(t, instructions.String()) + require.Empty(t, upstream.requests[0].Header.Get("OpenAI-Beta")) + require.Empty(t, upstream.requests[0].Header.Get("originator")) +} + +func TestForwardAsAnthropic_OAuthAddsClaudeCodeTodoGuardForCompatModel(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{resp: openAICompatSSECompletedResponse("resp_oauth_todo_guard", "gpt-5.5")} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: 
"openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"system":"project instructions","messages":[{"role":"user","content":"review files"}],"stream":false}`) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "developer", gjson.GetBytes(upstream.lastBody, "input.0.role").String()) + require.Equal(t, "project instructions", gjson.GetBytes(upstream.lastBody, "input.0.content.0.text").String()) + require.Equal(t, "developer", gjson.GetBytes(upstream.lastBody, "input.1.role").String()) + require.Contains(t, gjson.GetBytes(upstream.lastBody, "input.1.content.0.text").String(), "") + require.Equal(t, "user", gjson.GetBytes(upstream.lastBody, "input.2.role").String()) +} + +func TestForwardAsAnthropic_OAuthPreservesClaudeCodeToolCallID(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{resp: openAICompatSSECompletedResponse("resp_oauth_tool", "gpt-5.4")} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + body := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"list 
files"},{"role":"assistant","content":[{"type":"tool_use","id":"toolu_123","name":"Bash","input":{"command":"ls"}}]},{"role":"user","content":[{"type":"tool_result","tool_use_id":"toolu_123","content":"ok"}]}],"tools":[{"name":"Bash","description":"run shell","input_schema":{"type":"object","properties":{"command":{"type":"string"}}}}],"stream":false}`) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "stable-cache-key", "gpt-5.4") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "toolu_123", gjson.GetBytes(upstream.lastBody, `input.#(type=="function_call").call_id`).String()) + require.Equal(t, "toolu_123", gjson.GetBytes(upstream.lastBody, `input.#(type=="function_call_output").call_id`).String()) + require.True(t, gjson.GetBytes(upstream.lastBody, "parallel_tool_calls").Bool()) + require.Equal(t, "medium", gjson.GetBytes(upstream.lastBody, "text.verbosity").String()) + require.False(t, gjson.GetBytes(upstream.lastBody, "tools.0.strict").Bool()) +} + +func TestForwardAsAnthropic_StoresStreamingResponseIDWithoutUsage(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + upstream := &httpUpstreamRecorder{} + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + }, + } + + firstBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"}],"stream":true}`) + upstream.resp = 
openAICompatSSEResponseWithoutUsage("resp_stream_first", "gpt-5.3-codex") + firstRec := httptest.NewRecorder() + firstCtx, _ := gin.CreateTestContext(firstRec) + firstCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(firstBody)) + firstCtx.Request.Header.Set("Content-Type", "application/json") + + firstResult, err := svc.ForwardAsAnthropic(context.Background(), firstCtx, account, firstBody, "stable-cache-key", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, firstResult) + require.Equal(t, "resp_stream_first", firstResult.ResponseID) + + secondBody := []byte(`{"model":"claude-sonnet-4-5","max_tokens":16,"messages":[{"role":"user","content":"first"},{"role":"assistant","content":"ok"},{"role":"user","content":"second"}],"stream":false}`) + upstream.resp = openAICompatSSECompletedResponse("resp_stream_second", "gpt-5.3-codex") + secondRec := httptest.NewRecorder() + secondCtx, _ := gin.CreateTestContext(secondRec) + secondCtx.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(secondBody)) + secondCtx.Request.Header.Set("Content-Type", "application/json") + + secondResult, err := svc.ForwardAsAnthropic(context.Background(), secondCtx, account, secondBody, "stable-cache-key", "gpt-5.3-codex") + require.NoError(t, err) + require.NotNil(t, secondResult) + require.Equal(t, "resp_stream_first", gjson.GetBytes(upstream.lastBody, "previous_response_id").String()) +} + +func openAICompatSSECompletedResponse(responseID, model string) *http.Response { + body := strings.Join([]string{ + `data: {"type":"response.completed","response":{"id":"` + responseID + `","object":"response","model":"` + model + `","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":5,"output_tokens":2,"total_tokens":7}}}`, + "", + "data: [DONE]", + "", + }, "\n") + return &http.Response{ + StatusCode: 
http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_continuation"}}, + Body: io.NopCloser(strings.NewReader(body)), + } +} + +func openAICompatSSEResponseWithoutUsage(responseID, model string) *http.Response { + body := strings.Join([]string{ + `data: {"type":"response.completed","response":{"id":"` + responseID + `","object":"response","model":"` + model + `","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}]}}`, + "", + "data: [DONE]", + "", + }, "\n") + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_" + responseID}}, + Body: io.NopCloser(strings.NewReader(body)), + } +} + func TestForwardAsAnthropic_ForcedCodexInstructionsTemplatePrependsRenderedInstructions(t *testing.T) { t.Parallel() gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_compat_prompt_cache_key.go b/backend/internal/service/openai_compat_prompt_cache_key.go index fcd27f19..de227ff1 100644 --- a/backend/internal/service/openai_compat_prompt_cache_key.go +++ b/backend/internal/service/openai_compat_prompt_cache_key.go @@ -1,7 +1,9 @@ package service import ( + "crypto/sha256" "encoding/json" + "fmt" "strings" "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" @@ -16,12 +18,8 @@ func shouldAutoInjectPromptCacheKeyForCompat(model string) bool { if !strings.Contains(trimmed, "gpt-5") && !strings.Contains(trimmed, "codex") { return false } - switch normalizeCodexModel(trimmed) { - case "gpt-5.4", "gpt-5.3-codex", "gpt-5.3-codex-spark": - return true - default: - return false - } + normalized := strings.TrimSpace(strings.ToLower(normalizeCodexModel(trimmed))) + return strings.HasPrefix(normalized, "gpt-5") || strings.Contains(normalized, "codex") } func deriveCompatPromptCacheKey(req *apicompat.ChatCompletionsRequest, 
mappedModel string) string { @@ -71,6 +69,102 @@ func deriveCompatPromptCacheKey(req *apicompat.ChatCompletionsRequest, mappedMod return compatPromptCacheKeyPrefix + hashSensitiveValueForLog(strings.Join(seedParts, "|")) } +func deriveAnthropicCompatPromptCacheKey(req *apicompat.AnthropicRequest, mappedModel string) string { + if req == nil { + return "" + } + if anchorKey := deriveAnthropicCacheControlPromptCacheKey(req); anchorKey != "" { + return anchorKey + } + + normalizedModel := normalizeCodexModel(strings.TrimSpace(mappedModel)) + if normalizedModel == "" { + normalizedModel = normalizeCodexModel(strings.TrimSpace(req.Model)) + } + if normalizedModel == "" { + normalizedModel = strings.TrimSpace(req.Model) + } + + seedParts := []string{"model=" + normalizedModel} + if req.OutputConfig != nil && strings.TrimSpace(req.OutputConfig.Effort) != "" { + seedParts = append(seedParts, "effort="+strings.TrimSpace(req.OutputConfig.Effort)) + } + if len(req.ToolChoice) > 0 { + seedParts = append(seedParts, "tool_choice="+normalizeCompatSeedJSON(req.ToolChoice)) + } + if len(req.Tools) > 0 { + if raw, err := json.Marshal(req.Tools); err == nil { + seedParts = append(seedParts, "tools="+normalizeCompatSeedJSON(raw)) + } + } + if len(req.System) > 0 { + seedParts = append(seedParts, "system="+normalizeCompatSeedJSON(req.System)) + } + + firstUserCaptured := false + for _, msg := range req.Messages { + if strings.TrimSpace(msg.Role) != "user" || firstUserCaptured { + continue + } + seedParts = append(seedParts, "first_user="+normalizeCompatSeedJSON(msg.Content)) + firstUserCaptured = true + } + + return compatPromptCacheKeyPrefix + hashSensitiveValueForLog(strings.Join(seedParts, "|")) +} + +func deriveAnthropicCacheControlPromptCacheKey(req *apicompat.AnthropicRequest) string { + if req == nil { + return "" + } + + var parts []string + var systemBlocks []apicompat.AnthropicContentBlock + if len(req.System) > 0 && json.Unmarshal(req.System, &systemBlocks) == nil { + for _, 
block := range systemBlocks { + if block.Type == "text" && + block.CacheControl != nil && + strings.TrimSpace(block.CacheControl.Type) == "ephemeral" && + strings.TrimSpace(block.Text) != "" { + parts = append(parts, "system:"+strings.TrimSpace(block.Text)) + } + } + } + + firstUserAnchor := "" + for _, msg := range req.Messages { + var blocks []apicompat.AnthropicContentBlock + if len(msg.Content) == 0 || json.Unmarshal(msg.Content, &blocks) != nil { + continue + } + role := strings.TrimSpace(msg.Role) + for _, block := range blocks { + if block.Type != "text" || + block.CacheControl == nil || + strings.TrimSpace(block.CacheControl.Type) != "ephemeral" || + strings.TrimSpace(block.Text) == "" { + continue + } + switch role { + case "user": + if firstUserAnchor == "" { + firstUserAnchor = strings.TrimSpace(block.Text) + } + case "assistant": + parts = append(parts, "assistant:"+strings.TrimSpace(block.Text)) + } + } + } + if firstUserAnchor != "" { + parts = append(parts, "user_anchor:"+firstUserAnchor) + } + if len(parts) == 0 { + return "" + } + sum := sha256.Sum256([]byte("anthropic-cache:" + strings.Join(parts, "\n"))) + return fmt.Sprintf("anthropic-cache-%x", sum[:16]) +} + func normalizeCompatSeedJSON(v json.RawMessage) string { if len(v) == 0 { return "" diff --git a/backend/internal/service/openai_compat_prompt_cache_key_test.go b/backend/internal/service/openai_compat_prompt_cache_key_test.go index 6ca3e85c..3fe7db6e 100644 --- a/backend/internal/service/openai_compat_prompt_cache_key_test.go +++ b/backend/internal/service/openai_compat_prompt_cache_key_test.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "strings" "testing" "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" @@ -14,7 +15,10 @@ func mustRawJSON(t *testing.T, s string) json.RawMessage { } func TestShouldAutoInjectPromptCacheKeyForCompat(t *testing.T) { + require.True(t, shouldAutoInjectPromptCacheKeyForCompat("gpt-5.5")) require.True(t, 
shouldAutoInjectPromptCacheKeyForCompat("gpt-5.4")) + require.True(t, shouldAutoInjectPromptCacheKeyForCompat("gpt-5.4-mini")) + require.True(t, shouldAutoInjectPromptCacheKeyForCompat("gpt-5.2")) require.True(t, shouldAutoInjectPromptCacheKeyForCompat("gpt-5.3")) require.True(t, shouldAutoInjectPromptCacheKeyForCompat("gpt-5.3-codex")) require.True(t, shouldAutoInjectPromptCacheKeyForCompat("gpt-5.3-codex-spark")) @@ -77,3 +81,57 @@ func TestDeriveCompatPromptCacheKey_UsesResolvedSparkFamily(t *testing.T) { require.NotEmpty(t, k1) require.Equal(t, k1, k2, "resolved spark family should derive a stable compat cache key") } + +func TestDeriveAnthropicCompatPromptCacheKey_StableAcrossLaterTurns(t *testing.T) { + base := &apicompat.AnthropicRequest{ + Model: "claude-sonnet-4-5", + System: mustRawJSON(t, `"You are helpful."`), + Messages: []apicompat.AnthropicMessage{ + {Role: "user", Content: mustRawJSON(t, `"Open repo"`)}, + }, + } + extended := &apicompat.AnthropicRequest{ + Model: "claude-sonnet-4-5", + System: mustRawJSON(t, `"You are helpful."`), + Messages: []apicompat.AnthropicMessage{ + {Role: "user", Content: mustRawJSON(t, `"Open repo"`)}, + {Role: "assistant", Content: mustRawJSON(t, `"Opened."`)}, + {Role: "user", Content: mustRawJSON(t, `"Run tests"`)}, + }, + } + + k1 := deriveAnthropicCompatPromptCacheKey(base, "gpt-5.3-codex") + k2 := deriveAnthropicCompatPromptCacheKey(extended, "gpt-5.3-codex") + require.NotEmpty(t, k1) + require.Equal(t, k1, k2, "cache key should stay stable as later Claude Code turns append history") +} + +func TestDeriveAnthropicCompatPromptCacheKey_UsesCacheControlAnchors(t *testing.T) { + base := &apicompat.AnthropicRequest{ + Model: "claude-sonnet-4-5", + System: mustRawJSON(t, `[ + {"type":"text","text":"project instructions","cache_control":{"type":"ephemeral"}} + ]`), + Messages: []apicompat.AnthropicMessage{ + {Role: "user", Content: mustRawJSON(t, `[ + {"type":"text","text":"repo 
anchor","cache_control":{"type":"ephemeral"}} + ]`)}, + }, + } + extended := &apicompat.AnthropicRequest{ + Model: base.Model, + System: base.System, + Messages: []apicompat.AnthropicMessage{ + base.Messages[0], + {Role: "assistant", Content: mustRawJSON(t, `[{"type":"text","text":"Opened."}]`)}, + {Role: "user", Content: mustRawJSON(t, `[{"type":"text","text":"Run tests"}]`)}, + }, + } + + k1 := deriveAnthropicCompatPromptCacheKey(base, "gpt-5.4") + k2 := deriveAnthropicCompatPromptCacheKey(extended, "gpt-5.4") + require.NotEmpty(t, k1) + require.Equal(t, k1, k2) + require.True(t, strings.HasPrefix(k1, "anthropic-cache-")) + require.False(t, strings.HasPrefix(k1, compatPromptCacheKeyPrefix)) +} diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index 5f3bf5c1..aefa8fd2 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -40,12 +40,54 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( if err := json.Unmarshal(body, &anthropicReq); err != nil { return nil, fmt.Errorf("parse anthropic request: %w", err) } + anthropicDigestReq := cloneAnthropicRequestForDigest(&anthropicReq) originalModel := anthropicReq.Model applyOpenAICompatModelNormalization(&anthropicReq) normalizedModel := anthropicReq.Model clientStream := anthropicReq.Stream // client's original stream preference - // 2. Convert Anthropic → Responses + // 2. 
Model mapping + billingModel := resolveOpenAIForwardModel(account, normalizedModel, defaultMappedModel) + upstreamModel := normalizeOpenAIModelForUpstream(account, billingModel) + promptCacheKey = strings.TrimSpace(promptCacheKey) + apiKeyID := getAPIKeyIDFromContext(c) + anthropicDigestChain := "" + anthropicMatchedDigestChain := "" + compatPromptCacheInjected := false + if promptCacheKey == "" && shouldAutoInjectPromptCacheKeyForCompat(upstreamModel) { + promptCacheKey = promptCacheKeyFromAnthropicMetadataSession(&anthropicReq) + if promptCacheKey == "" { + promptCacheKey = deriveAnthropicCacheControlPromptCacheKey(&anthropicReq) + } + if promptCacheKey == "" { + anthropicDigestChain = buildOpenAICompatAnthropicDigestChain(anthropicDigestReq) + if reusedKey, matchedChain := s.findOpenAICompatAnthropicDigestPromptCacheKey(account, apiKeyID, anthropicDigestChain); reusedKey != "" { + promptCacheKey = reusedKey + anthropicMatchedDigestChain = matchedChain + } else { + promptCacheKey = promptCacheKeyFromAnthropicDigest(anthropicDigestChain) + } + } + compatPromptCacheInjected = promptCacheKey != "" + } + compatReplayTrimmed := false + compatReplayGuardEnabled := shouldAutoInjectPromptCacheKeyForCompat(upstreamModel) + compatContinuationEnabled := openAICompatContinuationEnabled(account, upstreamModel) + previousResponseID := "" + if compatContinuationEnabled { + previousResponseID = s.getOpenAICompatSessionResponseID(ctx, c, account, promptCacheKey) + } + compatContinuationDisabled := compatContinuationEnabled && + s.isOpenAICompatSessionContinuationDisabled(ctx, c, account, promptCacheKey) + compatTurnState := "" + // OAuth/Plus relies on session_id + x-codex-turn-state; trimming to a + // sliding 12-message window makes the cached prefix stall at system/tools. + // Keep full replay there so upstream prompt caching can grow turn by turn. 
+ if compatReplayGuardEnabled && account.Type != AccountTypeOAuth && previousResponseID == "" && !compatContinuationDisabled { + compatReplayTrimmed = applyAnthropicCompatFullReplayGuard(&anthropicReq) + } + + // 3. Convert Anthropic → Responses after compatibility-only replay guard. responsesReq, err := apicompat.AnthropicToResponses(&anthropicReq) if err != nil { return nil, fmt.Errorf("convert anthropic to responses: %w", err) @@ -56,24 +98,50 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( responsesReq.Stream = true isStream := true - // 2b. Handle BetaFastMode → service_tier: "priority" + // 3b. Handle BetaFastMode → service_tier: "priority" if containsBetaToken(c.GetHeader("anthropic-beta"), claude.BetaFastMode) { responsesReq.ServiceTier = "priority" } - // 3. Model mapping - billingModel := resolveOpenAIForwardModel(account, normalizedModel, defaultMappedModel) - upstreamModel := normalizeOpenAIModelForUpstream(account, billingModel) responsesReq.Model = upstreamModel + if previousResponseID != "" { + responsesReq.PreviousResponseID = previousResponseID + trimAnthropicCompatResponsesInputToLatestTurn(responsesReq) + } + if compatReplayGuardEnabled && account.Type != AccountTypeOAuth { + appendOpenAICompatClaudeCodeTodoGuard(responsesReq) + } - logger.L().Debug("openai messages: model mapping applied", + logFields := []zap.Field{ zap.Int64("account_id", account.ID), zap.String("original_model", originalModel), zap.String("normalized_model", normalizedModel), zap.String("billing_model", billingModel), zap.String("upstream_model", upstreamModel), zap.Bool("stream", isStream), - ) + } + if compatPromptCacheInjected { + logFields = append(logFields, + zap.Bool("compat_prompt_cache_key_injected", true), + zap.String("compat_prompt_cache_key_sha256", hashSensitiveValueForLog(promptCacheKey)), + ) + } + if compatReplayTrimmed { + logFields = append(logFields, + zap.Bool("compat_full_replay_trimmed", true), + zap.Int("compat_messages_after_trim", 
len(anthropicReq.Messages)), + ) + } + if previousResponseID != "" { + logFields = append(logFields, + zap.Bool("compat_previous_response_id_attached", true), + zap.String("compat_previous_response_id", truncateOpenAIWSLogValue(previousResponseID, openAIWSIDValueMaxLen)), + ) + } + if compatTurnState != "" { + logFields = append(logFields, zap.Bool("compat_turn_state_attached", true)) + } + logger.L().Debug("openai messages: model mapping applied", logFields...) // 4. Marshal Responses request body, then apply OAuth codex transform responsesBody, err := json.Marshal(responsesReq) @@ -86,7 +154,10 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( if err := json.Unmarshal(responsesBody, &reqBody); err != nil { return nil, fmt.Errorf("unmarshal for codex transform: %w", err) } - codexResult := applyCodexOAuthTransform(reqBody, false, false) + codexResult := applyCodexOAuthTransformWithOptions(reqBody, codexOAuthTransformOptions{ + SkipDefaultInstructions: true, + PreserveToolCallIDs: true, + }) forcedTemplateText := "" if s.cfg != nil { forcedTemplateText = s.cfg.Gateway.ForcedCodexInstructionsTemplate @@ -96,6 +167,9 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( templateUpstreamModel = codexResult.NormalizedModel } existingInstructions, _ := reqBody["instructions"].(string) + if strings.TrimSpace(existingInstructions) == "" { + existingInstructions = extractPromptLikeInstructionsFromInput(reqBody) + } if _, err := applyForcedCodexInstructionsTemplate(reqBody, forcedTemplateText, forcedCodexInstructionsTemplateData{ ExistingInstructions: strings.TrimSpace(existingInstructions), OriginalModel: originalModel, @@ -105,13 +179,19 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( }); err != nil { return nil, err } + ensureCodexOAuthInstructionsField(reqBody) + if shouldAutoInjectPromptCacheKeyForCompat(upstreamModel) { + appendOpenAICompatClaudeCodeTodoGuardToRequestBody(reqBody) + } if codexResult.NormalizedModel != "" { upstreamModel = 
codexResult.NormalizedModel } if codexResult.PromptCacheKey != "" { promptCacheKey = codexResult.PromptCacheKey - } else if promptCacheKey != "" { - reqBody["prompt_cache_key"] = promptCacheKey + } + delete(reqBody, "prompt_cache_key") + if shouldAutoInjectPromptCacheKeyForCompat(upstreamModel) { + compatTurnState = s.getOpenAICompatSessionTurnState(ctx, c, account, promptCacheKey) } // OAuth codex transform forces stream=true upstream, so always use // the streaming response handler regardless of what the client asked. @@ -174,8 +254,25 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( // Override session_id with a deterministic UUID derived from the isolated // session key, ensuring different API keys produce different upstream sessions. if promptCacheKey != "" { - apiKeyID := getAPIKeyIDFromContext(c) - upstreamReq.Header.Set("session_id", generateSessionUUID(isolateOpenAISessionID(apiKeyID, promptCacheKey))) + isolatedSessionID := generateSessionUUID(isolateOpenAISessionID(apiKeyID, promptCacheKey)) + upstreamReq.Header.Set("session_id", isolatedSessionID) + if upstreamReq.Header.Get("conversation_id") != "" { + upstreamReq.Header.Set("conversation_id", isolatedSessionID) + } + } + if account.Type == AccountTypeOAuth { + // Anthropic Messages compatibility uses the ChatGPT Codex SSE endpoint. + // Match airgate-openai's request shape: the SSE endpoint does not need + // the Responses experimental beta header, and forcing originator can make + // ChatGPT select a different internal continuation path. + upstreamReq.Header.Del("OpenAI-Beta") + upstreamReq.Header.Del("originator") + } + if account.Type == AccountTypeOAuth && promptCacheKey != "" && strings.TrimSpace(c.GetHeader("conversation_id")) == "" { + upstreamReq.Header.Del("conversation_id") + } + if compatTurnState != "" && upstreamReq.Header.Get("x-codex-turn-state") == "" { + upstreamReq.Header.Set("x-codex-turn-state", compatTurnState) } // 7. 
Send request @@ -208,6 +305,19 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + if previousResponseID != "" && (isOpenAICompatPreviousResponseNotFound(resp.StatusCode, upstreamMsg, respBody) || isOpenAICompatPreviousResponseUnsupported(resp.StatusCode, upstreamMsg, respBody)) { + if isOpenAICompatPreviousResponseUnsupported(resp.StatusCode, upstreamMsg, respBody) { + s.disableOpenAICompatSessionContinuation(ctx, c, account, promptCacheKey) + } else { + s.deleteOpenAICompatSessionResponseID(ctx, c, account, promptCacheKey) + } + logger.L().Info("openai messages: previous_response_id unavailable, retrying without continuation", + zap.Int64("account_id", account.ID), + zap.String("previous_response_id", truncateOpenAIWSLogValue(previousResponseID, openAIWSIDValueMaxLen)), + zap.String("upstream_model", upstreamModel), + ) + return s.ForwardAsAnthropic(ctx, c, account, body, promptCacheKey, defaultMappedModel) + } if s.shouldFailoverOpenAIUpstreamResponse(resp.StatusCode, upstreamMsg, respBody) { upstreamDetail := "" if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { @@ -240,6 +350,12 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( return s.handleAnthropicErrorResponse(resp, c, account) } + if account.Type == AccountTypeOAuth && promptCacheKey != "" { + if turnState := strings.TrimSpace(resp.Header.Get("x-codex-turn-state")); turnState != "" { + s.bindOpenAICompatSessionTurnState(ctx, c, account, promptCacheKey, turnState) + } + } + // 9. Handle normal response // Upstream is always streaming; choose response format based on client preference. 
var result *OpenAIForwardResult @@ -253,6 +369,12 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( // Propagate ServiceTier and ReasoningEffort to result for billing if handleErr == nil && result != nil { + if compatContinuationEnabled && promptCacheKey != "" && result.ResponseID != "" { + s.bindOpenAICompatSessionResponseID(ctx, c, account, promptCacheKey, result.ResponseID) + } + if promptCacheKey != "" && anthropicDigestChain != "" { + s.bindOpenAICompatAnthropicDigestPromptCacheKey(account, apiKeyID, anthropicDigestChain, promptCacheKey, anthropicMatchedDigestChain) + } if responsesReq.ServiceTier != "" { st := responsesReq.ServiceTier result.ServiceTier = &st @@ -273,6 +395,19 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( return result, handleErr } +func ensureCodexOAuthInstructionsField(reqBody map[string]any) { + if reqBody == nil { + return + } + if value, ok := reqBody["instructions"]; !ok || value == nil { + reqBody["instructions"] = "" + return + } + if _, ok := reqBody["instructions"].(string); !ok { + reqBody["instructions"] = "" + } +} + // handleAnthropicErrorResponse reads an upstream error and returns it in // Anthropic error format. 
func (s *OpenAIGatewayService) handleAnthropicErrorResponse( @@ -322,6 +457,7 @@ func (s *OpenAIGatewayService) handleAnthropicBufferedStreamingResponse( return &OpenAIForwardResult{ RequestID: requestID, + ResponseID: finalResponse.ID, Usage: usage, Model: originalModel, BillingModel: billingModel, @@ -505,6 +641,7 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( state := apicompat.NewResponsesEventToAnthropicState() state.Model = originalModel var usage OpenAIUsage + responseID := "" var firstTokenMs *int firstChunk := true clientDisconnected := false @@ -534,6 +671,7 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( resultWithUsage := func() *OpenAIForwardResult { return &OpenAIForwardResult{ RequestID: requestID, + ResponseID: responseID, Usage: usage, Model: originalModel, BillingModel: billingModel, @@ -563,8 +701,13 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( // 仅按兼容转换器支持的终止事件提取 usage,避免无意扩大事件语义。 isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type) - if isTerminalEvent && event.Response != nil && event.Response.Usage != nil { - usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + if isTerminalEvent && event.Response != nil { + if id := strings.TrimSpace(event.Response.ID); id != "" { + responseID = id + } + if event.Response.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + } } // Convert to Anthropic events diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index edd821ce..a5fe707d 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -211,9 +211,10 @@ type OpenAIUsage struct { // OpenAIForwardResult represents the result of forwarding type OpenAIForwardResult struct { - RequestID string - Usage OpenAIUsage - Model string // 原始模型(用于响应和日志显示) + RequestID string + ResponseID string + Usage OpenAIUsage + 
Model string // 原始模型(用于响应和日志显示) // BillingModel is the model used for cost calculation. // When non-empty, CalculateCost uses this instead of Model. // This is set by the Anthropic Messages conversion path where @@ -346,10 +347,12 @@ type OpenAIGatewayService struct { openaiWSPassthroughDialer openAIWSClientDialer openaiAccountStats *openAIAccountRuntimeStats - openaiWSFallbackUntil sync.Map // key: int64(accountID), value: time.Time - openaiWSRetryMetrics openAIWSRetryMetrics - responseHeaderFilter *responseheaders.CompiledHeaderFilter - codexSnapshotThrottle *accountWriteThrottle + openaiWSFallbackUntil sync.Map // key: int64(accountID), value: time.Time + openaiWSRetryMetrics openAIWSRetryMetrics + responseHeaderFilter *responseheaders.CompiledHeaderFilter + codexSnapshotThrottle *accountWriteThrottle + openaiCompatSessionResponses sync.Map + openaiCompatAnthropicDigestSessions sync.Map } // NewOpenAIGatewayService creates a new OpenAIGatewayService @@ -1992,6 +1995,8 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco originalBody := body reqModel, reqStream, promptCacheKey := extractOpenAIRequestMetaFromBody(body) originalModel := reqModel + compatMessagesBridge := isOpenAICompatMessagesBridgeBody(body) + setOpenAICompatMessagesBridgeContext(c, compatMessagesBridge) isCodexCLI := openai.IsCodexOfficialClientByHeaders(c.GetHeader("User-Agent"), c.GetHeader("originator")) || (s.cfg != nil && s.cfg.Gateway.ForceCodexCLI) wsDecision := s.getOpenAIWSProtocolResolver().Resolve(account) @@ -2117,7 +2122,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco } // 非透传模式下,instructions 为空时注入默认指令。 - if isInstructionsEmpty(reqBody) { + if isInstructionsEmpty(reqBody) && !compatMessagesBridge { reqBody["instructions"] = "You are a helpful coding assistant." 
bodyModified = true markPatchSet("instructions", "You are a helpful coding assistant.") @@ -2246,7 +2251,20 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco } if account.Type == AccountTypeOAuth { - codexResult := applyCodexOAuthTransform(reqBody, isCodexCLI, isCompactRequest) + codexResult := codexTransformResult{} + if compatMessagesBridge { + codexResult = applyCodexOAuthTransformWithOptions(reqBody, codexOAuthTransformOptions{ + IsCodexCLI: isCodexCLI, + IsCompact: isCompactRequest, + SkipDefaultInstructions: true, + PreserveToolCallIDs: true, + }) + ensureCodexOAuthInstructionsField(reqBody) + bodyModified = true + disablePatch() + } else { + codexResult = applyCodexOAuthTransform(reqBody, isCodexCLI, isCompactRequest) + } if codexResult.Modified { bodyModified = true disablePatch() @@ -3831,12 +3849,19 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin. } } if account.Type == AccountTypeOAuth { + compatMessagesBridge := isOpenAICompatMessagesBridgeContext(c) || isOpenAICompatMessagesBridgeBody(body) // 清除客户端透传的 session 头,后续用隔离后的值重新设置,防止跨用户会话碰撞。 + clientConversationID := strings.TrimSpace(req.Header.Get("conversation_id")) req.Header.Del("conversation_id") req.Header.Del("session_id") - req.Header.Set("OpenAI-Beta", "responses=experimental") - req.Header.Set("originator", resolveOpenAIUpstreamOriginator(c, isCodexCLI)) + if compatMessagesBridge { + req.Header.Del("OpenAI-Beta") + req.Header.Del("originator") + } else { + req.Header.Set("OpenAI-Beta", "responses=experimental") + req.Header.Set("originator", resolveOpenAIUpstreamOriginator(c, isCodexCLI)) + } apiKeyID := getAPIKeyIDFromContext(c) if isOpenAIResponsesCompactPath(c) { req.Header.Set("accept", "application/json") @@ -3850,8 +3875,10 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin. 
} if promptCacheKey != "" { isolated := isolateOpenAISessionID(apiKeyID, promptCacheKey) - req.Header.Set("conversation_id", isolated) req.Header.Set("session_id", isolated) + if !compatMessagesBridge || clientConversationID != "" { + req.Header.Set("conversation_id", isolated) + } } } diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index b55f0d2c..84a2fe71 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1822,6 +1822,29 @@ func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) require.NotEmpty(t, req.Header.Get("Session_Id")) } +func TestOpenAIBuildUpstreamRequestOAuthMessagesBridgeUsesSessionOnly(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.5","prompt_cache_key":"anthropic-metadata-session-1","input":[{"type":"message","role":"developer","content":[{"type":"input_text","text":""}]},{"type":"message","role":"user","content":"hello"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + c.Request.Header.Set("OpenAI-Beta", "responses=experimental") + c.Request.Header.Set("originator", "codex_cli_rs") + + svc := &OpenAIGatewayService{} + account := &Account{ + Type: AccountTypeOAuth, + Credentials: map[string]any{"chatgpt_account_id": "chatgpt-acc"}, + } + + req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", true, "anthropic-metadata-session-1", false) + require.NoError(t, err) + require.NotEmpty(t, req.Header.Get("Session_Id")) + require.Empty(t, req.Header.Get("Conversation_Id")) + require.Empty(t, req.Header.Get("OpenAI-Beta")) + require.Empty(t, req.Header.Get("originator")) +} + func TestOpenAIBuildUpstreamRequestPreservesCompactPathForAPIKeyBaseURL(t *testing.T) { gin.SetMode(gin.TestMode) rec := 
httptest.NewRecorder() diff --git a/backend/internal/service/openai_messages_bridge.go b/backend/internal/service/openai_messages_bridge.go new file mode 100644 index 00000000..d67b4b1e --- /dev/null +++ b/backend/internal/service/openai_messages_bridge.go @@ -0,0 +1,57 @@ +package service + +import ( + "bytes" + "strings" + + "github.com/gin-gonic/gin" + "github.com/tidwall/gjson" +) + +const openAICompatMessagesBridgeContextKey = "openai_compat_messages_bridge" + +func isOpenAICompatMessagesBridgeBody(body []byte) bool { + if len(body) == 0 { + return false + } + if bytes.Contains(body, []byte(openAICompatClaudeCodeTodoGuardMarker)) { + return true + } + return isOpenAICompatMessagesBridgePromptCacheKey(gjson.GetBytes(body, "prompt_cache_key").String()) +} + +func isOpenAICompatMessagesBridgeRequestBody(reqBody map[string]any) bool { + if reqBody == nil { + return false + } + if input, ok := reqBody["input"].([]any); ok && inputContainsText(input, openAICompatClaudeCodeTodoGuardMarker) { + return true + } + return isOpenAICompatMessagesBridgePromptCacheKey(firstNonEmptyString(reqBody["prompt_cache_key"])) +} + +func isOpenAICompatMessagesBridgePromptCacheKey(key string) bool { + key = strings.TrimSpace(key) + return strings.HasPrefix(key, "anthropic-metadata-") || + strings.HasPrefix(key, "anthropic-cache-") || + strings.HasPrefix(key, "anthropic-digest-") +} + +func setOpenAICompatMessagesBridgeContext(c *gin.Context, enabled bool) { + if c == nil || !enabled { + return + } + c.Set(openAICompatMessagesBridgeContextKey, true) +} + +func isOpenAICompatMessagesBridgeContext(c *gin.Context) bool { + if c == nil { + return false + } + value, ok := c.Get(openAICompatMessagesBridgeContextKey) + if !ok { + return false + } + enabled, ok := value.(bool) + return ok && enabled +} diff --git a/backend/internal/service/openai_messages_continuation.go b/backend/internal/service/openai_messages_continuation.go new file mode 100644 index 00000000..57d04784 --- /dev/null +++ 
b/backend/internal/service/openai_messages_continuation.go @@ -0,0 +1,277 @@ +package service + +import ( + "context" + "encoding/json" + "net/http" + "strconv" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" + "github.com/gin-gonic/gin" + "github.com/tidwall/gjson" +) + +type openAICompatSessionResponseBinding struct { + ResponseID string + TurnState string + ContinuationDisabled bool + ExpiresAt time.Time +} + +func openAICompatContinuationEnabled(account *Account, model string) bool { + if account == nil || account.Type != AccountTypeAPIKey { + return false + } + return shouldAutoInjectPromptCacheKeyForCompat(model) +} + +func trimAnthropicCompatResponsesInputToLatestTurn(req *apicompat.ResponsesRequest) { + if req == nil || len(req.Input) == 0 { + return + } + + var items []apicompat.ResponsesInputItem + if err := json.Unmarshal(req.Input, &items); err != nil || len(items) == 0 { + return + } + + start := len(items) - 1 + for start > 0 && items[start].Type == "function_call_output" { + start-- + } + trimmed := append([]apicompat.ResponsesInputItem(nil), items[start:]...) 
+ if len(trimmed) == len(items) { + return + } + if input, err := json.Marshal(trimmed); err == nil { + req.Input = input + } +} + +func isOpenAICompatPreviousResponseNotFound(statusCode int, upstreamMsg string, upstreamBody []byte) bool { + if statusCode != http.StatusBadRequest && statusCode != http.StatusNotFound { + return false + } + check := func(s string) bool { + lower := strings.ToLower(strings.TrimSpace(s)) + return strings.Contains(lower, "previous_response_not_found") || + (strings.Contains(lower, "previous response") && strings.Contains(lower, "not found")) || + (strings.Contains(lower, "unsupported parameter") && strings.Contains(lower, "previous_response_id")) + } + if check(upstreamMsg) || check(string(upstreamBody)) { + return true + } + return check(gjson.GetBytes(upstreamBody, "error.code").String()) || + check(gjson.GetBytes(upstreamBody, "error.message").String()) +} + +func isOpenAICompatPreviousResponseUnsupported(statusCode int, upstreamMsg string, upstreamBody []byte) bool { + if statusCode != http.StatusBadRequest { + return false + } + check := func(s string) bool { + lower := strings.ToLower(strings.TrimSpace(s)) + if !strings.Contains(lower, "previous_response_id") { + return false + } + return strings.Contains(lower, "unsupported parameter") || + strings.Contains(lower, "only supported on responses websocket") || + strings.Contains(lower, "not supported") + } + if check(upstreamMsg) || check(string(upstreamBody)) { + return true + } + return check(gjson.GetBytes(upstreamBody, "error.code").String()) || + check(gjson.GetBytes(upstreamBody, "error.message").String()) +} + +func openAICompatSessionResponseKey(c *gin.Context, account *Account, promptCacheKey string) string { + key := strings.TrimSpace(promptCacheKey) + if account == nil || key == "" { + return "" + } + apiKeyID := int64(0) + if c != nil { + apiKeyID = getAPIKeyIDFromContext(c) + } + return strings.Join([]string{ + strconv.FormatInt(account.ID, 10), + 
strconv.FormatInt(apiKeyID, 10), + key, + }, "\x00") +} + +func (s *OpenAIGatewayService) getOpenAICompatSessionResponseID(_ context.Context, c *gin.Context, account *Account, promptCacheKey string) string { + if s == nil { + return "" + } + key := openAICompatSessionResponseKey(c, account, promptCacheKey) + if key == "" { + return "" + } + raw, ok := s.openaiCompatSessionResponses.Load(key) + if !ok { + return "" + } + binding, ok := raw.(openAICompatSessionResponseBinding) + if !ok { + s.openaiCompatSessionResponses.Delete(key) + return "" + } + if !binding.ExpiresAt.IsZero() && time.Now().After(binding.ExpiresAt) { + s.openaiCompatSessionResponses.Delete(key) + return "" + } + if binding.ContinuationDisabled { + return "" + } + if strings.TrimSpace(binding.ResponseID) == "" { + s.openaiCompatSessionResponses.Delete(key) + return "" + } + return strings.TrimSpace(binding.ResponseID) +} + +func (s *OpenAIGatewayService) bindOpenAICompatSessionResponseID(_ context.Context, c *gin.Context, account *Account, promptCacheKey, responseID string) { + if s == nil { + return + } + key := openAICompatSessionResponseKey(c, account, promptCacheKey) + id := strings.TrimSpace(responseID) + if key == "" || id == "" { + return + } + binding := openAICompatSessionResponseBinding{ + ResponseID: id, + ExpiresAt: time.Now().Add(s.openAIWSResponseStickyTTL()), + } + if raw, ok := s.openaiCompatSessionResponses.Load(key); ok { + if existing, ok := raw.(openAICompatSessionResponseBinding); ok { + if existing.ContinuationDisabled { + existing.ResponseID = "" + existing.ExpiresAt = time.Now().Add(s.openAIWSResponseStickyTTL()) + s.openaiCompatSessionResponses.Store(key, existing) + return + } + binding.TurnState = existing.TurnState + } + } + s.openaiCompatSessionResponses.Store(key, binding) +} + +func (s *OpenAIGatewayService) deleteOpenAICompatSessionResponseID(_ context.Context, c *gin.Context, account *Account, promptCacheKey string) { + if s == nil { + return + } + key := 
openAICompatSessionResponseKey(c, account, promptCacheKey) + if key == "" { + return + } + raw, ok := s.openaiCompatSessionResponses.Load(key) + if !ok { + return + } + binding, ok := raw.(openAICompatSessionResponseBinding) + if !ok { + s.openaiCompatSessionResponses.Delete(key) + return + } + binding.ResponseID = "" + if strings.TrimSpace(binding.TurnState) == "" && !binding.ContinuationDisabled { + s.openaiCompatSessionResponses.Delete(key) + return + } + binding.ExpiresAt = time.Now().Add(s.openAIWSResponseStickyTTL()) + s.openaiCompatSessionResponses.Store(key, binding) +} + +func (s *OpenAIGatewayService) disableOpenAICompatSessionContinuation(_ context.Context, c *gin.Context, account *Account, promptCacheKey string) { + if s == nil { + return + } + key := openAICompatSessionResponseKey(c, account, promptCacheKey) + if key == "" { + return + } + binding := openAICompatSessionResponseBinding{ + ContinuationDisabled: true, + ExpiresAt: time.Now().Add(s.openAIWSResponseStickyTTL()), + } + if raw, ok := s.openaiCompatSessionResponses.Load(key); ok { + if existing, ok := raw.(openAICompatSessionResponseBinding); ok { + binding.TurnState = existing.TurnState + } + } + s.openaiCompatSessionResponses.Store(key, binding) +} + +func (s *OpenAIGatewayService) isOpenAICompatSessionContinuationDisabled(_ context.Context, c *gin.Context, account *Account, promptCacheKey string) bool { + if s == nil { + return false + } + key := openAICompatSessionResponseKey(c, account, promptCacheKey) + if key == "" { + return false + } + raw, ok := s.openaiCompatSessionResponses.Load(key) + if !ok { + return false + } + binding, ok := raw.(openAICompatSessionResponseBinding) + if !ok { + s.openaiCompatSessionResponses.Delete(key) + return false + } + if !binding.ExpiresAt.IsZero() && time.Now().After(binding.ExpiresAt) { + s.openaiCompatSessionResponses.Delete(key) + return false + } + return binding.ContinuationDisabled +} + +func (s *OpenAIGatewayService) 
getOpenAICompatSessionTurnState(_ context.Context, c *gin.Context, account *Account, promptCacheKey string) string { + if s == nil { + return "" + } + key := openAICompatSessionResponseKey(c, account, promptCacheKey) + if key == "" { + return "" + } + raw, ok := s.openaiCompatSessionResponses.Load(key) + if !ok { + return "" + } + binding, ok := raw.(openAICompatSessionResponseBinding) + if !ok || strings.TrimSpace(binding.TurnState) == "" { + return "" + } + if !binding.ExpiresAt.IsZero() && time.Now().After(binding.ExpiresAt) { + s.openaiCompatSessionResponses.Delete(key) + return "" + } + return strings.TrimSpace(binding.TurnState) +} + +func (s *OpenAIGatewayService) bindOpenAICompatSessionTurnState(_ context.Context, c *gin.Context, account *Account, promptCacheKey, turnState string) { + if s == nil { + return + } + key := openAICompatSessionResponseKey(c, account, promptCacheKey) + state := strings.TrimSpace(turnState) + if key == "" || state == "" { + return + } + binding := openAICompatSessionResponseBinding{ + TurnState: state, + ExpiresAt: time.Now().Add(s.openAIWSResponseStickyTTL()), + } + if raw, ok := s.openaiCompatSessionResponses.Load(key); ok { + if existing, ok := raw.(openAICompatSessionResponseBinding); ok { + binding.ResponseID = existing.ResponseID + binding.ContinuationDisabled = existing.ContinuationDisabled + } + } + s.openaiCompatSessionResponses.Store(key, binding) +} diff --git a/backend/internal/service/openai_messages_digest_session.go b/backend/internal/service/openai_messages_digest_session.go new file mode 100644 index 00000000..44a49d1e --- /dev/null +++ b/backend/internal/service/openai_messages_digest_session.go @@ -0,0 +1,135 @@ +package service + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" +) + +type openAICompatAnthropicDigestBinding struct { + PromptCacheKey string + ExpiresAt time.Time +} + +func buildOpenAICompatAnthropicDigestChain(req 
*apicompat.AnthropicRequest) string { + if req == nil { + return "" + } + + parts := make([]string, 0, len(req.Messages)+1) + if len(req.System) > 0 && strings.TrimSpace(string(req.System)) != "" && strings.TrimSpace(string(req.System)) != "null" { + parts = append(parts, "s:"+shortHash(req.System)) + } + for _, msg := range req.Messages { + content := msg.Content + if len(content) == 0 || strings.TrimSpace(string(content)) == "" { + continue + } + prefix := "u" + if strings.TrimSpace(msg.Role) == "assistant" { + prefix = "a" + } + parts = append(parts, prefix+":"+shortHash(content)) + } + return strings.Join(parts, "-") +} + +func openAICompatAnthropicDigestNamespace(account *Account, cAPIKeyID int64) string { + if account == nil || account.ID <= 0 { + return "" + } + return fmt.Sprintf("%d|%d|", account.ID, cAPIKeyID) +} + +func (s *OpenAIGatewayService) findOpenAICompatAnthropicDigestPromptCacheKey(account *Account, cAPIKeyID int64, digestChain string) (promptCacheKey string, matchedChain string) { + if s == nil || digestChain == "" { + return "", "" + } + ns := openAICompatAnthropicDigestNamespace(account, cAPIKeyID) + if ns == "" { + return "", "" + } + chain := digestChain + for { + if raw, ok := s.openaiCompatAnthropicDigestSessions.Load(ns + chain); ok { + if binding, ok := raw.(openAICompatAnthropicDigestBinding); ok { + if binding.ExpiresAt.IsZero() || time.Now().Before(binding.ExpiresAt) { + if key := strings.TrimSpace(binding.PromptCacheKey); key != "" { + return key, chain + } + } + } + s.openaiCompatAnthropicDigestSessions.Delete(ns + chain) + } + i := strings.LastIndex(chain, "-") + if i < 0 { + return "", "" + } + chain = chain[:i] + } +} + +func (s *OpenAIGatewayService) bindOpenAICompatAnthropicDigestPromptCacheKey(account *Account, cAPIKeyID int64, digestChain, promptCacheKey, oldDigestChain string) { + if s == nil || digestChain == "" || strings.TrimSpace(promptCacheKey) == "" { + return + } + ns := openAICompatAnthropicDigestNamespace(account, 
cAPIKeyID) + if ns == "" { + return + } + binding := openAICompatAnthropicDigestBinding{ + PromptCacheKey: strings.TrimSpace(promptCacheKey), + ExpiresAt: time.Now().Add(s.openAIWSResponseStickyTTL()), + } + s.openaiCompatAnthropicDigestSessions.Store(ns+digestChain, binding) + if oldDigestChain != "" && oldDigestChain != digestChain { + s.openaiCompatAnthropicDigestSessions.Delete(ns + oldDigestChain) + } +} + +func promptCacheKeyFromAnthropicDigest(digestChain string) string { + if strings.TrimSpace(digestChain) == "" { + return "" + } + return "anthropic-digest-" + hashSensitiveValueForLog(digestChain) +} + +func promptCacheKeyFromAnthropicMetadataSession(req *apicompat.AnthropicRequest) string { + if req == nil || len(req.Metadata) == 0 { + return "" + } + var metadata struct { + UserID string `json:"user_id"` + } + if err := json.Unmarshal(req.Metadata, &metadata); err != nil { + return "" + } + parsed := ParseMetadataUserID(metadata.UserID) + if parsed == nil || strings.TrimSpace(parsed.SessionID) == "" { + return "" + } + seed := strings.Join([]string{ + "anthropic-metadata", + strings.TrimSpace(parsed.DeviceID), + strings.TrimSpace(parsed.AccountUUID), + strings.TrimSpace(parsed.SessionID), + }, "|") + return "anthropic-metadata-" + hashSensitiveValueForLog(seed) +} + +func cloneAnthropicRequestForDigest(req *apicompat.AnthropicRequest) *apicompat.AnthropicRequest { + if req == nil { + return nil + } + cp := *req + if len(req.System) > 0 { + cp.System = append(json.RawMessage(nil), req.System...) + } + if len(req.Messages) > 0 { + cp.Messages = append([]apicompat.AnthropicMessage(nil), req.Messages...) 
+ } + return &cp +} diff --git a/backend/internal/service/openai_messages_replay_guard.go b/backend/internal/service/openai_messages_replay_guard.go new file mode 100644 index 00000000..2ad9b6bc --- /dev/null +++ b/backend/internal/service/openai_messages_replay_guard.go @@ -0,0 +1,90 @@ +package service + +import ( + "encoding/json" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" +) + +const openAICompatAnthropicReplayMaxTailMessages = 12 + +func applyAnthropicCompatFullReplayGuard(req *apicompat.AnthropicRequest) bool { + if req == nil || len(req.Messages) <= openAICompatAnthropicReplayMaxTailMessages { + return false + } + + start := len(req.Messages) - openAICompatAnthropicReplayMaxTailMessages + start = expandAnthropicCompatTrimBoundary(req.Messages, start) + if start <= 0 { + return false + } + + req.Messages = append([]apicompat.AnthropicMessage(nil), req.Messages[start:]...) + return true +} + +func expandAnthropicCompatTrimBoundary(messages []apicompat.AnthropicMessage, start int) int { + if start <= 0 || start >= len(messages) { + return start + } + + toolUseIndex := make(map[string]int) + toolResultIndex := make(map[string]int) + for i, msg := range messages { + uses, results := anthropicCompatMessageToolIDs(msg) + for _, id := range uses { + if _, exists := toolUseIndex[id]; !exists { + toolUseIndex[id] = i + } + } + for _, id := range results { + if _, exists := toolResultIndex[id]; !exists { + toolResultIndex[id] = i + } + } + } + + for { + next := start + for i := start; i < len(messages); i++ { + uses, results := anthropicCompatMessageToolIDs(messages[i]) + for _, id := range results { + if useIdx, ok := toolUseIndex[id]; ok && useIdx < next { + next = useIdx + } + } + for _, id := range uses { + if resultIdx, ok := toolResultIndex[id]; ok && resultIdx < next { + next = resultIdx + } + } + } + if next == start { + return start + } + start = next + } +} + +func anthropicCompatMessageToolIDs(msg apicompat.AnthropicMessage) ([]string, 
[]string) { + var blocks []apicompat.AnthropicContentBlock + if err := json.Unmarshal(msg.Content, &blocks); err != nil { + return nil, nil + } + + uses := make([]string, 0, 1) + results := make([]string, 0, 1) + for _, block := range blocks { + switch block.Type { + case "tool_use": + if block.ID != "" { + uses = append(uses, block.ID) + } + case "tool_result": + if block.ToolUseID != "" { + results = append(results, block.ToolUseID) + } + } + } + return uses, results +} diff --git a/backend/internal/service/openai_messages_replay_guard_test.go b/backend/internal/service/openai_messages_replay_guard_test.go new file mode 100644 index 00000000..6176beec --- /dev/null +++ b/backend/internal/service/openai_messages_replay_guard_test.go @@ -0,0 +1,58 @@ +package service + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" + "github.com/stretchr/testify/require" +) + +func TestApplyAnthropicCompatFullReplayGuard_TrimsOldMessages(t *testing.T) { + t.Parallel() + + req := &apicompat.AnthropicRequest{Messages: make([]apicompat.AnthropicMessage, 0, openAICompatAnthropicReplayMaxTailMessages+3)} + for i := 0; i < openAICompatAnthropicReplayMaxTailMessages+3; i++ { + req.Messages = append(req.Messages, apicompat.AnthropicMessage{ + Role: "user", + Content: json.RawMessage(fmt.Sprintf(`"message-%02d"`, i)), + }) + } + + trimmed := applyAnthropicCompatFullReplayGuard(req) + + require.True(t, trimmed) + require.Len(t, req.Messages, openAICompatAnthropicReplayMaxTailMessages) + require.JSONEq(t, `"message-03"`, string(req.Messages[0].Content)) + require.JSONEq(t, `"message-14"`, string(req.Messages[len(req.Messages)-1].Content)) +} + +func TestApplyAnthropicCompatFullReplayGuard_KeepsToolBoundaryIntact(t *testing.T) { + t.Parallel() + + req := &apicompat.AnthropicRequest{Messages: make([]apicompat.AnthropicMessage, 0, openAICompatAnthropicReplayMaxTailMessages+3)} + for i := 0; i < 
openAICompatAnthropicReplayMaxTailMessages+3; i++ { + role := "user" + content := json.RawMessage(fmt.Sprintf(`"message-%02d"`, i)) + if i == 1 { + role = "assistant" + content = json.RawMessage(`[{"type":"tool_use","id":"toolu_keep","name":"Read","input":{"file_path":"main.go"}}]`) + } + if i == 3 { + content = json.RawMessage(`[{"type":"tool_result","tool_use_id":"toolu_keep","content":"ok"}]`) + } + req.Messages = append(req.Messages, apicompat.AnthropicMessage{ + Role: role, + Content: content, + }) + } + + trimmed := applyAnthropicCompatFullReplayGuard(req) + + require.True(t, trimmed) + require.Len(t, req.Messages, openAICompatAnthropicReplayMaxTailMessages+2) + require.Equal(t, "assistant", req.Messages[0].Role) + require.Contains(t, string(req.Messages[0].Content), `"toolu_keep"`) + require.Contains(t, string(req.Messages[2].Content), `"tool_result"`) +} diff --git a/backend/internal/service/openai_messages_todo_guard.go b/backend/internal/service/openai_messages_todo_guard.go new file mode 100644 index 00000000..96fc90cb --- /dev/null +++ b/backend/internal/service/openai_messages_todo_guard.go @@ -0,0 +1,121 @@ +package service + +import ( + "encoding/json" + "strings" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" +) + +const ( + openAICompatClaudeCodeTodoGuardMarker = "" + openAICompatClaudeCodeTodoGuardText = openAICompatClaudeCodeTodoGuardMarker + "\nWhen using Claude Code todo or task tracking tools, keep the visible task list consistent. Do not send final or summary text while any item remains in_progress. 
Before finishing, asking the user to choose, or reporting a blocker, update the todo list so completed work is completed and deferred work is pending/open; leave an item in_progress only when active work will continue in the same turn.\n" +) + +func appendOpenAICompatClaudeCodeTodoGuard(req *apicompat.ResponsesRequest) bool { + if req == nil || len(req.Input) == 0 { + return false + } + + var items []apicompat.ResponsesInputItem + if err := json.Unmarshal(req.Input, &items); err != nil { + return false + } + if len(items) == 0 || responsesInputItemsContainText(items, openAICompatClaudeCodeTodoGuardMarker) { + return false + } + + content, err := json.Marshal([]apicompat.ResponsesContentPart{{ + Type: "input_text", + Text: openAICompatClaudeCodeTodoGuardText, + }}) + if err != nil { + return false + } + + guard := apicompat.ResponsesInputItem{ + Type: "message", + Role: "developer", + Content: content, + } + + insertAt := 0 + for insertAt < len(items) && items[insertAt].Type == "message" && items[insertAt].Role == "developer" { + insertAt++ + } + + items = append(items, apicompat.ResponsesInputItem{}) + copy(items[insertAt+1:], items[insertAt:]) + items[insertAt] = guard + + input, err := json.Marshal(items) + if err != nil { + return false + } + req.Input = input + return true +} + +func appendOpenAICompatClaudeCodeTodoGuardToRequestBody(reqBody map[string]any) bool { + if reqBody == nil { + return false + } + + input, ok := reqBody["input"].([]any) + if !ok || len(input) == 0 || inputContainsText(input, openAICompatClaudeCodeTodoGuardMarker) { + return false + } + + guard := map[string]any{ + "type": "message", + "role": "developer", + "content": []any{ + map[string]any{ + "type": "input_text", + "text": openAICompatClaudeCodeTodoGuardText, + }, + }, + } + + insertAt := 0 + for insertAt < len(input) { + item, ok := input[insertAt].(map[string]any) + if !ok || strings.TrimSpace(firstNonEmptyString(item["type"])) != "message" || 
strings.TrimSpace(firstNonEmptyString(item["role"])) != "developer" { + break + } + insertAt++ + } + + input = append(input, nil) + copy(input[insertAt+1:], input[insertAt:]) + input[insertAt] = guard + reqBody["input"] = input + return true +} + +func responsesInputItemsContainText(items []apicompat.ResponsesInputItem, needle string) bool { + needle = strings.TrimSpace(needle) + if needle == "" { + return false + } + for _, item := range items { + if strings.Contains(string(item.Content), needle) { + return true + } + } + return false +} + +func inputContainsText(input []any, needle string) bool { + needle = strings.TrimSpace(needle) + if needle == "" { + return false + } + for _, item := range input { + b, err := json.Marshal(item) + if err == nil && strings.Contains(string(b), needle) { + return true + } + } + return false +} diff --git a/backend/internal/service/openai_oauth_passthrough_test.go b/backend/internal/service/openai_oauth_passthrough_test.go index cc9fc572..398cbb85 100644 --- a/backend/internal/service/openai_oauth_passthrough_test.go +++ b/backend/internal/service/openai_oauth_passthrough_test.go @@ -25,9 +25,12 @@ func f64p(v float64) *float64 { return &v } type httpUpstreamRecorder struct { lastReq *http.Request lastBody []byte + requests []*http.Request + bodies [][]byte - resp *http.Response - err error + resp *http.Response + responses []*http.Response + err error } func (u *httpUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { @@ -35,12 +38,19 @@ func (u *httpUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID if req != nil && req.Body != nil { b, _ := io.ReadAll(req.Body) u.lastBody = b + u.bodies = append(u.bodies, append([]byte(nil), b...)) _ = req.Body.Close() req.Body = io.NopCloser(bytes.NewReader(b)) } + u.requests = append(u.requests, req) if u.err != nil { return nil, u.err } + if len(u.responses) > 0 { + resp := u.responses[0] + u.responses = 
u.responses[1:] + return resp, nil + } return u.resp, nil } @@ -91,6 +101,50 @@ func TestOpenAIGatewayService_ResponsesUnknownModelDoesNotFallbackToGPT54(t *tes require.True(t, rec.Code >= http.StatusBadRequest) } +func TestOpenAIGatewayService_OAuthMessagesBridgeDoesNotInjectDefaultInstructions(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + originalBody := []byte(`{"model":"gpt-5.5","stream":true,"prompt_cache_key":"anthropic-metadata-session-1","input":[{"type":"message","role":"developer","content":[{"type":"input_text","text":""}]},{"type":"message","role":"user","content":"hello"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(originalBody)) + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusBadRequest, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid_bridge"}}, + Body: io.NopCloser(strings.NewReader(`{"error":{"type":"invalid_request_error","message":"bridge stop"}}`)), + }} + svc := &OpenAIGatewayService{ + cfg: &config.Config{}, + httpUpstream: upstream, + } + account := &Account{ + ID: 123, + Name: "acc", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + Status: StatusActive, + Schedulable: true, + } + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.Error(t, err) + require.Nil(t, result) + require.NotNil(t, upstream.lastReq) + require.Equal(t, "", gjson.GetBytes(upstream.lastBody, "instructions").String()) + require.False(t, gjson.GetBytes(upstream.lastBody, "prompt_cache_key").Exists()) + require.NotEmpty(t, upstream.lastReq.Header.Get("Session_Id")) + require.Empty(t, upstream.lastReq.Header.Get("Conversation_Id")) + require.Empty(t, 
upstream.lastReq.Header.Get("OpenAI-Beta")) + require.Empty(t, upstream.lastReq.Header.Get("originator")) +} + type openAIPassthroughFailoverRepo struct { stubOpenAIAccountRepo rateLimitCalls []time.Time