diff --git a/backend/internal/handler/admin/account_handler_passthrough_test.go b/backend/internal/handler/admin/account_handler_passthrough_test.go new file mode 100644 index 00000000..b6720451 --- /dev/null +++ b/backend/internal/handler/admin/account_handler_passthrough_test.go @@ -0,0 +1,67 @@ +package admin + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" +) + +func TestAccountHandler_Create_AnthropicAPIKeyPassthroughExtraForwarded(t *testing.T) { + gin.SetMode(gin.TestMode) + + adminSvc := newStubAdminService() + handler := NewAccountHandler( + adminSvc, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + ) + + router := gin.New() + router.POST("/api/v1/admin/accounts", handler.Create) + + body := map[string]any{ + "name": "anthropic-key-1", + "platform": "anthropic", + "type": "apikey", + "credentials": map[string]any{ + "api_key": "sk-ant-xxx", + "base_url": "https://api.anthropic.com", + }, + "extra": map[string]any{ + "anthropic_passthrough": true, + }, + "concurrency": 1, + "priority": 1, + } + raw, err := json.Marshal(body) + require.NoError(t, err) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/api/v1/admin/accounts", bytes.NewReader(raw)) + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Len(t, adminSvc.createdAccounts, 1) + + created := adminSvc.createdAccounts[0] + require.Equal(t, "anthropic", created.Platform) + require.Equal(t, "apikey", created.Type) + require.NotNil(t, created.Extra) + require.Equal(t, true, created.Extra["anthropic_passthrough"]) +} + diff --git a/backend/internal/server/routes/gateway.go b/backend/internal/server/routes/gateway.go index 69881e70..930c8b9e 100644 --- a/backend/internal/server/routes/gateway.go +++ b/backend/internal/server/routes/gateway.go @@ -43,12 +43,12 @@ func RegisterGatewayRoutes( gateway.GET("/usage", h.Gateway.Usage) // OpenAI Responses API gateway.POST("/responses", h.OpenAIGateway.Responses) - // 明确阻止旧入口误用到 Sora,避免客户端把 OpenAI Chat Completions 当作 Sora 入口 + // 明确阻止旧协议入口:OpenAI 仅支持 Responses API,避免客户端误解为会自动路由到其它平台。 gateway.POST("/chat/completions", func(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{ "error": gin.H{ "type": "invalid_request_error", - "message": "For Sora, use /sora/v1/chat/completions. OpenAI should use /v1/responses.", + "message": "Unsupported legacy protocol: /v1/chat/completions is not supported. Please use /v1/responses.", }, }) }) diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index bce3f98f..51ab84dd 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -719,6 +719,17 @@ func (a *Account) IsOpenAIOAuthPassthroughEnabled() bool { return a != nil && a.IsOpenAIOAuth() && a.IsOpenAIPassthroughEnabled() } +// IsAnthropicAPIKeyPassthroughEnabled 返回 Anthropic API Key 账号是否启用“自动透传(仅替换认证)”。 +// 字段:accounts.extra.anthropic_passthrough。 +// 字段缺失或类型不正确时,按 false(关闭)处理。 +func (a *Account) IsAnthropicAPIKeyPassthroughEnabled() bool { + if a == nil || a.Platform != PlatformAnthropic || a.Type != AccountTypeAPIKey || a.Extra == nil { + return false + } + enabled, ok := a.Extra["anthropic_passthrough"].(bool) + return ok && enabled +} + // IsCodexCLIOnlyEnabled 返回 OpenAI OAuth 账号是否启用“仅允许 Codex 官方客户端”。 // 字段:accounts.extra.codex_cli_only。 // 字段缺失或类型不正确时,按 false(关闭)处理。 diff --git a/backend/internal/service/account_anthropic_passthrough_test.go b/backend/internal/service/account_anthropic_passthrough_test.go new file mode 100644 index 00000000..e66407a3 --- /dev/null +++ b/backend/internal/service/account_anthropic_passthrough_test.go @@ -0,0 +1,62 @@ +package service + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAccount_IsAnthropicAPIKeyPassthroughEnabled(t *testing.T) { + t.Run("Anthropic API Key 开启", func(t *testing.T) { + account := &Account{ + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + } + require.True(t, account.IsAnthropicAPIKeyPassthroughEnabled()) + }) + + t.Run("Anthropic API Key 关闭", func(t *testing.T) { + account := &Account{ + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Extra: map[string]any{ + "anthropic_passthrough": false, + }, + } + require.False(t, account.IsAnthropicAPIKeyPassthroughEnabled()) + }) + + t.Run("字段类型非法默认关闭", func(t *testing.T) { + account := &Account{ + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Extra: map[string]any{ + "anthropic_passthrough": "true", + }, + } + require.False(t, account.IsAnthropicAPIKeyPassthroughEnabled()) + }) + + t.Run("非 Anthropic API Key 账号始终关闭", func(t *testing.T) { + oauth := &Account{ + Platform: PlatformAnthropic, + Type: AccountTypeOAuth, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + } + require.False(t, oauth.IsAnthropicAPIKeyPassthroughEnabled()) + + openai := &Account{ + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + } + require.False(t, openai.IsAnthropicAPIKeyPassthroughEnabled()) + }) +} diff --git a/backend/internal/service/gateway_anthropic_apikey_passthrough_benchmark_test.go b/backend/internal/service/gateway_anthropic_apikey_passthrough_benchmark_test.go new file mode 100644 index 00000000..37fd709f --- /dev/null +++ b/backend/internal/service/gateway_anthropic_apikey_passthrough_benchmark_test.go @@ -0,0 +1,56 @@ +package service + +import "testing" + +func BenchmarkGatewayService_ParseSSEUsage_MessageStart(b *testing.B) { + svc := &GatewayService{} + data := `{"type":"message_start","message":{"usage":{"input_tokens":123,"cache_creation_input_tokens":45,"cache_read_input_tokens":6,"cached_tokens":6,"cache_creation":{"ephemeral_5m_input_tokens":20,"ephemeral_1h_input_tokens":25}}}}` + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + usage := &ClaudeUsage{} + svc.parseSSEUsage(data, usage) + } +} + +func BenchmarkGatewayService_ParseSSEUsagePassthrough_MessageStart(b *testing.B) { + svc := &GatewayService{} + data := `{"type":"message_start","message":{"usage":{"input_tokens":123,"cache_creation_input_tokens":45,"cache_read_input_tokens":6,"cached_tokens":6,"cache_creation":{"ephemeral_5m_input_tokens":20,"ephemeral_1h_input_tokens":25}}}}` + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + usage := &ClaudeUsage{} + svc.parseSSEUsagePassthrough(data, usage) + } +} + +func BenchmarkGatewayService_ParseSSEUsage_MessageDelta(b *testing.B) { + svc := &GatewayService{} + data := `{"type":"message_delta","usage":{"output_tokens":456,"cache_creation_input_tokens":30,"cache_read_input_tokens":7,"cached_tokens":7,"cache_creation":{"ephemeral_5m_input_tokens":10,"ephemeral_1h_input_tokens":20}}}` + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + usage := &ClaudeUsage{} + svc.parseSSEUsage(data, usage) + } +} + +func BenchmarkGatewayService_ParseSSEUsagePassthrough_MessageDelta(b *testing.B) { + svc := &GatewayService{} + data := `{"type":"message_delta","usage":{"output_tokens":456,"cache_creation_input_tokens":30,"cache_read_input_tokens":7,"cached_tokens":7,"cache_creation":{"ephemeral_5m_input_tokens":10,"ephemeral_1h_input_tokens":20}}}` + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + usage := &ClaudeUsage{} + svc.parseSSEUsagePassthrough(data, usage) + } +} + +func BenchmarkParseClaudeUsageFromResponseBody(b *testing.B) { + body := []byte(`{"id":"msg_123","type":"message","usage":{"input_tokens":123,"output_tokens":456,"cache_creation_input_tokens":45,"cache_read_input_tokens":6,"cached_tokens":6,"cache_creation":{"ephemeral_5m_input_tokens":20,"ephemeral_1h_input_tokens":25}}}`) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = parseClaudeUsageFromResponseBody(body) + } +} diff --git a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go new file mode 100644 index 00000000..5183891b --- /dev/null +++ b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go @@ -0,0 +1,773 @@ +package service + +import ( + "bufio" + "bytes" + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/claude" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" +) + +type anthropicHTTPUpstreamRecorder struct { + lastReq *http.Request + lastBody []byte + resp *http.Response + err error +} + +func newAnthropicAPIKeyAccountForTest() *Account { + return &Account{ + ID: 201, + Name: "anthropic-apikey-pass-test", + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "upstream-anthropic-key", + "base_url": "https://api.anthropic.com", + }, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + Status: StatusActive, + Schedulable: true, + } +} + +func (u *anthropicHTTPUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { + u.lastReq = req + if req != nil && req.Body != nil { + b, _ := io.ReadAll(req.Body) + u.lastBody = b + _ = req.Body.Close() + req.Body = io.NopCloser(bytes.NewReader(b)) + } + if u.err != nil { + return nil, u.err + } + return u.resp, nil +} + +func (u *anthropicHTTPUpstreamRecorder) DoWithTLS(req *http.Request, proxyURL string, accountID int64, accountConcurrency int, enableTLSFingerprint bool) (*http.Response, error) { + return u.Do(req, proxyURL, accountID, accountConcurrency) +} + +type streamReadCloser struct { + payload []byte + sent bool + err error +} + +func (r *streamReadCloser) Read(p []byte) (int, error) { + if !r.sent { + r.sent = true + n := copy(p, r.payload) + return n, nil + } + if r.err != nil { + return 0, r.err + } + return 0, io.EOF +} + +func (r *streamReadCloser) Close() error { return nil } + +type failWriteResponseWriter struct { + gin.ResponseWriter +} + +func (w *failWriteResponseWriter) Write(data []byte) (int, error) { + return 0, errors.New("client disconnected") +} + +func (w *failWriteResponseWriter) WriteString(_ string) (int, error) { + return 0, errors.New("client disconnected") +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardStreamPreservesBodyAndAuthReplacement(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + c.Request.Header.Set("User-Agent", "claude-cli/1.0.0") + c.Request.Header.Set("Authorization", "Bearer inbound-token") + c.Request.Header.Set("X-Api-Key", "inbound-api-key") + c.Request.Header.Set("X-Goog-Api-Key", "inbound-goog-key") + c.Request.Header.Set("Cookie", "secret=1") + c.Request.Header.Set("Anthropic-Beta", "interleaved-thinking-2025-05-14") + + body := []byte(`{"model":"claude-3-7-sonnet-20250219","stream":true,"system":[{"type":"text","text":"x-anthropic-billing-header keep"}],"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`) + parsed := &ParsedRequest{ + Body: body, + Model: "claude-3-7-sonnet-20250219", + Stream: true, + } + + upstreamSSE := strings.Join([]string{ + `data: {"type":"message_start","message":{"usage":{"input_tokens":9,"cached_tokens":7}}}`, + "", + `data: {"type":"message_delta","usage":{"output_tokens":3}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &anthropicHTTPUpstreamRecorder{ + resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{ + "Content-Type": []string{"text/event-stream"}, + "x-request-id": []string{"rid-anthropic-pass"}, + "Set-Cookie": []string{"secret=upstream"}, + }, + Body: io.NopCloser(strings.NewReader(upstreamSSE)), + }, + } + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + httpUpstream: upstream, + rateLimitService: &RateLimitService{}, + deferredService: &DeferredService{}, + billingCacheService: nil, + } + + account := &Account{ + ID: 101, + Name: "anthropic-apikey-pass", + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "upstream-anthropic-key", + "base_url": "https://api.anthropic.com", + "model_mapping": map[string]any{"claude-3-7-sonnet-20250219": "claude-3-haiku-20240307"}, + }, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + Status: StatusActive, + Schedulable: true, + } + + result, err := svc.Forward(context.Background(), c, account, parsed) + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.Stream) + + require.Equal(t, body, upstream.lastBody, "透传模式不应改写上游请求体") + require.Equal(t, "claude-3-7-sonnet-20250219", gjson.GetBytes(upstream.lastBody, "model").String()) + + require.Equal(t, "upstream-anthropic-key", upstream.lastReq.Header.Get("x-api-key")) + require.Empty(t, upstream.lastReq.Header.Get("authorization")) + require.Empty(t, upstream.lastReq.Header.Get("x-goog-api-key")) + require.Empty(t, upstream.lastReq.Header.Get("cookie")) + require.Equal(t, "2023-06-01", upstream.lastReq.Header.Get("anthropic-version")) + require.Equal(t, "interleaved-thinking-2025-05-14", upstream.lastReq.Header.Get("anthropic-beta")) + require.Empty(t, upstream.lastReq.Header.Get("x-stainless-lang"), "API Key 透传不应注入 OAuth 指纹头") + + require.Contains(t, rec.Body.String(), `"cached_tokens":7`) + require.NotContains(t, rec.Body.String(), `"cache_read_input_tokens":7`, "透传输出不应被网关改写") + require.Equal(t, 7, result.Usage.CacheReadInputTokens, "计费 usage 解析应保留 cached_tokens 兼容") + require.Empty(t, rec.Header().Get("Set-Cookie"), "响应头应经过安全过滤") + rawBody, ok := c.Get(OpsUpstreamRequestBodyKey) + require.True(t, ok) + bodyBytes, ok := rawBody.([]byte) + require.True(t, ok, "应以 []byte 形式缓存上游请求体,避免重复 string 拷贝") + require.Equal(t, body, bodyBytes) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardCountTokensPreservesBody(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages/count_tokens", nil) + c.Request.Header.Set("Authorization", "Bearer inbound-token") + c.Request.Header.Set("X-Api-Key", "inbound-api-key") + c.Request.Header.Set("Cookie", "secret=1") + + body := []byte(`{"model":"claude-3-5-sonnet-latest","messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}],"thinking":{"type":"enabled"}}`) + parsed := &ParsedRequest{ + Body: body, + Model: "claude-3-5-sonnet-latest", + } + + upstreamRespBody := `{"input_tokens":42}` + upstream := &anthropicHTTPUpstreamRecorder{ + resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + "x-request-id": []string{"rid-count"}, + "Set-Cookie": []string{"secret=upstream"}, + }, + Body: io.NopCloser(strings.NewReader(upstreamRespBody)), + }, + } + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + httpUpstream: upstream, + rateLimitService: &RateLimitService{}, + } + + account := &Account{ + ID: 102, + Name: "anthropic-apikey-pass-count", + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "upstream-anthropic-key", + "base_url": "https://api.anthropic.com", + "model_mapping": map[string]any{"claude-3-5-sonnet-latest": "claude-3-opus-20240229"}, + }, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + Status: StatusActive, + Schedulable: true, + } + + err := svc.ForwardCountTokens(context.Background(), c, account, parsed) + require.NoError(t, err) + + require.Equal(t, body, upstream.lastBody, "count_tokens 透传模式不应改写请求体") + require.Equal(t, "claude-3-5-sonnet-latest", gjson.GetBytes(upstream.lastBody, "model").String()) + require.Equal(t, "upstream-anthropic-key", upstream.lastReq.Header.Get("x-api-key")) + require.Empty(t, upstream.lastReq.Header.Get("authorization")) + require.Empty(t, upstream.lastReq.Header.Get("cookie")) + require.Equal(t, http.StatusOK, rec.Code) + require.JSONEq(t, upstreamRespBody, rec.Body.String()) + require.Empty(t, rec.Header().Get("Set-Cookie")) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_BuildRequestRejectsInvalidBaseURL(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Security: config.SecurityConfig{ + URLAllowlist: config.URLAllowlistConfig{ + Enabled: false, + }, + }, + }, + } + account := &Account{ + Platform: PlatformAnthropic, + Type: AccountTypeAPIKey, + Credentials: map[string]any{ + "api_key": "k", + "base_url": "://invalid-url", + }, + } + + _, err := svc.buildUpstreamRequestAnthropicAPIKeyPassthrough(context.Background(), c, account, []byte(`{}`), "k") + require.Error(t, err) +} + +func TestGatewayService_AnthropicOAuth_NotAffectedByAPIKeyPassthroughToggle(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{MaxLineSize: defaultMaxLineSize}, + }, + } + account := &Account{ + Platform: PlatformAnthropic, + Type: AccountTypeOAuth, + Extra: map[string]any{ + "anthropic_passthrough": true, + }, + } + + require.False(t, account.IsAnthropicAPIKeyPassthroughEnabled()) + + req, err := svc.buildUpstreamRequest(context.Background(), c, account, []byte(`{"model":"claude-3-7-sonnet-20250219"}`), "oauth-token", "oauth", "claude-3-7-sonnet-20250219", true, false) + require.NoError(t, err) + require.Equal(t, "Bearer oauth-token", req.Header.Get("authorization")) + require.Contains(t, req.Header.Get("anthropic-beta"), claude.BetaOAuth, "OAuth 链路仍应按原逻辑补齐 oauth beta") +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingStillCollectsUsageAfterClientDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + + // Use a canceled context recorder to simulate client disconnect behavior. + req := httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + ctx, cancel := context.WithCancel(req.Context()) + cancel() + req = req.WithContext(ctx) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = req + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + rateLimitService: &RateLimitService{}, + } + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(strings.Join([]string{ + `data: {"type":"message_start","message":{"usage":{"input_tokens":11}}}`, + "", + `data: {"type":"message_delta","usage":{"output_tokens":5}}`, + "", + "data: [DONE]", + "", + }, "\n"))), + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 1}, time.Now(), "claude-3-7-sonnet-20250219") + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.usage) + require.Equal(t, 11, result.usage.InputTokens) + require.Equal(t, 5, result.usage.OutputTokens) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_NonStreamingSuccess(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + body := []byte(`{"model":"claude-3-5-sonnet-latest","messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`) + upstreamJSON := `{"id":"msg_1","type":"message","usage":{"input_tokens":12,"output_tokens":7,"cache_creation":{"ephemeral_5m_input_tokens":2,"ephemeral_1h_input_tokens":3},"cached_tokens":4}}` + upstream := &anthropicHTTPUpstreamRecorder{ + resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + "x-request-id": []string{"rid-nonstream"}, + }, + Body: io.NopCloser(strings.NewReader(upstreamJSON)), + }, + } + svc := &GatewayService{ + cfg: &config.Config{}, + httpUpstream: upstream, + rateLimitService: &RateLimitService{}, + } + + result, err := svc.forwardAnthropicAPIKeyPassthrough(context.Background(), c, newAnthropicAPIKeyAccountForTest(), body, "claude-3-5-sonnet-latest", false, time.Now()) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, 12, result.Usage.InputTokens) + require.Equal(t, 7, result.Usage.OutputTokens) + require.Equal(t, 5, result.Usage.CacheCreationInputTokens) + require.Equal(t, 4, result.Usage.CacheReadInputTokens) + require.Equal(t, upstreamJSON, rec.Body.String()) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_InvalidTokenType(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + account := &Account{ + ID: 202, + Name: "anthropic-oauth", + Platform: PlatformAnthropic, + Type: AccountTypeOAuth, + Credentials: map[string]any{ + "access_token": "oauth-token", + }, + } + svc := &GatewayService{} + + result, err := svc.forwardAnthropicAPIKeyPassthrough(context.Background(), c, account, []byte(`{}`), "claude-3-5-sonnet-latest", false, time.Now()) + require.Nil(t, result) + require.Error(t, err) + require.Contains(t, err.Error(), "requires apikey token") +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_UpstreamRequestError(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + upstream := &anthropicHTTPUpstreamRecorder{ + err: errors.New("dial tcp timeout"), + } + svc := &GatewayService{ + cfg: &config.Config{ + Security: config.SecurityConfig{ + URLAllowlist: config.URLAllowlistConfig{Enabled: false}, + }, + }, + httpUpstream: upstream, + } + account := newAnthropicAPIKeyAccountForTest() + + result, err := svc.forwardAnthropicAPIKeyPassthrough(context.Background(), c, account, []byte(`{"model":"x"}`), "x", false, time.Now()) + require.Nil(t, result) + require.Error(t, err) + require.Contains(t, err.Error(), "upstream request failed") + require.Equal(t, http.StatusBadGateway, rec.Code) + rawBody, ok := c.Get(OpsUpstreamRequestBodyKey) + require.True(t, ok) + _, ok = rawBody.([]byte) + require.True(t, ok) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_EmptyResponseBody(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + upstream := &anthropicHTTPUpstreamRecorder{ + resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"x-request-id": []string{"rid-empty-body"}}, + Body: nil, + }, + } + svc := &GatewayService{ + cfg: &config.Config{ + Security: config.SecurityConfig{ + URLAllowlist: config.URLAllowlistConfig{Enabled: false}, + }, + }, + httpUpstream: upstream, + } + + result, err := svc.forwardAnthropicAPIKeyPassthrough(context.Background(), c, newAnthropicAPIKeyAccountForTest(), []byte(`{"model":"x"}`), "x", false, time.Now()) + require.Nil(t, result) + require.Error(t, err) + require.Contains(t, err.Error(), "empty response") +} + +func TestExtractAnthropicSSEDataLine(t *testing.T) { + t.Run("valid data line with spaces", func(t *testing.T) { + data, ok := extractAnthropicSSEDataLine("data: {\"type\":\"message_start\"}") + require.True(t, ok) + require.Equal(t, `{"type":"message_start"}`, data) + }) + + t.Run("non data line", func(t *testing.T) { + data, ok := extractAnthropicSSEDataLine("event: message_start") + require.False(t, ok) + require.Empty(t, data) + }) +} + +func TestGatewayService_ParseSSEUsagePassthrough_MessageStartFallbacks(t *testing.T) { + svc := &GatewayService{} + usage := &ClaudeUsage{} + data := `{"type":"message_start","message":{"usage":{"input_tokens":12,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cached_tokens":9,"cache_creation":{"ephemeral_5m_input_tokens":3,"ephemeral_1h_input_tokens":4}}}}` + + svc.parseSSEUsagePassthrough(data, usage) + + require.Equal(t, 12, usage.InputTokens) + require.Equal(t, 9, usage.CacheReadInputTokens, "应兼容 cached_tokens 字段") + require.Equal(t, 7, usage.CacheCreationInputTokens, "聚合字段为空时应从 5m/1h 明细回填") + require.Equal(t, 3, usage.CacheCreation5mTokens) + require.Equal(t, 4, usage.CacheCreation1hTokens) +} + +func TestGatewayService_ParseSSEUsagePassthrough_MessageDeltaSelectiveOverwrite(t *testing.T) { + svc := &GatewayService{} + usage := &ClaudeUsage{ + InputTokens: 10, + CacheCreation5mTokens: 2, + CacheCreation1hTokens: 6, + } + data := `{"type":"message_delta","usage":{"input_tokens":0,"output_tokens":5,"cache_creation_input_tokens":8,"cache_read_input_tokens":0,"cached_tokens":11,"cache_creation":{"ephemeral_5m_input_tokens":1,"ephemeral_1h_input_tokens":0}}}` + + svc.parseSSEUsagePassthrough(data, usage) + + require.Equal(t, 10, usage.InputTokens, "message_delta 中 0 值不应覆盖已有 input_tokens") + require.Equal(t, 5, usage.OutputTokens) + require.Equal(t, 8, usage.CacheCreationInputTokens) + require.Equal(t, 11, usage.CacheReadInputTokens, "cache_read_input_tokens 为空时应回退到 cached_tokens") + require.Equal(t, 1, usage.CacheCreation5mTokens) + require.Equal(t, 6, usage.CacheCreation1hTokens, "message_delta 中 0 值不应覆盖已有 1h 明细") +} + +func TestGatewayService_ParseSSEUsagePassthrough_NoopCases(t *testing.T) { + svc := &GatewayService{} + + usage := &ClaudeUsage{InputTokens: 3} + svc.parseSSEUsagePassthrough("", usage) + require.Equal(t, 3, usage.InputTokens) + + svc.parseSSEUsagePassthrough("[DONE]", usage) + require.Equal(t, 3, usage.InputTokens) + + svc.parseSSEUsagePassthrough("not-json", usage) + require.Equal(t, 3, usage.InputTokens) + + // nil usage 不应 panic + svc.parseSSEUsagePassthrough(`{"type":"message_start"}`, nil) +} + +func TestGatewayService_ParseSSEUsagePassthrough_FallbackFromUsageNode(t *testing.T) { + svc := &GatewayService{} + usage := &ClaudeUsage{} + data := `{"type":"content_block_delta","usage":{"cached_tokens":6,"cache_creation":{"ephemeral_5m_input_tokens":2,"ephemeral_1h_input_tokens":1}}}` + + svc.parseSSEUsagePassthrough(data, usage) + + require.Equal(t, 6, usage.CacheReadInputTokens) + require.Equal(t, 3, usage.CacheCreationInputTokens) +} + +func TestParseClaudeUsageFromResponseBody(t *testing.T) { + t.Run("empty or missing usage", func(t *testing.T) { + got := parseClaudeUsageFromResponseBody(nil) + require.NotNil(t, got) + require.Equal(t, 0, got.InputTokens) + + got = parseClaudeUsageFromResponseBody([]byte(`{"id":"x"}`)) + require.NotNil(t, got) + require.Equal(t, 0, got.OutputTokens) + }) + + t.Run("parse all usage fields and fallback", func(t *testing.T) { + body := []byte(`{"usage":{"input_tokens":21,"output_tokens":34,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cached_tokens":13,"cache_creation":{"ephemeral_5m_input_tokens":5,"ephemeral_1h_input_tokens":8}}}`) + got := parseClaudeUsageFromResponseBody(body) + require.Equal(t, 21, got.InputTokens) + require.Equal(t, 34, got.OutputTokens) + require.Equal(t, 13, got.CacheReadInputTokens, "cache_read_input_tokens 为空时应回退 cached_tokens") + require.Equal(t, 13, got.CacheCreationInputTokens, "聚合字段为空时应由 5m/1h 回填") + require.Equal(t, 5, got.CacheCreation5mTokens) + require.Equal(t, 8, got.CacheCreation1hTokens) + }) + + t.Run("keep explicit aggregate values", func(t *testing.T) { + body := []byte(`{"usage":{"input_tokens":1,"output_tokens":2,"cache_creation_input_tokens":9,"cache_read_input_tokens":7,"cached_tokens":99,"cache_creation":{"ephemeral_5m_input_tokens":4,"ephemeral_1h_input_tokens":5}}}`) + got := parseClaudeUsageFromResponseBody(body) + require.Equal(t, 9, got.CacheCreationInputTokens, "已显式提供聚合字段时不应被明细覆盖") + require.Equal(t, 7, got.CacheReadInputTokens, "已显式提供 cache_read_input_tokens 时不应回退 cached_tokens") + }) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingErrTooLong(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: 32, + }, + }, + } + + // Scanner 初始缓冲为 64KB,构造更长单行触发 bufio.ErrTooLong。 + longLine := "data: " + strings.Repeat("x", 80*1024) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(longLine)), + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 2}, time.Now(), "claude-3-7-sonnet-20250219") + require.Error(t, err) + require.ErrorIs(t, err, bufio.ErrTooLong) + require.NotNil(t, result) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingDataIntervalTimeout(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + StreamDataIntervalTimeout: 1, + MaxLineSize: defaultMaxLineSize, + }, + }, + rateLimitService: &RateLimitService{}, + } + + pr, pw := io.Pipe() + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: pr, + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 5}, time.Now(), "claude-3-7-sonnet-20250219") + _ = pw.Close() + _ = pr.Close() + + require.Error(t, err) + require.Contains(t, err.Error(), "stream data interval timeout") + require.NotNil(t, result) + require.False(t, result.clientDisconnect) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingReadError(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + } + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{ + err: io.ErrUnexpectedEOF, + }, + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 6}, time.Now(), "claude-3-7-sonnet-20250219") + require.Error(t, err) + require.Contains(t, err.Error(), "stream read error") + require.NotNil(t, result) + require.False(t, result.clientDisconnect) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingTimeoutAfterClientDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + c.Writer = &failWriteResponseWriter{ResponseWriter: c.Writer} + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + StreamDataIntervalTimeout: 1, + MaxLineSize: defaultMaxLineSize, + }, + }, + rateLimitService: &RateLimitService{}, + } + + pr, pw := io.Pipe() + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: pr, + } + + done := make(chan struct{}) + go func() { + defer close(done) + _, _ = pw.Write([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":9}}}` + "\n")) + // 保持上游连接静默,触发数据间隔超时分支。 + time.Sleep(1500 * time.Millisecond) + _ = pw.Close() + }() + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 7}, time.Now(), "claude-3-7-sonnet-20250219") + _ = pr.Close() + <-done + + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.clientDisconnect) + require.Equal(t, 9, result.usage.InputTokens) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingContextCanceled(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + } + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{ + err: context.Canceled, + }, + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 3}, time.Now(), "claude-3-7-sonnet-20250219") + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.clientDisconnect) +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingUpstreamReadErrorAfterClientDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + c.Writer = &failWriteResponseWriter{ResponseWriter: c.Writer} + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + } + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{ + payload: []byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":8}}}` + "\n\n"), + err: io.ErrUnexpectedEOF, + }, + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 4}, time.Now(), "claude-3-7-sonnet-20250219") + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.clientDisconnect) + require.Equal(t, 8, result.usage.InputTokens) +} diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 063a5ae6..f16f685f 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -3041,6 +3041,10 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A return nil, fmt.Errorf("parse request: empty request") } + if account != nil && account.IsAnthropicAPIKeyPassthroughEnabled() { + return s.forwardAnthropicAPIKeyPassthrough(ctx, c, account, parsed.Body, parsed.Model, parsed.Stream, startTime) + } + body := parsed.Body reqModel := parsed.Model reqStream := parsed.Stream @@ -3120,14 +3124,14 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 调试日志:记录即将转发的账号信息 logger.LegacyPrintf("service.gateway", "[Forward] Using account: ID=%d Name=%s Platform=%s Type=%s TLSFingerprint=%v Proxy=%s", account.ID, account.Name, account.Platform, account.Type, account.IsTLSFingerprintEnabled(), proxyURL) + // 重试间复用同一请求体,避免每次 string(body) 产生额外分配。 + setOpsUpstreamRequestBody(c, body) // 重试循环 var resp *http.Response retryStart := time.Now() for attempt := 1; attempt <= maxRetryAttempts; attempt++ { // 构建上游请求(每次重试需要重新构建,因为请求体需要重新读取) - // Capture upstream request body for ops retry of this attempt. - c.Set(OpsUpstreamRequestBodyKey, string(body)) upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType, reqModel, reqStream, shouldMimicClaudeCode) if err != nil { return nil, err @@ -3491,6 +3495,602 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A }, nil } +func (s *GatewayService) forwardAnthropicAPIKeyPassthrough( + ctx context.Context, + c *gin.Context, + account *Account, + body []byte, + reqModel string, + reqStream bool, + startTime time.Time, +) (*ForwardResult, error) { + token, tokenType, err := s.GetAccessToken(ctx, account) + if err != nil { + return nil, err + } + if tokenType != "apikey" { + return nil, fmt.Errorf("anthropic api key passthrough requires apikey token, got: %s", tokenType) + } + + proxyURL := "" + if account.ProxyID != nil && account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + + logger.LegacyPrintf("service.gateway", "[Anthropic 自动透传] 命中 API Key 透传分支: account=%d name=%s model=%s stream=%v", + account.ID, account.Name, reqModel, reqStream) + + if c != nil { + c.Set("anthropic_passthrough", true) + } + // 重试间复用同一请求体,避免每次 string(body) 产生额外分配。 + setOpsUpstreamRequestBody(c, body) + + var resp *http.Response + retryStart := time.Now() + for attempt := 1; attempt <= maxRetryAttempts; attempt++ { + upstreamReq, err := s.buildUpstreamRequestAnthropicAPIKeyPassthrough(ctx, c, account, body, token) + if err != nil { + return nil, err + } + + resp, err = s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled()) + if err != nil { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + setOpsUpstreamError(c, 0, safeErr, "") + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: 0, + Passthrough: true, + Kind: "request_error", + Message: safeErr, + }) + c.JSON(http.StatusBadGateway, gin.H{ + "type": "error", + "error": gin.H{ + "type": "upstream_error", + "message": "Upstream request failed", + }, + }) + return nil, fmt.Errorf("upstream request failed: %s", safeErr) + } + + // 透传分支禁止 400 请求体降级重试(该重试会改写请求体) + if resp.StatusCode >= 400 && resp.StatusCode != 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) { + if attempt < maxRetryAttempts { + elapsed := time.Since(retryStart) + if elapsed >= maxRetryElapsed { + break + } + + delay := retryBackoffDelay(attempt) + remaining := maxRetryElapsed - elapsed + if delay > remaining { + delay = remaining + } + if delay <= 0 { + break + } + + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Passthrough: true, + Kind: "retry", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) + logger.LegacyPrintf("service.gateway", "Anthropic passthrough account %d: upstream error %d, retry %d/%d after %v (elapsed=%v/%v)", + account.ID, resp.StatusCode, attempt, maxRetryAttempts, delay, elapsed, maxRetryElapsed) + if err := sleepWithContext(ctx, delay); err != nil { + return nil, err + } + continue + } + break + } + + break + } + if resp == nil || resp.Body == nil { + return nil, errors.New("upstream request failed: empty response") + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) { + if s.shouldFailoverUpstreamError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + + logger.LegacyPrintf("service.gateway", "[Anthropic Passthrough] Upstream error (retry exhausted, failover): Account=%d(%s) Status=%d RequestID=%s Body=%s", + account.ID, account.Name, resp.StatusCode, resp.Header.Get("x-request-id"), truncateString(string(respBody), 1000)) + + s.handleRetryExhaustedSideEffects(ctx, resp, account) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Passthrough: true, + Kind: "retry_exhausted_failover", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} + } + return s.handleRetryExhaustedError(ctx, resp, c, account) + } + + if resp.StatusCode >= 400 && s.shouldFailoverUpstreamError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + + logger.LegacyPrintf("service.gateway", "[Anthropic Passthrough] Upstream error (failover): Account=%d(%s) Status=%d RequestID=%s Body=%s", + account.ID, account.Name, resp.StatusCode, resp.Header.Get("x-request-id"), truncateString(string(respBody), 1000)) + + s.handleFailoverSideEffects(ctx, resp, account) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Passthrough: true, + Kind: "failover", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} + } + + if resp.StatusCode >= 400 { + return s.handleErrorResponse(ctx, resp, c, account) + } + + var usage *ClaudeUsage + var firstTokenMs *int + var clientDisconnect bool + if reqStream { + streamResult, err := s.handleStreamingResponseAnthropicAPIKeyPassthrough(ctx, resp, c, account, startTime, reqModel) + if err != nil { + return nil, err + } + usage = streamResult.usage + firstTokenMs = streamResult.firstTokenMs + clientDisconnect = streamResult.clientDisconnect + } else { + usage, err = s.handleNonStreamingResponseAnthropicAPIKeyPassthrough(ctx, resp, c, account) + if err != nil { + return nil, err + } + } + if usage == nil { + usage = &ClaudeUsage{} + } + + return &ForwardResult{ + RequestID: resp.Header.Get("x-request-id"), + Usage: *usage, + Model: reqModel, + Stream: reqStream, + Duration: time.Since(startTime), + FirstTokenMs: firstTokenMs, + ClientDisconnect: clientDisconnect, + }, nil +} + +func (s *GatewayService) buildUpstreamRequestAnthropicAPIKeyPassthrough( + ctx context.Context, + c *gin.Context, + account *Account, + body []byte, + token string, +) (*http.Request, error) { + targetURL := claudeAPIURL + baseURL := account.GetBaseURL() + if baseURL != "" { + validatedURL, err := s.validateUpstreamBaseURL(baseURL) + if err != nil { + return nil, err + } + targetURL = validatedURL + "/v1/messages" + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(body)) + if err != nil { + return nil, err + } + + if c != nil && c.Request != nil { + for key, values := range c.Request.Header { + lowerKey := strings.ToLower(strings.TrimSpace(key)) + if !allowedHeaders[lowerKey] { + continue + } + for _, v := range values { + req.Header.Add(key, v) + } + } + } + + // 覆盖入站鉴权残留,并注入上游认证 + req.Header.Del("authorization") + req.Header.Del("x-api-key") + req.Header.Del("x-goog-api-key") + req.Header.Del("cookie") + req.Header.Set("x-api-key", token) + + if req.Header.Get("content-type") == "" { + req.Header.Set("content-type", "application/json") + } + if req.Header.Get("anthropic-version") == "" { + req.Header.Set("anthropic-version", "2023-06-01") + } + + return req, nil +} + +func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( + ctx context.Context, + resp *http.Response, + c *gin.Context, + account *Account, + startTime time.Time, + model string, +) (*streamingResult, error) { + if s.rateLimitService != nil { + s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header) + } + + writeAnthropicPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.cfg) + + contentType := strings.TrimSpace(resp.Header.Get("Content-Type")) + if contentType == "" { + contentType = "text/event-stream" + } + c.Header("Content-Type", contentType) + if c.Writer.Header().Get("Cache-Control") == "" { + c.Header("Cache-Control", "no-cache") + } + if c.Writer.Header().Get("Connection") == "" { + c.Header("Connection", "keep-alive") + } + c.Header("X-Accel-Buffering", "no") + if v := resp.Header.Get("x-request-id"); v != "" { + c.Header("x-request-id", v) + } + + w := c.Writer + flusher, ok := w.(http.Flusher) + if !ok { + return nil, errors.New("streaming not supported") + } + + usage := &ClaudeUsage{} + var firstTokenMs *int + clientDisconnected := false + + scanner := bufio.NewScanner(resp.Body) + maxLineSize := defaultMaxLineSize + if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 { + maxLineSize = s.cfg.Gateway.MaxLineSize + } + scanBuf := getSSEScannerBuf64K() + scanner.Buffer(scanBuf[:0], maxLineSize) + + type scanEvent struct { + line string + err error + } + events := make(chan scanEvent, 16) + done := make(chan struct{}) + sendEvent := func(ev scanEvent) bool { + select { + case events <- ev: + return true + case <-done: + return false + } + } + var lastReadAt int64 + atomic.StoreInt64(&lastReadAt, time.Now().UnixNano()) + go func(scanBuf *sseScannerBuf64K) { + defer putSSEScannerBuf64K(scanBuf) + defer close(events) + for scanner.Scan() { + atomic.StoreInt64(&lastReadAt, time.Now().UnixNano()) + if !sendEvent(scanEvent{line: scanner.Text()}) { + return + } + } + if err := scanner.Err(); err != nil { + _ = sendEvent(scanEvent{err: err}) + } + }(scanBuf) + defer close(done) + + streamInterval := time.Duration(0) + if s.cfg != nil && s.cfg.Gateway.StreamDataIntervalTimeout > 0 { + streamInterval = time.Duration(s.cfg.Gateway.StreamDataIntervalTimeout) * time.Second + } + var intervalTicker *time.Ticker + if streamInterval > 0 { + intervalTicker = time.NewTicker(streamInterval) + defer intervalTicker.Stop() + } + var intervalCh <-chan time.Time + if intervalTicker != nil { + intervalCh = intervalTicker.C + } + + for { + select { + case ev, ok := <-events: + if !ok { + if !clientDisconnected { + // 兜底补刷,确保最后一个未以空行结尾的事件也能及时送达客户端。 + flusher.Flush() + } + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: clientDisconnected}, nil + } + if ev.err != nil { + if clientDisconnected { + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Upstream read error after client disconnect: account=%d err=%v", account.ID, ev.err) + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: true}, nil + } + if errors.Is(ev.err, context.Canceled) || errors.Is(ev.err, context.DeadlineExceeded) { + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] 流读取被取消: account=%d request_id=%s err=%v ctx_err=%v", + account.ID, resp.Header.Get("x-request-id"), ev.err, ctx.Err()) + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: true}, nil + } + if errors.Is(ev.err, bufio.ErrTooLong) { + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] SSE line too long: account=%d max_size=%d error=%v", account.ID, maxLineSize, ev.err) + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, ev.err + } + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream read error: %w", ev.err) + } + + line := ev.line + if data, ok := extractAnthropicSSEDataLine(line); ok { + trimmed := strings.TrimSpace(data) + if firstTokenMs == nil && trimmed != "" && trimmed != "[DONE]" { + ms := int(time.Since(startTime).Milliseconds()) + firstTokenMs = &ms + } + s.parseSSEUsagePassthrough(data, usage) + } + + if !clientDisconnected { + if _, err := io.WriteString(w, line); err != nil { + clientDisconnected = true + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during streaming, continue draining upstream for usage: account=%d", account.ID) + } else if _, err := io.WriteString(w, "\n"); err != nil { + clientDisconnected = true + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during streaming, continue draining upstream for usage: account=%d", account.ID) + } else if line == "" { + // 按 SSE 事件边界刷出,减少每行 flush 带来的 syscall 开销。 + flusher.Flush() + } + } + + case <-intervalCh: + lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt)) + if time.Since(lastRead) < streamInterval { + continue + } + if clientDisconnected { + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Upstream timeout after client disconnect: account=%d model=%s", account.ID, model) + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: true}, nil + } + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Stream data interval timeout: account=%d model=%s interval=%s", account.ID, model, streamInterval) + if s.rateLimitService != nil { + s.rateLimitService.HandleStreamTimeout(ctx, account, model) + } + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") + } + } +} + +func extractAnthropicSSEDataLine(line string) (string, bool) { + if !strings.HasPrefix(line, "data:") { + return "", false + } + start := len("data:") + for start < len(line) { + if line[start] != ' ' && line[start] != '\t' { + break + } + start++ + } + return line[start:], true +} + +func (s *GatewayService) parseSSEUsagePassthrough(data string, usage *ClaudeUsage) { + if usage == nil || data == "" || data == "[DONE]" { + return + } + + parsed := gjson.Parse(data) + switch parsed.Get("type").String() { + case "message_start": + msgUsage := parsed.Get("message.usage") + if msgUsage.Exists() { + usage.InputTokens = int(msgUsage.Get("input_tokens").Int()) + usage.CacheCreationInputTokens = int(msgUsage.Get("cache_creation_input_tokens").Int()) + usage.CacheReadInputTokens = int(msgUsage.Get("cache_read_input_tokens").Int()) + + // 保持与通用解析一致:message_start 允许覆盖 5m/1h 明细(包括 0)。 + cc5m := msgUsage.Get("cache_creation.ephemeral_5m_input_tokens") + cc1h := msgUsage.Get("cache_creation.ephemeral_1h_input_tokens") + if cc5m.Exists() || cc1h.Exists() { + usage.CacheCreation5mTokens = int(cc5m.Int()) + usage.CacheCreation1hTokens = int(cc1h.Int()) + } + } + case "message_delta": + deltaUsage := parsed.Get("usage") + if deltaUsage.Exists() { + if v := deltaUsage.Get("input_tokens").Int(); v > 0 { + usage.InputTokens = int(v) + } + if v := deltaUsage.Get("output_tokens").Int(); v > 0 { + usage.OutputTokens = int(v) + } + if v := deltaUsage.Get("cache_creation_input_tokens").Int(); v > 0 { + usage.CacheCreationInputTokens = int(v) + } + if v := deltaUsage.Get("cache_read_input_tokens").Int(); v > 0 { + usage.CacheReadInputTokens = int(v) + } + + cc5m := deltaUsage.Get("cache_creation.ephemeral_5m_input_tokens") + cc1h := deltaUsage.Get("cache_creation.ephemeral_1h_input_tokens") + if cc5m.Exists() && cc5m.Int() > 0 { + usage.CacheCreation5mTokens = int(cc5m.Int()) + } + if cc1h.Exists() && cc1h.Int() > 0 { + usage.CacheCreation1hTokens = int(cc1h.Int()) + } + } + } + + if usage.CacheReadInputTokens == 0 { + if cached := parsed.Get("message.usage.cached_tokens").Int(); cached > 0 { + usage.CacheReadInputTokens = int(cached) + } + if cached := parsed.Get("usage.cached_tokens").Int(); usage.CacheReadInputTokens == 0 && cached > 0 { + usage.CacheReadInputTokens = int(cached) + } + } + if usage.CacheCreationInputTokens == 0 { + cc5m := parsed.Get("message.usage.cache_creation.ephemeral_5m_input_tokens").Int() + cc1h := parsed.Get("message.usage.cache_creation.ephemeral_1h_input_tokens").Int() + if cc5m == 0 && cc1h == 0 { + cc5m = parsed.Get("usage.cache_creation.ephemeral_5m_input_tokens").Int() + cc1h = parsed.Get("usage.cache_creation.ephemeral_1h_input_tokens").Int() + } + total := cc5m + cc1h + if total > 0 { + usage.CacheCreationInputTokens = int(total) + } + } +} + +func parseClaudeUsageFromResponseBody(body []byte) *ClaudeUsage { + usage := &ClaudeUsage{} + if len(body) == 0 { + return usage + } + + parsed := gjson.ParseBytes(body) + usageNode := parsed.Get("usage") + if !usageNode.Exists() { + return usage + } + + usage.InputTokens = int(usageNode.Get("input_tokens").Int()) + usage.OutputTokens = int(usageNode.Get("output_tokens").Int()) + usage.CacheCreationInputTokens = int(usageNode.Get("cache_creation_input_tokens").Int()) + usage.CacheReadInputTokens = int(usageNode.Get("cache_read_input_tokens").Int()) + + cc5m := usageNode.Get("cache_creation.ephemeral_5m_input_tokens").Int() + cc1h := usageNode.Get("cache_creation.ephemeral_1h_input_tokens").Int() + if cc5m > 0 || cc1h > 0 { + usage.CacheCreation5mTokens = int(cc5m) + usage.CacheCreation1hTokens = int(cc1h) + } + if usage.CacheCreationInputTokens == 0 && (cc5m > 0 || cc1h > 0) { + usage.CacheCreationInputTokens = int(cc5m + cc1h) + } + if usage.CacheReadInputTokens == 0 { + if cached := usageNode.Get("cached_tokens").Int(); cached > 0 { + usage.CacheReadInputTokens = int(cached) + } + } + return usage +} + +func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough( + ctx context.Context, + resp *http.Response, + c *gin.Context, + account *Account, +) (*ClaudeUsage, error) { + if s.rateLimitService != nil { + s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header) + } + + maxBytes := resolveUpstreamResponseReadLimit(s.cfg) + body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + if err != nil { + if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { + setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") + c.JSON(http.StatusBadGateway, gin.H{ + "type": "error", + "error": gin.H{ + "type": "upstream_error", + "message": "Upstream response too large", + }, + }) + } + return nil, err + } + + usage := parseClaudeUsageFromResponseBody(body) + + writeAnthropicPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.cfg) + contentType := strings.TrimSpace(resp.Header.Get("Content-Type")) + if contentType == "" { + contentType = "application/json" + } + c.Data(resp.StatusCode, contentType, body) + return usage, nil +} + +func writeAnthropicPassthroughResponseHeaders(dst http.Header, src http.Header, cfg *config.Config) { + if dst == nil || src == nil { + return + } + if cfg != nil { + responseheaders.WriteFilteredHeaders(dst, src, cfg.Security.ResponseHeaders) + return + } + if v := strings.TrimSpace(src.Get("Content-Type")); v != "" { + dst.Set("Content-Type", v) + } + if v := strings.TrimSpace(src.Get("x-request-id")); v != "" { + dst.Set("x-request-id", v) + } +} + func (s *GatewayService) buildUpstreamRequest(ctx context.Context, c *gin.Context, account *Account, body []byte, token, tokenType, modelID string, reqStream bool, mimicClaudeCode bool) (*http.Request, error) { // 确定目标URL targetURL := claudeAPIURL @@ -5082,6 +5682,10 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, return fmt.Errorf("parse request: empty request") } + if account != nil && account.IsAnthropicAPIKeyPassthroughEnabled() { + return s.forwardCountTokensAnthropicAPIKeyPassthrough(ctx, c, account, parsed.Body) + } + body := parsed.Body reqModel := parsed.Model @@ -5241,6 +5845,158 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, return nil } +func (s *GatewayService) forwardCountTokensAnthropicAPIKeyPassthrough(ctx context.Context, c *gin.Context, account *Account, body []byte) error { + token, tokenType, err := s.GetAccessToken(ctx, account) + if err != nil { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to get access token") + return err + } + if tokenType != "apikey" { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Invalid account token type") + return fmt.Errorf("anthropic api key passthrough requires apikey token, got: %s", tokenType) + } + + upstreamReq, err := s.buildCountTokensRequestAnthropicAPIKeyPassthrough(ctx, c, account, body, token) + if err != nil { + s.countTokensError(c, http.StatusInternalServerError, "api_error", "Failed to build request") + return err + } + + proxyURL := "" + if account.ProxyID != nil && account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + + resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled()) + if err != nil { + setOpsUpstreamError(c, 0, sanitizeUpstreamErrorMessage(err.Error()), "") + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: 0, + Passthrough: true, + Kind: "request_error", + Message: sanitizeUpstreamErrorMessage(err.Error()), + }) + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed") + return fmt.Errorf("upstream request failed: %w", err) + } + + maxReadBytes := resolveUpstreamResponseReadLimit(s.cfg) + respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxReadBytes) + _ = resp.Body.Close() + if err != nil { + if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { + setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large") + return err + } + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") + return err + } + + if resp.StatusCode >= 400 { + if s.rateLimitService != nil { + s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + } + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Passthrough: true, + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + + errMsg := "Upstream request failed" + switch resp.StatusCode { + case 429: + errMsg = "Rate limit exceeded" + case 529: + errMsg = "Service overloaded" + } + s.countTokensError(c, resp.StatusCode, "upstream_error", errMsg) + if upstreamMsg == "" { + return fmt.Errorf("upstream error: %d", resp.StatusCode) + } + return fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg) + } + + writeAnthropicPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.cfg) + contentType := strings.TrimSpace(resp.Header.Get("Content-Type")) + if contentType == "" { + contentType = "application/json" + } + c.Data(resp.StatusCode, contentType, respBody) + return nil +} + +func (s *GatewayService) buildCountTokensRequestAnthropicAPIKeyPassthrough( + ctx context.Context, + c *gin.Context, + account *Account, + body []byte, + token string, +) (*http.Request, error) { + targetURL := claudeAPICountTokensURL + baseURL := account.GetBaseURL() + if baseURL != "" { + validatedURL, err := s.validateUpstreamBaseURL(baseURL) + if err != nil { + return nil, err + } + targetURL = validatedURL + "/v1/messages/count_tokens" + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(body)) + if err != nil { + return nil, err + } + + if c != nil && c.Request != nil { + for key, values := range c.Request.Header { + lowerKey := strings.ToLower(strings.TrimSpace(key)) + if !allowedHeaders[lowerKey] { + continue + } + for _, v := range values { + req.Header.Add(key, v) + } + } + } + + req.Header.Del("authorization") + req.Header.Del("x-api-key") + req.Header.Del("x-goog-api-key") + req.Header.Del("cookie") + req.Header.Set("x-api-key", token) + + if req.Header.Get("content-type") == "" { + req.Header.Set("content-type", "application/json") + } + if req.Header.Get("anthropic-version") == "" { + req.Header.Set("anthropic-version", "2023-06-01") + } + + return req, nil +} + // buildCountTokensRequest 构建 count_tokens 上游请求 func (s *GatewayService) buildCountTokensRequest(ctx context.Context, c *gin.Context, account *Account, body []byte, token, tokenType, modelID string, mimicClaudeCode bool) (*http.Request, error) { // 确定目标 URL diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index 8024dfb6..30da0767 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -1697,6 +1697,36 @@ + +
+
+
+ +

+ {{ t('admin.accounts.anthropic.apiKeyPassthroughDesc') }} +

+
+ +
+
+
('oauth') // For antigravity: oauth or upstream const upstreamBaseUrl = ref('') // For upstream type: base URL @@ -2526,6 +2557,9 @@ watch( openaiPassthroughEnabled.value = false codexCLIOnlyEnabled.value = false } + if (newPlatform !== 'anthropic') { + anthropicPassthroughEnabled.value = false + } // Reset OAuth states oauth.resetState() openaiOAuth.resetState() @@ -2542,6 +2576,9 @@ watch( if (platform === 'openai' && category !== 'oauth-based') { codexCLIOnlyEnabled.value = false } + if (platform !== 'anthropic' || category !== 'apikey') { + anthropicPassthroughEnabled.value = false + } } ) @@ -2791,6 +2828,7 @@ const resetForm = () => { autoPauseOnExpired.value = true openaiPassthroughEnabled.value = false codexCLIOnlyEnabled.value = false + anthropicPassthroughEnabled.value = false // Reset quota control state windowCostEnabled.value = false windowCostLimit.value = null @@ -2845,6 +2883,21 @@ const buildOpenAIExtra = (base?: Record): Record 0 ? extra : undefined } +const buildAnthropicExtra = (base?: Record): Record | undefined => { + if (form.platform !== 'anthropic' || accountCategory.value !== 'apikey') { + return base + } + + const extra: Record = { ...(base || {}) } + if (anthropicPassthroughEnabled.value) { + extra.anthropic_passthrough = true + } else { + delete extra.anthropic_passthrough + } + + return Object.keys(extra).length > 0 ? extra : undefined +} + const buildSoraExtra = ( base?: Record, linkedOpenAIAccountId?: string | number @@ -3015,7 +3068,7 @@ const handleSubmit = async () => { } form.credentials = credentials - const extra = buildOpenAIExtra() + const extra = buildAnthropicExtra(buildOpenAIExtra()) await doCreateAccount({ ...form, diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 3842ea06..c6643717 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -735,6 +735,36 @@
+ +
+
+
+ +

+ {{ t('admin.accounts.anthropic.apiKeyPassthroughDesc') }} +

+
+ +
+
+
('5m') // OpenAI 自动透传开关(OAuth/API Key) const openaiPassthroughEnabled = ref(false) const codexCLIOnlyEnabled = ref(false) +const anthropicPassthroughEnabled = ref(false) const isOpenAIModelRestrictionDisabled = computed(() => props.account?.platform === 'openai' && openaiPassthroughEnabled.value ) @@ -1317,12 +1348,16 @@ watch( // Load OpenAI passthrough toggle (OpenAI OAuth/API Key) openaiPassthroughEnabled.value = false codexCLIOnlyEnabled.value = false + anthropicPassthroughEnabled.value = false if (newAccount.platform === 'openai' && (newAccount.type === 'oauth' || newAccount.type === 'apikey')) { openaiPassthroughEnabled.value = extra?.openai_passthrough === true || extra?.openai_oauth_passthrough === true if (newAccount.type === 'oauth') { codexCLIOnlyEnabled.value = extra?.codex_cli_only === true } } + if (newAccount.platform === 'anthropic' && newAccount.type === 'apikey') { + anthropicPassthroughEnabled.value = extra?.anthropic_passthrough === true + } // Load antigravity model mapping (Antigravity 只支持映射模式) if (newAccount.platform === 'antigravity') { @@ -1882,6 +1917,18 @@ const handleSubmit = async () => { updatePayload.extra = newExtra } + // For Anthropic API Key accounts, handle passthrough mode in extra + if (props.account.platform === 'anthropic' && props.account.type === 'apikey') { + const currentExtra = (props.account.extra as Record) || {} + const newExtra: Record = { ...currentExtra } + if (anthropicPassthroughEnabled.value) { + newExtra.anthropic_passthrough = true + } else { + delete newExtra.anthropic_passthrough + } + updatePayload.extra = newExtra + } + // For OpenAI OAuth/API Key accounts, handle passthrough mode in extra if (props.account.platform === 'openai' && (props.account.type === 'oauth' || props.account.type === 'apikey')) { const currentExtra = (props.account.extra as Record) || {} diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index bb292683..52de23f4 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -1548,6 +1548,11 @@ export default { enableSora: 'Enable Sora simultaneously', enableSoraHint: 'Sora uses the same OpenAI account. Enable to create Sora account simultaneously.' }, + anthropic: { + apiKeyPassthrough: 'Auto passthrough (auth only)', + apiKeyPassthroughDesc: + 'Only applies to Anthropic API Key accounts. When enabled, messages/count_tokens are forwarded in passthrough mode with auth replacement only, while billing/concurrency/audit and safety filtering are preserved. Disable to roll back immediately.' + }, modelRestriction: 'Model Restriction (Optional)', modelWhitelist: 'Model Whitelist', modelMapping: 'Model Mapping', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 73588ef7..1e5be7ac 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -1696,6 +1696,11 @@ export default { enableSora: '同时启用 Sora', enableSoraHint: 'Sora 使用相同的 OpenAI 账号,开启后将同时创建 Sora 平台账号' }, + anthropic: { + apiKeyPassthrough: '自动透传(仅替换认证)', + apiKeyPassthroughDesc: + '仅对 Anthropic API Key 生效。开启后,messages/count_tokens 请求将透传上游并仅替换认证,保留计费/并发/审计及必要安全过滤;关闭即可回滚到现有兼容链路。' + }, modelRestriction: '模型限制(可选)', modelWhitelist: '模型白名单', modelMapping: '模型映射',