simplify: 移除 leader lock,单实例无需分布式锁
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -229,8 +229,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
|
|||||||
tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache)
|
tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache)
|
||||||
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
|
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
|
||||||
subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository)
|
subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository)
|
||||||
leaderLocker := repository.NewLeaderLocker(db, redisClient)
|
scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, configConfig)
|
||||||
scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, leaderLocker, configConfig)
|
|
||||||
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService, scheduledTestRunnerService)
|
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService, scheduledTestRunnerService)
|
||||||
application := &Application{
|
application := &Application{
|
||||||
Server: httpServer,
|
Server: httpServer,
|
||||||
|
|||||||
@@ -3,14 +3,9 @@ package repository
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"hash/fnv"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
|
||||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// --- Plan Repository ---
|
// --- Plan Repository ---
|
||||||
@@ -186,83 +181,3 @@ func scanPlans(rows *sql.Rows) ([]*service.ScheduledTestPlan, error) {
|
|||||||
}
|
}
|
||||||
return plans, rows.Err()
|
return plans, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- LeaderLocker ---
|
|
||||||
|
|
||||||
var redisReleaseScript = redis.NewScript(`
|
|
||||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
||||||
return redis.call("DEL", KEYS[1])
|
|
||||||
end
|
|
||||||
return 0
|
|
||||||
`)
|
|
||||||
|
|
||||||
type leaderLocker struct {
|
|
||||||
db *sql.DB
|
|
||||||
redisClient *redis.Client
|
|
||||||
instanceID string
|
|
||||||
warnNoRedisOnce sync.Once
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLeaderLocker creates a LeaderLocker that tries Redis first, then falls back to pg advisory lock.
|
|
||||||
func NewLeaderLocker(db *sql.DB, redisClient *redis.Client) service.LeaderLocker {
|
|
||||||
return &leaderLocker{
|
|
||||||
db: db,
|
|
||||||
redisClient: redisClient,
|
|
||||||
instanceID: uuid.NewString(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *leaderLocker) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool) {
|
|
||||||
if l.redisClient != nil {
|
|
||||||
ok, err := l.redisClient.SetNX(ctx, key, l.instanceID, ttl).Result()
|
|
||||||
if err == nil {
|
|
||||||
if !ok {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
return func() {
|
|
||||||
_, _ = redisReleaseScript.Run(ctx, l.redisClient, []string{key}, l.instanceID).Result()
|
|
||||||
}, true
|
|
||||||
}
|
|
||||||
l.warnNoRedisOnce.Do(func() {
|
|
||||||
logger.LegacyPrintf("repository.leader_locker", "[LeaderLocker] Redis SetNX failed; falling back to DB advisory lock: %v", err)
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
l.warnNoRedisOnce.Do(func() {
|
|
||||||
logger.LegacyPrintf("repository.leader_locker", "[LeaderLocker] Redis not configured; using DB advisory lock")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback: pg_try_advisory_lock
|
|
||||||
return tryAdvisoryLock(ctx, l.db, hashLockID(key))
|
|
||||||
}
|
|
||||||
|
|
||||||
func tryAdvisoryLock(ctx context.Context, db *sql.DB, lockID int64) (func(), bool) {
|
|
||||||
if db == nil {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
conn, err := db.Conn(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
acquired := false
|
|
||||||
if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
if !acquired {
|
|
||||||
_ = conn.Close()
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
return func() {
|
|
||||||
unlockCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
_, _ = conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID)
|
|
||||||
_ = conn.Close()
|
|
||||||
}, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func hashLockID(key string) int64 {
|
|
||||||
h := fnv.New64a()
|
|
||||||
_, _ = h.Write([]byte(key))
|
|
||||||
return int64(h.Sum64())
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -56,7 +56,6 @@ var ProviderSet = wire.NewSet(
|
|||||||
NewSoraAccountRepository, // Sora 账号扩展表仓储
|
NewSoraAccountRepository, // Sora 账号扩展表仓储
|
||||||
NewScheduledTestPlanRepository, // 定时测试计划仓储
|
NewScheduledTestPlanRepository, // 定时测试计划仓储
|
||||||
NewScheduledTestResultRepository, // 定时测试结果仓储
|
NewScheduledTestResultRepository, // 定时测试结果仓储
|
||||||
NewLeaderLocker, // 分布式 leader 选举
|
|
||||||
NewProxyRepository,
|
NewProxyRepository,
|
||||||
NewRedeemCodeRepository,
|
NewRedeemCodeRepository,
|
||||||
NewPromoCodeRepository,
|
NewPromoCodeRepository,
|
||||||
|
|||||||
@@ -59,10 +59,3 @@ type ScheduledTestResultRepository interface {
|
|||||||
ListByPlanID(ctx context.Context, planID int64, limit int) ([]*ScheduledTestResult, error)
|
ListByPlanID(ctx context.Context, planID int64, limit int) ([]*ScheduledTestResult, error)
|
||||||
PruneOldResults(ctx context.Context, planID int64, keepCount int) error
|
PruneOldResults(ctx context.Context, planID int64, keepCount int) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// LeaderLocker provides distributed leader election for background runners.
|
|
||||||
// TryAcquire attempts to acquire a named lock and returns a release function
|
|
||||||
// and true if successful, or nil and false if the lock is held by another instance.
|
|
||||||
type LeaderLocker interface {
|
|
||||||
TryAcquire(ctx context.Context, key string, ttl time.Duration) (release func(), ok bool)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -10,18 +10,13 @@ import (
|
|||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const scheduledTestDefaultMaxWorkers = 10
|
||||||
scheduledTestLeaderLockKey = "scheduled_test:runner:leader"
|
|
||||||
scheduledTestLeaderLockTTL = 2 * time.Minute
|
|
||||||
scheduledTestDefaultMaxWorkers = 10
|
|
||||||
)
|
|
||||||
|
|
||||||
// ScheduledTestRunnerService periodically scans due test plans and executes them.
|
// ScheduledTestRunnerService periodically scans due test plans and executes them.
|
||||||
type ScheduledTestRunnerService struct {
|
type ScheduledTestRunnerService struct {
|
||||||
planRepo ScheduledTestPlanRepository
|
planRepo ScheduledTestPlanRepository
|
||||||
scheduledSvc *ScheduledTestService
|
scheduledSvc *ScheduledTestService
|
||||||
accountTestSvc *AccountTestService
|
accountTestSvc *AccountTestService
|
||||||
locker LeaderLocker
|
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
|
|
||||||
cron *cron.Cron
|
cron *cron.Cron
|
||||||
@@ -34,14 +29,12 @@ func NewScheduledTestRunnerService(
|
|||||||
planRepo ScheduledTestPlanRepository,
|
planRepo ScheduledTestPlanRepository,
|
||||||
scheduledSvc *ScheduledTestService,
|
scheduledSvc *ScheduledTestService,
|
||||||
accountTestSvc *AccountTestService,
|
accountTestSvc *AccountTestService,
|
||||||
locker LeaderLocker,
|
|
||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
) *ScheduledTestRunnerService {
|
) *ScheduledTestRunnerService {
|
||||||
return &ScheduledTestRunnerService{
|
return &ScheduledTestRunnerService{
|
||||||
planRepo: planRepo,
|
planRepo: planRepo,
|
||||||
scheduledSvc: scheduledSvc,
|
scheduledSvc: scheduledSvc,
|
||||||
accountTestSvc: accountTestSvc,
|
accountTestSvc: accountTestSvc,
|
||||||
locker: locker,
|
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -95,17 +88,6 @@ func (s *ScheduledTestRunnerService) runScheduled() {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Skip leader election in simple mode.
|
|
||||||
if s.cfg == nil || s.cfg.RunMode != config.RunModeSimple {
|
|
||||||
release, ok := s.locker.TryAcquire(ctx, scheduledTestLeaderLockKey, scheduledTestLeaderLockTTL)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if release != nil {
|
|
||||||
defer release()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
plans, err := s.planRepo.ListDue(ctx, now)
|
plans, err := s.planRepo.ListDue(ctx, now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -118,8 +100,7 @@ func (s *ScheduledTestRunnerService) runScheduled() {
|
|||||||
|
|
||||||
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] found %d due plans", len(plans))
|
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] found %d due plans", len(plans))
|
||||||
|
|
||||||
maxWorkers := scheduledTestDefaultMaxWorkers
|
sem := make(chan struct{}, scheduledTestDefaultMaxWorkers)
|
||||||
sem := make(chan struct{}, maxWorkers)
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, plan := range plans {
|
for _, plan := range plans {
|
||||||
@@ -146,7 +127,6 @@ func (s *ScheduledTestRunnerService) runOnePlan(ctx context.Context, plan *Sched
|
|||||||
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d SaveResult error: %v", plan.ID, err)
|
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d SaveResult error: %v", plan.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute next run
|
|
||||||
nextRun, err := computeNextRun(plan.CronExpression, time.Now())
|
nextRun, err := computeNextRun(plan.CronExpression, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d computeNextRun error: %v", plan.ID, err)
|
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d computeNextRun error: %v", plan.ID, err)
|
||||||
|
|||||||
@@ -287,10 +287,9 @@ func ProvideScheduledTestRunnerService(
|
|||||||
planRepo ScheduledTestPlanRepository,
|
planRepo ScheduledTestPlanRepository,
|
||||||
scheduledSvc *ScheduledTestService,
|
scheduledSvc *ScheduledTestService,
|
||||||
accountTestSvc *AccountTestService,
|
accountTestSvc *AccountTestService,
|
||||||
locker LeaderLocker,
|
|
||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
) *ScheduledTestRunnerService {
|
) *ScheduledTestRunnerService {
|
||||||
svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, locker, cfg)
|
svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, cfg)
|
||||||
svc.Start()
|
svc.Start()
|
||||||
return svc
|
return svc
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user