feat(openai): add /v1/messages endpoint and API compatibility layer
Add Anthropic Messages API support for OpenAI platform groups, enabling clients using Claude-style /v1/messages format to access OpenAI accounts through automatic protocol conversion. - Add apicompat package with type definitions and bidirectional converters (Anthropic ↔ Chat, Chat ↔ Responses, Anthropic ↔ Responses) - Implement /v1/messages endpoint for OpenAI gateway with streaming support - Add model mapping UI for OpenAI OAuth accounts (whitelist + mapping modes) - Support prompt caching fields and codex OAuth transforms - Fix tool call ID conversion for Responses API (fc_ prefix) - Ensure function_call_output has non-empty output field Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
376
backend/internal/service/openai_gateway_messages.go
Normal file
376
backend/internal/service/openai_gateway_messages.go
Normal file
@@ -0,0 +1,376 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
||||
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ForwardAsAnthropic accepts an Anthropic Messages request body, converts it
|
||||
// to OpenAI Responses API format, forwards to the OpenAI upstream, and converts
|
||||
// the response back to Anthropic Messages format. This enables Claude Code
|
||||
// clients to access OpenAI models through the standard /v1/messages endpoint.
|
||||
func (s *OpenAIGatewayService) ForwardAsAnthropic(
|
||||
ctx context.Context,
|
||||
c *gin.Context,
|
||||
account *Account,
|
||||
body []byte,
|
||||
promptCacheKey string,
|
||||
) (*OpenAIForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
// 1. Parse Anthropic request
|
||||
var anthropicReq apicompat.AnthropicRequest
|
||||
if err := json.Unmarshal(body, &anthropicReq); err != nil {
|
||||
return nil, fmt.Errorf("parse anthropic request: %w", err)
|
||||
}
|
||||
originalModel := anthropicReq.Model
|
||||
isStream := anthropicReq.Stream
|
||||
|
||||
// 2. Convert Anthropic → Responses
|
||||
responsesReq, err := apicompat.AnthropicToResponses(&anthropicReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("convert anthropic to responses: %w", err)
|
||||
}
|
||||
|
||||
// 3. Model mapping
|
||||
mappedModel := account.GetMappedModel(originalModel)
|
||||
responsesReq.Model = mappedModel
|
||||
|
||||
logger.L().Info("openai messages: model mapping applied",
|
||||
zap.Int64("account_id", account.ID),
|
||||
zap.String("original_model", originalModel),
|
||||
zap.String("mapped_model", mappedModel),
|
||||
zap.Bool("stream", isStream),
|
||||
)
|
||||
|
||||
// 4. Marshal Responses request body, then apply OAuth codex transform
|
||||
responsesBody, err := json.Marshal(responsesReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal responses request: %w", err)
|
||||
}
|
||||
|
||||
if account.Type == AccountTypeOAuth {
|
||||
var reqBody map[string]any
|
||||
if err := json.Unmarshal(responsesBody, &reqBody); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal for codex transform: %w", err)
|
||||
}
|
||||
applyCodexOAuthTransform(reqBody, false)
|
||||
// OAuth codex transform forces stream=true upstream, so always use
|
||||
// the streaming response handler regardless of what the client asked.
|
||||
isStream = true
|
||||
responsesBody, err = json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("remarshal after codex transform: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Get access token
|
||||
token, _, err := s.GetAccessToken(ctx, account)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get access token: %w", err)
|
||||
}
|
||||
|
||||
// 6. Build upstream request
|
||||
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, responsesBody, token, isStream, promptCacheKey, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("build upstream request: %w", err)
|
||||
}
|
||||
|
||||
// 7. Send request
|
||||
proxyURL := ""
|
||||
if account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
setOpsUpstreamError(c, 0, safeErr, "")
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
writeAnthropicError(c, http.StatusBadGateway, "api_error", "Upstream request failed")
|
||||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// 8. Handle error response with failover
|
||||
if resp.StatusCode >= 400 {
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
|
||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
upstreamDetail := ""
|
||||
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
|
||||
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
|
||||
if maxBytes <= 0 {
|
||||
maxBytes = 2048
|
||||
}
|
||||
upstreamDetail = truncateString(string(respBody), maxBytes)
|
||||
}
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
})
|
||||
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody}
|
||||
}
|
||||
// Non-failover error: return Anthropic-formatted error to client
|
||||
return s.handleAnthropicErrorResponse(resp, c)
|
||||
}
|
||||
|
||||
// 9. Handle normal response
|
||||
if isStream {
|
||||
return s.handleAnthropicStreamingResponse(resp, c, originalModel, startTime)
|
||||
}
|
||||
return s.handleAnthropicNonStreamingResponse(resp, c, originalModel, startTime)
|
||||
}
|
||||
|
||||
// handleAnthropicErrorResponse reads an upstream error and returns it in
|
||||
// Anthropic error format.
|
||||
func (s *OpenAIGatewayService) handleAnthropicErrorResponse(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
) (*OpenAIForwardResult, error) {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
|
||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
|
||||
if upstreamMsg == "" {
|
||||
upstreamMsg = fmt.Sprintf("Upstream error: %d", resp.StatusCode)
|
||||
}
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
|
||||
// Record upstream error details for ops logging
|
||||
upstreamDetail := ""
|
||||
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
|
||||
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
|
||||
if maxBytes <= 0 {
|
||||
maxBytes = 2048
|
||||
}
|
||||
upstreamDetail = truncateString(string(body), maxBytes)
|
||||
}
|
||||
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
|
||||
|
||||
errType := "api_error"
|
||||
switch {
|
||||
case resp.StatusCode == 400:
|
||||
errType = "invalid_request_error"
|
||||
case resp.StatusCode == 404:
|
||||
errType = "not_found_error"
|
||||
case resp.StatusCode == 429:
|
||||
errType = "rate_limit_error"
|
||||
case resp.StatusCode >= 500:
|
||||
errType = "api_error"
|
||||
}
|
||||
|
||||
writeAnthropicError(c, resp.StatusCode, errType, upstreamMsg)
|
||||
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
|
||||
}
|
||||
|
||||
// handleAnthropicNonStreamingResponse reads a Responses API JSON response,
|
||||
// converts it to Anthropic Messages format, and writes it to the client.
|
||||
func (s *OpenAIGatewayService) handleAnthropicNonStreamingResponse(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
originalModel string,
|
||||
startTime time.Time,
|
||||
) (*OpenAIForwardResult, error) {
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read upstream response: %w", err)
|
||||
}
|
||||
|
||||
var responsesResp apicompat.ResponsesResponse
|
||||
if err := json.Unmarshal(respBody, &responsesResp); err != nil {
|
||||
return nil, fmt.Errorf("parse responses response: %w", err)
|
||||
}
|
||||
|
||||
anthropicResp := apicompat.ResponsesToAnthropic(&responsesResp, originalModel)
|
||||
|
||||
var usage OpenAIUsage
|
||||
if responsesResp.Usage != nil {
|
||||
usage = OpenAIUsage{
|
||||
InputTokens: responsesResp.Usage.InputTokens,
|
||||
OutputTokens: responsesResp.Usage.OutputTokens,
|
||||
}
|
||||
if responsesResp.Usage.InputTokensDetails != nil {
|
||||
usage.CacheReadInputTokens = responsesResp.Usage.InputTokensDetails.CachedTokens
|
||||
}
|
||||
}
|
||||
|
||||
if s.responseHeaderFilter != nil {
|
||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
}
|
||||
c.JSON(http.StatusOK, anthropicResp)
|
||||
|
||||
return &OpenAIForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
Stream: false,
|
||||
Duration: time.Since(startTime),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// handleAnthropicStreamingResponse reads Responses SSE events from upstream,
|
||||
// converts each to Anthropic SSE events, and writes them to the client.
|
||||
func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
originalModel string,
|
||||
startTime time.Time,
|
||||
) (*OpenAIForwardResult, error) {
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
|
||||
if s.responseHeaderFilter != nil {
|
||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
}
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||||
c.Writer.Header().Set("Connection", "keep-alive")
|
||||
c.Writer.WriteHeader(http.StatusOK)
|
||||
|
||||
state := apicompat.NewResponsesEventToAnthropicState()
|
||||
state.Model = originalModel
|
||||
var usage OpenAIUsage
|
||||
var firstTokenMs *int
|
||||
firstChunk := true
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
|
||||
if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" {
|
||||
continue
|
||||
}
|
||||
payload := line[6:]
|
||||
|
||||
if firstChunk {
|
||||
firstChunk = false
|
||||
ms := int(time.Since(startTime).Milliseconds())
|
||||
firstTokenMs = &ms
|
||||
}
|
||||
|
||||
// Parse the Responses SSE event
|
||||
var event apicompat.ResponsesStreamEvent
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
logger.L().Warn("openai messages stream: failed to parse event",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract usage from completion events
|
||||
if (event.Type == "response.completed" || event.Type == "response.incomplete") &&
|
||||
event.Response != nil && event.Response.Usage != nil {
|
||||
usage = OpenAIUsage{
|
||||
InputTokens: event.Response.Usage.InputTokens,
|
||||
OutputTokens: event.Response.Usage.OutputTokens,
|
||||
}
|
||||
if event.Response.Usage.InputTokensDetails != nil {
|
||||
usage.CacheReadInputTokens = event.Response.Usage.InputTokensDetails.CachedTokens
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to Anthropic events
|
||||
events := apicompat.ResponsesEventToAnthropicEvents(&event, state)
|
||||
for _, evt := range events {
|
||||
sse, err := apicompat.ResponsesAnthropicEventToSSE(evt)
|
||||
if err != nil {
|
||||
logger.L().Warn("openai messages stream: failed to marshal event",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
continue
|
||||
}
|
||||
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
||||
// Client disconnected — return collected usage
|
||||
logger.L().Info("openai messages stream: client disconnected",
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
return &OpenAIForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
Stream: true,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
if len(events) > 0 {
|
||||
c.Writer.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
logger.L().Warn("openai messages stream: read error",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the Anthropic stream is properly terminated
|
||||
if finalEvents := apicompat.FinalizeResponsesAnthropicStream(state); len(finalEvents) > 0 {
|
||||
for _, evt := range finalEvents {
|
||||
sse, err := apicompat.ResponsesAnthropicEventToSSE(evt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fmt.Fprint(c.Writer, sse) //nolint:errcheck
|
||||
}
|
||||
c.Writer.Flush()
|
||||
}
|
||||
|
||||
return &OpenAIForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
Stream: true,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// writeAnthropicError writes an error response in Anthropic Messages API format.
|
||||
func writeAnthropicError(c *gin.Context, statusCode int, errType, message string) {
|
||||
c.JSON(statusCode, gin.H{
|
||||
"type": "error",
|
||||
"error": gin.H{
|
||||
"type": errType,
|
||||
"message": message,
|
||||
},
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user