perf(网关): 实现上游账号连接池隔离
新增隔离策略与连接池缓存回收 连接池大小跟随账号并发并处理代理切换 同步配置默认值与示例并补充测试
This commit is contained in:
@@ -12,6 +12,20 @@ const (
|
||||
RunModeSimple = "simple"
|
||||
)
|
||||
|
||||
// 连接池隔离策略常量
|
||||
// 用于控制上游 HTTP 连接池的隔离粒度,影响连接复用和资源消耗
|
||||
const (
|
||||
// ConnectionPoolIsolationProxy: 按代理隔离
|
||||
// 同一代理地址共享连接池,适合代理数量少、账户数量多的场景
|
||||
ConnectionPoolIsolationProxy = "proxy"
|
||||
// ConnectionPoolIsolationAccount: 按账户隔离
|
||||
// 每个账户独立连接池,适合账户数量少、需要严格隔离的场景
|
||||
ConnectionPoolIsolationAccount = "account"
|
||||
// ConnectionPoolIsolationAccountProxy: 按账户+代理组合隔离(默认)
|
||||
// 同一账户+代理组合共享连接池,提供最细粒度的隔离
|
||||
ConnectionPoolIsolationAccountProxy = "account_proxy"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Server ServerConfig `mapstructure:"server"`
|
||||
Database DatabaseConfig `mapstructure:"database"`
|
||||
@@ -81,6 +95,8 @@ type GatewayConfig struct {
|
||||
ResponseHeaderTimeout int `mapstructure:"response_header_timeout"`
|
||||
// 请求体最大字节数,用于网关请求体大小限制
|
||||
MaxBodySize int64 `mapstructure:"max_body_size"`
|
||||
// ConnectionPoolIsolation: 上游连接池隔离策略(proxy/account/account_proxy)
|
||||
ConnectionPoolIsolation string `mapstructure:"connection_pool_isolation"`
|
||||
|
||||
// HTTP 上游连接池配置(性能优化:支持高并发场景调优)
|
||||
// MaxIdleConns: 所有主机的最大空闲连接总数
|
||||
@@ -91,6 +107,15 @@ type GatewayConfig struct {
|
||||
MaxConnsPerHost int `mapstructure:"max_conns_per_host"`
|
||||
// IdleConnTimeoutSeconds: 空闲连接超时时间(秒)
|
||||
IdleConnTimeoutSeconds int `mapstructure:"idle_conn_timeout_seconds"`
|
||||
// MaxUpstreamClients: 上游连接池客户端最大缓存数量
|
||||
// 当使用连接池隔离策略时,系统会为不同的账户/代理组合创建独立的 HTTP 客户端
|
||||
// 此参数限制缓存的客户端数量,超出后会淘汰最久未使用的客户端
|
||||
// 建议值:预估的活跃账户数 * 1.2(留有余量)
|
||||
MaxUpstreamClients int `mapstructure:"max_upstream_clients"`
|
||||
// ClientIdleTTLSeconds: 上游连接池客户端空闲回收阈值(秒)
|
||||
// 超过此时间未使用的客户端会被标记为可回收
|
||||
// 建议值:根据用户访问频率设置,一般 10-30 分钟
|
||||
ClientIdleTTLSeconds int `mapstructure:"client_idle_ttl_seconds"`
|
||||
// ConcurrencySlotTTLMinutes: 并发槽位过期时间(分钟)
|
||||
// 应大于最长 LLM 请求时间,防止请求完成前槽位过期
|
||||
ConcurrencySlotTTLMinutes int `mapstructure:"concurrency_slot_ttl_minutes"`
|
||||
@@ -289,11 +314,14 @@ func setDefaults() {
|
||||
// Gateway
|
||||
viper.SetDefault("gateway.response_header_timeout", 300) // 300秒(5分钟)等待上游响应头,LLM高负载时可能排队较久
|
||||
viper.SetDefault("gateway.max_body_size", int64(100*1024*1024))
|
||||
viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy)
|
||||
// HTTP 上游连接池配置(针对 5000+ 并发用户优化)
|
||||
viper.SetDefault("gateway.max_idle_conns", 240) // 最大空闲连接总数(HTTP/2 场景默认)
|
||||
viper.SetDefault("gateway.max_idle_conns_per_host", 120) // 每主机最大空闲连接(HTTP/2 场景默认)
|
||||
viper.SetDefault("gateway.max_conns_per_host", 240) // 每主机最大连接数(含活跃,HTTP/2 场景默认)
|
||||
viper.SetDefault("gateway.max_idle_conns", 240) // 最大空闲连接总数(HTTP/2 场景默认)
|
||||
viper.SetDefault("gateway.max_idle_conns_per_host", 120) // 每主机最大空闲连接(HTTP/2 场景默认)
|
||||
viper.SetDefault("gateway.max_conns_per_host", 240) // 每主机最大连接数(含活跃,HTTP/2 场景默认)
|
||||
viper.SetDefault("gateway.idle_conn_timeout_seconds", 300) // 空闲连接超时(秒)
|
||||
viper.SetDefault("gateway.max_upstream_clients", 5000)
|
||||
viper.SetDefault("gateway.client_idle_ttl_seconds", 900)
|
||||
viper.SetDefault("gateway.concurrency_slot_ttl_minutes", 15) // 并发槽位过期时间(支持超长请求)
|
||||
|
||||
// TokenRefresh
|
||||
@@ -354,6 +382,14 @@ func (c *Config) Validate() error {
|
||||
if c.Gateway.MaxBodySize <= 0 {
|
||||
return fmt.Errorf("gateway.max_body_size must be positive")
|
||||
}
|
||||
if strings.TrimSpace(c.Gateway.ConnectionPoolIsolation) != "" {
|
||||
switch c.Gateway.ConnectionPoolIsolation {
|
||||
case ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy:
|
||||
default:
|
||||
return fmt.Errorf("gateway.connection_pool_isolation must be one of: %s/%s/%s",
|
||||
ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy)
|
||||
}
|
||||
}
|
||||
if c.Gateway.MaxIdleConns <= 0 {
|
||||
return fmt.Errorf("gateway.max_idle_conns must be positive")
|
||||
}
|
||||
@@ -366,6 +402,12 @@ func (c *Config) Validate() error {
|
||||
if c.Gateway.IdleConnTimeoutSeconds <= 0 {
|
||||
return fmt.Errorf("gateway.idle_conn_timeout_seconds must be positive")
|
||||
}
|
||||
if c.Gateway.MaxUpstreamClients <= 0 {
|
||||
return fmt.Errorf("gateway.max_upstream_clients must be positive")
|
||||
}
|
||||
if c.Gateway.ClientIdleTTLSeconds <= 0 {
|
||||
return fmt.Errorf("gateway.client_idle_ttl_seconds must be positive")
|
||||
}
|
||||
if c.Gateway.ConcurrencySlotTTLMinutes <= 0 {
|
||||
return fmt.Errorf("gateway.concurrency_slot_ttl_minutes must be positive")
|
||||
}
|
||||
|
||||
@@ -1,106 +1,553 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
)
|
||||
|
||||
// 默认配置常量
|
||||
// 这些值在配置文件未指定时作为回退默认值使用
|
||||
const (
|
||||
// directProxyKey: 无代理时的缓存键标识
|
||||
directProxyKey = "direct"
|
||||
// defaultMaxIdleConns: 默认最大空闲连接总数
|
||||
// HTTP/2 场景下,单连接可多路复用,240 足以支撑高并发
|
||||
defaultMaxIdleConns = 240
|
||||
// defaultMaxIdleConnsPerHost: 默认每主机最大空闲连接数
|
||||
defaultMaxIdleConnsPerHost = 120
|
||||
// defaultMaxConnsPerHost: 默认每主机最大连接数(含活跃连接)
|
||||
// 达到上限后新请求会等待,而非无限创建连接
|
||||
defaultMaxConnsPerHost = 240
|
||||
// defaultIdleConnTimeout: 默认空闲连接超时时间(5分钟)
|
||||
// 超时后连接会被关闭,释放系统资源
|
||||
defaultIdleConnTimeout = 300 * time.Second
|
||||
// defaultResponseHeaderTimeout: 默认等待响应头超时时间(5分钟)
|
||||
// LLM 请求可能排队较久,需要较长超时
|
||||
defaultResponseHeaderTimeout = 300 * time.Second
|
||||
// defaultMaxUpstreamClients: 默认最大客户端缓存数量
|
||||
// 超出后会淘汰最久未使用的客户端
|
||||
defaultMaxUpstreamClients = 5000
|
||||
// defaultClientIdleTTLSeconds: 默认客户端空闲回收阈值(15分钟)
|
||||
defaultClientIdleTTLSeconds = 900
|
||||
)
|
||||
|
||||
// poolSettings 连接池配置参数
|
||||
// 封装 Transport 所需的各项连接池参数
|
||||
type poolSettings struct {
|
||||
maxIdleConns int // 最大空闲连接总数
|
||||
maxIdleConnsPerHost int // 每主机最大空闲连接数
|
||||
maxConnsPerHost int // 每主机最大连接数(含活跃)
|
||||
idleConnTimeout time.Duration // 空闲连接超时时间
|
||||
responseHeaderTimeout time.Duration // 等待响应头超时时间
|
||||
}
|
||||
|
||||
// upstreamClientEntry 上游客户端缓存条目
|
||||
// 记录客户端实例及其元数据,用于连接池管理和淘汰策略
|
||||
type upstreamClientEntry struct {
|
||||
client *http.Client // HTTP 客户端实例
|
||||
proxyKey string // 代理标识(用于检测代理变更)
|
||||
poolKey string // 连接池配置标识(用于检测配置变更)
|
||||
lastUsed int64 // 最后使用时间戳(纳秒),用于 LRU 淘汰
|
||||
inFlight int64 // 当前进行中的请求数,>0 时不可淘汰
|
||||
}
|
||||
|
||||
// httpUpstreamService 通用 HTTP 上游服务
|
||||
// 用于向任意 HTTP API(Claude、OpenAI 等)发送请求,支持可选代理
|
||||
//
|
||||
// 架构设计:
|
||||
// - 根据隔离策略(proxy/account/account_proxy)缓存客户端实例
|
||||
// - 每个客户端拥有独立的 Transport 连接池
|
||||
// - 支持 LRU + 空闲时间双重淘汰策略
|
||||
//
|
||||
// 性能优化:
|
||||
// 1. 使用 sync.Map 缓存代理客户端实例,避免每次请求都创建新的 http.Client
|
||||
// 1. 根据隔离策略缓存客户端实例,避免频繁创建 http.Client
|
||||
// 2. 复用 Transport 连接池,减少 TCP 握手和 TLS 协商开销
|
||||
// 3. 原实现每次请求都 new 一个 http.Client,导致连接无法复用
|
||||
// 3. 支持账号级隔离与空闲回收,降低连接层关联风险
|
||||
// 4. 达到最大连接数后等待可用连接,而非无限创建
|
||||
// 5. 仅回收空闲客户端,避免中断活跃请求
|
||||
// 6. HTTP/2 多路复用,连接上限不等于并发请求上限
|
||||
// 7. 代理变更时清空旧连接池,避免复用错误代理
|
||||
// 8. 账号并发数与连接池上限对应(账号隔离策略下)
|
||||
type httpUpstreamService struct {
|
||||
// defaultClient: 无代理时使用的默认客户端(单例复用)
|
||||
defaultClient *http.Client
|
||||
// proxyClients: 按代理 URL 缓存的客户端池,避免重复创建
|
||||
proxyClients sync.Map
|
||||
cfg *config.Config
|
||||
cfg *config.Config // 全局配置
|
||||
mu sync.RWMutex // 保护 clients map 的读写锁
|
||||
clients map[string]*upstreamClientEntry // 客户端缓存池,key 由隔离策略决定
|
||||
}
|
||||
|
||||
// NewHTTPUpstream 创建通用 HTTP 上游服务
|
||||
// 使用配置中的连接池参数构建 Transport
|
||||
//
|
||||
// 参数:
|
||||
// - cfg: 全局配置,包含连接池参数和隔离策略
|
||||
//
|
||||
// 返回:
|
||||
// - service.HTTPUpstream 接口实现
|
||||
func NewHTTPUpstream(cfg *config.Config) service.HTTPUpstream {
|
||||
return &httpUpstreamService{
|
||||
defaultClient: &http.Client{Transport: buildUpstreamTransport(cfg, nil)},
|
||||
cfg: cfg,
|
||||
cfg: cfg,
|
||||
clients: make(map[string]*upstreamClientEntry),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *httpUpstreamService) Do(req *http.Request, proxyURL string) (*http.Response, error) {
|
||||
if strings.TrimSpace(proxyURL) == "" {
|
||||
return s.defaultClient.Do(req)
|
||||
}
|
||||
client := s.getOrCreateClient(proxyURL)
|
||||
return client.Do(req)
|
||||
}
|
||||
// Do 执行 HTTP 请求
|
||||
// 根据隔离策略获取或创建客户端,并跟踪请求生命周期
|
||||
//
|
||||
// 参数:
|
||||
// - req: HTTP 请求对象
|
||||
// - proxyURL: 代理地址,空字符串表示直连
|
||||
// - accountID: 账户 ID,用于账户级隔离
|
||||
// - accountConcurrency: 账户并发限制,用于动态调整连接池大小
|
||||
//
|
||||
// 返回:
|
||||
// - *http.Response: HTTP 响应(Body 已包装,关闭时自动更新计数)
|
||||
// - error: 请求错误
|
||||
//
|
||||
// 注意:
|
||||
// - 调用方必须关闭 resp.Body,否则会导致 inFlight 计数泄漏
|
||||
// - inFlight > 0 的客户端不会被淘汰,确保活跃请求不被中断
|
||||
func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
|
||||
// 获取或创建对应的客户端,并标记请求占用
|
||||
entry := s.acquireClient(proxyURL, accountID, accountConcurrency)
|
||||
|
||||
// getOrCreateClient 获取或创建代理客户端
|
||||
// 性能优化:使用 sync.Map 实现无锁缓存,相同代理 URL 复用同一客户端
|
||||
// LoadOrStore 保证并发安全,避免重复创建
|
||||
func (s *httpUpstreamService) getOrCreateClient(proxyURL string) *http.Client {
|
||||
proxyURL = strings.TrimSpace(proxyURL)
|
||||
if proxyURL == "" {
|
||||
return s.defaultClient
|
||||
}
|
||||
// 优先从缓存获取,命中则直接返回
|
||||
if cached, ok := s.proxyClients.Load(proxyURL); ok {
|
||||
return cached.(*http.Client)
|
||||
}
|
||||
|
||||
parsedURL, err := url.Parse(proxyURL)
|
||||
// 执行请求
|
||||
resp, err := entry.client.Do(req)
|
||||
if err != nil {
|
||||
return s.defaultClient
|
||||
// 请求失败,立即减少计数
|
||||
atomic.AddInt64(&entry.inFlight, -1)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 创建新客户端并缓存,LoadOrStore 保证只有一个实例被存储
|
||||
client := &http.Client{Transport: buildUpstreamTransport(s.cfg, parsedURL)}
|
||||
actual, _ := s.proxyClients.LoadOrStore(proxyURL, client)
|
||||
return actual.(*http.Client)
|
||||
// 包装响应体,在关闭时自动减少计数并更新时间戳
|
||||
// 这确保了流式响应(如 SSE)在完全读取前不会被淘汰
|
||||
resp.Body = wrapTrackedBody(resp.Body, func() {
|
||||
atomic.AddInt64(&entry.inFlight, -1)
|
||||
atomic.StoreInt64(&entry.lastUsed, time.Now().UnixNano())
|
||||
})
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// acquireClient 获取或创建客户端,并标记为进行中请求
|
||||
// 用于请求路径,避免在获取后被淘汰
|
||||
func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, accountConcurrency int) *upstreamClientEntry {
|
||||
return s.getClientEntry(proxyURL, accountID, accountConcurrency, true)
|
||||
}
|
||||
|
||||
// getOrCreateClient 获取或创建客户端
|
||||
// 根据隔离策略和参数决定缓存键,处理代理变更和配置变更
|
||||
//
|
||||
// 参数:
|
||||
// - proxyURL: 代理地址
|
||||
// - accountID: 账户 ID
|
||||
// - accountConcurrency: 账户并发限制
|
||||
//
|
||||
// 返回:
|
||||
// - *upstreamClientEntry: 客户端缓存条目
|
||||
//
|
||||
// 隔离策略说明:
|
||||
// - proxy: 按代理地址隔离,同一代理共享客户端
|
||||
// - account: 按账户隔离,同一账户共享客户端(代理变更时重建)
|
||||
// - account_proxy: 按账户+代理组合隔离,最细粒度
|
||||
func (s *httpUpstreamService) getOrCreateClient(proxyURL string, accountID int64, accountConcurrency int) *upstreamClientEntry {
|
||||
return s.getClientEntry(proxyURL, accountID, accountConcurrency, false)
|
||||
}
|
||||
|
||||
// getClientEntry 获取或创建客户端条目
|
||||
// markInFlight=true 时会标记进行中请求,用于请求路径防止被淘汰
|
||||
func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, accountConcurrency int, markInFlight bool) *upstreamClientEntry {
|
||||
// 获取隔离模式
|
||||
isolation := s.getIsolationMode()
|
||||
// 标准化代理 URL 并解析
|
||||
proxyKey, parsedProxy := normalizeProxyURL(proxyURL)
|
||||
// 构建缓存键(根据隔离策略不同)
|
||||
cacheKey := buildCacheKey(isolation, proxyKey, accountID)
|
||||
// 构建连接池配置键(用于检测配置变更)
|
||||
poolKey := s.buildPoolKey(isolation, accountConcurrency)
|
||||
|
||||
now := time.Now()
|
||||
nowUnix := now.UnixNano()
|
||||
|
||||
// 读锁快速路径:命中缓存直接返回,减少锁竞争
|
||||
s.mu.RLock()
|
||||
if entry, ok := s.clients[cacheKey]; ok && s.shouldReuseEntry(entry, isolation, proxyKey, poolKey) {
|
||||
atomic.StoreInt64(&entry.lastUsed, nowUnix)
|
||||
if markInFlight {
|
||||
atomic.AddInt64(&entry.inFlight, 1)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
return entry
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
// 写锁慢路径:创建或重建客户端
|
||||
s.mu.Lock()
|
||||
if entry, ok := s.clients[cacheKey]; ok {
|
||||
if s.shouldReuseEntry(entry, isolation, proxyKey, poolKey) {
|
||||
atomic.StoreInt64(&entry.lastUsed, nowUnix)
|
||||
if markInFlight {
|
||||
atomic.AddInt64(&entry.inFlight, 1)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return entry
|
||||
}
|
||||
s.removeClientLocked(cacheKey, entry)
|
||||
}
|
||||
|
||||
// 缓存未命中或需要重建,创建新客户端
|
||||
settings := s.resolvePoolSettings(isolation, accountConcurrency)
|
||||
client := &http.Client{Transport: buildUpstreamTransport(settings, parsedProxy)}
|
||||
entry := &upstreamClientEntry{
|
||||
client: client,
|
||||
proxyKey: proxyKey,
|
||||
poolKey: poolKey,
|
||||
}
|
||||
atomic.StoreInt64(&entry.lastUsed, nowUnix)
|
||||
if markInFlight {
|
||||
atomic.StoreInt64(&entry.inFlight, 1)
|
||||
}
|
||||
s.clients[cacheKey] = entry
|
||||
|
||||
// 执行淘汰策略:先淘汰空闲超时的,再淘汰超出数量限制的
|
||||
s.evictIdleLocked(now)
|
||||
s.evictOverLimitLocked()
|
||||
s.mu.Unlock()
|
||||
return entry
|
||||
}
|
||||
|
||||
// shouldReuseEntry 判断缓存条目是否可复用
|
||||
// 若代理或连接池配置发生变化,则需要重建客户端
|
||||
func (s *httpUpstreamService) shouldReuseEntry(entry *upstreamClientEntry, isolation, proxyKey, poolKey string) bool {
|
||||
if entry == nil {
|
||||
return false
|
||||
}
|
||||
if isolation == config.ConnectionPoolIsolationAccount && entry.proxyKey != proxyKey {
|
||||
return false
|
||||
}
|
||||
if entry.poolKey != poolKey {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// removeClientLocked 移除客户端(需持有锁)
|
||||
// 从缓存中删除并关闭空闲连接
|
||||
//
|
||||
// 参数:
|
||||
// - key: 缓存键
|
||||
// - entry: 客户端条目
|
||||
func (s *httpUpstreamService) removeClientLocked(key string, entry *upstreamClientEntry) {
|
||||
delete(s.clients, key)
|
||||
if entry != nil && entry.client != nil {
|
||||
// 关闭空闲连接,释放系统资源
|
||||
// 注意:这不会中断活跃连接
|
||||
entry.client.CloseIdleConnections()
|
||||
}
|
||||
}
|
||||
|
||||
// evictIdleLocked 淘汰空闲超时的客户端(需持有锁)
|
||||
// 遍历所有客户端,移除超过 TTL 且无活跃请求的条目
|
||||
//
|
||||
// 参数:
|
||||
// - now: 当前时间
|
||||
func (s *httpUpstreamService) evictIdleLocked(now time.Time) {
|
||||
ttl := s.clientIdleTTL()
|
||||
if ttl <= 0 {
|
||||
return
|
||||
}
|
||||
// 计算淘汰截止时间
|
||||
cutoff := now.Add(-ttl).UnixNano()
|
||||
for key, entry := range s.clients {
|
||||
// 跳过有活跃请求的客户端
|
||||
if atomic.LoadInt64(&entry.inFlight) != 0 {
|
||||
continue
|
||||
}
|
||||
// 淘汰超时的空闲客户端
|
||||
if atomic.LoadInt64(&entry.lastUsed) <= cutoff {
|
||||
s.removeClientLocked(key, entry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// evictOverLimitLocked 淘汰超出数量限制的客户端(需持有锁)
|
||||
// 使用 LRU 策略,优先淘汰最久未使用且无活跃请求的客户端
|
||||
func (s *httpUpstreamService) evictOverLimitLocked() {
|
||||
maxClients := s.maxUpstreamClients()
|
||||
if maxClients <= 0 {
|
||||
return
|
||||
}
|
||||
// 循环淘汰直到满足数量限制
|
||||
for len(s.clients) > maxClients {
|
||||
var (
|
||||
oldestKey string
|
||||
oldestEntry *upstreamClientEntry
|
||||
oldestTime int64
|
||||
)
|
||||
// 查找最久未使用且无活跃请求的客户端
|
||||
for key, entry := range s.clients {
|
||||
// 跳过有活跃请求的客户端
|
||||
if atomic.LoadInt64(&entry.inFlight) != 0 {
|
||||
continue
|
||||
}
|
||||
lastUsed := atomic.LoadInt64(&entry.lastUsed)
|
||||
if oldestEntry == nil || lastUsed < oldestTime {
|
||||
oldestKey = key
|
||||
oldestEntry = entry
|
||||
oldestTime = lastUsed
|
||||
}
|
||||
}
|
||||
// 所有客户端都有活跃请求,无法淘汰
|
||||
if oldestEntry == nil {
|
||||
return
|
||||
}
|
||||
s.removeClientLocked(oldestKey, oldestEntry)
|
||||
}
|
||||
}
|
||||
|
||||
// getIsolationMode 获取连接池隔离模式
|
||||
// 从配置中读取,无效值回退到 account_proxy 模式
|
||||
//
|
||||
// 返回:
|
||||
// - string: 隔离模式(proxy/account/account_proxy)
|
||||
func (s *httpUpstreamService) getIsolationMode() string {
|
||||
if s.cfg == nil {
|
||||
return config.ConnectionPoolIsolationAccountProxy
|
||||
}
|
||||
mode := strings.ToLower(strings.TrimSpace(s.cfg.Gateway.ConnectionPoolIsolation))
|
||||
if mode == "" {
|
||||
return config.ConnectionPoolIsolationAccountProxy
|
||||
}
|
||||
switch mode {
|
||||
case config.ConnectionPoolIsolationProxy, config.ConnectionPoolIsolationAccount, config.ConnectionPoolIsolationAccountProxy:
|
||||
return mode
|
||||
default:
|
||||
return config.ConnectionPoolIsolationAccountProxy
|
||||
}
|
||||
}
|
||||
|
||||
// maxUpstreamClients 获取最大客户端缓存数量
|
||||
// 从配置中读取,无效值使用默认值
|
||||
func (s *httpUpstreamService) maxUpstreamClients() int {
|
||||
if s.cfg == nil {
|
||||
return defaultMaxUpstreamClients
|
||||
}
|
||||
if s.cfg.Gateway.MaxUpstreamClients > 0 {
|
||||
return s.cfg.Gateway.MaxUpstreamClients
|
||||
}
|
||||
return defaultMaxUpstreamClients
|
||||
}
|
||||
|
||||
// clientIdleTTL 获取客户端空闲回收阈值
|
||||
// 从配置中读取,无效值使用默认值
|
||||
func (s *httpUpstreamService) clientIdleTTL() time.Duration {
|
||||
if s.cfg == nil {
|
||||
return time.Duration(defaultClientIdleTTLSeconds) * time.Second
|
||||
}
|
||||
if s.cfg.Gateway.ClientIdleTTLSeconds > 0 {
|
||||
return time.Duration(s.cfg.Gateway.ClientIdleTTLSeconds) * time.Second
|
||||
}
|
||||
return time.Duration(defaultClientIdleTTLSeconds) * time.Second
|
||||
}
|
||||
|
||||
// resolvePoolSettings 解析连接池配置
|
||||
// 根据隔离策略和账户并发数动态调整连接池参数
|
||||
//
|
||||
// 参数:
|
||||
// - isolation: 隔离模式
|
||||
// - accountConcurrency: 账户并发限制
|
||||
//
|
||||
// 返回:
|
||||
// - poolSettings: 连接池配置
|
||||
//
|
||||
// 说明:
|
||||
// - 账户隔离模式下,连接池大小与账户并发数对应
|
||||
// - 这确保了单账户不会占用过多连接资源
|
||||
func (s *httpUpstreamService) resolvePoolSettings(isolation string, accountConcurrency int) poolSettings {
|
||||
settings := defaultPoolSettings(s.cfg)
|
||||
// 账户隔离模式下,根据账户并发数调整连接池大小
|
||||
if (isolation == config.ConnectionPoolIsolationAccount || isolation == config.ConnectionPoolIsolationAccountProxy) && accountConcurrency > 0 {
|
||||
settings.maxIdleConns = accountConcurrency
|
||||
settings.maxIdleConnsPerHost = accountConcurrency
|
||||
settings.maxConnsPerHost = accountConcurrency
|
||||
}
|
||||
return settings
|
||||
}
|
||||
|
||||
// buildPoolKey 构建连接池配置键
|
||||
// 用于检测配置变更,配置变更时需要重建客户端
|
||||
//
|
||||
// 参数:
|
||||
// - isolation: 隔离模式
|
||||
// - accountConcurrency: 账户并发限制
|
||||
//
|
||||
// 返回:
|
||||
// - string: 配置键
|
||||
func (s *httpUpstreamService) buildPoolKey(isolation string, accountConcurrency int) string {
|
||||
if isolation == config.ConnectionPoolIsolationAccount || isolation == config.ConnectionPoolIsolationAccountProxy {
|
||||
if accountConcurrency > 0 {
|
||||
return fmt.Sprintf("account:%d", accountConcurrency)
|
||||
}
|
||||
}
|
||||
return "default"
|
||||
}
|
||||
|
||||
// buildCacheKey 构建客户端缓存键
|
||||
// 根据隔离策略决定缓存键的组成
|
||||
//
|
||||
// 参数:
|
||||
// - isolation: 隔离模式
|
||||
// - proxyKey: 代理标识
|
||||
// - accountID: 账户 ID
|
||||
//
|
||||
// 返回:
|
||||
// - string: 缓存键
|
||||
//
|
||||
// 缓存键格式:
|
||||
// - proxy 模式: "proxy:{proxyKey}"
|
||||
// - account 模式: "account:{accountID}"
|
||||
// - account_proxy 模式: "account:{accountID}|proxy:{proxyKey}"
|
||||
func buildCacheKey(isolation, proxyKey string, accountID int64) string {
|
||||
switch isolation {
|
||||
case config.ConnectionPoolIsolationAccount:
|
||||
return fmt.Sprintf("account:%d", accountID)
|
||||
case config.ConnectionPoolIsolationAccountProxy:
|
||||
return fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey)
|
||||
default:
|
||||
return fmt.Sprintf("proxy:%s", proxyKey)
|
||||
}
|
||||
}
|
||||
|
||||
// normalizeProxyURL 标准化代理 URL
|
||||
// 处理空值和解析错误,返回标准化的键和解析后的 URL
|
||||
//
|
||||
// 参数:
|
||||
// - raw: 原始代理 URL 字符串
|
||||
//
|
||||
// 返回:
|
||||
// - string: 标准化的代理键(空或解析失败返回 "direct")
|
||||
// - *url.URL: 解析后的 URL(空或解析失败返回 nil)
|
||||
func normalizeProxyURL(raw string) (string, *url.URL) {
|
||||
proxyURL := strings.TrimSpace(raw)
|
||||
if proxyURL == "" {
|
||||
return directProxyKey, nil
|
||||
}
|
||||
parsed, err := url.Parse(proxyURL)
|
||||
if err != nil {
|
||||
return directProxyKey, nil
|
||||
}
|
||||
return proxyURL, parsed
|
||||
}
|
||||
|
||||
// defaultPoolSettings 获取默认连接池配置
|
||||
// 从全局配置中读取,无效值使用常量默认值
|
||||
//
|
||||
// 参数:
|
||||
// - cfg: 全局配置
|
||||
//
|
||||
// 返回:
|
||||
// - poolSettings: 连接池配置
|
||||
func defaultPoolSettings(cfg *config.Config) poolSettings {
|
||||
maxIdleConns := defaultMaxIdleConns
|
||||
maxIdleConnsPerHost := defaultMaxIdleConnsPerHost
|
||||
maxConnsPerHost := defaultMaxConnsPerHost
|
||||
idleConnTimeout := defaultIdleConnTimeout
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeout
|
||||
|
||||
if cfg != nil {
|
||||
if cfg.Gateway.MaxIdleConns > 0 {
|
||||
maxIdleConns = cfg.Gateway.MaxIdleConns
|
||||
}
|
||||
if cfg.Gateway.MaxIdleConnsPerHost > 0 {
|
||||
maxIdleConnsPerHost = cfg.Gateway.MaxIdleConnsPerHost
|
||||
}
|
||||
if cfg.Gateway.MaxConnsPerHost >= 0 {
|
||||
maxConnsPerHost = cfg.Gateway.MaxConnsPerHost
|
||||
}
|
||||
if cfg.Gateway.IdleConnTimeoutSeconds > 0 {
|
||||
idleConnTimeout = time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second
|
||||
}
|
||||
if cfg.Gateway.ResponseHeaderTimeout > 0 {
|
||||
responseHeaderTimeout = time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
||||
}
|
||||
}
|
||||
|
||||
return poolSettings{
|
||||
maxIdleConns: maxIdleConns,
|
||||
maxIdleConnsPerHost: maxIdleConnsPerHost,
|
||||
maxConnsPerHost: maxConnsPerHost,
|
||||
idleConnTimeout: idleConnTimeout,
|
||||
responseHeaderTimeout: responseHeaderTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
// buildUpstreamTransport 构建上游请求的 Transport
|
||||
// 使用配置文件中的连接池参数,支持生产环境调优
|
||||
func buildUpstreamTransport(cfg *config.Config, proxyURL *url.URL) *http.Transport {
|
||||
// 读取配置,使用合理的默认值
|
||||
maxIdleConns := cfg.Gateway.MaxIdleConns
|
||||
if maxIdleConns <= 0 {
|
||||
maxIdleConns = 240
|
||||
}
|
||||
maxIdleConnsPerHost := cfg.Gateway.MaxIdleConnsPerHost
|
||||
if maxIdleConnsPerHost <= 0 {
|
||||
maxIdleConnsPerHost = 120
|
||||
}
|
||||
maxConnsPerHost := cfg.Gateway.MaxConnsPerHost
|
||||
if maxConnsPerHost < 0 {
|
||||
maxConnsPerHost = 240
|
||||
}
|
||||
idleConnTimeout := time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second
|
||||
if idleConnTimeout <= 0 {
|
||||
idleConnTimeout = 300 * time.Second
|
||||
}
|
||||
responseHeaderTimeout := time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
||||
if responseHeaderTimeout <= 0 {
|
||||
responseHeaderTimeout = 300 * time.Second
|
||||
}
|
||||
|
||||
//
|
||||
// 参数:
|
||||
// - settings: 连接池配置
|
||||
// - proxyURL: 代理 URL(nil 表示直连)
|
||||
//
|
||||
// 返回:
|
||||
// - *http.Transport: 配置好的 Transport 实例
|
||||
//
|
||||
// Transport 参数说明:
|
||||
// - MaxIdleConns: 所有主机的最大空闲连接总数
|
||||
// - MaxIdleConnsPerHost: 每主机最大空闲连接数(影响连接复用率)
|
||||
// - MaxConnsPerHost: 每主机最大连接数(达到后新请求等待)
|
||||
// - IdleConnTimeout: 空闲连接超时(超时后关闭)
|
||||
// - ResponseHeaderTimeout: 等待响应头超时(不影响流式传输)
|
||||
func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL) *http.Transport {
|
||||
transport := &http.Transport{
|
||||
MaxIdleConns: maxIdleConns, // 最大空闲连接总数
|
||||
MaxIdleConnsPerHost: maxIdleConnsPerHost, // 每主机最大空闲连接
|
||||
MaxConnsPerHost: maxConnsPerHost, // 每主机最大连接数(含活跃)
|
||||
IdleConnTimeout: idleConnTimeout, // 空闲连接超时
|
||||
ResponseHeaderTimeout: responseHeaderTimeout,
|
||||
MaxIdleConns: settings.maxIdleConns,
|
||||
MaxIdleConnsPerHost: settings.maxIdleConnsPerHost,
|
||||
MaxConnsPerHost: settings.maxConnsPerHost,
|
||||
IdleConnTimeout: settings.idleConnTimeout,
|
||||
ResponseHeaderTimeout: settings.responseHeaderTimeout,
|
||||
}
|
||||
if proxyURL != nil {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
return transport
|
||||
}
|
||||
|
||||
// trackedBody 带跟踪功能的响应体包装器
|
||||
// 在 Close 时执行回调,用于更新请求计数
|
||||
type trackedBody struct {
|
||||
io.ReadCloser // 原始响应体
|
||||
once sync.Once
|
||||
onClose func() // 关闭时的回调函数
|
||||
}
|
||||
|
||||
// Close 关闭响应体并执行回调
|
||||
// 使用 sync.Once 确保回调只执行一次
|
||||
func (b *trackedBody) Close() error {
|
||||
err := b.ReadCloser.Close()
|
||||
if b.onClose != nil {
|
||||
b.once.Do(b.onClose)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// wrapTrackedBody 包装响应体以跟踪关闭事件
|
||||
// 用于在响应体关闭时更新 inFlight 计数
|
||||
//
|
||||
// 参数:
|
||||
// - body: 原始响应体
|
||||
// - onClose: 关闭时的回调函数
|
||||
//
|
||||
// 返回:
|
||||
// - io.ReadCloser: 包装后的响应体
|
||||
func wrapTrackedBody(body io.ReadCloser, onClose func()) io.ReadCloser {
|
||||
if body == nil {
|
||||
return body
|
||||
}
|
||||
return &trackedBody{ReadCloser: body, onClose: onClose}
|
||||
}
|
||||
|
||||
@@ -8,10 +8,21 @@ import (
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
)
|
||||
|
||||
// httpClientSink 用于防止编译器优化掉基准测试中的赋值操作
|
||||
// 这是 Go 基准测试的常见模式,确保测试结果准确
|
||||
var httpClientSink *http.Client
|
||||
|
||||
// BenchmarkHTTPUpstreamProxyClient 对比重复创建与复用代理客户端的开销。
|
||||
// BenchmarkHTTPUpstreamProxyClient 对比重复创建与复用代理客户端的开销
|
||||
//
|
||||
// 测试目的:
|
||||
// - 验证连接池复用相比每次新建的性能提升
|
||||
// - 量化内存分配差异
|
||||
//
|
||||
// 预期结果:
|
||||
// - "复用" 子测试应显著快于 "新建"
|
||||
// - "复用" 子测试应零内存分配
|
||||
func BenchmarkHTTPUpstreamProxyClient(b *testing.B) {
|
||||
// 创建测试配置
|
||||
cfg := &config.Config{
|
||||
Gateway: config.GatewayConfig{ResponseHeaderTimeout: 300},
|
||||
}
|
||||
@@ -22,24 +33,33 @@ func BenchmarkHTTPUpstreamProxyClient(b *testing.B) {
|
||||
}
|
||||
|
||||
proxyURL := "http://127.0.0.1:8080"
|
||||
b.ReportAllocs()
|
||||
b.ReportAllocs() // 报告内存分配统计
|
||||
|
||||
// 子测试:每次新建客户端
|
||||
// 模拟未优化前的行为,每次请求都创建新的 http.Client
|
||||
b.Run("新建", func(b *testing.B) {
|
||||
parsedProxy, err := url.Parse(proxyURL)
|
||||
if err != nil {
|
||||
b.Fatalf("解析代理地址失败: %v", err)
|
||||
}
|
||||
settings := defaultPoolSettings(cfg)
|
||||
for i := 0; i < b.N; i++ {
|
||||
// 每次迭代都创建新客户端,包含 Transport 分配
|
||||
httpClientSink = &http.Client{
|
||||
Transport: buildUpstreamTransport(cfg, parsedProxy),
|
||||
Transport: buildUpstreamTransport(settings, parsedProxy),
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// 子测试:复用已缓存的客户端
|
||||
// 模拟优化后的行为,从缓存获取客户端
|
||||
b.Run("复用", func(b *testing.B) {
|
||||
client := svc.getOrCreateClient(proxyURL)
|
||||
b.ResetTimer()
|
||||
// 预热:确保客户端已缓存
|
||||
entry := svc.getOrCreateClient(proxyURL, 1, 1)
|
||||
client := entry.client
|
||||
b.ResetTimer() // 重置计时器,排除预热时间
|
||||
for i := 0; i < b.N; i++ {
|
||||
// 直接使用缓存的客户端,无内存分配
|
||||
httpClientSink = client
|
||||
}
|
||||
})
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -12,45 +13,61 @@ import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
// HTTPUpstreamSuite HTTP 上游服务测试套件
|
||||
// 使用 testify/suite 组织测试,支持 SetupTest 初始化
|
||||
type HTTPUpstreamSuite struct {
|
||||
suite.Suite
|
||||
cfg *config.Config
|
||||
cfg *config.Config // 测试用配置
|
||||
}
|
||||
|
||||
// SetupTest 每个测试用例执行前的初始化
|
||||
// 创建空配置,各测试用例可按需覆盖
|
||||
func (s *HTTPUpstreamSuite) SetupTest() {
|
||||
s.cfg = &config.Config{}
|
||||
}
|
||||
|
||||
func (s *HTTPUpstreamSuite) TestDefaultResponseHeaderTimeout() {
|
||||
// newService 创建测试用的 httpUpstreamService 实例
|
||||
// 返回具体类型以便访问内部状态进行断言
|
||||
func (s *HTTPUpstreamSuite) newService() *httpUpstreamService {
|
||||
up := NewHTTPUpstream(s.cfg)
|
||||
svc, ok := up.(*httpUpstreamService)
|
||||
require.True(s.T(), ok, "expected *httpUpstreamService")
|
||||
transport, ok := svc.defaultClient.Transport.(*http.Transport)
|
||||
return svc
|
||||
}
|
||||
|
||||
// TestDefaultResponseHeaderTimeout 测试默认响应头超时配置
|
||||
// 验证未配置时使用 300 秒默认值
|
||||
func (s *HTTPUpstreamSuite) TestDefaultResponseHeaderTimeout() {
|
||||
svc := s.newService()
|
||||
entry := svc.getOrCreateClient("", 0, 0)
|
||||
transport, ok := entry.client.Transport.(*http.Transport)
|
||||
require.True(s.T(), ok, "expected *http.Transport")
|
||||
require.Equal(s.T(), 300*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch")
|
||||
}
|
||||
|
||||
// TestCustomResponseHeaderTimeout 测试自定义响应头超时配置
|
||||
// 验证配置值能正确应用到 Transport
|
||||
func (s *HTTPUpstreamSuite) TestCustomResponseHeaderTimeout() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ResponseHeaderTimeout: 7}
|
||||
up := NewHTTPUpstream(s.cfg)
|
||||
svc, ok := up.(*httpUpstreamService)
|
||||
require.True(s.T(), ok, "expected *httpUpstreamService")
|
||||
transport, ok := svc.defaultClient.Transport.(*http.Transport)
|
||||
svc := s.newService()
|
||||
entry := svc.getOrCreateClient("", 0, 0)
|
||||
transport, ok := entry.client.Transport.(*http.Transport)
|
||||
require.True(s.T(), ok, "expected *http.Transport")
|
||||
require.Equal(s.T(), 7*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch")
|
||||
}
|
||||
|
||||
func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLFallsBackToDefault() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ResponseHeaderTimeout: 5}
|
||||
up := NewHTTPUpstream(s.cfg)
|
||||
svc, ok := up.(*httpUpstreamService)
|
||||
require.True(s.T(), ok, "expected *httpUpstreamService")
|
||||
|
||||
got := svc.getOrCreateClient("://bad-proxy-url")
|
||||
require.Equal(s.T(), svc.defaultClient, got, "expected defaultClient fallback")
|
||||
// TestGetOrCreateClient_InvalidURLFallsBackToDirect 测试无效代理 URL 回退
|
||||
// 验证解析失败时回退到直连模式
|
||||
func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLFallsBackToDirect() {
|
||||
svc := s.newService()
|
||||
entry := svc.getOrCreateClient("://bad-proxy-url", 1, 1)
|
||||
require.Equal(s.T(), directProxyKey, entry.proxyKey, "expected direct proxy fallback")
|
||||
}
|
||||
|
||||
// TestDo_WithoutProxy_GoesDirect 测试无代理时直连
|
||||
// 验证空代理 URL 时请求直接发送到目标服务器
|
||||
func (s *HTTPUpstreamSuite) TestDo_WithoutProxy_GoesDirect() {
|
||||
// 创建模拟上游服务器
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.WriteString(w, "direct")
|
||||
}))
|
||||
@@ -60,17 +77,21 @@ func (s *HTTPUpstreamSuite) TestDo_WithoutProxy_GoesDirect() {
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, upstream.URL+"/x", nil)
|
||||
require.NoError(s.T(), err, "NewRequest")
|
||||
resp, err := up.Do(req, "")
|
||||
resp, err := up.Do(req, "", 1, 1)
|
||||
require.NoError(s.T(), err, "Do")
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
require.Equal(s.T(), "direct", string(b), "unexpected body")
|
||||
}
|
||||
|
||||
// TestDo_WithHTTPProxy_UsesProxy 测试 HTTP 代理功能
|
||||
// 验证请求通过代理服务器转发,使用绝对 URI 格式
|
||||
func (s *HTTPUpstreamSuite) TestDo_WithHTTPProxy_UsesProxy() {
|
||||
// 用于接收代理请求的通道
|
||||
seen := make(chan string, 1)
|
||||
// 创建模拟代理服务器
|
||||
proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
seen <- r.RequestURI
|
||||
seen <- r.RequestURI // 记录请求 URI
|
||||
_, _ = io.WriteString(w, "proxied")
|
||||
}))
|
||||
s.T().Cleanup(proxySrv.Close)
|
||||
@@ -78,14 +99,16 @@ func (s *HTTPUpstreamSuite) TestDo_WithHTTPProxy_UsesProxy() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ResponseHeaderTimeout: 1}
|
||||
up := NewHTTPUpstream(s.cfg)
|
||||
|
||||
// 发送请求到外部地址,应通过代理
|
||||
req, err := http.NewRequest(http.MethodGet, "http://example.com/test", nil)
|
||||
require.NoError(s.T(), err, "NewRequest")
|
||||
resp, err := up.Do(req, proxySrv.URL)
|
||||
resp, err := up.Do(req, proxySrv.URL, 1, 1)
|
||||
require.NoError(s.T(), err, "Do")
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
require.Equal(s.T(), "proxied", string(b), "unexpected body")
|
||||
|
||||
// 验证代理收到的是绝对 URI 格式(HTTP 代理规范要求)
|
||||
select {
|
||||
case uri := <-seen:
|
||||
require.Equal(s.T(), "http://example.com/test", uri, "expected absolute-form request URI")
|
||||
@@ -94,6 +117,8 @@ func (s *HTTPUpstreamSuite) TestDo_WithHTTPProxy_UsesProxy() {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDo_EmptyProxy_UsesDirect 测试空代理字符串
|
||||
// 验证空字符串代理等同于直连
|
||||
func (s *HTTPUpstreamSuite) TestDo_EmptyProxy_UsesDirect() {
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.WriteString(w, "direct-empty")
|
||||
@@ -103,13 +128,134 @@ func (s *HTTPUpstreamSuite) TestDo_EmptyProxy_UsesDirect() {
|
||||
up := NewHTTPUpstream(s.cfg)
|
||||
req, err := http.NewRequest(http.MethodGet, upstream.URL+"/y", nil)
|
||||
require.NoError(s.T(), err, "NewRequest")
|
||||
resp, err := up.Do(req, "")
|
||||
resp, err := up.Do(req, "", 1, 1)
|
||||
require.NoError(s.T(), err, "Do with empty proxy")
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
require.Equal(s.T(), "direct-empty", string(b))
|
||||
}
|
||||
|
||||
// TestAccountIsolation_DifferentAccounts 测试账户隔离模式
|
||||
// 验证不同账户使用独立的连接池
|
||||
func (s *HTTPUpstreamSuite) TestAccountIsolation_DifferentAccounts() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount}
|
||||
svc := s.newService()
|
||||
// 同一代理,不同账户
|
||||
entry1 := svc.getOrCreateClient("http://proxy.local:8080", 1, 3)
|
||||
entry2 := svc.getOrCreateClient("http://proxy.local:8080", 2, 3)
|
||||
require.NotSame(s.T(), entry1, entry2, "不同账号不应共享连接池")
|
||||
require.Equal(s.T(), 2, len(svc.clients), "账号隔离应缓存两个客户端")
|
||||
}
|
||||
|
||||
// TestAccountProxyIsolation_DifferentProxy 测试账户+代理组合隔离模式
|
||||
// 验证同一账户使用不同代理时创建独立连接池
|
||||
func (s *HTTPUpstreamSuite) TestAccountProxyIsolation_DifferentProxy() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccountProxy}
|
||||
svc := s.newService()
|
||||
// 同一账户,不同代理
|
||||
entry1 := svc.getOrCreateClient("http://proxy-a:8080", 1, 3)
|
||||
entry2 := svc.getOrCreateClient("http://proxy-b:8080", 1, 3)
|
||||
require.NotSame(s.T(), entry1, entry2, "账号+代理隔离应区分不同代理")
|
||||
require.Equal(s.T(), 2, len(svc.clients), "账号+代理隔离应缓存两个客户端")
|
||||
}
|
||||
|
||||
// TestAccountModeProxyChangeClearsPool 测试账户模式下代理变更
|
||||
// 验证账户切换代理时清理旧连接池,避免复用错误代理
|
||||
func (s *HTTPUpstreamSuite) TestAccountModeProxyChangeClearsPool() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount}
|
||||
svc := s.newService()
|
||||
// 同一账户,先后使用不同代理
|
||||
entry1 := svc.getOrCreateClient("http://proxy-a:8080", 1, 3)
|
||||
entry2 := svc.getOrCreateClient("http://proxy-b:8080", 1, 3)
|
||||
require.NotSame(s.T(), entry1, entry2, "账号切换代理应创建新连接池")
|
||||
require.Equal(s.T(), 1, len(svc.clients), "账号模式下应仅保留一个连接池")
|
||||
require.False(s.T(), hasEntry(svc, entry1), "旧连接池应被清理")
|
||||
}
|
||||
|
||||
// TestAccountConcurrencyOverridesPoolSettings 测试账户并发数覆盖连接池配置
|
||||
// 验证账户隔离模式下,连接池大小与账户并发数对应
|
||||
func (s *HTTPUpstreamSuite) TestAccountConcurrencyOverridesPoolSettings() {
|
||||
s.cfg.Gateway = config.GatewayConfig{ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount}
|
||||
svc := s.newService()
|
||||
// 账户并发数为 12
|
||||
entry := svc.getOrCreateClient("", 1, 12)
|
||||
transport, ok := entry.client.Transport.(*http.Transport)
|
||||
require.True(s.T(), ok, "expected *http.Transport")
|
||||
// 连接池参数应与并发数一致
|
||||
require.Equal(s.T(), 12, transport.MaxConnsPerHost, "MaxConnsPerHost mismatch")
|
||||
require.Equal(s.T(), 12, transport.MaxIdleConns, "MaxIdleConns mismatch")
|
||||
require.Equal(s.T(), 12, transport.MaxIdleConnsPerHost, "MaxIdleConnsPerHost mismatch")
|
||||
}
|
||||
|
||||
// TestAccountConcurrencyFallbackToDefault 测试账户并发数为 0 时回退到默认配置
|
||||
// 验证未指定并发数时使用全局配置值
|
||||
func (s *HTTPUpstreamSuite) TestAccountConcurrencyFallbackToDefault() {
|
||||
s.cfg.Gateway = config.GatewayConfig{
|
||||
ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount,
|
||||
MaxIdleConns: 77,
|
||||
MaxIdleConnsPerHost: 55,
|
||||
MaxConnsPerHost: 66,
|
||||
}
|
||||
svc := s.newService()
|
||||
// 账户并发数为 0,应使用全局配置
|
||||
entry := svc.getOrCreateClient("", 1, 0)
|
||||
transport, ok := entry.client.Transport.(*http.Transport)
|
||||
require.True(s.T(), ok, "expected *http.Transport")
|
||||
require.Equal(s.T(), 66, transport.MaxConnsPerHost, "MaxConnsPerHost fallback mismatch")
|
||||
require.Equal(s.T(), 77, transport.MaxIdleConns, "MaxIdleConns fallback mismatch")
|
||||
require.Equal(s.T(), 55, transport.MaxIdleConnsPerHost, "MaxIdleConnsPerHost fallback mismatch")
|
||||
}
|
||||
|
||||
// TestEvictOverLimitRemovesOldestIdle 测试超出数量限制时的 LRU 淘汰
|
||||
// 验证优先淘汰最久未使用的空闲客户端
|
||||
func (s *HTTPUpstreamSuite) TestEvictOverLimitRemovesOldestIdle() {
|
||||
s.cfg.Gateway = config.GatewayConfig{
|
||||
ConnectionPoolIsolation: config.ConnectionPoolIsolationAccountProxy,
|
||||
MaxUpstreamClients: 2, // 最多缓存 2 个客户端
|
||||
}
|
||||
svc := s.newService()
|
||||
// 创建两个客户端,设置不同的最后使用时间
|
||||
entry1 := svc.getOrCreateClient("http://proxy-a:8080", 1, 1)
|
||||
entry2 := svc.getOrCreateClient("http://proxy-b:8080", 2, 1)
|
||||
atomic.StoreInt64(&entry1.lastUsed, time.Now().Add(-2*time.Hour).UnixNano()) // 最久
|
||||
atomic.StoreInt64(&entry2.lastUsed, time.Now().Add(-time.Hour).UnixNano())
|
||||
// 创建第三个客户端,触发淘汰
|
||||
_ = svc.getOrCreateClient("http://proxy-c:8080", 3, 1)
|
||||
|
||||
require.LessOrEqual(s.T(), len(svc.clients), 2, "应保持在缓存上限内")
|
||||
require.False(s.T(), hasEntry(svc, entry1), "最久未使用的连接池应被清理")
|
||||
}
|
||||
|
||||
// TestIdleTTLDoesNotEvictActive 测试活跃请求保护
|
||||
// 验证有进行中请求的客户端不会被空闲超时淘汰
|
||||
func (s *HTTPUpstreamSuite) TestIdleTTLDoesNotEvictActive() {
|
||||
s.cfg.Gateway = config.GatewayConfig{
|
||||
ConnectionPoolIsolation: config.ConnectionPoolIsolationAccount,
|
||||
ClientIdleTTLSeconds: 1, // 1 秒空闲超时
|
||||
}
|
||||
svc := s.newService()
|
||||
entry1 := svc.getOrCreateClient("", 1, 1)
|
||||
// 设置为很久之前使用,但有活跃请求
|
||||
atomic.StoreInt64(&entry1.lastUsed, time.Now().Add(-2*time.Minute).UnixNano())
|
||||
atomic.StoreInt64(&entry1.inFlight, 1) // 模拟有活跃请求
|
||||
// 创建新客户端,触发淘汰检查
|
||||
_ = svc.getOrCreateClient("", 2, 1)
|
||||
|
||||
require.True(s.T(), hasEntry(svc, entry1), "有活跃请求时不应回收")
|
||||
}
|
||||
|
||||
// TestHTTPUpstreamSuite 运行测试套件
|
||||
func TestHTTPUpstreamSuite(t *testing.T) {
|
||||
suite.Run(t, new(HTTPUpstreamSuite))
|
||||
}
|
||||
|
||||
// hasEntry 检查客户端是否存在于缓存中
|
||||
// 辅助函数,用于验证淘汰逻辑
|
||||
func hasEntry(svc *httpUpstreamService, target *upstreamClientEntry) bool {
|
||||
for _, entry := range svc.clients {
|
||||
if entry == target {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -256,7 +256,7 @@ func (s *AccountTestService) testClaudeAccountConnection(c *gin.Context, account
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL)
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error()))
|
||||
}
|
||||
@@ -371,7 +371,7 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL)
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error()))
|
||||
}
|
||||
@@ -442,7 +442,7 @@ func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL)
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error()))
|
||||
}
|
||||
|
||||
@@ -230,7 +230,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
upstreamReq.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
upstreamReq.Header.Set("User-Agent", antigravity.UserAgent)
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("Antigravity account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, antigravityMaxRetries, err)
|
||||
@@ -380,7 +380,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
upstreamReq.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
upstreamReq.Header.Set("User-Agent", antigravity.UserAgent)
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("Antigravity account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, antigravityMaxRetries, err)
|
||||
|
||||
@@ -644,7 +644,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
}
|
||||
|
||||
// 发送请求
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("upstream request failed: %w", err)
|
||||
}
|
||||
@@ -1308,7 +1308,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
||||
}
|
||||
|
||||
// 发送请求
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed")
|
||||
return fmt.Errorf("upstream request failed: %w", err)
|
||||
|
||||
@@ -472,7 +472,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
|
||||
}
|
||||
requestIDHeader = idHeader
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
if attempt < geminiMaxRetries {
|
||||
log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err)
|
||||
@@ -725,7 +725,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
|
||||
}
|
||||
requestIDHeader = idHeader
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
if attempt < geminiMaxRetries {
|
||||
log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err)
|
||||
@@ -1756,7 +1756,7 @@ func (s *GeminiMessagesCompatService) ForwardAIStudioGET(ctx context.Context, ac
|
||||
return nil, fmt.Errorf("unsupported account type: %s", account.Type)
|
||||
}
|
||||
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL)
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -2,8 +2,29 @@ package service
|
||||
|
||||
import "net/http"
|
||||
|
||||
// HTTPUpstream interface for making HTTP requests to upstream APIs (Claude, OpenAI, etc.)
|
||||
// This is a generic interface that can be used for any HTTP-based upstream service.
|
||||
// HTTPUpstream 上游 HTTP 请求接口
|
||||
// 用于向上游 API(Claude、OpenAI、Gemini 等)发送请求
|
||||
// 这是一个通用接口,可用于任何基于 HTTP 的上游服务
|
||||
//
|
||||
// 设计说明:
|
||||
// - 支持可选代理配置
|
||||
// - 支持账户级连接池隔离
|
||||
// - 实现类负责连接池管理和复用
|
||||
type HTTPUpstream interface {
|
||||
Do(req *http.Request, proxyURL string) (*http.Response, error)
|
||||
// Do 执行 HTTP 请求
|
||||
//
|
||||
// 参数:
|
||||
// - req: HTTP 请求对象,由调用方构建
|
||||
// - proxyURL: 代理服务器地址,空字符串表示直连
|
||||
// - accountID: 账户 ID,用于连接池隔离(隔离策略为 account 或 account_proxy 时生效)
|
||||
// - accountConcurrency: 账户并发限制,用于动态调整连接池大小
|
||||
//
|
||||
// 返回:
|
||||
// - *http.Response: HTTP 响应,调用方必须关闭 Body
|
||||
// - error: 请求错误(网络错误、超时等)
|
||||
//
|
||||
// 注意:
|
||||
// - 调用方必须关闭 resp.Body,否则会导致连接泄漏
|
||||
// - 响应体可能已被包装以跟踪请求生命周期
|
||||
Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error)
|
||||
}
|
||||
|
||||
@@ -311,7 +311,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
}
|
||||
|
||||
// Send request
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("upstream request failed: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user