From 9a8dacc514a0a98e20dd5620675bcd5e9e3a1d0d Mon Sep 17 00:00:00 2001 From: guoyongchang Date: Thu, 5 Mar 2026 16:28:48 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20golangci-lint=20dep?= =?UTF-8?q?guard=20=E5=92=8C=20gofmt=20=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将 redis leader lock 逻辑从 service 层抽取为 LeaderLocker 接口, 实现移至 repository 层,消除 service 层对 redis 的直接依赖。 Co-Authored-By: Claude Opus 4.6 --- backend/cmd/server/wire_gen.go | 3 +- .../repository/scheduled_test_repo.go | 85 +++++++++++++++++++ backend/internal/repository/wire.go | 7 +- .../internal/service/scheduled_test_port.go | 7 ++ .../service/scheduled_test_runner_service.go | 78 ++++------------- backend/internal/service/wire.go | 5 +- 6 files changed, 115 insertions(+), 70 deletions(-) diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 9db1d0fc..ec8b9dec 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -229,7 +229,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository) - scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, db, redisClient, configConfig) + leaderLocker := repository.NewLeaderLocker(db, redisClient) + scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, leaderLocker, configConfig) v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService, scheduledTestRunnerService) application := &Application{ Server: httpServer, diff --git a/backend/internal/repository/scheduled_test_repo.go b/backend/internal/repository/scheduled_test_repo.go index 60627702..618edee2 100644 --- a/backend/internal/repository/scheduled_test_repo.go +++ b/backend/internal/repository/scheduled_test_repo.go @@ -3,9 +3,14 @@ package repository import ( "context" "database/sql" + "hash/fnv" + "sync" "time" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" ) // --- Plan Repository --- @@ -181,3 +186,83 @@ func scanPlans(rows *sql.Rows) ([]*service.ScheduledTestPlan, error) { } return plans, rows.Err() } + +// --- LeaderLocker --- + +var redisReleaseScript = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) + +type leaderLocker struct { + db *sql.DB + redisClient *redis.Client + instanceID string + warnNoRedisOnce sync.Once +} + +// NewLeaderLocker creates a LeaderLocker that tries Redis first, then falls back to pg advisory lock. +func NewLeaderLocker(db *sql.DB, redisClient *redis.Client) service.LeaderLocker { + return &leaderLocker{ + db: db, + redisClient: redisClient, + instanceID: uuid.NewString(), + } +} + +func (l *leaderLocker) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool) { + if l.redisClient != nil { + ok, err := l.redisClient.SetNX(ctx, key, l.instanceID, ttl).Result() + if err == nil { + if !ok { + return nil, false + } + return func() { + _, _ = redisReleaseScript.Run(ctx, l.redisClient, []string{key}, l.instanceID).Result() + }, true + } + l.warnNoRedisOnce.Do(func() { + logger.LegacyPrintf("repository.leader_locker", "[LeaderLocker] Redis SetNX failed; falling back to DB advisory lock: %v", err) + }) + } else { + l.warnNoRedisOnce.Do(func() { + logger.LegacyPrintf("repository.leader_locker", "[LeaderLocker] Redis not configured; using DB advisory lock") + }) + } + + // Fallback: pg_try_advisory_lock + return tryAdvisoryLock(ctx, l.db, hashLockID(key)) +} + +func tryAdvisoryLock(ctx context.Context, db *sql.DB, lockID int64) (func(), bool) { + if db == nil { + return nil, false + } + conn, err := db.Conn(ctx) + if err != nil { + return nil, false + } + acquired := false + if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired); err != nil { + _ = conn.Close() + return nil, false + } + if !acquired { + _ = conn.Close() + return nil, false + } + return func() { + unlockCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _, _ = conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID) + _ = conn.Close() + }, true +} + +func hashLockID(key string) int64 { + h := fnv.New64a() + _, _ = h.Write([]byte(key)) + return int64(h.Sum64()) +} diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index 2ae1d916..382004f3 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -53,9 +53,10 @@ var ProviderSet = wire.NewSet( NewAPIKeyRepository, NewGroupRepository, NewAccountRepository, - NewSoraAccountRepository, // Sora 账号扩展表仓储 - NewScheduledTestPlanRepository, // 定时测试计划仓储 - NewScheduledTestResultRepository, // 定时测试结果仓储 + NewSoraAccountRepository, // Sora 账号扩展表仓储 + NewScheduledTestPlanRepository, // 定时测试计划仓储 + NewScheduledTestResultRepository, // 定时测试结果仓储 + NewLeaderLocker, // 分布式 leader 选举 NewProxyRepository, NewRedeemCodeRepository, NewPromoCodeRepository, diff --git a/backend/internal/service/scheduled_test_port.go b/backend/internal/service/scheduled_test_port.go index f795ba00..4da33e6d 100644 --- a/backend/internal/service/scheduled_test_port.go +++ b/backend/internal/service/scheduled_test_port.go @@ -59,3 +59,10 @@ type ScheduledTestResultRepository interface { ListByPlanID(ctx context.Context, planID int64, limit int) ([]*ScheduledTestResult, error) PruneOldResults(ctx context.Context, planID int64, keepCount int) error } + +// LeaderLocker provides distributed leader election for background runners. +// TryAcquire attempts to acquire a named lock and returns a release function +// and true if successful, or nil and false if the lock is held by another instance. +type LeaderLocker interface { + TryAcquire(ctx context.Context, key string, ttl time.Duration) (release func(), ok bool) +} diff --git a/backend/internal/service/scheduled_test_runner_service.go b/backend/internal/service/scheduled_test_runner_service.go index 151f5c17..1c61c072 100644 --- a/backend/internal/service/scheduled_test_runner_service.go +++ b/backend/internal/service/scheduled_test_runner_service.go @@ -2,14 +2,11 @@ package service import ( "context" - "database/sql" "sync" "time" "github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" - "github.com/google/uuid" - "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" ) @@ -19,28 +16,17 @@ const ( scheduledTestDefaultMaxWorkers = 10 ) -var scheduledTestReleaseScript = redis.NewScript(` -if redis.call("GET", KEYS[1]) == ARGV[1] then - return redis.call("DEL", KEYS[1]) -end -return 0 -`) - // ScheduledTestRunnerService periodically scans due test plans and executes them. type ScheduledTestRunnerService struct { planRepo ScheduledTestPlanRepository scheduledSvc *ScheduledTestService accountTestSvc *AccountTestService - db *sql.DB - redisClient *redis.Client + locker LeaderLocker cfg *config.Config - instanceID string - cron *cron.Cron - startOnce sync.Once - stopOnce sync.Once - - warnNoRedisOnce sync.Once + cron *cron.Cron + startOnce sync.Once + stopOnce sync.Once } // NewScheduledTestRunnerService creates a new runner. @@ -48,18 +34,15 @@ func NewScheduledTestRunnerService( planRepo ScheduledTestPlanRepository, scheduledSvc *ScheduledTestService, accountTestSvc *AccountTestService, - db *sql.DB, - redisClient *redis.Client, + locker LeaderLocker, cfg *config.Config, ) *ScheduledTestRunnerService { return &ScheduledTestRunnerService{ planRepo: planRepo, scheduledSvc: scheduledSvc, accountTestSvc: accountTestSvc, - db: db, - redisClient: redisClient, + locker: locker, cfg: cfg, - instanceID: uuid.NewString(), } } @@ -112,12 +95,15 @@ func (s *ScheduledTestRunnerService) runScheduled() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - release, ok := s.tryAcquireLeaderLock(ctx) - if !ok { - return - } - if release != nil { - defer release() + // Skip leader election in simple mode. + if s.cfg == nil || s.cfg.RunMode != config.RunModeSimple { + release, ok := s.locker.TryAcquire(ctx, scheduledTestLeaderLockKey, scheduledTestLeaderLockTTL) + if !ok { + return + } + if release != nil { + defer release() + } } now := time.Now() @@ -171,37 +157,3 @@ func (s *ScheduledTestRunnerService) runOnePlan(ctx context.Context, plan *Sched logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d UpdateAfterRun error: %v", plan.ID, err) } } - -func (s *ScheduledTestRunnerService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { - if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple { - return nil, true - } - - key := scheduledTestLeaderLockKey - ttl := scheduledTestLeaderLockTTL - - if s.redisClient != nil { - ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() - if err == nil { - if !ok { - return nil, false - } - return func() { - _, _ = scheduledTestReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result() - }, true - } - s.warnNoRedisOnce.Do(func() { - logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] Redis SetNX failed; falling back to DB advisory lock: %v", err) - }) - } else { - s.warnNoRedisOnce.Do(func() { - logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] Redis not configured; using DB advisory lock") - }) - } - - release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key)) - if !ok { - return nil, false - } - return release, true -} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 0b6091b2..e50145b4 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -287,11 +287,10 @@ func ProvideScheduledTestRunnerService( planRepo ScheduledTestPlanRepository, scheduledSvc *ScheduledTestService, accountTestSvc *AccountTestService, - db *sql.DB, - redisClient *redis.Client, + locker LeaderLocker, cfg *config.Config, ) *ScheduledTestRunnerService { - svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, db, redisClient, cfg) + svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, locker, cfg) svc.Start() return svc }