package service import ( "bufio" "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" "github.com/Wei-Shaw/sub2api/internal/pkg/claude" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" "github.com/gin-gonic/gin" "github.com/tidwall/gjson" "go.uber.org/zap" ) // ForwardAsChatCompletions accepts an OpenAI Chat Completions API request body, // converts it to Anthropic Messages format (chained via Responses format), // forwards to the Anthropic upstream, and converts the response back to Chat // Completions format. This enables Chat Completions clients to access Anthropic // models through Anthropic platform groups. func (s *GatewayService) ForwardAsChatCompletions( ctx context.Context, c *gin.Context, account *Account, body []byte, parsed *ParsedRequest, ) (*ForwardResult, error) { startTime := time.Now() // 1. Parse Chat Completions request var ccReq apicompat.ChatCompletionsRequest if err := json.Unmarshal(body, &ccReq); err != nil { return nil, fmt.Errorf("parse chat completions request: %w", err) } originalModel := ccReq.Model clientStream := ccReq.Stream includeUsage := ccReq.StreamOptions != nil && ccReq.StreamOptions.IncludeUsage // 2. Convert CC → Responses → Anthropic (chained conversion) responsesReq, err := apicompat.ChatCompletionsToResponses(&ccReq) if err != nil { return nil, fmt.Errorf("convert chat completions to responses: %w", err) } anthropicReq, err := apicompat.ResponsesToAnthropicRequest(responsesReq) if err != nil { return nil, fmt.Errorf("convert responses to anthropic: %w", err) } // 3. Force upstream streaming anthropicReq.Stream = true reqStream := true // 4. Model mapping mappedModel := originalModel if account.Type == AccountTypeAPIKey { mappedModel = account.GetMappedModel(originalModel) } if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey { normalized := claude.NormalizeModelID(originalModel) if normalized != originalModel { mappedModel = normalized } } anthropicReq.Model = mappedModel logger.L().Debug("gateway forward_as_chat_completions: model mapping applied", zap.Int64("account_id", account.ID), zap.String("original_model", originalModel), zap.String("mapped_model", mappedModel), zap.Bool("client_stream", clientStream), ) // 5. Marshal Anthropic request body anthropicBody, err := json.Marshal(anthropicReq) if err != nil { return nil, fmt.Errorf("marshal anthropic request: %w", err) } // 6. Apply Claude Code mimicry for OAuth accounts isClaudeCode := false // CC API is never Claude Code shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode if shouldMimicClaudeCode { if !strings.Contains(strings.ToLower(mappedModel), "haiku") && !systemIncludesClaudeCodePrompt(anthropicReq.System) { anthropicBody = injectClaudeCodePrompt(anthropicBody, anthropicReq.System) } } // 7. Enforce cache_control block limit anthropicBody = enforceCacheControlLimit(anthropicBody) // 8. Get access token token, tokenType, err := s.GetAccessToken(ctx, account) if err != nil { return nil, fmt.Errorf("get access token: %w", err) } // 9. Get proxy URL proxyURL := "" if account.ProxyID != nil && account.Proxy != nil { proxyURL = account.Proxy.URL() } // 10. Build upstream request upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream) upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode) releaseUpstreamCtx() if err != nil { return nil, fmt.Errorf("build upstream request: %w", err) } // 11. Send request resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled()) if err != nil { if resp != nil && resp.Body != nil { _ = resp.Body.Close() } safeErr := sanitizeUpstreamErrorMessage(err.Error()) setOpsUpstreamError(c, 0, safeErr, "") appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, AccountID: account.ID, AccountName: account.Name, UpstreamStatusCode: 0, Kind: "request_error", Message: safeErr, }) writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream request failed") return nil, fmt.Errorf("upstream request failed: %s", safeErr) } defer func() { _ = resp.Body.Close() }() // 12. Handle error response with failover if resp.StatusCode >= 400 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() resp.Body = io.NopCloser(bytes.NewReader(respBody)) upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) if s.shouldFailoverUpstreamError(resp.StatusCode) { appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, AccountID: account.ID, AccountName: account.Name, UpstreamStatusCode: resp.StatusCode, UpstreamRequestID: resp.Header.Get("x-request-id"), Kind: "failover", Message: upstreamMsg, }) if s.rateLimitService != nil { s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) } return nil, &UpstreamFailoverError{ StatusCode: resp.StatusCode, ResponseBody: respBody, } } writeGatewayCCError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg) return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg) } // 13. Extract reasoning effort from CC request body reasoningEffort := extractCCReasoningEffortFromBody(body) // 14. Handle normal response // Read Anthropic SSE → convert to Responses events → convert to CC format var result *ForwardResult var handleErr error if clientStream { result, handleErr = s.handleCCStreamingFromAnthropic(resp, c, originalModel, mappedModel, reasoningEffort, startTime, includeUsage) } else { result, handleErr = s.handleCCBufferedFromAnthropic(resp, c, originalModel, mappedModel, reasoningEffort, startTime) } return result, handleErr } // extractCCReasoningEffortFromBody reads reasoning effort from a Chat Completions // request body. It checks both nested (reasoning.effort) and flat (reasoning_effort) // formats used by OpenAI-compatible clients. func extractCCReasoningEffortFromBody(body []byte) *string { raw := strings.TrimSpace(gjson.GetBytes(body, "reasoning.effort").String()) if raw == "" { raw = strings.TrimSpace(gjson.GetBytes(body, "reasoning_effort").String()) } if raw == "" { return nil } normalized := normalizeOpenAIReasoningEffort(raw) if normalized == "" { return nil } return &normalized } // handleCCBufferedFromAnthropic reads Anthropic SSE events, assembles the full // response, then converts Anthropic → Responses → Chat Completions. func (s *GatewayService) handleCCBufferedFromAnthropic( resp *http.Response, c *gin.Context, originalModel string, mappedModel string, reasoningEffort *string, startTime time.Time, ) (*ForwardResult, error) { requestID := resp.Header.Get("x-request-id") scanner := bufio.NewScanner(resp.Body) maxLineSize := defaultMaxLineSize if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 { maxLineSize = s.cfg.Gateway.MaxLineSize } scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) var finalResp *apicompat.AnthropicResponse var usage ClaudeUsage for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, "event: ") { continue } if !scanner.Scan() { break } dataLine := scanner.Text() if !strings.HasPrefix(dataLine, "data: ") { continue } payload := dataLine[6:] var event apicompat.AnthropicStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { continue } // message_start carries the initial response structure and cache usage if event.Type == "message_start" && event.Message != nil { finalResp = event.Message mergeAnthropicUsage(&usage, event.Message.Usage) } // message_delta carries final usage and stop_reason if event.Type == "message_delta" { if event.Usage != nil { mergeAnthropicUsage(&usage, *event.Usage) } if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil { finalResp.StopReason = event.Delta.StopReason } } if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil { finalResp.Content = append(finalResp.Content, *event.ContentBlock) } if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil { idx := *event.Index if idx < len(finalResp.Content) { switch event.Delta.Type { case "text_delta": finalResp.Content[idx].Text += event.Delta.Text case "thinking_delta": finalResp.Content[idx].Thinking += event.Delta.Thinking case "input_json_delta": finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON) } } } } if err := scanner.Err(); err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { logger.L().Warn("forward_as_cc buffered: read error", zap.Error(err), zap.String("request_id", requestID), ) } } if finalResp == nil { writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response") return nil, fmt.Errorf("upstream stream ended without response") } // Update usage from accumulated delta if usage.InputTokens > 0 || usage.OutputTokens > 0 { finalResp.Usage = apicompat.AnthropicUsage{ InputTokens: usage.InputTokens, OutputTokens: usage.OutputTokens, CacheCreationInputTokens: usage.CacheCreationInputTokens, CacheReadInputTokens: usage.CacheReadInputTokens, } } // Chain: Anthropic → Responses → Chat Completions responsesResp := apicompat.AnthropicToResponsesResponse(finalResp) ccResp := apicompat.ResponsesToChatCompletions(responsesResp, originalModel) if s.responseHeaderFilter != nil { responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) } c.JSON(http.StatusOK, ccResp) return &ForwardResult{ RequestID: requestID, Usage: usage, Model: originalModel, UpstreamModel: mappedModel, ReasoningEffort: reasoningEffort, Stream: false, Duration: time.Since(startTime), }, nil } // handleCCStreamingFromAnthropic reads Anthropic SSE events, converts each // to Responses events, then to Chat Completions chunks, and writes them. func (s *GatewayService) handleCCStreamingFromAnthropic( resp *http.Response, c *gin.Context, originalModel string, mappedModel string, reasoningEffort *string, startTime time.Time, includeUsage bool, ) (*ForwardResult, error) { requestID := resp.Header.Get("x-request-id") if s.responseHeaderFilter != nil { responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) } c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Set("X-Accel-Buffering", "no") c.Writer.WriteHeader(http.StatusOK) // Use Anthropic→Responses state machine, then convert Responses→CC anthState := apicompat.NewAnthropicEventToResponsesState() anthState.Model = originalModel ccState := apicompat.NewResponsesEventToChatState() ccState.Model = originalModel ccState.IncludeUsage = includeUsage var usage ClaudeUsage var firstTokenMs *int firstChunk := true scanner := bufio.NewScanner(resp.Body) maxLineSize := defaultMaxLineSize if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 { maxLineSize = s.cfg.Gateway.MaxLineSize } scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) resultWithUsage := func() *ForwardResult { return &ForwardResult{ RequestID: requestID, Usage: usage, Model: originalModel, UpstreamModel: mappedModel, ReasoningEffort: reasoningEffort, Stream: true, Duration: time.Since(startTime), FirstTokenMs: firstTokenMs, } } writeChunk := func(chunk apicompat.ChatCompletionsChunk) bool { sse, err := apicompat.ChatChunkToSSE(chunk) if err != nil { return false } if _, err := fmt.Fprint(c.Writer, sse); err != nil { return true // client disconnected } return false } processAnthropicEvent := func(event *apicompat.AnthropicStreamEvent) bool { if firstChunk { firstChunk = false ms := int(time.Since(startTime).Milliseconds()) firstTokenMs = &ms } // Extract usage from message_delta if event.Type == "message_delta" && event.Usage != nil { mergeAnthropicUsage(&usage, *event.Usage) } // Also capture usage from message_start (carries cache fields) if event.Type == "message_start" && event.Message != nil { mergeAnthropicUsage(&usage, event.Message.Usage) } // Chain: Anthropic event → Responses events → CC chunks responsesEvents := apicompat.AnthropicEventToResponsesEvents(event, anthState) for _, resEvt := range responsesEvents { ccChunks := apicompat.ResponsesEventToChatChunks(&resEvt, ccState) for _, chunk := range ccChunks { if disconnected := writeChunk(chunk); disconnected { return true } } } c.Writer.Flush() return false } for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, "event: ") { continue } if !scanner.Scan() { break } dataLine := scanner.Text() if !strings.HasPrefix(dataLine, "data: ") { continue } payload := dataLine[6:] var event apicompat.AnthropicStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { continue } if processAnthropicEvent(&event) { return resultWithUsage(), nil } } if err := scanner.Err(); err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { logger.L().Warn("forward_as_cc stream: read error", zap.Error(err), zap.String("request_id", requestID), ) } } // Finalize both state machines finalResEvents := apicompat.FinalizeAnthropicResponsesStream(anthState) for _, resEvt := range finalResEvents { ccChunks := apicompat.ResponsesEventToChatChunks(&resEvt, ccState) for _, chunk := range ccChunks { writeChunk(chunk) //nolint:errcheck } } finalCCChunks := apicompat.FinalizeResponsesChatStream(ccState) for _, chunk := range finalCCChunks { writeChunk(chunk) //nolint:errcheck } // Write [DONE] marker fmt.Fprint(c.Writer, "data: [DONE]\n\n") //nolint:errcheck c.Writer.Flush() return resultWithUsage(), nil } // writeGatewayCCError writes an error in OpenAI Chat Completions format for // the Anthropic-upstream CC forwarding path. func writeGatewayCCError(c *gin.Context, statusCode int, errType, message string) { c.JSON(statusCode, gin.H{ "error": gin.H{ "type": errType, "message": message, }, }) }