diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index ec8b9dec..60bb17d5 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -229,8 +229,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository) - leaderLocker := repository.NewLeaderLocker(db, redisClient) - scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, leaderLocker, configConfig) + scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, configConfig) v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService, scheduledTestRunnerService) application := &Application{ Server: httpServer, diff --git a/backend/internal/repository/scheduled_test_repo.go b/backend/internal/repository/scheduled_test_repo.go index 618edee2..60627702 100644 --- a/backend/internal/repository/scheduled_test_repo.go +++ b/backend/internal/repository/scheduled_test_repo.go @@ -3,14 +3,9 @@ package repository import ( "context" "database/sql" - "hash/fnv" - "sync" "time" - "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/service" - "github.com/google/uuid" - "github.com/redis/go-redis/v9" ) // --- Plan Repository --- @@ -186,83 +181,3 @@ func scanPlans(rows *sql.Rows) ([]*service.ScheduledTestPlan, error) { } return plans, rows.Err() } - -// --- LeaderLocker --- - -var redisReleaseScript = redis.NewScript(` -if redis.call("GET", KEYS[1]) == ARGV[1] then - return redis.call("DEL", KEYS[1]) -end -return 0 -`) - -type leaderLocker struct { - db *sql.DB - redisClient *redis.Client - instanceID string - warnNoRedisOnce sync.Once -} - -// NewLeaderLocker creates a LeaderLocker that tries Redis first, then falls back to pg advisory lock. -func NewLeaderLocker(db *sql.DB, redisClient *redis.Client) service.LeaderLocker { - return &leaderLocker{ - db: db, - redisClient: redisClient, - instanceID: uuid.NewString(), - } -} - -func (l *leaderLocker) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool) { - if l.redisClient != nil { - ok, err := l.redisClient.SetNX(ctx, key, l.instanceID, ttl).Result() - if err == nil { - if !ok { - return nil, false - } - return func() { - _, _ = redisReleaseScript.Run(ctx, l.redisClient, []string{key}, l.instanceID).Result() - }, true - } - l.warnNoRedisOnce.Do(func() { - logger.LegacyPrintf("repository.leader_locker", "[LeaderLocker] Redis SetNX failed; falling back to DB advisory lock: %v", err) - }) - } else { - l.warnNoRedisOnce.Do(func() { - logger.LegacyPrintf("repository.leader_locker", "[LeaderLocker] Redis not configured; using DB advisory lock") - }) - } - - // Fallback: pg_try_advisory_lock - return tryAdvisoryLock(ctx, l.db, hashLockID(key)) -} - -func tryAdvisoryLock(ctx context.Context, db *sql.DB, lockID int64) (func(), bool) { - if db == nil { - return nil, false - } - conn, err := db.Conn(ctx) - if err != nil { - return nil, false - } - acquired := false - if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired); err != nil { - _ = conn.Close() - return nil, false - } - if !acquired { - _ = conn.Close() - return nil, false - } - return func() { - unlockCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - _, _ = conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID) - _ = conn.Close() - }, true -} - -func hashLockID(key string) int64 { - h := fnv.New64a() - _, _ = h.Write([]byte(key)) - return int64(h.Sum64()) -} diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index 382004f3..5fe7a98e 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -56,7 +56,6 @@ var ProviderSet = wire.NewSet( NewSoraAccountRepository, // Sora 账号扩展表仓储 NewScheduledTestPlanRepository, // 定时测试计划仓储 NewScheduledTestResultRepository, // 定时测试结果仓储 - NewLeaderLocker, // 分布式 leader 选举 NewProxyRepository, NewRedeemCodeRepository, NewPromoCodeRepository, diff --git a/backend/internal/service/scheduled_test_port.go b/backend/internal/service/scheduled_test_port.go index 4da33e6d..f795ba00 100644 --- a/backend/internal/service/scheduled_test_port.go +++ b/backend/internal/service/scheduled_test_port.go @@ -59,10 +59,3 @@ type ScheduledTestResultRepository interface { ListByPlanID(ctx context.Context, planID int64, limit int) ([]*ScheduledTestResult, error) PruneOldResults(ctx context.Context, planID int64, keepCount int) error } - -// LeaderLocker provides distributed leader election for background runners. -// TryAcquire attempts to acquire a named lock and returns a release function -// and true if successful, or nil and false if the lock is held by another instance. -type LeaderLocker interface { - TryAcquire(ctx context.Context, key string, ttl time.Duration) (release func(), ok bool) -} diff --git a/backend/internal/service/scheduled_test_runner_service.go b/backend/internal/service/scheduled_test_runner_service.go index 1c61c072..45d85624 100644 --- a/backend/internal/service/scheduled_test_runner_service.go +++ b/backend/internal/service/scheduled_test_runner_service.go @@ -10,18 +10,13 @@ import ( "github.com/robfig/cron/v3" ) -const ( - scheduledTestLeaderLockKey = "scheduled_test:runner:leader" - scheduledTestLeaderLockTTL = 2 * time.Minute - scheduledTestDefaultMaxWorkers = 10 -) +const scheduledTestDefaultMaxWorkers = 10 // ScheduledTestRunnerService periodically scans due test plans and executes them. type ScheduledTestRunnerService struct { planRepo ScheduledTestPlanRepository scheduledSvc *ScheduledTestService accountTestSvc *AccountTestService - locker LeaderLocker cfg *config.Config cron *cron.Cron @@ -34,14 +29,12 @@ func NewScheduledTestRunnerService( planRepo ScheduledTestPlanRepository, scheduledSvc *ScheduledTestService, accountTestSvc *AccountTestService, - locker LeaderLocker, cfg *config.Config, ) *ScheduledTestRunnerService { return &ScheduledTestRunnerService{ planRepo: planRepo, scheduledSvc: scheduledSvc, accountTestSvc: accountTestSvc, - locker: locker, cfg: cfg, } } @@ -95,17 +88,6 @@ func (s *ScheduledTestRunnerService) runScheduled() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - // Skip leader election in simple mode. - if s.cfg == nil || s.cfg.RunMode != config.RunModeSimple { - release, ok := s.locker.TryAcquire(ctx, scheduledTestLeaderLockKey, scheduledTestLeaderLockTTL) - if !ok { - return - } - if release != nil { - defer release() - } - } - now := time.Now() plans, err := s.planRepo.ListDue(ctx, now) if err != nil { @@ -118,8 +100,7 @@ func (s *ScheduledTestRunnerService) runScheduled() { logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] found %d due plans", len(plans)) - maxWorkers := scheduledTestDefaultMaxWorkers - sem := make(chan struct{}, maxWorkers) + sem := make(chan struct{}, scheduledTestDefaultMaxWorkers) var wg sync.WaitGroup for _, plan := range plans { @@ -146,7 +127,6 @@ func (s *ScheduledTestRunnerService) runOnePlan(ctx context.Context, plan *Sched logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d SaveResult error: %v", plan.ID, err) } - // Compute next run nextRun, err := computeNextRun(plan.CronExpression, time.Now()) if err != nil { logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d computeNextRun error: %v", plan.ID, err) diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index e50145b4..42fbadf6 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -287,10 +287,9 @@ func ProvideScheduledTestRunnerService( planRepo ScheduledTestPlanRepository, scheduledSvc *ScheduledTestService, accountTestSvc *AccountTestService, - locker LeaderLocker, cfg *config.Config, ) *ScheduledTestRunnerService { - svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, locker, cfg) + svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, cfg) svc.Start() return svc }