fix(service): preserve anthropic usage fields across compat endpoints

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
Ethan0x0000
2026-03-24 09:32:34 +08:00
parent 5418e15e63
commit 2f8e10db46
4 changed files with 317 additions and 73 deletions

View File

@@ -17,6 +17,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/gin-gonic/gin"
"github.com/tidwall/gjson"
"go.uber.org/zap"
)
@@ -56,6 +57,7 @@ func (s *GatewayService) ForwardAsResponses(
// 4. Model mapping
mappedModel := originalModel
reasoningEffort := ExtractResponsesReasoningEffortFromBody(body)
if account.Type == AccountTypeAPIKey {
mappedModel = account.GetMappedModel(originalModel)
}
@@ -172,14 +174,46 @@ func (s *GatewayService) ForwardAsResponses(
var result *ForwardResult
var handleErr error
if clientStream {
result, handleErr = s.handleResponsesStreamingResponse(resp, c, originalModel, mappedModel, startTime)
result, handleErr = s.handleResponsesStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
} else {
result, handleErr = s.handleResponsesBufferedStreamingResponse(resp, c, originalModel, mappedModel, startTime)
result, handleErr = s.handleResponsesBufferedStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
}
return result, handleErr
}
// ExtractResponsesReasoningEffortFromBody reads the Responses API
// reasoning.effort field from the raw request body and normalizes it for
// usage logging. It returns nil when the field is absent, blank, or does
// not normalize to a recognized effort value.
func ExtractResponsesReasoningEffortFromBody(body []byte) *string {
	effort := strings.TrimSpace(gjson.GetBytes(body, "reasoning.effort").String())
	if effort == "" {
		return nil
	}
	if normalized := normalizeOpenAIReasoningEffort(effort); normalized != "" {
		return &normalized
	}
	return nil
}
// mergeAnthropicUsage copies each non-zero token counter from src into dst,
// leaving a dst field untouched when the corresponding src field is zero so
// previously captured values (e.g. from message_start) are preserved.
// A nil dst is a no-op.
func mergeAnthropicUsage(dst *ClaudeUsage, src apicompat.AnthropicUsage) {
	if dst == nil {
		return
	}
	if v := src.InputTokens; v > 0 {
		dst.InputTokens = v
	}
	if v := src.OutputTokens; v > 0 {
		dst.OutputTokens = v
	}
	if v := src.CacheReadInputTokens; v > 0 {
		dst.CacheReadInputTokens = v
	}
	if v := src.CacheCreationInputTokens; v > 0 {
		dst.CacheCreationInputTokens = v
	}
}
// handleResponsesBufferedStreamingResponse reads all Anthropic SSE events from
// the upstream streaming response, assembles them into a complete Anthropic
// response, converts to Responses API JSON format, and writes it to the client.
@@ -188,6 +222,7 @@ func (s *GatewayService) handleResponsesBufferedStreamingResponse(
c *gin.Context,
originalModel string,
mappedModel string,
reasoningEffort *string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
@@ -233,21 +268,13 @@ func (s *GatewayService) handleResponsesBufferedStreamingResponse(
// message_start carries the initial response structure
if event.Type == "message_start" && event.Message != nil {
finalResp = event.Message
mergeAnthropicUsage(&usage, event.Message.Usage)
}
// message_delta carries final usage and stop_reason
if event.Type == "message_delta" {
if event.Usage != nil {
usage = ClaudeUsage{
InputTokens: event.Usage.InputTokens,
OutputTokens: event.Usage.OutputTokens,
}
if event.Usage.CacheReadInputTokens > 0 {
usage.CacheReadInputTokens = event.Usage.CacheReadInputTokens
}
if event.Usage.CacheCreationInputTokens > 0 {
usage.CacheCreationInputTokens = event.Usage.CacheCreationInputTokens
}
mergeAnthropicUsage(&usage, *event.Usage)
}
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
finalResp.StopReason = event.Delta.StopReason
@@ -307,12 +334,13 @@ func (s *GatewayService) handleResponsesBufferedStreamingResponse(
c.JSON(http.StatusOK, responsesResp)
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
Stream: false,
Duration: time.Since(startTime),
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
ReasoningEffort: reasoningEffort,
Stream: false,
Duration: time.Since(startTime),
}, nil
}
@@ -323,6 +351,7 @@ func (s *GatewayService) handleResponsesStreamingResponse(
c *gin.Context,
originalModel string,
mappedModel string,
reasoningEffort *string,
startTime time.Time,
) (*ForwardResult, error) {
requestID := resp.Header.Get("x-request-id")
@@ -351,13 +380,14 @@ func (s *GatewayService) handleResponsesStreamingResponse(
resultWithUsage := func() *ForwardResult {
return &ForwardResult{
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
RequestID: requestID,
Usage: usage,
Model: originalModel,
UpstreamModel: mappedModel,
ReasoningEffort: reasoningEffort,
Stream: true,
Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs,
}
}
@@ -371,22 +401,11 @@ func (s *GatewayService) handleResponsesStreamingResponse(
// Extract usage from message_delta
if event.Type == "message_delta" && event.Usage != nil {
usage = ClaudeUsage{
InputTokens: event.Usage.InputTokens,
OutputTokens: event.Usage.OutputTokens,
}
if event.Usage.CacheReadInputTokens > 0 {
usage.CacheReadInputTokens = event.Usage.CacheReadInputTokens
}
if event.Usage.CacheCreationInputTokens > 0 {
usage.CacheCreationInputTokens = event.Usage.CacheCreationInputTokens
}
mergeAnthropicUsage(&usage, *event.Usage)
}
// Also capture usage from message_start
if event.Type == "message_start" && event.Message != nil {
if event.Message.Usage.InputTokens > 0 {
usage.InputTokens = event.Message.Usage.InputTokens
}
mergeAnthropicUsage(&usage, event.Message.Usage)
}
// Convert to Responses events