Merge remote-tracking branch 'origin/main' into fix/apikey-credentials-preserve-existing-fields

This commit is contained in:
Gemini Wen
2026-03-06 23:38:18 +08:00
15 changed files with 789 additions and 317 deletions

View File

@@ -319,6 +319,9 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
return return
} }
if result != nil { if result != nil {
if account.Type == service.AccountTypeOAuth {
h.gatewayService.UpdateCodexUsageSnapshotFromHeaders(c.Request.Context(), account.ID, result.ResponseHeaders)
}
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, result.FirstTokenMs) h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, result.FirstTokenMs)
} else { } else {
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, nil) h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, nil)
@@ -670,8 +673,14 @@ func (h *OpenAIGatewayHandler) anthropicStreamingAwareError(c *gin.Context, stat
if streamStarted { if streamStarted {
flusher, ok := c.Writer.(http.Flusher) flusher, ok := c.Writer.(http.Flusher)
if ok { if ok {
errorEvent := "event: error\ndata: " + `{"type":"error","error":{"type":` + strconv.Quote(errType) + `,"message":` + strconv.Quote(message) + `}}` + "\n\n" errPayload, _ := json.Marshal(gin.H{
fmt.Fprint(c.Writer, errorEvent) //nolint:errcheck "type": "error",
"error": gin.H{
"type": errType,
"message": message,
},
})
fmt.Fprintf(c.Writer, "event: error\ndata: %s\n\n", errPayload) //nolint:errcheck
flusher.Flush() flusher.Flush()
} }
return return
@@ -1110,6 +1119,9 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) {
if turnErr != nil || result == nil { if turnErr != nil || result == nil {
return return
} }
if account.Type == service.AccountTypeOAuth {
h.gatewayService.UpdateCodexUsageSnapshotFromHeaders(ctx, account.ID, result.ResponseHeaders)
}
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, result.FirstTokenMs) h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, result.FirstTokenMs)
h.submitUsageRecordTask(func(taskCtx context.Context) { h.submitUsageRecordTask(func(taskCtx context.Context) {
if err := h.gatewayService.RecordUsage(taskCtx, &service.OpenAIRecordUsageInput{ if err := h.gatewayService.RecordUsage(taskCtx, &service.OpenAIRecordUsageInput{

View File

@@ -532,3 +532,204 @@ func TestResponsesAnthropicEventToSSE(t *testing.T) {
assert.Contains(t, sse, "data: ") assert.Contains(t, sse, "data: ")
assert.Contains(t, sse, `"resp_1"`) assert.Contains(t, sse, `"resp_1"`)
} }
// ---------------------------------------------------------------------------
// response.failed tests
// ---------------------------------------------------------------------------
// TestStreamingFailed verifies that a response.failed event closes any open
// text block and then emits message_delta (carrying usage) plus message_stop.
func TestStreamingFailed(t *testing.T) {
	st := NewResponsesEventToAnthropicState()

	// Start the stream.
	ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{
		Type:     "response.created",
		Response: &ResponsesResponse{ID: "resp_fail_1", Model: "gpt-5.2"},
	}, st)

	// Emit partial text so a content block is open when the failure arrives.
	ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{
		Type:  "response.output_text.delta",
		Delta: "Partial output before failure",
	}, st)

	// Deliver the failure.
	got := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{
		Type: "response.failed",
		Response: &ResponsesResponse{
			Status: "failed",
			Error:  &ResponsesError{Code: "server_error", Message: "Internal error"},
			Usage:  &ResponsesUsage{InputTokens: 50, OutputTokens: 10},
		},
	}, st)

	// Expect: content_block_stop, message_delta (with usage), message_stop.
	require.Len(t, got, 3)
	assert.Equal(t, "content_block_stop", got[0].Type)
	assert.Equal(t, "message_delta", got[1].Type)
	assert.Equal(t, "end_turn", got[1].Delta.StopReason)
	assert.Equal(t, 50, got[1].Usage.InputTokens)
	assert.Equal(t, 10, got[1].Usage.OutputTokens)
	assert.Equal(t, "message_stop", got[2].Type)
}
// TestStreamingFailedNoOutput verifies that response.failed arriving before
// any content emits only message_delta and message_stop (no block to close).
func TestStreamingFailedNoOutput(t *testing.T) {
	st := NewResponsesEventToAnthropicState()

	// Start the stream without producing any output.
	ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{
		Type:     "response.created",
		Response: &ResponsesResponse{ID: "resp_fail_2", Model: "gpt-5.2"},
	}, st)

	// Fail immediately.
	got := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{
		Type: "response.failed",
		Response: &ResponsesResponse{
			Status: "failed",
			Error:  &ResponsesError{Code: "rate_limit_error", Message: "Too many requests"},
			Usage:  &ResponsesUsage{InputTokens: 20, OutputTokens: 0},
		},
	}, st)

	// No open content block, so only the terminal pair is emitted.
	require.Len(t, got, 2)
	assert.Equal(t, "message_delta", got[0].Type)
	assert.Equal(t, "end_turn", got[0].Delta.StopReason)
	assert.Equal(t, "message_stop", got[1].Type)
}
// TestResponsesToAnthropic_Failed verifies the non-streaming conversion of a
// failed response: stop reason defaults to end_turn and at least one (empty)
// text block is present.
func TestResponsesToAnthropic_Failed(t *testing.T) {
	failed := &ResponsesResponse{
		ID:     "resp_fail_3",
		Model:  "gpt-5.2",
		Status: "failed",
		Error:  &ResponsesError{Code: "server_error", Message: "Something went wrong"},
		Output: []ResponsesOutput{},
		Usage:  &ResponsesUsage{InputTokens: 30, OutputTokens: 0},
	}

	got := ResponsesToAnthropic(failed, "claude-opus-4-6")

	assert.Equal(t, "end_turn", got.StopReason)
	require.Len(t, got.Content, 1)
	assert.Equal(t, "text", got.Content[0].Type)
}
// ---------------------------------------------------------------------------
// thinking → reasoning conversion tests
// ---------------------------------------------------------------------------
// TestAnthropicToResponses_ThinkingEnabled verifies that thinking type
// "enabled" maps to reasoning {effort: high, summary: auto} and that only
// reasoning.encrypted_content (not reasoning.summary) lands in include.
func TestAnthropicToResponses_ThinkingEnabled(t *testing.T) {
	in := &AnthropicRequest{
		Model:     "gpt-5.2",
		MaxTokens: 1024,
		Messages:  []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
		Thinking:  &AnthropicThinking{Type: "enabled", BudgetTokens: 10000},
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)
	require.NotNil(t, out.Reasoning)

	assert.Equal(t, "high", out.Reasoning.Effort)
	assert.Equal(t, "auto", out.Reasoning.Summary)
	assert.Contains(t, out.Include, "reasoning.encrypted_content")
	assert.NotContains(t, out.Include, "reasoning.summary")
}
// TestAnthropicToResponses_ThinkingAdaptive verifies that thinking type
// "adaptive" maps to reasoning {effort: medium, summary: auto}.
func TestAnthropicToResponses_ThinkingAdaptive(t *testing.T) {
	in := &AnthropicRequest{
		Model:     "gpt-5.2",
		MaxTokens: 1024,
		Messages:  []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
		Thinking:  &AnthropicThinking{Type: "adaptive", BudgetTokens: 5000},
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)
	require.NotNil(t, out.Reasoning)

	assert.Equal(t, "medium", out.Reasoning.Effort)
	assert.Equal(t, "auto", out.Reasoning.Summary)
	assert.NotContains(t, out.Include, "reasoning.summary")
}
// TestAnthropicToResponses_ThinkingDisabled verifies that thinking type
// "disabled" omits reasoning entirely.
func TestAnthropicToResponses_ThinkingDisabled(t *testing.T) {
	in := &AnthropicRequest{
		Model:     "gpt-5.2",
		MaxTokens: 1024,
		Messages:  []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
		Thinking:  &AnthropicThinking{Type: "disabled"},
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)

	assert.Nil(t, out.Reasoning)
	assert.NotContains(t, out.Include, "reasoning.summary")
}
// TestAnthropicToResponses_NoThinking verifies that a request without a
// thinking field produces no reasoning configuration.
func TestAnthropicToResponses_NoThinking(t *testing.T) {
	in := &AnthropicRequest{
		Model:     "gpt-5.2",
		MaxTokens: 1024,
		Messages:  []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)
	assert.Nil(t, out.Reasoning)
}
// ---------------------------------------------------------------------------
// tool_choice conversion tests
// ---------------------------------------------------------------------------
// TestAnthropicToResponses_ToolChoiceAuto verifies that Anthropic
// {"type":"auto"} becomes the bare string "auto" in the Responses request.
func TestAnthropicToResponses_ToolChoiceAuto(t *testing.T) {
	in := &AnthropicRequest{
		Model:      "gpt-5.2",
		MaxTokens:  1024,
		Messages:   []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
		ToolChoice: json.RawMessage(`{"type":"auto"}`),
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)

	var choice string
	require.NoError(t, json.Unmarshal(out.ToolChoice, &choice))
	assert.Equal(t, "auto", choice)
}
// TestAnthropicToResponses_ToolChoiceAny verifies that Anthropic
// {"type":"any"} becomes "required" in the Responses request.
func TestAnthropicToResponses_ToolChoiceAny(t *testing.T) {
	in := &AnthropicRequest{
		Model:      "gpt-5.2",
		MaxTokens:  1024,
		Messages:   []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
		ToolChoice: json.RawMessage(`{"type":"any"}`),
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)

	var choice string
	require.NoError(t, json.Unmarshal(out.ToolChoice, &choice))
	assert.Equal(t, "required", choice)
}
// TestAnthropicToResponses_ToolChoiceSpecific verifies that a named-tool
// choice maps to the {"type":"function","function":{"name":...}} object.
func TestAnthropicToResponses_ToolChoiceSpecific(t *testing.T) {
	in := &AnthropicRequest{
		Model:      "gpt-5.2",
		MaxTokens:  1024,
		Messages:   []AnthropicMessage{{Role: "user", Content: json.RawMessage(`"Hello"`)}},
		ToolChoice: json.RawMessage(`{"type":"tool","name":"get_weather"}`),
	}

	out, err := AnthropicToResponses(in)
	require.NoError(t, err)

	var choice map[string]any
	require.NoError(t, json.Unmarshal(out.ToolChoice, &choice))
	assert.Equal(t, "function", choice["type"])

	fnObj, ok := choice["function"].(map[string]any)
	require.True(t, ok)
	assert.Equal(t, "get_weather", fnObj["name"])
}

View File

@@ -2,6 +2,7 @@ package apicompat
import ( import (
"encoding/json" "encoding/json"
"fmt"
"strings" "strings"
) )
@@ -44,9 +45,65 @@ func AnthropicToResponses(req *AnthropicRequest) (*ResponsesRequest, error) {
out.Tools = convertAnthropicToolsToResponses(req.Tools) out.Tools = convertAnthropicToolsToResponses(req.Tools)
} }
// Convert thinking → reasoning.
// generate_summary="auto" causes the upstream to emit reasoning_summary_text
// streaming events; the include array only needs reasoning.encrypted_content
// (already set above) for content continuity.
if req.Thinking != nil {
switch req.Thinking.Type {
case "enabled":
out.Reasoning = &ResponsesReasoning{Effort: "high", Summary: "auto"}
case "adaptive":
out.Reasoning = &ResponsesReasoning{Effort: "medium", Summary: "auto"}
}
// "disabled" or unknown → omit reasoning
}
// Convert tool_choice
if len(req.ToolChoice) > 0 {
tc, err := convertAnthropicToolChoiceToResponses(req.ToolChoice)
if err != nil {
return nil, fmt.Errorf("convert tool_choice: %w", err)
}
out.ToolChoice = tc
}
return out, nil return out, nil
} }
// convertAnthropicToolChoiceToResponses maps an Anthropic tool_choice value
// to the Responses API representation:
//
//	{"type":"auto"}            → "auto"
//	{"type":"any"}             → "required"
//	{"type":"none"}            → "none"
//	{"type":"tool","name":"X"} → {"type":"function","function":{"name":"X"}}
//
// Unrecognized types are forwarded unchanged so future upstream variants are
// not silently dropped.
func convertAnthropicToolChoiceToResponses(raw json.RawMessage) (json.RawMessage, error) {
	var choice struct {
		Type string `json:"type"`
		Name string `json:"name"`
	}
	if err := json.Unmarshal(raw, &choice); err != nil {
		return nil, err
	}

	// The three mode-only variants collapse to bare strings.
	if mode, ok := map[string]string{
		"auto": "auto",
		"any":  "required",
		"none": "none",
	}[choice.Type]; ok {
		return json.Marshal(mode)
	}

	// A specific tool becomes a function-typed object carrying its name.
	if choice.Type == "tool" {
		return json.Marshal(map[string]any{
			"type":     "function",
			"function": map[string]string{"name": choice.Name},
		})
	}

	// Unknown type: pass through as-is.
	return raw, nil
}
// convertAnthropicToResponsesInput builds the Responses API input items array // convertAnthropicToResponsesInput builds the Responses API input items array
// from the Anthropic system field and message list. // from the Anthropic system field and message list.
func convertAnthropicToResponsesInput(system json.RawMessage, msgs []AnthropicMessage) ([]ResponsesInputItem, error) { func convertAnthropicToResponsesInput(system json.RawMessage, msgs []AnthropicMessage) ([]ResponsesInputItem, error) {

View File

@@ -153,7 +153,7 @@ func ResponsesEventToAnthropicEvents(
return resToAnthHandleReasoningDelta(evt, state) return resToAnthHandleReasoningDelta(evt, state)
case "response.reasoning_summary_text.done": case "response.reasoning_summary_text.done":
return resToAnthHandleBlockDone(state) return resToAnthHandleBlockDone(state)
case "response.completed", "response.incomplete": case "response.completed", "response.incomplete", "response.failed":
return resToAnthHandleCompleted(evt, state) return resToAnthHandleCompleted(evt, state)
default: default:
return nil return nil

View File

@@ -1,7 +1,7 @@
// Package apicompat provides type definitions and conversion utilities for // Package apicompat provides type definitions and conversion utilities for
// translating between Anthropic Messages, OpenAI Chat Completions, and OpenAI // translating between Anthropic Messages and OpenAI Responses API formats.
// Responses API formats. It enables multi-protocol support so that clients // It enables multi-protocol support so that clients using different API
// using different API formats can be served through a unified gateway. // formats can be served through a unified gateway.
package apicompat package apicompat
import "encoding/json" import "encoding/json"
@@ -21,6 +21,14 @@ type AnthropicRequest struct {
Temperature *float64 `json:"temperature,omitempty"` Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"` TopP *float64 `json:"top_p,omitempty"`
StopSeqs []string `json:"stop_sequences,omitempty"` StopSeqs []string `json:"stop_sequences,omitempty"`
Thinking *AnthropicThinking `json:"thinking,omitempty"`
ToolChoice json.RawMessage `json:"tool_choice,omitempty"`
}
// AnthropicThinking configures extended thinking in the Anthropic API.
type AnthropicThinking struct {
Type string `json:"type"` // "enabled" | "adaptive" | "disabled"
BudgetTokens int `json:"budget_tokens,omitempty"` // max thinking tokens
} }
// AnthropicMessage is a single message in the Anthropic conversation. // AnthropicMessage is a single message in the Anthropic conversation.
@@ -120,143 +128,29 @@ type AnthropicDelta struct {
StopSequence *string `json:"stop_sequence,omitempty"` StopSequence *string `json:"stop_sequence,omitempty"`
} }
// ---------------------------------------------------------------------------
// OpenAI Chat Completions API types
// ---------------------------------------------------------------------------
// ChatRequest is the request body for POST /v1/chat/completions.
type ChatRequest struct {
Model string `json:"model"`
Messages []ChatMessage `json:"messages"`
MaxTokens *int `json:"max_tokens,omitempty"`
Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"`
Stream bool `json:"stream,omitempty"`
Tools []ChatTool `json:"tools,omitempty"`
Stop json.RawMessage `json:"stop,omitempty"` // string or []string
}
// ChatMessage is a single message in the Chat Completions conversation.
type ChatMessage struct {
Role string `json:"role"` // "system" | "user" | "assistant" | "tool"
Content json.RawMessage `json:"content,omitempty"` // string or []ChatContentPart
// assistant fields
ToolCalls []ChatToolCall `json:"tool_calls,omitempty"`
// tool fields
ToolCallID string `json:"tool_call_id,omitempty"`
// Copilot-specific reasoning passthrough
ReasoningText string `json:"reasoning_text,omitempty"`
ReasoningOpaque string `json:"reasoning_opaque,omitempty"`
}
// ChatContentPart is a typed content part in a multi-part message.
type ChatContentPart struct {
Type string `json:"type"` // "text" | "image_url"
Text string `json:"text,omitempty"`
}
// ChatToolCall represents a tool invocation in an assistant message.
// In streaming deltas, Index identifies which tool call is being updated.
type ChatToolCall struct {
Index int `json:"index"`
ID string `json:"id,omitempty"`
Type string `json:"type,omitempty"` // "function"
Function ChatFunctionCall `json:"function"`
}
// ChatFunctionCall holds the function name and arguments.
type ChatFunctionCall struct {
Name string `json:"name"`
Arguments string `json:"arguments"`
}
// ChatTool describes a tool available to the model.
type ChatTool struct {
Type string `json:"type"` // "function"
Function ChatFunction `json:"function"`
}
// ChatFunction is the function definition inside a ChatTool.
type ChatFunction struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Parameters json.RawMessage `json:"parameters,omitempty"` // JSON Schema
}
// ChatResponse is the non-streaming response from POST /v1/chat/completions.
type ChatResponse struct {
ID string `json:"id"`
Object string `json:"object"` // "chat.completion"
Created int64 `json:"created"`
Model string `json:"model"`
Choices []ChatChoice `json:"choices"`
Usage *ChatUsage `json:"usage,omitempty"`
}
// ChatChoice is one completion choice.
type ChatChoice struct {
Index int `json:"index"`
Message ChatMessage `json:"message"`
FinishReason string `json:"finish_reason"`
}
// ChatUsage holds token counts in Chat Completions format.
type ChatUsage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
}
// ---------------------------------------------------------------------------
// Chat Completions SSE types
// ---------------------------------------------------------------------------
// ChatStreamChunk is a single SSE chunk in the Chat Completions streaming protocol.
type ChatStreamChunk struct {
ID string `json:"id"`
Object string `json:"object"` // "chat.completion.chunk"
Created int64 `json:"created"`
Model string `json:"model"`
Choices []ChatStreamChoice `json:"choices"`
Usage *ChatUsage `json:"usage,omitempty"`
}
// ChatStreamChoice is one choice inside a streaming chunk.
type ChatStreamChoice struct {
Index int `json:"index"`
Delta ChatStreamDelta `json:"delta"`
FinishReason *string `json:"finish_reason"`
}
// ChatStreamDelta carries incremental content in a streaming chunk.
type ChatStreamDelta struct {
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
ToolCalls []ChatToolCall `json:"tool_calls,omitempty"`
// Copilot-specific reasoning passthrough (streaming)
ReasoningText string `json:"reasoning_text,omitempty"`
ReasoningOpaque string `json:"reasoning_opaque,omitempty"`
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// OpenAI Responses API types // OpenAI Responses API types
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// ResponsesRequest is the request body for POST /v1/responses. // ResponsesRequest is the request body for POST /v1/responses.
type ResponsesRequest struct { type ResponsesRequest struct {
Model string `json:"model"` Model string `json:"model"`
Input json.RawMessage `json:"input"` // string or []ResponsesInputItem Input json.RawMessage `json:"input"` // string or []ResponsesInputItem
MaxOutputTokens *int `json:"max_output_tokens,omitempty"` MaxOutputTokens *int `json:"max_output_tokens,omitempty"`
Temperature *float64 `json:"temperature,omitempty"` Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"` TopP *float64 `json:"top_p,omitempty"`
Stream bool `json:"stream,omitempty"` Stream bool `json:"stream,omitempty"`
Tools []ResponsesTool `json:"tools,omitempty"` Tools []ResponsesTool `json:"tools,omitempty"`
Include []string `json:"include,omitempty"` Include []string `json:"include,omitempty"`
Store *bool `json:"store,omitempty"` Store *bool `json:"store,omitempty"`
Reasoning *ResponsesReasoning `json:"reasoning,omitempty"`
ToolChoice json.RawMessage `json:"tool_choice,omitempty"`
}
// ResponsesReasoning configures reasoning effort in the Responses API.
type ResponsesReasoning struct {
Effort string `json:"effort"` // "low" | "medium" | "high"
Summary string `json:"summary,omitempty"` // "auto" | "concise" | "detailed"
} }
// ResponsesInputItem is one item in the Responses API input array. // ResponsesInputItem is one item in the Responses API input array.
@@ -305,6 +199,15 @@ type ResponsesResponse struct {
// incomplete_details is present when status="incomplete" // incomplete_details is present when status="incomplete"
IncompleteDetails *ResponsesIncompleteDetails `json:"incomplete_details,omitempty"` IncompleteDetails *ResponsesIncompleteDetails `json:"incomplete_details,omitempty"`
// Error is present when status="failed"
Error *ResponsesError `json:"error,omitempty"`
}
// ResponsesError describes an error in a failed response.
type ResponsesError struct {
Code string `json:"code"`
Message string `json:"message"`
} }
// ResponsesIncompleteDetails explains why a response is incomplete. // ResponsesIncompleteDetails explains why a response is incomplete.
@@ -349,6 +252,16 @@ type ResponsesUsage struct {
OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"` OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"`
} }
// ResponsesInputTokensDetails breaks down input token usage.
type ResponsesInputTokensDetails struct {
CachedTokens int `json:"cached_tokens,omitempty"`
}
// ResponsesOutputTokensDetails breaks down output token usage.
type ResponsesOutputTokensDetails struct {
ReasoningTokens int `json:"reasoning_tokens,omitempty"`
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Responses SSE event types // Responses SSE event types
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -388,153 +301,6 @@ type ResponsesStreamEvent struct {
SequenceNumber int `json:"sequence_number,omitempty"` SequenceNumber int `json:"sequence_number,omitempty"`
} }
// ResponsesOutputReasoning is a reasoning output item in the Responses API.
// This type represents the "type":"reasoning" output item that contains
// extended thinking from the model.
type ResponsesOutputReasoning struct {
ID string `json:"id,omitempty"`
Type string `json:"type"` // "reasoning"
Status string `json:"status,omitempty"` // "in_progress" | "completed" | "incomplete"
EncryptedContent string `json:"encrypted_content,omitempty"`
Summary []ResponsesReasoningSummary `json:"summary,omitempty"`
}
// ResponsesReasoningSummary is a summary text block inside a reasoning output.
type ResponsesReasoningSummary struct {
Type string `json:"type"` // "summary_text"
Text string `json:"text"`
}
// ResponsesStreamState maintains the state for converting Responses streaming
// events to Chat Completions format. It tracks content blocks, tool calls,
// reasoning blocks, and other streaming artifacts.
type ResponsesStreamState struct {
// Response metadata
ID string
Model string
Created int64
// Content tracking
ContentIndex int
CurrentText string
CurrentItemID string
PendingText []string // Text to accumulate before emitting
// Tool call tracking
ToolCalls []ResponsesToolCallState
CurrentToolCall *ResponsesToolCallState
// Reasoning tracking
ReasoningBlocks []ResponsesReasoningState
CurrentReasoning *ResponsesReasoningState
// Usage tracking
InputTokens int
OutputTokens int
// Status tracking
Status string
FinishReason string
}
// ResponsesToolCallState tracks a single tool call during streaming.
type ResponsesToolCallState struct {
Index int
ItemID string
CallID string
Name string
Arguments string
Status string
IsComplete bool
}
// ResponsesReasoningState tracks a reasoning block during streaming.
type ResponsesReasoningState struct {
ItemID string
SummaryIndex int
SummaryText string
Status string
IsComplete bool
}
// ResponsesUsageDetail provides additional token usage details in Responses format.
type ResponsesUsageDetail struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
TotalTokens int `json:"total_tokens"`
// Optional detailed breakdown
InputTokensDetails *ResponsesInputTokensDetails `json:"input_tokens_details,omitempty"`
OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"`
}
// ResponsesInputTokensDetails breaks down input token usage.
type ResponsesInputTokensDetails struct {
CachedTokens int `json:"cached_tokens,omitempty"`
}
// ResponsesOutputTokensDetails breaks down output token usage.
type ResponsesOutputTokensDetails struct {
ReasoningTokens int `json:"reasoning_tokens,omitempty"`
}
// ---------------------------------------------------------------------------
// Finish reason mapping helpers
// ---------------------------------------------------------------------------
// ChatFinishToAnthropic maps a Chat Completions finish_reason to an Anthropic stop_reason.
func ChatFinishToAnthropic(reason string) string {
switch reason {
case "stop":
return "end_turn"
case "tool_calls":
return "tool_use"
case "length":
return "max_tokens"
default:
return "end_turn"
}
}
// AnthropicStopToChat maps an Anthropic stop_reason to a Chat Completions finish_reason.
func AnthropicStopToChat(reason string) string {
switch reason {
case "end_turn":
return "stop"
case "tool_use":
return "tool_calls"
case "max_tokens":
return "length"
default:
return "stop"
}
}
// ResponsesStatusToChat maps a Responses API status to a Chat Completions finish_reason.
func ResponsesStatusToChat(status string, details *ResponsesIncompleteDetails) string {
switch status {
case "completed":
return "stop"
case "incomplete":
if details != nil && details.Reason == "max_output_tokens" {
return "length"
}
return "stop"
default:
return "stop"
}
}
// ChatFinishToResponsesStatus maps a Chat Completions finish_reason to a Responses status.
func ChatFinishToResponsesStatus(reason string) string {
switch reason {
case "length":
return "incomplete"
default:
return "completed"
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Shared constants // Shared constants
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -1,13 +1,18 @@
package service package service
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"log" "log"
"net/http"
"strings" "strings"
"sync" "sync"
"time" "time"
httppool "github.com/Wei-Shaw/sub2api/internal/pkg/httpclient"
openaipkg "github.com/Wei-Shaw/sub2api/internal/pkg/openai"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/Wei-Shaw/sub2api/internal/pkg/timezone" "github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats" "github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
@@ -88,8 +93,10 @@ type antigravityUsageCache struct {
} }
const ( const (
apiCacheTTL = 3 * time.Minute apiCacheTTL = 3 * time.Minute
windowStatsCacheTTL = 1 * time.Minute windowStatsCacheTTL = 1 * time.Minute
openAIProbeCacheTTL = 10 * time.Minute
openAICodexProbeVersion = "0.104.0"
) )
// UsageCache 封装账户使用量相关的缓存 // UsageCache 封装账户使用量相关的缓存
@@ -97,6 +104,7 @@ type UsageCache struct {
apiCache sync.Map // accountID -> *apiUsageCache apiCache sync.Map // accountID -> *apiUsageCache
windowStatsCache sync.Map // accountID -> *windowStatsCache windowStatsCache sync.Map // accountID -> *windowStatsCache
antigravityCache sync.Map // accountID -> *antigravityUsageCache antigravityCache sync.Map // accountID -> *antigravityUsageCache
openAIProbeCache sync.Map // accountID -> time.Time
} }
// NewUsageCache 创建 UsageCache 实例 // NewUsageCache 创建 UsageCache 实例
@@ -224,6 +232,14 @@ func (s *AccountUsageService) GetUsage(ctx context.Context, accountID int64) (*U
return nil, fmt.Errorf("get account failed: %w", err) return nil, fmt.Errorf("get account failed: %w", err)
} }
if account.Platform == PlatformOpenAI && account.Type == AccountTypeOAuth {
usage, err := s.getOpenAIUsage(ctx, account)
if err == nil {
s.tryClearRecoverableAccountError(ctx, account)
}
return usage, err
}
if account.Platform == PlatformGemini { if account.Platform == PlatformGemini {
usage, err := s.getGeminiUsage(ctx, account) usage, err := s.getGeminiUsage(ctx, account)
if err == nil { if err == nil {
@@ -288,6 +304,161 @@ func (s *AccountUsageService) GetUsage(ctx context.Context, accountID int64) (*U
return nil, fmt.Errorf("account type %s does not support usage query", account.Type) return nil, fmt.Errorf("account type %s does not support usage query", account.Type)
} }
// getOpenAIUsage assembles a UsageInfo for an OpenAI OAuth account from the
// Codex rate-limit snapshot cached in account.Extra, refreshing that snapshot
// via a live probe when either window is missing, and finally enriching both
// windows with locally recorded usage-log statistics when available.
func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Account) (*UsageInfo, error) {
	now := time.Now()
	usage := &UsageInfo{UpdatedAt: &now}
	if account == nil {
		return usage, nil
	}
	// Prefer the snapshot already persisted in account.Extra.
	if progress := buildCodexUsageProgressFromExtra(account.Extra, "5h", now); progress != nil {
		usage.FiveHour = progress
	}
	if progress := buildCodexUsageProgressFromExtra(account.Extra, "7d", now); progress != nil {
		usage.SevenDay = progress
	}
	// If either window is missing, probe upstream (rate limited through the
	// probe cache) and merge any fresh header-derived data into Extra.
	if (usage.FiveHour == nil || usage.SevenDay == nil) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) {
		if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 {
			mergeAccountExtra(account, updates)
			// NOTE(review): usage.UpdatedAt is set unconditionally above, so
			// this nil check looks unreachable — confirm intent.
			if usage.UpdatedAt == nil {
				usage.UpdatedAt = &now
			}
			if progress := buildCodexUsageProgressFromExtra(account.Extra, "5h", now); progress != nil {
				usage.FiveHour = progress
			}
			if progress := buildCodexUsageProgressFromExtra(account.Extra, "7d", now); progress != nil {
				usage.SevenDay = progress
			}
		}
	}
	if s.usageLogRepo == nil {
		return usage, nil
	}
	// Attach local window statistics; repo errors are ignored so the
	// snapshot-derived data above is still returned on failure.
	if stats, err := s.usageLogRepo.GetAccountWindowStats(ctx, account.ID, now.Add(-5*time.Hour)); err == nil {
		windowStats := windowStatsFromAccountStats(stats)
		if hasMeaningfulWindowStats(windowStats) {
			if usage.FiveHour == nil {
				usage.FiveHour = &UsageProgress{Utilization: 0}
			}
			usage.FiveHour.WindowStats = windowStats
		}
	}
	if stats, err := s.usageLogRepo.GetAccountWindowStats(ctx, account.ID, now.Add(-7*24*time.Hour)); err == nil {
		windowStats := windowStatsFromAccountStats(stats)
		if hasMeaningfulWindowStats(windowStats) {
			if usage.SevenDay == nil {
				usage.SevenDay = &UsageProgress{Utilization: 0}
			}
			usage.SevenDay.WindowStats = windowStats
		}
	}
	return usage, nil
}
// shouldProbeOpenAICodexSnapshot reports whether a live Codex usage probe may
// run for accountID at time now, throttling to at most one probe per
// openAIProbeCacheTTL per account. A true return also records now as the
// latest probe attempt.
//
// NOTE(review): the Load/Store pair is not atomic, so two concurrent callers
// in the same TTL window can both be told to probe. Acceptable if probes are
// idempotent; otherwise consider sync.Map's LoadOrStore/CompareAndSwap.
func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, now time.Time) bool {
	// Fail open: with no cache (or an invalid ID) always allow probing.
	if s == nil || s.cache == nil || accountID <= 0 {
		return true
	}
	if cached, ok := s.cache.openAIProbeCache.Load(accountID); ok {
		if ts, ok := cached.(time.Time); ok && now.Sub(ts) < openAIProbeCacheTTL {
			return false
		}
	}
	s.cache.openAIProbeCache.Store(accountID, now)
	return true
}
// probeOpenAICodexSnapshot issues a minimal Codex streaming request upstream
// solely to read the rate-limit headers from the response, translating them
// into account Extra updates. On success the updates are also persisted
// asynchronously via accountRepo.UpdateExtra. Returns (nil, nil) when the
// account is not probe-eligible or no usable snapshot headers were present.
func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, error) {
	if account == nil || !account.IsOAuth() {
		return nil, nil
	}
	accessToken := account.GetOpenAIAccessToken()
	if accessToken == "" {
		return nil, fmt.Errorf("no access token available")
	}
	// Build a throwaway streaming payload against the default test model.
	modelID := openaipkg.DefaultTestModel
	payload := createOpenAITestPayload(modelID, true)
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal openai probe payload: %w", err)
	}
	reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes))
	if err != nil {
		return nil, fmt.Errorf("create openai probe request: %w", err)
	}
	// Headers mimic the Codex CLI so upstream serves the usage headers.
	req.Host = "chatgpt.com"
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+accessToken)
	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("OpenAI-Beta", "responses=experimental")
	req.Header.Set("Originator", "codex_cli_rs")
	req.Header.Set("Version", openAICodexProbeVersion)
	req.Header.Set("User-Agent", codexCLIUserAgent)
	// Prefer the account's recorded fingerprint UA when one is cached.
	if s.identityCache != nil {
		if fp, fpErr := s.identityCache.GetFingerprint(reqCtx, account.ID); fpErr == nil && fp != nil && strings.TrimSpace(fp.UserAgent) != "" {
			req.Header.Set("User-Agent", strings.TrimSpace(fp.UserAgent))
		}
	}
	if chatgptAccountID := account.GetChatGPTAccountID(); chatgptAccountID != "" {
		req.Header.Set("chatgpt-account-id", chatgptAccountID)
	}
	// Route through the account's configured proxy, if any.
	proxyURL := ""
	if account.ProxyID != nil && account.Proxy != nil {
		proxyURL = account.Proxy.URL()
	}
	client, err := httppool.GetClient(httppool.Options{
		ProxyURL:              proxyURL,
		Timeout:               15 * time.Second,
		ResponseHeaderTimeout: 10 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("build openai probe client: %w", err)
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("openai codex probe request failed: %w", err)
	}
	// Only the headers matter; the streamed body is closed without reading.
	// NOTE(review): body is not drained before Close — connection reuse may
	// suffer; confirm whether that matters for this client pool.
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode)
	}
	if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
		updates := buildCodexUsageExtraUpdates(snapshot, time.Now())
		if len(updates) > 0 {
			// Persist in the background with a fresh context so the caller
			// is never blocked on the repository write.
			go func(accountID int64, updates map[string]any) {
				updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer updateCancel()
				_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
			}(account.ID, updates)
			return updates, nil
		}
	}
	return nil, nil
}
// mergeAccountExtra copies every key/value pair from updates into the
// account's Extra map, lazily allocating the map when it is nil.
// A nil account or an empty update set is a no-op.
func mergeAccountExtra(account *Account, updates map[string]any) {
	if account == nil || len(updates) == 0 {
		return
	}
	extra := account.Extra
	if extra == nil {
		extra = make(map[string]any, len(updates))
		account.Extra = extra
	}
	for key, value := range updates {
		extra[key] = value
	}
}
func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Account) (*UsageInfo, error) { func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Account) (*UsageInfo, error) {
now := time.Now() now := time.Now()
usage := &UsageInfo{ usage := &UsageInfo{
@@ -519,6 +690,72 @@ func windowStatsFromAccountStats(stats *usagestats.AccountStats) *WindowStats {
} }
} }
// hasMeaningfulWindowStats reports whether stats carries any non-zero usage
// signal: requests, tokens, or any of the three cost fields. A nil stats is
// treated as empty.
func hasMeaningfulWindowStats(stats *WindowStats) bool {
	if stats == nil {
		return false
	}
	switch {
	case stats.Requests > 0, stats.Tokens > 0:
		return true
	case stats.Cost > 0, stats.StandardCost > 0, stats.UserCost > 0:
		return true
	default:
		return false
	}
}
// remainingSecondsUntil returns the whole seconds from now until t,
// clamped at zero so callers never see a negative countdown.
func remainingSecondsUntil(t, now time.Time) int {
	secs := int(t.Sub(now).Seconds())
	if secs < 0 {
		return 0
	}
	return secs
}

// buildCodexUsageProgressFromExtra reconstructs a UsageProgress for one Codex
// rate-limit window ("5h" or "7d") from snapshot values previously stored in
// an account's Extra map.
//
// Reset-time resolution order:
//  1. the absolute "<window>_reset_at" timestamp, when present and parseable;
//  2. otherwise "<window>_reset_after_seconds" added to the snapshot's
//     "codex_usage_updated_at" time (falling back to now when absent).
//
// It returns nil when extra is empty, the window name is unknown, or the
// used-percent key is missing. RemainingSeconds is always computed against
// the injected now — not the wall clock — so results are deterministic for
// a given (extra, now) pair.
func buildCodexUsageProgressFromExtra(extra map[string]any, window string, now time.Time) *UsageProgress {
	if len(extra) == 0 {
		return nil
	}
	var (
		usedPercentKey string
		resetAfterKey  string
		resetAtKey     string
	)
	switch window {
	case "5h":
		usedPercentKey = "codex_5h_used_percent"
		resetAfterKey = "codex_5h_reset_after_seconds"
		resetAtKey = "codex_5h_reset_at"
	case "7d":
		usedPercentKey = "codex_7d_used_percent"
		resetAfterKey = "codex_7d_reset_after_seconds"
		resetAtKey = "codex_7d_reset_at"
	default:
		return nil
	}
	usedRaw, ok := extra[usedPercentKey]
	if !ok {
		return nil
	}
	progress := &UsageProgress{Utilization: parseExtraFloat64(usedRaw)}
	if resetAtRaw, ok := extra[resetAtKey]; ok {
		if resetAt, err := parseTime(fmt.Sprint(resetAtRaw)); err == nil {
			progress.ResetsAt = &resetAt
			progress.RemainingSeconds = remainingSecondsUntil(resetAt, now)
		}
	}
	if progress.ResetsAt == nil {
		if resetAfterSeconds := parseExtraInt(extra[resetAfterKey]); resetAfterSeconds > 0 {
			// Prefer the snapshot capture time as the countdown base so a
			// stale snapshot does not overstate the remaining window.
			base := now
			if updatedAtRaw, ok := extra["codex_usage_updated_at"]; ok {
				if updatedAt, err := parseTime(fmt.Sprint(updatedAtRaw)); err == nil {
					base = updatedAt
				}
			}
			resetAt := base.Add(time.Duration(resetAfterSeconds) * time.Second)
			progress.ResetsAt = &resetAt
			progress.RemainingSeconds = remainingSecondsUntil(resetAt, now)
		}
	}
	return progress
}
func (s *AccountUsageService) GetAccountUsageStats(ctx context.Context, accountID int64, startTime, endTime time.Time) (*usagestats.AccountUsageStatsResponse, error) { func (s *AccountUsageService) GetAccountUsageStats(ctx context.Context, accountID int64, startTime, endTime time.Time) (*usagestats.AccountUsageStatsResponse, error) {
stats, err := s.usageLogRepo.GetAccountUsageStats(ctx, accountID, startTime, endTime) stats, err := s.usageLogRepo.GetAccountUsageStats(ctx, accountID, startTime, endTime)
if err != nil { if err != nil {
@@ -666,15 +903,30 @@ func (s *AccountUsageService) estimateSetupTokenUsage(account *Account) *UsageIn
remaining = 0 remaining = 0
} }
// 根据状态估算使用率 (百分比形式100 = 100%) // 优先使用响应头中存储的真实 utilization 值0-1 小数,转为 0-100 百分比)
var utilization float64 var utilization float64
switch account.SessionWindowStatus { var found bool
case "rejected": if stored, ok := account.Extra["session_window_utilization"]; ok {
utilization = 100.0 switch v := stored.(type) {
case "allowed_warning": case float64:
utilization = 80.0 utilization = v * 100
default: found = true
utilization = 0.0 case json.Number:
if f, err := v.Float64(); err == nil {
utilization = f * 100
found = true
}
}
}
// 如果没有存储的 utilization回退到状态估算
if !found {
switch account.SessionWindowStatus {
case "rejected":
utilization = 100.0
case "allowed_warning":
utilization = 80.0
}
} }
info.FiveHour = &UsageProgress{ info.FiveHour = &UsageProgress{

View File

@@ -49,7 +49,7 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic(
mappedModel := account.GetMappedModel(originalModel) mappedModel := account.GetMappedModel(originalModel)
responsesReq.Model = mappedModel responsesReq.Model = mappedModel
logger.L().Info("openai messages: model mapping applied", logger.L().Debug("openai messages: model mapping applied",
zap.Int64("account_id", account.ID), zap.Int64("account_id", account.ID),
zap.String("original_model", originalModel), zap.String("original_model", originalModel),
zap.String("mapped_model", mappedModel), zap.String("mapped_model", mappedModel),
@@ -67,7 +67,7 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic(
if err := json.Unmarshal(responsesBody, &reqBody); err != nil { if err := json.Unmarshal(responsesBody, &reqBody); err != nil {
return nil, fmt.Errorf("unmarshal for codex transform: %w", err) return nil, fmt.Errorf("unmarshal for codex transform: %w", err)
} }
applyCodexOAuthTransform(reqBody, false) applyCodexOAuthTransform(reqBody, false, false)
// OAuth codex transform forces stream=true upstream, so always use // OAuth codex transform forces stream=true upstream, so always use
// the streaming response handler regardless of what the client asked. // the streaming response handler regardless of what the client asked.
isStream = true isStream = true
@@ -148,9 +148,9 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic(
// 9. Handle normal response // 9. Handle normal response
if isStream { if isStream {
return s.handleAnthropicStreamingResponse(resp, c, originalModel, startTime) return s.handleAnthropicStreamingResponse(resp, c, originalModel, mappedModel, startTime)
} }
return s.handleAnthropicNonStreamingResponse(resp, c, originalModel, startTime) return s.handleAnthropicNonStreamingResponse(resp, c, originalModel, mappedModel, startTime)
} }
// handleAnthropicErrorResponse reads an upstream error and returns it in // handleAnthropicErrorResponse reads an upstream error and returns it in
@@ -200,6 +200,7 @@ func (s *OpenAIGatewayService) handleAnthropicNonStreamingResponse(
resp *http.Response, resp *http.Response,
c *gin.Context, c *gin.Context,
originalModel string, originalModel string,
mappedModel string,
startTime time.Time, startTime time.Time,
) (*OpenAIForwardResult, error) { ) (*OpenAIForwardResult, error) {
requestID := resp.Header.Get("x-request-id") requestID := resp.Header.Get("x-request-id")
@@ -233,11 +234,12 @@ func (s *OpenAIGatewayService) handleAnthropicNonStreamingResponse(
c.JSON(http.StatusOK, anthropicResp) c.JSON(http.StatusOK, anthropicResp)
return &OpenAIForwardResult{ return &OpenAIForwardResult{
RequestID: requestID, RequestID: requestID,
Usage: usage, Usage: usage,
Model: originalModel, Model: originalModel,
Stream: false, BillingModel: mappedModel,
Duration: time.Since(startTime), Stream: false,
Duration: time.Since(startTime),
}, nil }, nil
} }
@@ -247,6 +249,7 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
resp *http.Response, resp *http.Response,
c *gin.Context, c *gin.Context,
originalModel string, originalModel string,
mappedModel string,
startTime time.Time, startTime time.Time,
) (*OpenAIForwardResult, error) { ) (*OpenAIForwardResult, error) {
requestID := resp.Header.Get("x-request-id") requestID := resp.Header.Get("x-request-id")
@@ -293,7 +296,7 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
} }
// Extract usage from completion events // Extract usage from completion events
if (event.Type == "response.completed" || event.Type == "response.incomplete") && if (event.Type == "response.completed" || event.Type == "response.incomplete" || event.Type == "response.failed") &&
event.Response != nil && event.Response.Usage != nil { event.Response != nil && event.Response.Usage != nil {
usage = OpenAIUsage{ usage = OpenAIUsage{
InputTokens: event.Response.Usage.InputTokens, InputTokens: event.Response.Usage.InputTokens,
@@ -324,6 +327,7 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
RequestID: requestID, RequestID: requestID,
Usage: usage, Usage: usage,
Model: originalModel, Model: originalModel,
BillingModel: mappedModel,
Stream: true, Stream: true,
Duration: time.Since(startTime), Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,
@@ -360,6 +364,7 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
RequestID: requestID, RequestID: requestID,
Usage: usage, Usage: usage,
Model: originalModel, Model: originalModel,
BillingModel: mappedModel,
Stream: true, Stream: true,
Duration: time.Since(startTime), Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,

View File

@@ -207,12 +207,18 @@ type OpenAIUsage struct {
type OpenAIForwardResult struct { type OpenAIForwardResult struct {
RequestID string RequestID string
Usage OpenAIUsage Usage OpenAIUsage
Model string Model string // 原始模型(用于响应和日志显示)
// BillingModel is the model used for cost calculation.
// When non-empty, CalculateCost uses this instead of Model.
// This is set by the Anthropic Messages conversion path where
// the mapped upstream model differs from the client-facing model.
BillingModel string
// ReasoningEffort is extracted from request body (reasoning.effort) or derived from model suffix. // ReasoningEffort is extracted from request body (reasoning.effort) or derived from model suffix.
// Stored for usage records display; nil means not provided / not applicable. // Stored for usage records display; nil means not provided / not applicable.
ReasoningEffort *string ReasoningEffort *string
Stream bool Stream bool
OpenAIWSMode bool OpenAIWSMode bool
ResponseHeaders http.Header
Duration time.Duration Duration time.Duration
FirstTokenMs *int FirstTokenMs *int
} }
@@ -3610,7 +3616,11 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec
multiplier = resolver.Resolve(ctx, user.ID, *apiKey.GroupID, apiKey.Group.RateMultiplier) multiplier = resolver.Resolve(ctx, user.ID, *apiKey.GroupID, apiKey.Group.RateMultiplier)
} }
cost, err := s.billingService.CalculateCost(result.Model, tokens, multiplier) billingModel := result.Model
if result.BillingModel != "" {
billingModel = result.BillingModel
}
cost, err := s.billingService.CalculateCost(billingModel, tokens, multiplier)
if err != nil { if err != nil {
cost = &CostBreakdown{ActualCost: 0} cost = &CostBreakdown{ActualCost: 0}
} }
@@ -3630,7 +3640,7 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec
APIKeyID: apiKey.ID, APIKeyID: apiKey.ID,
AccountID: account.ID, AccountID: account.ID,
RequestID: result.RequestID, RequestID: result.RequestID,
Model: result.Model, Model: billingModel,
ReasoningEffort: result.ReasoningEffort, ReasoningEffort: result.ReasoningEffort,
InputTokens: actualInputTokens, InputTokens: actualInputTokens,
OutputTokens: result.Usage.OutputTokens, OutputTokens: result.Usage.OutputTokens,
@@ -3875,6 +3885,15 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc
}() }()
} }
func (s *OpenAIGatewayService) UpdateCodexUsageSnapshotFromHeaders(ctx context.Context, accountID int64, headers http.Header) {
if accountID <= 0 || headers == nil {
return
}
if snapshot := ParseCodexRateLimitHeaders(headers); snapshot != nil {
s.updateCodexUsageSnapshot(ctx, accountID, snapshot)
}
}
func getOpenAIReasoningEffortFromReqBody(reqBody map[string]any) (value string, present bool) { func getOpenAIReasoningEffortFromReqBody(reqBody map[string]any) (value string, present bool) {
if reqBody == nil { if reqBody == nil {
return "", false return "", false

View File

@@ -28,6 +28,22 @@ type stubOpenAIAccountRepo struct {
accounts []Account accounts []Account
} }
type snapshotUpdateAccountRepo struct {
stubOpenAIAccountRepo
updateExtraCalls chan map[string]any
}
func (r *snapshotUpdateAccountRepo) UpdateExtra(ctx context.Context, id int64, updates map[string]any) error {
if r.updateExtraCalls != nil {
copied := make(map[string]any, len(updates))
for k, v := range updates {
copied[k] = v
}
r.updateExtraCalls <- copied
}
return nil
}
func (r stubOpenAIAccountRepo) GetByID(ctx context.Context, id int64) (*Account, error) { func (r stubOpenAIAccountRepo) GetByID(ctx context.Context, id int64) (*Account, error) {
for i := range r.accounts { for i := range r.accounts {
if r.accounts[i].ID == id { if r.accounts[i].ID == id {
@@ -1248,6 +1264,30 @@ func TestOpenAIValidateUpstreamBaseURLEnabledEnforcesAllowlist(t *testing.T) {
} }
} }
func TestOpenAIUpdateCodexUsageSnapshotFromHeaders(t *testing.T) {
repo := &snapshotUpdateAccountRepo{updateExtraCalls: make(chan map[string]any, 1)}
svc := &OpenAIGatewayService{accountRepo: repo}
headers := http.Header{}
headers.Set("x-codex-primary-used-percent", "12")
headers.Set("x-codex-secondary-used-percent", "34")
headers.Set("x-codex-primary-window-minutes", "300")
headers.Set("x-codex-secondary-window-minutes", "10080")
headers.Set("x-codex-primary-reset-after-seconds", "600")
headers.Set("x-codex-secondary-reset-after-seconds", "86400")
svc.UpdateCodexUsageSnapshotFromHeaders(context.Background(), 123, headers)
select {
case updates := <-repo.updateExtraCalls:
require.Equal(t, 12.0, updates["codex_5h_used_percent"])
require.Equal(t, 34.0, updates["codex_7d_used_percent"])
require.Equal(t, 600, updates["codex_5h_reset_after_seconds"])
require.Equal(t, 86400, updates["codex_7d_reset_after_seconds"])
case <-time.After(2 * time.Second):
t.Fatal("expected UpdateExtra to be called")
}
}
func TestOpenAIResponsesRequestPathSuffix(t *testing.T) { func TestOpenAIResponsesRequestPathSuffix(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
@@ -1334,6 +1374,7 @@ func TestOpenAIBuildUpstreamRequestPreservesCompactPathForAPIKeyBaseURL(t *testi
// ==================== P1-08 修复model 替换性能优化测试 ==================== // ==================== P1-08 修复model 替换性能优化测试 ====================
// ==================== P1-08 修复model 替换性能优化测试 =============
func TestReplaceModelInSSELine(t *testing.T) { func TestReplaceModelInSSELine(t *testing.T) {
svc := &OpenAIGatewayService{} svc := &OpenAIGatewayService{}

View File

@@ -2309,6 +2309,7 @@ func (s *OpenAIGatewayService) forwardOpenAIWSV2(
ReasoningEffort: extractOpenAIReasoningEffort(reqBody, originalModel), ReasoningEffort: extractOpenAIReasoningEffort(reqBody, originalModel),
Stream: reqStream, Stream: reqStream,
OpenAIWSMode: true, OpenAIWSMode: true,
ResponseHeaders: lease.HandshakeHeaders(),
Duration: time.Since(startTime), Duration: time.Since(startTime),
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,
}, nil }, nil
@@ -2919,6 +2920,7 @@ func (s *OpenAIGatewayService) ProxyResponsesWebSocketFromClient(
ReasoningEffort: extractOpenAIReasoningEffortFromBody(payload, originalModel), ReasoningEffort: extractOpenAIReasoningEffortFromBody(payload, originalModel),
Stream: reqStream, Stream: reqStream,
OpenAIWSMode: true, OpenAIWSMode: true,
ResponseHeaders: lease.HandshakeHeaders(),
Duration: time.Since(turnStart), Duration: time.Since(turnStart),
FirstTokenMs: firstTokenMs, FirstTokenMs: firstTokenMs,
}, nil }, nil

View File

@@ -126,6 +126,13 @@ func (l *openAIWSConnLease) HandshakeHeader(name string) string {
return l.conn.handshakeHeader(name) return l.conn.handshakeHeader(name)
} }
func (l *openAIWSConnLease) HandshakeHeaders() http.Header {
if l == nil || l.conn == nil {
return nil
}
return cloneHeader(l.conn.handshakeHeaders)
}
func (l *openAIWSConnLease) IsPrewarmed() bool { func (l *openAIWSConnLease) IsPrewarmed() bool {
if l == nil || l.conn == nil { if l == nil || l.conn == nil {
return false return false

View File

@@ -177,11 +177,12 @@ func (s *OpenAIGatewayService) proxyResponsesWebSocketV2Passthrough(
CacheCreationInputTokens: turn.Usage.CacheCreationInputTokens, CacheCreationInputTokens: turn.Usage.CacheCreationInputTokens,
CacheReadInputTokens: turn.Usage.CacheReadInputTokens, CacheReadInputTokens: turn.Usage.CacheReadInputTokens,
}, },
Model: turn.RequestModel, Model: turn.RequestModel,
Stream: true, Stream: true,
OpenAIWSMode: true, OpenAIWSMode: true,
Duration: turn.Duration, ResponseHeaders: cloneHeader(handshakeHeaders),
FirstTokenMs: turn.FirstTokenMs, Duration: turn.Duration,
FirstTokenMs: turn.FirstTokenMs,
} }
logOpenAIWSV2Passthrough( logOpenAIWSV2Passthrough(
"relay_turn_completed account_id=%d turn=%d request_id=%s terminal_event=%s duration_ms=%d first_token_ms=%d input_tokens=%d output_tokens=%d cache_read_tokens=%d", "relay_turn_completed account_id=%d turn=%d request_id=%s terminal_event=%s duration_ms=%d first_token_ms=%d input_tokens=%d output_tokens=%d cache_read_tokens=%d",
@@ -223,11 +224,12 @@ func (s *OpenAIGatewayService) proxyResponsesWebSocketV2Passthrough(
CacheCreationInputTokens: relayResult.Usage.CacheCreationInputTokens, CacheCreationInputTokens: relayResult.Usage.CacheCreationInputTokens,
CacheReadInputTokens: relayResult.Usage.CacheReadInputTokens, CacheReadInputTokens: relayResult.Usage.CacheReadInputTokens,
}, },
Model: relayResult.RequestModel, Model: relayResult.RequestModel,
Stream: true, Stream: true,
OpenAIWSMode: true, OpenAIWSMode: true,
Duration: relayResult.Duration, ResponseHeaders: cloneHeader(handshakeHeaders),
FirstTokenMs: relayResult.FirstTokenMs, Duration: relayResult.Duration,
FirstTokenMs: relayResult.FirstTokenMs,
} }
turnCount := int(completedTurns.Load()) turnCount := int(completedTurns.Load())

View File

@@ -970,12 +970,27 @@ func (s *RateLimitService) UpdateSessionWindow(ctx context.Context, account *Acc
windowStart = &start windowStart = &start
windowEnd = &end windowEnd = &end
slog.Info("account_session_window_initialized", "account_id", account.ID, "window_start", start, "window_end", end, "status", status) slog.Info("account_session_window_initialized", "account_id", account.ID, "window_start", start, "window_end", end, "status", status)
// 窗口重置时清除旧的 utilization避免残留上个窗口的数据
_ = s.accountRepo.UpdateExtra(ctx, account.ID, map[string]any{
"session_window_utilization": nil,
})
} }
if err := s.accountRepo.UpdateSessionWindow(ctx, account.ID, windowStart, windowEnd, status); err != nil { if err := s.accountRepo.UpdateSessionWindow(ctx, account.ID, windowStart, windowEnd, status); err != nil {
slog.Warn("session_window_update_failed", "account_id", account.ID, "error", err) slog.Warn("session_window_update_failed", "account_id", account.ID, "error", err)
} }
// 存储真实的 utilization 值0-1 小数),供 estimateSetupTokenUsage 使用
if utilStr := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilStr != "" {
if util, err := strconv.ParseFloat(utilStr, 64); err == nil {
if err := s.accountRepo.UpdateExtra(ctx, account.ID, map[string]any{
"session_window_utilization": util,
}); err != nil {
slog.Warn("session_window_utilization_update_failed", "account_id", account.ID, "error", err)
}
}
}
// 如果状态为allowed且之前有限流说明窗口已重置清除限流状态 // 如果状态为allowed且之前有限流说明窗口已重置清除限流状态
if status == "allowed" && account.IsRateLimited() { if status == "allowed" && account.IsRateLimited() {
if err := s.ClearRateLimit(ctx, account.ID); err != nil { if err := s.ClearRateLimit(ctx, account.ID); err != nil {

View File

@@ -90,6 +90,36 @@
color="emerald" color="emerald"
/> />
</div> </div>
<div v-else-if="loading" class="space-y-1.5">
<div class="flex items-center gap-1">
<div class="h-3 w-[32px] animate-pulse rounded bg-gray-200 dark:bg-gray-700"></div>
<div class="h-1.5 w-8 animate-pulse rounded-full bg-gray-200 dark:bg-gray-700"></div>
<div class="h-3 w-[32px] animate-pulse rounded bg-gray-200 dark:bg-gray-700"></div>
</div>
<div class="flex items-center gap-1">
<div class="h-3 w-[32px] animate-pulse rounded bg-gray-200 dark:bg-gray-700"></div>
<div class="h-1.5 w-8 animate-pulse rounded-full bg-gray-200 dark:bg-gray-700"></div>
<div class="h-3 w-[32px] animate-pulse rounded bg-gray-200 dark:bg-gray-700"></div>
</div>
</div>
<div v-else-if="hasOpenAIUsageFallback" class="space-y-1">
<UsageProgressBar
v-if="usageInfo?.five_hour"
label="5h"
:utilization="usageInfo.five_hour.utilization"
:resets-at="usageInfo.five_hour.resets_at"
:window-stats="usageInfo.five_hour.window_stats"
color="indigo"
/>
<UsageProgressBar
v-if="usageInfo?.seven_day"
label="7d"
:utilization="usageInfo.seven_day.utilization"
:resets-at="usageInfo.seven_day.resets_at"
:window-stats="usageInfo.seven_day.window_stats"
color="emerald"
/>
</div>
<div v-else class="text-xs text-gray-400">-</div> <div v-else class="text-xs text-gray-400">-</div>
</template> </template>
@@ -313,6 +343,9 @@ const shouldFetchUsage = computed(() => {
if (props.account.platform === 'antigravity') { if (props.account.platform === 'antigravity') {
return props.account.type === 'oauth' return props.account.type === 'oauth'
} }
if (props.account.platform === 'openai') {
return props.account.type === 'oauth'
}
return false return false
}) })
@@ -335,6 +368,11 @@ const hasCodexUsage = computed(() => {
return codex5hWindow.value.usedPercent !== null || codex7dWindow.value.usedPercent !== null return codex5hWindow.value.usedPercent !== null || codex7dWindow.value.usedPercent !== null
}) })
const hasOpenAIUsageFallback = computed(() => {
if (props.account.platform !== 'openai' || props.account.type !== 'oauth') return false
return !!usageInfo.value?.five_hour || !!usageInfo.value?.seven_day
})
const codex5hUsedPercent = computed(() => codex5hWindow.value.usedPercent) const codex5hUsedPercent = computed(() => codex5hWindow.value.usedPercent)
const codex5hResetAt = computed(() => codex5hWindow.value.resetAt) const codex5hResetAt = computed(() => codex5hWindow.value.resetAt)
const codex7dUsedPercent = computed(() => codex7dWindow.value.usedPercent) const codex7dUsedPercent = computed(() => codex7dWindow.value.usedPercent)

View File

@@ -67,4 +67,59 @@ describe('AccountUsageCell', () => {
expect(wrapper.text()).toContain('admin.accounts.usageWindow.gemini3Image|70|2026-03-01T09:00:00Z') expect(wrapper.text()).toContain('admin.accounts.usageWindow.gemini3Image|70|2026-03-01T09:00:00Z')
}) })
it('OpenAI OAuth 在无 codex 快照时会回退显示 usage 接口窗口', async () => {
getUsage.mockResolvedValue({
five_hour: {
utilization: 0,
resets_at: null,
remaining_seconds: 0,
window_stats: {
requests: 2,
tokens: 27700,
cost: 0.06,
standard_cost: 0.06,
user_cost: 0.06
}
},
seven_day: {
utilization: 0,
resets_at: null,
remaining_seconds: 0,
window_stats: {
requests: 2,
tokens: 27700,
cost: 0.06,
standard_cost: 0.06,
user_cost: 0.06
}
}
})
const wrapper = mount(AccountUsageCell, {
props: {
account: {
id: 2002,
platform: 'openai',
type: 'oauth',
extra: {}
} as any
},
global: {
stubs: {
UsageProgressBar: {
props: ['label', 'utilization', 'resetsAt', 'windowStats', 'color'],
template: '<div class="usage-bar">{{ label }}|{{ utilization }}|{{ windowStats?.tokens }}</div>'
},
AccountQuotaInfo: true
}
}
})
await flushPromises()
expect(getUsage).toHaveBeenCalledWith(2002)
expect(wrapper.text()).toContain('5h|0|27700')
expect(wrapper.text()).toContain('7d|0|27700')
})
}) })