Files
sub2api/backend/internal/service/gateway_forward_as_responses.go
keh4l 6e12578bc5 feat(gateway): port Parrot tool-name obfuscation + message cache breakpoints
Implements the remaining three parity items with Parrot cc_mimicry:

  D) Tool-name obfuscation
     - Dynamic mapping when tools.length > 5 (matches Parrot threshold).
       Fake names follow {prefix}{name[:3]}{i:02d} (e.g. 'manage_bas00').
       Go port of random.Random(hash(tuple(names))) uses fnv64a seed +
       math/rand; byte-exact reproduction is impossible (Python hash vs
       Go hash), but the two invariants that matter are preserved:
         * same input tool_names yield identical mapping (cache hit)
         * prefix pool is shuffled (names look distributed)
     - Static prefix map (sessions_ -> cc_sess_, session_ -> cc_ses_)
       applied as fallback, matching Parrot TOOL_NAME_REWRITES verbatim.
     - Server tools (web_search_20250305, computer_*, etc.) are NOT
       renamed; only type=='function' and type=='custom' tools are.
     - tool_choice.name is rewritten in sync (only when type=='tool').
     - Response side: bytes-level replace on every SSE chunk / JSON
       body at 6 injection points (standard stream/non-stream,
       passthrough stream/non-stream, chat_completions stream +
       non-stream, responses stream + non-stream). Reverse mapping
       applied longest-fake-name-first to prevent substring conflicts
       (parity with Parrot _restore_tool_names_in_chunk).
     - tool_choice is no longer unconditionally deleted in
       normalizeClaudeOAuthRequestBody — Parrot passes it through.

  E) tools[-1] cache_control breakpoint
     - Injected as {type:ephemeral, ttl:<DefaultCacheControlTTL>} when
       the last tool has no cache_control. Client-provided ttl is
       passed through unchanged (repo-wide policy).

  F) messages cache_control strategy
     - stripMessageCacheControl removes every client-provided
       messages[*].content[*].cache_control (multi-turn stability).
     - addMessageCacheBreakpoints then injects two stable breakpoints:
       (1) last message, and (2) second-to-last user turn when
       messages.length >= 4.
     - Combined with the system block breakpoint and tools[-1]
       breakpoint, this gives exactly the 4 breakpoints Anthropic
       allows per request.

Non-trivial implementation details to be aware of when rebasing:

  * Two new files, no upstream collision:
      gateway_tool_rewrite.go       (D + E algorithms)
      gateway_messages_cache.go     (F strip + breakpoints)
  * Two new feature calls bolted onto the tail of
    applyClaudeCodeOAuthMimicryToBody in gateway_service.go — rebase
    conflicts will be ~10 lines maximum.
  * Response-side injection points all wrap their existing write with
    reverseToolNamesIfPresent(c, ...), preserving original behavior
    when no mapping is stored (static prefix rollback still runs).
  * Non-stream chat/responses switched from c.JSON to
    json.Marshal + c.Data so bytes-level replace is possible.
  * Retry bodies (FilterThinkingBlocksForRetry,
    FilterSignatureSensitiveBlocksForRetry, RectifyThinkingBudget)
    only prune blocks — they preserve the already-obfuscated tool
    names, so no extra mapping re-application is needed.

Manual QA: end-to-end scenario verified with 6 tools (above threshold)
and tool_choice.type=='tool'. Obfuscation + restore roundtrip shown
in test logs; then removed the temp test file.

Tests (16 new):
  - buildDynamicToolMap stability + below-threshold guard
  - sanitizeToolName precedence (dynamic > static)
  - restoreToolNamesInBytes longest-first + static rollback
  - applyToolNameRewriteToBody skips server tools + syncs tool_choice
  - applyToolsLastCacheBreakpoint defaults to 5m + passes client ttl
  - stripMessageCacheControl + addMessageCacheBreakpoints in the
    1/4/string-content cases + second-to-last user turn selection
  - buildToolNameRewriteFromBody ReverseOrdered is desc-by-fake-length
  - fake name shape follows Parrot {prefix}{head3}{i:02d}
2026-04-24 23:16:32 +08:00

527 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/gin-gonic/gin"
"github.com/tidwall/gjson"
"go.uber.org/zap"
)
// ForwardAsResponses accepts an OpenAI Responses API request body, converts it
// to Anthropic Messages format, forwards to the Anthropic upstream, and converts
// the response back to Responses format. This enables OpenAI Responses API
// clients to access Anthropic models through Anthropic platform groups.
//
// The method follows the same pattern as OpenAIGatewayService.ForwardAsAnthropic
// but in reverse direction: Responses → Anthropic upstream → Responses.
func (s *GatewayService) ForwardAsResponses(
ctx context.Context,
c *gin.Context,
account *Account,
body []byte,
parsed *ParsedRequest,
) (*ForwardResult, error) {
startTime := time.Now()
// 1. Parse Responses request
var responsesReq apicompat.ResponsesRequest
if err := json.Unmarshal(body, &responsesReq); err != nil {
return nil, fmt.Errorf("parse responses request: %w", err)
}
originalModel := responsesReq.Model
clientStream := responsesReq.Stream
// 2. Convert Responses → Anthropic
anthropicReq, err := apicompat.ResponsesToAnthropicRequest(&responsesReq)
if err != nil {
return nil, fmt.Errorf("convert responses to anthropic: %w", err)
}
// 3. Force upstream streaming (Anthropic works best with streaming)
anthropicReq.Stream = true
reqStream := true
// 4. Model mapping
mappedModel := originalModel
reasoningEffort := ExtractResponsesReasoningEffortFromBody(body)
if account.Type == AccountTypeAPIKey {
mappedModel = account.GetMappedModel(originalModel)
}
if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey {
normalized := claude.NormalizeModelID(originalModel)
if normalized != originalModel {
mappedModel = normalized
}
}
anthropicReq.Model = mappedModel
logger.L().Debug("gateway forward_as_responses: model mapping applied",
zap.Int64("account_id", account.ID),
zap.String("original_model", originalModel),
zap.String("mapped_model", mappedModel),
zap.Bool("client_stream", clientStream),
)
// 5. Marshal Anthropic request body
anthropicBody, err := json.Marshal(anthropicReq)
if err != nil {
return nil, fmt.Errorf("marshal anthropic request: %w", err)
}
// 6. Apply Claude Code mimicry for OAuth accounts (non-Claude-Code endpoints).
// OpenAI Responses 协议进来的请求永远不是 Claude Code 客户端,所以对 OAuth 账号
// 必须完整执行 /v1/messages 主路径上的伪装链路system 重写 + normalize + metadata 注入),
// 否则会被 Anthropic 判为第三方应用并扣 extra usage。
// 见 applyClaudeCodeOAuthMimicryToBody 的 godoc。
isClaudeCode := false
shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode
if shouldMimicClaudeCode {
anthropicBody = s.applyClaudeCodeOAuthMimicryToBody(ctx, c, account, anthropicBody, anthropicReq.System, mappedModel)
}
// 7. Enforce cache_control block limit
anthropicBody = enforceCacheControlLimit(anthropicBody)
// 8. Get access token
token, tokenType, err := s.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("get access token: %w", err)
}
// 9. Get proxy URL
proxyURL := ""
if account.ProxyID != nil && account.Proxy != nil {
proxyURL = account.Proxy.URL()
}
// 10. Build upstream request
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode)
releaseUpstreamCtx()
if err != nil {
return nil, fmt.Errorf("build upstream request: %w", err)
}
// 11. Send request
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, s.tlsFPProfileService.ResolveTLSProfile(account))
if err != nil {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: 0,
Kind: "request_error",
Message: safeErr,
})
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
defer func() { _ = resp.Body.Close() }()
// 12. Handle error response with failover
if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
if s.shouldFailoverUpstreamError(resp.StatusCode) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
})
if s.rateLimitService != nil {
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
}
return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
}
}
// Non-failover error: return Responses-formatted error to client
writeResponsesError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
}
// 13. Handle normal response (convert Anthropic → Responses)
var result *ForwardResult
var handleErr error
if clientStream {
result, handleErr = s.handleResponsesStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
} else {
result, handleErr = s.handleResponsesBufferedStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
}
return result, handleErr
}
// ExtractResponsesReasoningEffortFromBody reads Responses API reasoning.effort
// and normalizes it for usage logging.
func ExtractResponsesReasoningEffortFromBody(body []byte) *string {
raw := strings.TrimSpace(gjson.GetBytes(body, "reasoning.effort").String())
if raw == "" {
return nil
}
normalized := normalizeOpenAIReasoningEffort(raw)
if normalized == "" {
return nil
}
return &normalized
}
func mergeAnthropicUsage(dst *ClaudeUsage, src apicompat.AnthropicUsage) {
if dst == nil {
return
}
if src.InputTokens > 0 {
dst.InputTokens = src.InputTokens
}
if src.OutputTokens > 0 {
dst.OutputTokens = src.OutputTokens
}
if src.CacheReadInputTokens > 0 {
dst.CacheReadInputTokens = src.CacheReadInputTokens
}
if src.CacheCreationInputTokens > 0 {
dst.CacheCreationInputTokens = src.CacheCreationInputTokens
}
}
// handleResponsesBufferedStreamingResponse reads all Anthropic SSE events from
// the upstream streaming response, assembles them into a complete Anthropic
// response, converts to Responses API JSON format, and writes it to the client.
func (s *GatewayService) handleResponsesBufferedStreamingResponse(
resp *http.Response,
c *gin.Context,
originalModel string,
mappedModel string,
reasoningEffort *string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
// Accumulate the final Anthropic response from streaming events
var finalResp *apicompat.AnthropicResponse
var usage ClaudeUsage
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "event: ") {
continue
}
eventType := strings.TrimPrefix(line, "event: ")
// Read the data line
if !scanner.Scan() {
break
}
dataLine := scanner.Text()
if !strings.HasPrefix(dataLine, "data: ") {
continue
}
payload := dataLine[6:]
var event apicompat.AnthropicStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
logger.L().Warn("forward_as_responses buffered: failed to parse event",
zap.Error(err),
zap.String("request_id", requestID),
zap.String("event_type", eventType),
)
continue
}
// message_start carries the initial response structure
if event.Type == "message_start" && event.Message != nil {
finalResp = event.Message
mergeAnthropicUsage(&usage, event.Message.Usage)
}
// message_delta carries final usage and stop_reason
if event.Type == "message_delta" {
if event.Usage != nil {
mergeAnthropicUsage(&usage, *event.Usage)
}
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
finalResp.StopReason = event.Delta.StopReason
}
}
// Accumulate content blocks
if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil {
finalResp.Content = append(finalResp.Content, *event.ContentBlock)
}
if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil {
idx := *event.Index
if idx < len(finalResp.Content) {
switch event.Delta.Type {
case "text_delta":
finalResp.Content[idx].Text += event.Delta.Text
case "thinking_delta":
finalResp.Content[idx].Thinking += event.Delta.Thinking
case "input_json_delta":
finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON)
}
}
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("forward_as_responses buffered: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
if finalResp == nil {
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response")
return nil, fmt.Errorf("upstream stream ended without response")
}
// Update usage from accumulated delta
if usage.InputTokens > 0 || usage.OutputTokens > 0 {
finalResp.Usage = apicompat.AnthropicUsage{
InputTokens: usage.InputTokens,
OutputTokens: usage.OutputTokens,
CacheCreationInputTokens: usage.CacheCreationInputTokens,
CacheReadInputTokens: usage.CacheReadInputTokens,
}
}
// Convert to Responses format
responsesResp := apicompat.AnthropicToResponsesResponse(finalResp)
responsesResp.Model = originalModel // Use original model name
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
if respBytes, err := json.Marshal(responsesResp); err == nil {
respBytes = reverseToolNamesIfPresent(c, respBytes)
c.Data(http.StatusOK, "application/json; charset=utf-8", respBytes)
} else {
c.JSON(http.StatusOK, responsesResp)
}
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
ReasoningEffort: reasoningEffort,
Stream: false,
Duration: time.Since(startTime),
}, nil
}
// handleResponsesStreamingResponse reads Anthropic SSE events from upstream,
// converts each to Responses SSE events, and writes them to the client.
func (s *GatewayService) handleResponsesStreamingResponse(
resp *http.Response,
c *gin.Context,
originalModel string,
mappedModel string,
reasoningEffort *string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
if s.responseHeaderFilter != nil {
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
}
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("X-Accel-Buffering", "no")
c.Writer.WriteHeader(http.StatusOK)
state := apicompat.NewAnthropicEventToResponsesState()
state.Model = originalModel
var usage ClaudeUsage
var firstTokenMs *int
firstChunk := true
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
resultWithUsage := func() *ForwardResult {
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
ReasoningEffort: reasoningEffort,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
}
}
// processEvent handles a single parsed Anthropic SSE event.
processEvent := func(event *apicompat.AnthropicStreamEvent) bool {
if firstChunk {
firstChunk = false
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
// Extract usage from message_delta
if event.Type == "message_delta" && event.Usage != nil {
mergeAnthropicUsage(&usage, *event.Usage)
}
// Also capture usage from message_start
if event.Type == "message_start" && event.Message != nil {
mergeAnthropicUsage(&usage, event.Message.Usage)
}
// Convert to Responses events
events := apicompat.AnthropicEventToResponsesEvents(event, state)
for _, evt := range events {
sse, err := apicompat.ResponsesEventToSSE(evt)
if err != nil {
logger.L().Warn("forward_as_responses stream: failed to marshal event",
zap.Error(err),
zap.String("request_id", requestID),
)
continue
}
out := string(reverseToolNamesIfPresent(c, []byte(sse)))
if _, err := fmt.Fprint(c.Writer, out); err != nil {
logger.L().Info("forward_as_responses stream: client disconnected",
zap.String("request_id", requestID),
)
return true // client disconnected
}
}
if len(events) > 0 {
c.Writer.Flush()
}
return false
}
finalizeStream := func() (*ForwardResult, error) {
if finalEvents := apicompat.FinalizeAnthropicResponsesStream(state); len(finalEvents) > 0 {
for _, evt := range finalEvents {
sse, err := apicompat.ResponsesEventToSSE(evt)
if err != nil {
continue
}
out := string(reverseToolNamesIfPresent(c, []byte(sse)))
fmt.Fprint(c.Writer, out) //nolint:errcheck
}
c.Writer.Flush()
}
return resultWithUsage(), nil
}
// Read Anthropic SSE events
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "event: ") {
continue
}
eventType := strings.TrimPrefix(line, "event: ")
// Read data line
if !scanner.Scan() {
break
}
dataLine := scanner.Text()
if !strings.HasPrefix(dataLine, "data: ") {
continue
}
payload := dataLine[6:]
var event apicompat.AnthropicStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
logger.L().Warn("forward_as_responses stream: failed to parse event",
zap.Error(err),
zap.String("request_id", requestID),
zap.String("event_type", eventType),
)
continue
}
if processEvent(&event) {
return resultWithUsage(), nil
}
}
if err := scanner.Err(); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.L().Warn("forward_as_responses stream: read error",
zap.Error(err),
zap.String("request_id", requestID),
)
}
}
return finalizeStream()
}
// appendRawJSON appends a JSON fragment string to existing raw JSON.
func appendRawJSON(existing json.RawMessage, fragment string) json.RawMessage {
if len(existing) == 0 {
return json.RawMessage(fragment)
}
return json.RawMessage(string(existing) + fragment)
}
// writeResponsesError writes an error response in OpenAI Responses API format.
func writeResponsesError(c *gin.Context, statusCode int, code, message string) {
c.JSON(statusCode, gin.H{
"error": gin.H{
"code": code,
"message": message,
},
})
}
// mapUpstreamStatusCode maps upstream HTTP status codes to appropriate client-facing codes.
func mapUpstreamStatusCode(code int) int {
if code >= 500 {
return http.StatusBadGateway
}
return code
}