Files
sub2api/backend/internal/service/wire.go
erio c2f9ad7a21 refactor(channel-monitor): event-driven scheduler + sidebar cleanup
后端 - ChannelMonitorRunner 重写为事件驱动调度
- 删除 5 秒轮询架构(每次 ListEnabled + listDueForCheck 全表扫描),
  改为每个 enabled monitor 一个独立 goroutine + ticker(按各自 IntervalSeconds)
- 新增 MonitorScheduler 接口,service 通过 setter 注入避免依赖环
- ChannelMonitorService.Create/Update/Delete 直接回调 scheduler.Schedule/Unschedule
- runner.Start 一次性加载所有 enabled monitor 建立任务表
- 新建/启用立即触发首次检测,禁用/删除即时撤销 ticker
- 保留 inFlight 去重 + pond 池并发上限 + 全局开关每次 fire 实时校验
- 删除 listDueForCheck / monitorTickerInterval / monitorListDueTimeout

前端 - 可用渠道改为用户级菜单
- 从 adminNavItems 移除 /available-channels(admin 主菜单不再重复出现)
- buildSelfNavItems 始终包含可用渠道入口,普通用户主菜单和
  管理员"我的账户"区都能看到
2026-04-22 19:17:08 +08:00

515 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"database/sql"
"time"
dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/payment"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/google/wire"
"github.com/redis/go-redis/v9"
)
// BuildInfo carries build-time metadata that is handed to services during
// dependency wiring (ProvideUpdateService forwards both fields).
type BuildInfo struct {
	// Version is the version string of this build.
	Version string
	// BuildType identifies the kind of build.
	// NOTE(review): the set of accepted values is not visible in this file.
	BuildType string
}
// ProvidePricingService creates a PricingService and attempts to initialize it.
//
// An initialization failure is deliberately non-fatal: it is logged as a
// warning and the service is still returned, so startup is not blocked and
// fallback prices are used. The returned error is therefore always nil; the
// error result is kept so the provider signature stays unchanged.
func ProvidePricingService(cfg *config.Config, remoteClient PricingRemoteClient) (*PricingService, error) {
	svc := NewPricingService(cfg, remoteClient)
	if err := svc.Initialize(); err != nil {
		// Report through the project logger (as the other providers in this
		// file do) rather than the builtin println, which is a bootstrapping
		// helper that bypasses the configured log pipeline.
		logger.LegacyPrintf("service.pricing", "Warning: pricing service initialization failed: %v", err)
	}
	return svc, nil
}
// ProvideUpdateService builds an UpdateService, passing along the version and
// build type taken from buildInfo.
func ProvideUpdateService(cache UpdateCache, githubClient GitHubReleaseClient, buildInfo BuildInfo) *UpdateService {
	version, buildType := buildInfo.Version, buildInfo.BuildType
	return NewUpdateService(cache, githubClient, version, buildType)
}
// ProvideEmailQueueService builds an EmailQueueService using the default
// worker count.
func ProvideEmailQueueService(emailService *EmailService) *EmailQueueService {
	// Default number of workers handed to the queue constructor.
	const defaultWorkerCount = 3

	return NewEmailQueueService(emailService, defaultWorkerCount)
}
// ProvideTokenRefreshService builds a TokenRefreshService, injects its
// setter-based dependencies and refresh policy, then starts it.
func ProvideTokenRefreshService(
	accountRepo AccountRepository,
	oauthService *OAuthService,
	openaiOAuthService *OpenAIOAuthService,
	geminiOAuthService *GeminiOAuthService,
	antigravityOAuthService *AntigravityOAuthService,
	cacheInvalidator TokenCacheInvalidator,
	schedulerCache SchedulerCache,
	cfg *config.Config,
	tempUnschedCache TempUnschedCache,
	privacyClientFactory PrivacyClientFactory,
	proxyRepo ProxyRepository,
	refreshAPI *OAuthRefreshAPI,
) *TokenRefreshService {
	refresher := NewTokenRefreshService(
		accountRepo,
		oauthService,
		openaiOAuthService,
		geminiOAuthService,
		antigravityOAuthService,
		cacheInvalidator,
		schedulerCache,
		cfg,
		tempUnschedCache,
	)

	// Inject the dependencies needed for the OpenAI privacy opt-out.
	refresher.SetPrivacyDeps(privacyClientFactory, proxyRepo)

	// Inject the unified OAuth refresh API (removes the race condition between
	// TokenRefreshService and TokenProvider).
	refresher.SetRefreshAPI(refreshAPI)

	// Inject the background refresh policy explicitly at the call site to
	// avoid policy drift.
	refresher.SetRefreshPolicy(DefaultBackgroundRefreshPolicy())

	refresher.Start()
	return refresher
}
// ProvideClaudeTokenProvider builds a ClaudeTokenProvider and wires in the
// shared OAuthRefreshAPI, a Claude-specific refresh executor and the Claude
// provider refresh policy.
func ProvideClaudeTokenProvider(
	accountRepo AccountRepository,
	tokenCache GeminiTokenCache,
	oauthService *OAuthService,
	refreshAPI *OAuthRefreshAPI,
) *ClaudeTokenProvider {
	provider := NewClaudeTokenProvider(accountRepo, tokenCache, oauthService)
	provider.SetRefreshAPI(refreshAPI, NewClaudeTokenRefresher(oauthService))
	provider.SetRefreshPolicy(ClaudeProviderRefreshPolicy())
	return provider
}
// ProvideOpenAITokenProvider builds an OpenAITokenProvider and wires in the
// shared OAuthRefreshAPI, an OpenAI-specific refresh executor and the OpenAI
// provider refresh policy.
func ProvideOpenAITokenProvider(
	accountRepo AccountRepository,
	tokenCache GeminiTokenCache,
	openaiOAuthService *OpenAIOAuthService,
	refreshAPI *OAuthRefreshAPI,
) *OpenAITokenProvider {
	provider := NewOpenAITokenProvider(accountRepo, tokenCache, openaiOAuthService)
	provider.SetRefreshAPI(refreshAPI, NewOpenAITokenRefresher(openaiOAuthService, accountRepo))
	provider.SetRefreshPolicy(OpenAIProviderRefreshPolicy())
	return provider
}
// ProvideGeminiTokenProvider builds a GeminiTokenProvider and wires in the
// shared OAuthRefreshAPI, a Gemini-specific refresh executor and the Gemini
// provider refresh policy.
func ProvideGeminiTokenProvider(
	accountRepo AccountRepository,
	tokenCache GeminiTokenCache,
	geminiOAuthService *GeminiOAuthService,
	refreshAPI *OAuthRefreshAPI,
) *GeminiTokenProvider {
	provider := NewGeminiTokenProvider(accountRepo, tokenCache, geminiOAuthService)
	provider.SetRefreshAPI(refreshAPI, NewGeminiTokenRefresher(geminiOAuthService))
	provider.SetRefreshPolicy(GeminiProviderRefreshPolicy())
	return provider
}
// ProvideAntigravityTokenProvider builds an AntigravityTokenProvider and wires
// in the shared OAuthRefreshAPI, an Antigravity-specific refresh executor, the
// Antigravity provider refresh policy and the temp-unschedulable cache.
func ProvideAntigravityTokenProvider(
	accountRepo AccountRepository,
	tokenCache GeminiTokenCache,
	antigravityOAuthService *AntigravityOAuthService,
	refreshAPI *OAuthRefreshAPI,
	tempUnschedCache TempUnschedCache,
) *AntigravityTokenProvider {
	provider := NewAntigravityTokenProvider(accountRepo, tokenCache, antigravityOAuthService)
	provider.SetRefreshAPI(refreshAPI, NewAntigravityTokenRefresher(antigravityOAuthService))
	provider.SetRefreshPolicy(AntigravityProviderRefreshPolicy())
	provider.SetTempUnschedCache(tempUnschedCache)
	return provider
}
// ProvideDashboardAggregationService builds the dashboard aggregation service
// and starts it before handing it to the caller.
func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
	aggregator := NewDashboardAggregationService(repo, timingWheel, cfg)
	aggregator.Start()
	return aggregator
}
// ProvideUsageCleanupService builds the usage-record cleanup task service and
// starts it before handing it to the caller.
func ProvideUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboardAgg *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
	cleaner := NewUsageCleanupService(repo, timingWheel, dashboardAgg, cfg)
	cleaner.Start()
	return cleaner
}
// ProvideAccountExpiryService builds an AccountExpiryService with a one-minute
// duration argument and starts it.
// NOTE(review): the duration is presumably the expiry check interval; confirm
// against NewAccountExpiryService.
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
	expiry := NewAccountExpiryService(accountRepo, time.Minute)
	expiry.Start()
	return expiry
}
// ProvideSubscriptionExpiryService builds a SubscriptionExpiryService with a
// one-minute duration argument and starts it.
// NOTE(review): the duration is presumably the expiry check interval; confirm
// against NewSubscriptionExpiryService.
func ProvideSubscriptionExpiryService(userSubRepo UserSubscriptionRepository) *SubscriptionExpiryService {
	expiry := NewSubscriptionExpiryService(userSubRepo, time.Minute)
	expiry.Start()
	return expiry
}
// ProvideTimingWheelService builds a TimingWheelService and starts it.
// If construction fails, the error is returned as-is together with a nil
// service and nothing is started.
func ProvideTimingWheelService() (*TimingWheelService, error) {
	wheel, buildErr := NewTimingWheelService()
	if buildErr != nil {
		return nil, buildErr
	}

	wheel.Start()
	return wheel, nil
}
// ProvideDeferredService builds a DeferredService with a ten-second duration
// argument and starts it.
// NOTE(review): the duration's exact meaning is defined by NewDeferredService,
// which is not visible here.
func ProvideDeferredService(accountRepo AccountRepository, timingWheel *TimingWheelService) *DeferredService {
	deferred := NewDeferredService(accountRepo, timingWheel, 10*time.Second)
	deferred.Start()
	return deferred
}
// ProvideConcurrencyService builds a ConcurrencyService, performs a one-off
// cleanup of stale process slots, and (when a config is supplied) starts the
// slot cleanup worker with the configured interval.
//
// A failure of the startup cleanup is only logged; it does not prevent the
// service from being returned.
func ProvideConcurrencyService(cache ConcurrencyCache, accountRepo AccountRepository, cfg *config.Config) *ConcurrencyService {
	concurrency := NewConcurrencyService(cache)

	cleanupErr := concurrency.CleanupStaleProcessSlots(context.Background())
	if cleanupErr != nil {
		logger.LegacyPrintf("service.concurrency", "Warning: startup cleanup stale process slots failed: %v", cleanupErr)
	}

	// Without a config there is no interval to run the worker with.
	if cfg == nil {
		return concurrency
	}

	concurrency.StartSlotCleanupWorker(accountRepo, cfg.Gateway.Scheduling.SlotCleanupInterval)
	return concurrency
}
// ProvideUserMessageQueueService builds the per-user serial message queue
// service and starts its cleanup worker when a positive cleanup interval (in
// seconds) is configured.
func ProvideUserMessageQueueService(cache UserMsgQueueCache, rpmCache RPMCache, cfg *config.Config) *UserMessageQueueService {
	queue := NewUserMessageQueueService(cache, rpmCache, &cfg.Gateway.UserMessageQueue)

	// A non-positive interval means the cleanup worker is not started.
	intervalSeconds := cfg.Gateway.UserMessageQueue.CleanupIntervalSeconds
	if intervalSeconds <= 0 {
		return queue
	}

	queue.StartCleanupWorker(time.Duration(intervalSeconds) * time.Second)
	return queue
}
// ProvideSchedulerSnapshotService builds a SchedulerSnapshotService from its
// cache, repositories and config, and starts it.
func ProvideSchedulerSnapshotService(
	cache SchedulerCache,
	outboxRepo SchedulerOutboxRepository,
	accountRepo AccountRepository,
	groupRepo GroupRepository,
	cfg *config.Config,
) *SchedulerSnapshotService {
	snapshots := NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, groupRepo, cfg)
	snapshots.Start()
	return snapshots
}
// ProvideRateLimitService builds a RateLimitService and then attaches its
// optional, setter-injected dependencies: the timeout counter cache, the
// setting service and the token cache invalidator.
func ProvideRateLimitService(
	accountRepo AccountRepository,
	usageRepo UsageLogRepository,
	cfg *config.Config,
	geminiQuotaService *GeminiQuotaService,
	tempUnschedCache TempUnschedCache,
	timeoutCounterCache TimeoutCounterCache,
	settingService *SettingService,
	tokenCacheInvalidator TokenCacheInvalidator,
) *RateLimitService {
	limiter := NewRateLimitService(accountRepo, usageRepo, cfg, geminiQuotaService, tempUnschedCache)

	limiter.SetTimeoutCounterCache(timeoutCounterCache)
	limiter.SetSettingService(settingService)
	limiter.SetTokenCacheInvalidator(tokenCacheInvalidator)

	return limiter
}
// ProvideOpsMetricsCollector builds an OpsMetricsCollector from its
// repositories, the concurrency service, the SQL and Redis handles and the
// config, and starts it.
func ProvideOpsMetricsCollector(
	opsRepo OpsRepository,
	settingRepo SettingRepository,
	accountRepo AccountRepository,
	concurrencyService *ConcurrencyService,
	db *sql.DB,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsMetricsCollector {
	metrics := NewOpsMetricsCollector(
		opsRepo,
		settingRepo,
		accountRepo,
		concurrencyService,
		db,
		redisClient,
		cfg,
	)
	metrics.Start()
	return metrics
}
// ProvideOpsAggregationService builds the OpsAggregationService
// (hourly/daily pre-aggregation) and starts it.
func ProvideOpsAggregationService(
	opsRepo OpsRepository,
	settingRepo SettingRepository,
	db *sql.DB,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsAggregationService {
	aggregation := NewOpsAggregationService(opsRepo, settingRepo, db, redisClient, cfg)
	aggregation.Start()
	return aggregation
}
// ProvideOpsAlertEvaluatorService builds an OpsAlertEvaluatorService and
// starts it.
func ProvideOpsAlertEvaluatorService(
	opsService *OpsService,
	opsRepo OpsRepository,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsAlertEvaluatorService {
	evaluator := NewOpsAlertEvaluatorService(opsService, opsRepo, emailService, redisClient, cfg)
	evaluator.Start()
	return evaluator
}
// ProvideOpsCleanupService builds the cron-scheduled OpsCleanupService and
// starts it.
//
// channelMonitorSvc is passed in so that the channel monitor maintenance tasks
// (aggregation plus soft deletion of history/aggregates) run together with the
// ops cleanup cron, sharing its leader lock and heartbeat.
func ProvideOpsCleanupService(
	opsRepo OpsRepository,
	db *sql.DB,
	redisClient *redis.Client,
	cfg *config.Config,
	channelMonitorSvc *ChannelMonitorService,
) *OpsCleanupService {
	cleanup := NewOpsCleanupService(opsRepo, db, redisClient, cfg, channelMonitorSvc)
	cleanup.Start()
	return cleanup
}
// ProvideOpsSystemLogSink builds an OpsSystemLogSink backed by opsRepo, starts
// it, and registers it with the logger package via logger.SetSink.
func ProvideOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
	logSink := NewOpsSystemLogSink(opsRepo)
	logSink.Start()

	// Register only after Start, matching the original ordering.
	logger.SetSink(logSink)
	return logSink
}
// buildIdempotencyConfig derives an IdempotencyConfig from the application
// config. It starts from DefaultIdempotencyConfig and overrides a field only
// when the corresponding config value is positive; ObserveOnly is always
// copied. A nil cfg yields the unmodified defaults.
func buildIdempotencyConfig(cfg *config.Config) IdempotencyConfig {
	result := DefaultIdempotencyConfig()
	if cfg == nil {
		return result
	}

	// Second-based settings: non-positive values keep the default.
	if v := cfg.Idempotency.DefaultTTLSeconds; v > 0 {
		result.DefaultTTL = time.Duration(v) * time.Second
	}
	if v := cfg.Idempotency.SystemOperationTTLSeconds; v > 0 {
		result.SystemOperationTTL = time.Duration(v) * time.Second
	}
	if v := cfg.Idempotency.ProcessingTimeoutSeconds; v > 0 {
		result.ProcessingTimeout = time.Duration(v) * time.Second
	}
	if v := cfg.Idempotency.FailedRetryBackoffSeconds; v > 0 {
		result.FailedRetryBackoff = time.Duration(v) * time.Second
	}

	if v := cfg.Idempotency.MaxStoredResponseLen; v > 0 {
		result.MaxStoredResponseLen = v
	}

	// ObserveOnly has no "unset" state, so it is taken from cfg unconditionally.
	result.ObserveOnly = cfg.Idempotency.ObserveOnly
	return result
}
// ProvideIdempotencyCoordinator builds an IdempotencyCoordinator from repo and
// the config derived by buildIdempotencyConfig, and also installs it as the
// package default via SetDefaultIdempotencyCoordinator.
func ProvideIdempotencyCoordinator(repo IdempotencyRepository, cfg *config.Config) *IdempotencyCoordinator {
	idemCfg := buildIdempotencyConfig(cfg)
	c := NewIdempotencyCoordinator(repo, idemCfg)
	SetDefaultIdempotencyCoordinator(c)
	return c
}
// ProvideSystemOperationLockService builds a SystemOperationLockService from
// repo and the config derived by buildIdempotencyConfig.
func ProvideSystemOperationLockService(repo IdempotencyRepository, cfg *config.Config) *SystemOperationLockService {
	idemCfg := buildIdempotencyConfig(cfg)
	return NewSystemOperationLockService(repo, idemCfg)
}
// ProvideIdempotencyCleanupService builds an IdempotencyCleanupService and
// starts it.
func ProvideIdempotencyCleanupService(repo IdempotencyRepository, cfg *config.Config) *IdempotencyCleanupService {
	cleanup := NewIdempotencyCleanupService(repo, cfg)
	cleanup.Start()
	return cleanup
}
// ProvideScheduledTestService builds a ScheduledTestService from the plan and
// result repositories. Nothing is started here.
func ProvideScheduledTestService(
	planRepo ScheduledTestPlanRepository,
	resultRepo ScheduledTestResultRepository,
) *ScheduledTestService {
	scheduled := NewScheduledTestService(planRepo, resultRepo)
	return scheduled
}
// ProvideScheduledTestRunnerService builds a ScheduledTestRunnerService and
// starts it.
func ProvideScheduledTestRunnerService(
	planRepo ScheduledTestPlanRepository,
	scheduledSvc *ScheduledTestService,
	accountTestSvc *AccountTestService,
	rateLimitSvc *RateLimitService,
	cfg *config.Config,
) *ScheduledTestRunnerService {
	runner := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, rateLimitSvc, cfg)
	runner.Start()
	return runner
}
// ProvideOpsScheduledReportService builds an OpsScheduledReportService and
// starts it.
func ProvideOpsScheduledReportService(
	opsService *OpsService,
	userService *UserService,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsScheduledReportService {
	reports := NewOpsScheduledReportService(opsService, userService, emailService, redisClient, cfg)
	reports.Start()
	return reports
}
// ProvideAPIKeyAuthCacheInvalidator exposes the APIKeyService as an
// APIKeyAuthCacheInvalidator, i.e. the capability to invalidate the API key
// authentication cache.
//
// As a side effect it starts the Pub/Sub subscriber used for L1 cache
// invalidation across instances, using a background context.
func ProvideAPIKeyAuthCacheInvalidator(apiKeyService *APIKeyService) APIKeyAuthCacheInvalidator {
	ctx := context.Background()
	apiKeyService.StartAuthCacheInvalidationSubscriber(ctx)
	return apiKeyService
}
// ProvideBackupService builds a BackupService from its setting repository,
// config, secret encryptor, object store factory and DB dumper, and starts it.
func ProvideBackupService(
	settingRepo SettingRepository,
	cfg *config.Config,
	encryptor SecretEncryptor,
	storeFactory BackupObjectStoreFactory,
	dumper DBDumper,
) *BackupService {
	backup := NewBackupService(settingRepo, cfg, encryptor, storeFactory, dumper)
	backup.Start()
	return backup
}
// ProvideSettingService builds a SettingService and attaches groupRepo as its
// default-subscription group reader and proxyRepo as its proxy repository.
func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupRepository, proxyRepo ProxyRepository, cfg *config.Config) *SettingService {
	settings := NewSettingService(settingRepo, cfg)

	settings.SetDefaultSubscriptionGroupReader(groupRepo)
	settings.SetProxyRepository(proxyRepo)

	return settings
}
// ProviderSet is the Wire provider set for all services in this package.
// It lists every constructor (New*) and provider wrapper (Provide*) that Wire
// may use to build the service graph, plus the interface bindings declared
// with wire.Bind.
var ProviderSet = wire.NewSet(
// Core services
NewAuthService,
NewUserService,
NewAPIKeyService,
ProvideAPIKeyAuthCacheInvalidator,
NewGroupService,
NewAccountService,
NewProxyService,
NewRedeemService,
NewPromoService,
NewUsageService,
NewDashboardService,
ProvidePricingService,
NewBillingService,
NewBillingCacheService,
NewAnnouncementService,
NewAdminService,
NewGatewayService,
NewOpenAIGatewayService,
NewOAuthService,
NewOpenAIOAuthService,
NewGeminiOAuthService,
NewGeminiQuotaService,
NewCompositeTokenCacheInvalidator,
// Satisfy the TokenCacheInvalidator interface with *CompositeTokenCacheInvalidator.
wire.Bind(new(TokenCacheInvalidator), new(*CompositeTokenCacheInvalidator)),
NewAntigravityOAuthService,
NewOAuthRefreshAPI,
ProvideGeminiTokenProvider,
NewGeminiMessagesCompatService,
ProvideAntigravityTokenProvider,
ProvideOpenAITokenProvider,
ProvideClaudeTokenProvider,
NewAntigravityGatewayService,
ProvideRateLimitService,
NewAccountUsageService,
NewAccountTestService,
ProvideSettingService,
NewDataManagementService,
ProvideBackupService,
ProvideOpsSystemLogSink,
NewOpsService,
ProvideOpsMetricsCollector,
ProvideOpsAggregationService,
ProvideOpsAlertEvaluatorService,
ProvideOpsCleanupService,
ProvideOpsScheduledReportService,
NewEmailService,
ProvideEmailQueueService,
NewTurnstileService,
NewSubscriptionService,
// Satisfy the DefaultSubscriptionAssigner interface with *SubscriptionService.
wire.Bind(new(DefaultSubscriptionAssigner), new(*SubscriptionService)),
ProvideConcurrencyService,
ProvideUserMessageQueueService,
NewUsageRecordWorkerPool,
ProvideSchedulerSnapshotService,
NewIdentityService,
NewCRSSyncService,
ProvideUpdateService,
ProvideTokenRefreshService,
ProvideAccountExpiryService,
ProvideSubscriptionExpiryService,
ProvideTimingWheelService,
ProvideDashboardAggregationService,
ProvideUsageCleanupService,
ProvideDeferredService,
NewAntigravityQuotaFetcher,
NewUserAttributeService,
NewUsageCache,
NewTotpService,
NewErrorPassthroughService,
NewTLSFingerprintProfileService,
NewDigestSessionStore,
ProvideIdempotencyCoordinator,
ProvideSystemOperationLockService,
ProvideIdempotencyCleanupService,
ProvideScheduledTestService,
ProvideScheduledTestRunnerService,
NewGroupCapacityService,
NewChannelService,
NewModelPricingResolver,
ProvidePaymentConfigService,
NewPaymentService,
ProvidePaymentOrderExpiryService,
ProvideBalanceNotifyService,
ProvideChannelMonitorService,
ProvideChannelMonitorRunner,
NewChannelMonitorRequestTemplateService,
)
// ProvidePaymentConfigService adapts NewPaymentConfigService for Wire: it
// accepts the named payment.EncryptionKey type rather than a raw []byte, which
// avoids Wire ambiguity, and converts the key before delegating.
func ProvidePaymentConfigService(entClient *dbent.Client, settingRepo SettingRepository, key payment.EncryptionKey) *PaymentConfigService {
	rawKey := []byte(key)
	return NewPaymentConfigService(entClient, settingRepo, rawKey)
}
// ProvideBalanceNotifyService builds a BalanceNotifyService from the email
// service and the setting and account repositories. Nothing is started here.
func ProvideBalanceNotifyService(emailService *EmailService, settingRepo SettingRepository, accountRepo AccountRepository) *BalanceNotifyService {
	notifier := NewBalanceNotifyService(emailService, settingRepo, accountRepo)
	return notifier
}
// ProvidePaymentOrderExpiryService builds a PaymentOrderExpiryService with a
// 60-second duration argument and starts it.
// NOTE(review): the duration is presumably the expiry check interval; confirm
// against NewPaymentOrderExpiryService.
func ProvidePaymentOrderExpiryService(paymentSvc *PaymentService) *PaymentOrderExpiryService {
	expiry := NewPaymentOrderExpiryService(paymentSvc, 60*time.Second)
	expiry.Start()
	return expiry
}
// ProvideChannelMonitorService builds the channel monitor service
// (CRUD + RunCheck + user-view aggregation).
// The encryptor is the SecretEncryptor (AES-256-GCM) already injected through
// Wire, reused here.
func ProvideChannelMonitorService(
	repo ChannelMonitorRepository,
	encryptor SecretEncryptor,
) *ChannelMonitorService {
	monitorSvc := NewChannelMonitorService(repo, encryptor)
	return monitorSvc
}
// ProvideChannelMonitorRunner builds the channel monitor scheduler and starts
// it.
//
// The runner is injected back into the service with SetScheduler before Start
// is called, so that all enabled monitors are loaded at startup and later CRUD
// operations can update the task table immediately. Runner.Stop is invoked by
// the cleanup function. settingService is what the runner uses to read the
// feature switch on every fire.
func ProvideChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner {
	runner := NewChannelMonitorRunner(svc, settingService)

	// Order matters: register the scheduler on the service first, then start.
	svc.SetScheduler(runner)
	runner.Start()

	return runner
}