Files
sub2api/backend/internal/service/system_operation_lock_service.go

215 lines
5.4 KiB
Go

package service
import (
"context"
"fmt"
"strconv"
"sync"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
const (
systemOperationLockScope = "admin.system.operations.global_lock"
systemOperationLockKey = "global-system-operation-lock"
)
var (
ErrSystemOperationBusy = infraerrors.Conflict("SYSTEM_OPERATION_BUSY", "another system operation is in progress")
)
type SystemOperationLock struct {
recordID int64
operationID string
stopOnce sync.Once
stopCh chan struct{}
}
func (l *SystemOperationLock) OperationID() string {
if l == nil {
return ""
}
return l.operationID
}
type SystemOperationLockService struct {
repo IdempotencyRepository
lease time.Duration
renewInterval time.Duration
ttl time.Duration
}
func NewSystemOperationLockService(repo IdempotencyRepository, cfg IdempotencyConfig) *SystemOperationLockService {
lease := cfg.ProcessingTimeout
if lease <= 0 {
lease = 30 * time.Second
}
renewInterval := lease / 3
if renewInterval < time.Second {
renewInterval = time.Second
}
ttl := cfg.SystemOperationTTL
if ttl <= 0 {
ttl = time.Hour
}
return &SystemOperationLockService{
repo: repo,
lease: lease,
renewInterval: renewInterval,
ttl: ttl,
}
}
func (s *SystemOperationLockService) Acquire(ctx context.Context, operationID string) (*SystemOperationLock, error) {
if s == nil || s.repo == nil {
return nil, ErrIdempotencyStoreUnavail
}
if operationID == "" {
return nil, infraerrors.BadRequest("SYSTEM_OPERATION_ID_REQUIRED", "operation id is required")
}
now := time.Now()
expiresAt := now.Add(s.ttl)
lockedUntil := now.Add(s.lease)
keyHash := HashIdempotencyKey(systemOperationLockKey)
record := &IdempotencyRecord{
Scope: systemOperationLockScope,
IdempotencyKeyHash: keyHash,
RequestFingerprint: operationID,
Status: IdempotencyStatusProcessing,
LockedUntil: &lockedUntil,
ExpiresAt: expiresAt,
}
owner, err := s.repo.CreateProcessing(ctx, record)
if err != nil {
return nil, ErrIdempotencyStoreUnavail.WithCause(err)
}
if !owner {
existing, getErr := s.repo.GetByScopeAndKeyHash(ctx, systemOperationLockScope, keyHash)
if getErr != nil {
return nil, ErrIdempotencyStoreUnavail.WithCause(getErr)
}
if existing == nil {
return nil, ErrIdempotencyStoreUnavail
}
if existing.Status == IdempotencyStatusProcessing && existing.LockedUntil != nil && existing.LockedUntil.After(now) {
return nil, s.busyError(existing.RequestFingerprint, existing.LockedUntil, now)
}
reclaimed, reclaimErr := s.repo.TryReclaim(
ctx,
existing.ID,
existing.Status,
now,
lockedUntil,
expiresAt,
)
if reclaimErr != nil {
return nil, ErrIdempotencyStoreUnavail.WithCause(reclaimErr)
}
if !reclaimed {
latest, _ := s.repo.GetByScopeAndKeyHash(ctx, systemOperationLockScope, keyHash)
if latest != nil {
return nil, s.busyError(latest.RequestFingerprint, latest.LockedUntil, now)
}
return nil, ErrSystemOperationBusy
}
record.ID = existing.ID
}
if record.ID == 0 {
return nil, ErrIdempotencyStoreUnavail
}
lock := &SystemOperationLock{
recordID: record.ID,
operationID: operationID,
stopCh: make(chan struct{}),
}
go s.renewLoop(lock)
return lock, nil
}
func (s *SystemOperationLockService) Release(ctx context.Context, lock *SystemOperationLock, succeeded bool, failureReason string) error {
if s == nil || s.repo == nil || lock == nil {
return nil
}
lock.stopOnce.Do(func() {
close(lock.stopCh)
})
if ctx == nil {
ctx = context.Background()
}
expiresAt := time.Now().Add(s.ttl)
if succeeded {
responseBody := fmt.Sprintf(`{"operation_id":"%s","released":true}`, lock.operationID)
return s.repo.MarkSucceeded(ctx, lock.recordID, 200, responseBody, expiresAt)
}
reason := failureReason
if reason == "" {
reason = "SYSTEM_OPERATION_FAILED"
}
return s.repo.MarkFailedRetryable(ctx, lock.recordID, reason, time.Now(), expiresAt)
}
func (s *SystemOperationLockService) renewLoop(lock *SystemOperationLock) {
ticker := time.NewTicker(s.renewInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ok, err := s.repo.ExtendProcessingLock(
ctx,
lock.recordID,
lock.operationID,
now.Add(s.lease),
now.Add(s.ttl),
)
cancel()
if err != nil {
logger.LegacyPrintf("service.system_operation_lock", "[SystemOperationLock] renew failed operation_id=%s err=%v", lock.operationID, err)
// 瞬时故障不应导致续租协程退出,下一轮继续尝试续租。
continue
}
if !ok {
logger.LegacyPrintf("service.system_operation_lock", "[SystemOperationLock] renew stopped operation_id=%s reason=ownership_lost", lock.operationID)
return
}
case <-lock.stopCh:
return
}
}
}
func (s *SystemOperationLockService) busyError(operationID string, lockedUntil *time.Time, now time.Time) error {
metadata := make(map[string]string)
if operationID != "" {
metadata["operation_id"] = operationID
}
if lockedUntil != nil {
sec := int(lockedUntil.Sub(now).Seconds())
if sec <= 0 {
sec = 1
}
metadata["retry_after"] = strconv.Itoa(sec)
}
if len(metadata) == 0 {
return ErrSystemOperationBusy
}
return ErrSystemOperationBusy.WithMetadata(metadata)
}