diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 94388a0d..330ae0c1 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -271,27 +271,30 @@ type SoraConfig struct { // SoraClientConfig 直连 Sora 客户端配置 type SoraClientConfig struct { - BaseURL string `mapstructure:"base_url"` - TimeoutSeconds int `mapstructure:"timeout_seconds"` - MaxRetries int `mapstructure:"max_retries"` - PollIntervalSeconds int `mapstructure:"poll_interval_seconds"` - MaxPollAttempts int `mapstructure:"max_poll_attempts"` - RecentTaskLimit int `mapstructure:"recent_task_limit"` - RecentTaskLimitMax int `mapstructure:"recent_task_limit_max"` - Debug bool `mapstructure:"debug"` - UseOpenAITokenProvider bool `mapstructure:"use_openai_token_provider"` - Headers map[string]string `mapstructure:"headers"` - UserAgent string `mapstructure:"user_agent"` - DisableTLSFingerprint bool `mapstructure:"disable_tls_fingerprint"` - CurlCFFISidecar SoraCurlCFFISidecarConfig `mapstructure:"curl_cffi_sidecar"` + BaseURL string `mapstructure:"base_url"` + TimeoutSeconds int `mapstructure:"timeout_seconds"` + MaxRetries int `mapstructure:"max_retries"` + CloudflareChallengeCooldownSeconds int `mapstructure:"cloudflare_challenge_cooldown_seconds"` + PollIntervalSeconds int `mapstructure:"poll_interval_seconds"` + MaxPollAttempts int `mapstructure:"max_poll_attempts"` + RecentTaskLimit int `mapstructure:"recent_task_limit"` + RecentTaskLimitMax int `mapstructure:"recent_task_limit_max"` + Debug bool `mapstructure:"debug"` + UseOpenAITokenProvider bool `mapstructure:"use_openai_token_provider"` + Headers map[string]string `mapstructure:"headers"` + UserAgent string `mapstructure:"user_agent"` + DisableTLSFingerprint bool `mapstructure:"disable_tls_fingerprint"` + CurlCFFISidecar SoraCurlCFFISidecarConfig `mapstructure:"curl_cffi_sidecar"` } // SoraCurlCFFISidecarConfig Sora 专用 curl_cffi sidecar 配置 type SoraCurlCFFISidecarConfig struct { - Enabled bool `mapstructure:"enabled"` - BaseURL string `mapstructure:"base_url"` - Impersonate string `mapstructure:"impersonate"` - TimeoutSeconds int `mapstructure:"timeout_seconds"` + Enabled bool `mapstructure:"enabled"` + BaseURL string `mapstructure:"base_url"` + Impersonate string `mapstructure:"impersonate"` + TimeoutSeconds int `mapstructure:"timeout_seconds"` + SessionReuseEnabled bool `mapstructure:"session_reuse_enabled"` + SessionTTLSeconds int `mapstructure:"session_ttl_seconds"` } // SoraStorageConfig 媒体存储配置 @@ -1123,6 +1126,7 @@ func setDefaults() { viper.SetDefault("sora.client.base_url", "https://sora.chatgpt.com/backend") viper.SetDefault("sora.client.timeout_seconds", 120) viper.SetDefault("sora.client.max_retries", 3) + viper.SetDefault("sora.client.cloudflare_challenge_cooldown_seconds", 900) viper.SetDefault("sora.client.poll_interval_seconds", 2) viper.SetDefault("sora.client.max_poll_attempts", 600) viper.SetDefault("sora.client.recent_task_limit", 50) @@ -1136,6 +1140,8 @@ func setDefaults() { viper.SetDefault("sora.client.curl_cffi_sidecar.base_url", "http://sora-curl-cffi-sidecar:8080") viper.SetDefault("sora.client.curl_cffi_sidecar.impersonate", "chrome131") viper.SetDefault("sora.client.curl_cffi_sidecar.timeout_seconds", 60) + viper.SetDefault("sora.client.curl_cffi_sidecar.session_reuse_enabled", true) + viper.SetDefault("sora.client.curl_cffi_sidecar.session_ttl_seconds", 3600) viper.SetDefault("sora.storage.type", "local") viper.SetDefault("sora.storage.local_path", "") @@ -1523,6 +1529,9 @@ func (c *Config) Validate() error { if c.Sora.Client.MaxRetries < 0 { return fmt.Errorf("sora.client.max_retries must be non-negative") } + if c.Sora.Client.CloudflareChallengeCooldownSeconds < 0 { + return fmt.Errorf("sora.client.cloudflare_challenge_cooldown_seconds must be non-negative") + } if c.Sora.Client.PollIntervalSeconds < 0 { return fmt.Errorf("sora.client.poll_interval_seconds must be non-negative") } @@ -1542,6 +1551,9 @@ func (c *Config) Validate() error { if c.Sora.Client.CurlCFFISidecar.TimeoutSeconds < 0 { return fmt.Errorf("sora.client.curl_cffi_sidecar.timeout_seconds must be non-negative") } + if c.Sora.Client.CurlCFFISidecar.SessionTTLSeconds < 0 { + return fmt.Errorf("sora.client.curl_cffi_sidecar.session_ttl_seconds must be non-negative") + } if !c.Sora.Client.CurlCFFISidecar.Enabled { return fmt.Errorf("sora.client.curl_cffi_sidecar.enabled must be true") } diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index e81f70b0..dcc60879 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -1036,12 +1036,21 @@ func TestSoraCurlCFFISidecarDefaults(t *testing.T) { if !cfg.Sora.Client.CurlCFFISidecar.Enabled { t.Fatalf("Sora curl_cffi sidecar should be enabled by default") } + if cfg.Sora.Client.CloudflareChallengeCooldownSeconds <= 0 { + t.Fatalf("Sora cloudflare challenge cooldown should be positive by default") + } if cfg.Sora.Client.CurlCFFISidecar.BaseURL == "" { t.Fatalf("Sora curl_cffi sidecar base_url should not be empty by default") } if cfg.Sora.Client.CurlCFFISidecar.Impersonate == "" { t.Fatalf("Sora curl_cffi sidecar impersonate should not be empty by default") } + if !cfg.Sora.Client.CurlCFFISidecar.SessionReuseEnabled { + t.Fatalf("Sora curl_cffi sidecar session reuse should be enabled by default") + } + if cfg.Sora.Client.CurlCFFISidecar.SessionTTLSeconds <= 0 { + t.Fatalf("Sora curl_cffi sidecar session ttl should be positive by default") + } } func TestValidateSoraCurlCFFISidecarRequired(t *testing.T) { @@ -1073,3 +1082,33 @@ func TestValidateSoraCurlCFFISidecarBaseURLRequired(t *testing.T) { t.Fatalf("Validate() error = %v, want sidecar base_url required error", err) } } + +func TestValidateSoraCurlCFFISidecarSessionTTLNonNegative(t *testing.T) { + resetViperWithJWTSecret(t) + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.Sora.Client.CurlCFFISidecar.SessionTTLSeconds = -1 + err = cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "sora.client.curl_cffi_sidecar.session_ttl_seconds must be non-negative") { + t.Fatalf("Validate() error = %v, want sidecar session ttl error", err) + } +} + +func TestValidateSoraCloudflareChallengeCooldownNonNegative(t *testing.T) { + resetViperWithJWTSecret(t) + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.Sora.Client.CloudflareChallengeCooldownSeconds = -1 + err = cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "sora.client.cloudflare_challenge_cooldown_seconds must be non-negative") { + t.Fatalf("Validate() error = %v, want cloudflare cooldown error", err) + } +} diff --git a/backend/internal/handler/admin/admin_basic_handlers_test.go b/backend/internal/handler/admin/admin_basic_handlers_test.go index 20a25222..aeb4097f 100644 --- a/backend/internal/handler/admin/admin_basic_handlers_test.go +++ b/backend/internal/handler/admin/admin_basic_handlers_test.go @@ -47,6 +47,7 @@ func setupAdminRouter() (*gin.Engine, *stubAdminService) { router.DELETE("/api/v1/admin/proxies/:id", proxyHandler.Delete) router.POST("/api/v1/admin/proxies/batch-delete", proxyHandler.BatchDelete) router.POST("/api/v1/admin/proxies/:id/test", proxyHandler.Test) + router.POST("/api/v1/admin/proxies/:id/quality-check", proxyHandler.CheckQuality) router.GET("/api/v1/admin/proxies/:id/stats", proxyHandler.GetStats) router.GET("/api/v1/admin/proxies/:id/accounts", proxyHandler.GetProxyAccounts) @@ -208,6 +209,11 @@ func TestProxyHandlerEndpoints(t *testing.T) { router.ServeHTTP(rec, req) require.Equal(t, http.StatusOK, rec.Code) + rec = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodPost, "/api/v1/admin/proxies/4/quality-check", nil) + router.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + rec = httptest.NewRecorder() req = httptest.NewRequest(http.MethodGet, "/api/v1/admin/proxies/4/stats", nil) router.ServeHTTP(rec, req) diff --git a/backend/internal/handler/admin/admin_service_stub_test.go b/backend/internal/handler/admin/admin_service_stub_test.go index d44c99ea..e9c9f43c 100644 --- a/backend/internal/handler/admin/admin_service_stub_test.go +++ b/backend/internal/handler/admin/admin_service_stub_test.go @@ -327,6 +327,27 @@ func (s *stubAdminService) TestProxy(ctx context.Context, id int64) (*service.Pr return &service.ProxyTestResult{Success: true, Message: "ok"}, nil } +func (s *stubAdminService) CheckProxyQuality(ctx context.Context, id int64) (*service.ProxyQualityCheckResult, error) { + return &service.ProxyQualityCheckResult{ + ProxyID: id, + Score: 95, + Grade: "A", + Summary: "通过 4 项,告警 0 项,失败 0 项,挑战 0 项", + PassedCount: 4, + WarnCount: 0, + FailedCount: 0, + ChallengeCount: 0, + CheckedAt: time.Now().Unix(), + Items: []service.ProxyQualityCheckItem{ + {Target: "base_connectivity", Status: "pass", Message: "ok"}, + {Target: "openai", Status: "pass", HTTPStatus: 401}, + {Target: "anthropic", Status: "pass", HTTPStatus: 401}, + {Target: "gemini", Status: "pass", HTTPStatus: 200}, + {Target: "sora", Status: "pass", HTTPStatus: 401}, + }, + }, nil +} + func (s *stubAdminService) ListRedeemCodes(ctx context.Context, page, pageSize int, codeType, status, search string) ([]service.RedeemCode, int64, error) { return s.redeems, int64(len(s.redeems)), nil } diff --git a/backend/internal/handler/admin/proxy_handler.go b/backend/internal/handler/admin/proxy_handler.go index a6758f69..5a9cd7a0 100644 --- a/backend/internal/handler/admin/proxy_handler.go +++ b/backend/internal/handler/admin/proxy_handler.go @@ -236,6 +236,24 @@ func (h *ProxyHandler) Test(c *gin.Context) { response.Success(c, result) } +// CheckQuality handles checking proxy quality across common AI targets. +// POST /api/v1/admin/proxies/:id/quality-check +func (h *ProxyHandler) CheckQuality(c *gin.Context) { + proxyID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.BadRequest(c, "Invalid proxy ID") + return + } + + result, err := h.adminService.CheckProxyQuality(c.Request.Context(), proxyID) + if err != nil { + response.ErrorFrom(c, err) + return + } + + response.Success(c, result) +} + // GetStats handles getting proxy statistics // GET /api/v1/admin/proxies/:id/stats func (h *ProxyHandler) GetStats(c *gin.Context) { diff --git a/backend/internal/handler/sora_gateway_handler.go b/backend/internal/handler/sora_gateway_handler.go index 219922aa..90cc64d2 100644 --- a/backend/internal/handler/sora_gateway_handler.go +++ b/backend/internal/handler/sora_gateway_handler.go @@ -228,6 +228,20 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } + rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody) + fields := []zap.Field{ + zap.Int("last_upstream_status", lastFailoverStatus), + } + if rayID != "" { + fields = append(fields, zap.String("last_upstream_cf_ray", rayID)) + } + if mitigated != "" { + fields = append(fields, zap.String("last_upstream_cf_mitigated", mitigated)) + } + if contentType != "" { + fields = append(fields, zap.String("last_upstream_content_type", contentType)) + } + reqLog.Warn("sora.failover_exhausted_no_available_accounts", fields...) h.handleFailoverExhausted(c, lastFailoverStatus, lastFailoverHeaders, lastFailoverBody, streamStarted) return } @@ -291,24 +305,52 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) { failedAccountIDs[account.ID] = struct{}{} if switchCount >= maxAccountSwitches { lastFailoverStatus = failoverErr.StatusCode - lastFailoverHeaders = failoverErr.ResponseHeaders + lastFailoverHeaders = cloneHTTPHeaders(failoverErr.ResponseHeaders) lastFailoverBody = failoverErr.ResponseBody + rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody) + fields := []zap.Field{ + zap.Int64("account_id", account.ID), + zap.Int("upstream_status", failoverErr.StatusCode), + zap.Int("switch_count", switchCount), + zap.Int("max_switches", maxAccountSwitches), + } + if rayID != "" { + fields = append(fields, zap.String("upstream_cf_ray", rayID)) + } + if mitigated != "" { + fields = append(fields, zap.String("upstream_cf_mitigated", mitigated)) + } + if contentType != "" { + fields = append(fields, zap.String("upstream_content_type", contentType)) + } + reqLog.Warn("sora.upstream_failover_exhausted", fields...) h.handleFailoverExhausted(c, lastFailoverStatus, lastFailoverHeaders, lastFailoverBody, streamStarted) return } lastFailoverStatus = failoverErr.StatusCode - lastFailoverHeaders = failoverErr.ResponseHeaders + lastFailoverHeaders = cloneHTTPHeaders(failoverErr.ResponseHeaders) lastFailoverBody = failoverErr.ResponseBody switchCount++ upstreamErrCode, upstreamErrMsg := extractUpstreamErrorCodeAndMessage(lastFailoverBody) - reqLog.Warn("sora.upstream_failover_switching", + rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody) + fields := []zap.Field{ zap.Int64("account_id", account.ID), zap.Int("upstream_status", failoverErr.StatusCode), zap.String("upstream_error_code", upstreamErrCode), zap.String("upstream_error_message", upstreamErrMsg), zap.Int("switch_count", switchCount), zap.Int("max_switches", maxAccountSwitches), - ) + } + if rayID != "" { + fields = append(fields, zap.String("upstream_cf_ray", rayID)) + } + if mitigated != "" { + fields = append(fields, zap.String("upstream_cf_mitigated", mitigated)) + } + if contentType != "" { + fields = append(fields, zap.String("upstream_content_type", contentType)) + } + reqLog.Warn("sora.upstream_failover_switching", fields...) continue } reqLog.Error("sora.forward_failed", zap.Int64("account_id", account.ID), zap.Error(err)) @@ -417,6 +459,25 @@ func (h *SoraGatewayHandler) mapUpstreamError(statusCode int, responseHeaders ht } } +func cloneHTTPHeaders(headers http.Header) http.Header { + if headers == nil { + return nil + } + return headers.Clone() +} + +func extractSoraFailoverHeaderInsights(headers http.Header, body []byte) (rayID, mitigated, contentType string) { + if headers != nil { + mitigated = strings.TrimSpace(headers.Get("cf-mitigated")) + contentType = strings.TrimSpace(headers.Get("content-type")) + if contentType == "" { + contentType = strings.TrimSpace(headers.Get("Content-Type")) + } + } + rayID = soraerror.ExtractCloudflareRayID(headers, body) + return rayID, mitigated, contentType +} + func isSoraCloudflareChallengeResponse(statusCode int, headers http.Header, body []byte) bool { return soraerror.IsCloudflareChallengeResponse(statusCode, headers, body) } diff --git a/backend/internal/handler/sora_gateway_handler_test.go b/backend/internal/handler/sora_gateway_handler_test.go index 52ff0a96..3f6ef10e 100644 --- a/backend/internal/handler/sora_gateway_handler_test.go +++ b/backend/internal/handler/sora_gateway_handler_test.go @@ -674,3 +674,15 @@ func TestSoraHandleFailoverExhausted_CfShield429MappedToRateLimitError(t *testin require.Contains(t, msg, "Cloudflare shield") require.Contains(t, msg, "cf-ray: 9d03b68c086027a1-SEA") } + +func TestExtractSoraFailoverHeaderInsights(t *testing.T) { + headers := http.Header{} + headers.Set("cf-mitigated", "challenge") + headers.Set("content-type", "text/html") + body := []byte(``) + + rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(headers, body) + require.Equal(t, "9cff2d62d83bb98d", rayID) + require.Equal(t, "challenge", mitigated) + require.Equal(t, "text/html", contentType) +} diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 7341f85b..4b4d97c3 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -321,6 +321,7 @@ func registerProxyRoutes(admin *gin.RouterGroup, h *handler.Handlers) { proxies.PUT("/:id", h.Admin.Proxy.Update) proxies.DELETE("/:id", h.Admin.Proxy.Delete) proxies.POST("/:id/test", h.Admin.Proxy.Test) + proxies.POST("/:id/quality-check", h.Admin.Proxy.CheckQuality) proxies.GET("/:id/stats", h.Admin.Proxy.GetStats) proxies.GET("/:id/accounts", h.Admin.Proxy.GetProxyAccounts) proxies.POST("/batch-delete", h.Admin.Proxy.BatchDelete) diff --git a/backend/internal/service/admin_service.go b/backend/internal/service/admin_service.go index df535baf..cde8a95a 100644 --- a/backend/internal/service/admin_service.go +++ b/backend/internal/service/admin_service.go @@ -4,11 +4,15 @@ import ( "context" "errors" "fmt" + "io" + "net/http" "strings" "time" + "github.com/Wei-Shaw/sub2api/internal/pkg/httpclient" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" + "github.com/Wei-Shaw/sub2api/internal/util/soraerror" ) // AdminService interface defines admin management operations @@ -65,6 +69,7 @@ type AdminService interface { GetProxyAccounts(ctx context.Context, proxyID int64) ([]ProxyAccountSummary, error) CheckProxyExists(ctx context.Context, host string, port int, username, password string) (bool, error) TestProxy(ctx context.Context, id int64) (*ProxyTestResult, error) + CheckProxyQuality(ctx context.Context, id int64) (*ProxyQualityCheckResult, error) // Redeem code management ListRedeemCodes(ctx context.Context, page, pageSize int, codeType, status, search string) ([]RedeemCode, int64, error) @@ -288,6 +293,32 @@ type ProxyTestResult struct { CountryCode string `json:"country_code,omitempty"` } +type ProxyQualityCheckResult struct { + ProxyID int64 `json:"proxy_id"` + Score int `json:"score"` + Grade string `json:"grade"` + Summary string `json:"summary"` + ExitIP string `json:"exit_ip,omitempty"` + Country string `json:"country,omitempty"` + CountryCode string `json:"country_code,omitempty"` + BaseLatencyMs int64 `json:"base_latency_ms,omitempty"` + PassedCount int `json:"passed_count"` + WarnCount int `json:"warn_count"` + FailedCount int `json:"failed_count"` + ChallengeCount int `json:"challenge_count"` + CheckedAt int64 `json:"checked_at"` + Items []ProxyQualityCheckItem `json:"items"` +} + +type ProxyQualityCheckItem struct { + Target string `json:"target"` + Status string `json:"status"` // pass/warn/fail/challenge + HTTPStatus int `json:"http_status,omitempty"` + LatencyMs int64 `json:"latency_ms,omitempty"` + Message string `json:"message,omitempty"` + CFRay string `json:"cf_ray,omitempty"` +} + // ProxyExitInfo represents proxy exit information from ip-api.com type ProxyExitInfo struct { IP string @@ -302,6 +333,58 @@ type ProxyExitInfoProber interface { ProbeProxy(ctx context.Context, proxyURL string) (*ProxyExitInfo, int64, error) } +type proxyQualityTarget struct { + Target string + URL string + Method string + AllowedStatuses map[int]struct{} +} + +var proxyQualityTargets = []proxyQualityTarget{ + { + Target: "openai", + URL: "https://api.openai.com/v1/models", + Method: http.MethodGet, + AllowedStatuses: map[int]struct{}{ + http.StatusUnauthorized: {}, + }, + }, + { + Target: "anthropic", + URL: "https://api.anthropic.com/v1/messages", + Method: http.MethodGet, + AllowedStatuses: map[int]struct{}{ + http.StatusUnauthorized: {}, + http.StatusMethodNotAllowed: {}, + http.StatusNotFound: {}, + http.StatusBadRequest: {}, + }, + }, + { + Target: "gemini", + URL: "https://generativelanguage.googleapis.com/$discovery/rest?version=v1beta", + Method: http.MethodGet, + AllowedStatuses: map[int]struct{}{ + http.StatusOK: {}, + }, + }, + { + Target: "sora", + URL: "https://sora.chatgpt.com/backend/me", + Method: http.MethodGet, + AllowedStatuses: map[int]struct{}{ + http.StatusUnauthorized: {}, + }, + }, +} + +const ( + proxyQualityRequestTimeout = 15 * time.Second + proxyQualityResponseHeaderTimeout = 10 * time.Second + proxyQualityMaxBodyBytes = int64(8 * 1024) + proxyQualityClientUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36" +) + // adminServiceImpl implements AdminService type adminServiceImpl struct { userRepo UserRepository @@ -1690,6 +1773,187 @@ func (s *adminServiceImpl) TestProxy(ctx context.Context, id int64) (*ProxyTestR }, nil } +func (s *adminServiceImpl) CheckProxyQuality(ctx context.Context, id int64) (*ProxyQualityCheckResult, error) { + proxy, err := s.proxyRepo.GetByID(ctx, id) + if err != nil { + return nil, err + } + + result := &ProxyQualityCheckResult{ + ProxyID: id, + Score: 100, + Grade: "A", + CheckedAt: time.Now().Unix(), + Items: make([]ProxyQualityCheckItem, 0, len(proxyQualityTargets)+1), + } + + proxyURL := proxy.URL() + if s.proxyProber == nil { + result.Items = append(result.Items, ProxyQualityCheckItem{ + Target: "base_connectivity", + Status: "fail", + Message: "代理探测服务未配置", + }) + result.FailedCount++ + finalizeProxyQualityResult(result) + return result, nil + } + + exitInfo, latencyMs, err := s.proxyProber.ProbeProxy(ctx, proxyURL) + if err != nil { + result.Items = append(result.Items, ProxyQualityCheckItem{ + Target: "base_connectivity", + Status: "fail", + LatencyMs: latencyMs, + Message: err.Error(), + }) + result.FailedCount++ + finalizeProxyQualityResult(result) + return result, nil + } + + result.ExitIP = exitInfo.IP + result.Country = exitInfo.Country + result.CountryCode = exitInfo.CountryCode + result.BaseLatencyMs = latencyMs + result.Items = append(result.Items, ProxyQualityCheckItem{ + Target: "base_connectivity", + Status: "pass", + LatencyMs: latencyMs, + Message: "代理出口连通正常", + }) + result.PassedCount++ + + client, err := httpclient.GetClient(httpclient.Options{ + ProxyURL: proxyURL, + Timeout: proxyQualityRequestTimeout, + ResponseHeaderTimeout: proxyQualityResponseHeaderTimeout, + ProxyStrict: true, + }) + if err != nil { + result.Items = append(result.Items, ProxyQualityCheckItem{ + Target: "http_client", + Status: "fail", + Message: fmt.Sprintf("创建检测客户端失败: %v", err), + }) + result.FailedCount++ + finalizeProxyQualityResult(result) + return result, nil + } + + for _, target := range proxyQualityTargets { + item := runProxyQualityTarget(ctx, client, target) + result.Items = append(result.Items, item) + switch item.Status { + case "pass": + result.PassedCount++ + case "warn": + result.WarnCount++ + case "challenge": + result.ChallengeCount++ + default: + result.FailedCount++ + } + } + + finalizeProxyQualityResult(result) + return result, nil +} + +func runProxyQualityTarget(ctx context.Context, client *http.Client, target proxyQualityTarget) ProxyQualityCheckItem { + item := ProxyQualityCheckItem{ + Target: target.Target, + } + + req, err := http.NewRequestWithContext(ctx, target.Method, target.URL, nil) + if err != nil { + item.Status = "fail" + item.Message = fmt.Sprintf("构建请求失败: %v", err) + return item + } + req.Header.Set("Accept", "application/json,text/html,*/*") + req.Header.Set("User-Agent", proxyQualityClientUserAgent) + + start := time.Now() + resp, err := client.Do(req) + if err != nil { + item.Status = "fail" + item.LatencyMs = time.Since(start).Milliseconds() + item.Message = fmt.Sprintf("请求失败: %v", err) + return item + } + defer func() { _ = resp.Body.Close() }() + item.LatencyMs = time.Since(start).Milliseconds() + item.HTTPStatus = resp.StatusCode + + body, readErr := io.ReadAll(io.LimitReader(resp.Body, proxyQualityMaxBodyBytes+1)) + if readErr != nil { + item.Status = "fail" + item.Message = fmt.Sprintf("读取响应失败: %v", readErr) + return item + } + if int64(len(body)) > proxyQualityMaxBodyBytes { + body = body[:proxyQualityMaxBodyBytes] + } + + if target.Target == "sora" && soraerror.IsCloudflareChallengeResponse(resp.StatusCode, resp.Header, body) { + item.Status = "challenge" + item.CFRay = soraerror.ExtractCloudflareRayID(resp.Header, body) + item.Message = "Sora 命中 Cloudflare challenge" + return item + } + + if _, ok := target.AllowedStatuses[resp.StatusCode]; ok { + item.Status = "pass" + item.Message = fmt.Sprintf("HTTP %d", resp.StatusCode) + return item + } + + if resp.StatusCode == http.StatusTooManyRequests { + item.Status = "warn" + item.Message = "目标返回 429,可能存在频控" + return item + } + + item.Status = "fail" + item.Message = fmt.Sprintf("非预期状态码: %d", resp.StatusCode) + return item +} + +func finalizeProxyQualityResult(result *ProxyQualityCheckResult) { + if result == nil { + return + } + score := 100 - result.WarnCount*10 - result.FailedCount*22 - result.ChallengeCount*30 + if score < 0 { + score = 0 + } + result.Score = score + result.Grade = proxyQualityGrade(score) + result.Summary = fmt.Sprintf( + "通过 %d 项,告警 %d 项,失败 %d 项,挑战 %d 项", + result.PassedCount, + result.WarnCount, + result.FailedCount, + result.ChallengeCount, + ) +} + +func proxyQualityGrade(score int) string { + switch { + case score >= 90: + return "A" + case score >= 75: + return "B" + case score >= 60: + return "C" + case score >= 40: + return "D" + default: + return "F" + } +} + func (s *adminServiceImpl) probeProxyLatency(ctx context.Context, proxy *Proxy) { if s.proxyProber == nil || proxy == nil { return diff --git a/backend/internal/service/admin_service_proxy_quality_test.go b/backend/internal/service/admin_service_proxy_quality_test.go new file mode 100644 index 00000000..bd6a19a8 --- /dev/null +++ b/backend/internal/service/admin_service_proxy_quality_test.go @@ -0,0 +1,73 @@ +package service + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFinalizeProxyQualityResult_ScoreAndGrade(t *testing.T) { + result := &ProxyQualityCheckResult{ + PassedCount: 2, + WarnCount: 1, + FailedCount: 1, + ChallengeCount: 1, + } + + finalizeProxyQualityResult(result) + + require.Equal(t, 38, result.Score) + require.Equal(t, "F", result.Grade) + require.Contains(t, result.Summary, "通过 2 项") + require.Contains(t, result.Summary, "告警 1 项") + require.Contains(t, result.Summary, "失败 1 项") + require.Contains(t, result.Summary, "挑战 1 项") +} + +func TestRunProxyQualityTarget_SoraChallenge(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html") + w.Header().Set("cf-ray", "test-ray-123") + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write([]byte("Just a moment...")) + })) + defer server.Close() + + target := proxyQualityTarget{ + Target: "sora", + URL: server.URL, + Method: http.MethodGet, + AllowedStatuses: map[int]struct{}{ + http.StatusUnauthorized: {}, + }, + } + + item := runProxyQualityTarget(context.Background(), server.Client(), target) + require.Equal(t, "challenge", item.Status) + require.Equal(t, http.StatusForbidden, item.HTTPStatus) + require.Equal(t, "test-ray-123", item.CFRay) +} + +func TestRunProxyQualityTarget_AllowedStatusPass(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) + })) + defer server.Close() + + target := proxyQualityTarget{ + Target: "openai", + URL: server.URL, + Method: http.MethodGet, + AllowedStatuses: map[int]struct{}{ + http.StatusUnauthorized: {}, + }, + } + + item := runProxyQualityTarget(context.Background(), server.Client(), target) + require.Equal(t, "pass", item.Status) + require.Equal(t, http.StatusUnauthorized, item.HTTPStatus) +} diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index abdb1120..063a5ae6 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -375,10 +375,10 @@ type ForwardResult struct { // UpstreamFailoverError indicates an upstream error that should trigger account failover. type UpstreamFailoverError struct { StatusCode int - ResponseBody []byte // 上游响应体,用于错误透传规则匹配 - ResponseHeaders http.Header - ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true - RetryableOnSameAccount bool // 临时性错误(如 Google 间歇性 400、空响应),应在同一账号上重试 N 次再切换 + ResponseBody []byte // 上游响应体,用于错误透传规则匹配 + ResponseHeaders http.Header // 上游响应头,用于透传 cf-ray/cf-mitigated/content-type 等诊断信息 + ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + RetryableOnSameAccount bool // 临时性错误(如 Google 间歇性 400、空响应),应在同一账号上重试 N 次再切换 } func (e *UpstreamFailoverError) Error() string { diff --git a/backend/internal/service/sora_client.go b/backend/internal/service/sora_client.go index 77d099ab..1b0f73f8 100644 --- a/backend/internal/service/sora_client.go +++ b/backend/internal/service/sora_client.go @@ -27,6 +27,7 @@ import ( "github.com/Wei-Shaw/sub2api/internal/config" openaioauth "github.com/Wei-Shaw/sub2api/internal/pkg/openai" "github.com/Wei-Shaw/sub2api/internal/util/logredact" + "github.com/Wei-Shaw/sub2api/internal/util/soraerror" "github.com/google/uuid" "github.com/tidwall/gjson" "golang.org/x/crypto/sha3" @@ -221,12 +222,16 @@ func (e *SoraUpstreamError) Error() string { // SoraDirectClient 直连 Sora 实现 type SoraDirectClient struct { - cfg *config.Config - httpUpstream HTTPUpstream - tokenProvider *OpenAITokenProvider - accountRepo AccountRepository - soraAccountRepo SoraAccountRepository - baseURL string + cfg *config.Config + httpUpstream HTTPUpstream + tokenProvider *OpenAITokenProvider + accountRepo AccountRepository + soraAccountRepo SoraAccountRepository + baseURL string + challengeCooldownMu sync.RWMutex + challengeCooldowns map[string]soraChallengeCooldownEntry + sidecarSessionMu sync.RWMutex + sidecarSessions map[string]soraSidecarSessionEntry } // NewSoraDirectClient 创建 Sora 直连客户端 @@ -240,10 +245,12 @@ func NewSoraDirectClient(cfg *config.Config, httpUpstream HTTPUpstream, tokenPro } } return &SoraDirectClient{ - cfg: cfg, - httpUpstream: httpUpstream, - tokenProvider: tokenProvider, - baseURL: baseURL, + cfg: cfg, + httpUpstream: httpUpstream, + tokenProvider: tokenProvider, + baseURL: baseURL, + challengeCooldowns: make(map[string]soraChallengeCooldownEntry), + sidecarSessions: make(map[string]soraSidecarSessionEntry), } } @@ -1461,6 +1468,9 @@ func (c *SoraDirectClient) doRequestWithProxy( if proxyURL == "" { proxyURL = c.resolveProxyURL(account) } + if cooldownErr := c.checkCloudflareChallengeCooldown(account, proxyURL); cooldownErr != nil { + return nil, nil, cooldownErr + } timeout := 0 if c != nil && c.cfg != nil { timeout = c.cfg.Sora.Client.TimeoutSeconds @@ -1561,7 +1571,11 @@ func (c *SoraDirectClient) doRequestWithProxy( } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - if !authRecovered && shouldAttemptSoraTokenRecover(resp.StatusCode, urlStr) && account != nil { + isCFChallenge := soraerror.IsCloudflareChallengeResponse(resp.StatusCode, resp.Header, respBody) + if isCFChallenge { + c.recordCloudflareChallengeCooldown(account, proxyURL, resp.StatusCode, resp.Header, respBody) + } + if !isCFChallenge && !authRecovered && shouldAttemptSoraTokenRecover(resp.StatusCode, urlStr) && account != nil { if recovered, recoverErr := c.recoverAccessToken(ctx, account, fmt.Sprintf("upstream_status_%d", resp.StatusCode)); recoverErr == nil && strings.TrimSpace(recovered) != "" { headers.Set("Authorization", "Bearer "+recovered) authRecovered = true @@ -1590,6 +1604,9 @@ func (c *SoraDirectClient) doRequestWithProxy( } upstreamErr := c.buildUpstreamError(resp.StatusCode, resp.Header, respBody, urlStr) lastErr = upstreamErr + if isCFChallenge { + return nil, resp.Header, upstreamErr + } if allowRetry && attempt < attempts && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500) { if c.debugEnabled() { c.debugLogf("request_retry_scheduled method=%s url=%s reason=status_%d next_attempt=%d/%d", method, sanitizeSoraLogURL(urlStr), resp.StatusCode, attempt+1, attempts) @@ -1631,7 +1648,7 @@ func shouldAttemptSoraTokenRecover(statusCode int, rawURL string) bool { func (c *SoraDirectClient) doHTTP(req *http.Request, proxyURL string, account *Account) (*http.Response, error) { if c != nil && c.cfg != nil && c.cfg.Sora.Client.CurlCFFISidecar.Enabled { - resp, err := c.doHTTPViaCurlCFFISidecar(req, proxyURL) + resp, err := c.doHTTPViaCurlCFFISidecar(req, proxyURL, account) if err != nil { return nil, err } diff --git a/backend/internal/service/sora_client_test.go b/backend/internal/service/sora_client_test.go index ae92782e..9499c5a0 100644 --- a/backend/internal/service/sora_client_test.go +++ b/backend/internal/service/sora_client_test.go @@ -693,10 +693,11 @@ func TestSoraDirectClient_DoHTTP_UsesCurlCFFISidecarWhenEnabled(t *testing.T) { Client: config.SoraClientConfig{ BaseURL: "https://sora.chatgpt.com/backend", CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{ - Enabled: true, - BaseURL: sidecar.URL, - Impersonate: "chrome131", - TimeoutSeconds: 15, + Enabled: true, + BaseURL: sidecar.URL, + Impersonate: "chrome131", + TimeoutSeconds: 15, + SessionReuseEnabled: true, }, }, }, @@ -715,6 +716,7 @@ func TestSoraDirectClient_DoHTTP_UsesCurlCFFISidecarWhenEnabled(t *testing.T) { require.JSONEq(t, `{"ok":true}`, string(body)) require.Equal(t, int32(0), atomic.LoadInt32(&upstream.doWithTLSCalls)) require.Equal(t, "http://127.0.0.1:18080", captured.ProxyURL) + require.NotEmpty(t, captured.SessionKey) require.Equal(t, "chrome131", captured.Impersonate) require.Equal(t, "https://sora.chatgpt.com/backend/me", captured.URL) decodedReqBody, err := base64.StdEncoding.DecodeString(captured.BodyBase64) @@ -781,3 +783,188 @@ func TestConvertSidecarHeaderValue_NilAndSlice(t *testing.T) { require.Nil(t, convertSidecarHeaderValue(nil)) require.Equal(t, []string{"a", "b"}, convertSidecarHeaderValue([]any{"a", " ", "b"})) } + +func TestSoraDirectClient_DoHTTP_SidecarSessionKeyStableForSameAccountProxy(t *testing.T) { + var captured []soraCurlCFFISidecarRequest + sidecar := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + raw, err := io.ReadAll(r.Body) + require.NoError(t, err) + var reqPayload soraCurlCFFISidecarRequest + require.NoError(t, json.Unmarshal(raw, &reqPayload)) + captured = append(captured, reqPayload) + _ = json.NewEncoder(w).Encode(map[string]any{ + "status_code": http.StatusOK, + "headers": map[string]any{ + "Content-Type": "application/json", + }, + "body": `{"ok":true}`, + }) + })) + defer sidecar.Close() + + cfg := &config.Config{ + Sora: config.SoraConfig{ + Client: config.SoraClientConfig{ + BaseURL: "https://sora.chatgpt.com/backend", + CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{ + Enabled: true, + BaseURL: sidecar.URL, + SessionReuseEnabled: true, + SessionTTLSeconds: 3600, + }, + }, + }, + } + client := NewSoraDirectClient(cfg, nil, nil) + account := &Account{ID: 1001} + + req1, err := http.NewRequest(http.MethodGet, "https://sora.chatgpt.com/backend/me", nil) + require.NoError(t, err) + _, err = client.doHTTP(req1, "http://127.0.0.1:18080", account) + require.NoError(t, err) + + req2, err := http.NewRequest(http.MethodGet, "https://sora.chatgpt.com/backend/me", nil) + require.NoError(t, err) + _, err = client.doHTTP(req2, "http://127.0.0.1:18080", account) + require.NoError(t, err) + + require.Len(t, captured, 2) + require.NotEmpty(t, captured[0].SessionKey) + require.Equal(t, captured[0].SessionKey, captured[1].SessionKey) +} + +func TestSoraDirectClient_DoRequestWithProxy_CloudflareChallengeSetsCooldownAndSkipsRetry(t *testing.T) { + var sidecarCalls int32 + sidecar := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&sidecarCalls, 1) + _ = json.NewEncoder(w).Encode(map[string]any{ + "status_code": http.StatusForbidden, + "headers": map[string]any{ + "cf-ray": "9d05d73dec4d8c8e-GRU", + "content-type": "text/html", + }, + "body": `Just a moment...`, + }) + })) + defer sidecar.Close() + + cfg := &config.Config{ + Sora: config.SoraConfig{ + Client: config.SoraClientConfig{ + BaseURL: "https://sora.chatgpt.com/backend", + MaxRetries: 3, + CloudflareChallengeCooldownSeconds: 60, + CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{ + Enabled: true, + BaseURL: sidecar.URL, + Impersonate: "chrome131", + }, + }, + }, + } + client := NewSoraDirectClient(cfg, nil, nil) + headers := http.Header{} + + _, _, err := client.doRequestWithProxy( + context.Background(), + &Account{ID: 99}, + "http://127.0.0.1:18080", + http.MethodGet, + "https://sora.chatgpt.com/backend/me", + headers, + nil, + true, + ) + require.Error(t, err) + var upstreamErr *SoraUpstreamError + require.ErrorAs(t, err, &upstreamErr) + require.Equal(t, http.StatusForbidden, upstreamErr.StatusCode) + require.Equal(t, int32(1), atomic.LoadInt32(&sidecarCalls), "challenge should not trigger retry loop") + + _, _, err = client.doRequestWithProxy( + context.Background(), + &Account{ID: 99}, + "http://127.0.0.1:18080", + http.MethodGet, + "https://sora.chatgpt.com/backend/me", + headers, + nil, + true, + ) + require.Error(t, err) + require.ErrorAs(t, err, &upstreamErr) + require.Equal(t, http.StatusTooManyRequests, upstreamErr.StatusCode) + require.Contains(t, upstreamErr.Message, "cooling down") + require.Contains(t, upstreamErr.Message, "cf-ray") + require.Equal(t, int32(1), atomic.LoadInt32(&sidecarCalls), "cooldown should block outbound request") +} + +func TestSoraDirectClient_SidecarSessionKey_SkipsWhenAccountMissing(t *testing.T) { + cfg := &config.Config{ + Sora: config.SoraConfig{ + Client: config.SoraClientConfig{ + CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{ + Enabled: true, + SessionReuseEnabled: true, + SessionTTLSeconds: 3600, + }, + }, + }, + } + client := NewSoraDirectClient(cfg, nil, nil) + require.Equal(t, "", client.sidecarSessionKey(nil, "http://127.0.0.1:18080")) + require.Empty(t, client.sidecarSessions) +} + +func TestSoraDirectClient_SidecarSessionKey_PrunesExpiredAndRecreates(t *testing.T) { + cfg := &config.Config{ + Sora: config.SoraConfig{ + Client: config.SoraClientConfig{ + CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{ + Enabled: true, + SessionReuseEnabled: true, + SessionTTLSeconds: 3600, + }, + }, + }, + } + client := NewSoraDirectClient(cfg, nil, nil) + account := &Account{ID: 123} + key := soraAccountProxyKey(account, "http://127.0.0.1:18080") + client.sidecarSessions[key] = soraSidecarSessionEntry{ + SessionKey: "sora-expired", + ExpiresAt: time.Now().Add(-time.Minute), + LastUsedAt: time.Now().Add(-2 * time.Minute), + } + + sessionKey := client.sidecarSessionKey(account, "http://127.0.0.1:18080") + require.NotEmpty(t, sessionKey) + require.NotEqual(t, "sora-expired", sessionKey) + require.Len(t, client.sidecarSessions, 1) +} + +func TestSoraDirectClient_SidecarSessionKey_TTLZeroKeepsLongLivedSession(t *testing.T) { + cfg := &config.Config{ + Sora: config.SoraConfig{ + Client: config.SoraClientConfig{ + CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{ + Enabled: true, + SessionReuseEnabled: true, + SessionTTLSeconds: 0, + }, + }, + }, + } + client := NewSoraDirectClient(cfg, nil, nil) + account := &Account{ID: 456} + + first := client.sidecarSessionKey(account, "http://127.0.0.1:18080") + second := client.sidecarSessionKey(account, "http://127.0.0.1:18080") + require.NotEmpty(t, first) + require.Equal(t, first, second) + + key := soraAccountProxyKey(account, "http://127.0.0.1:18080") + entry, ok := client.sidecarSessions[key] + require.True(t, ok) + require.True(t, entry.ExpiresAt.After(time.Now().Add(300*24*time.Hour))) +} diff --git a/backend/internal/service/sora_curl_cffi_sidecar.go b/backend/internal/service/sora_curl_cffi_sidecar.go index 6c83a97b..40f5c017 100644 --- a/backend/internal/service/sora_curl_cffi_sidecar.go +++ b/backend/internal/service/sora_curl_cffi_sidecar.go @@ -23,6 +23,7 @@ type soraCurlCFFISidecarRequest struct { Headers map[string][]string `json:"headers,omitempty"` BodyBase64 string `json:"body_base64,omitempty"` ProxyURL string `json:"proxy_url,omitempty"` + SessionKey string `json:"session_key,omitempty"` Impersonate string `json:"impersonate,omitempty"` TimeoutSeconds int `json:"timeout_seconds,omitempty"` } @@ -36,7 +37,7 @@ type soraCurlCFFISidecarResponse struct { Error string `json:"error"` } -func (c *SoraDirectClient) doHTTPViaCurlCFFISidecar(req *http.Request, proxyURL string) (*http.Response, error) { +func (c *SoraDirectClient) doHTTPViaCurlCFFISidecar(req *http.Request, proxyURL string, account *Account) (*http.Response, error) { if req == nil || req.URL == nil { return nil, errors.New("request url is nil") } @@ -73,6 +74,7 @@ func (c *SoraDirectClient) doHTTPViaCurlCFFISidecar(req *http.Request, proxyURL URL: req.URL.String(), Headers: headers, ProxyURL: strings.TrimSpace(proxyURL), + SessionKey: c.sidecarSessionKey(account, proxyURL), Impersonate: c.curlCFFIImpersonate(), TimeoutSeconds: c.curlCFFISidecarTimeoutSeconds(), } @@ -97,7 +99,9 @@ func (c *SoraDirectClient) doHTTPViaCurlCFFISidecar(req *http.Request, proxyURL if err != nil { return nil, fmt.Errorf("sora curl_cffi sidecar request failed: %w", err) } - defer sidecarResp.Body.Close() + defer func() { + _ = sidecarResp.Body.Close() + }() sidecarRespBody, err := io.ReadAll(io.LimitReader(sidecarResp.Body, 8<<20)) if err != nil { @@ -202,6 +206,24 @@ func (c *SoraDirectClient) curlCFFIImpersonate() string { return impersonate } +func (c *SoraDirectClient) sidecarSessionReuseEnabled() bool { + if c == nil || c.cfg == nil { + return true + } + return c.cfg.Sora.Client.CurlCFFISidecar.SessionReuseEnabled +} + +func (c *SoraDirectClient) sidecarSessionTTLSeconds() int { + if c == nil || c.cfg == nil { + return 3600 + } + ttl := c.cfg.Sora.Client.CurlCFFISidecar.SessionTTLSeconds + if ttl < 0 { + return 3600 + } + return ttl +} + func convertSidecarHeaderValue(raw any) []string { switch val := raw.(type) { case nil: diff --git a/backend/internal/service/sora_gateway_service.go b/backend/internal/service/sora_gateway_service.go index 054d38e7..ac29ae0d 100644 --- a/backend/internal/service/sora_gateway_service.go +++ b/backend/internal/service/sora_gateway_service.go @@ -906,10 +906,14 @@ func (s *SoraGatewayService) handleSoraRequestError(ctx context.Context, account s.rateLimitService.HandleUpstreamError(ctx, account, upstreamErr.StatusCode, upstreamErr.Headers, upstreamErr.Body) } if s.shouldFailoverUpstreamError(upstreamErr.StatusCode) { + var responseHeaders http.Header + if upstreamErr.Headers != nil { + responseHeaders = upstreamErr.Headers.Clone() + } return &UpstreamFailoverError{ StatusCode: upstreamErr.StatusCode, ResponseBody: upstreamErr.Body, - ResponseHeaders: upstreamErr.Headers, + ResponseHeaders: responseHeaders, } } msg := upstreamErr.Message diff --git a/backend/internal/service/sora_gateway_service_test.go b/backend/internal/service/sora_gateway_service_test.go index c965901c..5888fe92 100644 --- a/backend/internal/service/sora_gateway_service_test.go +++ b/backend/internal/service/sora_gateway_service_test.go @@ -397,6 +397,34 @@ func TestSoraGatewayService_WriteSoraError_StreamEscapesJSON(t *testing.T) { require.Equal(t, "invalid \"prompt\"\nline2", errObj["message"]) } +func TestSoraGatewayService_HandleSoraRequestError_FailoverHeadersCloned(t *testing.T) { + svc := NewSoraGatewayService(nil, nil, nil, &config.Config{}) + sourceHeaders := http.Header{} + sourceHeaders.Set("cf-ray", "9d01b0e9ecc35829-SEA") + + err := svc.handleSoraRequestError( + context.Background(), + &Account{ID: 1, Platform: PlatformSora}, + &SoraUpstreamError{ + StatusCode: http.StatusForbidden, + Message: "forbidden", + Headers: sourceHeaders, + Body: []byte(`Just a moment...`), + }, + "sora2-landscape-10s", + nil, + false, + ) + + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.NotNil(t, failoverErr.ResponseHeaders) + require.Equal(t, "9d01b0e9ecc35829-SEA", failoverErr.ResponseHeaders.Get("cf-ray")) + + sourceHeaders.Set("cf-ray", "mutated-after-return") + require.Equal(t, "9d01b0e9ecc35829-SEA", failoverErr.ResponseHeaders.Get("cf-ray")) +} + func TestShouldFailoverUpstreamError(t *testing.T) { svc := NewSoraGatewayService(nil, nil, nil, &config.Config{}) require.True(t, svc.shouldFailoverUpstreamError(401)) diff --git a/backend/internal/service/sora_request_guard.go b/backend/internal/service/sora_request_guard.go new file mode 100644 index 00000000..d65e868b --- /dev/null +++ b/backend/internal/service/sora_request_guard.go @@ -0,0 +1,213 @@ +package service + +import ( + "fmt" + "math" + "net/http" + "net/url" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/util/soraerror" + "github.com/google/uuid" +) + +type soraChallengeCooldownEntry struct { + Until time.Time + StatusCode int + CFRay string +} + +type soraSidecarSessionEntry struct { + SessionKey string + ExpiresAt time.Time + LastUsedAt time.Time +} + +func (c *SoraDirectClient) cloudflareChallengeCooldownSeconds() int { + if c == nil || c.cfg == nil { + return 900 + } + cooldown := c.cfg.Sora.Client.CloudflareChallengeCooldownSeconds + if cooldown <= 0 { + return 0 + } + return cooldown +} + +func (c *SoraDirectClient) checkCloudflareChallengeCooldown(account *Account, proxyURL string) error { + if c == nil { + return nil + } + if account == nil || account.ID <= 0 { + return nil + } + cooldownSeconds := c.cloudflareChallengeCooldownSeconds() + if cooldownSeconds <= 0 { + return nil + } + key := soraAccountProxyKey(account, proxyURL) + now := time.Now() + + c.challengeCooldownMu.RLock() + entry, ok := c.challengeCooldowns[key] + c.challengeCooldownMu.RUnlock() + if !ok { + return nil + } + if !entry.Until.After(now) { + c.challengeCooldownMu.Lock() + delete(c.challengeCooldowns, key) + c.challengeCooldownMu.Unlock() + return nil + } + + remaining := int(math.Ceil(entry.Until.Sub(now).Seconds())) + if remaining < 1 { + remaining = 1 + } + message := fmt.Sprintf("Sora request cooling down due to recent Cloudflare challenge. Retry in %d seconds.", remaining) + if entry.CFRay != "" { + message = fmt.Sprintf("%s (last cf-ray: %s)", message, entry.CFRay) + } + return &SoraUpstreamError{ + StatusCode: http.StatusTooManyRequests, + Message: message, + Headers: make(http.Header), + } +} + +func (c *SoraDirectClient) recordCloudflareChallengeCooldown(account *Account, proxyURL string, statusCode int, headers http.Header, body []byte) { + if c == nil { + return + } + if account == nil || account.ID <= 0 { + return + } + cooldownSeconds := c.cloudflareChallengeCooldownSeconds() + if cooldownSeconds <= 0 { + return + } + key := soraAccountProxyKey(account, proxyURL) + now := time.Now() + until := now.Add(time.Duration(cooldownSeconds) * time.Second) + cfRay := soraerror.ExtractCloudflareRayID(headers, body) + + c.challengeCooldownMu.Lock() + c.cleanupExpiredChallengeCooldownsLocked(now) + existing, ok := c.challengeCooldowns[key] + if ok && existing.Until.After(until) { + until = existing.Until + if cfRay == "" { + cfRay = existing.CFRay + } + } + c.challengeCooldowns[key] = soraChallengeCooldownEntry{ + Until: until, + StatusCode: statusCode, + CFRay: cfRay, + } + c.challengeCooldownMu.Unlock() + + if c.debugEnabled() { + remain := int(math.Ceil(until.Sub(now).Seconds())) + if remain < 0 { + remain = 0 + } + c.debugLogf("cloudflare_challenge_cooldown_set key=%s status=%d remain_s=%d cf_ray=%s", key, statusCode, remain, cfRay) + } +} + +func (c *SoraDirectClient) sidecarSessionKey(account *Account, proxyURL string) string { + if c == nil || !c.sidecarSessionReuseEnabled() { + return "" + } + if account == nil || account.ID <= 0 { + return "" + } + key := soraAccountProxyKey(account, proxyURL) + now := time.Now() + ttlSeconds := c.sidecarSessionTTLSeconds() + + c.sidecarSessionMu.Lock() + defer c.sidecarSessionMu.Unlock() + c.cleanupExpiredSidecarSessionsLocked(now) + if existing, exists := c.sidecarSessions[key]; exists { + existing.LastUsedAt = now + c.sidecarSessions[key] = existing + return existing.SessionKey + } + + expiresAt := now.Add(time.Duration(ttlSeconds) * time.Second) + if ttlSeconds <= 0 { + expiresAt = now.Add(365 * 24 * time.Hour) + } + newEntry := soraSidecarSessionEntry{ + SessionKey: "sora-" + uuid.NewString(), + ExpiresAt: expiresAt, + LastUsedAt: now, + } + c.sidecarSessions[key] = newEntry + + if c.debugEnabled() { + c.debugLogf("sidecar_session_created key=%s ttl_s=%d", key, ttlSeconds) + } + return newEntry.SessionKey +} + +func (c *SoraDirectClient) cleanupExpiredChallengeCooldownsLocked(now time.Time) { + if c == nil || len(c.challengeCooldowns) == 0 { + return + } + for key, entry := range c.challengeCooldowns { + if !entry.Until.After(now) { + delete(c.challengeCooldowns, key) + } + } +} + +func (c *SoraDirectClient) cleanupExpiredSidecarSessionsLocked(now time.Time) { + if c == nil || len(c.sidecarSessions) == 0 { + return + } + for key, entry := range c.sidecarSessions { + if !entry.ExpiresAt.After(now) { + delete(c.sidecarSessions, key) + } + } +} + +func soraAccountProxyKey(account *Account, proxyURL string) string { + accountID := int64(0) + if account != nil { + accountID = account.ID + } + return fmt.Sprintf("account:%d|proxy:%s", accountID, normalizeSoraProxyKey(proxyURL)) +} + +func normalizeSoraProxyKey(proxyURL string) string { + raw := strings.TrimSpace(proxyURL) + if raw == "" { + return "direct" + } + parsed, err := url.Parse(raw) + if err != nil { + return strings.ToLower(raw) + } + scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme)) + host := strings.ToLower(strings.TrimSpace(parsed.Hostname())) + port := strings.TrimSpace(parsed.Port()) + if host == "" { + return strings.ToLower(raw) + } + if (scheme == "http" && port == "80") || (scheme == "https" && port == "443") { + port = "" + } + if port != "" { + host = host + ":" + port + } + if scheme == "" { + scheme = "proxy" + } + return scheme + "://" + host +} diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 27e29d34..c77ab70e 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -374,6 +374,9 @@ sora: # Max retries for upstream requests # 上游请求最大重试次数 max_retries: 3 + # Account+proxy cooldown window after Cloudflare challenge (seconds, 0 to disable) + # Cloudflare challenge 后按账号+代理冷却窗口(秒,0 表示关闭) + cloudflare_challenge_cooldown_seconds: 900 # Poll interval (seconds) # 轮询间隔(秒) poll_interval_seconds: 2 @@ -417,6 +420,12 @@ sora: # Sidecar request timeout (seconds) # sidecar 请求超时(秒) timeout_seconds: 60 + # Reuse session key per account+proxy to let sidecar persist cookies/session + # 按账号+代理复用 session key,让 sidecar 持久化 cookies/session + session_reuse_enabled: true + # Session TTL in sidecar (seconds) + # sidecar 会话 TTL(秒) + session_ttl_seconds: 3600 storage: # Storage type (local only for now) # 存储类型(首发仅支持 local) diff --git a/frontend/src/api/admin/proxies.ts b/frontend/src/api/admin/proxies.ts index b6aaf595..5e31ae20 100644 --- a/frontend/src/api/admin/proxies.ts +++ b/frontend/src/api/admin/proxies.ts @@ -7,6 +7,7 @@ import { apiClient } from '../client' import type { Proxy, ProxyAccountSummary, + ProxyQualityCheckResult, CreateProxyRequest, UpdateProxyRequest, PaginatedResponse, @@ -143,6 +144,16 @@ export async function testProxy(id: number): Promise<{ return data } +/** + * Check proxy quality across common AI targets + * @param id - Proxy ID + * @returns Quality check result + */ +export async function checkProxyQuality(id: number): Promise { + const { data } = await apiClient.post(`/admin/proxies/${id}/quality-check`) + return data +} + /** * Get proxy usage statistics * @param id - Proxy ID @@ -248,6 +259,7 @@ export const proxiesAPI = { delete: deleteProxy, toggleStatus, testProxy, + checkProxyQuality, getStats, getProxyAccounts, batchCreate, diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index 97216afe..33377834 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -2103,6 +2103,8 @@ export default { actions: 'Actions' }, testConnection: 'Test Connection', + qualityCheck: 'Quality Check', + batchQualityCheck: 'Batch Quality Check', batchTest: 'Test All Proxies', testFailed: 'Failed', latencyFailed: 'Connection failed', @@ -2163,6 +2165,27 @@ export default { proxyWorking: 'Proxy is working!', proxyWorkingWithLatency: 'Proxy is working! Latency: {latency}ms', proxyTestFailed: 'Proxy test failed', + qualityCheckDone: 'Quality check completed: score {score} ({grade})', + qualityCheckFailed: 'Failed to run proxy quality check', + batchQualityDone: + 'Batch quality check completed for {count} proxies: healthy {healthy}, warn {warn}, challenge {challenge}, abnormal {failed}', + batchQualityFailed: 'Batch quality check failed', + batchQualityEmpty: 'No proxies available for quality check', + qualityReportTitle: 'Proxy Quality Report', + qualityGrade: 'Grade {grade}', + qualityExitIP: 'Exit IP', + qualityCountry: 'Exit Region', + qualityBaseLatency: 'Base Latency', + qualityCheckedAt: 'Checked At', + qualityTableTarget: 'Target', + qualityTableStatus: 'Status', + qualityTableLatency: 'Latency', + qualityTableMessage: 'Message', + qualityStatusPass: 'Pass', + qualityStatusWarn: 'Warn', + qualityStatusFail: 'Fail', + qualityStatusChallenge: 'Challenge', + qualityTargetBase: 'Base Connectivity', failedToLoad: 'Failed to load proxies', failedToCreate: 'Failed to create proxy', failedToUpdate: 'Failed to update proxy', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 3cb231f6..144e1598 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -2246,6 +2246,8 @@ export default { noProxiesYet: '暂无代理', createFirstProxy: '添加您的第一个代理以开始使用。', testConnection: '测试连接', + qualityCheck: '质量检测', + batchQualityCheck: '批量质量检测', batchTest: '批量测试', testFailed: '失败', latencyFailed: '链接失败', @@ -2293,6 +2295,26 @@ export default { proxyWorking: '代理连接正常', proxyWorkingWithLatency: '代理连接正常,延迟 {latency}ms', proxyTestFailed: '代理测试失败', + qualityCheckDone: '质量检测完成:评分 {score}({grade})', + qualityCheckFailed: '代理质量检测失败', + batchQualityDone: '批量质量检测完成,共检测 {count} 个;优质 {healthy} 个,告警 {warn} 个,挑战 {challenge} 个,异常 {failed} 个', + batchQualityFailed: '批量质量检测失败', + batchQualityEmpty: '暂无可检测质量的代理', + qualityReportTitle: '代理质量检测报告', + qualityGrade: '等级 {grade}', + qualityExitIP: '出口 IP', + qualityCountry: '出口地区', + qualityBaseLatency: '基础延迟', + qualityCheckedAt: '检测时间', + qualityTableTarget: '检测项', + qualityTableStatus: '状态', + qualityTableLatency: '延迟', + qualityTableMessage: '说明', + qualityStatusPass: '通过', + qualityStatusWarn: '告警', + qualityStatusFail: '失败', + qualityStatusChallenge: '挑战', + qualityTargetBase: '基础连通性', proxyCreatedSuccess: '代理添加成功', proxyUpdatedSuccess: '代理更新成功', proxyDeletedSuccess: '代理删除成功', diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 9db15392..9d6cf249 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -524,6 +524,32 @@ export interface ProxyAccountSummary { notes?: string | null } +export interface ProxyQualityCheckItem { + target: string + status: 'pass' | 'warn' | 'fail' | 'challenge' + http_status?: number + latency_ms?: number + message?: string + cf_ray?: string +} + +export interface ProxyQualityCheckResult { + proxy_id: number + score: number + grade: string + summary: string + exit_ip?: string + country?: string + country_code?: string + base_latency_ms?: number + passed_count: number + warn_count: number + failed_count: number + challenge_count: number + checked_at: number + items: ProxyQualityCheckItem[] +} + // Gemini credentials structure for OAuth and API Key authentication export interface GeminiCredentials { // API Key authentication diff --git a/frontend/src/views/admin/ProxiesView.vue b/frontend/src/views/admin/ProxiesView.vue index 9cbf4c58..55c08474 100644 --- a/frontend/src/views/admin/ProxiesView.vue +++ b/frontend/src/views/admin/ProxiesView.vue @@ -55,6 +55,15 @@ {{ t('admin.proxies.testConnection') }} + +