diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go
index 48f15b5c..63c5ed0e 100644
--- a/backend/cmd/server/wire_gen.go
+++ b/backend/cmd/server/wire_gen.go
@@ -136,7 +136,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
 	gatewayCache := repository.NewGatewayCache(redisClient)
 	schedulerOutboxRepository := repository.NewSchedulerOutboxRepository(db)
 	schedulerSnapshotService := service.ProvideSchedulerSnapshotService(schedulerCache, schedulerOutboxRepository, accountRepository, groupRepository, configConfig)
-	antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI)
+	antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI, tempUnschedCache)
 	antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService)
 	accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, httpUpstream, configConfig)
 	crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig)
diff --git a/backend/internal/pkg/antigravity/client.go b/backend/internal/pkg/antigravity/client.go
index af3a0bfc..f24ff5a8 100644
--- a/backend/internal/pkg/antigravity/client.go
+++ b/backend/internal/pkg/antigravity/client.go
@@ -228,9 +228,18 @@ type Client struct {
 	httpClient *http.Client
 }
 
+const (
+	// proxyDialTimeout bounds the TCP connect when a proxy is configured, so an unreachable proxy fails fast.
+	proxyDialTimeout = 5 * time.Second
+	// proxyTLSHandshakeTimeout bounds the TLS handshake on the proxied transport.
+	proxyTLSHandshakeTimeout = 5 * time.Second
+	// clientTimeout is the overall per-request timeout (connect, send, wait for response, read body).
+	clientTimeout = 10 * time.Second
+)
+
 func NewClient(proxyURL string) (*Client, error) {
 	client := &http.Client{
-		Timeout: 30 * time.Second,
+		Timeout: clientTimeout,
 	}
 
 	_, parsed, err := proxyurl.Parse(proxyURL)
@@ -238,7 +247,12 @@ func NewClient(proxyURL string) (*Client, error) {
 		return nil, err
 	}
 	if parsed != nil {
-		transport := &http.Transport{}
+		transport := &http.Transport{
+			DialContext: (&net.Dialer{
+				Timeout: proxyDialTimeout,
+			}).DialContext,
+			TLSHandshakeTimeout: proxyTLSHandshakeTimeout,
+		}
 		if err := proxyutil.ConfigureTransportProxy(transport, parsed); err != nil {
 			return nil, fmt.Errorf("configure proxy: %w", err)
 		}
@@ -250,8 +264,8 @@ func NewClient(proxyURL string) (*Client, error) {
 	}, nil
 }
 
-// isConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
-func isConnectionError(err error) bool {
+// IsConnectionError reports whether err is a connection error (network timeout, DNS failure, connection refused).
+func IsConnectionError(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -276,7 +290,7 @@ func isConnectionError(err error) bool {
 // shouldFallbackToNextURL 判断是否应切换到下一个 URL
 // 与 Antigravity-Manager 保持一致:连接错误、429、408、404、5xx 触发 URL 降级
 func shouldFallbackToNextURL(err error, statusCode int) bool {
-	if isConnectionError(err) {
+	if IsConnectionError(err) {
 		return true
 	}
 	return statusCode == http.StatusTooManyRequests ||
diff --git a/backend/internal/pkg/antigravity/client_test.go b/backend/internal/pkg/antigravity/client_test.go
index 7d5bba93..61a08c3d 100644
--- a/backend/internal/pkg/antigravity/client_test.go
+++ b/backend/internal/pkg/antigravity/client_test.go
@@ -274,8 +274,8 @@ func TestNewClient_无代理(t *testing.T) {
 	if client.httpClient == nil {
 		t.Fatal("httpClient 为 nil")
 	}
-	if client.httpClient.Timeout != 30*time.Second {
-		t.Errorf("Timeout 不匹配: got %v, want 30s", client.httpClient.Timeout)
+	if client.httpClient.Timeout != clientTimeout {
+		t.Errorf("Timeout 不匹配: got %v, want %v", client.httpClient.Timeout, clientTimeout)
 	}
 	// 无代理时 Transport 应为 nil(使用默认)
 	if client.httpClient.Transport != nil {
@@ -322,11 +322,11 @@ func TestNewClient_无效代理URL(t *testing.T) {
 }
 
 // ---------------------------------------------------------------------------
-// isConnectionError
+// IsConnectionError
 // ---------------------------------------------------------------------------
 
 func TestIsConnectionError_nil(t *testing.T) {
-	if isConnectionError(nil) {
+	if IsConnectionError(nil) {
 		t.Error("nil 错误不应判定为连接错误")
 	}
 }
@@ -338,7 +338,7 @@ func TestIsConnectionError_超时错误(t *testing.T) {
 		Net: "tcp",
 		Err: &timeoutError{},
 	}
-	if !isConnectionError(err) {
+	if !IsConnectionError(err) {
 		t.Error("超时错误应判定为连接错误")
 	}
 }
@@ -356,7 +356,7 @@ func TestIsConnectionError_netOpError(t *testing.T) {
 		Net: "tcp",
 		Err: fmt.Errorf("connection refused"),
 	}
-	if !isConnectionError(err) {
+	if !IsConnectionError(err) {
 		t.Error("net.OpError 应判定为连接错误")
 	}
 }
@@ -367,14 +367,14 @@ func TestIsConnectionError_urlError(t *testing.T) {
 		URL: "https://example.com",
 		Err: fmt.Errorf("some error"),
 	}
-	if !isConnectionError(err) {
+	if !IsConnectionError(err) {
 		t.Error("url.Error 应判定为连接错误")
 	}
 }
 
 func TestIsConnectionError_普通错误(t *testing.T) {
 	err := fmt.Errorf("some random error")
-	if isConnectionError(err) {
+	if IsConnectionError(err) {
 		t.Error("普通错误不应判定为连接错误")
 	}
 }
@@ -386,7 +386,7 @@ func TestIsConnectionError_包装的netOpError(t *testing.T) {
 		Err: fmt.Errorf("connection refused"),
 	}
 	err := fmt.Errorf("wrapping: %w", inner)
-	if !isConnectionError(err) {
+	if !IsConnectionError(err) {
 		t.Error("被包装的 net.OpError 应判定为连接错误")
 	}
 }
diff --git a/backend/internal/pkg/httpclient/pool.go b/backend/internal/pkg/httpclient/pool.go
index 32e4bc5b..12804cc6 100644
--- a/backend/internal/pkg/httpclient/pool.go
+++ b/backend/internal/pkg/httpclient/pool.go
@@ -17,6 +17,7 @@ package httpclient
 
 import (
 	"fmt"
+	"net"
 	"net/http"
 	"strings"
 	"sync"
@@ -32,6 +33,8 @@ const (
 	defaultMaxIdleConns        = 100              // 最大空闲连接数
 	defaultMaxIdleConnsPerHost = 10               // 每个主机最大空闲连接数
 	defaultIdleConnTimeout     = 90 * time.Second // 空闲连接超时时间(建议小于上游 LB 超时)
+	defaultDialTimeout         = 5 * time.Second  // TCP connect timeout; fail fast when the proxy is unreachable
+	defaultTLSHandshakeTimeout = 5 * time.Second  // TLS handshake timeout
 	validatedHostTTL           = 30 * time.Second // DNS Rebinding 校验缓存 TTL
 )
 
@@ -107,6 +110,10 @@ func buildTransport(opts Options) (*http.Transport, error) {
 	}
 
 	transport := &http.Transport{
+		DialContext: (&net.Dialer{
+			Timeout: defaultDialTimeout,
+		}).DialContext,
+		TLSHandshakeTimeout: defaultTLSHandshakeTimeout,
 		MaxIdleConns:        maxIdleConns,
 		MaxIdleConnsPerHost: maxIdleConnsPerHost,
 		MaxConnsPerHost:     opts.MaxConnsPerHost, // 0 表示无限制
diff --git a/backend/internal/repository/proxy_probe_service.go b/backend/internal/repository/proxy_probe_service.go
index b4aeab71..d877abde 100644
--- a/backend/internal/repository/proxy_probe_service.go
+++ b/backend/internal/repository/proxy_probe_service.go
@@ -40,7 +40,7 @@ func NewProxyExitInfoProber(cfg *config.Config) service.ProxyExitInfoProber {
 }
 
 const (
-	defaultProxyProbeTimeout          = 30 * time.Second
+	defaultProxyProbeTimeout          = 10 * time.Second
 	defaultProxyProbeResponseMaxBytes = int64(1024 * 1024)
 )
 
diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go
index f321ca89..50fa78f2 100644
--- a/backend/internal/service/antigravity_gateway_service.go
+++ b/backend/internal/service/antigravity_gateway_service.go
@@ -1359,7 +1359,10 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
 	}
 	accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
 	if err != nil {
-		return nil, s.writeClaudeError(c, http.StatusBadGateway, "authentication_error", "Failed to get upstream access token")
+		return nil, &UpstreamFailoverError{
+			StatusCode:   http.StatusBadGateway,
+			ResponseBody: []byte(`{"error":{"type":"authentication_error","message":"Failed to get upstream access token"},"type":"error"}`),
+		}
 	}
 
 	// 获取 project_id(部分账户类型可能没有)
@@ -2101,7 +2104,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
 	}
 	accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
 	if err != nil {
-		return nil, s.writeGoogleError(c, http.StatusBadGateway, "Failed to get upstream access token")
+		return nil, &UpstreamFailoverError{
+			StatusCode:   http.StatusBadGateway,
+			ResponseBody: []byte(`{"error":{"message":"Failed to get upstream access token","status":"UNAVAILABLE"}}`),
+		}
 	}
 
 	// 获取 project_id(部分账户类型可能没有)
diff --git a/backend/internal/service/antigravity_oauth_service.go b/backend/internal/service/antigravity_oauth_service.go
index 5f6691be..3d5ae524 100644
--- a/backend/internal/service/antigravity_oauth_service.go
+++ b/backend/internal/service/antigravity_oauth_service.go
@@ -192,6 +192,14 @@ func (s *AntigravityOAuthService) RefreshToken(ctx context.Context, refreshToken
 		if isNonRetryableAntigravityOAuthError(err) {
 			return nil, err
 		}
+		// Connection-level failures (TCP timeout, connection refused, DNS failure)
+		// are returned immediately without retrying.
+		// NOTE(review): IsConnectionError also matches any *url.Error, which is what
+		// http.Client returns for every transport failure, and it fires on direct
+		// (no-proxy) connections too — confirm that skipping retries for all of them is intended.
+		if antigravity.IsConnectionError(err) {
+			return nil, fmt.Errorf("upstream connection failed: %w", err)
+		}
 		lastErr = err
 	}
 
diff --git a/backend/internal/service/antigravity_token_provider.go b/backend/internal/service/antigravity_token_provider.go
index 9cdc49aa..5e53f434 100644
--- a/backend/internal/service/antigravity_token_provider.go
+++ b/backend/internal/service/antigravity_token_provider.go
@@ -14,6 +14,10 @@ const (
 	antigravityTokenRefreshSkew = 3 * time.Minute
 	antigravityTokenCacheSkew   = 5 * time.Minute
 	antigravityBackfillCooldown = 5 * time.Minute
+	// antigravityRequestRefreshTimeout is the longest a request waits for a token refresh.
+	// Past it the refresh is abandoned, the account is marked temporarily unschedulable and
+	// failover kicks in; the background TokenRefreshService retries on its next cycle.
+	antigravityRequestRefreshTimeout = 8 * time.Second
 )
 
 // AntigravityTokenCache token cache interface.
@@ -28,6 +32,7 @@ type AntigravityTokenProvider struct {
 	refreshAPI       *OAuthRefreshAPI
 	executor         OAuthRefreshExecutor
 	refreshPolicy    ProviderRefreshPolicy
+	tempUnschedCache TempUnschedCache // keeps the Redis temp-unschedulable cache in sync with the DB
 }
 
 func NewAntigravityTokenProvider(
@@ -54,6 +59,11 @@ func (p *AntigravityTokenProvider) SetRefreshPolicy(policy ProviderRefreshPolicy
 	p.refreshPolicy = policy
 }
 
+// SetTempUnschedCache injects temp unschedulable cache for immediate scheduler sync.
+func (p *AntigravityTokenProvider) SetTempUnschedCache(cache TempUnschedCache) {
+	p.tempUnschedCache = cache
+}
+
 // GetAccessToken returns a valid access_token.
 func (p *AntigravityTokenProvider) GetAccessToken(ctx context.Context, account *Account) (string, error) {
 	if account == nil {
@@ -88,8 +98,18 @@ func (p *AntigravityTokenProvider) GetAccessToken(ctx context.Context, account *
 	expiresAt := account.GetCredentialAsTime("expires_at")
 	needsRefresh := expiresAt == nil || time.Until(*expiresAt) <= antigravityTokenRefreshSkew
 	if needsRefresh && p.refreshAPI != nil && p.executor != nil {
-		result, err := p.refreshAPI.RefreshIfNeeded(ctx, account, p.executor, antigravityTokenRefreshSkew)
+		// Request path: cap the refresh with a short timeout so an unreachable proxy
+		// cannot stall the request; the background refresh service keeps retrying.
+		refreshCtx, cancel := context.WithTimeout(ctx, antigravityRequestRefreshTimeout)
+		defer cancel()
+		result, err := p.refreshAPI.RefreshIfNeeded(refreshCtx, account, p.executor, antigravityTokenRefreshSkew)
 		if err != nil {
+			// Bench the account only when the refresh itself failed. If the caller's
+			// ctx is already done (client disconnect, request deadline), the error
+			// says nothing about the account and must not take it out of rotation.
+			if ctx.Err() == nil {
+				p.markTempUnschedulable(account, err)
+			}
 			if p.refreshPolicy.OnRefreshError == ProviderRefreshErrorReturn {
 				return "", err
 			}
@@ -172,6 +192,54 @@ func (p *AntigravityTokenProvider) shouldAttemptBackfill(accountID int64) bool {
 	return true
 }
 
+// markTempUnschedulable flags the account as temporarily unschedulable after a
+// token refresh failed on the request path. It writes the DB first and then,
+// when a cache is configured, the Redis temp-unschedulable cache, which is
+// meant to make the scheduler skip the account right away.
+//
+// The writes use a context detached from the request (which may already have
+// timed out) but bounded by persistTimeout, so a stalled DB or Redis cannot
+// block the calling request indefinitely.
+func (p *AntigravityTokenProvider) markTempUnschedulable(account *Account, refreshErr error) {
+	if p.accountRepo == nil || account == nil || refreshErr == nil {
+		return
+	}
+	// persistTimeout bounds the DB and Redis writes below.
+	const persistTimeout = 3 * time.Second
+
+	now := time.Now()
+	until := now.Add(tokenRefreshTempUnschedDuration)
+	reason := "token refresh failed on request path: " + refreshErr.Error()
+	bgCtx, cancel := context.WithTimeout(context.Background(), persistTimeout)
+	defer cancel()
+	if err := p.accountRepo.SetTempUnschedulable(bgCtx, account.ID, until, reason); err != nil {
+		slog.Warn("antigravity_token_provider.set_temp_unschedulable_failed",
+			"account_id", account.ID,
+			"error", err,
+		)
+		return
+	}
+	slog.Warn("antigravity_token_provider.temp_unschedulable_set",
+		"account_id", account.ID,
+		"until", until.Format(time.RFC3339),
+		"reason", reason,
+	)
+	// Mirror the state into the Redis cache so the scheduler picks it up immediately.
+	if p.tempUnschedCache != nil {
+		state := &TempUnschedState{
+			UntilUnix:       until.Unix(),
+			TriggeredAtUnix: now.Unix(),
+			ErrorMessage:    reason,
+		}
+		if err := p.tempUnschedCache.SetTempUnsched(bgCtx, account.ID, state); err != nil {
+			slog.Warn("antigravity_token_provider.temp_unsched_cache_set_failed",
+				"account_id", account.ID,
+				"error", err,
+			)
+		}
+	}
+}
+
 func (p *AntigravityTokenProvider) markBackfillAttempted(accountID int64) {
 	p.backfillCooldown.Store(accountID, time.Now())
 }
diff --git a/backend/internal/service/token_refresh_service.go b/backend/internal/service/token_refresh_service.go
index cb8841b0..582afcd3 100644
--- a/backend/internal/service/token_refresh_service.go
+++ b/backend/internal/service/token_refresh_service.go
@@ -12,6 +12,9 @@ import (
 	"github.com/Wei-Shaw/sub2api/internal/config"
 )
 
+// tokenRefreshTempUnschedDuration is how long an account stays temporarily unschedulable after a token refresh failure.
+const tokenRefreshTempUnschedDuration = 10 * time.Minute
+
 // TokenRefreshService OAuth token自动刷新服务
 // 定期检查并刷新即将过期的token
 type TokenRefreshService struct {
@@ -317,7 +320,7 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
 		}
 	}
 
-	// 可重试错误耗尽:仅记录日志,不标记 error(可能是临时网络问题,下个周期继续重试)
+	// Retryable errors exhausted: bench the account temporarily so the request path stops hitting a known-bad account.
 	slog.Warn("token_refresh.retry_exhausted",
 		"account_id", account.ID,
 		"platform", account.Platform,
@@ -325,6 +328,21 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
 		"error", lastErr,
 	)
 
+	// Mark temporarily unschedulable for tokenRefreshTempUnschedDuration (no error status is set, so the next refresh cycle can try again).
+	until := time.Now().Add(tokenRefreshTempUnschedDuration)
+	reason := fmt.Sprintf("token refresh retry exhausted: %v", lastErr)
+	if setErr := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); setErr != nil {
+		slog.Warn("token_refresh.set_temp_unschedulable_failed",
+			"account_id", account.ID,
+			"error", setErr,
+		)
+	} else {
+		slog.Info("token_refresh.temp_unschedulable_set",
+			"account_id", account.ID,
+			"until", until.Format(time.RFC3339),
+		)
+	}
+
 	return lastErr
 }
 
diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go
index a4c667be..fca8fd6d 100644
--- a/backend/internal/service/wire.go
+++ b/backend/internal/service/wire.go
@@ -114,11 +114,13 @@ func ProvideAntigravityTokenProvider(
 	tokenCache GeminiTokenCache,
 	antigravityOAuthService *AntigravityOAuthService,
 	refreshAPI *OAuthRefreshAPI,
+	tempUnschedCache TempUnschedCache,
 ) *AntigravityTokenProvider {
 	p := NewAntigravityTokenProvider(accountRepo, tokenCache, antigravityOAuthService)
 	executor := NewAntigravityTokenRefresher(antigravityOAuthService)
 	p.SetRefreshAPI(refreshAPI, executor)
 	p.SetRefreshPolicy(AntigravityProviderRefreshPolicy())
+	p.SetTempUnschedCache(tempUnschedCache)
 	return p
 }
 