Files
sub2api/backend/internal/service/wire.go
erio 1262654d97 feat: WebSearch tri-state, account stats pricing fix, quota cache fix, usage tooltip
WebSearch tri-state switch:
- Account-level web_search_emulation changed from bool to tri-state
  string: "default" (follow channel) / "enabled" / "disabled"
- shouldEmulateWebSearch checks channel config when account is "default"
- SQL migration converts old bool values
- Frontend select replaces toggle in Edit/CreateAccountModal

Account stats pricing:
- resolveAccountStatsCost uses upstream model (post-mapping) for matching
- Priority: custom rules → model pricing file (when toggle on) → default
- Custom rules always configurable, independent of toggle
- Account ID field changed to searchable selector filtered by platform
- Description updated to reflect new behavior

Quota notification cache fix:
- CheckAccountQuotaAfterIncrement fetches real-time account from DB
- Reconstructs pre-increment usage for accurate threshold crossing detection
- New AccountQuotaReader interface (minimal: GetByID only)

Usage tooltip:
- Per-request/image billing shows per-request price instead of $0 token price
- Token billing continues to show input/output price per million tokens
2026-04-14 09:26:08 +08:00

489 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"database/sql"
"time"
dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/payment"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/google/wire"
"github.com/redis/go-redis/v9"
)
// BuildInfo contains build information
type BuildInfo struct {
Version string
BuildType string
}
// ProvidePricingService creates and initializes PricingService
func ProvidePricingService(cfg *config.Config, remoteClient PricingRemoteClient) (*PricingService, error) {
svc := NewPricingService(cfg, remoteClient)
if err := svc.Initialize(); err != nil {
// Pricing service initialization failure should not block startup, use fallback prices
println("[Service] Warning: Pricing service initialization failed:", err.Error())
}
return svc, nil
}
// ProvideUpdateService creates UpdateService with BuildInfo
func ProvideUpdateService(cache UpdateCache, githubClient GitHubReleaseClient, buildInfo BuildInfo) *UpdateService {
return NewUpdateService(cache, githubClient, buildInfo.Version, buildInfo.BuildType)
}
// ProvideEmailQueueService creates EmailQueueService with default worker count
func ProvideEmailQueueService(emailService *EmailService) *EmailQueueService {
return NewEmailQueueService(emailService, 3)
}
// ProvideTokenRefreshService creates and starts TokenRefreshService
func ProvideTokenRefreshService(
accountRepo AccountRepository,
oauthService *OAuthService,
openaiOAuthService *OpenAIOAuthService,
geminiOAuthService *GeminiOAuthService,
antigravityOAuthService *AntigravityOAuthService,
cacheInvalidator TokenCacheInvalidator,
schedulerCache SchedulerCache,
cfg *config.Config,
tempUnschedCache TempUnschedCache,
privacyClientFactory PrivacyClientFactory,
proxyRepo ProxyRepository,
refreshAPI *OAuthRefreshAPI,
) *TokenRefreshService {
svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, schedulerCache, cfg, tempUnschedCache)
// 注入 OpenAI privacy opt-out 依赖
svc.SetPrivacyDeps(privacyClientFactory, proxyRepo)
// 注入统一 OAuth 刷新 API消除 TokenRefreshService 与 TokenProvider 之间的竞争条件)
svc.SetRefreshAPI(refreshAPI)
// 调用侧显式注入后台刷新策略,避免策略漂移
svc.SetRefreshPolicy(DefaultBackgroundRefreshPolicy())
svc.Start()
return svc
}
// ProvideClaudeTokenProvider creates ClaudeTokenProvider with OAuthRefreshAPI injection
func ProvideClaudeTokenProvider(
accountRepo AccountRepository,
tokenCache GeminiTokenCache,
oauthService *OAuthService,
refreshAPI *OAuthRefreshAPI,
) *ClaudeTokenProvider {
p := NewClaudeTokenProvider(accountRepo, tokenCache, oauthService)
executor := NewClaudeTokenRefresher(oauthService)
p.SetRefreshAPI(refreshAPI, executor)
p.SetRefreshPolicy(ClaudeProviderRefreshPolicy())
return p
}
// ProvideOpenAITokenProvider creates OpenAITokenProvider with OAuthRefreshAPI injection
func ProvideOpenAITokenProvider(
accountRepo AccountRepository,
tokenCache GeminiTokenCache,
openaiOAuthService *OpenAIOAuthService,
refreshAPI *OAuthRefreshAPI,
) *OpenAITokenProvider {
p := NewOpenAITokenProvider(accountRepo, tokenCache, openaiOAuthService)
executor := NewOpenAITokenRefresher(openaiOAuthService, accountRepo)
p.SetRefreshAPI(refreshAPI, executor)
p.SetRefreshPolicy(OpenAIProviderRefreshPolicy())
return p
}
// ProvideGeminiTokenProvider creates GeminiTokenProvider with OAuthRefreshAPI injection
func ProvideGeminiTokenProvider(
accountRepo AccountRepository,
tokenCache GeminiTokenCache,
geminiOAuthService *GeminiOAuthService,
refreshAPI *OAuthRefreshAPI,
) *GeminiTokenProvider {
p := NewGeminiTokenProvider(accountRepo, tokenCache, geminiOAuthService)
executor := NewGeminiTokenRefresher(geminiOAuthService)
p.SetRefreshAPI(refreshAPI, executor)
p.SetRefreshPolicy(GeminiProviderRefreshPolicy())
return p
}
// ProvideAntigravityTokenProvider creates AntigravityTokenProvider with OAuthRefreshAPI injection
func ProvideAntigravityTokenProvider(
accountRepo AccountRepository,
tokenCache GeminiTokenCache,
antigravityOAuthService *AntigravityOAuthService,
refreshAPI *OAuthRefreshAPI,
tempUnschedCache TempUnschedCache,
) *AntigravityTokenProvider {
p := NewAntigravityTokenProvider(accountRepo, tokenCache, antigravityOAuthService)
executor := NewAntigravityTokenRefresher(antigravityOAuthService)
p.SetRefreshAPI(refreshAPI, executor)
p.SetRefreshPolicy(AntigravityProviderRefreshPolicy())
p.SetTempUnschedCache(tempUnschedCache)
return p
}
// ProvideDashboardAggregationService 创建并启动仪表盘聚合服务
func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
svc := NewDashboardAggregationService(repo, timingWheel, cfg)
svc.Start()
return svc
}
// ProvideUsageCleanupService 创建并启动使用记录清理任务服务
func ProvideUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboardAgg *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
svc := NewUsageCleanupService(repo, timingWheel, dashboardAgg, cfg)
svc.Start()
return svc
}
// ProvideAccountExpiryService creates and starts AccountExpiryService.
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
svc := NewAccountExpiryService(accountRepo, time.Minute)
svc.Start()
return svc
}
// ProvideSubscriptionExpiryService creates and starts SubscriptionExpiryService.
func ProvideSubscriptionExpiryService(userSubRepo UserSubscriptionRepository) *SubscriptionExpiryService {
svc := NewSubscriptionExpiryService(userSubRepo, time.Minute)
svc.Start()
return svc
}
// ProvideTimingWheelService creates and starts TimingWheelService
func ProvideTimingWheelService() (*TimingWheelService, error) {
svc, err := NewTimingWheelService()
if err != nil {
return nil, err
}
svc.Start()
return svc, nil
}
// ProvideDeferredService creates and starts DeferredService
func ProvideDeferredService(accountRepo AccountRepository, timingWheel *TimingWheelService) *DeferredService {
svc := NewDeferredService(accountRepo, timingWheel, 10*time.Second)
svc.Start()
return svc
}
// ProvideConcurrencyService creates ConcurrencyService and starts slot cleanup worker.
func ProvideConcurrencyService(cache ConcurrencyCache, accountRepo AccountRepository, cfg *config.Config) *ConcurrencyService {
svc := NewConcurrencyService(cache)
if err := svc.CleanupStaleProcessSlots(context.Background()); err != nil {
logger.LegacyPrintf("service.concurrency", "Warning: startup cleanup stale process slots failed: %v", err)
}
if cfg != nil {
svc.StartSlotCleanupWorker(accountRepo, cfg.Gateway.Scheduling.SlotCleanupInterval)
}
return svc
}
// ProvideUserMessageQueueService 创建用户消息串行队列服务并启动清理 worker
func ProvideUserMessageQueueService(cache UserMsgQueueCache, rpmCache RPMCache, cfg *config.Config) *UserMessageQueueService {
svc := NewUserMessageQueueService(cache, rpmCache, &cfg.Gateway.UserMessageQueue)
if cfg.Gateway.UserMessageQueue.CleanupIntervalSeconds > 0 {
svc.StartCleanupWorker(time.Duration(cfg.Gateway.UserMessageQueue.CleanupIntervalSeconds) * time.Second)
}
return svc
}
// ProvideSchedulerSnapshotService creates and starts SchedulerSnapshotService.
func ProvideSchedulerSnapshotService(
cache SchedulerCache,
outboxRepo SchedulerOutboxRepository,
accountRepo AccountRepository,
groupRepo GroupRepository,
cfg *config.Config,
) *SchedulerSnapshotService {
svc := NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, groupRepo, cfg)
svc.Start()
return svc
}
// ProvideRateLimitService creates RateLimitService with optional dependencies.
func ProvideRateLimitService(
accountRepo AccountRepository,
usageRepo UsageLogRepository,
cfg *config.Config,
geminiQuotaService *GeminiQuotaService,
tempUnschedCache TempUnschedCache,
timeoutCounterCache TimeoutCounterCache,
settingService *SettingService,
tokenCacheInvalidator TokenCacheInvalidator,
) *RateLimitService {
svc := NewRateLimitService(accountRepo, usageRepo, cfg, geminiQuotaService, tempUnschedCache)
svc.SetTimeoutCounterCache(timeoutCounterCache)
svc.SetSettingService(settingService)
svc.SetTokenCacheInvalidator(tokenCacheInvalidator)
return svc
}
// ProvideOpsMetricsCollector creates and starts OpsMetricsCollector.
func ProvideOpsMetricsCollector(
opsRepo OpsRepository,
settingRepo SettingRepository,
accountRepo AccountRepository,
concurrencyService *ConcurrencyService,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsMetricsCollector {
collector := NewOpsMetricsCollector(opsRepo, settingRepo, accountRepo, concurrencyService, db, redisClient, cfg)
collector.Start()
return collector
}
// ProvideOpsAggregationService creates and starts OpsAggregationService (hourly/daily pre-aggregation).
func ProvideOpsAggregationService(
opsRepo OpsRepository,
settingRepo SettingRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsAggregationService {
svc := NewOpsAggregationService(opsRepo, settingRepo, db, redisClient, cfg)
svc.Start()
return svc
}
// ProvideOpsAlertEvaluatorService creates and starts OpsAlertEvaluatorService.
func ProvideOpsAlertEvaluatorService(
opsService *OpsService,
opsRepo OpsRepository,
emailService *EmailService,
redisClient *redis.Client,
cfg *config.Config,
) *OpsAlertEvaluatorService {
svc := NewOpsAlertEvaluatorService(opsService, opsRepo, emailService, redisClient, cfg)
svc.Start()
return svc
}
// ProvideOpsCleanupService creates and starts OpsCleanupService (cron scheduled).
func ProvideOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsCleanupService {
svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg)
svc.Start()
return svc
}
func ProvideOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
sink := NewOpsSystemLogSink(opsRepo)
sink.Start()
logger.SetSink(sink)
return sink
}
func buildIdempotencyConfig(cfg *config.Config) IdempotencyConfig {
idempotencyCfg := DefaultIdempotencyConfig()
if cfg != nil {
if cfg.Idempotency.DefaultTTLSeconds > 0 {
idempotencyCfg.DefaultTTL = time.Duration(cfg.Idempotency.DefaultTTLSeconds) * time.Second
}
if cfg.Idempotency.SystemOperationTTLSeconds > 0 {
idempotencyCfg.SystemOperationTTL = time.Duration(cfg.Idempotency.SystemOperationTTLSeconds) * time.Second
}
if cfg.Idempotency.ProcessingTimeoutSeconds > 0 {
idempotencyCfg.ProcessingTimeout = time.Duration(cfg.Idempotency.ProcessingTimeoutSeconds) * time.Second
}
if cfg.Idempotency.FailedRetryBackoffSeconds > 0 {
idempotencyCfg.FailedRetryBackoff = time.Duration(cfg.Idempotency.FailedRetryBackoffSeconds) * time.Second
}
if cfg.Idempotency.MaxStoredResponseLen > 0 {
idempotencyCfg.MaxStoredResponseLen = cfg.Idempotency.MaxStoredResponseLen
}
idempotencyCfg.ObserveOnly = cfg.Idempotency.ObserveOnly
}
return idempotencyCfg
}
func ProvideIdempotencyCoordinator(repo IdempotencyRepository, cfg *config.Config) *IdempotencyCoordinator {
coordinator := NewIdempotencyCoordinator(repo, buildIdempotencyConfig(cfg))
SetDefaultIdempotencyCoordinator(coordinator)
return coordinator
}
func ProvideSystemOperationLockService(repo IdempotencyRepository, cfg *config.Config) *SystemOperationLockService {
return NewSystemOperationLockService(repo, buildIdempotencyConfig(cfg))
}
func ProvideIdempotencyCleanupService(repo IdempotencyRepository, cfg *config.Config) *IdempotencyCleanupService {
svc := NewIdempotencyCleanupService(repo, cfg)
svc.Start()
return svc
}
// ProvideScheduledTestService creates ScheduledTestService.
func ProvideScheduledTestService(
planRepo ScheduledTestPlanRepository,
resultRepo ScheduledTestResultRepository,
) *ScheduledTestService {
return NewScheduledTestService(planRepo, resultRepo)
}
// ProvideScheduledTestRunnerService creates and starts ScheduledTestRunnerService.
func ProvideScheduledTestRunnerService(
planRepo ScheduledTestPlanRepository,
scheduledSvc *ScheduledTestService,
accountTestSvc *AccountTestService,
rateLimitSvc *RateLimitService,
cfg *config.Config,
) *ScheduledTestRunnerService {
svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, rateLimitSvc, cfg)
svc.Start()
return svc
}
// ProvideOpsScheduledReportService creates and starts OpsScheduledReportService.
func ProvideOpsScheduledReportService(
opsService *OpsService,
userService *UserService,
emailService *EmailService,
redisClient *redis.Client,
cfg *config.Config,
) *OpsScheduledReportService {
svc := NewOpsScheduledReportService(opsService, userService, emailService, redisClient, cfg)
svc.Start()
return svc
}
// ProvideAPIKeyAuthCacheInvalidator 提供 API Key 认证缓存失效能力
func ProvideAPIKeyAuthCacheInvalidator(apiKeyService *APIKeyService) APIKeyAuthCacheInvalidator {
// Start Pub/Sub subscriber for L1 cache invalidation across instances
apiKeyService.StartAuthCacheInvalidationSubscriber(context.Background())
return apiKeyService
}
// ProvideBackupService creates and starts BackupService
func ProvideBackupService(
settingRepo SettingRepository,
cfg *config.Config,
encryptor SecretEncryptor,
storeFactory BackupObjectStoreFactory,
dumper DBDumper,
) *BackupService {
svc := NewBackupService(settingRepo, cfg, encryptor, storeFactory, dumper)
svc.Start()
return svc
}
// ProvideSettingService wires SettingService with group reader and proxy repo.
func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupRepository, proxyRepo ProxyRepository, cfg *config.Config) *SettingService {
svc := NewSettingService(settingRepo, cfg)
svc.SetDefaultSubscriptionGroupReader(groupRepo)
svc.SetProxyRepository(proxyRepo)
return svc
}
// ProviderSet is the Wire provider set for all services
var ProviderSet = wire.NewSet(
// Core services
NewAuthService,
NewUserService,
NewAPIKeyService,
ProvideAPIKeyAuthCacheInvalidator,
NewGroupService,
NewAccountService,
NewProxyService,
NewRedeemService,
NewPromoService,
NewUsageService,
NewDashboardService,
ProvidePricingService,
NewBillingService,
NewBillingCacheService,
NewAnnouncementService,
NewAdminService,
NewGatewayService,
NewOpenAIGatewayService,
NewOAuthService,
NewOpenAIOAuthService,
NewGeminiOAuthService,
NewGeminiQuotaService,
NewCompositeTokenCacheInvalidator,
wire.Bind(new(TokenCacheInvalidator), new(*CompositeTokenCacheInvalidator)),
NewAntigravityOAuthService,
NewOAuthRefreshAPI,
ProvideGeminiTokenProvider,
NewGeminiMessagesCompatService,
ProvideAntigravityTokenProvider,
ProvideOpenAITokenProvider,
ProvideClaudeTokenProvider,
NewAntigravityGatewayService,
ProvideRateLimitService,
NewAccountUsageService,
NewAccountTestService,
ProvideSettingService,
NewDataManagementService,
ProvideBackupService,
ProvideOpsSystemLogSink,
NewOpsService,
ProvideOpsMetricsCollector,
ProvideOpsAggregationService,
ProvideOpsAlertEvaluatorService,
ProvideOpsCleanupService,
ProvideOpsScheduledReportService,
NewEmailService,
ProvideEmailQueueService,
NewTurnstileService,
NewSubscriptionService,
wire.Bind(new(DefaultSubscriptionAssigner), new(*SubscriptionService)),
ProvideConcurrencyService,
ProvideUserMessageQueueService,
NewUsageRecordWorkerPool,
ProvideSchedulerSnapshotService,
NewIdentityService,
NewCRSSyncService,
ProvideUpdateService,
ProvideTokenRefreshService,
ProvideAccountExpiryService,
ProvideSubscriptionExpiryService,
ProvideTimingWheelService,
ProvideDashboardAggregationService,
ProvideUsageCleanupService,
ProvideDeferredService,
NewAntigravityQuotaFetcher,
NewUserAttributeService,
NewUsageCache,
NewTotpService,
NewErrorPassthroughService,
NewTLSFingerprintProfileService,
NewDigestSessionStore,
ProvideIdempotencyCoordinator,
ProvideSystemOperationLockService,
ProvideIdempotencyCleanupService,
ProvideScheduledTestService,
ProvideScheduledTestRunnerService,
NewGroupCapacityService,
NewChannelService,
NewModelPricingResolver,
ProvidePaymentConfigService,
NewPaymentService,
ProvidePaymentOrderExpiryService,
ProvideBalanceNotifyService,
)
// ProvidePaymentConfigService wraps NewPaymentConfigService to accept the named
// payment.EncryptionKey type instead of raw []byte, avoiding Wire ambiguity.
func ProvidePaymentConfigService(entClient *dbent.Client, settingRepo SettingRepository, key payment.EncryptionKey) *PaymentConfigService {
return NewPaymentConfigService(entClient, settingRepo, []byte(key))
}
// ProvideBalanceNotifyService creates BalanceNotifyService
func ProvideBalanceNotifyService(emailService *EmailService, settingRepo SettingRepository, accountRepo AccountRepository) *BalanceNotifyService {
return NewBalanceNotifyService(emailService, settingRepo, accountRepo)
}
// ProvidePaymentOrderExpiryService creates and starts PaymentOrderExpiryService.
func ProvidePaymentOrderExpiryService(paymentSvc *PaymentService) *PaymentOrderExpiryService {
svc := NewPaymentOrderExpiryService(paymentSvc, 60*time.Second)
svc.Start()
return svc
}