package service import ( "bufio" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" "github.com/gin-gonic/gin" "go.uber.org/zap" ) // ForwardAsAnthropic accepts an Anthropic Messages request body, converts it // to OpenAI Responses API format, forwards to the OpenAI upstream, and converts // the response back to Anthropic Messages format. This enables Claude Code // clients to access OpenAI models through the standard /v1/messages endpoint. func (s *OpenAIGatewayService) ForwardAsAnthropic( ctx context.Context, c *gin.Context, account *Account, body []byte, promptCacheKey string, ) (*OpenAIForwardResult, error) { startTime := time.Now() // 1. Parse Anthropic request var anthropicReq apicompat.AnthropicRequest if err := json.Unmarshal(body, &anthropicReq); err != nil { return nil, fmt.Errorf("parse anthropic request: %w", err) } originalModel := anthropicReq.Model isStream := anthropicReq.Stream // 2. Convert Anthropic → Responses responsesReq, err := apicompat.AnthropicToResponses(&anthropicReq) if err != nil { return nil, fmt.Errorf("convert anthropic to responses: %w", err) } // 3. Model mapping mappedModel := account.GetMappedModel(originalModel) responsesReq.Model = mappedModel logger.L().Debug("openai messages: model mapping applied", zap.Int64("account_id", account.ID), zap.String("original_model", originalModel), zap.String("mapped_model", mappedModel), zap.Bool("stream", isStream), ) // 4. Marshal Responses request body, then apply OAuth codex transform responsesBody, err := json.Marshal(responsesReq) if err != nil { return nil, fmt.Errorf("marshal responses request: %w", err) } if account.Type == AccountTypeOAuth { var reqBody map[string]any if err := json.Unmarshal(responsesBody, &reqBody); err != nil { return nil, fmt.Errorf("unmarshal for codex transform: %w", err) } applyCodexOAuthTransform(reqBody, false, false) // OAuth codex transform forces stream=true upstream, so always use // the streaming response handler regardless of what the client asked. isStream = true responsesBody, err = json.Marshal(reqBody) if err != nil { return nil, fmt.Errorf("remarshal after codex transform: %w", err) } } // 5. Get access token token, _, err := s.GetAccessToken(ctx, account) if err != nil { return nil, fmt.Errorf("get access token: %w", err) } // 6. Build upstream request upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, responsesBody, token, isStream, promptCacheKey, false) if err != nil { return nil, fmt.Errorf("build upstream request: %w", err) } // 7. Send request proxyURL := "" if account.Proxy != nil { proxyURL = account.Proxy.URL() } resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { safeErr := sanitizeUpstreamErrorMessage(err.Error()) setOpsUpstreamError(c, 0, safeErr, "") appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, AccountID: account.ID, AccountName: account.Name, UpstreamStatusCode: 0, Kind: "request_error", Message: safeErr, }) writeAnthropicError(c, http.StatusBadGateway, "api_error", "Upstream request failed") return nil, fmt.Errorf("upstream request failed: %s", safeErr) } defer func() { _ = resp.Body.Close() }() // 8. Handle error response with failover if resp.StatusCode >= 400 { if s.shouldFailoverUpstreamError(resp.StatusCode) { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) upstreamDetail := "" if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes if maxBytes <= 0 { maxBytes = 2048 } upstreamDetail = truncateString(string(respBody), maxBytes) } appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, AccountID: account.ID, AccountName: account.Name, UpstreamStatusCode: resp.StatusCode, UpstreamRequestID: resp.Header.Get("x-request-id"), Kind: "failover", Message: upstreamMsg, Detail: upstreamDetail, }) if s.rateLimitService != nil { s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) } return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} } // Non-failover error: return Anthropic-formatted error to client return s.handleAnthropicErrorResponse(resp, c, account) } // 9. Handle normal response var result *OpenAIForwardResult var handleErr error if isStream { result, handleErr = s.handleAnthropicStreamingResponse(resp, c, originalModel, mappedModel, startTime) } else { result, handleErr = s.handleAnthropicNonStreamingResponse(resp, c, originalModel, mappedModel, startTime) } // Extract and save Codex usage snapshot from response headers (for OAuth accounts) if handleErr == nil && account.Type == AccountTypeOAuth { if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { s.updateCodexUsageSnapshot(ctx, account.ID, snapshot) } } return result, handleErr } // handleAnthropicErrorResponse reads an upstream error and returns it in // Anthropic error format. func (s *OpenAIGatewayService) handleAnthropicErrorResponse( resp *http.Response, c *gin.Context, account *Account, ) (*OpenAIForwardResult, error) { body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) if upstreamMsg == "" { upstreamMsg = fmt.Sprintf("Upstream error: %d", resp.StatusCode) } upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) // Record upstream error details for ops logging upstreamDetail := "" if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes if maxBytes <= 0 { maxBytes = 2048 } upstreamDetail = truncateString(string(body), maxBytes) } setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) // Apply error passthrough rules (matches handleErrorResponse pattern in openai_gateway_service.go) if status, errType, errMsg, matched := applyErrorPassthroughRule( c, account.Platform, resp.StatusCode, body, http.StatusBadGateway, "api_error", "Upstream request failed", ); matched { writeAnthropicError(c, status, errType, errMsg) if upstreamMsg == "" { upstreamMsg = errMsg } if upstreamMsg == "" { return nil, fmt.Errorf("upstream error: %d (passthrough rule matched)", resp.StatusCode) } return nil, fmt.Errorf("upstream error: %d (passthrough rule matched) message=%s", resp.StatusCode, upstreamMsg) } errType := "api_error" switch { case resp.StatusCode == 400: errType = "invalid_request_error" case resp.StatusCode == 404: errType = "not_found_error" case resp.StatusCode == 429: errType = "rate_limit_error" case resp.StatusCode >= 500: errType = "api_error" } writeAnthropicError(c, resp.StatusCode, errType, upstreamMsg) return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg) } // handleAnthropicNonStreamingResponse reads a Responses API JSON response, // converts it to Anthropic Messages format, and writes it to the client. func (s *OpenAIGatewayService) handleAnthropicNonStreamingResponse( resp *http.Response, c *gin.Context, originalModel string, mappedModel string, startTime time.Time, ) (*OpenAIForwardResult, error) { requestID := resp.Header.Get("x-request-id") respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("read upstream response: %w", err) } var responsesResp apicompat.ResponsesResponse if err := json.Unmarshal(respBody, &responsesResp); err != nil { return nil, fmt.Errorf("parse responses response: %w", err) } anthropicResp := apicompat.ResponsesToAnthropic(&responsesResp, originalModel) var usage OpenAIUsage if responsesResp.Usage != nil { usage = OpenAIUsage{ InputTokens: responsesResp.Usage.InputTokens, OutputTokens: responsesResp.Usage.OutputTokens, } if responsesResp.Usage.InputTokensDetails != nil { usage.CacheReadInputTokens = responsesResp.Usage.InputTokensDetails.CachedTokens } } if s.responseHeaderFilter != nil { responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) } c.JSON(http.StatusOK, anthropicResp) return &OpenAIForwardResult{ RequestID: requestID, Usage: usage, Model: originalModel, BillingModel: mappedModel, Stream: false, Duration: time.Since(startTime), }, nil } // handleAnthropicStreamingResponse reads Responses SSE events from upstream, // converts each to Anthropic SSE events, and writes them to the client. func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( resp *http.Response, c *gin.Context, originalModel string, mappedModel string, startTime time.Time, ) (*OpenAIForwardResult, error) { requestID := resp.Header.Get("x-request-id") if s.responseHeaderFilter != nil { responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) } c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") c.Writer.WriteHeader(http.StatusOK) state := apicompat.NewResponsesEventToAnthropicState() state.Model = originalModel var usage OpenAIUsage var firstTokenMs *int firstChunk := true scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" { continue } payload := line[6:] if firstChunk { firstChunk = false ms := int(time.Since(startTime).Milliseconds()) firstTokenMs = &ms } // Parse the Responses SSE event var event apicompat.ResponsesStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { logger.L().Warn("openai messages stream: failed to parse event", zap.Error(err), zap.String("request_id", requestID), ) continue } // Extract usage from completion events if (event.Type == "response.completed" || event.Type == "response.incomplete" || event.Type == "response.failed") && event.Response != nil && event.Response.Usage != nil { usage = OpenAIUsage{ InputTokens: event.Response.Usage.InputTokens, OutputTokens: event.Response.Usage.OutputTokens, } if event.Response.Usage.InputTokensDetails != nil { usage.CacheReadInputTokens = event.Response.Usage.InputTokensDetails.CachedTokens } } // Convert to Anthropic events events := apicompat.ResponsesEventToAnthropicEvents(&event, state) for _, evt := range events { sse, err := apicompat.ResponsesAnthropicEventToSSE(evt) if err != nil { logger.L().Warn("openai messages stream: failed to marshal event", zap.Error(err), zap.String("request_id", requestID), ) continue } if _, err := fmt.Fprint(c.Writer, sse); err != nil { // Client disconnected — return collected usage logger.L().Info("openai messages stream: client disconnected", zap.String("request_id", requestID), ) return &OpenAIForwardResult{ RequestID: requestID, Usage: usage, Model: originalModel, BillingModel: mappedModel, Stream: true, Duration: time.Since(startTime), FirstTokenMs: firstTokenMs, }, nil } } if len(events) > 0 { c.Writer.Flush() } } if err := scanner.Err(); err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { logger.L().Warn("openai messages stream: read error", zap.Error(err), zap.String("request_id", requestID), ) } } // Ensure the Anthropic stream is properly terminated if finalEvents := apicompat.FinalizeResponsesAnthropicStream(state); len(finalEvents) > 0 { for _, evt := range finalEvents { sse, err := apicompat.ResponsesAnthropicEventToSSE(evt) if err != nil { continue } fmt.Fprint(c.Writer, sse) //nolint:errcheck } c.Writer.Flush() } return &OpenAIForwardResult{ RequestID: requestID, Usage: usage, Model: originalModel, BillingModel: mappedModel, Stream: true, Duration: time.Since(startTime), FirstTokenMs: firstTokenMs, }, nil } // writeAnthropicError writes an error response in Anthropic Messages API format. func writeAnthropicError(c *gin.Context, statusCode int, errType, message string) { c.JSON(statusCode, gin.H{ "type": "error", "error": gin.H{ "type": errType, "message": message, }, }) }