From d1c98896094fc4bc2f4ecfa0095083cbf29d7ccd Mon Sep 17 00:00:00 2001
From: yangjianbo
Date: Wed, 31 Dec 2025 11:43:58 +0800
Subject: [PATCH] perf(gateway): implement upstream account connection pool
 isolation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add isolation strategies and reclamation of cached pool clients.
Pool size follows account concurrency; proxy switches are handled.
Sync config defaults and the example file, and add tests.
---
 backend/internal/config/config.go             |  48 +-
 backend/internal/repository/http_upstream.go  | 573 ++++++++++++++++--
 .../http_upstream_benchmark_test.go           |  30 +-
 .../internal/repository/http_upstream_test.go | 184 +++++-
 .../internal/service/account_test_service.go  |   6 +-
 .../service/antigravity_gateway_service.go    |   4 +-
 backend/internal/service/gateway_service.go   |   4 +-
 .../service/gemini_messages_compat_service.go |   6 +-
 .../internal/service/http_upstream_port.go    |  27 +-
 .../service/openai_gateway_service.go         |   2 +-
 deploy/config.example.yaml                    |  10 +
 11 files changed, 790 insertions(+), 104 deletions(-)

diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go
index dfc9a844..aeeddcb4 100644
--- a/backend/internal/config/config.go
+++ b/backend/internal/config/config.go
@@ -12,6 +12,20 @@ const (
 	RunModeSimple = "simple"
 )
 
+// 连接池隔离策略常量
+// 用于控制上游 HTTP 连接池的隔离粒度,影响连接复用和资源消耗
+const (
+	// ConnectionPoolIsolationProxy: 按代理隔离
+	// 同一代理地址共享连接池,适合代理数量少、账户数量多的场景
+	ConnectionPoolIsolationProxy = "proxy"
+	// ConnectionPoolIsolationAccount: 按账户隔离
+	// 每个账户独立连接池,适合账户数量少、需要严格隔离的场景
+	ConnectionPoolIsolationAccount = "account"
+	// ConnectionPoolIsolationAccountProxy: 按账户+代理组合隔离(默认)
+	// 同一账户+代理组合共享连接池,提供最细粒度的隔离
+	ConnectionPoolIsolationAccountProxy = "account_proxy"
+)
+
 type Config struct {
 	Server   ServerConfig   `mapstructure:"server"`
 	Database DatabaseConfig `mapstructure:"database"`
@@ -81,6 +95,8 @@ type GatewayConfig struct {
 	ResponseHeaderTimeout int `mapstructure:"response_header_timeout"`
 	// 请求体最大字节数,用于网关请求体大小限制
 	MaxBodySize int64 `mapstructure:"max_body_size"`
+	// ConnectionPoolIsolation: 上游连接池隔离策略(proxy/account/account_proxy)
+	ConnectionPoolIsolation string `mapstructure:"connection_pool_isolation"`
 
 	// HTTP 上游连接池配置(性能优化:支持高并发场景调优)
 	// MaxIdleConns: 所有主机的最大空闲连接总数
@@ -91,6 +107,15 @@ type GatewayConfig struct {
 	MaxConnsPerHost int `mapstructure:"max_conns_per_host"`
 	// IdleConnTimeoutSeconds: 空闲连接超时时间(秒)
 	IdleConnTimeoutSeconds int `mapstructure:"idle_conn_timeout_seconds"`
+	// MaxUpstreamClients: 上游连接池客户端最大缓存数量
+	// 当使用连接池隔离策略时,系统会为不同的账户/代理组合创建独立的 HTTP 客户端
+	// 此参数限制缓存的客户端数量,超出后会淘汰最久未使用的客户端
+	// 建议值:预估的活跃账户数 * 1.2(留有余量)
+	MaxUpstreamClients int `mapstructure:"max_upstream_clients"`
+	// ClientIdleTTLSeconds: 上游连接池客户端空闲回收阈值(秒)
+	// 超过此时间未使用的客户端会被标记为可回收
+	// 建议值:根据用户访问频率设置,一般 10-30 分钟
+	ClientIdleTTLSeconds int `mapstructure:"client_idle_ttl_seconds"`
 	// ConcurrencySlotTTLMinutes: 并发槽位过期时间(分钟)
 	// 应大于最长 LLM 请求时间,防止请求完成前槽位过期
 	ConcurrencySlotTTLMinutes int `mapstructure:"concurrency_slot_ttl_minutes"`
@@ -289,11 +314,14 @@ func setDefaults() {
 	// Gateway
 	viper.SetDefault("gateway.response_header_timeout", 300) // 300秒(5分钟)等待上游响应头,LLM高负载时可能排队较久
 	viper.SetDefault("gateway.max_body_size", int64(100*1024*1024))
+	viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy)
 	// HTTP 上游连接池配置(针对 5000+ 并发用户优化)
-	viper.SetDefault("gateway.max_idle_conns", 240)          // 最大空闲连接总数(HTTP/2 场景默认)
-	viper.SetDefault("gateway.max_idle_conns_per_host", 120) // 每主机最大空闲连接(HTTP/2 场景默认)
-	viper.SetDefault("gateway.max_conns_per_host", 
240) // 每主机最大连接数(含活跃,HTTP/2 场景默认) + viper.SetDefault("gateway.max_idle_conns", 240) // 最大空闲连接总数(HTTP/2 场景默认) + viper.SetDefault("gateway.max_idle_conns_per_host", 120) // 每主机最大空闲连接(HTTP/2 场景默认) + viper.SetDefault("gateway.max_conns_per_host", 240) // 每主机最大连接数(含活跃,HTTP/2 场景默认) viper.SetDefault("gateway.idle_conn_timeout_seconds", 300) // 空闲连接超时(秒) + viper.SetDefault("gateway.max_upstream_clients", 5000) + viper.SetDefault("gateway.client_idle_ttl_seconds", 900) viper.SetDefault("gateway.concurrency_slot_ttl_minutes", 15) // 并发槽位过期时间(支持超长请求) // TokenRefresh @@ -354,6 +382,14 @@ func (c *Config) Validate() error { if c.Gateway.MaxBodySize <= 0 { return fmt.Errorf("gateway.max_body_size must be positive") } + if strings.TrimSpace(c.Gateway.ConnectionPoolIsolation) != "" { + switch c.Gateway.ConnectionPoolIsolation { + case ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy: + default: + return fmt.Errorf("gateway.connection_pool_isolation must be one of: %s/%s/%s", + ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy) + } + } if c.Gateway.MaxIdleConns <= 0 { return fmt.Errorf("gateway.max_idle_conns must be positive") } @@ -366,6 +402,12 @@ func (c *Config) Validate() error { if c.Gateway.IdleConnTimeoutSeconds <= 0 { return fmt.Errorf("gateway.idle_conn_timeout_seconds must be positive") } + if c.Gateway.MaxUpstreamClients <= 0 { + return fmt.Errorf("gateway.max_upstream_clients must be positive") + } + if c.Gateway.ClientIdleTTLSeconds <= 0 { + return fmt.Errorf("gateway.client_idle_ttl_seconds must be positive") + } if c.Gateway.ConcurrencySlotTTLMinutes <= 0 { return fmt.Errorf("gateway.concurrency_slot_ttl_minutes must be positive") } diff --git a/backend/internal/repository/http_upstream.go b/backend/internal/repository/http_upstream.go index 0ca85a09..e7ae46dc 100644 --- a/backend/internal/repository/http_upstream.go +++ b/backend/internal/repository/http_upstream.go @@ -1,106 +1,553 @@ package repository import ( + "fmt" + "io" "net/http" "net/url" "strings" "sync" + "sync/atomic" "time" "github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/service" ) +// 默认配置常量 +// 这些值在配置文件未指定时作为回退默认值使用 +const ( + // directProxyKey: 无代理时的缓存键标识 + directProxyKey = "direct" + // defaultMaxIdleConns: 默认最大空闲连接总数 + // HTTP/2 场景下,单连接可多路复用,240 足以支撑高并发 + defaultMaxIdleConns = 240 + // defaultMaxIdleConnsPerHost: 默认每主机最大空闲连接数 + defaultMaxIdleConnsPerHost = 120 + // defaultMaxConnsPerHost: 默认每主机最大连接数(含活跃连接) + // 达到上限后新请求会等待,而非无限创建连接 + defaultMaxConnsPerHost = 240 + // defaultIdleConnTimeout: 默认空闲连接超时时间(5分钟) + // 超时后连接会被关闭,释放系统资源 + defaultIdleConnTimeout = 300 * time.Second + // defaultResponseHeaderTimeout: 默认等待响应头超时时间(5分钟) + // LLM 请求可能排队较久,需要较长超时 + defaultResponseHeaderTimeout = 300 * time.Second + // defaultMaxUpstreamClients: 默认最大客户端缓存数量 + // 超出后会淘汰最久未使用的客户端 + defaultMaxUpstreamClients = 5000 + // defaultClientIdleTTLSeconds: 默认客户端空闲回收阈值(15分钟) + defaultClientIdleTTLSeconds = 900 +) + +// poolSettings 连接池配置参数 +// 封装 Transport 所需的各项连接池参数 +type poolSettings struct { + maxIdleConns int // 最大空闲连接总数 + maxIdleConnsPerHost int // 每主机最大空闲连接数 + maxConnsPerHost int // 每主机最大连接数(含活跃) + idleConnTimeout time.Duration // 空闲连接超时时间 + responseHeaderTimeout time.Duration // 等待响应头超时时间 +} + +// upstreamClientEntry 上游客户端缓存条目 +// 记录客户端实例及其元数据,用于连接池管理和淘汰策略 +type upstreamClientEntry struct { + client *http.Client // HTTP 客户端实例 + proxyKey string // 代理标识(用于检测代理变更) + poolKey string // 连接池配置标识(用于检测配置变更) + lastUsed 
int64 // 最后使用时间戳(纳秒),用于 LRU 淘汰 + inFlight int64 // 当前进行中的请求数,>0 时不可淘汰 +} + // httpUpstreamService 通用 HTTP 上游服务 // 用于向任意 HTTP API(Claude、OpenAI 等)发送请求,支持可选代理 // +// 架构设计: +// - 根据隔离策略(proxy/account/account_proxy)缓存客户端实例 +// - 每个客户端拥有独立的 Transport 连接池 +// - 支持 LRU + 空闲时间双重淘汰策略 +// // 性能优化: -// 1. 使用 sync.Map 缓存代理客户端实例,避免每次请求都创建新的 http.Client +// 1. 根据隔离策略缓存客户端实例,避免频繁创建 http.Client // 2. 复用 Transport 连接池,减少 TCP 握手和 TLS 协商开销 -// 3. 原实现每次请求都 new 一个 http.Client,导致连接无法复用 +// 3. 支持账号级隔离与空闲回收,降低连接层关联风险 +// 4. 达到最大连接数后等待可用连接,而非无限创建 +// 5. 仅回收空闲客户端,避免中断活跃请求 +// 6. HTTP/2 多路复用,连接上限不等于并发请求上限 +// 7. 代理变更时清空旧连接池,避免复用错误代理 +// 8. 账号并发数与连接池上限对应(账号隔离策略下) type httpUpstreamService struct { - // defaultClient: 无代理时使用的默认客户端(单例复用) - defaultClient *http.Client - // proxyClients: 按代理 URL 缓存的客户端池,避免重复创建 - proxyClients sync.Map - cfg *config.Config + cfg *config.Config // 全局配置 + mu sync.RWMutex // 保护 clients map 的读写锁 + clients map[string]*upstreamClientEntry // 客户端缓存池,key 由隔离策略决定 } // NewHTTPUpstream 创建通用 HTTP 上游服务 // 使用配置中的连接池参数构建 Transport +// +// 参数: +// - cfg: 全局配置,包含连接池参数和隔离策略 +// +// 返回: +// - service.HTTPUpstream 接口实现 func NewHTTPUpstream(cfg *config.Config) service.HTTPUpstream { return &httpUpstreamService{ - defaultClient: &http.Client{Transport: buildUpstreamTransport(cfg, nil)}, - cfg: cfg, + cfg: cfg, + clients: make(map[string]*upstreamClientEntry), } } -func (s *httpUpstreamService) Do(req *http.Request, proxyURL string) (*http.Response, error) { - if strings.TrimSpace(proxyURL) == "" { - return s.defaultClient.Do(req) - } - client := s.getOrCreateClient(proxyURL) - return client.Do(req) -} +// Do 执行 HTTP 请求 +// 根据隔离策略获取或创建客户端,并跟踪请求生命周期 +// +// 参数: +// - req: HTTP 请求对象 +// - proxyURL: 代理地址,空字符串表示直连 +// - accountID: 账户 ID,用于账户级隔离 +// - accountConcurrency: 账户并发限制,用于动态调整连接池大小 +// +// 返回: +// - *http.Response: HTTP 响应(Body 已包装,关闭时自动更新计数) +// - error: 请求错误 +// +// 注意: +// - 调用方必须关闭 resp.Body,否则会导致 inFlight 计数泄漏 +// - inFlight > 0 的客户端不会被淘汰,确保活跃请求不被中断 +func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { + // 获取或创建对应的客户端,并标记请求占用 + entry := s.acquireClient(proxyURL, accountID, accountConcurrency) -// getOrCreateClient 获取或创建代理客户端 -// 性能优化:使用 sync.Map 实现无锁缓存,相同代理 URL 复用同一客户端 -// LoadOrStore 保证并发安全,避免重复创建 -func (s *httpUpstreamService) getOrCreateClient(proxyURL string) *http.Client { - proxyURL = strings.TrimSpace(proxyURL) - if proxyURL == "" { - return s.defaultClient - } - // 优先从缓存获取,命中则直接返回 - if cached, ok := s.proxyClients.Load(proxyURL); ok { - return cached.(*http.Client) - } - - parsedURL, err := url.Parse(proxyURL) + // 执行请求 + resp, err := entry.client.Do(req) if err != nil { - return s.defaultClient + // 请求失败,立即减少计数 + atomic.AddInt64(&entry.inFlight, -1) + return nil, err } - // 创建新客户端并缓存,LoadOrStore 保证只有一个实例被存储 - client := &http.Client{Transport: buildUpstreamTransport(s.cfg, parsedURL)} - actual, _ := s.proxyClients.LoadOrStore(proxyURL, client) - return actual.(*http.Client) + // 包装响应体,在关闭时自动减少计数并更新时间戳 + // 这确保了流式响应(如 SSE)在完全读取前不会被淘汰 + resp.Body = wrapTrackedBody(resp.Body, func() { + atomic.AddInt64(&entry.inFlight, -1) + atomic.StoreInt64(&entry.lastUsed, time.Now().UnixNano()) + }) + + return resp, nil +} + +// acquireClient 获取或创建客户端,并标记为进行中请求 +// 用于请求路径,避免在获取后被淘汰 +func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, accountConcurrency int) *upstreamClientEntry { + return s.getClientEntry(proxyURL, accountID, accountConcurrency, true) +} + +// getOrCreateClient 获取或创建客户端 +// 
根据隔离策略和参数决定缓存键,处理代理变更和配置变更 +// +// 参数: +// - proxyURL: 代理地址 +// - accountID: 账户 ID +// - accountConcurrency: 账户并发限制 +// +// 返回: +// - *upstreamClientEntry: 客户端缓存条目 +// +// 隔离策略说明: +// - proxy: 按代理地址隔离,同一代理共享客户端 +// - account: 按账户隔离,同一账户共享客户端(代理变更时重建) +// - account_proxy: 按账户+代理组合隔离,最细粒度 +func (s *httpUpstreamService) getOrCreateClient(proxyURL string, accountID int64, accountConcurrency int) *upstreamClientEntry { + return s.getClientEntry(proxyURL, accountID, accountConcurrency, false) +} + +// getClientEntry 获取或创建客户端条目 +// markInFlight=true 时会标记进行中请求,用于请求路径防止被淘汰 +func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, accountConcurrency int, markInFlight bool) *upstreamClientEntry { + // 获取隔离模式 + isolation := s.getIsolationMode() + // 标准化代理 URL 并解析 + proxyKey, parsedProxy := normalizeProxyURL(proxyURL) + // 构建缓存键(根据隔离策略不同) + cacheKey := buildCacheKey(isolation, proxyKey, accountID) + // 构建连接池配置键(用于检测配置变更) + poolKey := s.buildPoolKey(isolation, accountConcurrency) + + now := time.Now() + nowUnix := now.UnixNano() + + // 读锁快速路径:命中缓存直接返回,减少锁竞争 + s.mu.RLock() + if entry, ok := s.clients[cacheKey]; ok && s.shouldReuseEntry(entry, isolation, proxyKey, poolKey) { + atomic.StoreInt64(&entry.lastUsed, nowUnix) + if markInFlight { + atomic.AddInt64(&entry.inFlight, 1) + } + s.mu.RUnlock() + return entry + } + s.mu.RUnlock() + + // 写锁慢路径:创建或重建客户端 + s.mu.Lock() + if entry, ok := s.clients[cacheKey]; ok { + if s.shouldReuseEntry(entry, isolation, proxyKey, poolKey) { + atomic.StoreInt64(&entry.lastUsed, nowUnix) + if markInFlight { + atomic.AddInt64(&entry.inFlight, 1) + } + s.mu.Unlock() + return entry + } + s.removeClientLocked(cacheKey, entry) + } + + // 缓存未命中或需要重建,创建新客户端 + settings := s.resolvePoolSettings(isolation, accountConcurrency) + client := &http.Client{Transport: buildUpstreamTransport(settings, parsedProxy)} + entry := &upstreamClientEntry{ + client: client, + proxyKey: proxyKey, + poolKey: poolKey, + } + atomic.StoreInt64(&entry.lastUsed, nowUnix) + if markInFlight { + atomic.StoreInt64(&entry.inFlight, 1) + } + s.clients[cacheKey] = entry + + // 执行淘汰策略:先淘汰空闲超时的,再淘汰超出数量限制的 + s.evictIdleLocked(now) + s.evictOverLimitLocked() + s.mu.Unlock() + return entry +} + +// shouldReuseEntry 判断缓存条目是否可复用 +// 若代理或连接池配置发生变化,则需要重建客户端 +func (s *httpUpstreamService) shouldReuseEntry(entry *upstreamClientEntry, isolation, proxyKey, poolKey string) bool { + if entry == nil { + return false + } + if isolation == config.ConnectionPoolIsolationAccount && entry.proxyKey != proxyKey { + return false + } + if entry.poolKey != poolKey { + return false + } + return true +} + +// removeClientLocked 移除客户端(需持有锁) +// 从缓存中删除并关闭空闲连接 +// +// 参数: +// - key: 缓存键 +// - entry: 客户端条目 +func (s *httpUpstreamService) removeClientLocked(key string, entry *upstreamClientEntry) { + delete(s.clients, key) + if entry != nil && entry.client != nil { + // 关闭空闲连接,释放系统资源 + // 注意:这不会中断活跃连接 + entry.client.CloseIdleConnections() + } +} + +// evictIdleLocked 淘汰空闲超时的客户端(需持有锁) +// 遍历所有客户端,移除超过 TTL 且无活跃请求的条目 +// +// 参数: +// - now: 当前时间 +func (s *httpUpstreamService) evictIdleLocked(now time.Time) { + ttl := s.clientIdleTTL() + if ttl <= 0 { + return + } + // 计算淘汰截止时间 + cutoff := now.Add(-ttl).UnixNano() + for key, entry := range s.clients { + // 跳过有活跃请求的客户端 + if atomic.LoadInt64(&entry.inFlight) != 0 { + continue + } + // 淘汰超时的空闲客户端 + if atomic.LoadInt64(&entry.lastUsed) <= cutoff { + s.removeClientLocked(key, entry) + } + } +} + +// evictOverLimitLocked 淘汰超出数量限制的客户端(需持有锁) +// 使用 LRU 策略,优先淘汰最久未使用且无活跃请求的客户端 +func (s 
*httpUpstreamService) evictOverLimitLocked() { + maxClients := s.maxUpstreamClients() + if maxClients <= 0 { + return + } + // 循环淘汰直到满足数量限制 + for len(s.clients) > maxClients { + var ( + oldestKey string + oldestEntry *upstreamClientEntry + oldestTime int64 + ) + // 查找最久未使用且无活跃请求的客户端 + for key, entry := range s.clients { + // 跳过有活跃请求的客户端 + if atomic.LoadInt64(&entry.inFlight) != 0 { + continue + } + lastUsed := atomic.LoadInt64(&entry.lastUsed) + if oldestEntry == nil || lastUsed < oldestTime { + oldestKey = key + oldestEntry = entry + oldestTime = lastUsed + } + } + // 所有客户端都有活跃请求,无法淘汰 + if oldestEntry == nil { + return + } + s.removeClientLocked(oldestKey, oldestEntry) + } +} + +// getIsolationMode 获取连接池隔离模式 +// 从配置中读取,无效值回退到 account_proxy 模式 +// +// 返回: +// - string: 隔离模式(proxy/account/account_proxy) +func (s *httpUpstreamService) getIsolationMode() string { + if s.cfg == nil { + return config.ConnectionPoolIsolationAccountProxy + } + mode := strings.ToLower(strings.TrimSpace(s.cfg.Gateway.ConnectionPoolIsolation)) + if mode == "" { + return config.ConnectionPoolIsolationAccountProxy + } + switch mode { + case config.ConnectionPoolIsolationProxy, config.ConnectionPoolIsolationAccount, config.ConnectionPoolIsolationAccountProxy: + return mode + default: + return config.ConnectionPoolIsolationAccountProxy + } +} + +// maxUpstreamClients 获取最大客户端缓存数量 +// 从配置中读取,无效值使用默认值 +func (s *httpUpstreamService) maxUpstreamClients() int { + if s.cfg == nil { + return defaultMaxUpstreamClients + } + if s.cfg.Gateway.MaxUpstreamClients > 0 { + return s.cfg.Gateway.MaxUpstreamClients + } + return defaultMaxUpstreamClients +} + +// clientIdleTTL 获取客户端空闲回收阈值 +// 从配置中读取,无效值使用默认值 +func (s *httpUpstreamService) clientIdleTTL() time.Duration { + if s.cfg == nil { + return time.Duration(defaultClientIdleTTLSeconds) * time.Second + } + if s.cfg.Gateway.ClientIdleTTLSeconds > 0 { + return time.Duration(s.cfg.Gateway.ClientIdleTTLSeconds) * time.Second + } + return time.Duration(defaultClientIdleTTLSeconds) * time.Second +} + +// resolvePoolSettings 解析连接池配置 +// 根据隔离策略和账户并发数动态调整连接池参数 +// +// 参数: +// - isolation: 隔离模式 +// - accountConcurrency: 账户并发限制 +// +// 返回: +// - poolSettings: 连接池配置 +// +// 说明: +// - 账户隔离模式下,连接池大小与账户并发数对应 +// - 这确保了单账户不会占用过多连接资源 +func (s *httpUpstreamService) resolvePoolSettings(isolation string, accountConcurrency int) poolSettings { + settings := defaultPoolSettings(s.cfg) + // 账户隔离模式下,根据账户并发数调整连接池大小 + if (isolation == config.ConnectionPoolIsolationAccount || isolation == config.ConnectionPoolIsolationAccountProxy) && accountConcurrency > 0 { + settings.maxIdleConns = accountConcurrency + settings.maxIdleConnsPerHost = accountConcurrency + settings.maxConnsPerHost = accountConcurrency + } + return settings +} + +// buildPoolKey 构建连接池配置键 +// 用于检测配置变更,配置变更时需要重建客户端 +// +// 参数: +// - isolation: 隔离模式 +// - accountConcurrency: 账户并发限制 +// +// 返回: +// - string: 配置键 +func (s *httpUpstreamService) buildPoolKey(isolation string, accountConcurrency int) string { + if isolation == config.ConnectionPoolIsolationAccount || isolation == config.ConnectionPoolIsolationAccountProxy { + if accountConcurrency > 0 { + return fmt.Sprintf("account:%d", accountConcurrency) + } + } + return "default" +} + +// buildCacheKey 构建客户端缓存键 +// 根据隔离策略决定缓存键的组成 +// +// 参数: +// - isolation: 隔离模式 +// - proxyKey: 代理标识 +// - accountID: 账户 ID +// +// 返回: +// - string: 缓存键 +// +// 缓存键格式: +// - proxy 模式: "proxy:{proxyKey}" +// - account 模式: "account:{accountID}" +// - account_proxy 模式: "account:{accountID}|proxy:{proxyKey}" +func 
buildCacheKey(isolation, proxyKey string, accountID int64) string { + switch isolation { + case config.ConnectionPoolIsolationAccount: + return fmt.Sprintf("account:%d", accountID) + case config.ConnectionPoolIsolationAccountProxy: + return fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey) + default: + return fmt.Sprintf("proxy:%s", proxyKey) + } +} + +// normalizeProxyURL 标准化代理 URL +// 处理空值和解析错误,返回标准化的键和解析后的 URL +// +// 参数: +// - raw: 原始代理 URL 字符串 +// +// 返回: +// - string: 标准化的代理键(空或解析失败返回 "direct") +// - *url.URL: 解析后的 URL(空或解析失败返回 nil) +func normalizeProxyURL(raw string) (string, *url.URL) { + proxyURL := strings.TrimSpace(raw) + if proxyURL == "" { + return directProxyKey, nil + } + parsed, err := url.Parse(proxyURL) + if err != nil { + return directProxyKey, nil + } + return proxyURL, parsed +} + +// defaultPoolSettings 获取默认连接池配置 +// 从全局配置中读取,无效值使用常量默认值 +// +// 参数: +// - cfg: 全局配置 +// +// 返回: +// - poolSettings: 连接池配置 +func defaultPoolSettings(cfg *config.Config) poolSettings { + maxIdleConns := defaultMaxIdleConns + maxIdleConnsPerHost := defaultMaxIdleConnsPerHost + maxConnsPerHost := defaultMaxConnsPerHost + idleConnTimeout := defaultIdleConnTimeout + responseHeaderTimeout := defaultResponseHeaderTimeout + + if cfg != nil { + if cfg.Gateway.MaxIdleConns > 0 { + maxIdleConns = cfg.Gateway.MaxIdleConns + } + if cfg.Gateway.MaxIdleConnsPerHost > 0 { + maxIdleConnsPerHost = cfg.Gateway.MaxIdleConnsPerHost + } + if cfg.Gateway.MaxConnsPerHost >= 0 { + maxConnsPerHost = cfg.Gateway.MaxConnsPerHost + } + if cfg.Gateway.IdleConnTimeoutSeconds > 0 { + idleConnTimeout = time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second + } + if cfg.Gateway.ResponseHeaderTimeout > 0 { + responseHeaderTimeout = time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second + } + } + + return poolSettings{ + maxIdleConns: maxIdleConns, + maxIdleConnsPerHost: maxIdleConnsPerHost, + maxConnsPerHost: maxConnsPerHost, + idleConnTimeout: idleConnTimeout, + responseHeaderTimeout: responseHeaderTimeout, + } } // buildUpstreamTransport 构建上游请求的 Transport // 使用配置文件中的连接池参数,支持生产环境调优 -func buildUpstreamTransport(cfg *config.Config, proxyURL *url.URL) *http.Transport { - // 读取配置,使用合理的默认值 - maxIdleConns := cfg.Gateway.MaxIdleConns - if maxIdleConns <= 0 { - maxIdleConns = 240 - } - maxIdleConnsPerHost := cfg.Gateway.MaxIdleConnsPerHost - if maxIdleConnsPerHost <= 0 { - maxIdleConnsPerHost = 120 - } - maxConnsPerHost := cfg.Gateway.MaxConnsPerHost - if maxConnsPerHost < 0 { - maxConnsPerHost = 240 - } - idleConnTimeout := time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second - if idleConnTimeout <= 0 { - idleConnTimeout = 300 * time.Second - } - responseHeaderTimeout := time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second - if responseHeaderTimeout <= 0 { - responseHeaderTimeout = 300 * time.Second - } - +// +// 参数: +// - settings: 连接池配置 +// - proxyURL: 代理 URL(nil 表示直连) +// +// 返回: +// - *http.Transport: 配置好的 Transport 实例 +// +// Transport 参数说明: +// - MaxIdleConns: 所有主机的最大空闲连接总数 +// - MaxIdleConnsPerHost: 每主机最大空闲连接数(影响连接复用率) +// - MaxConnsPerHost: 每主机最大连接数(达到后新请求等待) +// - IdleConnTimeout: 空闲连接超时(超时后关闭) +// - ResponseHeaderTimeout: 等待响应头超时(不影响流式传输) +func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL) *http.Transport { transport := &http.Transport{ - MaxIdleConns: maxIdleConns, // 最大空闲连接总数 - MaxIdleConnsPerHost: maxIdleConnsPerHost, // 每主机最大空闲连接 - MaxConnsPerHost: maxConnsPerHost, // 每主机最大连接数(含活跃) - IdleConnTimeout: idleConnTimeout, // 空闲连接超时 - ResponseHeaderTimeout: 
responseHeaderTimeout, + MaxIdleConns: settings.maxIdleConns, + MaxIdleConnsPerHost: settings.maxIdleConnsPerHost, + MaxConnsPerHost: settings.maxConnsPerHost, + IdleConnTimeout: settings.idleConnTimeout, + ResponseHeaderTimeout: settings.responseHeaderTimeout, } if proxyURL != nil { transport.Proxy = http.ProxyURL(proxyURL) } return transport } + +// trackedBody 带跟踪功能的响应体包装器 +// 在 Close 时执行回调,用于更新请求计数 +type trackedBody struct { + io.ReadCloser // 原始响应体 + once sync.Once + onClose func() // 关闭时的回调函数 +} + +// Close 关闭响应体并执行回调 +// 使用 sync.Once 确保回调只执行一次 +func (b *trackedBody) Close() error { + err := b.ReadCloser.Close() + if b.onClose != nil { + b.once.Do(b.onClose) + } + return err +} + +// wrapTrackedBody 包装响应体以跟踪关闭事件 +// 用于在响应体关闭时更新 inFlight 计数 +// +// 参数: +// - body: 原始响应体 +// - onClose: 关闭时的回调函数 +// +// 返回: +// - io.ReadCloser: 包装后的响应体 +func wrapTrackedBody(body io.ReadCloser, onClose func()) io.ReadCloser { + if body == nil { + return body + } + return &trackedBody{ReadCloser: body, onClose: onClose} +} diff --git a/backend/internal/repository/http_upstream_benchmark_test.go b/backend/internal/repository/http_upstream_benchmark_test.go index 2ea6e31a..3219c6da 100644 --- a/backend/internal/repository/http_upstream_benchmark_test.go +++ b/backend/internal/repository/http_upstream_benchmark_test.go @@ -8,10 +8,21 @@ import ( "github.com/Wei-Shaw/sub2api/internal/config" ) +// httpClientSink 用于防止编译器优化掉基准测试中的赋值操作 +// 这是 Go 基准测试的常见模式,确保测试结果准确 var httpClientSink *http.Client -// BenchmarkHTTPUpstreamProxyClient 对比重复创建与复用代理客户端的开销。 +// BenchmarkHTTPUpstreamProxyClient 对比重复创建与复用代理客户端的开销 +// +// 测试目的: +// - 验证连接池复用相比每次新建的性能提升 +// - 量化内存分配差异 +// +// 预期结果: +// - "复用" 子测试应显著快于 "新建" +// - "复用" 子测试应零内存分配 func BenchmarkHTTPUpstreamProxyClient(b *testing.B) { + // 创建测试配置 cfg := &config.Config{ Gateway: config.GatewayConfig{ResponseHeaderTimeout: 300}, } @@ -22,24 +33,33 @@ func BenchmarkHTTPUpstreamProxyClient(b *testing.B) { } proxyURL := "http://127.0.0.1:8080" - b.ReportAllocs() + b.ReportAllocs() // 报告内存分配统计 + // 子测试:每次新建客户端 + // 模拟未优化前的行为,每次请求都创建新的 http.Client b.Run("新建", func(b *testing.B) { parsedProxy, err := url.Parse(proxyURL) if err != nil { b.Fatalf("解析代理地址失败: %v", err) } + settings := defaultPoolSettings(cfg) for i := 0; i < b.N; i++ { + // 每次迭代都创建新客户端,包含 Transport 分配 httpClientSink = &http.Client{ - Transport: buildUpstreamTransport(cfg, parsedProxy), + Transport: buildUpstreamTransport(settings, parsedProxy), } } }) + // 子测试:复用已缓存的客户端 + // 模拟优化后的行为,从缓存获取客户端 b.Run("复用", func(b *testing.B) { - client := svc.getOrCreateClient(proxyURL) - b.ResetTimer() + // 预热:确保客户端已缓存 + entry := svc.getOrCreateClient(proxyURL, 1, 1) + client := entry.client + b.ResetTimer() // 重置计时器,排除预热时间 for i := 0; i < b.N; i++ { + // 直接使用缓存的客户端,无内存分配 httpClientSink = client } }) diff --git a/backend/internal/repository/http_upstream_test.go b/backend/internal/repository/http_upstream_test.go index 74132e1d..763b254f 100644 --- a/backend/internal/repository/http_upstream_test.go +++ b/backend/internal/repository/http_upstream_test.go @@ -4,6 +4,7 @@ import ( "io" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -12,45 +13,61 @@ import ( "github.com/stretchr/testify/suite" ) +// HTTPUpstreamSuite HTTP 上游服务测试套件 +// 使用 testify/suite 组织测试,支持 SetupTest 初始化 type HTTPUpstreamSuite struct { suite.Suite - cfg *config.Config + cfg *config.Config // 测试用配置 } +// SetupTest 每个测试用例执行前的初始化 +// 创建空配置,各测试用例可按需覆盖 func (s *HTTPUpstreamSuite) SetupTest() { s.cfg = &config.Config{} } -func (s *HTTPUpstreamSuite) 
TestDefaultResponseHeaderTimeout() { +// newService 创建测试用的 httpUpstreamService 实例 +// 返回具体类型以便访问内部状态进行断言 +func (s *HTTPUpstreamSuite) newService() *httpUpstreamService { up := NewHTTPUpstream(s.cfg) svc, ok := up.(*httpUpstreamService) require.True(s.T(), ok, "expected *httpUpstreamService") - transport, ok := svc.defaultClient.Transport.(*http.Transport) + return svc +} + +// TestDefaultResponseHeaderTimeout 测试默认响应头超时配置 +// 验证未配置时使用 300 秒默认值 +func (s *HTTPUpstreamSuite) TestDefaultResponseHeaderTimeout() { + svc := s.newService() + entry := svc.getOrCreateClient("", 0, 0) + transport, ok := entry.client.Transport.(*http.Transport) require.True(s.T(), ok, "expected *http.Transport") require.Equal(s.T(), 300*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch") } +// TestCustomResponseHeaderTimeout 测试自定义响应头超时配置 +// 验证配置值能正确应用到 Transport func (s *HTTPUpstreamSuite) TestCustomResponseHeaderTimeout() { s.cfg.Gateway = config.GatewayConfig{ResponseHeaderTimeout: 7} - up := NewHTTPUpstream(s.cfg) - svc, ok := up.(*httpUpstreamService) - require.True(s.T(), ok, "expected *httpUpstreamService") - transport, ok := svc.defaultClient.Transport.(*http.Transport) + svc := s.newService() + entry := svc.getOrCreateClient("", 0, 0) + transport, ok := entry.client.Transport.(*http.Transport) require.True(s.T(), ok, "expected *http.Transport") require.Equal(s.T(), 7*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch") } -func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLFallsBackToDefault() { - s.cfg.Gateway = config.GatewayConfig{ResponseHeaderTimeout: 5} - up := NewHTTPUpstream(s.cfg) - svc, ok := up.(*httpUpstreamService) - require.True(s.T(), ok, "expected *httpUpstreamService") - - got := svc.getOrCreateClient("://bad-proxy-url") - require.Equal(s.T(), svc.defaultClient, got, "expected defaultClient fallback") +// TestGetOrCreateClient_InvalidURLFallsBackToDirect 测试无效代理 URL 回退 +// 验证解析失败时回退到直连模式 +func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLFallsBackToDirect() { + svc := s.newService() + entry := svc.getOrCreateClient("://bad-proxy-url", 1, 1) + require.Equal(s.T(), directProxyKey, entry.proxyKey, "expected direct proxy fallback") } +// TestDo_WithoutProxy_GoesDirect 测试无代理时直连 +// 验证空代理 URL 时请求直接发送到目标服务器 func (s *HTTPUpstreamSuite) TestDo_WithoutProxy_GoesDirect() { + // 创建模拟上游服务器 upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, _ = io.WriteString(w, "direct") })) @@ -60,17 +77,21 @@ func (s *HTTPUpstreamSuite) TestDo_WithoutProxy_GoesDirect() { req, err := http.NewRequest(http.MethodGet, upstream.URL+"/x", nil) require.NoError(s.T(), err, "NewRequest") - resp, err := up.Do(req, "") + resp, err := up.Do(req, "", 1, 1) require.NoError(s.T(), err, "Do") defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(resp.Body) require.Equal(s.T(), "direct", string(b), "unexpected body") } +// TestDo_WithHTTPProxy_UsesProxy 测试 HTTP 代理功能 +// 验证请求通过代理服务器转发,使用绝对 URI 格式 func (s *HTTPUpstreamSuite) TestDo_WithHTTPProxy_UsesProxy() { + // 用于接收代理请求的通道 seen := make(chan string, 1) + // 创建模拟代理服务器 proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - seen <- r.RequestURI + seen <- r.RequestURI // 记录请求 URI _, _ = io.WriteString(w, "proxied") })) s.T().Cleanup(proxySrv.Close) @@ -78,14 +99,16 @@ func (s *HTTPUpstreamSuite) TestDo_WithHTTPProxy_UsesProxy() { s.cfg.Gateway = config.GatewayConfig{ResponseHeaderTimeout: 1} up := NewHTTPUpstream(s.cfg) + // 
发送请求到外部地址,应通过代理 req, err := http.NewRequest(http.MethodGet, "http://example.com/test", nil) require.NoError(s.T(), err, "NewRequest") - resp, err := up.Do(req, proxySrv.URL) + resp, err := up.Do(req, proxySrv.URL, 1, 1) require.NoError(s.T(), err, "Do") defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(resp.Body) require.Equal(s.T(), "proxied", string(b), "unexpected body") + // 验证代理收到的是绝对 URI 格式(HTTP 代理规范要求) select { case uri := <-seen: require.Equal(s.T(), "http://example.com/test", uri, "expected absolute-form request URI") @@ -94,6 +117,8 @@ func (s *HTTPUpstreamSuite) TestDo_WithHTTPProxy_UsesProxy() { } } +// TestDo_EmptyProxy_UsesDirect 测试空代理字符串 +// 验证空字符串代理等同于直连 func (s *HTTPUpstreamSuite) TestDo_EmptyProxy_UsesDirect() { upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, _ = io.WriteString(w, "direct-empty") @@ -103,13 +128,134 @@ func (s *HTTPUpstreamSuite) TestDo_EmptyProxy_UsesDirect() { up := NewHTTPUpstream(s.cfg) req, err := http.NewRequest(http.MethodGet, upstream.URL+"/y", nil) require.NoError(s.T(), err, "NewRequest") - resp, err := up.Do(req, "") + resp, err := up.Do(req, "", 1, 1) require.NoError(s.T(), err, "Do with empty proxy") defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(resp.Body) require.Equal(s.T(), "direct-empty", string(b)) } +// TestAccountIsolation_DifferentAccounts 测试账户隔离模式 +// 验证不同账户使用独立的连接池 +func (s *HTTPUpstreamSuite) TestAccountIsolation_DifferentAccounts() { + s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount} + svc := s.newService() + // 同一代理,不同账户 + entry1 := svc.getOrCreateClient("http://proxy.local:8080", 1, 3) + entry2 := svc.getOrCreateClient("http://proxy.local:8080", 2, 3) + require.NotSame(s.T(), entry1, entry2, "不同账号不应共享连接池") + require.Equal(s.T(), 2, len(svc.clients), "账号隔离应缓存两个客户端") +} + +// TestAccountProxyIsolation_DifferentProxy 测试账户+代理组合隔离模式 +// 验证同一账户使用不同代理时创建独立连接池 +func (s *HTTPUpstreamSuite) TestAccountProxyIsolation_DifferentProxy() { + s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccountProxy} + svc := s.newService() + // 同一账户,不同代理 + entry1 := svc.getOrCreateClient("http://proxy-a:8080", 1, 3) + entry2 := svc.getOrCreateClient("http://proxy-b:8080", 1, 3) + require.NotSame(s.T(), entry1, entry2, "账号+代理隔离应区分不同代理") + require.Equal(s.T(), 2, len(svc.clients), "账号+代理隔离应缓存两个客户端") +} + +// TestAccountModeProxyChangeClearsPool 测试账户模式下代理变更 +// 验证账户切换代理时清理旧连接池,避免复用错误代理 +func (s *HTTPUpstreamSuite) TestAccountModeProxyChangeClearsPool() { + s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount} + svc := s.newService() + // 同一账户,先后使用不同代理 + entry1 := svc.getOrCreateClient("http://proxy-a:8080", 1, 3) + entry2 := svc.getOrCreateClient("http://proxy-b:8080", 1, 3) + require.NotSame(s.T(), entry1, entry2, "账号切换代理应创建新连接池") + require.Equal(s.T(), 1, len(svc.clients), "账号模式下应仅保留一个连接池") + require.False(s.T(), hasEntry(svc, entry1), "旧连接池应被清理") +} + +// TestAccountConcurrencyOverridesPoolSettings 测试账户并发数覆盖连接池配置 +// 验证账户隔离模式下,连接池大小与账户并发数对应 +func (s *HTTPUpstreamSuite) TestAccountConcurrencyOverridesPoolSettings() { + s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount} + svc := s.newService() + // 账户并发数为 12 + entry := svc.getOrCreateClient("", 1, 12) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + // 连接池参数应与并发数一致 + 
require.Equal(s.T(), 12, transport.MaxConnsPerHost, "MaxConnsPerHost mismatch") + require.Equal(s.T(), 12, transport.MaxIdleConns, "MaxIdleConns mismatch") + require.Equal(s.T(), 12, transport.MaxIdleConnsPerHost, "MaxIdleConnsPerHost mismatch") +} + +// TestAccountConcurrencyFallbackToDefault 测试账户并发数为 0 时回退到默认配置 +// 验证未指定并发数时使用全局配置值 +func (s *HTTPUpstreamSuite) TestAccountConcurrencyFallbackToDefault() { + s.cfg.Gateway = config.GatewayConfig{ + ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount, + MaxIdleConns: 77, + MaxIdleConnsPerHost: 55, + MaxConnsPerHost: 66, + } + svc := s.newService() + // 账户并发数为 0,应使用全局配置 + entry := svc.getOrCreateClient("", 1, 0) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.Equal(s.T(), 66, transport.MaxConnsPerHost, "MaxConnsPerHost fallback mismatch") + require.Equal(s.T(), 77, transport.MaxIdleConns, "MaxIdleConns fallback mismatch") + require.Equal(s.T(), 55, transport.MaxIdleConnsPerHost, "MaxIdleConnsPerHost fallback mismatch") +} + +// TestEvictOverLimitRemovesOldestIdle 测试超出数量限制时的 LRU 淘汰 +// 验证优先淘汰最久未使用的空闲客户端 +func (s *HTTPUpstreamSuite) TestEvictOverLimitRemovesOldestIdle() { + s.cfg.Gateway = config.GatewayConfig{ + ConnectionPoolIsolation: config.ConnectionPoolIsolationAccountProxy, + MaxUpstreamClients: 2, // 最多缓存 2 个客户端 + } + svc := s.newService() + // 创建两个客户端,设置不同的最后使用时间 + entry1 := svc.getOrCreateClient("http://proxy-a:8080", 1, 1) + entry2 := svc.getOrCreateClient("http://proxy-b:8080", 2, 1) + atomic.StoreInt64(&entry1.lastUsed, time.Now().Add(-2*time.Hour).UnixNano()) // 最久 + atomic.StoreInt64(&entry2.lastUsed, time.Now().Add(-time.Hour).UnixNano()) + // 创建第三个客户端,触发淘汰 + _ = svc.getOrCreateClient("http://proxy-c:8080", 3, 1) + + require.LessOrEqual(s.T(), len(svc.clients), 2, "应保持在缓存上限内") + require.False(s.T(), hasEntry(svc, entry1), "最久未使用的连接池应被清理") +} + +// TestIdleTTLDoesNotEvictActive 测试活跃请求保护 +// 验证有进行中请求的客户端不会被空闲超时淘汰 +func (s *HTTPUpstreamSuite) TestIdleTTLDoesNotEvictActive() { + s.cfg.Gateway = config.GatewayConfig{ + ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount, + ClientIdleTTLSeconds: 1, // 1 秒空闲超时 + } + svc := s.newService() + entry1 := svc.getOrCreateClient("", 1, 1) + // 设置为很久之前使用,但有活跃请求 + atomic.StoreInt64(&entry1.lastUsed, time.Now().Add(-2*time.Minute).UnixNano()) + atomic.StoreInt64(&entry1.inFlight, 1) // 模拟有活跃请求 + // 创建新客户端,触发淘汰检查 + _ = svc.getOrCreateClient("", 2, 1) + + require.True(s.T(), hasEntry(svc, entry1), "有活跃请求时不应回收") +} + +// TestHTTPUpstreamSuite 运行测试套件 func TestHTTPUpstreamSuite(t *testing.T) { suite.Run(t, new(HTTPUpstreamSuite)) } + +// hasEntry 检查客户端是否存在于缓存中 +// 辅助函数,用于验证淘汰逻辑 +func hasEntry(svc *httpUpstreamService, target *upstreamClientEntry) bool { + for _, entry := range svc.clients { + if entry == target { + return true + } + } + return false +} diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index 6296f2fe..bfa9b60f 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -256,7 +256,7 @@ func (s *AccountTestService) testClaudeAccountConnection(c *gin.Context, account proxyURL = account.Proxy.URL() } - resp, err := s.httpUpstream.Do(req, proxyURL) + resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency) if err != nil { return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error())) } @@ -371,7 +371,7 @@ func (s *AccountTestService) 
testOpenAIAccountConnection(c *gin.Context, account proxyURL = account.Proxy.URL() } - resp, err := s.httpUpstream.Do(req, proxyURL) + resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency) if err != nil { return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error())) } @@ -442,7 +442,7 @@ func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account proxyURL = account.Proxy.URL() } - resp, err := s.httpUpstream.Do(req, proxyURL) + resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency) if err != nil { return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error())) } diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 18a67fdf..25d9066b 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -230,7 +230,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, upstreamReq.Header.Set("Authorization", "Bearer "+accessToken) upstreamReq.Header.Set("User-Agent", antigravity.UserAgent) - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { if attempt < antigravityMaxRetries { log.Printf("Antigravity account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, antigravityMaxRetries, err) @@ -380,7 +380,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co upstreamReq.Header.Set("Authorization", "Bearer "+accessToken) upstreamReq.Header.Set("User-Agent", antigravity.UserAgent) - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { if attempt < antigravityMaxRetries { log.Printf("Antigravity account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, antigravityMaxRetries, err) diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 41362662..dd879da2 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -644,7 +644,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A } // 发送请求 - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { return nil, fmt.Errorf("upstream request failed: %w", err) } @@ -1308,7 +1308,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, } // 发送请求 - resp, err := s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed") return fmt.Errorf("upstream request failed: %w", err) diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 34958541..ee3ade16 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -472,7 +472,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex } requestIDHeader = idHeader - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, 
account.ID, account.Concurrency) if err != nil { if attempt < geminiMaxRetries { log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err) @@ -725,7 +725,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. } requestIDHeader = idHeader - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { if attempt < geminiMaxRetries { log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err) @@ -1756,7 +1756,7 @@ func (s *GeminiMessagesCompatService) ForwardAIStudioGET(ctx context.Context, ac return nil, fmt.Errorf("unsupported account type: %s", account.Type) } - resp, err := s.httpUpstream.Do(req, proxyURL) + resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency) if err != nil { return nil, err } diff --git a/backend/internal/service/http_upstream_port.go b/backend/internal/service/http_upstream_port.go index 7fb9407f..9357f763 100644 --- a/backend/internal/service/http_upstream_port.go +++ b/backend/internal/service/http_upstream_port.go @@ -2,8 +2,29 @@ package service import "net/http" -// HTTPUpstream interface for making HTTP requests to upstream APIs (Claude, OpenAI, etc.) -// This is a generic interface that can be used for any HTTP-based upstream service. +// HTTPUpstream 上游 HTTP 请求接口 +// 用于向上游 API(Claude、OpenAI、Gemini 等)发送请求 +// 这是一个通用接口,可用于任何基于 HTTP 的上游服务 +// +// 设计说明: +// - 支持可选代理配置 +// - 支持账户级连接池隔离 +// - 实现类负责连接池管理和复用 type HTTPUpstream interface { - Do(req *http.Request, proxyURL string) (*http.Response, error) + // Do 执行 HTTP 请求 + // + // 参数: + // - req: HTTP 请求对象,由调用方构建 + // - proxyURL: 代理服务器地址,空字符串表示直连 + // - accountID: 账户 ID,用于连接池隔离(隔离策略为 account 或 account_proxy 时生效) + // - accountConcurrency: 账户并发限制,用于动态调整连接池大小 + // + // 返回: + // - *http.Response: HTTP 响应,调用方必须关闭 Body + // - error: 请求错误(网络错误、超时等) + // + // 注意: + // - 调用方必须关闭 resp.Body,否则会导致连接泄漏 + // - 响应体可能已被包装以跟踪请求生命周期 + Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index aa844554..769d0c3c 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -311,7 +311,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco } // Send request - resp, err := s.httpUpstream.Do(upstreamReq, proxyURL) + resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { return nil, fmt.Errorf("upstream request failed: %w", err) } diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 8db4cbc9..5bd85d7d 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -29,11 +29,21 @@ gateway: response_header_timeout: 300 # 请求体最大字节数(默认 100MB) max_body_size: 104857600 + # 连接池隔离策略: + # - proxy: 按代理隔离,同一代理共享连接池(适合代理少、账户多) + # - account: 按账户隔离,同一账户共享连接池(适合账户少、需严格隔离) + # - account_proxy: 按账户+代理组合隔离(默认,最细粒度) + connection_pool_isolation: "account_proxy" # HTTP 上游连接池配置(HTTP/2 + 多代理场景默认) max_idle_conns: 240 max_idle_conns_per_host: 120 max_conns_per_host: 240 idle_conn_timeout_seconds: 300 + # 上游连接池客户端缓存配置 + # max_upstream_clients: 最大缓存客户端数量,超出后淘汰最久未使用的 + # client_idle_ttl_seconds: 客户端空闲回收阈值(秒),超时且无活跃请求时回收 + max_upstream_clients: 5000 + 
client_idle_ttl_seconds: 900 # 并发槽位过期时间(分钟) concurrency_slot_ttl_minutes: 15
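
A few usage sketches for reviewers follow; none of them are part of the patch. First, the new call shape: Do now takes the account ID and the account's concurrency limit alongside the proxy URL. The minimal program below exercises it against a local test server — the account values (42, concurrency 8) are made up for illustration:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/Wei-Shaw/sub2api/internal/repository"
)

func main() {
	// Stand-in upstream so the sketch is self-contained.
	upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.WriteString(w, "ok")
	}))
	defer upstream.Close()

	up := repository.NewHTTPUpstream(&config.Config{}) // zero config: built-in pool defaults apply

	req, err := http.NewRequest(http.MethodGet, upstream.URL, nil)
	if err != nil {
		panic(err)
	}
	// Empty proxy means direct; account 42 with concurrency 8 gets a
	// transport capped at 8 connections under account/account_proxy isolation.
	resp, err := up.Do(req, "", 42, 8)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close() // Close also decrements the entry's inFlight counter
	b, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(b))
}

The deferred Close matters more than before: besides returning the connection to the pool, it is what releases the in-flight slot, so a leaked body pins the cached client against eviction.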
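The correctness of that counter rests on trackedBody firing its callback exactly once, even if a caller closes the body twice. A self-contained sketch of the same sync.Once pattern (the type here is local to the example, not the patch's):

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// closeOnceBody mirrors the patch's trackedBody: the onClose callback fires
// exactly once, no matter how many times Close is called.
type closeOnceBody struct {
	io.ReadCloser
	once    sync.Once
	onClose func()
}

func (b *closeOnceBody) Close() error {
	err := b.ReadCloser.Close()
	if b.onClose != nil {
		b.once.Do(b.onClose)
	}
	return err
}

func main() {
	calls := 0
	body := &closeOnceBody{
		ReadCloser: io.NopCloser(strings.NewReader("data")),
		onClose:    func() { calls++ },
	}
	_, _ = io.ReadAll(body)
	_ = body.Close()
	_ = body.Close() // double close stays safe
	fmt.Println("callback calls:", calls) // prints 1
}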
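getClientEntry uses the classic read-lock fast path with a re-check under the write lock. Stripped of the pool details, the locking shape looks like this (a generic sketch, assuming string values instead of client entries):

package main

import (
	"fmt"
	"sync"
)

// lazyCache shows the locking shape of getClientEntry: a shared-lock fast
// path for hits, and a re-check under the write lock so two goroutines
// racing on the same miss build the entry only once.
type lazyCache struct {
	mu sync.RWMutex
	m  map[string]string
}

func (c *lazyCache) get(key string, build func() string) string {
	c.mu.RLock()
	if v, ok := c.m[key]; ok {
		c.mu.RUnlock()
		return v
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.m[key]; ok { // re-check: another goroutine may have won
		return v
	}
	v := build()
	c.m[key] = v
	return v
}

func main() {
	c := &lazyCache{m: make(map[string]string)}
	fmt.Println(c.get("account:42|proxy:direct", func() string { return "fresh client" }))
}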
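For a quick mental model of the three isolation modes, this mirrors buildCacheKey's mapping for a single account/proxy pair (accountID 7 and the proxy address are arbitrary):

package main

import "fmt"

// key mirrors buildCacheKey from the patch: the isolation mode decides which
// parts of (accountID, proxyKey) end up in the cache key.
func key(isolation, proxyKey string, accountID int64) string {
	switch isolation {
	case "account":
		return fmt.Sprintf("account:%d", accountID)
	case "account_proxy":
		return fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey)
	default: // "proxy"
		return fmt.Sprintf("proxy:%s", proxyKey)
	}
}

func main() {
	for _, mode := range []string{"proxy", "account", "account_proxy"} {
		fmt.Printf("%-13s -> %s\n", mode, key(mode, "http://p:8080", 7))
	}
}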
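Finally, the eviction order: a TTL sweep runs first, then LRU eviction down to max_upstream_clients, and both skip entries with in-flight requests. The toy model below reproduces that ordering on plain slices — the real code scans the map in place and simply stays over the cap when every entry is busy:

package main

import (
	"fmt"
	"sort"
	"time"
)

type poolEntry struct {
	key      string
	lastUsed time.Time
	inFlight int
}

func evict(entries []poolEntry, ttl time.Duration, limit int, now time.Time) []poolEntry {
	// Phase 1: TTL sweep — drop idle entries older than the TTL.
	kept := entries[:0]
	for _, e := range entries {
		if e.inFlight == 0 && now.Sub(e.lastUsed) > ttl {
			continue
		}
		kept = append(kept, e)
	}
	// Phase 2: LRU down to the cap, oldest idle entry first.
	sort.Slice(kept, func(i, j int) bool { return kept[i].lastUsed.Before(kept[j].lastUsed) })
	for len(kept) > limit {
		dropped := false
		for i, e := range kept {
			if e.inFlight == 0 {
				kept = append(kept[:i], kept[i+1:]...)
				dropped = true
				break
			}
		}
		if !dropped {
			break // every entry is busy; staying over the cap is tolerated
		}
	}
	return kept
}

func main() {
	now := time.Now()
	entries := []poolEntry{
		{key: "a", lastUsed: now.Add(-2 * time.Hour), inFlight: 0},
		{key: "b", lastUsed: now.Add(-1 * time.Hour), inFlight: 1},
		{key: "c", lastUsed: now, inFlight: 0},
	}
	for _, e := range evict(entries, 30*time.Minute, 1, now) {
		fmt.Println("kept:", e.key)
	}
}

The run prints kept: b — "a" ages out in the TTL sweep, and when the cap of 1 is enforced, the busy "b" is protected, so idle "c" is evicted despite being the most recently used.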