fix(gateway): 修复 OAuth token 刷新后调度器缓存不一致问题
Token 刷新成功后,调度器缓存中的 Account 对象仍包含旧的 credentials, 导致在 Outbox 异步更新之前(最多 1 秒窗口)请求使用过期 token, 返回 403 错误(OAuth token has been revoked)。 修复方案:在 token 刷新成功后同步更新调度器缓存,确保调度获取的 Account 对象立即包含最新的 access_token 和 _token_version。 此修复覆盖所有 OAuth 平台:OpenAI、Claude、Gemini、Antigravity。
This commit is contained in:
@@ -188,7 +188,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
|
|||||||
opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig)
|
opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig)
|
||||||
opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig)
|
opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig)
|
||||||
opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig)
|
opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig)
|
||||||
tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, configConfig)
|
tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig)
|
||||||
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
|
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
|
||||||
subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository)
|
subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository)
|
||||||
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService)
|
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService)
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ type TokenRefreshService struct {
|
|||||||
refreshers []TokenRefresher
|
refreshers []TokenRefresher
|
||||||
cfg *config.TokenRefreshConfig
|
cfg *config.TokenRefreshConfig
|
||||||
cacheInvalidator TokenCacheInvalidator
|
cacheInvalidator TokenCacheInvalidator
|
||||||
|
schedulerCache SchedulerCache // 用于同步更新调度器缓存,解决 token 刷新后缓存不一致问题
|
||||||
|
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@@ -31,12 +32,14 @@ func NewTokenRefreshService(
|
|||||||
geminiOAuthService *GeminiOAuthService,
|
geminiOAuthService *GeminiOAuthService,
|
||||||
antigravityOAuthService *AntigravityOAuthService,
|
antigravityOAuthService *AntigravityOAuthService,
|
||||||
cacheInvalidator TokenCacheInvalidator,
|
cacheInvalidator TokenCacheInvalidator,
|
||||||
|
schedulerCache SchedulerCache,
|
||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
) *TokenRefreshService {
|
) *TokenRefreshService {
|
||||||
s := &TokenRefreshService{
|
s := &TokenRefreshService{
|
||||||
accountRepo: accountRepo,
|
accountRepo: accountRepo,
|
||||||
cfg: &cfg.TokenRefresh,
|
cfg: &cfg.TokenRefresh,
|
||||||
cacheInvalidator: cacheInvalidator,
|
cacheInvalidator: cacheInvalidator,
|
||||||
|
schedulerCache: schedulerCache,
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -198,6 +201,15 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
|
|||||||
log.Printf("[TokenRefresh] Token cache invalidated for account %d", account.ID)
|
log.Printf("[TokenRefresh] Token cache invalidated for account %d", account.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 同步更新调度器缓存,确保调度获取的 Account 对象包含最新的 credentials
|
||||||
|
// 这解决了 token 刷新后调度器缓存数据不一致的问题(#445)
|
||||||
|
if s.schedulerCache != nil {
|
||||||
|
if err := s.schedulerCache.SetAccount(ctx, account); err != nil {
|
||||||
|
log.Printf("[TokenRefresh] Failed to sync scheduler cache for account %d: %v", account.ID, err)
|
||||||
|
} else {
|
||||||
|
log.Printf("[TokenRefresh] Scheduler cache synced for account %d", account.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ func TestTokenRefreshService_RefreshWithRetry_InvalidatesCache(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 5,
|
ID: 5,
|
||||||
Platform: PlatformGemini,
|
Platform: PlatformGemini,
|
||||||
@@ -98,7 +98,7 @@ func TestTokenRefreshService_RefreshWithRetry_InvalidatorErrorIgnored(t *testing
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 6,
|
ID: 6,
|
||||||
Platform: PlatformGemini,
|
Platform: PlatformGemini,
|
||||||
@@ -124,7 +124,7 @@ func TestTokenRefreshService_RefreshWithRetry_NilInvalidator(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, nil, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, nil, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 7,
|
ID: 7,
|
||||||
Platform: PlatformGemini,
|
Platform: PlatformGemini,
|
||||||
@@ -151,7 +151,7 @@ func TestTokenRefreshService_RefreshWithRetry_Antigravity(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 8,
|
ID: 8,
|
||||||
Platform: PlatformAntigravity,
|
Platform: PlatformAntigravity,
|
||||||
@@ -179,7 +179,7 @@ func TestTokenRefreshService_RefreshWithRetry_NonOAuthAccount(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 9,
|
ID: 9,
|
||||||
Platform: PlatformGemini,
|
Platform: PlatformGemini,
|
||||||
@@ -207,7 +207,7 @@ func TestTokenRefreshService_RefreshWithRetry_OtherPlatformOAuth(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 10,
|
ID: 10,
|
||||||
Platform: PlatformOpenAI, // OpenAI OAuth 账户
|
Platform: PlatformOpenAI, // OpenAI OAuth 账户
|
||||||
@@ -235,7 +235,7 @@ func TestTokenRefreshService_RefreshWithRetry_UpdateFailed(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 11,
|
ID: 11,
|
||||||
Platform: PlatformGemini,
|
Platform: PlatformGemini,
|
||||||
@@ -264,7 +264,7 @@ func TestTokenRefreshService_RefreshWithRetry_RefreshFailed(t *testing.T) {
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 12,
|
ID: 12,
|
||||||
Platform: PlatformGemini,
|
Platform: PlatformGemini,
|
||||||
@@ -291,7 +291,7 @@ func TestTokenRefreshService_RefreshWithRetry_AntigravityRefreshFailed(t *testin
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 13,
|
ID: 13,
|
||||||
Platform: PlatformAntigravity,
|
Platform: PlatformAntigravity,
|
||||||
@@ -318,7 +318,7 @@ func TestTokenRefreshService_RefreshWithRetry_AntigravityNonRetryableError(t *te
|
|||||||
RetryBackoffSeconds: 0,
|
RetryBackoffSeconds: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, cfg)
|
service := NewTokenRefreshService(repo, nil, nil, nil, nil, invalidator, nil, cfg)
|
||||||
account := &Account{
|
account := &Account{
|
||||||
ID: 14,
|
ID: 14,
|
||||||
Platform: PlatformAntigravity,
|
Platform: PlatformAntigravity,
|
||||||
|
|||||||
@@ -44,9 +44,10 @@ func ProvideTokenRefreshService(
|
|||||||
geminiOAuthService *GeminiOAuthService,
|
geminiOAuthService *GeminiOAuthService,
|
||||||
antigravityOAuthService *AntigravityOAuthService,
|
antigravityOAuthService *AntigravityOAuthService,
|
||||||
cacheInvalidator TokenCacheInvalidator,
|
cacheInvalidator TokenCacheInvalidator,
|
||||||
|
schedulerCache SchedulerCache,
|
||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
) *TokenRefreshService {
|
) *TokenRefreshService {
|
||||||
svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, cfg)
|
svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, schedulerCache, cfg)
|
||||||
svc.Start()
|
svc.Start()
|
||||||
return svc
|
return svc
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user