diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 2da979b5..dfe80c43 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -424,6 +424,262 @@ func (h *OpenAIGatewayHandler) logOpenAIRemoteCompactOutcome(c *gin.Context, sta log.Warn("codex.remote_compact.failed") } +// Messages handles Anthropic Messages API requests routed to OpenAI platform. +// POST /v1/messages (when group platform is OpenAI) +func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { + streamStarted := false + defer h.recoverResponsesPanic(c, &streamStarted) + + requestStart := time.Now() + + apiKey, ok := middleware2.GetAPIKeyFromContext(c) + if !ok { + h.anthropicErrorResponse(c, http.StatusUnauthorized, "authentication_error", "Invalid API key") + return + } + + subject, ok := middleware2.GetAuthSubjectFromContext(c) + if !ok { + h.anthropicErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found") + return + } + reqLog := requestLogger( + c, + "handler.openai_gateway.messages", + zap.Int64("user_id", subject.UserID), + zap.Int64("api_key_id", apiKey.ID), + zap.Any("group_id", apiKey.GroupID), + ) + if !h.ensureResponsesDependencies(c, reqLog) { + return + } + + body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request) + if err != nil { + if maxErr, ok := extractMaxBytesError(err); ok { + h.anthropicErrorResponse(c, http.StatusRequestEntityTooLarge, "invalid_request_error", buildBodyTooLargeMessage(maxErr.Limit)) + return + } + h.anthropicErrorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to read request body") + return + } + if len(body) == 0 { + h.anthropicErrorResponse(c, http.StatusBadRequest, "invalid_request_error", "Request body is empty") + return + } + + if !gjson.ValidBytes(body) { + h.anthropicErrorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request 
body") + return + } + + modelResult := gjson.GetBytes(body, "model") + if !modelResult.Exists() || modelResult.Type != gjson.String || modelResult.String() == "" { + h.anthropicErrorResponse(c, http.StatusBadRequest, "invalid_request_error", "model is required") + return + } + reqModel := modelResult.String() + reqStream := gjson.GetBytes(body, "stream").Bool() + + reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream)) + + setOpsRequestContext(c, reqModel, reqStream, body) + + subscription, _ := middleware2.GetSubscriptionFromContext(c) + + service.SetOpsLatencyMs(c, service.OpsAuthLatencyMsKey, time.Since(requestStart).Milliseconds()) + routingStart := time.Now() + + userReleaseFunc, acquired := h.acquireResponsesUserSlot(c, subject.UserID, subject.Concurrency, reqStream, &streamStarted, reqLog) + if !acquired { + return + } + if userReleaseFunc != nil { + defer userReleaseFunc() + } + + if err := h.billingCacheService.CheckBillingEligibility(c.Request.Context(), apiKey.User, apiKey, apiKey.Group, subscription); err != nil { + reqLog.Info("openai_messages.billing_eligibility_check_failed", zap.Error(err)) + status, code, message := billingErrorDetails(err) + h.anthropicStreamingAwareError(c, status, code, message, streamStarted) + return + } + + sessionHash := h.gatewayService.GenerateSessionHash(c, body) + promptCacheKey := h.gatewayService.ExtractSessionID(c, body) + + maxAccountSwitches := h.maxAccountSwitches + switchCount := 0 + failedAccountIDs := make(map[int64]struct{}) + var lastFailoverErr *service.UpstreamFailoverError + + for { + reqLog.Debug("openai_messages.account_selecting", zap.Int("excluded_account_count", len(failedAccountIDs))) + selection, scheduleDecision, err := h.gatewayService.SelectAccountWithScheduler( + c.Request.Context(), + apiKey.GroupID, + "", // no previous_response_id + sessionHash, + reqModel, + failedAccountIDs, + service.OpenAIUpstreamTransportAny, + ) + if err != nil { + 
reqLog.Warn("openai_messages.account_select_failed", + zap.Error(err), + zap.Int("excluded_account_count", len(failedAccountIDs)), + ) + if len(failedAccountIDs) == 0 { + h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted) + return + } + if lastFailoverErr != nil { + h.handleAnthropicFailoverExhausted(c, lastFailoverErr, streamStarted) + } else { + h.anthropicStreamingAwareError(c, http.StatusBadGateway, "api_error", "Upstream request failed", streamStarted) + } + return + } + if selection == nil || selection.Account == nil { + h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted) + return + } + account := selection.Account + reqLog.Debug("openai_messages.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name)) + _ = scheduleDecision + setOpsSelectedAccount(c, account.ID, account.Platform) + + accountReleaseFunc, acquired := h.acquireResponsesAccountSlot(c, apiKey.GroupID, sessionHash, selection, reqStream, &streamStarted, reqLog) + if !acquired { + return + } + + service.SetOpsLatencyMs(c, service.OpsRoutingLatencyMsKey, time.Since(routingStart).Milliseconds()) + forwardStart := time.Now() + + result, err := h.gatewayService.ForwardAsAnthropic(c.Request.Context(), c, account, body, promptCacheKey) + + forwardDurationMs := time.Since(forwardStart).Milliseconds() + if accountReleaseFunc != nil { + accountReleaseFunc() + } + upstreamLatencyMs, _ := getContextInt64(c, service.OpsUpstreamLatencyMsKey) + responseLatencyMs := forwardDurationMs + if upstreamLatencyMs > 0 && forwardDurationMs > upstreamLatencyMs { + responseLatencyMs = forwardDurationMs - upstreamLatencyMs + } + service.SetOpsLatencyMs(c, service.OpsResponseLatencyMsKey, responseLatencyMs) + if err == nil && result != nil && result.FirstTokenMs != nil { + service.SetOpsLatencyMs(c, service.OpsTimeToFirstTokenMsKey, 
int64(*result.FirstTokenMs)) + } + if err != nil { + var failoverErr *service.UpstreamFailoverError + if errors.As(err, &failoverErr) { + h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil) + h.gatewayService.RecordOpenAIAccountSwitch() + failedAccountIDs[account.ID] = struct{}{} + lastFailoverErr = failoverErr + if switchCount >= maxAccountSwitches { + h.handleAnthropicFailoverExhausted(c, failoverErr, streamStarted) + return + } + switchCount++ + reqLog.Warn("openai_messages.upstream_failover_switching", + zap.Int64("account_id", account.ID), + zap.Int("upstream_status", failoverErr.StatusCode), + zap.Int("switch_count", switchCount), + zap.Int("max_switches", maxAccountSwitches), + ) + continue + } + h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil) + wroteFallback := h.ensureAnthropicErrorResponse(c, streamStarted) + reqLog.Warn("openai_messages.forward_failed", + zap.Int64("account_id", account.ID), + zap.Bool("fallback_error_response_written", wroteFallback), + zap.Error(err), + ) + return + } + if result != nil { + h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, result.FirstTokenMs) + } else { + h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, nil) + } + + userAgent := c.GetHeader("User-Agent") + clientIP := ip.GetClientIP(c) + + h.submitUsageRecordTask(func(ctx context.Context) { + if err := h.gatewayService.RecordUsage(ctx, &service.OpenAIRecordUsageInput{ + Result: result, + APIKey: apiKey, + User: apiKey.User, + Account: account, + Subscription: subscription, + UserAgent: userAgent, + IPAddress: clientIP, + APIKeyService: h.apiKeyService, + }); err != nil { + logger.L().With( + zap.String("component", "handler.openai_gateway.messages"), + zap.Int64("user_id", subject.UserID), + zap.Int64("api_key_id", apiKey.ID), + zap.Any("group_id", apiKey.GroupID), + zap.String("model", reqModel), + zap.Int64("account_id", account.ID), + 
).Error("openai_messages.record_usage_failed", zap.Error(err)) + } + }) + reqLog.Debug("openai_messages.request_completed", + zap.Int64("account_id", account.ID), + zap.Int("switch_count", switchCount), + ) + return + } +} + +// anthropicErrorResponse writes an error in Anthropic Messages API format. +func (h *OpenAIGatewayHandler) anthropicErrorResponse(c *gin.Context, status int, errType, message string) { + c.JSON(status, gin.H{ + "type": "error", + "error": gin.H{ + "type": errType, + "message": message, + }, + }) +} + +// anthropicStreamingAwareError handles errors that may occur during streaming, +// using Anthropic SSE error format. +func (h *OpenAIGatewayHandler) anthropicStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) { + if streamStarted { + flusher, ok := c.Writer.(http.Flusher) + if ok { + errorEvent := "event: error\ndata: " + `{"type":"error","error":{"type":` + strconv.Quote(errType) + `,"message":` + strconv.Quote(message) + `}}` + "\n\n" + fmt.Fprint(c.Writer, errorEvent) //nolint:errcheck + flusher.Flush() + } + return + } + h.anthropicErrorResponse(c, status, errType, message) +} + +// handleAnthropicFailoverExhausted maps upstream failover errors to Anthropic format. +func (h *OpenAIGatewayHandler) handleAnthropicFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, streamStarted bool) { + status, errType, errMsg := h.mapUpstreamError(failoverErr.StatusCode) + h.anthropicStreamingAwareError(c, status, errType, errMsg, streamStarted) +} + +// ensureAnthropicErrorResponse writes a fallback Anthropic error if no response was written. 
+func (h *OpenAIGatewayHandler) ensureAnthropicErrorResponse(c *gin.Context, streamStarted bool) bool { + if c == nil || c.Writer == nil || c.Writer.Written() { + return false + } + h.anthropicStreamingAwareError(c, http.StatusBadGateway, "api_error", "Upstream request failed", streamStarted) + return true +} + func (h *OpenAIGatewayHandler) validateFunctionCallOutputRequest(c *gin.Context, body []byte, reqLog *zap.Logger) bool { if !gjson.GetBytes(body, `input.#(type=="function_call_output")`).Exists() { return true diff --git a/backend/internal/pkg/apicompat/anthropic_responses_test.go b/backend/internal/pkg/apicompat/anthropic_responses_test.go new file mode 100644 index 00000000..20eeb969 --- /dev/null +++ b/backend/internal/pkg/apicompat/anthropic_responses_test.go @@ -0,0 +1,534 @@ +package apicompat + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// AnthropicToResponses tests +// --------------------------------------------------------------------------- + +func TestAnthropicToResponses_BasicText(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 1024, + Stream: true, + Messages: []AnthropicMessage{ + {Role: "user", Content: json.RawMessage(`"Hello"`)}, + }, + } + + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + assert.Equal(t, "gpt-5.2", resp.Model) + assert.True(t, resp.Stream) + assert.Equal(t, 1024, *resp.MaxOutputTokens) + assert.False(t, *resp.Store) + + var items []ResponsesInputItem + require.NoError(t, json.Unmarshal(resp.Input, &items)) + require.Len(t, items, 1) + assert.Equal(t, "user", items[0].Role) +} + +func TestAnthropicToResponses_SystemPrompt(t *testing.T) { + t.Run("string", func(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 100, + System: json.RawMessage(`"You are helpful."`), + Messages: 
[]AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hi"`)}}, + } + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + + var items []ResponsesInputItem + require.NoError(t, json.Unmarshal(resp.Input, &items)) + require.Len(t, items, 2) + assert.Equal(t, "system", items[0].Role) + }) + + t.Run("array", func(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 100, + System: json.RawMessage(`[{"type":"text","text":"Part 1"},{"type":"text","text":"Part 2"}]`), + Messages: []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hi"`)}}, + } + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + + var items []ResponsesInputItem + require.NoError(t, json.Unmarshal(resp.Input, &items)) + require.Len(t, items, 2) + assert.Equal(t, "system", items[0].Role) + // System text should be joined with double newline. + var text string + require.NoError(t, json.Unmarshal(items[0].Content, &text)) + assert.Equal(t, "Part 1\n\nPart 2", text) + }) +} + +func TestAnthropicToResponses_ToolUse(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 1024, + Messages: []AnthropicMessage{ + {Role: "user", Content: json.RawMessage(`"What is the weather?"`)}, + {Role: "assistant", Content: json.RawMessage(`[{"type":"text","text":"Let me check."},{"type":"tool_use","id":"call_1","name":"get_weather","input":{"city":"NYC"}}]`)}, + {Role: "user", Content: json.RawMessage(`[{"type":"tool_result","tool_use_id":"call_1","content":"Sunny, 72°F"}]`)}, + }, + Tools: []AnthropicTool{ + {Name: "get_weather", Description: "Get weather", InputSchema: json.RawMessage(`{"type":"object","properties":{"city":{"type":"string"}}}`)}, + }, + } + + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + + // Check tools + require.Len(t, resp.Tools, 1) + assert.Equal(t, "function", resp.Tools[0].Type) + assert.Equal(t, "get_weather", resp.Tools[0].Name) + + // Check input items + var items []ResponsesInputItem + 
require.NoError(t, json.Unmarshal(resp.Input, &items)) + // user + assistant + function_call + function_call_output = 4 + require.Len(t, items, 4) + + assert.Equal(t, "user", items[0].Role) + assert.Equal(t, "assistant", items[1].Role) + assert.Equal(t, "function_call", items[2].Type) + assert.Equal(t, "fc_call_1", items[2].CallID) + assert.Equal(t, "function_call_output", items[3].Type) + assert.Equal(t, "fc_call_1", items[3].CallID) + assert.Equal(t, "Sunny, 72°F", items[3].Output) +} + +func TestAnthropicToResponses_ThinkingIgnored(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 1024, + Messages: []AnthropicMessage{ + {Role: "user", Content: json.RawMessage(`"Hello"`)}, + {Role: "assistant", Content: json.RawMessage(`[{"type":"thinking","thinking":"deep thought"},{"type":"text","text":"Hi!"}]`)}, + {Role: "user", Content: json.RawMessage(`"More"`)}, + }, + } + + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + + var items []ResponsesInputItem + require.NoError(t, json.Unmarshal(resp.Input, &items)) + // user + assistant(text only, thinking ignored) + user = 3 + require.Len(t, items, 3) + assert.Equal(t, "assistant", items[1].Role) + // Assistant content should only have text, not thinking. 
+ var parts []ResponsesContentPart + require.NoError(t, json.Unmarshal(items[1].Content, &parts)) + require.Len(t, parts, 1) + assert.Equal(t, "output_text", parts[0].Type) + assert.Equal(t, "Hi!", parts[0].Text) +} + +func TestAnthropicToResponses_MaxTokensFloor(t *testing.T) { + req := &AnthropicRequest{ + Model: "gpt-5.2", + MaxTokens: 10, // below minMaxOutputTokens (128) + Messages: []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hi"`)}}, + } + + resp, err := AnthropicToResponses(req) + require.NoError(t, err) + assert.Equal(t, 128, *resp.MaxOutputTokens) +} + +// --------------------------------------------------------------------------- +// ResponsesToAnthropic (non-streaming) tests +// --------------------------------------------------------------------------- + +func TestResponsesToAnthropic_TextOnly(t *testing.T) { + resp := &ResponsesResponse{ + ID: "resp_123", + Model: "gpt-5.2", + Status: "completed", + Output: []ResponsesOutput{ + { + Type: "message", + Content: []ResponsesContentPart{ + {Type: "output_text", Text: "Hello there!"}, + }, + }, + }, + Usage: &ResponsesUsage{InputTokens: 10, OutputTokens: 5, TotalTokens: 15}, + } + + anth := ResponsesToAnthropic(resp, "claude-opus-4-6") + assert.Equal(t, "resp_123", anth.ID) + assert.Equal(t, "claude-opus-4-6", anth.Model) + assert.Equal(t, "end_turn", anth.StopReason) + require.Len(t, anth.Content, 1) + assert.Equal(t, "text", anth.Content[0].Type) + assert.Equal(t, "Hello there!", anth.Content[0].Text) + assert.Equal(t, 10, anth.Usage.InputTokens) + assert.Equal(t, 5, anth.Usage.OutputTokens) +} + +func TestResponsesToAnthropic_ToolUse(t *testing.T) { + resp := &ResponsesResponse{ + ID: "resp_456", + Model: "gpt-5.2", + Status: "completed", + Output: []ResponsesOutput{ + { + Type: "message", + Content: []ResponsesContentPart{ + {Type: "output_text", Text: "Let me check."}, + }, + }, + { + Type: "function_call", + CallID: "call_1", + Name: "get_weather", + Arguments: `{"city":"NYC"}`, + }, + 
}, + } + + anth := ResponsesToAnthropic(resp, "claude-opus-4-6") + assert.Equal(t, "tool_use", anth.StopReason) + require.Len(t, anth.Content, 2) + assert.Equal(t, "text", anth.Content[0].Type) + assert.Equal(t, "tool_use", anth.Content[1].Type) + assert.Equal(t, "call_1", anth.Content[1].ID) + assert.Equal(t, "get_weather", anth.Content[1].Name) +} + +func TestResponsesToAnthropic_Reasoning(t *testing.T) { + resp := &ResponsesResponse{ + ID: "resp_789", + Model: "gpt-5.2", + Status: "completed", + Output: []ResponsesOutput{ + { + Type: "reasoning", + Summary: []ResponsesSummary{ + {Type: "summary_text", Text: "Thinking about the answer..."}, + }, + }, + { + Type: "message", + Content: []ResponsesContentPart{ + {Type: "output_text", Text: "42"}, + }, + }, + }, + } + + anth := ResponsesToAnthropic(resp, "claude-opus-4-6") + require.Len(t, anth.Content, 2) + assert.Equal(t, "thinking", anth.Content[0].Type) + assert.Equal(t, "Thinking about the answer...", anth.Content[0].Thinking) + assert.Equal(t, "text", anth.Content[1].Type) + assert.Equal(t, "42", anth.Content[1].Text) +} + +func TestResponsesToAnthropic_Incomplete(t *testing.T) { + resp := &ResponsesResponse{ + ID: "resp_inc", + Model: "gpt-5.2", + Status: "incomplete", + IncompleteDetails: &ResponsesIncompleteDetails{ + Reason: "max_output_tokens", + }, + Output: []ResponsesOutput{ + { + Type: "message", + Content: []ResponsesContentPart{{Type: "output_text", Text: "Partial..."}}, + }, + }, + } + + anth := ResponsesToAnthropic(resp, "claude-opus-4-6") + assert.Equal(t, "max_tokens", anth.StopReason) +} + +func TestResponsesToAnthropic_EmptyOutput(t *testing.T) { + resp := &ResponsesResponse{ + ID: "resp_empty", + Model: "gpt-5.2", + Status: "completed", + Output: []ResponsesOutput{}, + } + + anth := ResponsesToAnthropic(resp, "claude-opus-4-6") + require.Len(t, anth.Content, 1) + assert.Equal(t, "text", anth.Content[0].Type) + assert.Equal(t, "", anth.Content[0].Text) +} + +// 
--------------------------------------------------------------------------- +// Streaming: ResponsesEventToAnthropicEvents tests +// --------------------------------------------------------------------------- + +func TestStreamingTextOnly(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + // 1. response.created + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ + ID: "resp_1", + Model: "gpt-5.2", + }, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "message_start", events[0].Type) + + // 2. output_item.added (message) + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "message"}, + }, state) + assert.Len(t, events, 0) // message item doesn't emit events + + // 3. text delta + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_text.delta", + Delta: "Hello", + }, state) + require.Len(t, events, 2) // content_block_start + content_block_delta + assert.Equal(t, "content_block_start", events[0].Type) + assert.Equal(t, "text", events[0].ContentBlock.Type) + assert.Equal(t, "content_block_delta", events[1].Type) + assert.Equal(t, "text_delta", events[1].Delta.Type) + assert.Equal(t, "Hello", events[1].Delta.Text) + + // 4. more text + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_text.delta", + Delta: " world", + }, state) + require.Len(t, events, 1) // only delta, no new block start + assert.Equal(t, "content_block_delta", events[0].Type) + + // 5. text done + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_text.done", + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_stop", events[0].Type) + + // 6. 
completed + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ + Status: "completed", + Usage: &ResponsesUsage{InputTokens: 10, OutputTokens: 5}, + }, + }, state) + require.Len(t, events, 2) // message_delta + message_stop + assert.Equal(t, "message_delta", events[0].Type) + assert.Equal(t, "end_turn", events[0].Delta.StopReason) + assert.Equal(t, 10, events[0].Usage.InputTokens) + assert.Equal(t, 5, events[0].Usage.OutputTokens) + assert.Equal(t, "message_stop", events[1].Type) +} + +func TestStreamingToolCall(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + // 1. response.created + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_2", Model: "gpt-5.2"}, + }, state) + + // 2. function_call added + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "function_call", CallID: "call_1", Name: "get_weather"}, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_start", events[0].Type) + assert.Equal(t, "tool_use", events[0].ContentBlock.Type) + assert.Equal(t, "call_1", events[0].ContentBlock.ID) + + // 3. arguments delta + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.delta", + OutputIndex: 0, + Delta: `{"city":`, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_delta", events[0].Type) + assert.Equal(t, "input_json_delta", events[0].Delta.Type) + assert.Equal(t, `{"city":`, events[0].Delta.PartialJSON) + + // 4. arguments done + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.done", + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_stop", events[0].Type) + + // 5. 
completed with tool_calls + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ + Status: "completed", + Usage: &ResponsesUsage{InputTokens: 20, OutputTokens: 10}, + }, + }, state) + require.Len(t, events, 2) + assert.Equal(t, "tool_use", events[0].Delta.StopReason) +} + +func TestStreamingReasoning(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_3", Model: "gpt-5.2"}, + }, state) + + // reasoning item added + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "reasoning"}, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_start", events[0].Type) + assert.Equal(t, "thinking", events[0].ContentBlock.Type) + + // reasoning text delta + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.reasoning_summary_text.delta", + OutputIndex: 0, + Delta: "Let me think...", + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_delta", events[0].Type) + assert.Equal(t, "thinking_delta", events[0].Delta.Type) + assert.Equal(t, "Let me think...", events[0].Delta.Thinking) + + // reasoning done + events = ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.reasoning_summary_text.done", + }, state) + require.Len(t, events, 1) + assert.Equal(t, "content_block_stop", events[0].Type) +} + +func TestStreamingIncomplete(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_4", Model: "gpt-5.2"}, + }, state) + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_text.delta", + Delta: "Partial output...", + }, state) + + 
events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.incomplete", + Response: &ResponsesResponse{ + Status: "incomplete", + IncompleteDetails: &ResponsesIncompleteDetails{Reason: "max_output_tokens"}, + Usage: &ResponsesUsage{InputTokens: 100, OutputTokens: 4096}, + }, + }, state) + + // Should close the text block + message_delta + message_stop + require.Len(t, events, 3) + assert.Equal(t, "content_block_stop", events[0].Type) + assert.Equal(t, "message_delta", events[1].Type) + assert.Equal(t, "max_tokens", events[1].Delta.StopReason) + assert.Equal(t, "message_stop", events[2].Type) +} + +func TestFinalizeStream_NeverStarted(t *testing.T) { + state := NewResponsesEventToAnthropicState() + events := FinalizeResponsesAnthropicStream(state) + assert.Nil(t, events) +} + +func TestFinalizeStream_AlreadyCompleted(t *testing.T) { + state := NewResponsesEventToAnthropicState() + state.MessageStartSent = true + state.MessageStopSent = true + events := FinalizeResponsesAnthropicStream(state) + assert.Nil(t, events) +} + +func TestFinalizeStream_AbnormalTermination(t *testing.T) { + state := NewResponsesEventToAnthropicState() + + // Simulate a stream that started but never completed + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_5", Model: "gpt-5.2"}, + }, state) + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.output_text.delta", + Delta: "Interrupted...", + }, state) + + // Stream ends without response.completed + events := FinalizeResponsesAnthropicStream(state) + require.Len(t, events, 3) // content_block_stop + message_delta + message_stop + assert.Equal(t, "content_block_stop", events[0].Type) + assert.Equal(t, "message_delta", events[1].Type) + assert.Equal(t, "end_turn", events[1].Delta.StopReason) + assert.Equal(t, "message_stop", events[2].Type) +} + +func TestStreamingEmptyResponse(t *testing.T) { + state := 
NewResponsesEventToAnthropicState() + + ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.created", + Response: &ResponsesResponse{ID: "resp_6", Model: "gpt-5.2"}, + }, state) + + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ + Status: "completed", + Usage: &ResponsesUsage{InputTokens: 5, OutputTokens: 0}, + }, + }, state) + + require.Len(t, events, 2) // message_delta + message_stop + assert.Equal(t, "message_delta", events[0].Type) + assert.Equal(t, "end_turn", events[0].Delta.StopReason) +} + +func TestResponsesAnthropicEventToSSE(t *testing.T) { + evt := AnthropicStreamEvent{ + Type: "message_start", + Message: &AnthropicResponse{ + ID: "resp_1", + Type: "message", + Role: "assistant", + }, + } + sse, err := ResponsesAnthropicEventToSSE(evt) + require.NoError(t, err) + assert.Contains(t, sse, "event: message_start\n") + assert.Contains(t, sse, "data: ") + assert.Contains(t, sse, `"resp_1"`) +} diff --git a/backend/internal/pkg/apicompat/anthropic_to_responses.go b/backend/internal/pkg/apicompat/anthropic_to_responses.go new file mode 100644 index 00000000..1a13658d --- /dev/null +++ b/backend/internal/pkg/apicompat/anthropic_to_responses.go @@ -0,0 +1,283 @@ +package apicompat + +import ( + "encoding/json" + "strings" +) + +// AnthropicToResponses converts an Anthropic Messages request directly into +// a Responses API request. This preserves fields that would be lost in a +// Chat Completions intermediary round-trip (e.g. thinking, cache_control, +// structured system prompts). 
+func AnthropicToResponses(req *AnthropicRequest) (*ResponsesRequest, error) { + input, err := convertAnthropicToResponsesInput(req.System, req.Messages) + if err != nil { + return nil, err + } + + inputJSON, err := json.Marshal(input) + if err != nil { + return nil, err + } + + out := &ResponsesRequest{ + Model: req.Model, + Input: inputJSON, + Temperature: req.Temperature, + TopP: req.TopP, + Stream: req.Stream, + Include: []string{"reasoning.encrypted_content"}, + } + + storeFalse := false + out.Store = &storeFalse + + if req.MaxTokens > 0 { + v := req.MaxTokens + if v < minMaxOutputTokens { + v = minMaxOutputTokens + } + out.MaxOutputTokens = &v + } + + if len(req.Tools) > 0 { + out.Tools = convertAnthropicToolsToResponses(req.Tools) + } + + return out, nil +} + +// convertAnthropicToResponsesInput builds the Responses API input items array +// from the Anthropic system field and message list. +func convertAnthropicToResponsesInput(system json.RawMessage, msgs []AnthropicMessage) ([]ResponsesInputItem, error) { + var out []ResponsesInputItem + + // System prompt → system role input item. + if len(system) > 0 { + sysText, err := parseAnthropicSystemPrompt(system) + if err != nil { + return nil, err + } + if sysText != "" { + content, _ := json.Marshal(sysText) + out = append(out, ResponsesInputItem{ + Role: "system", + Content: content, + }) + } + } + + for _, m := range msgs { + items, err := anthropicMsgToResponsesItems(m) + if err != nil { + return nil, err + } + out = append(out, items...) + } + return out, nil +} + +// parseAnthropicSystemPrompt handles the Anthropic system field which can be +// a plain string or an array of text blocks. 
+func parseAnthropicSystemPrompt(raw json.RawMessage) (string, error) { + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return s, nil + } + var blocks []AnthropicContentBlock + if err := json.Unmarshal(raw, &blocks); err != nil { + return "", err + } + var parts []string + for _, b := range blocks { + if b.Type == "text" && b.Text != "" { + parts = append(parts, b.Text) + } + } + return strings.Join(parts, "\n\n"), nil +} + +// anthropicMsgToResponsesItems converts a single Anthropic message into one +// or more Responses API input items. +func anthropicMsgToResponsesItems(m AnthropicMessage) ([]ResponsesInputItem, error) { + switch m.Role { + case "user": + return anthropicUserToResponses(m.Content) + case "assistant": + return anthropicAssistantToResponses(m.Content) + default: + return anthropicUserToResponses(m.Content) + } +} + +// anthropicUserToResponses handles an Anthropic user message. Content can be a +// plain string or an array of blocks. tool_result blocks are extracted into +// function_call_output items. +func anthropicUserToResponses(raw json.RawMessage) ([]ResponsesInputItem, error) { + // Try plain string. + var s string + if err := json.Unmarshal(raw, &s); err == nil { + content, _ := json.Marshal(s) + return []ResponsesInputItem{{Role: "user", Content: content}}, nil + } + + var blocks []AnthropicContentBlock + if err := json.Unmarshal(raw, &blocks); err != nil { + return nil, err + } + + var out []ResponsesInputItem + + // Extract tool_result blocks → function_call_output items. + for _, b := range blocks { + if b.Type != "tool_result" { + continue + } + text := extractAnthropicToolResultText(b) + if text == "" { + // OpenAI Responses API requires "output" field; use placeholder for empty results. + text = "(empty)" + } + out = append(out, ResponsesInputItem{ + Type: "function_call_output", + CallID: toResponsesCallID(b.ToolUseID), + Output: text, + }) + } + + // Remaining text blocks → user message. 
+ text := extractAnthropicTextFromBlocks(blocks) + if text != "" { + content, _ := json.Marshal(text) + out = append(out, ResponsesInputItem{Role: "user", Content: content}) + } + + return out, nil +} + +// anthropicAssistantToResponses handles an Anthropic assistant message. +// Text content → assistant message with output_text parts. +// tool_use blocks → function_call items. +// thinking blocks → ignored (OpenAI doesn't accept them as input). +func anthropicAssistantToResponses(raw json.RawMessage) ([]ResponsesInputItem, error) { + // Try plain string. + var s string + if err := json.Unmarshal(raw, &s); err == nil { + parts := []ResponsesContentPart{{Type: "output_text", Text: s}} + partsJSON, err := json.Marshal(parts) + if err != nil { + return nil, err + } + return []ResponsesInputItem{{Role: "assistant", Content: partsJSON}}, nil + } + + var blocks []AnthropicContentBlock + if err := json.Unmarshal(raw, &blocks); err != nil { + return nil, err + } + + var items []ResponsesInputItem + + // Text content → assistant message with output_text content parts. + text := extractAnthropicTextFromBlocks(blocks) + if text != "" { + parts := []ResponsesContentPart{{Type: "output_text", Text: text}} + partsJSON, err := json.Marshal(parts) + if err != nil { + return nil, err + } + items = append(items, ResponsesInputItem{Role: "assistant", Content: partsJSON}) + } + + // tool_use → function_call items. + for _, b := range blocks { + if b.Type != "tool_use" { + continue + } + args := "{}" + if len(b.Input) > 0 { + args = string(b.Input) + } + fcID := toResponsesCallID(b.ID) + items = append(items, ResponsesInputItem{ + Type: "function_call", + CallID: fcID, + Name: b.Name, + Arguments: args, + ID: fcID, + }) + } + + return items, nil +} + +// toResponsesCallID converts an Anthropic tool ID (toolu_xxx / call_xxx) to a +// Responses API function_call ID that starts with "fc_". 
+func toResponsesCallID(id string) string { + if strings.HasPrefix(id, "fc_") { + return id + } + return "fc_" + id +} + +// fromResponsesCallID reverses toResponsesCallID, stripping the "fc_" prefix +// that was added during request conversion. +func fromResponsesCallID(id string) string { + if after, ok := strings.CutPrefix(id, "fc_"); ok { + // Only strip if the remainder doesn't look like it was already "fc_" prefixed. + // E.g. "fc_toolu_xxx" → "toolu_xxx", "fc_call_xxx" → "call_xxx" + if strings.HasPrefix(after, "toolu_") || strings.HasPrefix(after, "call_") { + return after + } + } + return id +} + +// extractAnthropicToolResultText gets the text content from a tool_result block. +func extractAnthropicToolResultText(b AnthropicContentBlock) string { + if len(b.Content) == 0 { + return "" + } + var s string + if err := json.Unmarshal(b.Content, &s); err == nil { + return s + } + var inner []AnthropicContentBlock + if err := json.Unmarshal(b.Content, &inner); err == nil { + var parts []string + for _, ib := range inner { + if ib.Type == "text" && ib.Text != "" { + parts = append(parts, ib.Text) + } + } + return strings.Join(parts, "\n\n") + } + return "" +} + +// extractAnthropicTextFromBlocks joins all text blocks, ignoring thinking/ +// tool_use/tool_result blocks. +func extractAnthropicTextFromBlocks(blocks []AnthropicContentBlock) string { + var parts []string + for _, b := range blocks { + if b.Type == "text" && b.Text != "" { + parts = append(parts, b.Text) + } + } + return strings.Join(parts, "\n\n") +} + +// convertAnthropicToolsToResponses maps Anthropic tool definitions to +// Responses API function tools (input_schema → parameters). 
func convertAnthropicToolsToResponses(tools []AnthropicTool) []ResponsesTool {
	out := make([]ResponsesTool, len(tools))
	for i, t := range tools {
		out[i] = ResponsesTool{
			Type:        "function",
			Name:        t.Name,
			Description: t.Description,
			Parameters:  t.InputSchema, // Anthropic input_schema is already a JSON Schema object
		}
	}
	return out
}
diff --git a/backend/internal/pkg/apicompat/responses_to_anthropic.go b/backend/internal/pkg/apicompat/responses_to_anthropic.go
new file mode 100644
index 00000000..16770650
--- /dev/null
+++ b/backend/internal/pkg/apicompat/responses_to_anthropic.go
@@ -0,0 +1,436 @@
package apicompat

import (
	"encoding/json"
	"fmt"
	"time"
)

// ---------------------------------------------------------------------------
// Non-streaming: ResponsesResponse → AnthropicResponse
// ---------------------------------------------------------------------------

// ResponsesToAnthropic converts a Responses API response directly into an
// Anthropic Messages response. Reasoning output items are mapped to thinking
// blocks; function_call items become tool_use blocks.
+func ResponsesToAnthropic(resp *ResponsesResponse, model string) *AnthropicResponse { + out := &AnthropicResponse{ + ID: resp.ID, + Type: "message", + Role: "assistant", + Model: model, + } + + var blocks []AnthropicContentBlock + + for _, item := range resp.Output { + switch item.Type { + case "reasoning": + summaryText := "" + for _, s := range item.Summary { + if s.Type == "summary_text" && s.Text != "" { + summaryText += s.Text + } + } + if summaryText != "" { + blocks = append(blocks, AnthropicContentBlock{ + Type: "thinking", + Thinking: summaryText, + }) + } + case "message": + for _, part := range item.Content { + if part.Type == "output_text" && part.Text != "" { + blocks = append(blocks, AnthropicContentBlock{ + Type: "text", + Text: part.Text, + }) + } + } + case "function_call": + blocks = append(blocks, AnthropicContentBlock{ + Type: "tool_use", + ID: fromResponsesCallID(item.CallID), + Name: item.Name, + Input: json.RawMessage(item.Arguments), + }) + } + } + + if len(blocks) == 0 { + blocks = append(blocks, AnthropicContentBlock{Type: "text", Text: ""}) + } + out.Content = blocks + + out.StopReason = responsesStatusToAnthropicStopReason(resp.Status, resp.IncompleteDetails, blocks) + + if resp.Usage != nil { + out.Usage = AnthropicUsage{ + InputTokens: resp.Usage.InputTokens, + OutputTokens: resp.Usage.OutputTokens, + } + if resp.Usage.InputTokensDetails != nil { + out.Usage.CacheReadInputTokens = resp.Usage.InputTokensDetails.CachedTokens + } + } + + return out +} + +func responsesStatusToAnthropicStopReason(status string, details *ResponsesIncompleteDetails, blocks []AnthropicContentBlock) string { + switch status { + case "incomplete": + if details != nil && details.Reason == "max_output_tokens" { + return "max_tokens" + } + return "end_turn" + case "completed": + if len(blocks) > 0 && blocks[len(blocks)-1].Type == "tool_use" { + return "tool_use" + } + return "end_turn" + default: + return "end_turn" + } +} + +// 
--------------------------------------------------------------------------- +// Streaming: ResponsesStreamEvent → []AnthropicStreamEvent (stateful converter) +// --------------------------------------------------------------------------- + +// ResponsesEventToAnthropicState tracks state for converting a sequence of +// Responses SSE events directly into Anthropic SSE events. +type ResponsesEventToAnthropicState struct { + MessageStartSent bool + MessageStopSent bool + + ContentBlockIndex int + ContentBlockOpen bool + CurrentBlockType string // "text" | "thinking" | "tool_use" + + // OutputIndexToBlockIdx maps Responses output_index → Anthropic content block index. + OutputIndexToBlockIdx map[int]int + + InputTokens int + OutputTokens int + CacheReadInputTokens int + + ResponseID string + Model string + Created int64 +} + +// NewResponsesEventToAnthropicState returns an initialised stream state. +func NewResponsesEventToAnthropicState() *ResponsesEventToAnthropicState { + return &ResponsesEventToAnthropicState{ + OutputIndexToBlockIdx: make(map[int]int), + Created: time.Now().Unix(), + } +} + +// ResponsesEventToAnthropicEvents converts a single Responses SSE event into +// zero or more Anthropic SSE events, updating state as it goes. 
func ResponsesEventToAnthropicEvents(
	evt *ResponsesStreamEvent,
	state *ResponsesEventToAnthropicState,
) []AnthropicStreamEvent {
	switch evt.Type {
	case "response.created":
		return resToAnthHandleCreated(evt, state)
	case "response.output_item.added":
		return resToAnthHandleOutputItemAdded(evt, state)
	case "response.output_text.delta":
		return resToAnthHandleTextDelta(evt, state)
	case "response.output_text.done":
		return resToAnthHandleBlockDone(state)
	case "response.function_call_arguments.delta":
		return resToAnthHandleFuncArgsDelta(evt, state)
	case "response.function_call_arguments.done":
		return resToAnthHandleBlockDone(state)
	case "response.output_item.done":
		return resToAnthHandleOutputItemDone(evt, state)
	case "response.reasoning_summary_text.delta":
		return resToAnthHandleReasoningDelta(evt, state)
	case "response.reasoning_summary_text.done":
		return resToAnthHandleBlockDone(state)
	case "response.completed", "response.incomplete":
		return resToAnthHandleCompleted(evt, state)
	default:
		// Unrecognised upstream event types are dropped silently.
		return nil
	}
}

// FinalizeResponsesAnthropicStream emits synthetic termination events if the
// stream ended without a proper completion event. It is a no-op when the
// stream never started or was already properly terminated.
func FinalizeResponsesAnthropicStream(state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if !state.MessageStartSent || state.MessageStopSent {
		return nil
	}

	var events []AnthropicStreamEvent
	events = append(events, closeCurrentBlock(state)...)

	// Emit a final message_delta (with whatever usage has been observed so
	// far) followed by message_stop, mirroring a normal completion.
	events = append(events,
		AnthropicStreamEvent{
			Type: "message_delta",
			Delta: &AnthropicDelta{
				StopReason: "end_turn",
			},
			Usage: &AnthropicUsage{
				InputTokens:          state.InputTokens,
				OutputTokens:         state.OutputTokens,
				CacheReadInputTokens: state.CacheReadInputTokens,
			},
		},
		AnthropicStreamEvent{Type: "message_stop"},
	)
	state.MessageStopSent = true
	return events
}

// ResponsesAnthropicEventToSSE formats an AnthropicStreamEvent as an SSE line pair.
func ResponsesAnthropicEventToSSE(evt AnthropicStreamEvent) (string, error) {
	data, err := json.Marshal(evt)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", evt.Type, data), nil
}

// --- internal handlers ---

// resToAnthHandleCreated records response metadata and emits the one-time
// message_start event.
func resToAnthHandleCreated(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if evt.Response != nil {
		state.ResponseID = evt.Response.ID
		// Only use upstream model if no override was set (e.g. originalModel)
		if state.Model == "" {
			state.Model = evt.Response.Model
		}
	}

	if state.MessageStartSent {
		return nil
	}
	state.MessageStartSent = true

	return []AnthropicStreamEvent{{
		Type: "message_start",
		Message: &AnthropicResponse{
			ID:      state.ResponseID,
			Type:    "message",
			Role:    "assistant",
			Content: []AnthropicContentBlock{},
			Model:   state.Model,
			Usage: AnthropicUsage{
				InputTokens:  0,
				OutputTokens: 0,
			},
		},
	}}
}

// resToAnthHandleOutputItemAdded opens a new content block for function_call
// and reasoning items. "message" items open lazily on the first text delta.
func resToAnthHandleOutputItemAdded(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if evt.Item == nil {
		return nil
	}

	switch evt.Item.Type {
	case "function_call":
		var events []AnthropicStreamEvent
		events = append(events, closeCurrentBlock(state)...)

		idx := state.ContentBlockIndex
		state.OutputIndexToBlockIdx[evt.OutputIndex] = idx
		state.ContentBlockOpen = true
		state.CurrentBlockType = "tool_use"

		events = append(events, AnthropicStreamEvent{
			Type:  "content_block_start",
			Index: &idx,
			ContentBlock: &AnthropicContentBlock{
				Type:  "tool_use",
				ID:    fromResponsesCallID(evt.Item.CallID),
				Name:  evt.Item.Name,
				Input: json.RawMessage("{}"),
			},
		})
		return events

	case "reasoning":
		var events []AnthropicStreamEvent
		events = append(events, closeCurrentBlock(state)...)

		idx := state.ContentBlockIndex
		state.OutputIndexToBlockIdx[evt.OutputIndex] = idx
		state.ContentBlockOpen = true
		state.CurrentBlockType = "thinking"

		events = append(events, AnthropicStreamEvent{
			Type:  "content_block_start",
			Index: &idx,
			ContentBlock: &AnthropicContentBlock{
				Type:     "thinking",
				Thinking: "",
			},
		})
		return events

	case "message":
		// Text blocks are opened on the first output_text.delta instead.
		return nil
	}

	return nil
}

// resToAnthHandleTextDelta lazily opens a text content block (closing any
// other open block first) and emits a text_delta for it.
func resToAnthHandleTextDelta(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if evt.Delta == "" {
		return nil
	}

	var events []AnthropicStreamEvent

	if !state.ContentBlockOpen || state.CurrentBlockType != "text" {
		events = append(events, closeCurrentBlock(state)...)

		idx := state.ContentBlockIndex
		state.ContentBlockOpen = true
		state.CurrentBlockType = "text"

		events = append(events, AnthropicStreamEvent{
			Type:  "content_block_start",
			Index: &idx,
			ContentBlock: &AnthropicContentBlock{
				Type: "text",
				Text: "",
			},
		})
	}

	idx := state.ContentBlockIndex
	events = append(events, AnthropicStreamEvent{
		Type:  "content_block_delta",
		Index: &idx,
		Delta: &AnthropicDelta{
			Type: "text_delta",
			Text: evt.Delta,
		},
	})
	return events
}

// resToAnthHandleFuncArgsDelta streams partial tool-call JSON into the block
// opened for this output_index. Deltas for unknown indices are dropped.
func resToAnthHandleFuncArgsDelta(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if evt.Delta == "" {
		return nil
	}

	blockIdx, ok := state.OutputIndexToBlockIdx[evt.OutputIndex]
	if !ok {
		return nil
	}

	return []AnthropicStreamEvent{{
		Type:  "content_block_delta",
		Index: &blockIdx,
		Delta: &AnthropicDelta{
			Type:        "input_json_delta",
			PartialJSON: evt.Delta,
		},
	}}
}

// resToAnthHandleReasoningDelta streams reasoning summary text into the
// thinking block opened for this output_index.
func resToAnthHandleReasoningDelta(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if evt.Delta == "" {
		return nil
	}

	blockIdx, ok := state.OutputIndexToBlockIdx[evt.OutputIndex]
	if !ok {
		return nil
	}

	return []AnthropicStreamEvent{{
		Type:  "content_block_delta",
		Index: &blockIdx,
		Delta: &AnthropicDelta{
			Type:     "thinking_delta",
			Thinking: evt.Delta,
		},
	}}
}

// resToAnthHandleBlockDone closes the currently open block, if any.
func resToAnthHandleBlockDone(state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if !state.ContentBlockOpen {
		return nil
	}
	return closeCurrentBlock(state)
}

// resToAnthHandleOutputItemDone closes the current block when an output item
// finishes with a block still open.
func resToAnthHandleOutputItemDone(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if evt.Item == nil {
		return nil
	}
	if state.ContentBlockOpen {
		return closeCurrentBlock(state)
	}
	return nil
}

// resToAnthHandleCompleted captures final usage, derives the stop_reason, and
// emits message_delta + message_stop exactly once.
func resToAnthHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if state.MessageStopSent {
		return nil
	}

	var events []AnthropicStreamEvent
	events = append(events, closeCurrentBlock(state)...)

	stopReason := "end_turn"
	if evt.Response != nil {
		if evt.Response.Usage != nil {
			state.InputTokens = evt.Response.Usage.InputTokens
			state.OutputTokens = evt.Response.Usage.OutputTokens
			if evt.Response.Usage.InputTokensDetails != nil {
				state.CacheReadInputTokens = evt.Response.Usage.InputTokensDetails.CachedTokens
			}
		}
		switch evt.Response.Status {
		case "incomplete":
			if evt.Response.IncompleteDetails != nil && evt.Response.IncompleteDetails.Reason == "max_output_tokens" {
				stopReason = "max_tokens"
			}
		case "completed":
			// CurrentBlockType still holds the last block's type even after
			// closeCurrentBlock (which only flips ContentBlockOpen).
			if state.ContentBlockIndex > 0 && state.CurrentBlockType == "tool_use" {
				stopReason = "tool_use"
			}
		}
	}

	events = append(events,
		AnthropicStreamEvent{
			Type: "message_delta",
			Delta: &AnthropicDelta{
				StopReason: stopReason,
			},
			Usage: &AnthropicUsage{
				InputTokens:          state.InputTokens,
				OutputTokens:         state.OutputTokens,
				CacheReadInputTokens: state.CacheReadInputTokens,
			},
		},
		AnthropicStreamEvent{Type: "message_stop"},
	)
	state.MessageStopSent = true
	return events
}

// closeCurrentBlock emits content_block_stop for the open block and advances
// the block index. Returns nil when no block is open.
func closeCurrentBlock(state *ResponsesEventToAnthropicState) []AnthropicStreamEvent {
	if !state.ContentBlockOpen {
		return nil
	}
	idx := state.ContentBlockIndex
	state.ContentBlockOpen = false
	state.ContentBlockIndex++
	return []AnthropicStreamEvent{{
		Type:  "content_block_stop",
		Index: &idx,
	}}
}
diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go
new file mode 100644
index 00000000..92e85318
--- /dev/null
+++ b/backend/internal/pkg/apicompat/types.go
@@ -0,0 +1,544 @@
// Package apicompat provides type definitions and conversion utilities for
// translating between Anthropic Messages, OpenAI Chat Completions, and OpenAI
// Responses API formats. It enables multi-protocol support so that clients
// using different API formats can be served through a unified gateway.
package apicompat

import "encoding/json"

// ---------------------------------------------------------------------------
// Anthropic Messages API types
// ---------------------------------------------------------------------------

// AnthropicRequest is the request body for POST /v1/messages.
type AnthropicRequest struct {
	Model       string             `json:"model"`
	MaxTokens   int                `json:"max_tokens"`
	System      json.RawMessage    `json:"system,omitempty"` // string or []AnthropicContentBlock
	Messages    []AnthropicMessage `json:"messages"`
	Tools       []AnthropicTool    `json:"tools,omitempty"`
	Stream      bool               `json:"stream,omitempty"`
	Temperature *float64           `json:"temperature,omitempty"`
	TopP        *float64           `json:"top_p,omitempty"`
	StopSeqs    []string           `json:"stop_sequences,omitempty"`
}

// AnthropicMessage is a single message in the Anthropic conversation.
type AnthropicMessage struct {
	Role    string          `json:"role"`    // "user" | "assistant"
	Content json.RawMessage `json:"content"` // string or []AnthropicContentBlock
}

// AnthropicContentBlock is one block inside a message's content array.
type AnthropicContentBlock struct {
	// Type selects which of the field groups below is populated.
	Type string `json:"type"`

	// type=text
	Text string `json:"text,omitempty"`

	// type=thinking
	Thinking string `json:"thinking,omitempty"`

	// type=tool_use
	ID    string          `json:"id,omitempty"`
	Name  string          `json:"name,omitempty"`
	Input json.RawMessage `json:"input,omitempty"`

	// type=tool_result
	ToolUseID string          `json:"tool_use_id,omitempty"`
	Content   json.RawMessage `json:"content,omitempty"` // string or []AnthropicContentBlock
	IsError   bool            `json:"is_error,omitempty"`
}

// AnthropicTool describes a tool available to the model.
type AnthropicTool struct {
	Name        string          `json:"name"`
	Description string          `json:"description,omitempty"`
	InputSchema json.RawMessage `json:"input_schema"` // JSON Schema object
}

// AnthropicResponse is the non-streaming response from POST /v1/messages.
type AnthropicResponse struct {
	ID           string                  `json:"id"`
	Type         string                  `json:"type"` // "message"
	Role         string                  `json:"role"` // "assistant"
	Content      []AnthropicContentBlock `json:"content"`
	Model        string                  `json:"model"`
	StopReason   string                  `json:"stop_reason"`
	StopSequence *string                 `json:"stop_sequence,omitempty"`
	Usage        AnthropicUsage          `json:"usage"`
}

// AnthropicUsage holds token counts in Anthropic format.
// Fields intentionally lack omitempty: zero counts are serialized explicitly.
type AnthropicUsage struct {
	InputTokens              int `json:"input_tokens"`
	OutputTokens             int `json:"output_tokens"`
	CacheCreationInputTokens int `json:"cache_creation_input_tokens"`
	CacheReadInputTokens     int `json:"cache_read_input_tokens"`
}

// ---------------------------------------------------------------------------
// Anthropic SSE event types
// ---------------------------------------------------------------------------

// AnthropicStreamEvent is a single SSE event in the Anthropic streaming protocol.
type AnthropicStreamEvent struct {
	Type string `json:"type"`

	// message_start
	Message *AnthropicResponse `json:"message,omitempty"`

	// content_block_start
	// Index is a pointer so that block index 0 survives omitempty.
	Index        *int                   `json:"index,omitempty"`
	ContentBlock *AnthropicContentBlock `json:"content_block,omitempty"`

	// content_block_delta
	Delta *AnthropicDelta `json:"delta,omitempty"`

	// message_delta
	Usage *AnthropicUsage `json:"usage,omitempty"`
}

// AnthropicDelta carries incremental content in streaming events.
type AnthropicDelta struct {
	Type string `json:"type,omitempty"` // "text_delta" | "input_json_delta" | "thinking_delta" | "signature_delta"

	// text_delta
	Text string `json:"text,omitempty"`

	// input_json_delta
	PartialJSON string `json:"partial_json,omitempty"`

	// thinking_delta
	Thinking string `json:"thinking,omitempty"`

	// signature_delta
	Signature string `json:"signature,omitempty"`

	// message_delta fields
	StopReason   string  `json:"stop_reason,omitempty"`
	StopSequence *string `json:"stop_sequence,omitempty"`
}

// ---------------------------------------------------------------------------
// OpenAI Chat Completions API types
// ---------------------------------------------------------------------------

// ChatRequest is the request body for POST /v1/chat/completions.
type ChatRequest struct {
	Model       string          `json:"model"`
	Messages    []ChatMessage   `json:"messages"`
	MaxTokens   *int            `json:"max_tokens,omitempty"`
	Temperature *float64        `json:"temperature,omitempty"`
	TopP        *float64        `json:"top_p,omitempty"`
	Stream      bool            `json:"stream,omitempty"`
	Tools       []ChatTool      `json:"tools,omitempty"`
	Stop        json.RawMessage `json:"stop,omitempty"` // string or []string
}

// ChatMessage is a single message in the Chat Completions conversation.
type ChatMessage struct {
	Role    string          `json:"role"`              // "system" | "user" | "assistant" | "tool"
	Content json.RawMessage `json:"content,omitempty"` // string or []ChatContentPart

	// assistant fields
	ToolCalls []ChatToolCall `json:"tool_calls,omitempty"`

	// tool fields
	ToolCallID string `json:"tool_call_id,omitempty"`

	// Copilot-specific reasoning passthrough
	ReasoningText   string `json:"reasoning_text,omitempty"`
	ReasoningOpaque string `json:"reasoning_opaque,omitempty"`
}

// ChatContentPart is a typed content part in a multi-part message.
type ChatContentPart struct {
	Type string `json:"type"` // "text" | "image_url"
	Text string `json:"text,omitempty"`
}

// ChatToolCall represents a tool invocation in an assistant message.
// In streaming deltas, Index identifies which tool call is being updated.
type ChatToolCall struct {
	Index    int              `json:"index"`
	ID       string           `json:"id,omitempty"`
	Type     string           `json:"type,omitempty"` // "function"
	Function ChatFunctionCall `json:"function"`
}

// ChatFunctionCall holds the function name and arguments.
// Arguments is a JSON-encoded string, per the Chat Completions wire format.
type ChatFunctionCall struct {
	Name      string `json:"name"`
	Arguments string `json:"arguments"`
}

// ChatTool describes a tool available to the model.
type ChatTool struct {
	Type     string       `json:"type"` // "function"
	Function ChatFunction `json:"function"`
}

// ChatFunction is the function definition inside a ChatTool.
type ChatFunction struct {
	Name        string          `json:"name"`
	Description string          `json:"description,omitempty"`
	Parameters  json.RawMessage `json:"parameters,omitempty"` // JSON Schema
}

// ChatResponse is the non-streaming response from POST /v1/chat/completions.
type ChatResponse struct {
	ID      string       `json:"id"`
	Object  string       `json:"object"` // "chat.completion"
	Created int64        `json:"created"`
	Model   string       `json:"model"`
	Choices []ChatChoice `json:"choices"`
	Usage   *ChatUsage   `json:"usage,omitempty"`
}

// ChatChoice is one completion choice.
type ChatChoice struct {
	Index        int         `json:"index"`
	Message      ChatMessage `json:"message"`
	FinishReason string      `json:"finish_reason"`
}

// ChatUsage holds token counts in Chat Completions format.
type ChatUsage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

// ---------------------------------------------------------------------------
// Chat Completions SSE types
// ---------------------------------------------------------------------------

// ChatStreamChunk is a single SSE chunk in the Chat Completions streaming protocol.
type ChatStreamChunk struct {
	ID      string             `json:"id"`
	Object  string             `json:"object"` // "chat.completion.chunk"
	Created int64              `json:"created"`
	Model   string             `json:"model"`
	Choices []ChatStreamChoice `json:"choices"`
	Usage   *ChatUsage         `json:"usage,omitempty"`
}

// ChatStreamChoice is one choice inside a streaming chunk.
type ChatStreamChoice struct {
	Index int             `json:"index"`
	Delta ChatStreamDelta `json:"delta"`
	// FinishReason has no omitempty: the wire format emits an explicit null
	// until the final chunk.
	FinishReason *string `json:"finish_reason"`
}

// ChatStreamDelta carries incremental content in a streaming chunk.
type ChatStreamDelta struct {
	Role      string         `json:"role,omitempty"`
	Content   string         `json:"content,omitempty"`
	ToolCalls []ChatToolCall `json:"tool_calls,omitempty"`

	// Copilot-specific reasoning passthrough (streaming)
	ReasoningText   string `json:"reasoning_text,omitempty"`
	ReasoningOpaque string `json:"reasoning_opaque,omitempty"`
}

// ---------------------------------------------------------------------------
// OpenAI Responses API types
// ---------------------------------------------------------------------------

// ResponsesRequest is the request body for POST /v1/responses.
type ResponsesRequest struct {
	Model           string          `json:"model"`
	Input           json.RawMessage `json:"input"` // string or []ResponsesInputItem
	MaxOutputTokens *int            `json:"max_output_tokens,omitempty"`
	Temperature     *float64        `json:"temperature,omitempty"`
	TopP            *float64        `json:"top_p,omitempty"`
	Stream          bool            `json:"stream,omitempty"`
	Tools           []ResponsesTool `json:"tools,omitempty"`
	Include         []string        `json:"include,omitempty"`
	Store           *bool           `json:"store,omitempty"`
}

// ResponsesInputItem is one item in the Responses API input array.
// The Type field determines which other fields are populated.
type ResponsesInputItem struct {
	// Common
	Type string `json:"type,omitempty"` // "" for role-based messages

	// Role-based messages (system/user/assistant)
	Role    string          `json:"role,omitempty"`
	Content json.RawMessage `json:"content,omitempty"` // string or []ResponsesContentPart

	// type=function_call
	CallID    string `json:"call_id,omitempty"`
	Name      string `json:"name,omitempty"`
	Arguments string `json:"arguments,omitempty"`
	ID        string `json:"id,omitempty"`

	// type=function_call_output
	Output string `json:"output,omitempty"`
}

// ResponsesContentPart is a typed content part in a Responses message.
type ResponsesContentPart struct {
	Type string `json:"type"` // "input_text" | "output_text" | "input_image"
	Text string `json:"text,omitempty"`
}

// ResponsesTool describes a tool in the Responses API.
// Unlike ChatTool, function metadata is flattened onto the tool itself.
type ResponsesTool struct {
	Type        string          `json:"type"` // "function" | "web_search" | "local_shell" etc.
	Name        string          `json:"name,omitempty"`
	Description string          `json:"description,omitempty"`
	Parameters  json.RawMessage `json:"parameters,omitempty"`
	Strict      *bool           `json:"strict,omitempty"`
}

// ResponsesResponse is the non-streaming response from POST /v1/responses.
type ResponsesResponse struct {
	ID     string            `json:"id"`
	Object string            `json:"object"` // "response"
	Model  string            `json:"model"`
	Status string            `json:"status"` // "completed" | "incomplete" | "failed"
	Output []ResponsesOutput `json:"output"`
	Usage  *ResponsesUsage   `json:"usage,omitempty"`

	// incomplete_details is present when status="incomplete"
	IncompleteDetails *ResponsesIncompleteDetails `json:"incomplete_details,omitempty"`
}

// ResponsesIncompleteDetails explains why a response is incomplete.
type ResponsesIncompleteDetails struct {
	Reason string `json:"reason"` // "max_output_tokens" | "content_filter"
}

// ResponsesOutput is one output item in a Responses API response.
// The Type field determines which field group is populated.
type ResponsesOutput struct {
	Type string `json:"type"` // "message" | "reasoning" | "function_call"

	// type=message
	ID      string                 `json:"id,omitempty"`
	Role    string                 `json:"role,omitempty"`
	Content []ResponsesContentPart `json:"content,omitempty"`
	Status  string                 `json:"status,omitempty"`

	// type=reasoning
	EncryptedContent string             `json:"encrypted_content,omitempty"`
	Summary          []ResponsesSummary `json:"summary,omitempty"`

	// type=function_call
	CallID    string `json:"call_id,omitempty"`
	Name      string `json:"name,omitempty"`
	Arguments string `json:"arguments,omitempty"`
}

// ResponsesSummary is a summary text block inside a reasoning output.
type ResponsesSummary struct {
	Type string `json:"type"` // "summary_text"
	Text string `json:"text"`
}

// ResponsesUsage holds token counts in Responses API format.
type ResponsesUsage struct {
	InputTokens  int `json:"input_tokens"`
	OutputTokens int `json:"output_tokens"`
	TotalTokens  int `json:"total_tokens"`

	// Optional detailed breakdown
	InputTokensDetails  *ResponsesInputTokensDetails  `json:"input_tokens_details,omitempty"`
	OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"`
}

// ---------------------------------------------------------------------------
// Responses SSE event types
// ---------------------------------------------------------------------------

// ResponsesStreamEvent is a single SSE event in the Responses streaming protocol.
// The Type field corresponds to the "type" in the JSON payload. This is a
// union over all event shapes; only the fields relevant to a given Type are set.
type ResponsesStreamEvent struct {
	Type string `json:"type"`

	// response.created / response.completed / response.failed / response.incomplete
	Response *ResponsesResponse `json:"response,omitempty"`

	// response.output_item.added / response.output_item.done
	Item *ResponsesOutput `json:"item,omitempty"`

	// response.output_text.delta / response.output_text.done
	OutputIndex  int    `json:"output_index,omitempty"`
	ContentIndex int    `json:"content_index,omitempty"`
	Delta        string `json:"delta,omitempty"`
	Text         string `json:"text,omitempty"`
	ItemID       string `json:"item_id,omitempty"`

	// response.function_call_arguments.delta / done
	CallID    string `json:"call_id,omitempty"`
	Name      string `json:"name,omitempty"`
	Arguments string `json:"arguments,omitempty"`

	// response.reasoning_summary_text.delta / done
	// Reuses Text/Delta fields above, SummaryIndex identifies which summary part
	SummaryIndex int `json:"summary_index,omitempty"`

	// error event fields
	Code  string `json:"code,omitempty"`
	Param string `json:"param,omitempty"`

	// Sequence number for ordering events
	SequenceNumber int `json:"sequence_number,omitempty"`
}

// ResponsesOutputReasoning is a reasoning output item in the Responses API.
// This type represents the "type":"reasoning" output item that contains
// extended thinking from the model.
type ResponsesOutputReasoning struct {
	ID               string                      `json:"id,omitempty"`
	Type             string                      `json:"type"`             // "reasoning"
	Status           string                      `json:"status,omitempty"` // "in_progress" | "completed" | "incomplete"
	EncryptedContent string                      `json:"encrypted_content,omitempty"`
	Summary          []ResponsesReasoningSummary `json:"summary,omitempty"`
}

// ResponsesReasoningSummary is a summary text block inside a reasoning output.
type ResponsesReasoningSummary struct {
	Type string `json:"type"` // "summary_text"
	Text string `json:"text"`
}

// ResponsesStreamState maintains the state for converting Responses streaming
// events to Chat Completions format. It tracks content blocks, tool calls,
// reasoning blocks, and other streaming artifacts.
// (Unexported-looking plain fields: this struct is internal state, not a wire type.)
type ResponsesStreamState struct {
	// Response metadata
	ID      string
	Model   string
	Created int64

	// Content tracking
	ContentIndex  int
	CurrentText   string
	CurrentItemID string
	PendingText   []string // Text to accumulate before emitting

	// Tool call tracking
	ToolCalls       []ResponsesToolCallState
	CurrentToolCall *ResponsesToolCallState

	// Reasoning tracking
	ReasoningBlocks  []ResponsesReasoningState
	CurrentReasoning *ResponsesReasoningState

	// Usage tracking
	InputTokens  int
	OutputTokens int

	// Status tracking
	Status       string
	FinishReason string
}

// ResponsesToolCallState tracks a single tool call during streaming.
type ResponsesToolCallState struct {
	Index      int
	ItemID     string
	CallID     string
	Name       string
	Arguments  string
	Status     string
	IsComplete bool
}

// ResponsesReasoningState tracks a reasoning block during streaming.
type ResponsesReasoningState struct {
	ItemID       string
	SummaryIndex int
	SummaryText  string
	Status       string
	IsComplete   bool
}

// ResponsesUsageDetail provides additional token usage details in Responses format.
type ResponsesUsageDetail struct {
	InputTokens  int `json:"input_tokens"`
	OutputTokens int `json:"output_tokens"`
	TotalTokens  int `json:"total_tokens"`

	// Optional detailed breakdown
	InputTokensDetails  *ResponsesInputTokensDetails  `json:"input_tokens_details,omitempty"`
	OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"`
}

// ResponsesInputTokensDetails breaks down input token usage.
type ResponsesInputTokensDetails struct {
	CachedTokens int `json:"cached_tokens,omitempty"`
}

// ResponsesOutputTokensDetails breaks down output token usage.
type ResponsesOutputTokensDetails struct {
	ReasoningTokens int `json:"reasoning_tokens,omitempty"`
}

// ---------------------------------------------------------------------------
// Finish reason mapping helpers
// ---------------------------------------------------------------------------

// ChatFinishToAnthropic maps a Chat Completions finish_reason to an Anthropic
// stop_reason. Unrecognised reasons (including "stop") map to "end_turn".
func ChatFinishToAnthropic(reason string) string {
	switch reason {
	case "tool_calls":
		return "tool_use"
	case "length":
		return "max_tokens"
	default:
		// Covers "stop" and anything unknown.
		return "end_turn"
	}
}

// AnthropicStopToChat maps an Anthropic stop_reason to a Chat Completions
// finish_reason. Unrecognised reasons (including "end_turn") map to "stop".
func AnthropicStopToChat(reason string) string {
	switch reason {
	case "tool_use":
		return "tool_calls"
	case "max_tokens":
		return "length"
	default:
		// Covers "end_turn" and anything unknown.
		return "stop"
	}
}

// ResponsesStatusToChat maps a Responses API status to a Chat Completions finish_reason.
func ResponsesStatusToChat(status string, details *ResponsesIncompleteDetails) string {
	switch status {
	case "completed":
		return "stop"
	case "incomplete":
		if details != nil && details.Reason == "max_output_tokens" {
			return "length"
		}
		// Other incomplete reasons (e.g. content_filter) still report "stop".
		return "stop"
	default:
		return "stop"
	}
}

// ChatFinishToResponsesStatus maps a Chat Completions finish_reason to a Responses status.
func ChatFinishToResponsesStatus(reason string) string {
	switch reason {
	case "length":
		return "incomplete"
	default:
		return "completed"
	}
}

// ---------------------------------------------------------------------------
// Shared constants
// ---------------------------------------------------------------------------

// minMaxOutputTokens is the floor for max_output_tokens in a Responses request.
// Very small values may cause upstream API errors, so we enforce a minimum.
const minMaxOutputTokens = 128
diff --git a/backend/internal/server/routes/gateway.go b/backend/internal/server/routes/gateway.go
index 13f13320..e2c0f3de 100644
--- a/backend/internal/server/routes/gateway.go
+++ b/backend/internal/server/routes/gateway.go
@@ -43,8 +43,28 @@ func RegisterGatewayRoutes(
 	gateway.Use(gin.HandlerFunc(apiKeyAuth))
 	gateway.Use(requireGroupAnthropic)
 	{
-		gateway.POST("/messages", h.Gateway.Messages)
-		gateway.POST("/messages/count_tokens", h.Gateway.CountTokens)
+		// /v1/messages: auto-route based on group platform
+		gateway.POST("/messages", func(c *gin.Context) {
+			if getGroupPlatform(c) == service.PlatformOpenAI {
+				h.OpenAIGateway.Messages(c)
+				return
+			}
+			h.Gateway.Messages(c)
+		})
+		// /v1/messages/count_tokens: OpenAI groups get 404
+		gateway.POST("/messages/count_tokens", func(c *gin.Context) {
+			if getGroupPlatform(c) == service.PlatformOpenAI {
+				c.JSON(http.StatusNotFound, gin.H{
+					"type": "error",
+					"error": gin.H{
+						"type":    "not_found_error",
+						"message": "Token counting is not supported for this platform",
+					},
+				})
+				return
+			}
h.Gateway.CountTokens(c) + }) gateway.GET("/models", h.Gateway.Models) gateway.GET("/usage", h.Gateway.Usage) // OpenAI Responses API @@ -132,3 +152,12 @@ func RegisterGatewayRoutes( // Sora 媒体代理(签名 URL,无需 API Key) r.GET("/sora/media-signed/*filepath", h.SoraGateway.MediaProxySigned) } + +// getGroupPlatform extracts the group platform from the API Key stored in context. +func getGroupPlatform(c *gin.Context) string { + apiKey, ok := middleware.GetAPIKeyFromContext(c) + if !ok || apiKey.Group == nil { + return "" + } + return apiKey.Group.Platform +} diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 9f5c8299..cefd37fe 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -5421,6 +5421,11 @@ func extractUpstreamErrorMessage(body []byte) string { return m } + // ChatGPT 内部 API 风格:{"detail":"..."} + if d := gjson.GetBytes(body, "detail").String(); strings.TrimSpace(d) != "" { + return d + } + // 兜底:尝试顶层 message return gjson.GetBytes(body, "message").String() } diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go new file mode 100644 index 00000000..ad276023 --- /dev/null +++ b/backend/internal/service/openai_gateway_messages.go @@ -0,0 +1,376 @@ +package service + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// ForwardAsAnthropic accepts an Anthropic Messages request body, converts it +// to OpenAI Responses API format, forwards to the OpenAI upstream, and converts +// the response back to Anthropic Messages format. 
This enables Claude Code +// clients to access OpenAI models through the standard /v1/messages endpoint. +func (s *OpenAIGatewayService) ForwardAsAnthropic( + ctx context.Context, + c *gin.Context, + account *Account, + body []byte, + promptCacheKey string, +) (*OpenAIForwardResult, error) { + startTime := time.Now() + + // 1. Parse Anthropic request + var anthropicReq apicompat.AnthropicRequest + if err := json.Unmarshal(body, &anthropicReq); err != nil { + return nil, fmt.Errorf("parse anthropic request: %w", err) + } + originalModel := anthropicReq.Model + isStream := anthropicReq.Stream + + // 2. Convert Anthropic → Responses + responsesReq, err := apicompat.AnthropicToResponses(&anthropicReq) + if err != nil { + return nil, fmt.Errorf("convert anthropic to responses: %w", err) + } + + // 3. Model mapping + mappedModel := account.GetMappedModel(originalModel) + responsesReq.Model = mappedModel + + logger.L().Info("openai messages: model mapping applied", + zap.Int64("account_id", account.ID), + zap.String("original_model", originalModel), + zap.String("mapped_model", mappedModel), + zap.Bool("stream", isStream), + ) + + // 4. Marshal Responses request body, then apply OAuth codex transform + responsesBody, err := json.Marshal(responsesReq) + if err != nil { + return nil, fmt.Errorf("marshal responses request: %w", err) + } + + if account.Type == AccountTypeOAuth { + var reqBody map[string]any + if err := json.Unmarshal(responsesBody, &reqBody); err != nil { + return nil, fmt.Errorf("unmarshal for codex transform: %w", err) + } + applyCodexOAuthTransform(reqBody, false) + // OAuth codex transform forces stream=true upstream, so always use + // the streaming response handler regardless of what the client asked. + isStream = true + responsesBody, err = json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("remarshal after codex transform: %w", err) + } + } + + // 5. 
Get access token + token, _, err := s.GetAccessToken(ctx, account) + if err != nil { + return nil, fmt.Errorf("get access token: %w", err) + } + + // 6. Build upstream request + upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, responsesBody, token, isStream, promptCacheKey, false) + if err != nil { + return nil, fmt.Errorf("build upstream request: %w", err) + } + + // 7. Send request + proxyURL := "" + if account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) + if err != nil { + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + setOpsUpstreamError(c, 0, safeErr, "") + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) + writeAnthropicError(c, http.StatusBadGateway, "api_error", "Upstream request failed") + return nil, fmt.Errorf("upstream request failed: %s", safeErr) + } + defer func() { _ = resp.Body.Close() }() + + // 8. 
Handle error response with failover + if resp.StatusCode >= 400 { + if s.shouldFailoverUpstreamError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} + } + // Non-failover error: return Anthropic-formatted error to client + return s.handleAnthropicErrorResponse(resp, c) + } + + // 9. Handle normal response + if isStream { + return s.handleAnthropicStreamingResponse(resp, c, originalModel, startTime) + } + return s.handleAnthropicNonStreamingResponse(resp, c, originalModel, startTime) +} + +// handleAnthropicErrorResponse reads an upstream error and returns it in +// Anthropic error format. 
+func (s *OpenAIGatewayService) handleAnthropicErrorResponse( + resp *http.Response, + c *gin.Context, +) (*OpenAIForwardResult, error) { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) + if upstreamMsg == "" { + upstreamMsg = fmt.Sprintf("Upstream error: %d", resp.StatusCode) + } + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + + // Record upstream error details for ops logging + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(body), maxBytes) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + + errType := "api_error" + switch { + case resp.StatusCode == 400: + errType = "invalid_request_error" + case resp.StatusCode == 404: + errType = "not_found_error" + case resp.StatusCode == 429: + errType = "rate_limit_error" + case resp.StatusCode >= 500: + errType = "api_error" + } + + writeAnthropicError(c, resp.StatusCode, errType, upstreamMsg) + return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg) +} + +// handleAnthropicNonStreamingResponse reads a Responses API JSON response, +// converts it to Anthropic Messages format, and writes it to the client. 
+func (s *OpenAIGatewayService) handleAnthropicNonStreamingResponse( + resp *http.Response, + c *gin.Context, + originalModel string, + startTime time.Time, +) (*OpenAIForwardResult, error) { + requestID := resp.Header.Get("x-request-id") + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read upstream response: %w", err) + } + + var responsesResp apicompat.ResponsesResponse + if err := json.Unmarshal(respBody, &responsesResp); err != nil { + return nil, fmt.Errorf("parse responses response: %w", err) + } + + anthropicResp := apicompat.ResponsesToAnthropic(&responsesResp, originalModel) + + var usage OpenAIUsage + if responsesResp.Usage != nil { + usage = OpenAIUsage{ + InputTokens: responsesResp.Usage.InputTokens, + OutputTokens: responsesResp.Usage.OutputTokens, + } + if responsesResp.Usage.InputTokensDetails != nil { + usage.CacheReadInputTokens = responsesResp.Usage.InputTokensDetails.CachedTokens + } + } + + if s.responseHeaderFilter != nil { + responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + } + c.JSON(http.StatusOK, anthropicResp) + + return &OpenAIForwardResult{ + RequestID: requestID, + Usage: usage, + Model: originalModel, + Stream: false, + Duration: time.Since(startTime), + }, nil +} + +// handleAnthropicStreamingResponse reads Responses SSE events from upstream, +// converts each to Anthropic SSE events, and writes them to the client. 
+func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( + resp *http.Response, + c *gin.Context, + originalModel string, + startTime time.Time, +) (*OpenAIForwardResult, error) { + requestID := resp.Header.Get("x-request-id") + + if s.responseHeaderFilter != nil { + responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + } + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.WriteHeader(http.StatusOK) + + state := apicompat.NewResponsesEventToAnthropicState() + state.Model = originalModel + var usage OpenAIUsage + var firstTokenMs *int + firstChunk := true + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + for scanner.Scan() { + line := scanner.Text() + + if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" { + continue + } + payload := line[6:] + + if firstChunk { + firstChunk = false + ms := int(time.Since(startTime).Milliseconds()) + firstTokenMs = &ms + } + + // Parse the Responses SSE event + var event apicompat.ResponsesStreamEvent + if err := json.Unmarshal([]byte(payload), &event); err != nil { + logger.L().Warn("openai messages stream: failed to parse event", + zap.Error(err), + zap.String("request_id", requestID), + ) + continue + } + + // Extract usage from completion events + if (event.Type == "response.completed" || event.Type == "response.incomplete") && + event.Response != nil && event.Response.Usage != nil { + usage = OpenAIUsage{ + InputTokens: event.Response.Usage.InputTokens, + OutputTokens: event.Response.Usage.OutputTokens, + } + if event.Response.Usage.InputTokensDetails != nil { + usage.CacheReadInputTokens = event.Response.Usage.InputTokensDetails.CachedTokens + } + } + + // Convert to Anthropic events + events := apicompat.ResponsesEventToAnthropicEvents(&event, state) + for _, evt := range events { + sse, err 
:= apicompat.ResponsesAnthropicEventToSSE(evt) + if err != nil { + logger.L().Warn("openai messages stream: failed to marshal event", + zap.Error(err), + zap.String("request_id", requestID), + ) + continue + } + if _, err := fmt.Fprint(c.Writer, sse); err != nil { + // Client disconnected — return collected usage + logger.L().Info("openai messages stream: client disconnected", + zap.String("request_id", requestID), + ) + return &OpenAIForwardResult{ + RequestID: requestID, + Usage: usage, + Model: originalModel, + Stream: true, + Duration: time.Since(startTime), + FirstTokenMs: firstTokenMs, + }, nil + } + } + if len(events) > 0 { + c.Writer.Flush() + } + } + + if err := scanner.Err(); err != nil { + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logger.L().Warn("openai messages stream: read error", + zap.Error(err), + zap.String("request_id", requestID), + ) + } + } + + // Ensure the Anthropic stream is properly terminated + if finalEvents := apicompat.FinalizeResponsesAnthropicStream(state); len(finalEvents) > 0 { + for _, evt := range finalEvents { + sse, err := apicompat.ResponsesAnthropicEventToSSE(evt) + if err != nil { + continue + } + fmt.Fprint(c.Writer, sse) //nolint:errcheck + } + c.Writer.Flush() + } + + return &OpenAIForwardResult{ + RequestID: requestID, + Usage: usage, + Model: originalModel, + Stream: true, + Duration: time.Since(startTime), + FirstTokenMs: firstTokenMs, + }, nil +} + +// writeAnthropicError writes an error response in Anthropic Messages API format. 
+func writeAnthropicError(c *gin.Context, statusCode int, errType, message string) { + c.JSON(statusCode, gin.H{ + "type": "error", + "error": gin.H{ + "type": errType, + "message": message, + }, + }) +} diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 73bdba65..db7ab62d 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -888,6 +888,22 @@ func isOpenAIInstructionsRequiredError(upstreamStatusCode int, upstreamMsg strin return false } +// ExtractSessionID extracts the raw session ID from headers or body without hashing. +// Used by ForwardAsAnthropic to pass as prompt_cache_key for upstream cache. +func (s *OpenAIGatewayService) ExtractSessionID(c *gin.Context, body []byte) string { + if c == nil { + return "" + } + sessionID := strings.TrimSpace(c.GetHeader("session_id")) + if sessionID == "" { + sessionID = strings.TrimSpace(c.GetHeader("conversation_id")) + } + if sessionID == "" && len(body) > 0 { + sessionID = strings.TrimSpace(gjson.GetBytes(body, "prompt_cache_key").String()) + } + return sessionID +} + // GenerateSessionHash generates a sticky-session hash for OpenAI requests. // // Priority: diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index 92725c07..14064078 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -1230,6 +1230,142 @@ + +
+ + +
+

+ {{ t('admin.accounts.openai.modelRestrictionDisabledByPassthrough') }} +

+
+ + +
+
@@ -3603,6 +3739,14 @@ const handleOpenAIExchange = async (authCode: string) => { const shouldCreateOpenAI = form.platform === 'openai' const shouldCreateSora = form.platform === 'sora' + // Add model mapping for OpenAI OAuth accounts(透传模式下不应用) + if (shouldCreateOpenAI && !isOpenAIModelRestrictionDisabled.value) { + const modelMapping = buildModelMappingObject(modelRestrictionMode.value, allowedModels.value, modelMappings.value) + if (modelMapping) { + credentials.model_mapping = modelMapping + } + } + // 应用临时不可调度配置 if (!applyTempUnschedConfig(credentials)) { return @@ -3713,6 +3857,14 @@ const handleOpenAIValidateRT = async (refreshTokenInput: string) => { const oauthExtra = oauthClient.buildExtraInfo(tokenInfo) as Record | undefined const extra = buildOpenAIExtra(oauthExtra) + // Add model mapping for OpenAI OAuth accounts(透传模式下不应用) + if (shouldCreateOpenAI && !isOpenAIModelRestrictionDisabled.value) { + const modelMapping = buildModelMappingObject(modelRestrictionMode.value, allowedModels.value, modelMappings.value) + if (modelMapping) { + credentials.model_mapping = modelMapping + } + } + // Generate account name with index for batch const accountName = refreshTokens.length > 1 ? `${form.name} #${i + 1}` : form.name diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index be7d2d45..406a3d7b 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -351,6 +351,142 @@
+ +
+ + +
+

+ {{ t('admin.accounts.openai.modelRestrictionDisabledByPassthrough') }} +

+
+ + +
+
@@ -1659,9 +1795,33 @@ watch( ? 'https://generativelanguage.googleapis.com' : 'https://api.anthropic.com' editBaseUrl.value = platformDefaultUrl - modelRestrictionMode.value = 'whitelist' - modelMappings.value = [] - allowedModels.value = [] + + // Load model mappings for OpenAI OAuth accounts + if (newAccount.platform === 'openai' && newAccount.credentials) { + const oauthCredentials = newAccount.credentials as Record + const existingMappings = oauthCredentials.model_mapping as Record | undefined + if (existingMappings && typeof existingMappings === 'object') { + const entries = Object.entries(existingMappings) + const isWhitelistMode = entries.length > 0 && entries.every(([from, to]) => from === to) + if (isWhitelistMode) { + modelRestrictionMode.value = 'whitelist' + allowedModels.value = entries.map(([from]) => from) + modelMappings.value = [] + } else { + modelRestrictionMode.value = 'mapping' + modelMappings.value = entries.map(([from, to]) => ({ from, to })) + allowedModels.value = [] + } + } else { + modelRestrictionMode.value = 'whitelist' + modelMappings.value = [] + allowedModels.value = [] + } + } else { + modelRestrictionMode.value = 'whitelist' + modelMappings.value = [] + allowedModels.value = [] + } customErrorCodesEnabled.value = false selectedErrorCodes.value = [] } @@ -2163,6 +2323,28 @@ const handleSubmit = async () => { updatePayload.credentials = newCredentials } + // OpenAI OAuth: persist model mapping to credentials + if (props.account.platform === 'openai' && props.account.type === 'oauth') { + const currentCredentials = (updatePayload.credentials as Record) || + ((props.account.credentials as Record) || {}) + const newCredentials: Record = { ...currentCredentials } + const shouldApplyModelMapping = !openaiPassthroughEnabled.value + + if (shouldApplyModelMapping) { + const modelMapping = buildModelMappingObject(modelRestrictionMode.value, allowedModels.value, modelMappings.value) + if (modelMapping) { + newCredentials.model_mapping = 
modelMapping + } else { + delete newCredentials.model_mapping + } + } else if (currentCredentials.model_mapping) { + // 透传模式保留现有映射 + newCredentials.model_mapping = currentCredentials.model_mapping + } + + updatePayload.credentials = newCredentials + } + // Antigravity: persist model mapping to credentials (applies to all antigravity types) // Antigravity 只支持映射模式 if (props.account.platform === 'antigravity') {