refactor(backend): service http ports
This commit is contained in:
@@ -12,7 +12,6 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -26,6 +25,11 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ClaudeUpstream handles HTTP requests to Claude API
|
||||
type ClaudeUpstream interface {
|
||||
Do(req *http.Request, proxyURL string) (*http.Response, error)
|
||||
}
|
||||
|
||||
const (
|
||||
claudeAPIURL = "https://api.anthropic.com/v1/messages?beta=true"
|
||||
claudeAPICountTokensURL = "https://api.anthropic.com/v1/messages/count_tokens?beta=true"
|
||||
@@ -87,7 +91,7 @@ type GatewayService struct {
|
||||
rateLimitService *RateLimitService
|
||||
billingCacheService *BillingCacheService
|
||||
identityService *IdentityService
|
||||
httpClient *http.Client
|
||||
claudeUpstream ClaudeUpstream
|
||||
}
|
||||
|
||||
// NewGatewayService creates a new GatewayService
|
||||
@@ -103,20 +107,8 @@ func NewGatewayService(
|
||||
rateLimitService *RateLimitService,
|
||||
billingCacheService *BillingCacheService,
|
||||
identityService *IdentityService,
|
||||
claudeUpstream ClaudeUpstream,
|
||||
) *GatewayService {
|
||||
// 计算响应头超时时间
|
||||
responseHeaderTimeout := time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
||||
if responseHeaderTimeout == 0 {
|
||||
responseHeaderTimeout = 300 * time.Second // 默认5分钟,LLM高负载时可能排队较久
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ResponseHeaderTimeout: responseHeaderTimeout, // 等待上游响应头的超时
|
||||
// 注意:不设置整体 Timeout,让流式响应可以无限时间传输
|
||||
}
|
||||
return &GatewayService{
|
||||
accountRepo: accountRepo,
|
||||
usageLogRepo: usageLogRepo,
|
||||
@@ -129,11 +121,7 @@ func NewGatewayService(
|
||||
rateLimitService: rateLimitService,
|
||||
billingCacheService: billingCacheService,
|
||||
identityService: identityService,
|
||||
httpClient: &http.Client{
|
||||
Transport: transport,
|
||||
// 不设置 Timeout:流式请求可能持续十几分钟
|
||||
// 超时控制由 Transport.ResponseHeaderTimeout 负责(只控制等待响应头)
|
||||
},
|
||||
claudeUpstream: claudeUpstream,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -436,19 +424,19 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *m
|
||||
}
|
||||
|
||||
// 构建上游请求
|
||||
upstreamResult, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType)
|
||||
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 选择使用的client:如果有代理则使用独立的client,否则使用共享的httpClient
|
||||
httpClient := s.httpClient
|
||||
if upstreamResult.Client != nil {
|
||||
httpClient = upstreamResult.Client
|
||||
// 获取代理URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// 发送请求
|
||||
resp, err := httpClient.Do(upstreamResult.Request)
|
||||
resp, err := s.claudeUpstream.Do(upstreamReq, proxyURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("upstream request failed: %w", err)
|
||||
}
|
||||
@@ -461,16 +449,11 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *m
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("token refresh failed: %w", err)
|
||||
}
|
||||
upstreamResult, err = s.buildUpstreamRequest(ctx, c, account, body, token, tokenType)
|
||||
upstreamReq, err = s.buildUpstreamRequest(ctx, c, account, body, token, tokenType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 重试时也需要使用正确的client
|
||||
httpClient = s.httpClient
|
||||
if upstreamResult.Client != nil {
|
||||
httpClient = upstreamResult.Client
|
||||
}
|
||||
resp, err = httpClient.Do(upstreamResult.Request)
|
||||
resp, err = s.claudeUpstream.Do(upstreamReq, proxyURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("retry request failed: %w", err)
|
||||
}
|
||||
@@ -509,13 +492,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *m
|
||||
}, nil
|
||||
}
|
||||
|
||||
// buildUpstreamRequestResult contains the request and optional custom client for proxy
|
||||
type buildUpstreamRequestResult struct {
|
||||
Request *http.Request
|
||||
Client *http.Client // nil means use default s.httpClient
|
||||
}
|
||||
|
||||
func (s *GatewayService) buildUpstreamRequest(ctx context.Context, c *gin.Context, account *model.Account, body []byte, token, tokenType string) (*buildUpstreamRequestResult, error) {
|
||||
func (s *GatewayService) buildUpstreamRequest(ctx context.Context, c *gin.Context, account *model.Account, body []byte, token, tokenType string) (*http.Request, error) {
|
||||
// 确定目标URL
|
||||
targetURL := claudeAPIURL
|
||||
if account.Type == model.AccountTypeApiKey {
|
||||
@@ -584,36 +561,7 @@ func (s *GatewayService) buildUpstreamRequest(ctx context.Context, c *gin.Contex
|
||||
req.Header.Set("anthropic-beta", s.getBetaHeader(body, c.GetHeader("anthropic-beta")))
|
||||
}
|
||||
|
||||
// 配置代理 - 创建独立的client避免并发修改共享httpClient
|
||||
var customClient *http.Client
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL := account.Proxy.URL()
|
||||
if proxyURL != "" {
|
||||
if parsedURL, err := url.Parse(proxyURL); err == nil {
|
||||
// 计算响应头超时时间(与默认 Transport 保持一致)
|
||||
responseHeaderTimeout := time.Duration(s.cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
||||
if responseHeaderTimeout == 0 {
|
||||
responseHeaderTimeout = 300 * time.Second
|
||||
}
|
||||
transport := &http.Transport{
|
||||
Proxy: http.ProxyURL(parsedURL),
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ResponseHeaderTimeout: responseHeaderTimeout,
|
||||
}
|
||||
// 创建独立的client,避免并发时修改共享的s.httpClient.Transport
|
||||
customClient = &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &buildUpstreamRequestResult{
|
||||
Request: req,
|
||||
Client: customClient,
|
||||
}, nil
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// getBetaHeader 处理anthropic-beta header
|
||||
@@ -1085,20 +1033,20 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
||||
}
|
||||
|
||||
// 构建上游请求
|
||||
upstreamResult, err := s.buildCountTokensRequest(ctx, c, account, body, token, tokenType)
|
||||
upstreamReq, err := s.buildCountTokensRequest(ctx, c, account, body, token, tokenType)
|
||||
if err != nil {
|
||||
s.countTokensError(c, http.StatusInternalServerError, "api_error", "Failed to build request")
|
||||
return err
|
||||
}
|
||||
|
||||
// 选择 HTTP client
|
||||
httpClient := s.httpClient
|
||||
if upstreamResult.Client != nil {
|
||||
httpClient = upstreamResult.Client
|
||||
// 获取代理URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// 发送请求
|
||||
resp, err := httpClient.Do(upstreamResult.Request)
|
||||
resp, err := s.claudeUpstream.Do(upstreamReq, proxyURL)
|
||||
if err != nil {
|
||||
s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed")
|
||||
return fmt.Errorf("upstream request failed: %w", err)
|
||||
@@ -1113,15 +1061,11 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
||||
s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Token refresh failed")
|
||||
return fmt.Errorf("token refresh failed: %w", err)
|
||||
}
|
||||
upstreamResult, err = s.buildCountTokensRequest(ctx, c, account, body, token, tokenType)
|
||||
upstreamReq, err = s.buildCountTokensRequest(ctx, c, account, body, token, tokenType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
httpClient = s.httpClient
|
||||
if upstreamResult.Client != nil {
|
||||
httpClient = upstreamResult.Client
|
||||
}
|
||||
resp, err = httpClient.Do(upstreamResult.Request)
|
||||
resp, err = s.claudeUpstream.Do(upstreamReq, proxyURL)
|
||||
if err != nil {
|
||||
s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Retry failed")
|
||||
return fmt.Errorf("retry request failed: %w", err)
|
||||
@@ -1159,7 +1103,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
||||
}
|
||||
|
||||
// buildCountTokensRequest 构建 count_tokens 上游请求
|
||||
func (s *GatewayService) buildCountTokensRequest(ctx context.Context, c *gin.Context, account *model.Account, body []byte, token, tokenType string) (*buildUpstreamRequestResult, error) {
|
||||
func (s *GatewayService) buildCountTokensRequest(ctx context.Context, c *gin.Context, account *model.Account, body []byte, token, tokenType string) (*http.Request, error) {
|
||||
// 确定目标 URL
|
||||
targetURL := claudeAPICountTokensURL
|
||||
if account.Type == model.AccountTypeApiKey {
|
||||
@@ -1223,32 +1167,7 @@ func (s *GatewayService) buildCountTokensRequest(ctx context.Context, c *gin.Con
|
||||
req.Header.Set("anthropic-beta", s.getBetaHeader(body, c.GetHeader("anthropic-beta")))
|
||||
}
|
||||
|
||||
// 配置代理
|
||||
var customClient *http.Client
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL := account.Proxy.URL()
|
||||
if proxyURL != "" {
|
||||
if parsedURL, err := url.Parse(proxyURL); err == nil {
|
||||
responseHeaderTimeout := time.Duration(s.cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
||||
if responseHeaderTimeout == 0 {
|
||||
responseHeaderTimeout = 300 * time.Second
|
||||
}
|
||||
transport := &http.Transport{
|
||||
Proxy: http.ProxyURL(parsedURL),
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ResponseHeaderTimeout: responseHeaderTimeout,
|
||||
}
|
||||
customClient = &http.Client{Transport: transport}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &buildUpstreamRequestResult{
|
||||
Request: req,
|
||||
Client: customClient,
|
||||
}, nil
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// countTokensError 返回 count_tokens 错误响应
|
||||
|
||||
Reference in New Issue
Block a user