Implements the remaining three parity items with Parrot cc_mimicry:
D) Tool-name obfuscation
- Dynamic mapping when tools.length > 5 (matches Parrot threshold).
Fake names follow {prefix}{name[:3]}{i:02d} (e.g. 'manage_bas00').
Go port of random.Random(hash(tuple(names))) uses fnv64a seed +
math/rand; byte-exact reproduction is impossible (Python hash vs
Go hash), but the two invariants that matter are preserved:
* same input tool_names yield identical mapping (cache hit)
* prefix pool is shuffled (names look distributed)
- Static prefix map (sessions_ -> cc_sess_, session_ -> cc_ses_)
applied as fallback, matching Parrot TOOL_NAME_REWRITES verbatim.
- Server tools (web_search_20250305, computer_*, etc.) are NOT
renamed; only type=='function' and type=='custom' tools are.
- tool_choice.name is rewritten in sync (only when type=='tool').
- Response side: bytes-level replace on every SSE chunk / JSON
body at 6 injection points (standard stream/non-stream,
passthrough stream/non-stream, chat_completions stream +
non-stream, responses stream + non-stream). Reverse mapping
applied longest-fake-name-first to prevent substring conflicts
(parity with Parrot _restore_tool_names_in_chunk).
- tool_choice is no longer unconditionally deleted in
normalizeClaudeOAuthRequestBody — Parrot passes it through.
E) tools[-1] cache_control breakpoint
- Injected as {type:ephemeral, ttl:<DefaultCacheControlTTL>} when
the last tool has no cache_control. Client-provided ttl is
passed through unchanged (repo-wide policy).
F) messages cache_control strategy
- stripMessageCacheControl removes every client-provided
messages[*].content[*].cache_control (multi-turn stability).
- addMessageCacheBreakpoints then injects two stable breakpoints:
(1) last message, and (2) second-to-last user turn when
messages.length >= 4.
- Combined with the system block breakpoint and tools[-1]
breakpoint, this gives exactly the 4 breakpoints Anthropic
allows per request.
Non-trivial implementation details to be aware of when rebasing:
* Two new files, no upstream collision:
gateway_tool_rewrite.go (D + E algorithms)
gateway_messages_cache.go (F strip + breakpoints)
* Two new feature calls bolted onto the tail of
applyClaudeCodeOAuthMimicryToBody in gateway_service.go — rebase
conflicts will be ~10 lines maximum.
* Response-side injection points all wrap their existing write with
reverseToolNamesIfPresent(c, ...), preserving original behavior
when no mapping is stored (static prefix rollback still runs).
* Non-stream chat/responses switched from c.JSON to
json.Marshal + c.Data so bytes-level replace is possible.
* Retry bodies (FilterThinkingBlocksForRetry,
FilterSignatureSensitiveBlocksForRetry, RectifyThinkingBudget)
only prune blocks — they preserve the already-obfuscated tool
names, so no extra mapping re-application is needed.
Manual QA: an end-to-end scenario was verified with 6 tools (above the
threshold) and tool_choice.type=='tool'. The obfuscation + restore round trip
was confirmed in the test logs, after which the temporary test file was removed.
Tests (16 new):
- buildDynamicToolMap stability + below-threshold guard
- sanitizeToolName precedence (dynamic > static)
- restoreToolNamesInBytes longest-first + static rollback
- applyToolNameRewriteToBody skips server tools + syncs tool_choice
- applyToolsLastCacheBreakpoint defaults to 5m + passes client ttl
- stripMessageCacheControl + addMessageCacheBreakpoints in the
1/4/string-content cases + second-to-last user turn selection
- buildToolNameRewriteFromBody ReverseOrdered is desc-by-fake-length
- fake name shape follows Parrot {prefix}{head3}{i:02d}
527 lines
16 KiB
Go
527 lines
16 KiB
Go
package service
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
|
||
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
|
||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
||
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/tidwall/gjson"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// ForwardAsResponses accepts an OpenAI Responses API request body, converts it
|
||
// to Anthropic Messages format, forwards to the Anthropic upstream, and converts
|
||
// the response back to Responses format. This enables OpenAI Responses API
|
||
// clients to access Anthropic models through Anthropic platform groups.
|
||
//
|
||
// The method follows the same pattern as OpenAIGatewayService.ForwardAsAnthropic
|
||
// but in reverse direction: Responses → Anthropic upstream → Responses.
|
||
func (s *GatewayService) ForwardAsResponses(
|
||
ctx context.Context,
|
||
c *gin.Context,
|
||
account *Account,
|
||
body []byte,
|
||
parsed *ParsedRequest,
|
||
) (*ForwardResult, error) {
|
||
startTime := time.Now()
|
||
|
||
// 1. Parse Responses request
|
||
var responsesReq apicompat.ResponsesRequest
|
||
if err := json.Unmarshal(body, &responsesReq); err != nil {
|
||
return nil, fmt.Errorf("parse responses request: %w", err)
|
||
}
|
||
originalModel := responsesReq.Model
|
||
clientStream := responsesReq.Stream
|
||
|
||
// 2. Convert Responses → Anthropic
|
||
anthropicReq, err := apicompat.ResponsesToAnthropicRequest(&responsesReq)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("convert responses to anthropic: %w", err)
|
||
}
|
||
|
||
// 3. Force upstream streaming (Anthropic works best with streaming)
|
||
anthropicReq.Stream = true
|
||
reqStream := true
|
||
|
||
// 4. Model mapping
|
||
mappedModel := originalModel
|
||
reasoningEffort := ExtractResponsesReasoningEffortFromBody(body)
|
||
if account.Type == AccountTypeAPIKey {
|
||
mappedModel = account.GetMappedModel(originalModel)
|
||
}
|
||
if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey {
|
||
normalized := claude.NormalizeModelID(originalModel)
|
||
if normalized != originalModel {
|
||
mappedModel = normalized
|
||
}
|
||
}
|
||
anthropicReq.Model = mappedModel
|
||
|
||
logger.L().Debug("gateway forward_as_responses: model mapping applied",
|
||
zap.Int64("account_id", account.ID),
|
||
zap.String("original_model", originalModel),
|
||
zap.String("mapped_model", mappedModel),
|
||
zap.Bool("client_stream", clientStream),
|
||
)
|
||
|
||
// 5. Marshal Anthropic request body
|
||
anthropicBody, err := json.Marshal(anthropicReq)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("marshal anthropic request: %w", err)
|
||
}
|
||
|
||
// 6. Apply Claude Code mimicry for OAuth accounts (non-Claude-Code endpoints).
|
||
// OpenAI Responses 协议进来的请求永远不是 Claude Code 客户端,所以对 OAuth 账号
|
||
// 必须完整执行 /v1/messages 主路径上的伪装链路(system 重写 + normalize + metadata 注入),
|
||
// 否则会被 Anthropic 判为第三方应用并扣 extra usage。
|
||
// 见 applyClaudeCodeOAuthMimicryToBody 的 godoc。
|
||
isClaudeCode := false
|
||
shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode
|
||
|
||
if shouldMimicClaudeCode {
|
||
anthropicBody = s.applyClaudeCodeOAuthMimicryToBody(ctx, c, account, anthropicBody, anthropicReq.System, mappedModel)
|
||
}
|
||
|
||
// 7. Enforce cache_control block limit
|
||
anthropicBody = enforceCacheControlLimit(anthropicBody)
|
||
|
||
// 8. Get access token
|
||
token, tokenType, err := s.GetAccessToken(ctx, account)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("get access token: %w", err)
|
||
}
|
||
|
||
// 9. Get proxy URL
|
||
proxyURL := ""
|
||
if account.ProxyID != nil && account.Proxy != nil {
|
||
proxyURL = account.Proxy.URL()
|
||
}
|
||
|
||
// 10. Build upstream request
|
||
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
|
||
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode)
|
||
releaseUpstreamCtx()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("build upstream request: %w", err)
|
||
}
|
||
|
||
// 11. Send request
|
||
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, s.tlsFPProfileService.ResolveTLSProfile(account))
|
||
if err != nil {
|
||
if resp != nil && resp.Body != nil {
|
||
_ = resp.Body.Close()
|
||
}
|
||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||
setOpsUpstreamError(c, 0, safeErr, "")
|
||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||
Platform: account.Platform,
|
||
AccountID: account.ID,
|
||
AccountName: account.Name,
|
||
UpstreamStatusCode: 0,
|
||
Kind: "request_error",
|
||
Message: safeErr,
|
||
})
|
||
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
|
||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||
}
|
||
defer func() { _ = resp.Body.Close() }()
|
||
|
||
// 12. Handle error response with failover
|
||
if resp.StatusCode >= 400 {
|
||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||
_ = resp.Body.Close()
|
||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||
|
||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||
|
||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||
Platform: account.Platform,
|
||
AccountID: account.ID,
|
||
AccountName: account.Name,
|
||
UpstreamStatusCode: resp.StatusCode,
|
||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||
Kind: "failover",
|
||
Message: upstreamMsg,
|
||
})
|
||
if s.rateLimitService != nil {
|
||
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
|
||
}
|
||
return nil, &UpstreamFailoverError{
|
||
StatusCode: resp.StatusCode,
|
||
ResponseBody: respBody,
|
||
}
|
||
}
|
||
|
||
// Non-failover error: return Responses-formatted error to client
|
||
writeResponsesError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
|
||
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
|
||
}
|
||
|
||
// 13. Handle normal response (convert Anthropic → Responses)
|
||
var result *ForwardResult
|
||
var handleErr error
|
||
if clientStream {
|
||
result, handleErr = s.handleResponsesStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
|
||
} else {
|
||
result, handleErr = s.handleResponsesBufferedStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
|
||
}
|
||
|
||
return result, handleErr
|
||
}
|
||
|
||
// ExtractResponsesReasoningEffortFromBody reads Responses API reasoning.effort
|
||
// and normalizes it for usage logging.
|
||
func ExtractResponsesReasoningEffortFromBody(body []byte) *string {
|
||
raw := strings.TrimSpace(gjson.GetBytes(body, "reasoning.effort").String())
|
||
if raw == "" {
|
||
return nil
|
||
}
|
||
normalized := normalizeOpenAIReasoningEffort(raw)
|
||
if normalized == "" {
|
||
return nil
|
||
}
|
||
return &normalized
|
||
}
|
||
|
||
func mergeAnthropicUsage(dst *ClaudeUsage, src apicompat.AnthropicUsage) {
|
||
if dst == nil {
|
||
return
|
||
}
|
||
if src.InputTokens > 0 {
|
||
dst.InputTokens = src.InputTokens
|
||
}
|
||
if src.OutputTokens > 0 {
|
||
dst.OutputTokens = src.OutputTokens
|
||
}
|
||
if src.CacheReadInputTokens > 0 {
|
||
dst.CacheReadInputTokens = src.CacheReadInputTokens
|
||
}
|
||
if src.CacheCreationInputTokens > 0 {
|
||
dst.CacheCreationInputTokens = src.CacheCreationInputTokens
|
||
}
|
||
}
|
||
|
||
// handleResponsesBufferedStreamingResponse reads all Anthropic SSE events from
|
||
// the upstream streaming response, assembles them into a complete Anthropic
|
||
// response, converts to Responses API JSON format, and writes it to the client.
|
||
func (s *GatewayService) handleResponsesBufferedStreamingResponse(
|
||
resp *http.Response,
|
||
c *gin.Context,
|
||
originalModel string,
|
||
mappedModel string,
|
||
reasoningEffort *string,
|
||
startTime time.Time,
|
||
) (*ForwardResult, error) {
|
||
requestID := resp.Header.Get("x-request-id")
|
||
|
||
scanner := bufio.NewScanner(resp.Body)
|
||
maxLineSize := defaultMaxLineSize
|
||
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
|
||
maxLineSize = s.cfg.Gateway.MaxLineSize
|
||
}
|
||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||
|
||
// Accumulate the final Anthropic response from streaming events
|
||
var finalResp *apicompat.AnthropicResponse
|
||
var usage ClaudeUsage
|
||
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
if !strings.HasPrefix(line, "event: ") {
|
||
continue
|
||
}
|
||
eventType := strings.TrimPrefix(line, "event: ")
|
||
|
||
// Read the data line
|
||
if !scanner.Scan() {
|
||
break
|
||
}
|
||
dataLine := scanner.Text()
|
||
if !strings.HasPrefix(dataLine, "data: ") {
|
||
continue
|
||
}
|
||
payload := dataLine[6:]
|
||
|
||
var event apicompat.AnthropicStreamEvent
|
||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||
logger.L().Warn("forward_as_responses buffered: failed to parse event",
|
||
zap.Error(err),
|
||
zap.String("request_id", requestID),
|
||
zap.String("event_type", eventType),
|
||
)
|
||
continue
|
||
}
|
||
|
||
// message_start carries the initial response structure
|
||
if event.Type == "message_start" && event.Message != nil {
|
||
finalResp = event.Message
|
||
mergeAnthropicUsage(&usage, event.Message.Usage)
|
||
}
|
||
|
||
// message_delta carries final usage and stop_reason
|
||
if event.Type == "message_delta" {
|
||
if event.Usage != nil {
|
||
mergeAnthropicUsage(&usage, *event.Usage)
|
||
}
|
||
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
|
||
finalResp.StopReason = event.Delta.StopReason
|
||
}
|
||
}
|
||
|
||
// Accumulate content blocks
|
||
if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil {
|
||
finalResp.Content = append(finalResp.Content, *event.ContentBlock)
|
||
}
|
||
if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil {
|
||
idx := *event.Index
|
||
if idx < len(finalResp.Content) {
|
||
switch event.Delta.Type {
|
||
case "text_delta":
|
||
finalResp.Content[idx].Text += event.Delta.Text
|
||
case "thinking_delta":
|
||
finalResp.Content[idx].Thinking += event.Delta.Thinking
|
||
case "input_json_delta":
|
||
finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||
logger.L().Warn("forward_as_responses buffered: read error",
|
||
zap.Error(err),
|
||
zap.String("request_id", requestID),
|
||
)
|
||
}
|
||
}
|
||
|
||
if finalResp == nil {
|
||
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response")
|
||
return nil, fmt.Errorf("upstream stream ended without response")
|
||
}
|
||
|
||
// Update usage from accumulated delta
|
||
if usage.InputTokens > 0 || usage.OutputTokens > 0 {
|
||
finalResp.Usage = apicompat.AnthropicUsage{
|
||
InputTokens: usage.InputTokens,
|
||
OutputTokens: usage.OutputTokens,
|
||
CacheCreationInputTokens: usage.CacheCreationInputTokens,
|
||
CacheReadInputTokens: usage.CacheReadInputTokens,
|
||
}
|
||
}
|
||
|
||
// Convert to Responses format
|
||
responsesResp := apicompat.AnthropicToResponsesResponse(finalResp)
|
||
responsesResp.Model = originalModel // Use original model name
|
||
|
||
if s.responseHeaderFilter != nil {
|
||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||
}
|
||
if respBytes, err := json.Marshal(responsesResp); err == nil {
|
||
respBytes = reverseToolNamesIfPresent(c, respBytes)
|
||
c.Data(http.StatusOK, "application/json; charset=utf-8", respBytes)
|
||
} else {
|
||
c.JSON(http.StatusOK, responsesResp)
|
||
}
|
||
|
||
return &ForwardResult{
|
||
RequestID: requestID,
|
||
Usage: usage,
|
||
Model: originalModel,
|
||
UpstreamModel: mappedModel,
|
||
ReasoningEffort: reasoningEffort,
|
||
Stream: false,
|
||
Duration: time.Since(startTime),
|
||
}, nil
|
||
}
|
||
|
||
// handleResponsesStreamingResponse reads Anthropic SSE events from upstream,
|
||
// converts each to Responses SSE events, and writes them to the client.
|
||
func (s *GatewayService) handleResponsesStreamingResponse(
|
||
resp *http.Response,
|
||
c *gin.Context,
|
||
originalModel string,
|
||
mappedModel string,
|
||
reasoningEffort *string,
|
||
startTime time.Time,
|
||
) (*ForwardResult, error) {
|
||
requestID := resp.Header.Get("x-request-id")
|
||
|
||
if s.responseHeaderFilter != nil {
|
||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||
}
|
||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||
c.Writer.Header().Set("Connection", "keep-alive")
|
||
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
||
c.Writer.WriteHeader(http.StatusOK)
|
||
|
||
state := apicompat.NewAnthropicEventToResponsesState()
|
||
state.Model = originalModel
|
||
var usage ClaudeUsage
|
||
var firstTokenMs *int
|
||
firstChunk := true
|
||
|
||
scanner := bufio.NewScanner(resp.Body)
|
||
maxLineSize := defaultMaxLineSize
|
||
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
|
||
maxLineSize = s.cfg.Gateway.MaxLineSize
|
||
}
|
||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||
|
||
resultWithUsage := func() *ForwardResult {
|
||
return &ForwardResult{
|
||
RequestID: requestID,
|
||
Usage: usage,
|
||
Model: originalModel,
|
||
UpstreamModel: mappedModel,
|
||
ReasoningEffort: reasoningEffort,
|
||
Stream: true,
|
||
Duration: time.Since(startTime),
|
||
FirstTokenMs: firstTokenMs,
|
||
}
|
||
}
|
||
|
||
// processEvent handles a single parsed Anthropic SSE event.
|
||
processEvent := func(event *apicompat.AnthropicStreamEvent) bool {
|
||
if firstChunk {
|
||
firstChunk = false
|
||
ms := int(time.Since(startTime).Milliseconds())
|
||
firstTokenMs = &ms
|
||
}
|
||
|
||
// Extract usage from message_delta
|
||
if event.Type == "message_delta" && event.Usage != nil {
|
||
mergeAnthropicUsage(&usage, *event.Usage)
|
||
}
|
||
// Also capture usage from message_start
|
||
if event.Type == "message_start" && event.Message != nil {
|
||
mergeAnthropicUsage(&usage, event.Message.Usage)
|
||
}
|
||
|
||
// Convert to Responses events
|
||
events := apicompat.AnthropicEventToResponsesEvents(event, state)
|
||
for _, evt := range events {
|
||
sse, err := apicompat.ResponsesEventToSSE(evt)
|
||
if err != nil {
|
||
logger.L().Warn("forward_as_responses stream: failed to marshal event",
|
||
zap.Error(err),
|
||
zap.String("request_id", requestID),
|
||
)
|
||
continue
|
||
}
|
||
out := string(reverseToolNamesIfPresent(c, []byte(sse)))
|
||
if _, err := fmt.Fprint(c.Writer, out); err != nil {
|
||
logger.L().Info("forward_as_responses stream: client disconnected",
|
||
zap.String("request_id", requestID),
|
||
)
|
||
return true // client disconnected
|
||
}
|
||
}
|
||
if len(events) > 0 {
|
||
c.Writer.Flush()
|
||
}
|
||
return false
|
||
}
|
||
|
||
finalizeStream := func() (*ForwardResult, error) {
|
||
if finalEvents := apicompat.FinalizeAnthropicResponsesStream(state); len(finalEvents) > 0 {
|
||
for _, evt := range finalEvents {
|
||
sse, err := apicompat.ResponsesEventToSSE(evt)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
out := string(reverseToolNamesIfPresent(c, []byte(sse)))
|
||
fmt.Fprint(c.Writer, out) //nolint:errcheck
|
||
}
|
||
c.Writer.Flush()
|
||
}
|
||
return resultWithUsage(), nil
|
||
}
|
||
|
||
// Read Anthropic SSE events
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
if !strings.HasPrefix(line, "event: ") {
|
||
continue
|
||
}
|
||
eventType := strings.TrimPrefix(line, "event: ")
|
||
|
||
// Read data line
|
||
if !scanner.Scan() {
|
||
break
|
||
}
|
||
dataLine := scanner.Text()
|
||
if !strings.HasPrefix(dataLine, "data: ") {
|
||
continue
|
||
}
|
||
payload := dataLine[6:]
|
||
|
||
var event apicompat.AnthropicStreamEvent
|
||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||
logger.L().Warn("forward_as_responses stream: failed to parse event",
|
||
zap.Error(err),
|
||
zap.String("request_id", requestID),
|
||
zap.String("event_type", eventType),
|
||
)
|
||
continue
|
||
}
|
||
|
||
if processEvent(&event) {
|
||
return resultWithUsage(), nil
|
||
}
|
||
}
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||
logger.L().Warn("forward_as_responses stream: read error",
|
||
zap.Error(err),
|
||
zap.String("request_id", requestID),
|
||
)
|
||
}
|
||
}
|
||
|
||
return finalizeStream()
|
||
}
|
||
|
||
// appendRawJSON appends a streamed JSON fragment to the raw JSON accumulated so
// far and returns the combined value.
//
// When existing is empty the fragment is returned on its own. When existing is
// only an empty placeholder ("{}" or "null", ignoring surrounding whitespace)
// and the fragment is non-empty, the placeholder is discarded and the fragment
// starts the accumulation: Anthropic's content_block_start event for a tool_use
// block carries "input": {} and the real input arrives afterwards as
// input_json_delta fragments, so concatenating onto the placeholder would yield
// invalid JSON such as `{}{"a":1}`. An empty fragment never discards the
// placeholder, so a tool call without arguments keeps "{}".
//
// The result never aliases existing's backing array.
func appendRawJSON(existing json.RawMessage, fragment string) json.RawMessage {
	if len(existing) == 0 {
		return json.RawMessage(fragment)
	}
	if fragment != "" {
		// A partially accumulated document can only equal "{}" or "null" when
		// it is already complete, so treating those as placeholders is safe.
		switch string(bytes.TrimSpace(existing)) {
		case "{}", "null":
			return json.RawMessage(fragment)
		}
	}
	return json.RawMessage(string(existing) + fragment)
}
|
||
|
||
// writeResponsesError writes an error response in OpenAI Responses API format.
|
||
func writeResponsesError(c *gin.Context, statusCode int, code, message string) {
|
||
c.JSON(statusCode, gin.H{
|
||
"error": gin.H{
|
||
"code": code,
|
||
"message": message,
|
||
},
|
||
})
|
||
}
|
||
|
||
// mapUpstreamStatusCode translates an upstream HTTP status into the status
// reported to the client: anything from 500 upwards becomes 502 Bad Gateway,
// every other code is passed through unchanged.
func mapUpstreamStatusCode(code int) int {
	if code < 500 {
		return code
	}
	return http.StatusBadGateway
}
|